CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/golang-k8s-io--client-go

Official Go client library for Kubernetes API - typed clients, controllers, and cluster interaction tools

Overview
Eval results
Files

edge-cases.mddocs/examples/

Edge Cases and Advanced Scenarios

This document covers advanced scenarios, edge cases, and troubleshooting patterns when using k8s.io/client-go.

Table of Contents

  1. Handling Resource Version Conflicts
  2. Dealing with Large List Responses
  3. Retrying Failed Operations
  4. Custom Resource Definitions
  5. Multi-Tenancy Patterns
  6. Rate Limiting and Throttling
  7. Handling API Deprecations
  8. Cross-Version Compatibility

Handling Resource Version Conflicts

When updating resources, you may encounter conflicts due to optimistic concurrency control.

package main

import (
    "context"
    "fmt"
    "time"

    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/kubernetes"
)

// UpdateWithRetry retries an update operation on conflict
func UpdateWithRetry(clientset *kubernetes.Clientset, namespace, deploymentName string, 
    updateFunc func(*appsv1.Deployment)) error {
    
    return wait.ExponentialBackoff(wait.Backoff{
        Duration: 10 * time.Millisecond,
        Factor:   2.0,
        Steps:    5,
        Cap:      1 * time.Second,
    }, func() (bool, error) {
        // Get latest version
        deployment, err := clientset.AppsV1().Deployments(namespace).Get(
            context.TODO(), deploymentName, metav1.GetOptions{})
        if err != nil {
            return false, err
        }

        // Apply changes
        updateFunc(deployment)

        // Try to update
        _, err = clientset.AppsV1().Deployments(namespace).Update(
            context.TODO(), deployment, metav1.UpdateOptions{})
        
        if err != nil {
            if apierrors.IsConflict(err) {
                // Conflict, retry
                return false, nil
            }
            // Other error, fail
            return false, err
        }

        // Success
        return true, nil
    })
}

// Example usage
func Example_UpdateWithRetry(clientset *kubernetes.Clientset) {
    err := UpdateWithRetry(clientset, "default", "my-deployment", 
        func(deployment *appsv1.Deployment) {
            replicas := int32(5)
            deployment.Spec.Replicas = &replicas
        })
    
    if err != nil {
        fmt.Printf("Failed to update: %v\n", err)
    }
}

Dealing with Large List Responses

Use pagination to efficiently handle large lists.

package main

import (
    "context"
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// ListPodsWithPagination lists all pods using pagination
func ListPodsWithPagination(clientset *kubernetes.Clientset, namespace string, pageSize int64) ([]corev1.Pod, error) {
    var allPods []corev1.Pod
    var continueToken string

    for {
        listOptions := metav1.ListOptions{
            Limit:    pageSize,
            Continue: continueToken,
        }

        pods, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), listOptions)
        if err != nil {
            return nil, err
        }

        allPods = append(allPods, pods.Items...)

        continueToken = pods.Continue
        if continueToken == "" {
            // No more pages
            break
        }

        fmt.Printf("Fetched %d pods so far...\n", len(allPods))
    }

    return allPods, nil
}

// StreamingProcessor processes large lists without loading everything into memory
func StreamingProcessor(clientset *kubernetes.Clientset, namespace string) error {
    var continueToken string
    pageSize := int64(100)

    for {
        pods, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{
            Limit:    pageSize,
            Continue: continueToken,
        })
        if err != nil {
            return err
        }

        // Process each pod individually
        for _, pod := range pods.Items {
            processPod(&pod)
        }

        continueToken = pods.Continue
        if continueToken == "" {
            break
        }
    }

    return nil
}

func processPod(pod *corev1.Pod) {
    // Process pod...
    fmt.Printf("Processing pod: %s\n", pod.Name)
}

Retrying Failed Operations

Implement robust retry logic for transient failures.

package main

import (
    "context"
    "fmt"
    "time"

    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/kubernetes"
)

