CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/golang-k8s-io--client-go

Official Go client library for Kubernetes API - typed clients, controllers, and cluster interaction tools

Overview
Eval results
Files

docs/examples/real-world-scenarios.md

Real-World Scenarios

This document provides practical examples of using k8s.io/client-go in real-world applications and use cases.

Table of Contents

  1. Building a Pod Monitor
  2. Automated Deployment Scaler
  3. Cluster Resource Reporter
  4. ConfigMap Synchronizer
  5. Custom Operator Pattern
  6. Multi-Cluster Manager
  7. Resource Cleanup Tool
  8. Log Aggregator

Building a Pod Monitor

Monitor pod status changes and send notifications.

package main

import (
    "context"
    "fmt"
    "time"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/watch"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

// PodMonitor watches pods in a single namespace and reports noteworthy
// events through its notification hook.
type PodMonitor struct {
    // clientset is the Kubernetes client used to open the pod watch.
    clientset *kubernetes.Clientset
    // namespace is the namespace whose pods are watched.
    namespace string
}

// NewPodMonitor returns a PodMonitor that uses clientset to observe the pods
// in namespace.
func NewPodMonitor(clientset *kubernetes.Clientset, namespace string) *PodMonitor {
    monitor := new(PodMonitor)
    monitor.clientset = clientset
    monitor.namespace = namespace
    return monitor
}

// Watch opens a watch on the pods of the monitor's namespace and handles
// events until ctx is cancelled or the watch ends.
//
// Added and Deleted events are printed (deletions also trigger a
// notification); Modified events are passed to handlePodUpdate; Error events
// are printed and the loop keeps reading.
//
// It returns an error when the watch cannot be started, when the result
// channel is closed by the server, when an event carries a nil or non-Pod
// object, or ctx.Err() once ctx is done.
func (m *PodMonitor) Watch(ctx context.Context) error {
    watcher, err := m.clientset.CoreV1().Pods(m.namespace).Watch(ctx, metav1.ListOptions{})
    if err != nil {
        return err
    }
    defer watcher.Stop()

    fmt.Printf("Watching pods in namespace %s...\n", m.namespace)

    for {
        select {
        case event, ok := <-watcher.ResultChan():
            if !ok {
                // A closed channel yields zero-value events forever; report it
                // explicitly so the caller can decide whether to re-watch.
                return fmt.Errorf("pod watch channel closed")
            }
            if event.Object == nil {
                return fmt.Errorf("unexpected nil object")
            }

            // Error events carry a *metav1.Status rather than a Pod, so they
            // must be handled before the Pod type assertion below.
            if event.Type == watch.Error {
                fmt.Printf("[ERROR] Watch error: %v\n", event.Object)
                continue
            }

            pod, isPod := event.Object.(*corev1.Pod)
            if !isPod {
                return fmt.Errorf("unexpected object type %T in %s event", event.Object, event.Type)
            }

            switch event.Type {
            case watch.Added:
                fmt.Printf("[ADD] Pod %s created\n", pod.Name)
            case watch.Modified:
                m.handlePodUpdate(pod)
            case watch.Deleted:
                fmt.Printf("[DELETE] Pod %s deleted\n", pod.Name)
                m.sendNotification("Pod Deleted", pod.Name)
            }

        case <-ctx.Done():
            return ctx.Err()
        }
    }
}

// handlePodUpdate inspects a modified pod and raises an alert plus a
// notification for every container waiting in CrashLoopBackOff, and once more
// if the pod as a whole has reached the Failed phase.
func (m *PodMonitor) handlePodUpdate(pod *corev1.Pod) {
    // Per-container check: look for crash-looping containers.
    for i := range pod.Status.ContainerStatuses {
        cs := &pod.Status.ContainerStatuses[i]
        waiting := cs.State.Waiting
        if waiting == nil || waiting.Reason != "CrashLoopBackOff" {
            continue
        }

        fmt.Printf("[ALERT] Pod %s has container %s in CrashLoopBackOff\n",
            pod.Name, cs.Name)
        detail := fmt.Sprintf("Pod: %s, Container: %s", pod.Name, cs.Name)
        m.sendNotification("CrashLoopBackOff Detected", detail)
    }

    // Pod-level check: nothing more to do unless the pod has failed.
    if pod.Status.Phase != corev1.PodFailed {
        return
    }
    fmt.Printf("[ALERT] Pod %s failed: %s\n", pod.Name, pod.Status.Reason)
    m.sendNotification("Pod Failed", pod.Name)
}

// sendNotification is the notification hook. This placeholder only prints the
// subject and message to standard output; replace the body with a real
// delivery mechanism (email, Slack, etc.).
func (m *PodMonitor) sendNotification(subject, message string) {
    const layout = "[NOTIFICATION] %s: %s\n"
    fmt.Printf(layout, subject, message)
}

// main builds a client from the default kubeconfig file and watches pods in
// the "default" namespace until the watch returns an error.
func main() {
    cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }

    cs, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        panic(err)
    }

    // The background context is never cancelled, so Watch only returns on error.
    watchErr := NewPodMonitor(cs, "default").Watch(context.Background())
    if watchErr != nil {
        panic(watchErr)
    }
}

Automated Deployment Scaler

Scale deployments based on the time of day or on the replica count of another deployment.

package main

import (
    "context"
    "fmt"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

// DeploymentScaler changes the replica count of Deployments.
type DeploymentScaler struct {
    // clientset is the Kubernetes client used to read and update Deployments.
    clientset *kubernetes.Clientset
}

// NewDeploymentScaler returns a DeploymentScaler backed by clientset.
func NewDeploymentScaler(clientset *kubernetes.Clientset) *DeploymentScaler {
    scaler := DeploymentScaler{
        clientset: clientset,
    }
    return &scaler
}

// ScaleDeployment sets the replica count of the Deployment namespace/name to
// replicas.
//
// The Deployment is fetched first; if it already requests exactly replicas
// the call is a no-op. Otherwise the fetched object is updated with the new
// count. An unset spec.replicas is treated as "differs from the target" so
// the desired count is always written explicitly.
//
// It returns a wrapped error if the Deployment cannot be fetched or updated.
func (s *DeploymentScaler) ScaleDeployment(ctx context.Context, namespace, name string, replicas int32) error {
    deployments := s.clientset.AppsV1().Deployments(namespace)

    deployment, err := deployments.Get(ctx, name, metav1.GetOptions{})
    if err != nil {
        return fmt.Errorf("failed to get deployment: %w", err)
    }

    // Spec.Replicas is a pointer; guard against nil before dereferencing.
    if current := deployment.Spec.Replicas; current != nil && *current == replicas {
        fmt.Printf("Deployment %s already at %d replicas\n", name, replicas)
        return nil
    }

    deployment.Spec.Replicas = &replicas

    if _, err := deployments.Update(ctx, deployment, metav1.UpdateOptions{}); err != nil {
        return fmt.Errorf("failed to update deployment: %w", err)
    }

    fmt.Printf("Scaled deployment %s to %d replicas\n", name, replicas)
    return nil
}

// ScheduleScaling re-evaluates the desired size of the Deployment
// namespace/name once per hour until ctx is cancelled. When the local hour is
// in [9, 18) it scales to 5 replicas, otherwise to 1. Scaling errors are
// printed and do not stop the loop. The first evaluation happens when the
// ticker first fires, i.e. one hour after the call.
func (s *DeploymentScaler) ScheduleScaling(ctx context.Context, namespace, name string) {
    const (
        businessStart = 9  // 9 AM, inclusive
        businessEnd   = 18 // 6 PM, exclusive
    )

    tick := time.NewTicker(time.Hour)
    defer tick.Stop()

    for {
        select {
        case <-ctx.Done():
            return

        case <-tick.C:
            // Default to the off-hours size and bump it during business hours.
            desired := int32(1)
            if h := time.Now().Hour(); h >= businessStart && h < businessEnd {
                desired = 5
            }

            err := s.ScaleDeployment(ctx, namespace, name, desired)
            if err != nil {
                fmt.Printf("Error scaling deployment: %v\n", err)
            }
        }
    }
}

// ScaleBasedOnPodCount sizes the target Deployment relative to a watched one.
//
// It reads the desired replica count (spec.replicas) of
// watchNamespace/watchDeployment, multiplies it by ratio, truncates the
// result to an integer, clamps it to a minimum of 1, and scales
// targetNamespace/targetDeployment to that value via ScaleDeployment.
//
// It returns a wrapped error if the watched Deployment cannot be fetched, or
// whatever error ScaleDeployment returns.
func (s *DeploymentScaler) ScaleBasedOnPodCount(ctx context.Context,
    watchNamespace, watchDeployment, targetNamespace, targetDeployment string, ratio float64) error {

    sourceDep, err := s.clientset.AppsV1().Deployments(watchNamespace).Get(ctx, watchDeployment, metav1.GetOptions{})
    if err != nil {
        return fmt.Errorf("failed to get watched deployment %s/%s: %w", watchNamespace, watchDeployment, err)
    }

    // Spec.Replicas is a pointer. Kubernetes defaults an unset value to 1, so
    // fall back to 1 instead of dereferencing nil.
    sourceReplicas := int32(1)
    if sourceDep.Spec.Replicas != nil {
        sourceReplicas = *sourceDep.Spec.Replicas
    }

    targetReplicas := int32(float64(sourceReplicas) * ratio)
    if targetReplicas < 1 {
        targetReplicas = 1
    }

    return s.ScaleDeployment(ctx, targetNamespace, targetDeployment, targetReplicas)
}

// main builds a client from the default kubeconfig file, starts hourly
// schedule-based scaling in the background, performs one ratio-based scaling
// pass, and then blocks forever.
func main() {
    cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }

    cs, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        panic(err)
    }

    ctx := context.Background()
    scaler := NewDeploymentScaler(cs)

    // Time-of-day scaling for production/api-server runs in its own goroutine.
    go scaler.ScheduleScaling(ctx, "production", "api-server")

    // Size production/backend at twice the replica count of production/frontend.
    const backendPerFrontend = 2.0
    err = scaler.ScaleBasedOnPodCount(ctx,
        "production", "frontend",
        "production", "backend",
        backendPerFrontend)
    if err != nil {
        fmt.Printf("Error: %v\n", err)
    }

    // Block forever so the background scaling goroutine keeps running.
    select {}
}

Cluster Resource Reporter

Generate per-namespace reports of the CPU and memory requests and limits declared by pods across the cluster.

package main

import (
    "context"
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

// ResourceReport summarizes the pods of one namespace and the resources their
// containers declare.
type ResourceReport struct {
    // Namespace is the namespace the report describes.
    Namespace      string
    // PodCount is the number of pods listed in the namespace.
    PodCount       int
    // NodeCount is the number of nodes in the cluster (filled in by
    // GenerateClusterReport; the same value is stored on every report).
    NodeCount      int
    // CPURequests is the sum of container CPU requests in millicores.
    CPURequests    int64
    // MemoryRequests is the sum of container memory requests in bytes.
    MemoryRequests int64
    // CPULimits is the sum of container CPU limits in millicores.
    CPULimits      int64
    // MemoryLimits is the sum of container memory limits in bytes.
    MemoryLimits   int64
}

// ClusterReporter builds and prints ResourceReports.
type ClusterReporter struct {
    // clientset is the Kubernetes client used to list pods, namespaces and nodes.
    clientset *kubernetes.Clientset
}

// NewClusterReporter returns a ClusterReporter backed by clientset.
func NewClusterReporter(clientset *kubernetes.Clientset) *ClusterReporter {
    reporter := ClusterReporter{
        clientset: clientset,
    }
    return &reporter
}

// GenerateNamespaceReport lists the pods in namespace and returns a report
// for it. PodCount counts every listed pod, while the CPU and memory totals
// only include the containers of pods in the Running or Pending phase. CPU is
// accumulated in millicores and memory in bytes. NodeCount is left at zero.
//
// It returns the list error unchanged if the pods cannot be listed.
func (r *ClusterReporter) GenerateNamespaceReport(ctx context.Context, namespace string) (*ResourceReport, error) {
    podList, err := r.clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
    if err != nil {
        return nil, err
    }

    report := &ResourceReport{
        Namespace: namespace,
        PodCount:  len(podList.Items),
    }

    for i := range podList.Items {
        p := &podList.Items[i]

        // Only pods that are running or waiting to run contribute resources.
        switch p.Status.Phase {
        case corev1.PodRunning, corev1.PodPending:
        default:
            continue
        }

        for j := range p.Spec.Containers {
            requests := p.Spec.Containers[j].Resources.Requests
            limits := p.Spec.Containers[j].Resources.Limits

            if q := requests.Cpu(); q != nil {
                report.CPURequests += q.MilliValue()
            }
            if q := requests.Memory(); q != nil {
                report.MemoryRequests += q.Value()
            }
            if q := limits.Cpu(); q != nil {
                report.CPULimits += q.MilliValue()
            }
            if q := limits.Memory(); q != nil {
                report.MemoryLimits += q.Value()
            }
        }
    }

    return report, nil
}

// GenerateClusterReport returns one ResourceReport per namespace in the
// cluster.
//
// Namespaces whose report cannot be generated are logged and skipped. After
// the per-namespace reports are built, the cluster's node count is stored on
// every report; if the nodes cannot be listed the failure is logged and
// NodeCount stays at zero (best effort).
//
// It returns an error only when the namespaces themselves cannot be listed.
func (r *ClusterReporter) GenerateClusterReport(ctx context.Context) ([]*ResourceReport, error) {
    var reports []*ResourceReport

    // Get all namespaces
    namespaces, err := r.clientset.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
    if err != nil {
        return nil, err
    }

    // Generate report for each namespace
    for _, ns := range namespaces.Items {
        report, err := r.GenerateNamespaceReport(ctx, ns.Name)
        if err != nil {
            fmt.Printf("Error generating report for namespace %s: %v\n", ns.Name, err)
            continue
        }
        reports = append(reports, report)
    }

    // Node information is supplementary: report the failure instead of
    // silently leaving NodeCount at zero, but still return the reports.
    nodes, err := r.clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
    if err != nil {
        fmt.Printf("Error listing nodes, node count left at 0: %v\n", err)
        return reports, nil
    }

    nodeCount := len(nodes.Items)
    for _, report := range reports {
        report.NodeCount = nodeCount
    }

    return reports, nil
}

// PrintReport writes a human-readable summary of report to standard output.
// CPU totals are converted from millicores to cores, and memory totals from
// bytes to units of 1024*1024*1024 bytes (labelled "GB" in the output).
func (r *ClusterReporter) PrintReport(report *ResourceReport) {
    const (
        milliPerCore = 1000.0
        bytesPerGB   = 1024 * 1024 * 1024
    )

    cpuRequests := float64(report.CPURequests) / milliPerCore
    memRequests := float64(report.MemoryRequests) / bytesPerGB
    cpuLimits := float64(report.CPULimits) / milliPerCore
    memLimits := float64(report.MemoryLimits) / bytesPerGB

    fmt.Printf("\n=== Namespace: %s ===\n", report.Namespace)
    fmt.Printf("Pods: %d\n", report.PodCount)
    fmt.Printf("CPU Requests: %.2f cores\n", cpuRequests)
    fmt.Printf("Memory Requests: %.2f GB\n", memRequests)
    fmt.Printf("CPU Limits: %.2f cores\n", cpuLimits)
    fmt.Printf("Memory Limits: %.2f GB\n", memLimits)
}

// main builds a client from the default kubeconfig file, generates the
// cluster-wide report, and prints the section of every namespace that
// contains at least one pod.
func main() {
    cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }

    cs, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        panic(err)
    }

    reporter := NewClusterReporter(cs)

    reports, err := reporter.GenerateClusterReport(context.Background())
    if err != nil {
        panic(err)
    }

    fmt.Println("=== CLUSTER RESOURCE REPORT ===")
    for _, rep := range reports {
        // Skip namespaces without pods to keep the output short.
        if rep.PodCount <= 0 {
            continue
        }
        reporter.PrintReport(rep)
    }
}

ConfigMap Synchronizer

Synchronize ConfigMaps across namespaces.

package main

import (
    "context"
    "fmt"

    corev1 "k8s.io/api/core/v1"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

// ConfigMapSynchronizer copies ConfigMaps from one namespace to others.
type ConfigMapSynchronizer struct {
    // clientset is the Kubernetes client used to read and write ConfigMaps.
    clientset *kubernetes.Clientset
}

// NewConfigMapSynchronizer returns a ConfigMapSynchronizer backed by clientset.
func NewConfigMapSynchronizer(clientset *kubernetes.Clientset) *ConfigMapSynchronizer {
    syncer := ConfigMapSynchronizer{
        clientset: clientset,
    }
    return &syncer
}

// SyncConfigMap copies the Data and BinaryData of the ConfigMap
// sourceNamespace/sourceName into targetNamespace/targetName.
//
// If the target does not exist it is created; otherwise its Data and
// BinaryData are overwritten and the object is updated. In both cases the
// target carries a "synced-from" annotation holding "<namespace>/<name>" of
// the source. An annotation is used rather than a label because label values
// may not contain "/", so the API server would reject such a label.
//
// It returns a wrapped error if the source cannot be read, or if the target
// cannot be read, created, or updated.
func (s *ConfigMapSynchronizer) SyncConfigMap(ctx context.Context,
    sourceNamespace, sourceName, targetNamespace, targetName string) error {

    const originKey = "synced-from"

    // Get source ConfigMap
    sourceConfigMap, err := s.clientset.CoreV1().ConfigMaps(sourceNamespace).Get(
        ctx, sourceName, metav1.GetOptions{})
    if err != nil {
        return fmt.Errorf("failed to get source ConfigMap: %w", err)
    }

    origin := fmt.Sprintf("%s/%s", sourceNamespace, sourceName)
    targets := s.clientset.CoreV1().ConfigMaps(targetNamespace)

    existing, err := targets.Get(ctx, targetName, metav1.GetOptions{})
    switch {
    case apierrors.IsNotFound(err):
        // Target is missing: create it from the source contents.
        targetConfigMap := &corev1.ConfigMap{
            ObjectMeta: metav1.ObjectMeta{
                Name:      targetName,
                Namespace: targetNamespace,
                Annotations: map[string]string{
                    originKey: origin,
                },
            },
            Data:       sourceConfigMap.Data,
            BinaryData: sourceConfigMap.BinaryData,
        }
        if _, err := targets.Create(ctx, targetConfigMap, metav1.CreateOptions{}); err != nil {
            return fmt.Errorf("failed to create ConfigMap: %w", err)
        }
        fmt.Printf("Created ConfigMap %s/%s\n", targetNamespace, targetName)

    case err != nil:
        return fmt.Errorf("failed to get existing ConfigMap: %w", err)

    default:
        // Target exists: overwrite its contents and refresh the origin marker
        // so pre-existing ConfigMaps are tagged the same way as new ones.
        existing.Data = sourceConfigMap.Data
        existing.BinaryData = sourceConfigMap.BinaryData
        if existing.Annotations == nil {
            existing.Annotations = make(map[string]string, 1)
        }
        existing.Annotations[originKey] = origin

        if _, err := targets.Update(ctx, existing, metav1.UpdateOptions{}); err != nil {
            return fmt.Errorf("failed to update ConfigMap: %w", err)
        }
        fmt.Printf("Updated ConfigMap %s/%s\n", targetNamespace, targetName)
    }

    return nil
}

// SyncToMultipleNamespaces copies the ConfigMap sourceNamespace/sourceName
// into every namespace in targetNamespaces, keeping the same name.
//
// A failure in one namespace is printed and does not stop the remaining
// namespaces from being processed. If any namespace failed, the returned
// error names the failed namespaces and wraps the first failure; it is nil
// only when every sync succeeded.
func (s *ConfigMapSynchronizer) SyncToMultipleNamespaces(ctx context.Context,
    sourceNamespace, sourceName string, targetNamespaces []string) error {

    var (
        failed   []string
        firstErr error
    )

    for _, targetNS := range targetNamespaces {
        err := s.SyncConfigMap(ctx, sourceNamespace, sourceName, targetNS, sourceName)
        if err == nil {
            continue
        }

        fmt.Printf("Failed to sync to %s: %v\n", targetNS, err)
        failed = append(failed, targetNS)
        if firstErr == nil {
            firstErr = err
        }
    }

    if firstErr != nil {
        return fmt.Errorf("syncing ConfigMap %s/%s failed for %d of %d namespaces %v: %w",
            sourceNamespace, sourceName, len(failed), len(targetNamespaces), failed, firstErr)
    }

    return nil
}

// main builds a client from the default kubeconfig file and copies the
// ConfigMap default/app-config into the dev, staging and production
// namespaces.
func main() {
    cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }

    cs, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        panic(err)
    }

    destinations := []string{"dev", "staging", "production"}

    err = NewConfigMapSynchronizer(cs).SyncToMultipleNamespaces(
        context.Background(), "default", "app-config", destinations)
    if err != nil {
        panic(err)
    }
}

Custom Operator Pattern

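A simplified operator-style controller built from a shared informer and a rate-limited work queue. It only detects and logs pods that are annotated for sidecar injection; it does not modify them.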

package main

import (
    "context"
    "fmt"
    "time"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/watch"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/workqueue"
)

// PodOperator is a minimal controller that watches pods in all namespaces
// and reports those that carry the injection annotation but do not yet run
// the sidecar image. It only logs such pods; it does not modify them.
type PodOperator struct {
    // clientset is the Kubernetes client the informer lists and watches with.
    clientset *kubernetes.Clientset
    // informer caches pods and feeds add/update events into queue.
    informer cache.SharedIndexInformer
    // queue holds the namespace/name keys of pods awaiting processing.
    queue workqueue.RateLimitingInterface
    // sidecarImage is the container image that marks a pod as already injected.
    sidecarImage string
    // annotationKey is the annotation that must equal "true" to request injection.
    annotationKey string
}

// NewPodOperator wires together a PodOperator: a shared informer over pods in
// all namespaces (resynced every ten minutes), a rate-limited work queue, and
// event handlers that enqueue the namespace/name key of every added or
// updated pod. Objects whose key cannot be computed are ignored. The informer
// is not started here; call Run to start it.
func NewPodOperator(clientset *kubernetes.Clientset) *PodOperator {
    const resyncPeriod = 10 * time.Minute

    listWatch := &cache.ListWatch{
        ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
            return clientset.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), options)
        },
        WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
            return clientset.CoreV1().Pods(metav1.NamespaceAll).Watch(context.TODO(), options)
        },
    }

    podInformer := cache.NewSharedIndexInformer(listWatch, &corev1.Pod{}, resyncPeriod, cache.Indexers{})
    workQueue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

    // enqueue adds the object's namespace/name key to the work queue.
    enqueue := func(obj interface{}) {
        if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
            workQueue.Add(key)
        }
    }

    podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: enqueue,
        UpdateFunc: func(_, newObj interface{}) {
            enqueue(newObj)
        },
    })

    return &PodOperator{
        clientset:     clientset,
        informer:      podInformer,
        queue:         workQueue,
        sidecarImage:  "logging-sidecar:latest",
        annotationKey: "inject-sidecar",
    }
}

// Run starts the informer, waits for its cache to sync, launches the given
// number of worker goroutines, and then blocks until ctx is cancelled. The
// work queue is shut down when Run returns. If the cache fails to sync, Run
// prints a message and returns without starting any workers.
func (o *PodOperator) Run(ctx context.Context, workers int) {
    defer o.queue.ShutDown()

    stop := ctx.Done()
    go o.informer.Run(stop)

    if synced := cache.WaitForCacheSync(stop, o.informer.HasSynced); !synced {
        fmt.Println("Failed to sync cache")
        return
    }

    for remaining := workers; remaining > 0; remaining-- {
        go o.runWorker(ctx)
    }

    // Keep the queue open until the caller cancels the context.
    <-stop
}

// runWorker processes queue items one at a time until processNextItem
// reports that the queue has been shut down.
func (o *PodOperator) runWorker(ctx context.Context) {
    for {
        if !o.processNextItem(ctx) {
            return
        }
    }
}

// processNextItem takes one key from the work queue and runs syncHandler on
// it. On success the key's rate-limit history is forgotten; on failure the
// key is re-queued with rate limiting and the error is printed. It returns
// false only when the queue has been shut down, telling the worker to stop.
func (o *PodOperator) processNextItem(ctx context.Context) bool {
    key, shutdown := o.queue.Get()
    if shutdown {
        return false
    }
    // Always mark the key as done so the queue can hand it out again.
    defer o.queue.Done(key)

    if err := o.syncHandler(ctx, key.(string)); err != nil {
        o.queue.AddRateLimited(key)
        fmt.Printf("Error processing %s: %v\n", key, err)
        return true
    }

    o.queue.Forget(key)
    return true
}

// syncHandler looks up the pod stored under key in the informer's cache and
// prints a message when the pod has the injection annotation set to "true"
// but none of its containers uses the sidecar image. Pods that are no longer
// in the cache, are not annotated, or already run the sidecar are ignored.
// It returns an error only when the cache lookup fails.
func (o *PodOperator) syncHandler(ctx context.Context, key string) error {
    item, found, err := o.informer.GetIndexer().GetByKey(key)
    switch {
    case err != nil:
        return err
    case !found:
        return nil
    }

    pod := item.(*corev1.Pod)

    // Only pods that opted in via the annotation are of interest.
    if requested := pod.Annotations[o.annotationKey]; requested != "true" {
        return nil
    }

    // A container already using the sidecar image means nothing is left to do.
    for i := range pod.Spec.Containers {
        if pod.Spec.Containers[i].Image == o.sidecarImage {
            return nil
        }
    }

    fmt.Printf("Pod %s/%s needs sidecar injection (annotation present)\n",
        pod.Namespace, pod.Name)

    // This example stops at detection. A real implementation would change the
    // pod spec at admission time through a mutating webhook.
    return nil
}

// main builds a client from the default kubeconfig file and runs the pod
// operator with two workers. The background context is never cancelled, so
// Run only returns if the informer cache fails to sync.
func main() {
    cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }

    cs, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        panic(err)
    }

    const workerCount = 2
    NewPodOperator(cs).Run(context.Background(), workerCount)
}

Resource Cleanup Tool

Clean up old or unused resources.

package main

import (
    "context"
    "fmt"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

// ResourceCleaner deletes old completed Jobs and failed Pods.
type ResourceCleaner struct {
    // clientset is the Kubernetes client used to list and delete resources.
    clientset *kubernetes.Clientset
    // dryRun, when true, makes the cleaner print what it would delete
    // without issuing any delete calls.
    dryRun    bool
}

// NewResourceCleaner returns a ResourceCleaner backed by clientset. With
// dryRun set, the cleaner only reports what it would delete.
func NewResourceCleaner(clientset *kubernetes.Clientset, dryRun bool) *ResourceCleaner {
    cleaner := new(ResourceCleaner)
    cleaner.clientset = clientset
    cleaner.dryRun = dryRun
    return cleaner
}

// CleanupCompletedJobs deletes Jobs in namespace whose completion time is
// more than olderThan in the past. An empty namespace lists Jobs across all
// namespaces. Jobs without a completion time are skipped.
//
// Every matching Job is printed. Unless the cleaner is in dry-run mode it is
// then deleted with background propagation; a failed delete is printed and
// does not stop the loop.
//
// It returns an error only when the Jobs cannot be listed.
func (c *ResourceCleaner) CleanupCompletedJobs(ctx context.Context, namespace string, olderThan time.Duration) error {
    jobs, err := c.clientset.BatchV1().Jobs(namespace).List(ctx, metav1.ListOptions{})
    if err != nil {
        return err
    }

    cutoffTime := time.Now().Add(-olderThan)
    deletePolicy := metav1.DeletePropagationBackground

    for i := range jobs.Items {
        job := &jobs.Items[i]

        if job.Status.CompletionTime == nil {
            continue // Job not completed
        }
        if !job.Status.CompletionTime.Time.Before(cutoffTime) {
            continue // Completed too recently
        }

        fmt.Printf("Deleting completed job: %s/%s (completed %v ago)\n",
            job.Namespace, job.Name, time.Since(job.Status.CompletionTime.Time))

        if c.dryRun {
            continue
        }

        // Delete through the Job's own namespace: the namespace argument may
        // be empty (all namespaces), which is valid for List but not for Delete.
        err := c.clientset.BatchV1().Jobs(job.Namespace).Delete(ctx, job.Name, metav1.DeleteOptions{
            PropagationPolicy: &deletePolicy,
        })
        if err != nil {
            fmt.Printf("Failed to delete job %s: %v\n", job.Name, err)
        }
    }

    return nil
}

// CleanupFailedPods deletes Pods in namespace that are in the Failed phase
// and were created more than olderThan in the past. An empty namespace lists
// Pods across all namespaces.
//
// Every matching Pod is printed. Unless the cleaner is in dry-run mode it is
// then deleted; a failed delete is printed and does not stop the loop.
//
// It returns an error only when the Pods cannot be listed.
func (c *ResourceCleaner) CleanupFailedPods(ctx context.Context, namespace string, olderThan time.Duration) error {
    pods, err := c.clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
        FieldSelector: "status.phase=Failed",
    })
    if err != nil {
        return err
    }

    cutoffTime := time.Now().Add(-olderThan)

    for i := range pods.Items {
        pod := &pods.Items[i]

        if !pod.CreationTimestamp.Time.Before(cutoffTime) {
            continue // Created too recently
        }

        fmt.Printf("Deleting failed pod: %s/%s (created %v ago)\n",
            pod.Namespace, pod.Name, time.Since(pod.CreationTimestamp.Time))

        if c.dryRun {
            continue
        }

        // Delete through the Pod's own namespace: the namespace argument may
        // be empty (all namespaces), which is valid for List but not for Delete.
        err := c.clientset.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
        if err != nil {
            fmt.Printf("Failed to delete pod %s: %v\n", pod.Name, err)
        }
    }

    return nil
}

// main builds a client from the default kubeconfig file and runs both
// cleanup passes across all namespaces in dry-run mode, printing any error
// instead of aborting.
func main() {
    cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }

    cs, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        panic(err)
    }

    const (
        jobMaxAge = 7 * 24 * time.Hour // completed jobs older than 7 days
        podMaxAge = 24 * time.Hour     // failed pods older than 1 day
    )

    ctx := context.Background()
    cleaner := NewResourceCleaner(cs, true) // true = dry-run mode

    // The empty namespace selects every namespace.
    if err := cleaner.CleanupCompletedJobs(ctx, metav1.NamespaceAll, jobMaxAge); err != nil {
        fmt.Printf("Error cleaning jobs: %v\n", err)
    }

    if err := cleaner.CleanupFailedPods(ctx, metav1.NamespaceAll, podMaxAge); err != nil {
        fmt.Printf("Error cleaning pods: %v\n", err)
    }
}

Next Steps

  • Explore Edge Cases and Advanced Scenarios
  • Review Reference Documentation
  • Learn about Controllers and Informers

Install with Tessl CLI

npx tessl i tessl/golang-k8s-io--client-go

docs

examples

edge-cases.md

real-world-scenarios.md

index.md

tile.json