CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/golang-k8s-io--client-go

Official Go client library for Kubernetes API - typed clients, controllers, and cluster interaction tools

Overview
Eval results
Files

docs/reference/controllers.md

Controllers - Informers, Listers, and Caching

Back to Index

The tools/cache package provides efficient caching and synchronization infrastructure for building Kubernetes controllers. It includes informers for watching resources, listers for efficient local lookups, and various queue implementations.

Package Information

  • Core Package: k8s.io/client-go/tools/cache
  • Shared Informers: k8s.io/client-go/informers
  • Listers: k8s.io/client-go/listers

Core Imports

import (
    "k8s.io/client-go/informers"
    "k8s.io/client-go/listers"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/kubernetes"
)

Architecture Overview

Controllers typically use this architecture:

  1. Reflector: Watches the API server and maintains a local cache
  2. Informer: Combines reflector with event handlers
  3. Indexer: Local cache with indexing for fast lookups
  4. Lister: Read-only interface to the indexer
  5. Work Queue: Rate-limited queue for processing events

Shared Informer Factory

The most common way to create informers is using a shared informer factory:

import (
    "time"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
)

// Create informer factory
clientset, _ := kubernetes.NewForConfig(config)
factory := informers.NewSharedInformerFactory(clientset, time.Minute*10)

// With specific namespace
factory := informers.NewSharedInformerFactoryWithOptions(
    clientset,
    time.Minute*10,
    informers.WithNamespace("default"))

// With label selector
factory := informers.NewSharedInformerFactoryWithOptions(
    clientset,
    time.Minute*10,
    informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
        opts.LabelSelector = "app=myapp"
    }))

Creating Informers

// Get Pod informer
podInformer := factory.Core().V1().Pods()

// Get Deployment informer
deploymentInformer := factory.Apps().V1().Deployments()

// Get Service informer
serviceInformer := factory.Core().V1().Services()

// Get ConfigMap informer
configMapInformer := factory.Core().V1().ConfigMaps()

// Access the underlying shared index informer
sharedInformer := podInformer.Informer()

// Access the lister
podLister := podInformer.Lister()

Event Handlers

import "k8s.io/client-go/tools/cache"

// Add event handlers to informer
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: func(obj interface{}) {
        pod := obj.(*corev1.Pod)
        fmt.Printf("Pod added: %s/%s\n", pod.Namespace, pod.Name)
    },
    UpdateFunc: func(oldObj, newObj interface{}) {
        oldPod := oldObj.(*corev1.Pod)
        newPod := newObj.(*corev1.Pod)
        if oldPod.ResourceVersion == newPod.ResourceVersion {
            // Periodic resync, not a real update
            return
        }
        fmt.Printf("Pod updated: %s/%s\n", newPod.Namespace, newPod.Name)
    },
    DeleteFunc: func(obj interface{}) {
        pod, ok := obj.(*corev1.Pod)
        if !ok {
            // Handle tombstone case
            tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
            if !ok {
                return
            }
            pod, ok = tombstone.Obj.(*corev1.Pod)
            if !ok {
                return
            }
        }
        fmt.Printf("Pod deleted: %s/%s\n", pod.Namespace, pod.Name)
    },
})

ResourceEventHandler Interface

// ResourceEventHandler receives notifications when cached objects change.
// OnAdd's isInInitialList is true while the informer is delivering the
// initial list of objects after a (re)sync.
type ResourceEventHandler interface {
    OnAdd(obj interface{}, isInInitialList bool)
    OnUpdate(oldObj, newObj interface{})
    OnDelete(obj interface{})
}

// ResourceEventHandlerFuncs adapts plain functions to ResourceEventHandler.
// Note: AddFunc takes only the object — the isInInitialList flag is dropped.
// Use ResourceEventHandlerDetailedFuncs if your AddFunc needs that flag.
type ResourceEventHandlerFuncs struct {
    AddFunc    func(obj interface{})
    UpdateFunc func(oldObj, newObj interface{})
    DeleteFunc func(obj interface{})
}

Filtering Event Handler

// Filter events before processing: the wrapped Handler only sees objects
// for which FilterFunc returns true.
handler := cache.FilteringResourceEventHandler{
    FilterFunc: func(obj interface{}) bool {
        pod := obj.(*corev1.Pod)
        return pod.Status.Phase == corev1.PodRunning
    },
    Handler: cache.ResourceEventHandlerFuncs{
        // AddFunc takes a single argument in ResourceEventHandlerFuncs.
        AddFunc: func(obj interface{}) {
            pod := obj.(*corev1.Pod)
            fmt.Printf("Running pod added: %s\n", pod.Name)
        },
        // ... other handlers
    },
}

podInformer.Informer().AddEventHandler(handler)

Starting Informers

// Create stop channel
stopCh := make(chan struct{})
defer close(stopCh)

// Start all informers
factory.Start(stopCh)

// Wait for caches to sync
factory.WaitForCacheSync(stopCh)

// Or wait for specific informers
cache.WaitForCacheSync(stopCh,
    podInformer.Informer().HasSynced,
    deploymentInformer.Informer().HasSynced)