type RetryConfig struct {
    MaxRetries  int
    InitialWait time.Duration
    MaxWait     time.Duration
    Multiplier  float64
}

// RetryOnError retries an operation with exponential backoff
func RetryOnError(config RetryConfig, operation func() error) error {
    backoff := wait.Backoff{
        Duration: config.InitialWait,
        Factor:   config.Multiplier,
        Steps:    config.MaxRetries,
        Cap:      config.MaxWait,
    }

    return wait.ExponentialBackoff(backoff, func() (bool, error) {
        err := operation()
        if err == nil {
            return true, nil // Success
        }

        // Check if error is retryable
        if isRetryableError(err) {
            fmt.Printf("Retryable error: %v, retrying...\n", err)
            return false, nil // Retry
        }

        // Non-retryable error
        return false, err
    })
}

func isRetryableError(err error) bool {
    // Retry on server errors, timeouts, and conflicts
    return apierrors.IsServerTimeout(err) ||
        apierrors.IsServiceUnavailable(err) ||
        apierrors.IsTimeout(err) ||
        apierrors.IsTooManyRequests(err) ||
        apierrors.IsInternalError(err) ||
        apierrors.IsConflict(err)
}

// Example: Create pod with retry
func CreatePodWithRetry(clientset *kubernetes.Clientset, pod *corev1.Pod) error {
    config := RetryConfig{
        MaxRetries:  5,
        InitialWait: 100 * time.Millisecond,
        MaxWait:     30 * time.Second,
        Multiplier:  2.0,
    }

    return RetryOnError(config, func() error {
        _, err := clientset.CoreV1().Pods(pod.Namespace).Create(
            context.TODO(), pod, metav1.CreateOptions{})
        return err
    })
}

Custom Resource Definitions

Working with CRDs using the dynamic client.

package main

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/rest"
)

type CRDManager struct {
    dynamicClient dynamic.Interface
}

func NewCRDManager(config *rest.Config) (*CRDManager, error) {
    dynamicClient, err := dynamic.NewForConfig(config)
    if err != nil {
        return nil, err
    }

    return &CRDManager{dynamicClient: dynamicClient}, nil
}

// CreateCustomResource creates a custom resource
func (m *CRDManager) CreateCustomResource(ctx context.Context, 
    group, version, resource, namespace string, obj map[string]interface{}) error {
    
    gvr := schema.GroupVersionResource{
        Group:    group,
        Version:  version,
        Resource: resource,
    }

    unstructuredObj := &unstructured.Unstructured{
        Object: obj,
    }

    _, err := m.dynamicClient.Resource(gvr).Namespace(namespace).Create(
        ctx, unstructuredObj, metav1.CreateOptions{})
    
    return err
}

// WatchCustomResource watches changes to custom resources
func (m *CRDManager) WatchCustomResource(ctx context.Context, 
    group, version, resource, namespace string) error {
    
    gvr := schema.GroupVersionResource{
        Group:    group,
        Version:  version,
        Resource: resource,
    }

    watcher, err := m.dynamicClient.Resource(gvr).Namespace(namespace).Watch(
        ctx, metav1.ListOptions{})
    if err != nil {
        return err
    }
    defer watcher.Stop()

    for {
        select {
        case event := <-watcher.ResultChan():
            if event.Object == nil {
                return fmt.Errorf("unexpected nil object")
            }

            obj := event.Object.(*unstructured.Unstructured)
            fmt.Printf("Event: %s, Object: %s\n", event.Type, obj.GetName())

        case <-ctx.Done():
            return ctx.Err()
        }
    }
}

