Official Go client library for Kubernetes API - typed clients, controllers, and cluster interaction tools
The tools/cache package provides efficient caching and synchronization infrastructure for building Kubernetes controllers. It includes informers for watching resources, listers for efficient local lookups, and various queue implementations.
k8s.io/client-go/tools/cachek8s.io/client-go/informersk8s.io/client-go/listersimport (
"k8s.io/client-go/informers"
"k8s.io/client-go/listers"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/kubernetes"
)Controllers typically use this architecture:
The most common way to create informers is using a shared informer factory:
import (
"time"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
)
// Create informer factory
clientset, _ := kubernetes.NewForConfig(config)
factory := informers.NewSharedInformerFactory(clientset, time.Minute*10)
// With specific namespace
factory := informers.NewSharedInformerFactoryWithOptions(
clientset,
time.Minute*10,
informers.WithNamespace("default"))
// With label selector
factory := informers.NewSharedInformerFactoryWithOptions(
clientset,
time.Minute*10,
informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
opts.LabelSelector = "app=myapp"
}))// Get Pod informer
podInformer := factory.Core().V1().Pods()
// Get Deployment informer
deploymentInformer := factory.Apps().V1().Deployments()
// Get Service informer
serviceInformer := factory.Core().V1().Services()
// Get ConfigMap informer
configMapInformer := factory.Core().V1().ConfigMaps()
// Access the underlying shared index informer
sharedInformer := podInformer.Informer()
// Access the lister
podLister := podInformer.Lister()import "k8s.io/client-go/tools/cache"
// Add event handlers to informer
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
fmt.Printf("Pod added: %s/%s\n", pod.Namespace, pod.Name)
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldPod := oldObj.(*corev1.Pod)
newPod := newObj.(*corev1.Pod)
if oldPod.ResourceVersion == newPod.ResourceVersion {
// Periodic resync, not a real update
return
}
fmt.Printf("Pod updated: %s/%s\n", newPod.Namespace, newPod.Name)
},
DeleteFunc: func(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
// Handle tombstone case
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return
}
pod, ok = tombstone.Obj.(*corev1.Pod)
if !ok {
return
}
}
fmt.Printf("Pod deleted: %s/%s\n", pod.Namespace, pod.Name)
},
})type ResourceEventHandler interface {
OnAdd(obj interface{}, isInInitialList bool)
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}
type ResourceEventHandlerFuncs struct {
AddFunc func(obj interface{}, isInInitialList bool)
UpdateFunc func(oldObj, newObj interface{})
DeleteFunc func(obj interface{})
}// Filter events before processing
handler := cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
pod := obj.(*corev1.Pod)
return pod.Status.Phase == corev1.PodRunning
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}, _ bool) {
pod := obj.(*corev1.Pod)
fmt.Printf("Running pod added: %s\n", pod.Name)
},
// ... other handlers
},
}
podInformer.Informer().AddEventHandler(handler)// Create stop channel
stopCh := make(chan struct{})
defer close(stopCh)
// Start all informers
factory.Start(stopCh)
// Wait for caches to sync
factory.WaitForCacheSync(stopCh)
// Or wait for specific informers
cache.WaitForCacheSync(stopCh,
podInformer.Informer().HasSynced,
deploymentInformer.Informer().HasSynced)
// With context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cache.WaitForNamedCacheSyncWithContext(ctx,
podInformer.Informer().HasSynced,
deploymentInformer.Informer().HasSynced)Listers provide efficient read-only access to cached objects:
// Get lister
podLister := podInformer.Lister()
// Get pod by namespace and name
pod, err := podLister.Pods("default").Get("my-pod")
if err != nil {
if errors.IsNotFound(err) {
// Pod not in cache
}
}
// List all pods in namespace
pods, err := podLister.Pods("default").List(labels.Everything())
// List with label selector
selector := labels.SelectorFromSet(labels.Set{"app": "myapp"})
pods, err := podLister.Pods("default").List(selector)
// List all pods across all namespaces
pods, err := podLister.List(labels.Everything())type PodLister interface {
List(selector labels.Selector) ([]*corev1.Pod, error)
Pods(namespace string) PodNamespaceLister
}
type PodNamespaceLister interface {
List(selector labels.Selector) ([]*corev1.Pod, error)
Get(name string) (*corev1.Pod, error)
}Indexers allow efficient lookups by custom indices:
// Default namespace index
const NamespaceIndex string = "namespace"
// Create indexer with namespace index
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
})
// Add custom index
nodeNameIndexFunc := func(obj interface{}) ([]string, error) {
pod := obj.(*corev1.Pod)
return []string{pod.Spec.NodeName}, nil
}
indexer.AddIndexers(cache.Indexers{
"byNodeName": nodeNameIndexFunc,
})
// Query by index
pods, err := indexer.ByIndex("byNodeName", "node-1")type Store interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)
Replace([]interface{}, string) error
Resync() error
}
type Indexer interface {
Store
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexedValue string) ([]string, error)
ListIndexFuncValues(indexName string) []string
ByIndex(indexName, indexedValue string) ([]interface{}, error)
GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error
}
type Indexers map[string]IndexFunc
type IndexFunc func(obj interface{}) ([]string, error)package main
import (
"context"
"fmt"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
)
type Controller struct {
clientset kubernetes.Interface
podLister cache.GenericLister
podSynced cache.InformerSynced
queue workqueue.TypedRateLimitingInterface[string]
}
func NewController(clientset kubernetes.Interface) *Controller {
// Create informer factory
factory := informers.NewSharedInformerFactory(clientset, time.Minute*10)
// Get pod informer
podInformer := factory.Core().V1().Pods()
// Create work queue
queue := workqueue.NewTypedRateLimitingQueue(
workqueue.DefaultTypedControllerRateLimiter[string]())
// Create controller
controller := &Controller{
clientset: clientset,
podLister: podInformer.Lister(),
podSynced: podInformer.Informer().HasSynced,
queue: queue,
}
// Add event handlers
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}, _ bool) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(newObj)
if err == nil {
queue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
})
return controller
}
func (c *Controller) Run(ctx context.Context, workers int) error {
defer c.queue.ShutDown()
fmt.Println("Starting controller")
// Start informer
go c.podInformer.Run(ctx.Done())
// Wait for cache sync
if !cache.WaitForCacheSync(ctx.Done(), c.podSynced) {
return fmt.Errorf("failed to sync cache")
}
fmt.Println("Cache synced")
// Start workers
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
}
<-ctx.Done()
return nil
}
func (c *Controller) runWorker(ctx context.Context) {
for c.processNextItem(ctx) {
}
}
func (c *Controller) processNextItem(ctx context.Context) bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)
err := c.syncHandler(ctx, key)
if err == nil {
c.queue.Forget(key)
return true
}
// Retry with rate limiting
c.queue.AddRateLimited(key)
fmt.Printf("Error syncing %s: %v\n", key, err)
return true
}
func (c *Controller) syncHandler(ctx context.Context, key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
// Get pod from cache
pod, err := c.podLister.ByNamespace(namespace).Get(name)
if err != nil {
if errors.IsNotFound(err) {
fmt.Printf("Pod %s/%s deleted\n", namespace, name)
return nil
}
return err
}
// Process pod
fmt.Printf("Processing pod: %s/%s (phase: %s)\n",
pod.Namespace, pod.Name, pod.Status.Phase)
return nil
}
func main() {
config, _ := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
clientset, _ := kubernetes.NewForConfig(config)
controller := NewController(clientset)
ctx := context.Background()
controller.Run(ctx, 2)
}// Key generation
func MetaNamespaceKeyFunc(obj interface{}) (string, error)
// Returns: "namespace/name" or just "name" for cluster-scoped
func SplitMetaNamespaceKey(key string) (namespace, name string, err error)
// Splits "namespace/name" back into components
func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error)
// Handles DeletedFinalStateUnknown tombstone objects
// Indexing
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error)
// Index function for namespace
// Waiting for sync
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool
func WaitForNamedCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool
func WaitForNamedCacheSyncWithContext(ctx context.Context, cacheSyncs ...InformerSynced) boolLow-level component that watches the API and updates a store:
import "k8s.io/client-go/tools/cache"
// Create reflector manually (usually not needed)
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return clientset.CoreV1().Pods("default").List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return clientset.CoreV1().Pods("default").Watch(context.TODO(), options)
},
}
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
reflector := cache.NewReflector(lw, &corev1.Pod{}, store, time.Minute*10)
stopCh := make(chan struct{})
go reflector.Run(stopCh)Queue used internally by informers to track changes:
type Delta struct {
Type DeltaType
Object interface{}
}
type DeltaType string
const (
Added DeltaType = "Added"
Updated DeltaType = "Updated"
Deleted DeltaType = "Deleted"
Replaced DeltaType = "Replaced"
Sync DeltaType = "Sync"
)
type Deltas []Delta
// Create DeltaFIFO
fifo := cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
KeyFunction: cache.MetaNamespaceKeyFunc,
KnownObjects: indexer,
})type SharedIndexInformer interface {
AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error)
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error)
RemoveEventHandler(handle ResourceEventHandlerRegistration) error
GetStore() Store
GetController() Controller
Run(stopCh <-chan struct{})
HasSynced() bool
LastSyncResourceVersion() string
SetWatchErrorHandler(handler WatchErrorHandler) error
SetTransform(handler TransformFunc) error
IsStopped() bool
AddIndexers(indexers Indexers) error
GetIndexer() Indexer
}Install with Tessl CLI
npx tessl i tessl/golang-k8s-io--client-go