k8s.go

package ginkgo_utils

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"strconv"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	batchv1 "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	cv1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

// Cluster initialization functions

// MustBuildClusterConfig builds the Kubernetes rest API configuration handler using a
// given kubeconfig file. It panics if the kubeconfig path is empty or cannot be parsed.
func MustBuildClusterConfig(kubeconfig string) *rest.Config {
	if kubeconfig == "" {
		panic("kubeconfig must be supplied")
	}
	config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		panic(err.Error())
	}
	return config
}
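
// The sketch below is illustrative only and not part of the original package API:
// it shows how MustBuildClusterConfig is typically combined with client-go to obtain
// a typed clientset. The kubeconfig path is an assumption made for the example.
func exampleBuildClientset() kubernetes.Interface {
	// Hypothetical kubeconfig location; adjust to the environment under test
	config := MustBuildClusterConfig("/tmp/kubeconfig")
	// NewForConfigOrDie panics on error, matching the fail-fast style of this package
	return kubernetes.NewForConfigOrDie(config)
}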

// StatefulSet functions

// StsScale scales a StatefulSet instance to the given number of replicas
func StsScale(ctx context.Context, c kubernetes.Interface, ss *appsv1.StatefulSet, count int32) (*appsv1.StatefulSet, error) {
	name := ss.Name
	ns := ss.Namespace
	const maxRetries = 3
	for i := 0; i < maxRetries; i++ {
		ss, err := c.AppsV1().StatefulSets(ns).Get(ctx, name, metav1.GetOptions{})
		if err != nil {
			return nil, fmt.Errorf("failed to get statefulset %q: %v", name, err)
		}
		// Nothing to do if the statefulset is already at the desired size
		if ss.Status.Replicas == count && ss.Status.AvailableReplicas == count {
			return ss, nil
		}
		*(ss.Spec.Replicas) = count
		ss, err = c.AppsV1().StatefulSets(ns).Update(ctx, ss, metav1.UpdateOptions{})
		if err == nil {
			return ss, nil
		}
		// Retry only on conflicts and server timeouts; fail on anything else
		if !apierrors.IsConflict(err) && !apierrors.IsServerTimeout(err) {
			return nil, fmt.Errorf("failed to update statefulset %q: %v", name, err)
		}
	}
	return nil, fmt.Errorf("too many retries scaling statefulset %q", name)
}

// StsRolloutRestart performs a rollout restart on a StatefulSet instance
func StsRolloutRestart(ctx context.Context, c kubernetes.Interface, ss *appsv1.StatefulSet) (*appsv1.StatefulSet, error) {
	name := ss.Name
	ns := ss.Namespace
	const maxRetries = 3
	for i := 0; i < maxRetries; i++ {
		ss, err := c.AppsV1().StatefulSets(ns).Get(ctx, name, metav1.GetOptions{})
		if err != nil {
			return nil, fmt.Errorf("failed to get statefulset %q: %v", name, err)
		}
		// Update the statefulset's pod template annotations to trigger a restart,
		// mirroring what `kubectl rollout restart` does
		if ss.Spec.Template.Annotations == nil {
			ss.Spec.Template.Annotations = make(map[string]string)
		}
		ss.Spec.Template.Annotations["kubectl.kubernetes.io/restartedAt"] = time.Now().Format(time.RFC3339)
		ss, err = c.AppsV1().StatefulSets(ns).Update(ctx, ss, metav1.UpdateOptions{})
		if err == nil {
			return ss, nil
		}
		if !apierrors.IsConflict(err) && !apierrors.IsServerTimeout(err) {
			return nil, fmt.Errorf("failed to update statefulset %q: %v", name, err)
		}
	}
	return nil, fmt.Errorf("too many retries restarting statefulset %q", name)
}

// StsAnnotateTemplate adds the given annotations to a StatefulSet's pod template.
// Since the pod template changes, this also triggers a rollout restart.
func StsAnnotateTemplate(ctx context.Context, c kubernetes.Interface, ss *appsv1.StatefulSet, annotations map[string]string) (*appsv1.StatefulSet, error) {
	name := ss.Name
	ns := ss.Namespace
	const maxRetries = 3
	for i := 0; i < maxRetries; i++ {
		ss, err := c.AppsV1().StatefulSets(ns).Get(ctx, name, metav1.GetOptions{})
		if err != nil {
			return nil, fmt.Errorf("failed to get statefulset %q: %v", name, err)
		}
		// Update the statefulset's pod template annotations; this triggers a restart
		if ss.Spec.Template.Annotations == nil {
			ss.Spec.Template.Annotations = make(map[string]string)
		}
		for k, v := range annotations {
			ss.Spec.Template.Annotations[k] = v
		}
		ss, err = c.AppsV1().StatefulSets(ns).Update(ctx, ss, metav1.UpdateOptions{})
		if err == nil {
			return ss, nil
		}
		if !apierrors.IsConflict(err) && !apierrors.IsServerTimeout(err) {
			return nil, fmt.Errorf("failed to update statefulset %q: %v", name, err)
		}
	}
	return nil, fmt.Errorf("too many retries annotating statefulset %q", name)
}

// StsGetContainerImageByName returns the container image given the container name of a StatefulSet instance
func StsGetContainerImageByName(ss *appsv1.StatefulSet, name string) (string, error) {
	containers := ss.Spec.Template.Spec.Containers
	for _, c := range containers {
		if c.Name == name {
			return c.Image, nil
		}
	}
	return "", fmt.Errorf("container %q not found in statefulset %q", name, ss.Name)
}

// Deployment functions

// DplScale scales a Deployment instance to the given number of replicas
func DplScale(ctx context.Context, c kubernetes.Interface, dpl *appsv1.Deployment, count int32) (*appsv1.Deployment, error) {
	name := dpl.Name
	ns := dpl.Namespace
	const maxRetries = 3
	for i := 0; i < maxRetries; i++ {
		currentDpl, err := c.AppsV1().Deployments(ns).Get(ctx, name, metav1.GetOptions{})
		if err != nil {
			return nil, fmt.Errorf("failed to get deployment %q: %v", name, err)
		}
		// Nothing to do if the deployment is already at the desired size
		if currentDpl.Status.Replicas == count && currentDpl.Status.AvailableReplicas == count {
			return currentDpl, nil
		}
		*(currentDpl.Spec.Replicas) = count
		currentDpl, err = c.AppsV1().Deployments(ns).Update(ctx, currentDpl, metav1.UpdateOptions{})
		if err == nil {
			return currentDpl, nil
		}
		// Retry only on conflicts and server timeouts; fail on anything else
		if !apierrors.IsConflict(err) && !apierrors.IsServerTimeout(err) {
			return nil, fmt.Errorf("failed to update deployment %q: %v", name, err)
		}
	}
	return nil, fmt.Errorf("too many retries scaling deployment %q", name)
}

// DplGetContainerImage returns the container image given the container name of a Deployment instance
func DplGetContainerImage(dpl *appsv1.Deployment, name string) (string, error) {
	containers := dpl.Spec.Template.Spec.Containers
	for _, c := range containers {
		if c.Name == name {
			return c.Image, nil
		}
	}
	return "", fmt.Errorf("container %q not found in deployment %q", name, dpl.Name)
}

// Job functions

// JobGetSucceededPods returns the number of pods that completed successfully
// in a Job instance
func JobGetSucceededPods(j *batchv1.Job) int32 {
	return j.Status.Succeeded
}

// Service functions

// SvcGetPortByName returns the port number of a Service given the port name
func SvcGetPortByName(svc *v1.Service, portName string) (string, error) {
	for _, p := range svc.Spec.Ports {
		if p.Name == portName {
			return strconv.FormatInt(int64(p.Port), 10), nil
		}
	}
	return "", fmt.Errorf("port %q not found in service %q", portName, svc.Name)
}

// Pod functions

// IsPodRunning returns a boolean value indicating whether a given pod is running
func IsPodRunning(ctx context.Context, c cv1.PodsGetter, namespace string, podName string) (bool, error) {
	pod, err := c.Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
	if err != nil {
		fmt.Printf("There was an error obtaining the pod %q\n", podName)
		return false, err
	}
	return pod.Status.Phase == v1.PodRunning, nil
}
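
// Illustrative only, not part of the original package API: a minimal polling sketch
// showing one way IsPodRunning can be used to wait for a pod. The namespace, pod name,
// interval and attempt count are assumptions for the example; in Ginkgo suites this
// would more commonly be wrapped in a Gomega Eventually.
func examplePollPodRunning(ctx context.Context, c cv1.PodsGetter) (bool, error) {
	for attempt := 0; attempt < 30; attempt++ {
		running, err := IsPodRunning(ctx, c, "default", "example-pod")
		if err != nil {
			return false, err
		}
		if running {
			return true, nil
		}
		// Wait before checking again
		time.Sleep(2 * time.Second)
	}
	return false, nil
}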

// GetPodsByLabelOrDie returns the list of pods matching a given label selector, exiting in case of failure
func GetPodsByLabelOrDie(ctx context.Context, c cv1.PodsGetter, namespace string, selector string) v1.PodList {
	output, err := c.Pods(namespace).List(ctx, metav1.ListOptions{
		LabelSelector: selector,
	})
	if err != nil {
		panic(err.Error())
	}
	fmt.Printf("Obtained list of pods with label %q\n", selector)
	return *output
}

// GetContainerLogsOrDie returns the container logs as a slice of lines, exiting in case of failure
func GetContainerLogsOrDie(ctx context.Context, c cv1.PodsGetter, namespace string, podName string, containerName string) []string {
	var output []string
	// Only fetch the last 100 log lines of the container
	tailLines := int64(100)
	readCloser, err := c.Pods(namespace).GetLogs(podName, &v1.PodLogOptions{
		Container: containerName,
		Follow:    false,
		TailLines: &tailLines,
	}).Stream(ctx)
	if err != nil {
		panic(err.Error())
	}
	fmt.Printf("Obtained %q pod's logs\n", podName)
	defer readCloser.Close()
	// Read the stream line by line, aborting if the context is cancelled
	scanner := bufio.NewScanner(interruptableReader{ctx, readCloser})
	for scanner.Scan() {
		output = append(output, scanner.Text())
	}
	if scanner.Err() != nil {
		panic(scanner.Err())
	}
	return output
}

// ContainerLogsContainPattern returns a boolean indicating whether the container logs
// match a given pattern. The containsPattern helper is assumed to be defined elsewhere
// in this package.
func ContainerLogsContainPattern(ctx context.Context, c cv1.PodsGetter, namespace string, podName string, containerName string, pattern string) (bool, error) {
	containerLogs := GetContainerLogsOrDie(ctx, c, namespace, podName, containerName)
	return containsPattern(containerLogs, pattern)
}

// interruptableReader wraps an io.Reader (such as a container log stream) and
// stops reading as soon as the associated context is cancelled
type interruptableReader struct {
	ctx context.Context
	r   io.Reader
}

// Read reads from the underlying reader, returning the context's error if it
// has been cancelled
func (r interruptableReader) Read(p []byte) (int, error) {
	if err := r.ctx.Err(); err != nil {
		return 0, err
	}
	return r.r.Read(p)
}
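
// Illustrative only, not part of the original package API: a minimal end-to-end sketch
// tying several helpers together in the style of a test body. The namespace, statefulset
// name, pod name, container name and log pattern are assumptions for the example.
func exampleScaleAndCheckLogs(ctx context.Context, c kubernetes.Interface) error {
	// Fetch the statefulset to act on; the "example" names are placeholders
	ss, err := c.AppsV1().StatefulSets("example-ns").Get(ctx, "example-sts", metav1.GetOptions{})
	if err != nil {
		return err
	}
	// Scale it and trigger a rollout restart using the helpers above
	if _, err := StsScale(ctx, c, ss, 3); err != nil {
		return err
	}
	if _, err := StsRolloutRestart(ctx, c, ss); err != nil {
		return err
	}
	// Inspect one pod's container logs for an expected pattern
	found, err := ContainerLogsContainPattern(ctx, c.CoreV1(), "example-ns", "example-sts-0", "example-container", "ready to serve")
	if err != nil {
		return err
	}
	if !found {
		return fmt.Errorf("pattern not found in container logs")
	}
	return nil
}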