Official Go client library for the Kubernetes API - typed clients, controllers, and cluster interaction tools
Additional tooling for building production-grade Kubernetes controllers, including kubeconfig management, leader election for high availability, and rate-limiting work queues.
Covered packages:

k8s.io/client-go/tools/clientcmd
k8s.io/client-go/tools/leaderelection
k8s.io/client-go/util/workqueue
k8s.io/client-go/tools/record
k8s.io/client-go/tools/pager

Load client configuration from kubeconfig files:

import "k8s.io/client-go/tools/clientcmd"
// From default locations with merging rules
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
// Checks $KUBECONFIG and ~/.kube/config, falling back to in-cluster config
configOverrides := &clientcmd.ConfigOverrides{}
kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
loadingRules, configOverrides)
config, err := kubeConfig.ClientConfig()
namespace, _, err := kubeConfig.Namespace()
// From specific file
config, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
// From master URL and kubeconfig
config, err := clientcmd.BuildConfigFromFlags("https://k8s.example.com", kubeconfig)
// From bytes
config, err := clientcmd.RESTConfigFromKubeConfig(kubeconfigBytes)
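
Where the raw kubeconfig bytes come from is up to the caller (a file, a Secret, etc.); a minimal sketch assuming a local file and the usual imports:

// Read kubeconfig bytes from disk (path is illustrative)
kubeconfigBytes, err := os.ReadFile("/path/to/kubeconfig")
if err != nil {
    panic(err)
}

However it was loaded, the resulting *rest.Config plugs straight into a clientset:

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
    panic(err)
}
// List pods in the kubeconfig's current namespace (from kubeConfig.Namespace() above)
pods, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{})
fmt.Printf("found %d pods\n", len(pods.Items))

Override specific kubeconfig values at load time:

configOverrides := &clientcmd.ConfigOverrides{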
CurrentContext: "production-context",
Context: clientcmdapi.Context{
Cluster: "production-cluster",
AuthInfo: "admin",
Namespace: "default",
},
ClusterInfo: clientcmdapi.Cluster{
Server: "https://k8s.example.com:6443",
},
}
kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
loadingRules, configOverrides)

Loading rules control which kubeconfig files are read:

// Default loading rules
rules := clientcmd.NewDefaultClientConfigLoadingRules()
// Custom loading rules
rules := &clientcmd.ClientConfigLoadingRules{
ExplicitPath: "/path/to/kubeconfig",
Precedence: []string{"/path/1", "/path/2"},
}
// Recommended paths
clientcmd.RecommendedConfigPathFlag // "kubeconfig"
clientcmd.RecommendedConfigPathEnvVar // "KUBECONFIG"
clientcmd.RecommendedHomeDir // ".kube"
clientcmd.RecommendedFileName // "config"
clientcmd.RecommendedHomeFile // ~/.kube/config

Read, write, and modify kubeconfig files directly:

// Load config
config, err := clientcmd.LoadFromFile(filename)
// Write config
err := clientcmd.WriteToFile(*config, filename)
// Modify config
err := clientcmd.ModifyConfig(configAccess, newConfig, relativizePaths)
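
ModifyConfig takes a clientcmd.ConfigAccess that tells it which file to persist to. A minimal sketch of adding a context and saving it back to the default kubeconfig (the "staging" names are hypothetical):

// PathOptions implements ConfigAccess using the default kubeconfig locations
pathOptions := clientcmd.NewDefaultPathOptions()
cfg, err := pathOptions.GetStartingConfig()
if err != nil {
    panic(err)
}
cfg.Contexts["staging"] = &clientcmdapi.Context{
    Cluster:  "staging-cluster",
    AuthInfo: "staging-user",
}
cfg.CurrentContext = "staging"
// Persist the change back to disk (false = leave file paths as-is)
if err := clientcmd.ModifyConfig(pathOptions, *cfg, false); err != nil {
    panic(err)
}

Leader election enables high availability by electing a single leader among multiple controller instances: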
import (
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
)
// Create resource lock
lock, err := resourcelock.New(
resourcelock.LeasesResourceLock,
"kube-system",
"my-controller",
clientset.CoreV1(),
clientset.CoordinationV1(),
resourcelock.ResourceLockConfig{
Identity: "controller-instance-1",
})
if err != nil {
panic(err)
}
// Run leader election
ctx := context.Background()
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
ReleaseOnCancel: true,
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
// Start controller work
fmt.Println("Became leader, starting controller")
runController(ctx)
},
OnStoppedLeading: func() {
// Cleanup and exit
fmt.Println("Lost leadership, exiting")
os.Exit(0)
},
OnNewLeader: func(identity string) {
if identity == "controller-instance-1" {
return
}
fmt.Printf("New leader elected: %s\n", identity)
},
},
})

Available resource lock types:

const (
// Endpoints-based lock (deprecated)
EndpointsResourceLock = "endpoints"
// ConfigMap-based lock (deprecated)
ConfigMapsResourceLock = "configmaps"
// Lease-based lock (preferred)
LeasesResourceLock = "leases"
// Combination locks for migration
EndpointsLeasesResourceLock = "endpointsleases"
ConfigMapsLeasesResourceLock = "configmapsleases"
)

LeaderElectionConfig options:

type LeaderElectionConfig struct {
// Lock is the resource that will be used for locking
Lock rl.Interface // rl = "k8s.io/client-go/tools/leaderelection/resourcelock"
// LeaseDuration is the duration that non-leader candidates will
// wait to force acquire leadership. This is measured against time of
// last observed ack. Default: 15s
LeaseDuration time.Duration
// RenewDeadline is the duration that the acting leader will retry
// refreshing leadership before giving up. Default: 10s
RenewDeadline time.Duration
// RetryPeriod is the duration the LeaderElector clients should wait
// between tries of actions. Default: 2s
RetryPeriod time.Duration
// Callbacks are callbacks that are triggered during certain lifecycle
// events of the LeaderElector
Callbacks LeaderCallbacks
// WatchDog is the associated health checker
WatchDog *HealthzAdaptor
// ReleaseOnCancel should be set true if the lock should be released
// when the run context is cancelled
ReleaseOnCancel bool
// Name is the name of the resource lock for debugging
Name string
}
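
The WatchDog field hooks leader election into a health check. A minimal sketch (the endpoint path and timeout are illustrative), using leaderelection.NewLeaderHealthzAdaptor:

import (
    "net/http"
    "time"

    "k8s.io/client-go/tools/leaderelection"
)

// Fails the check if the elector stops renewing for longer than the timeout
watchdog := leaderelection.NewLeaderHealthzAdaptor(20 * time.Second)
// Set WatchDog: watchdog in LeaderElectionConfig, then expose it:
http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
    if err := watchdog.Check(r); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    w.Write([]byte("ok"))
})

Rate-limiting queues for processing controller events: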
import "k8s.io/client-go/util/workqueue"
// Create basic queue
queue := workqueue.NewTyped[string]()
// Create rate-limiting queue (most common)
queue := workqueue.NewTypedRateLimitingQueue(
workqueue.DefaultTypedControllerRateLimiter[string]())
// Create named queue (for metrics)
queue := workqueue.NewTypedNamedRateLimitingQueue(
workqueue.DefaultTypedControllerRateLimiter[string](),
"my-controller")
// Add items to queue
queue.Add("key1")
queue.Add("key2")
// Process items
for {
key, shutdown := queue.Get()
if shutdown {
break
}
err := processItem(key)
if err != nil {
// Retry with exponential backoff
queue.AddRateLimited(key)
} else {
// Success - remove from rate limiter
queue.Forget(key)
}
// Mark item as done
queue.Done(key)
}
// Shutdown queue
queue.ShutDown()

Core queue interfaces:

type TypedInterface[T comparable] interface {
Add(item T)
Len() int
Get() (item T, shutdown bool)
Done(item T)
ShutDown()
ShutDownWithDrain()
ShuttingDown() bool
}
type TypedRateLimitingInterface[T comparable] interface {
TypedDelayingInterface[T]
// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
AddRateLimited(item T)
// Forget indicates that an item is finished being retried
Forget(item T)
// NumRequeues returns back how many times the item was requeued
NumRequeues(item T) int
}
type TypedDelayingInterface[T comparable] interface {
TypedInterface[T]
// AddAfter adds an item to the workqueue after the indicated duration has passed
AddAfter(item T, duration time.Duration)
}
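
AddAfter is handy for scheduling periodic re-syncs; a minimal sketch (the key is hypothetical):

// Re-process the item after a delay instead of immediately
queue.AddAfter("default/my-pod", 30*time.Second)

Built-in rate limiters:

// Default controller rate limiter (5ms base, 1000s max)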
limiter := workqueue.DefaultTypedControllerRateLimiter[string]()
// Item exponential failure rate limiter
limiter := workqueue.NewTypedItemExponentialFailureRateLimiter[string](
5*time.Millisecond, // Base delay
1000*time.Second) // Max delay
// Item fast/slow rate limiter
limiter := workqueue.NewTypedItemFastSlowRateLimiter[string](
5*time.Millisecond, // Fast delay
10*time.Second, // Slow delay
5) // Fast attempts
// Max of multiple rate limiters
limiter := workqueue.NewTypedMaxOfRateLimiter[string](
limiter1,
limiter2,
limiter3)
// Bucket rate limiter (overall QPS with burst), built on golang.org/x/time/rate;
// there is no dedicated constructor, so use the struct literal
limiter := &workqueue.TypedBucketRateLimiter[string]{
    Limiter: rate.NewLimiter(rate.Limit(qps), burst),
}

A typical controller worker built around the rate-limiting queue:

type Controller struct {
queue workqueue.TypedRateLimitingInterface[string]
}
func (c *Controller) processNextItem(ctx context.Context) bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)
err := c.syncHandler(ctx, key)
if err == nil {
// Success - forget this item for rate limiting
c.queue.Forget(key)
return true
}
// Check retry count
if c.queue.NumRequeues(key) < 5 {
// Retry with rate limiting
fmt.Printf("Error syncing %s, retrying: %v\n", key, err)
c.queue.AddRateLimited(key)
return true
}
// Give up after 5 retries
c.queue.Forget(key)
fmt.Printf("Dropping %s out of queue: %v\n", key, err)
return true
}
func (c *Controller) runWorker(ctx context.Context) {
for c.processNextItem(ctx) {
}
}

Record events to the Kubernetes API:
import (
    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/kubernetes/scheme"
    typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
    "k8s.io/client-go/tools/record"
)
// Create event broadcaster
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{
Interface: clientset.CoreV1().Events(""),
})
// Create event recorder
recorder := eventBroadcaster.NewRecorder(
scheme.Scheme,
corev1.EventSource{Component: "my-controller"})
// Record events
recorder.Event(pod, corev1.EventTypeNormal, "Created", "Pod created successfully")
recorder.Eventf(pod, corev1.EventTypeWarning, "Failed", "Failed to create: %v", err)
// With annotations
recorder.AnnotatedEventf(
pod,
map[string]string{"controller": "my-controller"},
corev1.EventTypeNormal,
"Updated",
"Pod updated")type EventRecorder interface {
Event(object runtime.Object, eventtype, reason, message string)
Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})
AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}

Paginate large lists efficiently:
import (
    "context"
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/client-go/tools/pager"
)
// Create pager (named to avoid shadowing the pager package)
podPager := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
return clientset.CoreV1().Pods("default").List(ctx, opts)
})
// Process pages
err := podPager.EachListItem(context.TODO(), metav1.ListOptions{}, func(obj runtime.Object) error {
pod := obj.(*corev1.Pod)
fmt.Printf("Pod: %s\n", pod.Name)
return nil
})
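
ListPager also exposes a PageSize field (it defaults to 500 items per page); a small sketch of tuning it:

p := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
    return clientset.CoreV1().Pods("default").List(ctx, opts)
})
p.PageSize = 250 // request 250 items per page

A complete controller example combining these tools:

package main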
import (
    "context"
    "fmt"
    "os"
    "time"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/kubernetes/scheme"
    typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/tools/leaderelection"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
    "k8s.io/client-go/tools/record"
    "k8s.io/client-go/util/workqueue"
)
type Controller struct {
clientset kubernetes.Interface
queue workqueue.TypedRateLimitingInterface[string]
informer cache.SharedIndexInformer
recorder record.EventRecorder
}
func NewController(clientset kubernetes.Interface) *Controller {
factory := informers.NewSharedInformerFactory(clientset, time.Minute*10)
podInformer := factory.Core().V1().Pods()
queue := workqueue.NewTypedNamedRateLimitingQueue(
workqueue.DefaultTypedControllerRateLimiter[string](),
"pods")
// Create event recorder
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{
Interface: clientset.CoreV1().Events(""),
})
recorder := eventBroadcaster.NewRecorder(
scheme.Scheme,
corev1.EventSource{Component: "my-controller"})
controller := &Controller{
clientset: clientset,
queue: queue,
informer: podInformer.Informer(),
recorder: recorder,
}
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, _ := cache.MetaNamespaceKeyFunc(obj)
queue.Add(key)
},
})
return controller
}
func (c *Controller) Run(ctx context.Context, workers int) {
defer c.queue.ShutDown()
go c.informer.Run(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), c.informer.HasSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
}
<-ctx.Done()
}
func (c *Controller) runWorker(ctx context.Context) {
for c.processNextItem(ctx) {
}
}
func (c *Controller) processNextItem(ctx context.Context) bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)
err := c.syncHandler(ctx, key)
if err == nil {
c.queue.Forget(key)
return true
}
c.queue.AddRateLimited(key)
return true
}
func (c *Controller) syncHandler(ctx context.Context, key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
    return err
}
obj, exists, err := c.informer.GetIndexer().GetByKey(key)
if err != nil {
return err
}
if !exists {
return nil
}
pod := obj.(*corev1.Pod)
// Record event
c.recorder.Eventf(pod, corev1.EventTypeNormal, "Processed",
"Pod %s/%s processed", namespace, name)
return nil
}
func main() {
config, _ := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
clientset, _ := kubernetes.NewForConfig(config)
controller := NewController(clientset)
// Setup leader election
id := os.Getenv("HOSTNAME")
lock, _ := resourcelock.New(
resourcelock.LeasesResourceLock,
"kube-system",
"my-controller",
clientset.CoreV1(),
clientset.CoordinationV1(),
resourcelock.ResourceLockConfig{Identity: id})
ctx := context.Background()
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
ReleaseOnCancel: true,
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
controller.Run(ctx, 2)
},
OnStoppedLeading: func() {
fmt.Println("Lost leadership")
os.Exit(0)
},
},
})
}
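
Since the example sets ReleaseOnCancel, cancelling the election context on shutdown lets the lease be released for faster failover. A minimal sketch (the signal wiring is an assumption, not part of the example above):

import (
    "context"
    "os"
    "os/signal"
    "syscall"
)

ctx, cancel := context.WithCancel(context.Background())
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGTERM, os.Interrupt)
go func() {
    <-sigCh
    cancel() // triggers ReleaseOnCancel inside RunOrDie
}()
// pass ctx to leaderelection.RunOrDie as above

Execute commands in pod containers: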
import (
    "bytes"
    "context"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/tools/remotecommand"
)
// Create exec request
req := clientset.CoreV1().RESTClient().
Post().
Resource("pods").
Name("my-pod").
Namespace("default").
SubResource("exec")
// Configure exec options
req.VersionedParams(&corev1.PodExecOptions{
Command: []string{"sh", "-c", "echo hello"},
Container: "main",
Stdin: false,
Stdout: true,
Stderr: true,
TTY: false,
}, scheme.ParameterCodec)
// Create executor
exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
if err != nil {
panic(err)
}
// Execute command
var stdout, stderr bytes.Buffer
err = exec.StreamWithContext(context.TODO(), remotecommand.StreamOptions{
Stdout: &stdout,
Stderr: &stderr,
})
fmt.Printf("Output: %s\\n", stdout.String())Forward local ports to pods:
import "k8s.io/client-go/tools/portforward"
import "k8s.io/client-go/transport/spdy"
// Create port forwarder
path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", "default", "my-pod")
transport, upgrader, err := spdy.RoundTripperFor(config)
dialer := spdy.NewDialer(upgrader,
    &http.Client{Transport: transport},
    "POST",
    // config.Host may carry an "https://" prefix; url.URL.Host must not
    &url.URL{Scheme: "https", Path: path, Host: strings.TrimPrefix(config.Host, "https://")})
stopChan := make(chan struct{}, 1)
readyChan := make(chan struct{})
forwarder, err := portforward.New(dialer,
[]string{"8080:80"}, // local:remote
stopChan, readyChan,
os.Stdout, os.Stderr)
go forwarder.ForwardPorts()
<-readyChan // Wait until ready
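
Once ready, the local port tunnels to the pod; closing stopChan tears the tunnel down. A small sketch (the URL path is illustrative):

resp, err := http.Get("http://localhost:8080/healthz")
if err == nil {
    resp.Body.Close()
}
close(stopChan) // stop forwarding

Modern event recording with events.k8s.io/v1: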
import (
    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/tools/events"
)
// Create broadcaster backed by the events.k8s.io/v1 API
stopCh := make(chan struct{})
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
    Interface: clientset.EventsV1(),
})
eventBroadcaster.StartRecordingToSink(stopCh)
// Create event recorder
eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme, "my-controller")
// Record structured event
eventRecorder.Eventf(pod, nil, corev1.EventTypeNormal,
"Started", "ContainerStarted",
"Container %s started successfully", containerName)Install with Tessl CLI
npx tessl i tessl/golang-k8s-io--client-go