influxdb_test.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package influxdb_test
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. utils "github.com/bitnami/charts/.vib/common-tests/ginkgo-utils"
  7. . "github.com/onsi/ginkgo/v2"
  8. . "github.com/onsi/gomega"
  9. appsv1 "k8s.io/api/apps/v1"
  10. batchv1 "k8s.io/api/batch/v1"
  11. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  12. "k8s.io/client-go/kubernetes"
  13. "k8s.io/client-go/rest"
  14. )
  // PollingInterval is the interval handed to Gomega's Eventually throughout this
  // suite, i.e. how often a pending condition is re-evaluated.
  const PollingInterval = time.Second
  18. var _ = Describe("Influxdb", Ordered, func() {
  19. var c *kubernetes.Clientset
  20. var conf *rest.Config
  21. var ctx context.Context
  22. var cancel context.CancelFunc
  23. BeforeEach(func() {
  24. ctx, cancel = context.WithCancel(context.Background())
  25. conf = utils.MustBuildClusterConfig(kubeconfig)
  26. c = kubernetes.NewForConfigOrDie(conf)
  27. })
  28. When("time series data is written and Influxdb is scaled down to 0 replicas and back up", func() {
  29. It("should have access to query the written data", func() {
  30. getAvailableReplicas := func(deploy *appsv1.Deployment) int32 { return deploy.Status.AvailableReplicas }
  31. getSucceededJobs := func(j *batchv1.Job) int32 { return j.Status.Succeeded }
  32. getOpts := metav1.GetOptions{}
  33. By("checking all the replicas are available")
  34. deploy, err := c.AppsV1().Deployments(namespace).Get(ctx, deployName, getOpts)
  35. Expect(err).NotTo(HaveOccurred())
  36. Expect(deploy.Status.Replicas).NotTo(BeZero())
  37. origReplicas := *deploy.Spec.Replicas
  38. Eventually(func() (*appsv1.Deployment, error) {
  39. return c.AppsV1().Deployments(namespace).Get(ctx, deployName, getOpts)
  40. }, timeout, PollingInterval).Should(WithTransform(getAvailableReplicas, Equal(origReplicas)))
  41. svc, err := c.CoreV1().Services(namespace).Get(ctx, deployName, getOpts)
  42. Expect(err).NotTo(HaveOccurred())
  43. port, err := utils.SvcGetPortByName(svc, "http")
  44. Expect(err).NotTo(HaveOccurred())
  45. image, err := utils.DplGetContainerImage(deploy, "influxdb")
  46. Expect(err).NotTo(HaveOccurred())
  47. // Let's obtain the token from the InfluxDB secret
  48. secret, err := c.CoreV1().Secrets(namespace).Get(ctx, "influxdb", getOpts)
  49. Expect(err).NotTo(HaveOccurred())
  50. // The token is stored in the secret as a base64 encoded string
  51. tokenBytes, ok := secret.Data["admin-token"]
  52. Expect(ok).To(BeTrue())
  53. // We don't need to decode the string, the Go K8s client does it for us
  54. token := string(tokenBytes)
  55. // Use current time for allowing the test suite to repeat
  56. jobSuffix := time.Now().Format("20060102150405")
  57. By("creating a job to write data")
  58. createDBJobName := fmt.Sprintf("%s-write-%s",
  59. deployName, jobSuffix)
  60. err = createJob(ctx, c, createDBJobName, port, image, "write", token, `home,room=Living\ Room temp=21.1,hum=35.9,co=0i 1747036800`)
  61. Expect(err).NotTo(HaveOccurred())
  62. Eventually(func() (*batchv1.Job, error) {
  63. return c.BatchV1().Jobs(namespace).Get(ctx, createDBJobName, getOpts)
  64. }, timeout, PollingInterval).Should(WithTransform(getSucceededJobs, Equal(int32(1))))
  65. By("scaling down to 0 replicas")
  66. deploy, err = utils.DplScale(ctx, c, deploy, 0)
  67. Expect(err).NotTo(HaveOccurred())
  68. Eventually(func() (*appsv1.Deployment, error) {
  69. return c.AppsV1().Deployments(namespace).Get(ctx, deployName, getOpts)
  70. }, timeout, PollingInterval).Should(WithTransform(getAvailableReplicas, BeZero()))
  71. By("scaling up to the original replicas")
  72. deploy, err = utils.DplScale(ctx, c, deploy, origReplicas)
  73. Expect(err).NotTo(HaveOccurred())
  74. Eventually(func() (*appsv1.Deployment, error) {
  75. return c.AppsV1().Deployments(namespace).Get(ctx, deployName, getOpts)
  76. }, timeout, PollingInterval).Should(WithTransform(getAvailableReplicas, Equal(origReplicas)))
  77. By("creating a job to query the data")
  78. queryJobName := fmt.Sprintf("%s-query-%s",
  79. deployName, jobSuffix)
  80. err = createJob(ctx, c, queryJobName, port, image, "query", token, "SELECT * FROM home")
  81. Expect(err).NotTo(HaveOccurred())
  82. Eventually(func() (*batchv1.Job, error) {
  83. return c.BatchV1().Jobs(namespace).Get(ctx, queryJobName, getOpts)
  84. }, timeout, PollingInterval).Should(WithTransform(getSucceededJobs, Equal(int32(1))))
  85. })
  86. })
  87. AfterEach(func() {
  88. cancel()
  89. })
  90. })