// GetCustomResourceStatus retrieves the status of a custom resource
func (m *CRDManager) GetCustomResourceStatus(ctx context.Context,
    group, version, resource, namespace, name string) (map[string]interface{}, error) {
    
    gvr := schema.GroupVersionResource{
        Group:    group,
        Version:  version,
        Resource: resource,
    }

    obj, err := m.dynamicClient.Resource(gvr).Namespace(namespace).Get(
        ctx, name, metav1.GetOptions{})
    if err != nil {
        return nil, err
    }

    status, found, err := unstructured.NestedMap(obj.Object, "status")
    if err != nil || !found {
        return nil, fmt.Errorf("status not found")
    }

    return status, nil
}

Multi-Tenancy Patterns

Managing resources across multiple namespaces safely.

package main

import (
    "context"
    "fmt"
    "sync"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

type MultiTenantManager struct {
    clientset  *kubernetes.Clientset
    namespaces map[string]*TenantConfig
    mu         sync.RWMutex
}

type TenantConfig struct {
    Namespace     string
    ResourceQuota *corev1.ResourceQuota
    LimitRange    *corev1.LimitRange
}

func NewMultiTenantManager(clientset *kubernetes.Clientset) *MultiTenantManager {
    return &MultiTenantManager{
        clientset:  clientset,
        namespaces: make(map[string]*TenantConfig),
    }
}

// CreateTenant creates a new tenant with resource limits
func (m *MultiTenantManager) CreateTenant(ctx context.Context, tenantName string, 
    cpuQuota, memoryQuota string) error {
    
    m.mu.Lock()
    defer m.mu.Unlock()

    // Create namespace
    ns := &corev1.Namespace{
        ObjectMeta: metav1.ObjectMeta{
            Name: tenantName,
            Labels: map[string]string{
                "tenant": tenantName,
            },
        },
    }

    _, err := m.clientset.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{})
    if err != nil {
        return fmt.Errorf("failed to create namespace: %w", err)
    }

    // Create resource quota
    quota := &corev1.ResourceQuota{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "tenant-quota",
            Namespace: tenantName,
        },
        Spec: corev1.ResourceQuotaSpec{
            Hard: corev1.ResourceList{
                corev1.ResourceCPU:    resource.MustParse(cpuQuota),
                corev1.ResourceMemory: resource.MustParse(memoryQuota),
                corev1.ResourcePods:   resource.MustParse("100"),
            },
        },
    }

    _, err = m.clientset.CoreV1().ResourceQuotas(tenantName).Create(ctx, quota, metav1.CreateOptions{})
    if err != nil {
        return fmt.Errorf("failed to create resource quota: %w", err)
    }

    m.namespaces[tenantName] = &TenantConfig{
        Namespace:     tenantName,
        ResourceQuota: quota,
    }

    fmt.Printf("Created tenant: %s\n", tenantName)
    return nil
}

// ListTenantsResourceUsage lists resource usage for all tenants
func (m *MultiTenantManager) ListTenantsResourceUsage(ctx context.Context) error {
    m.mu.RLock()
    defer m.mu.RUnlock()

    for tenantName := range m.namespaces {
        quota, err := m.clientset.CoreV1().ResourceQuotas(tenantName).Get(
            ctx, "tenant-quota", metav1.GetOptions{})
        if err != nil {
            fmt.Printf("Error getting quota for %s: %v\n", tenantName, err)
            continue
        }

        fmt.Printf("\nTenant: %s\n", tenantName)
        fmt.Printf("  CPU: %s / %s\n", 
            quota.Status.Used[corev1.ResourceCPU],
            quota.Status.Hard[corev1.ResourceCPU])
        fmt.Printf("  Memory: %s / %s\n",
            quota.Status.Used[corev1.ResourceMemory],
            quota.Status.Hard[corev1.ResourceMemory])
        fmt.Printf("  Pods: %s / %s\n",
            quota.Status.Used[corev1.ResourcePods],
            quota.Status.Hard[corev1.ResourcePods])
    }

    return nil
}

Rate Limiting and Throttling

Properly configure and handle rate limiting.

package main

import (
    "context"
    "fmt"
    "time"

    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/util/flowcontrol"
)

