Usage Patterns

Back to Index

This document collects common usage patterns and practical examples for working with k8s.io/client-go.

Quick Start Patterns

Minimal In-Cluster Application

package main

import (
    "context"
    "fmt"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    // Get in-cluster configuration
    config, err := rest.InClusterConfig()
    if err != nil {
        panic(err)
    }

    // Create clientset
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }

    // List pods in all namespaces (pass a namespace name to scope the list)
    pods, err := clientset.CoreV1().Pods("").List(
        context.TODO(),
        metav1.ListOptions{})
    if err != nil {
        panic(err)
    }

    fmt.Printf("Found %d pods\\n", len(pods.Items))
}

CLI Tool with Kubeconfig

package main

import (
    "context"
    "flag"
    "fmt"
    "path/filepath"

    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/homedir"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    var kubeconfig *string
    if home := homedir.HomeDir(); home != "" {
        kubeconfig = flag.String("kubeconfig",
            filepath.Join(home, ".kube", "config"),
            "(optional) absolute path to the kubeconfig file")
    } else {
        kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
    }
    flag.Parse()

    // Build config from kubeconfig file
    config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
        panic(err)
    }

    // Create clientset
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }

    // Get server version
    version, err := clientset.Discovery().ServerVersion()
    if err != nil {
        panic(err)
    }

    fmt.Printf("Kubernetes version: %s\\n", version.GitVersion)
}

Resource Management Patterns

Create, Read, Update, Delete (CRUD)

import (
    "context"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/api/errors"
)

// Create
configMap := &corev1.ConfigMap{
    ObjectMeta: metav1.ObjectMeta{
        Name:      "my-config",
        Namespace: "default",
    },
    Data: map[string]string{
        "key": "value",
    },
}
created, err := clientset.CoreV1().ConfigMaps("default").Create(
    context.TODO(), configMap, metav1.CreateOptions{})

// Read (Get)
retrieved, err := clientset.CoreV1().ConfigMaps("default").Get(
    context.TODO(), "my-config", metav1.GetOptions{})

// Update
retrieved.Data["key"] = "new-value"
updated, err := clientset.CoreV1().ConfigMaps("default").Update(
    context.TODO(), retrieved, metav1.UpdateOptions{})

// Delete
err = clientset.CoreV1().ConfigMaps("default").Delete(
    context.TODO(), "my-config", metav1.DeleteOptions{})

// Check if not found
if errors.IsNotFound(err) {
    fmt.Println("ConfigMap not found")
}

List with Label Selector

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// List pods with specific labels
pods, err := clientset.CoreV1().Pods("default").List(
    context.TODO(),
    metav1.ListOptions{
        LabelSelector: "app=myapp,tier=frontend",
    })

// List with field selector
pods, err = clientset.CoreV1().Pods("default").List(
    context.TODO(),
    metav1.ListOptions{
        FieldSelector: "status.phase=Running",
    })

// Combined selectors
pods, err = clientset.CoreV1().Pods("default").List(
    context.TODO(),
    metav1.ListOptions{
        LabelSelector: "app=myapp",
        FieldSelector: "status.phase=Running",
        Limit:         100,
    })

Patch Operations

import (
    "k8s.io/apimachinery/pkg/types"
)

// JSON Patch
jsonPatch := []byte(`[{"op": "replace", "path": "/data/key", "value": "patched-value"}]`)
patched, err := clientset.CoreV1().ConfigMaps("default").Patch(
    context.TODO(),
    "my-config",
    types.JSONPatchType,
    jsonPatch,
    metav1.PatchOptions{})

// Merge Patch
mergePatch := []byte(`{"data": {"key": "merged-value"}}`)
patched, err = clientset.CoreV1().ConfigMaps("default").Patch(
    context.TODO(),
    "my-config",
    types.MergePatchType,
    mergePatch,
    metav1.PatchOptions{})

// Strategic Merge Patch
strategicPatch := []byte(`{"data": {"new-key": "new-value"}}`)
patched, err = clientset.CoreV1().ConfigMaps("default").Patch(
    context.TODO(),
    "my-config",
    types.StrategicMergePatchType,
    strategicPatch,
    metav1.PatchOptions{})

Custom Resources Pattern

Working with Custom Resources

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/dynamic"
)

// Create dynamic client
dynamicClient, err := dynamic.NewForConfig(config)

// Define CRD GroupVersionResource
crontabGVR := schema.GroupVersionResource{
    Group:    "stable.example.com",
    Version:  "v1",
    Resource: "crontabs",
}

// Create custom resource
crontab := &unstructured.Unstructured{
    Object: map[string]interface{}{
        "apiVersion": "stable.example.com/v1",
        "kind":       "CronTab",
        "metadata": map[string]interface{}{
            "name": "my-crontab",
        },
        "spec": map[string]interface{}{
            "cronSpec": "*/5 * * * *",
            "image":    "my-cron-image:latest",
            "replicas": 1,
        },
    },
}

result, err := dynamicClient.Resource(crontabGVR).
    Namespace("default").
    Create(context.TODO(), crontab, metav1.CreateOptions{})

// List custom resources
list, err := dynamicClient.Resource(crontabGVR).
    Namespace("default").
    List(context.TODO(), metav1.ListOptions{})

for _, item := range list.Items {
    spec, _ := item.Object["spec"].(map[string]interface{})
    cronSpec := spec["cronSpec"].(string)
    fmt.Printf("CronTab %s: %s\\n", item.GetName(), cronSpec)
}

Controller Pattern

Basic Controller Implementation

import (
    "context"
    "fmt"
    "time"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
)

type Controller struct {
    clientset   kubernetes.Interface
    podInformer cache.SharedIndexInformer
    podLister   corev1listers.PodLister
    queue       workqueue.TypedRateLimitingInterface[string]
}

func NewController(clientset kubernetes.Interface) *Controller {
    // Create informer factory
    factory := informers.NewSharedInformerFactory(clientset, time.Minute*10)
    podInformer := factory.Core().V1().Pods().Informer()
    podLister := factory.Core().V1().Pods().Lister()

    // Create work queue
    queue := workqueue.NewTypedRateLimitingQueue(
        workqueue.DefaultTypedControllerRateLimiter[string]())

    // Register event handlers
    _, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            key, err := cache.MetaNamespaceKeyFunc(obj)
            if err == nil {
                queue.Add(key)
            }
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            key, err := cache.MetaNamespaceKeyFunc(newObj)
            if err == nil {
                queue.Add(key)
            }
        },
        DeleteFunc: func(obj interface{}) {
            key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
            if err == nil {
                queue.Add(key)
            }
        },
    })
    if err != nil {
        panic(err)
    }

    return &Controller{
        clientset:   clientset,
        podInformer: podInformer,
        podLister:   podLister,
        queue:       queue,
    }
}

func (c *Controller) Run(ctx context.Context, workers int) error {
    defer c.queue.ShutDown()

    // Start informer
    go c.podInformer.Run(ctx.Done())

    // Wait for cache sync
    if !cache.WaitForCacheSync(ctx.Done(), c.podInformer.HasSynced) {
        return fmt.Errorf("failed to sync cache")
    }

    // Start workers
    for i := 0; i < workers; i++ {
        go c.runWorker(ctx)
    }

    <-ctx.Done()
    return nil
}

func (c *Controller) runWorker(ctx context.Context) {
    for c.processNextItem(ctx) {
    }
}

func (c *Controller) processNextItem(ctx context.Context) bool {
    key, quit := c.queue.Get()
    if quit {
        return false
    }
    defer c.queue.Done(key)

    err := c.syncHandler(ctx, key)
    c.handleErr(err, key)
    return true
}

func (c *Controller) syncHandler(ctx context.Context, key string) error {
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }

    // Get pod from cache
    obj, exists, err := c.podInformer.GetIndexer().GetByKey(key)
    if err != nil {
        return err
    }

    if !exists {
        fmt.Printf("Pod %s/%s deleted\\n", namespace, name)
        return nil
    }

    pod := obj.(*corev1.Pod)
    fmt.Printf("Processing pod %s/%s in phase %s\\n",
        pod.Namespace, pod.Name, pod.Status.Phase)

    // Your reconciliation logic here
    return nil
}

func (c *Controller) handleErr(err error, key string) {
    if err == nil {
        c.queue.Forget(key)
        return
    }

    if c.queue.NumRequeues(key) < 5 {
        fmt.Printf("Error syncing pod %s: %v\\n", key, err)
        c.queue.AddRateLimited(key)
        return
    }

    c.queue.Forget(key)
    fmt.Printf("Dropping pod %s out of queue: %v\\n", key, err)
}

Watch Pattern

Streaming Updates

import (
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/watch"
)

// Watch for pod changes
watcher, err := clientset.CoreV1().Pods("default").Watch(
    context.TODO(),
    metav1.ListOptions{
        LabelSelector: "app=myapp",
    })
if err != nil {
    panic(err)
}
defer watcher.Stop()

// Process watch events
for event := range watcher.ResultChan() {
    pod, ok := event.Object.(*corev1.Pod)
    if !ok {
        continue
    }

    switch event.Type {
    case watch.Added:
        fmt.Printf("Pod added: %s\\n", pod.Name)
    case watch.Modified:
        fmt.Printf("Pod modified: %s (phase: %s)\\n",
            pod.Name, pod.Status.Phase)
    case watch.Deleted:
        fmt.Printf("Pod deleted: %s\\n", pod.Name)
    case watch.Error:
        fmt.Printf("Watch error: %v\\n", event.Object)
    }
}

Status Subresource Pattern

Updating Resource Status

import (
    appsv1 "k8s.io/api/apps/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Get deployment
deployment, err := clientset.AppsV1().Deployments("default").Get(
    context.TODO(), "my-deployment", metav1.GetOptions{})
if err != nil {
    panic(err)
}

// Modify status (usually done by controller)
deployment.Status.Replicas = 3
deployment.Status.ReadyReplicas = 3
deployment.Status.AvailableReplicas = 3

// Update status subresource
updatedDeployment, err := clientset.AppsV1().Deployments("default").
    UpdateStatus(context.TODO(), deployment, metav1.UpdateOptions{})

Exec into Pod Pattern

Execute Command in Container

import (
    "bytes"
    "context"
    "fmt"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/tools/remotecommand"
)

// Execute command in pod
req := clientset.CoreV1().RESTClient().
    Post().
    Namespace("default").
    Resource("pods").
    Name("my-pod").
    SubResource("exec").
    VersionedParams(&corev1.PodExecOptions{
        Container: "main",
        Command:   []string{"sh", "-c", "echo hello"},
        Stdout:    true,
        Stderr:    true,
    }, scheme.ParameterCodec)

exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
if err != nil {
    panic(err)
}

var stdout, stderr bytes.Buffer
err = exec.StreamWithContext(context.TODO(), remotecommand.StreamOptions{
    Stdout: &stdout,
    Stderr: &stderr,
})
if err != nil {
    panic(err)
}

fmt.Printf("Output: %s\\n", stdout.String())

Port Forward Pattern

Forward Local Port to Pod

import (
    "fmt"
    "net/http"
    "net/url"
    "os"
    "strings"

    "k8s.io/client-go/tools/portforward"
    "k8s.io/client-go/transport/spdy"
)

// Port forward to pod
path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward",
    "default", "my-pod")
hostIP := strings.TrimLeft(config.Host, "htps:/") // strip scheme characters from the host

transport, upgrader, err := spdy.RoundTripperFor(config)
if err != nil {
    panic(err)
}

url := &url.URL{
    Scheme: "https",
    Path:   path,
    Host:   hostIP,
}

dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport},
    http.MethodPost, url)

stopChan := make(chan struct{}, 1)
readyChan := make(chan struct{}, 1)

ports := []string{"8080:80"} // local:remote
forwarder, err := portforward.New(dialer, ports, stopChan, readyChan,
    os.Stdout, os.Stderr)
if err != nil {
    panic(err)
}

go func() {
    if err := forwarder.ForwardPorts(); err != nil {
        panic(err)
    }
}()

<-readyChan
fmt.Println("Port forward ready")
// Now you can access localhost:8080

Context and Timeout Patterns

Request Timeouts

import (
    "context"
    "time"
)

// Context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

pods, err := clientset.CoreV1().Pods("default").List(ctx, metav1.ListOptions{})
if err != nil {
    if ctx.Err() == context.DeadlineExceeded {
        fmt.Println("Request timed out")
    }
    panic(err)
}

Request Cancellation

// Context with cancellation
ctx, cancel := context.WithCancel(context.Background())

// Start long-running operation
go func() {
    watcher, err := clientset.CoreV1().Pods("default").Watch(ctx, metav1.ListOptions{})
    if err != nil {
        return
    }
    for event := range watcher.ResultChan() {
        // Process events
        _ = event
    }
}()

// Cancel after some time
time.Sleep(5 * time.Minute)
cancel() // This stops the watch

Configuration Patterns

Custom Rate Limiting

import (
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
    panic(err)
}

// Configure rate limiting
config.QPS = 50.0   // 50 queries per second
config.Burst = 100  // Burst up to 100 requests

clientset, err := kubernetes.NewForConfig(config)

Custom Timeouts

config.Timeout = 30 * time.Second

// Per-request timeout
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

pod, err := clientset.CoreV1().Pods("default").Get(
    ctx, "my-pod", metav1.GetOptions{})

Custom User Agent

config.UserAgent = "my-app/v1.0.0"

clientset, err := kubernetes.NewForConfig(config)

Error Handling Patterns

Comprehensive Error Checking

import (
    apierrors "k8s.io/apimachinery/pkg/api/errors"
)

pod, err := clientset.CoreV1().Pods("default").Get(
    context.TODO(), "my-pod", metav1.GetOptions{})

if err != nil {
    switch {
    case apierrors.IsNotFound(err):
        fmt.Println("Pod not found - create it")
    case apierrors.IsAlreadyExists(err):
        fmt.Println("Pod already exists")
    case apierrors.IsConflict(err):
        fmt.Println("Resource version conflict - retry")
    case apierrors.IsForbidden(err):
        fmt.Println("Access forbidden - check RBAC")
    case apierrors.IsUnauthorized(err):
        fmt.Println("Authentication failed")
    case apierrors.IsTimeout(err):
        fmt.Println("Request timed out")
    case apierrors.IsServerTimeout(err):
        fmt.Println("Server timeout")
    case apierrors.IsInvalid(err):
        fmt.Println("Invalid request:", err)
    case apierrors.IsBadRequest(err):
        fmt.Println("Bad request:", err)
    default:
        fmt.Printf("Unexpected error: %v\\n", err)
    }
    return
}

Retry with Backoff

import (
    "k8s.io/client-go/util/retry"
)

// Retry on conflict, backing off between attempts
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
    // Get the latest version
    pod, err := clientset.CoreV1().Pods("default").Get(
        context.TODO(), "my-pod", metav1.GetOptions{})
    if err != nil {
        return err
    }

    // Modify pod
    pod.Labels["updated"] = "true"

    // Update
    _, err = clientset.CoreV1().Pods("default").Update(
        context.TODO(), pod, metav1.UpdateOptions{})
    return err
})

if err != nil {
    fmt.Printf("Update failed: %v\\n", err)
}

Resource Version Pattern

Optimistic Concurrency Control

// Get current resource
configMap, err := clientset.CoreV1().ConfigMaps("default").Get(
    context.TODO(), "my-config", metav1.GetOptions{})
if err != nil {
    panic(err)
}

// The object's ResourceVersion is sent back with the update and is what
// enables optimistic concurrency control

// Modify
configMap.Data["key"] = "new-value"

// Update will fail if resource was modified by another client
updated, err := clientset.CoreV1().ConfigMaps("default").Update(
    context.TODO(), configMap, metav1.UpdateOptions{})

if apierrors.IsConflict(err) {
    fmt.Println("Conflict: resource was modified, retry")
}

Best Practices

  1. Always use contexts: Pass context to all API calls for proper cancellation and timeout handling
  2. Use informers for controllers: Don't poll the API server - use informers for efficient caching
  3. Implement proper error handling: Use apierrors package to identify specific error types
  4. Configure rate limiting: Set appropriate QPS and Burst values to avoid overwhelming the API server
  5. Use resource versions: Implement optimistic concurrency control to prevent conflicts
  6. Leverage Server-Side Apply: Use SSA for declarative management and multi-controller scenarios (see the sketch after this list)
  7. Use work queues: Implement rate-limited work queues for reliable event processing
  8. Implement graceful shutdown: Handle signals properly and drain work queues before exiting
  9. Use listers over Get: For repeated reads, use listers that read from cache instead of API calls
  10. Be namespace-aware: Always specify namespace explicitly to avoid confusion
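
Server-Side Apply Sketch

Server-Side Apply (practice 6) is not demonstrated in the patterns above. The sketch below shows one way to apply a ConfigMap declaratively using the typed apply configurations, assuming a clientset built as in the quick-start examples; the resource name "my-config" and the field manager "my-controller" are illustrative placeholders.

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    applycorev1 "k8s.io/client-go/applyconfigurations/core/v1"
)

// Declare only the fields this manager owns
desired := applycorev1.ConfigMap("my-config", "default").
    WithData(map[string]string{"key": "value"})

// The API server merges this declaration with fields owned by other field managers
applied, err := clientset.CoreV1().ConfigMaps("default").Apply(
    context.TODO(), desired,
    metav1.ApplyOptions{FieldManager: "my-controller"})
if err != nil {
    panic(err)
}
fmt.Printf("Applied ConfigMap %s\n", applied.Name)

Because each field records its manager, repeated applies are idempotent, and writes that conflict with another manager's fields surface as conflict errors unless Force is set in ApplyOptions.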
