
tessl/golang-k8s-io--client-go

Official Go client library for Kubernetes API - typed clients, controllers, and cluster interaction tools


Tools - Configuration, Leader Election, and Work Queues

Additional tooling for building production-grade Kubernetes controllers, including kubeconfig management, leader election for high availability, and rate-limiting work queues.

Package Information

  • Clientcmd: k8s.io/client-go/tools/clientcmd
  • Leader Election: k8s.io/client-go/tools/leaderelection
  • Work Queues: k8s.io/client-go/util/workqueue
  • Events: k8s.io/client-go/tools/record
  • Pager: k8s.io/client-go/tools/pager

Clientcmd - Kubeconfig Management

Loading Kubeconfig

import "k8s.io/client-go/tools/clientcmd"

// From default locations with merging rules
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
// The loading rules check $KUBECONFIG, then ~/.kube/config. The deferred
// loader below falls back to in-cluster config when no kubeconfig is found.

configOverrides := &clientcmd.ConfigOverrides{}

kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
    loadingRules, configOverrides)

config, err := kubeConfig.ClientConfig()
namespace, _, err := kubeConfig.Namespace()

// From specific file
config, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")

// From master URL and kubeconfig
config, err := clientcmd.BuildConfigFromFlags("https://k8s.example.com", kubeconfig)

// From bytes
config, err := clientcmd.RESTConfigFromKubeConfig(kubeconfigBytes)

Configuration Overrides

import clientcmdapi "k8s.io/client-go/tools/clientcmd/api"

configOverrides := &clientcmd.ConfigOverrides{
    CurrentContext: "production-context",
    Context: clientcmdapi.Context{
        Cluster:   "production-cluster",
        AuthInfo:  "admin",
        Namespace: "default",
    },
    ClusterInfo: clientcmdapi.Cluster{
        Server: "https://k8s.example.com:6443",
    },
}

kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
    loadingRules, configOverrides)

Loading Rules

// Default loading rules
rules := clientcmd.NewDefaultClientConfigLoadingRules()

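// Custom loading rules. When ExplicitPath is set it is the only file loaded;
// Precedence is consulted only when ExplicitPath is empty.
rules := &clientcmd.ClientConfigLoadingRules{
    ExplicitPath: "/path/to/kubeconfig",
    Precedence:   []string{"/path/1", "/path/2"},
}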

// Recommended paths
clientcmd.RecommendedConfigPathFlag   // "kubeconfig"
clientcmd.RecommendedConfigPathEnvVar // "KUBECONFIG"
clientcmd.RecommendedHomeDir          // ".kube"
clientcmd.RecommendedFileName         // "config"
clientcmd.RecommendedHomeFile         // ~/.kube/config
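
The lookup order behind the default loading rules can be modeled with the standard library alone. The following is a minimal sketch, not clientcmd's implementation: `candidatePaths` and `listSep` are hypothetical names, the sketch skips empty entries as a simplification, and it leaves out the merging of the files it returns.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// listSep is the OS-specific separator used inside KUBECONFIG
// (":" on Unix-like systems, ";" on Windows).
var listSep = string(filepath.ListSeparator)

// candidatePaths is a hypothetical helper that models the lookup order:
// the entries of the KUBECONFIG value when it is set, otherwise the single
// recommended home file <homeDir>/.kube/config.
func candidatePaths(envValue, homeDir string) []string {
	if envValue == "" {
		return []string{filepath.Join(homeDir, ".kube", "config")}
	}
	seen := map[string]bool{}
	var paths []string
	for _, p := range filepath.SplitList(envValue) {
		if p == "" || seen[p] {
			continue // skip empty entries and paths already listed
		}
		seen[p] = true
		paths = append(paths, p)
	}
	return paths
}

func main() {
	env := "a.yaml" + listSep + "b.yaml" + listSep + "a.yaml"
	fmt.Println(candidatePaths(env, "/home/dev"))
	fmt.Println(candidatePaths("", "/home/dev"))
}
```

In real code, pass `os.Getenv(clientcmd.RecommendedConfigPathEnvVar)` and the user's home directory; the library performs this resolution for you when you call `NewDefaultClientConfigLoadingRules`.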

Reading and Writing Kubeconfig

// Load config
config, err := clientcmd.LoadFromFile(filename)

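// Write config (WriteToFile takes the Config by value; err is reused from above)
err = clientcmd.WriteToFile(*config, filename)

// Modify config. configAccess is a clientcmd.ConfigAccess, for example the
// loading rules; newConfig is a clientcmdapi.Config value.
err = clientcmd.ModifyConfig(configAccess, newConfig, relativizePaths)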

Leader Election

Enables high-availability by electing a leader among multiple controller instances:

import (
    "context"
    "fmt"
    "os"
    "time"

    "k8s.io/client-go/tools/leaderelection"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
)

// Create resource lock
lock, err := resourcelock.New(
    resourcelock.LeasesResourceLock,
    "kube-system",
    "my-controller",
    clientset.CoreV1(),
    clientset.CoordinationV1(),
    resourcelock.ResourceLockConfig{
        Identity: "controller-instance-1",
    })
if err != nil {
    panic(err)
}

// Run leader election
ctx := context.Background()
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
    Lock:            lock,
    ReleaseOnCancel: true,
    LeaseDuration:   15 * time.Second,
    RenewDeadline:   10 * time.Second,
    RetryPeriod:     2 * time.Second,
    Callbacks: leaderelection.LeaderCallbacks{
        OnStartedLeading: func(ctx context.Context) {
            // Start controller work
            fmt.Println("Became leader, starting controller")
            runController(ctx)
        },
        OnStoppedLeading: func() {
            // Cleanup and exit
            fmt.Println("Lost leadership, exiting")
            os.Exit(0)
        },
        OnNewLeader: func(identity string) {
            if identity == "controller-instance-1" {
                return
            }
            fmt.Printf("New leader elected: %s\n", identity)
        },
    },
})

Resource Lock Types

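const (
    // Lease-based lock (preferred). In recent client-go releases this is
    // the only lock type that is exported and accepted by resourcelock.New.
    LeasesResourceLock = "leases"
)

// Removed lock types, listed so that old configuration values can be recognized.
// Older releases exported them as EndpointsResourceLock, ConfigMapsResourceLock,
// EndpointsLeasesResourceLock, and ConfigMapsLeasesResourceLock. In recent
// releases resourcelock.New rejects these values with a migration error.
//
//   "endpoints"         Endpoints-based lock
//   "configmaps"        ConfigMap-based lock
//   "endpointsleases"   combination lock for migration
//   "configmapsleases"  combination lock for migration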

Leader Election Config

type LeaderElectionConfig struct {
    // Lock is the resource that will be used for locking
    Lock rl.Interface

    // LeaseDuration is the duration that non-leader candidates will
    // wait to force acquire leadership. This is measured against time of
    // last observed ack. The library applies no default; core clients
    // conventionally use 15s. Must be greater than RenewDeadline.
    LeaseDuration time.Duration

    // RenewDeadline is the duration that the acting leader will retry
    // refreshing leadership before giving up. Core clients conventionally
    // use 10s.
    RenewDeadline time.Duration

    // RetryPeriod is the duration the LeaderElector clients should wait
    // between tries of actions. Core clients conventionally use 2s.
    RetryPeriod time.Duration

    // Callbacks are callbacks that are triggered during certain lifecycle
    // events of the LeaderElector
    Callbacks LeaderCallbacks

    // WatchDog is the associated health checker
    WatchDog *HealthzAdaptor

    // ReleaseOnCancel should be set true if the lock should be released
    // when the run context is cancelled
    ReleaseOnCancel bool

    // Name is the name of the resource lock for debugging
    Name string
}
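
The three durations are not independent: the lease must outlast the renew deadline, and the renew deadline must leave room for at least one jittered retry. The sketch below is a standard-library model of that check, written under the assumption that the library multiplies `RetryPeriod` by a jitter factor of 1.2. The `timings` type and `validate` function are hypothetical names, not part of the leaderelection package.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// jitterFactor is assumed to match the 1.2 multiplier that the
// leaderelection package applies to RetryPeriod.
const jitterFactor = 1.2

// timings is a hypothetical stand-in for the three duration fields of
// LeaderElectionConfig.
type timings struct {
	LeaseDuration time.Duration
	RenewDeadline time.Duration
	RetryPeriod   time.Duration
}

// validate reports whether the durations satisfy
// LeaseDuration > RenewDeadline > jitterFactor * RetryPeriod > 0.
func validate(t timings) error {
	if t.LeaseDuration <= 0 || t.RenewDeadline <= 0 || t.RetryPeriod <= 0 {
		return errors.New("all durations must be greater than zero")
	}
	if t.LeaseDuration <= t.RenewDeadline {
		return errors.New("LeaseDuration must be greater than RenewDeadline")
	}
	if t.RenewDeadline <= time.Duration(jitterFactor*float64(t.RetryPeriod)) {
		return errors.New("RenewDeadline must be greater than RetryPeriod * jitterFactor")
	}
	return nil
}

func main() {
	conventional := timings{15 * time.Second, 10 * time.Second, 2 * time.Second}
	fmt.Println("15s/10s/2s:", validate(conventional))

	tooTight := timings{15 * time.Second, 2 * time.Second, 2 * time.Second}
	fmt.Println("15s/2s/2s:", validate(tooTight))
}
```

Running a check like this at startup turns a misconfigured deployment into an immediate, readable error instead of a failure inside the elector.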

Work Queues

Rate-limiting queues for processing controller events:

import "k8s.io/client-go/util/workqueue"

// Create basic queue
queue := workqueue.NewTyped[string]()

// Create rate-limiting queue (most common)
queue := workqueue.NewTypedRateLimitingQueue(
    workqueue.DefaultTypedControllerRateLimiter[string]())

// Create named queue (for metrics); the name is passed through the config struct
queue := workqueue.NewTypedRateLimitingQueueWithConfig(
    workqueue.DefaultTypedControllerRateLimiter[string](),
    workqueue.TypedRateLimitingQueueConfig[string]{Name: "my-controller"})

// Add items to queue
queue.Add("key1")
queue.Add("key2")

// Process items
for {
    key, shutdown := queue.Get()
    if shutdown {
        break
    }

    err := processItem(key)
    if err != nil {
        // Retry with exponential backoff
        queue.AddRateLimited(key)
    } else {
        // Success - remove from rate limiter
        queue.Forget(key)
    }

    // Mark item as done
    queue.Done(key)
}

// Shutdown queue
queue.ShutDown()

Queue Interfaces

type TypedInterface[T comparable] interface {
    Add(item T)
    Len() int
    Get() (item T, shutdown bool)
    Done(item T)
    ShutDown()
    ShutDownWithDrain()
    ShuttingDown() bool
}

type TypedRateLimitingInterface[T comparable] interface {
    TypedDelayingInterface[T]

    // AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
    AddRateLimited(item T)

    // Forget indicates that an item is finished being retried
    Forget(item T)

    // NumRequeues returns back how many times the item was requeued
    NumRequeues(item T) int
}

type TypedDelayingInterface[T comparable] interface {
    TypedInterface[T]
    // AddAfter adds an item to the workqueue after the indicated duration has passed
    AddAfter(item T, duration time.Duration)
}

Rate Limiters

// Default controller rate limiter: per-item exponential backoff (5ms base,
// 1000s max) combined with an overall 10 qps / 100 burst token bucket
limiter := workqueue.DefaultTypedControllerRateLimiter[string]()

// Item exponential failure rate limiter
limiter := workqueue.NewTypedItemExponentialFailureRateLimiter[string](
    5*time.Millisecond,  // Base delay
    1000*time.Second)    // Max delay

// Item fast/slow rate limiter
limiter := workqueue.NewTypedItemFastSlowRateLimiter[string](
    5*time.Millisecond,   // Fast delay
    10*time.Second,       // Slow delay
    5)                    // Fast attempts

// Max of multiple rate limiters
limiter := workqueue.NewTypedMaxOfRateLimiter[string](
    limiter1,
    limiter2,
    limiter3)

// Bucket rate limiter (QPS based); rate is "golang.org/x/time/rate"
limiter := &workqueue.TypedBucketRateLimiter[string]{
    Limiter: rate.NewLimiter(rate.Limit(qps), burst)}
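
To get a feel for what a 5ms base and a 1000s cap mean in practice, the sketch below computes the delay sequence of a per-item exponential limiter. It is a standard-library model that assumes the delay doubles with every consecutive failure of the same item and is capped at the maximum. `backoffDelay` is a hypothetical helper, not a workqueue function.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// backoffDelay returns the delay applied after `failures` previous failures
// of the same item: base * 2^failures, capped at maxDelay.
func backoffDelay(base, maxDelay time.Duration, failures int) time.Duration {
	d := float64(base) * math.Pow(2, float64(failures))
	if d > float64(maxDelay) {
		return maxDelay
	}
	return time.Duration(d)
}

func main() {
	base, maxDelay := 5*time.Millisecond, 1000*time.Second
	for _, failures := range []int{0, 1, 2, 5, 10, 20} {
		fmt.Printf("after %2d failures: wait %v\n",
			failures, backoffDelay(base, maxDelay, failures))
	}
}
```

Under this model the cap is reached after about 18 consecutive failures, which is why calling `Forget` on success matters: it resets the failure count so the next error starts again from the base delay.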

Complete Queue Example

type Controller struct {
    queue workqueue.TypedRateLimitingInterface[string]
}

func (c *Controller) processNextItem(ctx context.Context) bool {
    key, quit := c.queue.Get()
    if quit {
        return false
    }
    defer c.queue.Done(key)

    err := c.syncHandler(ctx, key)
    if err == nil {
        // Success - forget this item for rate limiting
        c.queue.Forget(key)
        return true
    }

    // Check retry count
    if c.queue.NumRequeues(key) < 5 {
        // Retry with rate limiting
        fmt.Printf("Error syncing %s, retrying: %v\n", key, err)
        c.queue.AddRateLimited(key)
        return true
    }

    // Give up after 5 retries
    c.queue.Forget(key)
    fmt.Printf("Dropping %s out of queue: %v\n", key, err)
    return true
}

func (c *Controller) runWorker(ctx context.Context) {
    for c.processNextItem(ctx) {
    }
}

Event Recording

Record events to the Kubernetes API:

import (
    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/kubernetes/scheme"
    typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
    "k8s.io/client-go/tools/record"
)

// Create event broadcaster
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{
    Interface: clientset.CoreV1().Events(""),
})

// Create event recorder
recorder := eventBroadcaster.NewRecorder(
    scheme.Scheme,
    corev1.EventSource{Component: "my-controller"})

// Record events
recorder.Event(pod, corev1.EventTypeNormal, "Created", "Pod created successfully")
recorder.Eventf(pod, corev1.EventTypeWarning, "Failed", "Failed to create: %v", err)

// With annotations
recorder.AnnotatedEventf(
    pod,
    map[string]string{"controller": "my-controller"},
    corev1.EventTypeNormal,
    "Updated",
    "Pod updated")

EventRecorder Interface

type EventRecorder interface {
    Event(object runtime.Object, eventtype, reason, message string)
    Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})
    AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}

Pager

Paginate large lists efficiently:

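import (
    "context"
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/client-go/tools/pager"
)

// Create pager (named listPager so it does not shadow the pager package)
listPager := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
    return clientset.CoreV1().Pods("default").List(ctx, opts)
})

// Process every item across all pages
err := listPager.EachListItem(context.TODO(), metav1.ListOptions{}, func(obj runtime.Object) error {
    pod, ok := obj.(*corev1.Pod)
    if !ok {
        return fmt.Errorf("unexpected object type %T", obj)
    }
    fmt.Printf("Pod: %s\n", pod.Name)
    return nil
})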

Complete Controller with All Tools

package main

import (
    "context"
    "fmt"
    "os"
    "time"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/kubernetes/scheme"
    typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/tools/leaderelection"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
    "k8s.io/client-go/tools/record"
    "k8s.io/client-go/util/workqueue"
)

type Controller struct {
    clientset kubernetes.Interface
    queue     workqueue.TypedRateLimitingInterface[string]
    informer  cache.SharedIndexInformer
    recorder  record.EventRecorder
}

func NewController(clientset kubernetes.Interface) *Controller {
    factory := informers.NewSharedInformerFactory(clientset, time.Minute*10)
    podInformer := factory.Core().V1().Pods()

    queue := workqueue.NewTypedNamedRateLimitingQueue(
        workqueue.DefaultTypedControllerRateLimiter[string](),
        "pods")

    // Create event recorder
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{
        Interface: clientset.CoreV1().Events(""),
    })
    recorder := eventBroadcaster.NewRecorder(
        scheme.Scheme,
        corev1.EventSource{Component: "my-controller"})

    controller := &Controller{
        clientset: clientset,
        queue:     queue,
        informer:  podInformer.Informer(),
        recorder:  recorder,
    }

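    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        // AddFunc receives only the object; the isInInitialList flag belongs
        // to the ResourceEventHandler.OnAdd method, not to this field
        AddFunc: func(obj interface{}) {
            key, err := cache.MetaNamespaceKeyFunc(obj)
            if err != nil {
                return
            }
            queue.Add(key)
        },
    })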

    return controller
}

func (c *Controller) Run(ctx context.Context, workers int) {
    defer c.queue.ShutDown()

    go c.informer.Run(ctx.Done())

    if !cache.WaitForCacheSync(ctx.Done(), c.informer.HasSynced) {
        return
    }

    for i := 0; i < workers; i++ {
        go wait.UntilWithContext(ctx, c.runWorker, time.Second)
    }

    <-ctx.Done()
}

func (c *Controller) runWorker(ctx context.Context) {
    for c.processNextItem(ctx) {
    }
}

func (c *Controller) processNextItem(ctx context.Context) bool {
    key, quit := c.queue.Get()
    if quit {
        return false
    }
    defer c.queue.Done(key)

    err := c.syncHandler(ctx, key)
    if err == nil {
        c.queue.Forget(key)
        return true
    }

    c.queue.AddRateLimited(key)
    return true
}

func (c *Controller) syncHandler(ctx context.Context, key string) error {
    namespace, name, _ := cache.SplitMetaNamespaceKey(key)

    obj, exists, err := c.informer.GetIndexer().GetByKey(key)
    if err != nil {
        return err
    }

    if !exists {
        return nil
    }

    pod := obj.(*corev1.Pod)

    // Record event
    c.recorder.Eventf(pod, corev1.EventTypeNormal, "Processed",
        "Pod %s/%s processed", namespace, name)

    return nil
}

func main() {
    config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }

    controller := NewController(clientset)

    // Setup leader election; each replica needs a unique identity
    id := os.Getenv("HOSTNAME")
    lock, err := resourcelock.New(
        resourcelock.LeasesResourceLock,
        "kube-system",
        "my-controller",
        clientset.CoreV1(),
        clientset.CoordinationV1(),
        resourcelock.ResourceLockConfig{Identity: id})
    if err != nil {
        panic(err)
    }

    ctx := context.Background()
    leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
        Lock:            lock,
        ReleaseOnCancel: true,
        LeaseDuration:   15 * time.Second,
        RenewDeadline:   10 * time.Second,
        RetryPeriod:     2 * time.Second,
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: func(ctx context.Context) {
                controller.Run(ctx, 2)
            },
            OnStoppedLeading: func() {
                fmt.Println("Lost leadership")
                os.Exit(0)
            },
        },
    })
}

Additional Tools

List Pagination (tools/pager)

For efficiently processing large lists:

import "k8s.io/client-go/tools/pager"

// Create pager for listing pods (named listPager so it does not shadow the package)
listPager := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
    return clientset.CoreV1().Pods("default").List(ctx, opts)
})

// Process each item across all pages
err := listPager.EachListItem(context.TODO(), metav1.ListOptions{}, func(obj runtime.Object) error {
    pod := obj.(*corev1.Pod)
    fmt.Printf("Pod: %s\n", pod.Name)
    return nil
})

Remote Command Execution (tools/remotecommand)

Execute commands in pod containers:

import (
    "bytes"
    "context"
    "fmt"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/tools/remotecommand"
)

// Create exec request
req := clientset.CoreV1().RESTClient().
    Post().
    Resource("pods").
    Name("my-pod").
    Namespace("default").
    SubResource("exec")

// Configure exec options
req.VersionedParams(&corev1.PodExecOptions{
    Command:   []string{"sh", "-c", "echo hello"},
    Container: "main",
    Stdin:     false,
    Stdout:    true,
    Stderr:    true,
    TTY:       false,
}, scheme.ParameterCodec)

// Create executor
exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
if err != nil {
    panic(err)
}

// Execute command
var stdout, stderr bytes.Buffer
err = exec.StreamWithContext(context.TODO(), remotecommand.StreamOptions{
    Stdout: &stdout,
    Stderr: &stderr,
})
if err != nil {
    panic(err)
}

fmt.Printf("Output: %s\n", stdout.String())

Port Forwarding (tools/portforward)

Forward local ports to pods:

import (
    "fmt"
    "net/http"
    "net/url"
    "os"

    "k8s.io/client-go/tools/portforward"
    "k8s.io/client-go/transport/spdy"
)

// config.Host is usually a full URL such as https://host:6443, so parse it
// rather than assigning it to url.URL.Host (assumes a scheme and no path prefix)
serverURL, err := url.Parse(config.Host)
if err != nil {
    panic(err)
}
serverURL.Path = fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", "default", "my-pod")

transport, upgrader, err := spdy.RoundTripperFor(config)
if err != nil {
    panic(err)
}

dialer := spdy.NewDialer(upgrader,
    &http.Client{Transport: transport},
    "POST", serverURL)

stopChan := make(chan struct{}, 1)
readyChan := make(chan struct{})

forwarder, err := portforward.New(dialer,
    []string{"8080:80"}, // local:remote
    stopChan, readyChan,
    os.Stdout, os.Stderr)
if err != nil {
    panic(err)
}

go forwarder.ForwardPorts() // runs until stopChan is closed
<-readyChan                 // Wait until ready

Structured Events (tools/events)

Modern event recording with events.k8s.io/v1:

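import (
    "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/tools/events"
)

// Create a broadcaster backed by the events.k8s.io/v1 API
broadcaster := events.NewBroadcaster(&events.EventSinkImpl{
    Interface: clientset.EventsV1(),
})
broadcaster.StartRecordingToSink(ctx.Done())

// Create event recorder; the second argument is the reporting controller name
eventRecorder := broadcaster.NewRecorder(scheme.Scheme, "my-controller")

// Record structured event: regarding, related, type, reason, action, note
eventRecorder.Eventf(pod, nil, corev1.EventTypeNormal,
    "Started", "ContainerStarted",
    "Container %s started successfully", containerName)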

Best Practices

  1. Use Leader Election: Always use leader election for HA deployments
  2. Named Queues: Use named queues for better metrics
  3. Rate Limiting: Configure appropriate retry delays
  4. Event Recording: Record significant events for debugging
  5. Graceful Shutdown: Handle shutdown signals properly
  6. Metrics: Enable queue metrics for monitoring
  7. Retry Limits: Set maximum retry counts to prevent infinite loops
  8. Context Propagation: Pass contexts through all operations
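
Items 5 and 8 combine naturally: derive one context from the shutdown signals, pass it to every worker, and wait for the workers to return before exiting. The sketch below shows that shape with the standard library only. `runWorkers` and `runFor` are hypothetical helpers, and the timeout exists only so the demo exits on its own.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"sync"
	"sync/atomic"
	"syscall"
	"time"
)

// runWorkers starts n workers that loop until ctx is cancelled, waits for
// all of them to return, and reports how many exited.
func runWorkers(ctx context.Context, n int) int {
	var wg sync.WaitGroup
	var stopped atomic.Int64
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			ticker := time.NewTicker(10 * time.Millisecond)
			defer ticker.Stop()
			for {
				select {
				case <-ctx.Done():
					stopped.Add(1)
					return
				case <-ticker.C:
					// one unit of work would run here, receiving ctx
				}
			}
		}()
	}
	wg.Wait()
	return int(stopped.Load())
}

// runFor cancels the workers on SIGINT/SIGTERM or after d, whichever comes first.
func runFor(d time.Duration, n int) int {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()
	ctx, cancel := context.WithTimeout(ctx, d)
	defer cancel()
	return runWorkers(ctx, n)
}

func main() {
	fmt.Println("workers stopped:", runFor(100*time.Millisecond, 3))
}
```

In a controller, the same signal-derived context would be the one handed to `leaderelection.RunOrDie`, so that cancelling it stops the workers and, with `ReleaseOnCancel` set, releases the lease.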