// CreateRateLimitedClient creates a client with custom rate limits
func CreateRateLimitedClient(config *rest.Config, qps float32, burst int) (*kubernetes.Clientset, error) {
    // Configure rate limiting
    config.QPS = qps
    config.Burst = burst

    // Optional: Create custom rate limiter
    config.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst)

    return kubernetes.NewForConfig(config)
}

// AdaptiveRateLimiter adjusts rate based on server responses
type AdaptiveRateLimiter struct {
    clientset     *kubernetes.Clientset
    currentQPS    float32
    minQPS        float32
    maxQPS        float32
    adjustmentMu  sync.Mutex
}

func NewAdaptiveRateLimiter(clientset *kubernetes.Clientset) *AdaptiveRateLimiter {
    return &AdaptiveRateLimiter{
        clientset:  clientset,
        currentQPS: 50.0,
        minQPS:     10.0,
        maxQPS:     200.0,
    }
}

func (a *AdaptiveRateLimiter) ExecuteWithBackoff(ctx context.Context, operation func() error) error {
    backoff := time.Duration(100) * time.Millisecond

    for {
        err := operation()
        if err == nil {
            // Success, gradually increase rate
            a.increaseRate()
            return nil
        }

        if apierrors.IsTooManyRequests(err) {
            // Rate limited, decrease rate and retry
            a.decreaseRate()
            
            fmt.Printf("Rate limited, backing off for %v\n", backoff)
            time.Sleep(backoff)
            
            backoff *= 2
            if backoff > 30*time.Second {
                backoff = 30 * time.Second
            }
            continue
        }

        // Other error, return
        return err
    }
}

func (a *AdaptiveRateLimiter) increaseRate() {
    a.adjustmentMu.Lock()
    defer a.adjustmentMu.Unlock()

    newQPS := a.currentQPS * 1.1
    if newQPS > a.maxQPS {
        newQPS = a.maxQPS
    }
    a.currentQPS = newQPS
}

func (a *AdaptiveRateLimiter) decreaseRate() {
    a.adjustmentMu.Lock()
    defer a.adjustmentMu.Unlock()

    newQPS := a.currentQPS * 0.5
    if newQPS < a.minQPS {
        newQPS = a.minQPS
    }
    a.currentQPS = newQPS
}

Handling API Deprecations

Detect and handle deprecated API versions.

package main

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/discovery"
    "k8s.io/client-go/kubernetes"
)

type DeprecationChecker struct {
    discoveryClient *discovery.DiscoveryClient
}

func NewDeprecationChecker(clientset *kubernetes.Clientset) *DeprecationChecker {
    return &DeprecationChecker{
        discoveryClient: clientset.Discovery(),
    }
}

// CheckAPIVersion checks if an API version is available
func (c *DeprecationChecker) CheckAPIVersion(group, version string) (bool, error) {
    groups, err := c.discoveryClient.ServerGroups()
    if err != nil {
        return false, err
    }

    for _, g := range groups.Groups {
        if g.Name == group {
            for _, v := range g.Versions {
                if v.Version == version {
                    return true, nil
                }
            }
        }
    }

    return false, nil
}

// FindPreferredVersion finds the preferred version for a resource
func (c *DeprecationChecker) FindPreferredVersion(group string) (string, error) {
    groups, err := c.discoveryClient.ServerGroups()
    if err != nil {
        return "", err
    }

    for _, g := range groups.Groups {
        if g.Name == group {
            return g.PreferredVersion.Version, nil
        }
    }

    return "", fmt.Errorf("group %s not found", group)
}