// With context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

cache.WaitForNamedCacheSyncWithContext(ctx,
    podInformer.Informer().HasSynced,
    deploymentInformer.Informer().HasSynced)

Listers

Listers provide efficient read-only access to cached objects:

// Get lister
podLister := podInformer.Lister()

// Get pod by namespace and name
pod, err := podLister.Pods("default").Get("my-pod")
if err != nil {
    if errors.IsNotFound(err) {
        // Pod not in cache
    }
}

// List all pods in namespace
pods, err := podLister.Pods("default").List(labels.Everything())

// List with label selector
selector := labels.SelectorFromSet(labels.Set{"app": "myapp"})
pods, err := podLister.Pods("default").List(selector)

// List all pods across all namespaces
pods, err := podLister.List(labels.Everything())

Lister Interfaces

type PodLister interface {
    List(selector labels.Selector) ([]*corev1.Pod, error)
    Pods(namespace string) PodNamespaceLister
}

type PodNamespaceLister interface {
    List(selector labels.Selector) ([]*corev1.Pod, error)
    Get(name string) (*corev1.Pod, error)
}

Indexers

Indexers allow efficient lookups by custom indices:

// Default namespace index
const NamespaceIndex string = "namespace"

// Create indexer with namespace index
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
    cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
})

// Add custom index
nodeNameIndexFunc := func(obj interface{}) ([]string, error) {
    pod := obj.(*corev1.Pod)
    return []string{pod.Spec.NodeName}, nil
}

indexer.AddIndexers(cache.Indexers{
    "byNodeName": nodeNameIndexFunc,
})

// Query by index
pods, err := indexer.ByIndex("byNodeName", "node-1")

Store and Indexer Interfaces

type Store interface {
    Add(obj interface{}) error
    Update(obj interface{}) error
    Delete(obj interface{}) error
    List() []interface{}
    ListKeys() []string
    Get(obj interface{}) (item interface{}, exists bool, err error)
    GetByKey(key string) (item interface{}, exists bool, err error)
    Replace([]interface{}, string) error
    Resync() error
}

type Indexer interface {
    Store
    Index(indexName string, obj interface{}) ([]interface{}, error)
    IndexKeys(indexName, indexedValue string) ([]string, error)
    ListIndexFuncValues(indexName string) []string
    ByIndex(indexName, indexedValue string) ([]interface{}, error)
    GetIndexers() Indexers
    AddIndexers(newIndexers Indexers) error
}

type Indexers map[string]IndexFunc
type IndexFunc func(obj interface{}) ([]string, error)

Complete Controller Example

package main

import (
    "context"
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    listersv1 "k8s.io/client-go/listers/core/v1"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/workqueue"
)

// Controller watches Pods through a shared informer and processes
// add/update/delete events via a rate-limited work queue.
type Controller struct {
    clientset kubernetes.Interface
    // factory is kept so Run can start the informers it created.
    factory informers.SharedInformerFactory
    // podLister is the typed lister; a cache.GenericLister would force
    // runtime.Object casts in syncHandler.
    podLister listersv1.PodLister
    podSynced cache.InformerSynced
    queue     workqueue.TypedRateLimitingInterface[string]
}

// NewController wires up the informer, lister, work queue, and event
// handlers. Informers are not started here — call Run.
func NewController(clientset kubernetes.Interface) *Controller {
    // Create informer factory with a 10-minute resync period
    factory := informers.NewSharedInformerFactory(clientset, time.Minute*10)

    // Get pod informer
    podInformer := factory.Core().V1().Pods()

    // Create rate-limited work queue keyed by "namespace/name" strings
    queue := workqueue.NewTypedRateLimitingQueue(
        workqueue.DefaultTypedControllerRateLimiter[string]())

    controller := &Controller{
        clientset: clientset,
        factory:   factory,
        podLister: podInformer.Lister(),
        podSynced: podInformer.Informer().HasSynced,
        queue:     queue,
    }

    // Event handlers only enqueue keys; real work happens in workers.
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            key, err := cache.MetaNamespaceKeyFunc(obj)
            if err == nil {
                queue.Add(key)
            }
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            key, err := cache.MetaNamespaceKeyFunc(newObj)
            if err == nil {
                queue.Add(key)
            }
        },
        DeleteFunc: func(obj interface{}) {
            // DeletionHandling... unwraps DeletedFinalStateUnknown tombstones
            key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
            if err == nil {
                queue.Add(key)
            }
        },
    })

    return controller
}

// Run starts the informers, waits for the cache to sync, then runs
// `workers` goroutines until ctx is cancelled.
func (c *Controller) Run(ctx context.Context, workers int) error {
    defer c.queue.ShutDown()

    fmt.Println("Starting controller")

    // Start all informers registered with the factory
    c.factory.Start(ctx.Done())

    // Wait for cache sync before processing any events
    if !cache.WaitForCacheSync(ctx.Done(), c.podSynced) {
        return fmt.Errorf("failed to sync cache")
    }

    fmt.Println("Cache synced")

    // Start workers
    for i := 0; i < workers; i++ {
        go wait.UntilWithContext(ctx, c.runWorker, time.Second)
    }

    <-ctx.Done()
    return nil
}

