nats_test.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. package nats_test
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. utils "github.com/bitnami/charts/.vib/common-tests/ginkgo-utils"
  7. . "github.com/onsi/ginkgo/v2"
  8. . "github.com/onsi/gomega"
  9. appsv1 "k8s.io/api/apps/v1"
  10. batchv1 "k8s.io/api/batch/v1"
  11. v1 "k8s.io/api/core/v1"
  12. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  13. "k8s.io/client-go/kubernetes"
  14. )
  15. const (
  16. PollingInterval = 1 * time.Second
  17. )
  18. var _ = Describe("NATS", Ordered, func() {
  19. var c *kubernetes.Clientset
  20. var ctx context.Context
  21. var cancel context.CancelFunc
  22. BeforeEach(func() {
  23. ctx, cancel = context.WithCancel(context.Background())
  24. conf := utils.MustBuildClusterConfig(kubeconfig)
  25. c = kubernetes.NewForConfigOrDie(conf)
  26. })
  27. When("a bucket is created and is scaled down to 0 replicas and back up", func() {
  28. It("should have access to NATS", func() {
  29. getAvailableReplicas := func(ss *appsv1.StatefulSet) int32 { return ss.Status.AvailableReplicas }
  30. getRestartedAtAnnotation := func(pod *v1.Pod) string { return pod.Annotations["kubectl.kubernetes.io/restartedAt"] }
  31. getSucceededJobs := func(j *batchv1.Job) int32 { return j.Status.Succeeded }
  32. getOpts := metav1.GetOptions{}
  33. By("checking all the replicas are available")
  34. ss, err := c.AppsV1().StatefulSets(namespace).Get(ctx, releaseName, getOpts)
  35. Expect(err).NotTo(HaveOccurred())
  36. Expect(ss.Status.Replicas).NotTo(BeZero())
  37. origReplicas := *ss.Spec.Replicas
  38. Eventually(func() (*appsv1.StatefulSet, error) {
  39. return c.AppsV1().StatefulSets(namespace).Get(ctx, releaseName, getOpts)
  40. }, timeout, PollingInterval).Should(WithTransform(getAvailableReplicas, Equal(origReplicas)))
  41. svc, err := c.CoreV1().Services(namespace).Get(ctx, releaseName, getOpts)
  42. Expect(err).NotTo(HaveOccurred())
  43. port, err := utils.SvcGetPortByName(svc, "tcp-client")
  44. Expect(err).NotTo(HaveOccurred())
  45. // Use current time for allowing the test suite to repeat
  46. jobSuffix := time.Now().Format("20060102150405")
  47. By("creating a job to create a new KV Store Bucket")
  48. addKVBucketJobName := fmt.Sprintf("%s-add-kv-bucket-%s",
  49. releaseName, jobSuffix)
  50. storeBucketName := fmt.Sprintf("test%s", jobSuffix)
  51. err = createJob(ctx, c, addKVBucketJobName, port, "add", storeBucketName)
  52. Expect(err).NotTo(HaveOccurred())
  53. Eventually(func() (*batchv1.Job, error) {
  54. return c.BatchV1().Jobs(namespace).Get(ctx, addKVBucketJobName, getOpts)
  55. }, timeout, PollingInterval).Should(WithTransform(getSucceededJobs, Equal(int32(1))))
  56. By("deleting the job once it has succeeded")
  57. err = c.BatchV1().Jobs(namespace).Delete(ctx, addKVBucketJobName, metav1.DeleteOptions{})
  58. Expect(err).NotTo(HaveOccurred())
  59. By("creating a job to put some key-value pair")
  60. putKVJobName := fmt.Sprintf("%s-put-kv-%s",
  61. releaseName, jobSuffix)
  62. err = createJob(ctx, c, putKVJobName, port, "put", storeBucketName, "testKey", "testValue")
  63. Expect(err).NotTo(HaveOccurred())
  64. Eventually(func() (*batchv1.Job, error) {
  65. return c.BatchV1().Jobs(namespace).Get(ctx, putKVJobName, getOpts)
  66. }, timeout, PollingInterval).Should(WithTransform(getSucceededJobs, Equal(int32(1))))
  67. By("deleting the job once it has succeeded")
  68. err = c.BatchV1().Jobs(namespace).Delete(ctx, putKVJobName, metav1.DeleteOptions{})
  69. Expect(err).NotTo(HaveOccurred())
  70. // Give the application some time to sync the data
  71. time.Sleep(10 * time.Second)
  72. By("rollout restart the statefulset")
  73. _, err = utils.StsRolloutRestart(ctx, c, ss)
  74. Expect(err).NotTo(HaveOccurred())
  75. for i := int(origReplicas) - 1; i >= 0; i-- {
  76. Eventually(func() (*v1.Pod, error) {
  77. return c.CoreV1().Pods(namespace).Get(ctx, fmt.Sprintf("%s-%d", releaseName, i), getOpts)
  78. }, timeout, PollingInterval).Should(WithTransform(getRestartedAtAnnotation, Not(BeEmpty())))
  79. }
  80. Eventually(func() (*appsv1.StatefulSet, error) {
  81. return c.AppsV1().StatefulSets(namespace).Get(ctx, releaseName, getOpts)
  82. }, timeout, PollingInterval).Should(WithTransform(getAvailableReplicas, Equal(origReplicas)))
  83. By("creating a job to get a value for a key")
  84. getKVJobName := fmt.Sprintf("%s-get-key-%s",
  85. releaseName, jobSuffix)
  86. err = createJob(ctx, c, getKVJobName, port, "get", storeBucketName, "testKey")
  87. Expect(err).NotTo(HaveOccurred())
  88. Eventually(func() (*batchv1.Job, error) {
  89. return c.BatchV1().Jobs(namespace).Get(ctx, getKVJobName, getOpts)
  90. }, timeout, PollingInterval).Should(WithTransform(getSucceededJobs, Equal(int32(1))))
  91. By("deleting the job once it has succeeded")
  92. err = c.BatchV1().Jobs(namespace).Delete(ctx, getKVJobName, metav1.DeleteOptions{})
  93. Expect(err).NotTo(HaveOccurred())
  94. By("creating a job to get the delete the KV Store Bucket")
  95. deleteKVBucketJobName := fmt.Sprintf("%s-del-kv-bucket-%s",
  96. releaseName, jobSuffix)
  97. err = createJob(ctx, c, deleteKVBucketJobName, port, "del", storeBucketName, "-f")
  98. Expect(err).NotTo(HaveOccurred())
  99. Eventually(func() (*batchv1.Job, error) {
  100. return c.BatchV1().Jobs(namespace).Get(ctx, deleteKVBucketJobName, getOpts)
  101. }, timeout, PollingInterval).Should(WithTransform(getSucceededJobs, Equal(int32(1))))
  102. By("deleting the job once it has succeeded")
  103. err = c.BatchV1().Jobs(namespace).Delete(ctx, deleteKVBucketJobName, metav1.DeleteOptions{})
  104. Expect(err).NotTo(HaveOccurred())
  105. })
  106. })
  107. AfterEach(func() {
  108. cancel()
  109. })
  110. })