// MigrateAPIVersion helps migrate from deprecated to newer API versions
func (c *DeprecationChecker) MigrateAPIVersion(ctx context.Context, 
    oldGroup, oldVersion, newGroup, newVersion, resource string) error {
    
    // Check if old version still exists
    oldExists, err := c.CheckAPIVersion(oldGroup, oldVersion)
    if err != nil {
        return err
    }

    // Check if new version exists
    newExists, err := c.CheckAPIVersion(newGroup, newVersion)
    if err != nil {
        return err
    }

    if !oldExists && !newExists {
        return fmt.Errorf("neither old nor new API version exists")
    }

    if !newExists {
        return fmt.Errorf("new API version not available, cannot migrate")
    }

    if oldExists {
        fmt.Printf("Warning: Using deprecated API %s/%s, should migrate to %s/%s\n",
            oldGroup, oldVersion, newGroup, newVersion)
    }

    return nil
}

Cross-Version Compatibility

Handle different Kubernetes versions.

package main

import (
    "context"
    "fmt"

    "k8s.io/apimachinery/pkg/version"
    "k8s.io/client-go/kubernetes"
)

type VersionCompatibilityChecker struct {
    clientset     *kubernetes.Clientset
    serverVersion *version.Info
}

func NewVersionCompatibilityChecker(clientset *kubernetes.Clientset) (*VersionCompatibilityChecker, error) {
    serverVersion, err := clientset.Discovery().ServerVersion()
    if err != nil {
        return nil, err
    }

    return &VersionCompatibilityChecker{
        clientset:     clientset,
        serverVersion: serverVersion,
    }, nil
}

// SupportsFeature checks if a feature is supported in the server version
func (c *VersionCompatibilityChecker) SupportsFeature(feature string) bool {
    // Parse server version and check feature support
    // This is simplified; in practice you'd use version comparison
    
    switch feature {
    case "server-side-apply":
        return c.serverVersion.Major >= "1" && c.serverVersion.Minor >= "18"
    case "ephemeral-containers":
        return c.serverVersion.Major >= "1" && c.serverVersion.Minor >= "23"
    case "pod-security-admission":
        return c.serverVersion.Major >= "1" && c.serverVersion.Minor >= "25"
    default:
        return false
    }
}

// GetServerVersion returns the Kubernetes server version
func (c *VersionCompatibilityChecker) GetServerVersion() string {
    return fmt.Sprintf("%s.%s", c.serverVersion.Major, c.serverVersion.Minor)
}

// Example: Conditional feature usage
func (c *VersionCompatibilityChecker) CreateResourceSafely(ctx context.Context) error {
    if c.SupportsFeature("server-side-apply") {
        fmt.Println("Using server-side apply (supported)")
        // Use server-side apply
    } else {
        fmt.Println("Falling back to client-side apply (server-side not supported)")
        // Use traditional update
    }

    return nil
}

Troubleshooting Common Issues

Connection Timeouts

func CreateClientWithTimeouts(config *rest.Config) (*kubernetes.Clientset, error) {
    // Set appropriate timeouts
    config.Timeout = 30 * time.Second
    config.QPS = 50
    config.Burst = 100

    // Configure dial timeout
    config.Dial = (&net.Dialer{
        Timeout:   30 * time.Second,
        KeepAlive: 30 * time.Second,
    }).DialContext

    return kubernetes.NewForConfig(config)
}

Memory Leaks in Watchers

func WatchWithProperCleanup(ctx context.Context, clientset *kubernetes.Clientset) {
    watcher, err := clientset.CoreV1().Pods("default").Watch(ctx, metav1.ListOptions{})
    if err != nil {
        panic(err)
    }

    // CRITICAL: Always stop the watcher
    defer watcher.Stop()

    // Use context for graceful shutdown
    for {
        select {
        case event := <-watcher.ResultChan():
            // Handle event
            _ = event
        case <-ctx.Done():
            // Context cancelled, return
            return
        }
    }
}

Next Steps

  • Review Real-World Scenarios
  • Explore Advanced Topics
  • Learn about Controllers

Install with Tessl CLI

npx tessl i tessl/golang-k8s-io--client-go

docs

examples

edge-cases.md

real-world-scenarios.md

index.md

tile.json