123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- package nats_test
- import (
- "context"
- "fmt"
- "time"
- utils "github.com/bitnami/charts/.vib/common-tests/ginkgo-utils"
- . "github.com/onsi/ginkgo/v2"
- . "github.com/onsi/gomega"
- appsv1 "k8s.io/api/apps/v1"
- batchv1 "k8s.io/api/batch/v1"
- v1 "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/client-go/kubernetes"
- )
// PollingInterval is the delay between consecutive polls performed by the
// Eventually assertions in this suite.
const PollingInterval = time.Second
- var _ = Describe("NATS", Ordered, func() {
- var c *kubernetes.Clientset
- var ctx context.Context
- var cancel context.CancelFunc
- BeforeEach(func() {
- ctx, cancel = context.WithCancel(context.Background())
- conf := utils.MustBuildClusterConfig(kubeconfig)
- c = kubernetes.NewForConfigOrDie(conf)
- })
- When("a bucket is created and is scaled down to 0 replicas and back up", func() {
- It("should have access to NATS", func() {
- getAvailableReplicas := func(ss *appsv1.StatefulSet) int32 { return ss.Status.AvailableReplicas }
- getRestartedAtAnnotation := func(pod *v1.Pod) string { return pod.Annotations["kubectl.kubernetes.io/restartedAt"] }
- getSucceededJobs := func(j *batchv1.Job) int32 { return j.Status.Succeeded }
- getOpts := metav1.GetOptions{}
- By("checking all the replicas are available")
- ss, err := c.AppsV1().StatefulSets(namespace).Get(ctx, releaseName, getOpts)
- Expect(err).NotTo(HaveOccurred())
- Expect(ss.Status.Replicas).NotTo(BeZero())
- origReplicas := *ss.Spec.Replicas
- Eventually(func() (*appsv1.StatefulSet, error) {
- return c.AppsV1().StatefulSets(namespace).Get(ctx, releaseName, getOpts)
- }, timeout, PollingInterval).Should(WithTransform(getAvailableReplicas, Equal(origReplicas)))
- svc, err := c.CoreV1().Services(namespace).Get(ctx, releaseName, getOpts)
- Expect(err).NotTo(HaveOccurred())
- port, err := utils.SvcGetPortByName(svc, "tcp-client")
- Expect(err).NotTo(HaveOccurred())
- // Use current time for allowing the test suite to repeat
- jobSuffix := time.Now().Format("20060102150405")
- By("creating a job to create a new KV Store Bucket")
- addKVBucketJobName := fmt.Sprintf("%s-add-kv-bucket-%s",
- releaseName, jobSuffix)
- storeBucketName := fmt.Sprintf("test%s", jobSuffix)
- err = createJob(ctx, c, addKVBucketJobName, port, "add", storeBucketName)
- Expect(err).NotTo(HaveOccurred())
- Eventually(func() (*batchv1.Job, error) {
- return c.BatchV1().Jobs(namespace).Get(ctx, addKVBucketJobName, getOpts)
- }, timeout, PollingInterval).Should(WithTransform(getSucceededJobs, Equal(int32(1))))
- By("deleting the job once it has succeeded")
- err = c.BatchV1().Jobs(namespace).Delete(ctx, addKVBucketJobName, metav1.DeleteOptions{})
- Expect(err).NotTo(HaveOccurred())
- By("creating a job to put some key-value pair")
- putKVJobName := fmt.Sprintf("%s-put-kv-%s",
- releaseName, jobSuffix)
- err = createJob(ctx, c, putKVJobName, port, "put", storeBucketName, "testKey", "testValue")
- Expect(err).NotTo(HaveOccurred())
- Eventually(func() (*batchv1.Job, error) {
- return c.BatchV1().Jobs(namespace).Get(ctx, putKVJobName, getOpts)
- }, timeout, PollingInterval).Should(WithTransform(getSucceededJobs, Equal(int32(1))))
- By("deleting the job once it has succeeded")
- err = c.BatchV1().Jobs(namespace).Delete(ctx, putKVJobName, metav1.DeleteOptions{})
- Expect(err).NotTo(HaveOccurred())
- // Give the application some time to sync the data
- time.Sleep(10 * time.Second)
- By("rollout restart the statefulset")
- _, err = utils.StsRolloutRestart(ctx, c, ss)
- Expect(err).NotTo(HaveOccurred())
- for i := int(origReplicas) - 1; i >= 0; i-- {
- Eventually(func() (*v1.Pod, error) {
- return c.CoreV1().Pods(namespace).Get(ctx, fmt.Sprintf("%s-%d", releaseName, i), getOpts)
- }, timeout, PollingInterval).Should(WithTransform(getRestartedAtAnnotation, Not(BeEmpty())))
- }
- Eventually(func() (*appsv1.StatefulSet, error) {
- return c.AppsV1().StatefulSets(namespace).Get(ctx, releaseName, getOpts)
- }, timeout, PollingInterval).Should(WithTransform(getAvailableReplicas, Equal(origReplicas)))
- By("creating a job to get a value for a key")
- getKVJobName := fmt.Sprintf("%s-get-key-%s",
- releaseName, jobSuffix)
- err = createJob(ctx, c, getKVJobName, port, "get", storeBucketName, "testKey")
- Expect(err).NotTo(HaveOccurred())
- Eventually(func() (*batchv1.Job, error) {
- return c.BatchV1().Jobs(namespace).Get(ctx, getKVJobName, getOpts)
- }, timeout, PollingInterval).Should(WithTransform(getSucceededJobs, Equal(int32(1))))
- By("deleting the job once it has succeeded")
- err = c.BatchV1().Jobs(namespace).Delete(ctx, getKVJobName, metav1.DeleteOptions{})
- Expect(err).NotTo(HaveOccurred())
- By("creating a job to get the delete the KV Store Bucket")
- deleteKVBucketJobName := fmt.Sprintf("%s-del-kv-bucket-%s",
- releaseName, jobSuffix)
- err = createJob(ctx, c, deleteKVBucketJobName, port, "del", storeBucketName, "-f")
- Expect(err).NotTo(HaveOccurred())
- Eventually(func() (*batchv1.Job, error) {
- return c.BatchV1().Jobs(namespace).Get(ctx, deleteKVBucketJobName, getOpts)
- }, timeout, PollingInterval).Should(WithTransform(getSucceededJobs, Equal(int32(1))))
- By("deleting the job once it has succeeded")
- err = c.BatchV1().Jobs(namespace).Delete(ctx, deleteKVBucketJobName, metav1.DeleteOptions{})
- Expect(err).NotTo(HaveOccurred())
- })
- })
- AfterEach(func() {
- cancel()
- })
- })
|