// runWorker drains the queue until it is shut down.
func (c *Controller) runWorker(ctx context.Context) {
    for c.processNextItem(ctx) {
    }
}

// processNextItem handles one queue item; returns false when the queue
// has been shut down.
func (c *Controller) processNextItem(ctx context.Context) bool {
    key, quit := c.queue.Get()
    if quit {
        return false
    }
    defer c.queue.Done(key)

    err := c.syncHandler(ctx, key)
    if err == nil {
        // Success: reset this key's rate-limiting history
        c.queue.Forget(key)
        return true
    }

    // Retry with rate limiting
    c.queue.AddRateLimited(key)
    fmt.Printf("Error syncing %s: %v\n", key, err)
    return true
}

// syncHandler reconciles a single pod identified by its cache key.
func (c *Controller) syncHandler(ctx context.Context, key string) error {
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }

    // Get pod from the local cache (never hits the API server)
    pod, err := c.podLister.Pods(namespace).Get(name)
    if err != nil {
        if errors.IsNotFound(err) {
            fmt.Printf("Pod %s/%s deleted\n", namespace, name)
            return nil
        }
        return err
    }

    // Process pod
    fmt.Printf("Processing pod: %s/%s (phase: %s)\n",
        pod.Namespace, pod.Name, pod.Status.Phase)

    return nil
}

func main() {
    config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        fmt.Printf("Error building kubeconfig: %v\n", err)
        return
    }
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        fmt.Printf("Error creating clientset: %v\n", err)
        return
    }

    controller := NewController(clientset)

    ctx := context.Background()
    if err := controller.Run(ctx, 2); err != nil {
        fmt.Printf("Controller error: %v\n", err)
    }
}

Key Functions

// Key generation
func MetaNamespaceKeyFunc(obj interface{}) (string, error)
// Returns: "namespace/name" or just "name" for cluster-scoped

func SplitMetaNamespaceKey(key string) (namespace, name string, err error)
// Splits "namespace/name" back into components

func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error)
// Handles DeletedFinalStateUnknown tombstone objects

// Indexing
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error)
// Index function for namespace

// Waiting for sync
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool
func WaitForNamedCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool
func WaitForNamedCacheSyncWithContext(ctx context.Context, cacheSyncs ...InformerSynced) bool

Reflector

Low-level component that watches the API and updates a store:

import "k8s.io/client-go/tools/cache"

// Create reflector manually (usually not needed)
lw := &cache.ListWatch{
    ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
        return clientset.CoreV1().Pods("default").List(context.TODO(), options)
    },
    WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
        return clientset.CoreV1().Pods("default").Watch(context.TODO(), options)
    },
}

store := cache.NewStore(cache.MetaNamespaceKeyFunc)

reflector := cache.NewReflector(lw, &corev1.Pod{}, store, time.Minute*10)

stopCh := make(chan struct{})
go reflector.Run(stopCh)

DeltaFIFO

Queue used internally by informers to track changes:

type Delta struct {
    Type   DeltaType
    Object interface{}
}

type DeltaType string

const (
    Added    DeltaType = "Added"
    Updated  DeltaType = "Updated"
    Deleted  DeltaType = "Deleted"
    Replaced DeltaType = "Replaced"
    Sync     DeltaType = "Sync"
)

type Deltas []Delta

// Create DeltaFIFO
fifo := cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
    KeyFunction:  cache.MetaNamespaceKeyFunc,
    KnownObjects: indexer,
})

SharedIndexInformer Interface

type SharedIndexInformer interface {
    AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error)
    AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error)
    RemoveEventHandler(handle ResourceEventHandlerRegistration) error
    GetStore() Store
    GetController() Controller
    Run(stopCh <-chan struct{})
    HasSynced() bool
    LastSyncResourceVersion() string
    SetWatchErrorHandler(handler WatchErrorHandler) error
    SetTransform(handler TransformFunc) error
    IsStopped() bool
    AddIndexers(indexers Indexers) error
    GetIndexer() Indexer
}

Best Practices

  1. Use Shared Informers: Always use shared informer factory to avoid duplicate watches
  2. Wait for Cache Sync: Always wait for cache sync before processing events
  3. Use Listers: Use listers for reads instead of calling the API
  4. Handle Tombstones: Handle DeletedFinalStateUnknown in delete handlers
  5. Check Resource Version: Compare resource versions to detect real updates vs resyncs
  6. Use Work Queues: Decouple event handling from processing with work queues
  7. Rate Limiting: Use rate-limiting queues to handle failures gracefully
  8. Resync Period: Set appropriate resync periods (typically 10-30 minutes)
  9. Custom Indices: Add custom indices for efficient lookups
  10. Graceful Shutdown: Close stop channels to cleanly shut down informers

Related Documentation

  • Clientsets - Typed clients used with informers
  • Tools - Work queues and other controller tools
  • Dynamic Client - Dynamic informers for custom resources

Back to Index

Install with Tessl CLI

npx tessl i tessl/golang-k8s-io--client-go

docs

index.md

tile.json