The Kubernetes client package for Go gives developers access to a vast range of functions for reading and manipulating data and resources in a cluster. Taking advantage of these capabilities lets you build powerful controllers that monitor and manage your cluster, going beyond what stock OpenShift or Kubernetes setups offer.

For example, the PodInterface allows you to list, update, delete, or get specific pods either by namespace or across all namespaces. This interface is complemented by similar implementations for many other cluster resource types such as ReplicationControllers and ResourceQuotas.

// These are the imports used throughout this article
import (
   "log"
   "time"

   "github.com/openshift/origin/pkg/client/cache"
   "github.com/openshift/origin/pkg/cmd/util/clientcmd"

   "github.com/spf13/pflag"
   kapi "k8s.io/kubernetes/pkg/api"
   kcache "k8s.io/kubernetes/pkg/client/cache"
   kclient "k8s.io/kubernetes/pkg/client/unversioned"
   "k8s.io/kubernetes/pkg/runtime"
   "k8s.io/kubernetes/pkg/watch"
)

func main() {
   var kubeClient kclient.Interface
   config, err := clientcmd.DefaultClientConfig(pflag.NewFlagSet("empty", pflag.ContinueOnError)).ClientConfig()
   if err != nil {
      log.Fatalf("Error creating cluster config: %s", err)
   }

   kubeClient, err = kclient.New(config)
   if err != nil {
      log.Fatalf("Error creating Kubernetes client: %s", err)
   }

   // List every pod in a single namespace; kapi.NamespaceAll would list across all namespaces.
   namespace := "myNamespace"
   podInterface := kubeClient.Pods(namespace)
   podList, err := podInterface.List(kapi.ListOptions{})
   if err != nil {
      log.Fatalf("Error listing pods: %s", err)
   }
   log.Printf("Found %d pods in namespace %s", len(podList.Items), namespace)
}
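
Before moving on, note that List is only one of the operations these interfaces expose. As a rough sketch of the others, continuing from the client set up above (the pod name and the ResourceQuotas accessor are assumptions for illustration, mirroring the pods interface), you can fetch a single pod by name or list a different resource type in much the same way:

// Fetch a single pod by name from the same namespace ("myPod" is a placeholder).
pod, err := podInterface.Get("myPod")
if err != nil {
   log.Printf("Error getting pod: %s", err)
} else {
   log.Printf("Pod %s is in phase %s", pod.Name, pod.Status.Phase)
}

// Other resource types follow the same pattern, for example ResourceQuotas.
quotaList, err := kubeClient.ResourceQuotas(namespace).List(kapi.ListOptions{})
if err != nil {
   log.Printf("Error listing resource quotas: %s", err)
} else {
   log.Printf("Found %d resource quotas", len(quotaList.Items))
}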

And that PodInterface can be used to directly operate on resources in the cluster, such as deleting pods:

// Delete every pod returned by the List call above.
for _, pod := range podList.Items {
   err = podInterface.Delete(pod.Name, &kapi.DeleteOptions{})
   if err != nil {
      log.Printf("Error deleting pod %s: %s", pod.Name, err)
   }
}
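
Updates go through the same interface as well. Here is a minimal sketch that labels each pod from the list instead of deleting it (the label key is just an illustrative placeholder, and the sketch assumes the unversioned client's Update method accepts a *kapi.Pod):

for _, pod := range podList.Items {
   // Add (or overwrite) a label on our local copy of the pod.
   if pod.Labels == nil {
      pod.Labels = map[string]string{}
   }
   pod.Labels["touched-by-controller"] = "true"

   // Push the modified pod back to the cluster.
   _, err := podInterface.Update(&pod)
   if err != nil {
      log.Printf("Error updating pod %s: %s", pod.Name, err)
   }
}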

Combined with a ListWatch from Kubernetes’ cache package, these interfaces let you monitor and handle incoming cluster events for a given type of object. To store and process those events at your leisure, Kubernetes provides a DeltaFIFO struct, while OpenShift’s cache package provides an EventQueue struct, which expands on DeltaFIFO’s use cases for processing object change events.

// Queue up pod events using OpenShift's EventQueue, keyed by namespace/name.
podQueue := cache.NewEventQueue(kcache.MetaNamespaceKeyFunc)

// Watch pods across all namespaces.
podLW := &kcache.ListWatch{
   ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
      return kubeClient.Pods(kapi.NamespaceAll).List(options)
   },
   WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
      return kubeClient.Pods(kapi.NamespaceAll).Watch(options)
   },
}
// The reflector populates podQueue with events from the ListWatch.
kcache.NewReflector(podLW, &kapi.Pod{}, podQueue, 0).Run()

go func() {
   for {
      event, pod, err := podQueue.Pop()
      if err != nil {
         log.Fatalf("Error popping pod event: %s", err)
      }
      handlePod(event, pod.(*kapi.Pod))
   }
}()

The event type returned by Pop also lets you handle resource creations, modifications, and deletions differently:

func handlePod(eventType watch.EventType, pod *kapi.Pod) {
   switch eventType {
   case watch.Added:
      log.Printf("Pod %s added!", pod.Name)
   case watch.Modified:
      log.Printf("Pod %s modified!", pod.Name)
   case watch.Deleted:
      log.Printf("Pod %s deleted!", pod.Name)
   }
}

Putting it all together, say you wanted to prevent a certain namespace from creating new pods during specific hours. The full code could look like this:

import (
   "log"
   "time"

   "github.com/openshift/origin/pkg/client/cache"
   "github.com/openshift/origin/pkg/cmd/util/clientcmd"

   "github.com/spf13/pflag"
   kapi "k8s.io/kubernetes/pkg/api"
   kcache "k8s.io/kubernetes/pkg/client/cache"
   kclient "k8s.io/kubernetes/pkg/client/unversioned"
   "k8s.io/kubernetes/pkg/runtime"
   "k8s.io/kubernetes/pkg/watch"
)

func main() {
   var kubeClient kclient.Interface
   config, err := clientcmd.DefaultClientConfig(pflag.NewFlagSet("empty", pflag.ContinueOnError)).ClientConfig()
   if err != nil {
      log.Fatalf("Error creating cluster config: %s", err)
   }

   kubeClient, err = kclient.New(config)
   if err != nil {
      log.Fatalf("Error creating Kubernetes client: %s", err)
   }

   // Queue up pod events from every namespace.
   podQueue := cache.NewEventQueue(kcache.MetaNamespaceKeyFunc)

   podLW := &kcache.ListWatch{
      ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
         return kubeClient.Pods(kapi.NamespaceAll).List(options)
      },
      WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
         return kubeClient.Pods(kapi.NamespaceAll).Watch(options)
      },
   }
   kcache.NewReflector(podLW, &kapi.Pod{}, podQueue, 0).Run()

   // Process pod events as they arrive; this loop blocks forever, which is all main needs to do.
   for {
      event, pod, err := podQueue.Pop()
      if err != nil {
         log.Fatalf("Error popping pod event: %s", err)
      }
      handlePod(event, pod.(*kapi.Pod), kubeClient)
   }
}

func handlePod(eventType watch.EventType, pod *kapi.Pod, kubeClient kclient.Interface) {
   switch eventType {
   case watch.Added:
      log.Printf("Pod %s added!", pod.Name)
      if pod.Namespace == "namespaceWeWantToRestrict" {
         // Delete any pod created in the restricted namespace between 05:00 and 10:59.
         hour := time.Now().Hour()
         if hour >= 5 && hour <= 10 {
            err := kubeClient.Pods(pod.Namespace).Delete(pod.Name, &kapi.DeleteOptions{})
            if err != nil {
               log.Printf("Error deleting pod %s: %s", pod.Name, err)
            }
         }
      }
   case watch.Modified:
      log.Printf("Pod %s modified!", pod.Name)
   case watch.Deleted:
      log.Printf("Pod %s deleted!", pod.Name)
   }
}

Of course, if this project uses a ReplicationController, the pod we delete will simply be recreated, putting us in a delete-and-recreate loop for the rest of the restricted window, which is undesirable. In that case, you might also want to use the ReplicationControllerInterface to scale down any RCs in the project:

if pod.Namespace == "namespaceWeWantToRestrict" {
   hour := time.Now().Hour()
   if hour >= 5 && hour <= 10 {
      rcList, err := kubeClient.ReplicationControllers(pod.Namespace).List(kapi.ListOptions{})
      if err != nil {
         log.Printf("Error getting RCs in namespace %s: %s", pod.Namespace, err)
         return
      }

      // Scale every RC in the namespace down to zero so the deleted pods stay gone.
      for _, rc := range rcList.Items {
         rc.Spec.Replicas = 0
         _, err := kubeClient.ReplicationControllers(pod.Namespace).Update(&rc)
         if err != nil {
            log.Printf("Error scaling RC %s: %s", rc.Name, err)
         }
      }

      err = kubeClient.Pods(pod.Namespace).Delete(pod.Name, &kapi.DeleteOptions{})
      if err != nil {
         log.Printf("Error deleting pod %s: %s", pod.Name, err)
      }
   }
}

You may find more practical uses for the Kubernetes client than this, but it’s a good showcase of how easy it is to interact with your cluster from a small (or not so small) controller of your own. Using a ListWatch lets you react to incoming events as they happen, rather than trying to predict every situation you need to control.
