seaweedfs_test.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package seaweedfs_test
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. utils "github.com/bitnami/charts/.vib/common-tests/ginkgo-utils"
  7. . "github.com/onsi/ginkgo/v2"
  8. . "github.com/onsi/gomega"
  9. appsv1 "k8s.io/api/apps/v1"
  10. batchv1 "k8s.io/api/batch/v1"
  11. v1 "k8s.io/api/core/v1"
  12. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  13. "k8s.io/client-go/kubernetes"
  14. )
  15. const (
  16. PollingInterval = 1 * time.Second
  17. )
  18. var _ = Describe("SeaweedFS", Ordered, func() {
  19. var c *kubernetes.Clientset
  20. var ctx context.Context
  21. var cancel context.CancelFunc
  22. BeforeEach(func() {
  23. ctx, cancel = context.WithCancel(context.Background())
  24. conf := utils.MustBuildClusterConfig(kubeconfig)
  25. c = kubernetes.NewForConfigOrDie(conf)
  26. })
  27. When("a file is uploaded and SeaweedFS is scaled down to 0 replicas and back up", func() {
  28. It("should have access to the uploaded file", func() {
  29. getAvailableReplicas := func(ss *appsv1.StatefulSet) int32 { return ss.Status.AvailableReplicas }
  30. getRestartedAtAnnotation := func(pod *v1.Pod) string { return pod.Annotations["kubectl.kubernetes.io/restartedAt"] }
  31. getSucceededJobs := func(j *batchv1.Job) int32 { return j.Status.Succeeded }
  32. getOpts := metav1.GetOptions{}
  33. By("checking all the replicas are available")
  34. masterStsName := fmt.Sprintf("%s-master", releaseName)
  35. masterSts, err := c.AppsV1().StatefulSets(namespace).Get(ctx, masterStsName, getOpts)
  36. Expect(err).NotTo(HaveOccurred())
  37. fsGroup := masterSts.Spec.Template.Spec.SecurityContext.FSGroup
  38. runAsUser := masterSts.Spec.Template.Spec.Containers[0].SecurityContext.RunAsUser
  39. volumeStsName := fmt.Sprintf("%s-volume", releaseName)
  40. volumeSts, err := c.AppsV1().StatefulSets(namespace).Get(ctx, volumeStsName, getOpts)
  41. Expect(err).NotTo(HaveOccurred())
  42. Expect(masterSts.Status.Replicas).NotTo(BeZero())
  43. Expect(volumeSts.Status.Replicas).NotTo(BeZero())
  44. masterOrigReplicas := *masterSts.Spec.Replicas
  45. volumeOrigReplicas := *volumeSts.Spec.Replicas
  46. Eventually(func() (*appsv1.StatefulSet, error) {
  47. return c.AppsV1().StatefulSets(namespace).Get(ctx, masterStsName, getOpts)
  48. }, timeout, PollingInterval).Should(WithTransform(getAvailableReplicas, Equal(masterOrigReplicas)))
  49. Eventually(func() (*appsv1.StatefulSet, error) {
  50. return c.AppsV1().StatefulSets(namespace).Get(ctx, volumeStsName, getOpts)
  51. }, timeout, PollingInterval).Should(WithTransform(getAvailableReplicas, Equal(volumeOrigReplicas)))
  52. masterHeadlessSvcName := fmt.Sprintf("%s-master-headless", releaseName)
  53. svc, err := c.CoreV1().Services(namespace).Get(ctx, masterHeadlessSvcName, getOpts)
  54. Expect(err).NotTo(HaveOccurred())
  55. port, err := utils.SvcGetPortByName(svc, "http")
  56. Expect(err).NotTo(HaveOccurred())
  57. image, err := utils.StsGetContainerImageByName(masterSts, "seaweedfs")
  58. Expect(err).NotTo(HaveOccurred())
  59. jobSuffix := time.Now().Format("20060102150405")
  60. By("creating a pvc")
  61. pvcName := fmt.Sprintf("weed-%s", jobSuffix)
  62. err = createPVC(ctx, c, pvcName, "1G")
  63. Expect(err).NotTo(HaveOccurred())
  64. By("creating a job to upload a file")
  65. uploadJobName := fmt.Sprintf("weed-upload-%s", jobSuffix)
  66. err = createJob(ctx, c, uploadJobName, port, image, pvcName, kindUpload, fsGroup, runAsUser)
  67. Expect(err).NotTo(HaveOccurred())
  68. Eventually(func() (*batchv1.Job, error) {
  69. return c.BatchV1().Jobs(namespace).Get(ctx, uploadJobName, getOpts)
  70. }, timeout, PollingInterval).Should(WithTransform(getSucceededJobs, Equal(int32(1))))
  71. By("rollout restart the master & volume servers")
  72. _, err = utils.StsRolloutRestart(ctx, c, masterSts)
  73. Expect(err).NotTo(HaveOccurred())
  74. for i := int(masterOrigReplicas) - 1; i >= 0; i-- {
  75. Eventually(func() (*v1.Pod, error) {
  76. return c.CoreV1().Pods(namespace).Get(ctx, fmt.Sprintf("%s-%d", masterStsName, i), getOpts)
  77. }, timeout, PollingInterval).Should(WithTransform(getRestartedAtAnnotation, Not(BeEmpty())))
  78. }
  79. Eventually(func() (*appsv1.StatefulSet, error) {
  80. return c.AppsV1().StatefulSets(namespace).Get(ctx, masterStsName, getOpts)
  81. }, timeout, PollingInterval).Should(WithTransform(getAvailableReplicas, Equal(masterOrigReplicas)))
  82. _, err = utils.StsRolloutRestart(ctx, c, volumeSts)
  83. Expect(err).NotTo(HaveOccurred())
  84. for i := int(volumeOrigReplicas) - 1; i >= 0; i-- {
  85. Eventually(func() (*v1.Pod, error) {
  86. return c.CoreV1().Pods(namespace).Get(ctx, fmt.Sprintf("%s-%d", volumeStsName, i), getOpts)
  87. }, timeout, PollingInterval).Should(WithTransform(getRestartedAtAnnotation, Not(BeEmpty())))
  88. }
  89. Eventually(func() (*appsv1.StatefulSet, error) {
  90. return c.AppsV1().StatefulSets(namespace).Get(ctx, volumeStsName, getOpts)
  91. }, timeout, PollingInterval).Should(WithTransform(getAvailableReplicas, Equal(volumeOrigReplicas)))
  92. By("creating a job to download the file")
  93. downloadJobName := fmt.Sprintf("weed-download-%s", jobSuffix)
  94. err = createJob(ctx, c, downloadJobName, port, image, pvcName, kindDownload, fsGroup, runAsUser)
  95. Expect(err).NotTo(HaveOccurred())
  96. Eventually(func() (*batchv1.Job, error) {
  97. return c.BatchV1().Jobs(namespace).Get(ctx, downloadJobName, getOpts)
  98. }, timeout, PollingInterval).Should(WithTransform(getSucceededJobs, Equal(int32(1))))
  99. })
  100. })
  101. AfterEach(func() {
  102. cancel()
  103. })
  104. })