Official Go client library for the Kubernetes API - typed clients, controllers, and cluster interaction tools
This document provides practical examples of using k8s.io/client-go in real-world applications.
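All of the examples below load a kubeconfig from the default home location via clientcmd.BuildConfigFromFlags. When running inside a cluster, a common pattern is to try the in-cluster service-account configuration first and fall back to the local kubeconfig. A minimal sketch (it assumes the additional import "k8s.io/client-go/rest"; the helper name newClientset is illustrative):
// Minimal sketch: prefer the in-cluster config, fall back to the local kubeconfig.
func newClientset() (*kubernetes.Clientset, error) {
    config, err := rest.InClusterConfig()
    if err != nil {
        // Not running inside a cluster: use ~/.kube/config instead.
        config, err = clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
        if err != nil {
            return nil, err
        }
    }
    return kubernetes.NewForConfig(config)
}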
Monitor pod status changes and send notifications.
package main
import (
"context"
"fmt"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
type PodMonitor struct {
clientset *kubernetes.Clientset
namespace string
}
func NewPodMonitor(clientset *kubernetes.Clientset, namespace string) *PodMonitor {
return &PodMonitor{
clientset: clientset,
namespace: namespace,
}
}
func (m *PodMonitor) Watch(ctx context.Context) error {
watcher, err := m.clientset.CoreV1().Pods(m.namespace).Watch(ctx, metav1.ListOptions{})
if err != nil {
return err
}
defer watcher.Stop()
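// Note: a raw Watch can be closed by the API server at any time (for example on
// timeout); production code typically wraps this in an informer or a
// tools/watch RetryWatcher so the watch resumes automatically.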
fmt.Printf("Watching pods in namespace %s...\n", m.namespace)
for {
select {
case event, ok := <-watcher.ResultChan():
if !ok {
return fmt.Errorf("watch channel closed")
}
if event.Type == watch.Error {
fmt.Printf("[ERROR] Watch error: %v\n", event.Object)
continue
}
pod, ok := event.Object.(*corev1.Pod)
if !ok {
return fmt.Errorf("unexpected object type %T", event.Object)
}
switch event.Type {
case watch.Added:
fmt.Printf("[ADD] Pod %s created\n", pod.Name)
case watch.Modified:
m.handlePodUpdate(pod)
case watch.Deleted:
fmt.Printf("[DELETE] Pod %s deleted\n", pod.Name)
m.sendNotification("Pod Deleted", pod.Name)
}
case <-ctx.Done():
return ctx.Err()
}
}
}
func (m *PodMonitor) handlePodUpdate(pod *corev1.Pod) {
// Check for failed containers
for _, status := range pod.Status.ContainerStatuses {
if status.State.Waiting != nil && status.State.Waiting.Reason == "CrashLoopBackOff" {
fmt.Printf("[ALERT] Pod %s has container %s in CrashLoopBackOff\n",
pod.Name, status.Name)
m.sendNotification("CrashLoopBackOff Detected",
fmt.Sprintf("Pod: %s, Container: %s", pod.Name, status.Name))
}
}
// Check for resource issues
if pod.Status.Phase == corev1.PodFailed {
fmt.Printf("[ALERT] Pod %s failed: %s\n", pod.Name, pod.Status.Reason)
m.sendNotification("Pod Failed", pod.Name)
}
}
func (m *PodMonitor) sendNotification(subject, message string) {
// Implement your notification logic here (email, Slack, etc.)
fmt.Printf("[NOTIFICATION] %s: %s\n", subject, message)
}
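// One possible implementation of the sendNotification stub (a sketch, not part
// of the example above): POST the message to a Slack-style incoming webhook.
// Assumes additional imports ("bytes", "encoding/json", "net/http") and a
// hypothetical webhookURL supplied by the caller.
func sendSlackNotification(webhookURL, subject, message string) error {
    payload, err := json.Marshal(map[string]string{
        "text": fmt.Sprintf("%s: %s", subject, message),
    })
    if err != nil {
        return err
    }
    resp, err := http.Post(webhookURL, "application/json", bytes.NewReader(payload))
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    if resp.StatusCode >= 300 {
        return fmt.Errorf("webhook returned status %s", resp.Status)
    }
    return nil
}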
func main() {
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
panic(err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
monitor := NewPodMonitor(clientset, "default")
ctx := context.Background()
if err := monitor.Watch(ctx); err != nil {
panic(err)
}
}
Scale deployments based on time or custom metrics.
package main
import (
"context"
"fmt"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
type DeploymentScaler struct {
clientset *kubernetes.Clientset
}
func NewDeploymentScaler(clientset *kubernetes.Clientset) *DeploymentScaler {
return &DeploymentScaler{clientset: clientset}
}
// ScaleDeployment scales a deployment to the specified replica count
func (s *DeploymentScaler) ScaleDeployment(ctx context.Context, namespace, name string, replicas int32) error {
deployment, err := s.clientset.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get deployment: %w", err)
}
current := int32(1) // the API server defaults spec.replicas to 1 when unset
if deployment.Spec.Replicas != nil {
current = *deployment.Spec.Replicas
}
if current == replicas {
fmt.Printf("Deployment %s already at %d replicas\n", name, replicas)
return nil
}
deployment.Spec.Replicas = &replicas
_, err = s.clientset.AppsV1().Deployments(namespace).Update(ctx, deployment, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update deployment: %w", err)
}
fmt.Printf("Scaled deployment %s to %d replicas\n", name, replicas)
return nil
}
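// Alternative (sketch): scale through the Scale subresource instead of updating
// the whole Deployment object, which avoids clobbering fields owned by other
// controllers. Uses the same clientset; the helper name is illustrative.
func (s *DeploymentScaler) ScaleViaSubresource(ctx context.Context, namespace, name string, replicas int32) error {
    scale, err := s.clientset.AppsV1().Deployments(namespace).GetScale(ctx, name, metav1.GetOptions{})
    if err != nil {
        return fmt.Errorf("failed to get scale: %w", err)
    }
    scale.Spec.Replicas = replicas
    _, err = s.clientset.AppsV1().Deployments(namespace).UpdateScale(ctx, name, scale, metav1.UpdateOptions{})
    if err != nil {
        return fmt.Errorf("failed to update scale: %w", err)
    }
    fmt.Printf("Scaled deployment %s to %d replicas via the Scale subresource\n", name, replicas)
    return nil
}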
// ScheduleScaling performs time-based scaling
func (s *DeploymentScaler) ScheduleScaling(ctx context.Context, namespace, name string) {
ticker := time.NewTicker(1 * time.Hour)
defer ticker.Stop()
for {
select {
case <-ticker.C:
hour := time.Now().Hour()
var replicas int32
// Business hours: 9 AM - 6 PM
if hour >= 9 && hour < 18 {
replicas = 5 // Scale up during business hours
} else {
replicas = 1 // Scale down during off-hours
}
if err := s.ScaleDeployment(ctx, namespace, name, replicas); err != nil {
fmt.Printf("Error scaling deployment: %v\n", err)
}
case <-ctx.Done():
return
}
}
}
// ScaleBasedOnPodCount scales based on pod count in another deployment
func (s *DeploymentScaler) ScaleBasedOnPodCount(ctx context.Context,
watchNamespace, watchDeployment, targetNamespace, targetDeployment string, ratio float64) error {
sourceDep, err := s.clientset.AppsV1().Deployments(watchNamespace).Get(ctx, watchDeployment, metav1.GetOptions{})
if err != nil {
return err
}
sourceReplicas := int32(1)
if sourceDep.Spec.Replicas != nil {
sourceReplicas = *sourceDep.Spec.Replicas
}
targetReplicas := int32(float64(sourceReplicas) * ratio)
if targetReplicas < 1 {
targetReplicas = 1
}
return s.ScaleDeployment(ctx, targetNamespace, targetDeployment, targetReplicas)
}
func main() {
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
panic(err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
scaler := NewDeploymentScaler(clientset)
ctx := context.Background()
// Example: Schedule-based scaling
go scaler.ScheduleScaling(ctx, "production", "api-server")
// Example: Scale based on another deployment (1:2 ratio)
if err := scaler.ScaleBasedOnPodCount(ctx,
"production", "frontend",
"production", "backend",
2.0); err != nil {
fmt.Printf("Error: %v\n", err)
}
select {} // Keep running
}
Generate reports on cluster resource usage.
package main
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
type ResourceReport struct {
Namespace string
PodCount int
NodeCount int
CPURequests int64
MemoryRequests int64
CPULimits int64
MemoryLimits int64
}
type ClusterReporter struct {
clientset *kubernetes.Clientset
}
func NewClusterReporter(clientset *kubernetes.Clientset) *ClusterReporter {
return &ClusterReporter{clientset: clientset}
}
func (r *ClusterReporter) GenerateNamespaceReport(ctx context.Context, namespace string) (*ResourceReport, error) {
report := &ResourceReport{
Namespace: namespace,
}
// Get all pods in namespace
pods, err := r.clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}
report.PodCount = len(pods.Items)
// Calculate resource requests and limits
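// (Only regular containers are counted; init and ephemeral container requests are ignored here.)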
for _, pod := range pods.Items {
if pod.Status.Phase != corev1.PodRunning && pod.Status.Phase != corev1.PodPending {
continue
}
for _, container := range pod.Spec.Containers {
if cpu := container.Resources.Requests.Cpu(); cpu != nil {
report.CPURequests += cpu.MilliValue()
}
if mem := container.Resources.Requests.Memory(); mem != nil {
report.MemoryRequests += mem.Value()
}
if cpu := container.Resources.Limits.Cpu(); cpu != nil {
report.CPULimits += cpu.MilliValue()
}
if mem := container.Resources.Limits.Memory(); mem != nil {
report.MemoryLimits += mem.Value()
}
}
}
return report, nil
}
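// Aside (sketch): on large clusters it may be preferable to list pods in pages
// rather than in a single call. Uses the same clientset; the page size of 500
// is an arbitrary example value.
func listPodsPaged(ctx context.Context, clientset *kubernetes.Clientset, namespace string) ([]corev1.Pod, error) {
    var all []corev1.Pod
    opts := metav1.ListOptions{Limit: 500}
    for {
        list, err := clientset.CoreV1().Pods(namespace).List(ctx, opts)
        if err != nil {
            return nil, err
        }
        all = append(all, list.Items...)
        if list.Continue == "" {
            return all, nil
        }
        opts.Continue = list.Continue
    }
}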
func (r *ClusterReporter) GenerateClusterReport(ctx context.Context) ([]*ResourceReport, error) {
var reports []*ResourceReport
// Get all namespaces
namespaces, err := r.clientset.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}
// Generate report for each namespace
for _, ns := range namespaces.Items {
report, err := r.GenerateNamespaceReport(ctx, ns.Name)
if err != nil {
fmt.Printf("Error generating report for namespace %s: %v\n", ns.Name, err)
continue
}
reports = append(reports, report)
}
// Node count is cluster-wide; attach the same value to every namespace report
nodes, err := r.clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err == nil {
for _, report := range reports {
report.NodeCount = len(nodes.Items)
}
}
return reports, nil
}
func (r *ClusterReporter) PrintReport(report *ResourceReport) {
fmt.Printf("\n=== Namespace: %s ===\n", report.Namespace)
fmt.Printf("Pods: %d\n", report.PodCount)
fmt.Printf("CPU Requests: %.2f cores\n", float64(report.CPURequests)/1000.0)
fmt.Printf("Memory Requests: %.2f GB\n", float64(report.MemoryRequests)/(1024*1024*1024))
fmt.Printf("CPU Limits: %.2f cores\n", float64(report.CPULimits)/1000.0)
fmt.Printf("Memory Limits: %.2f GB\n", float64(report.MemoryLimits)/(1024*1024*1024))
}
func main() {
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
panic(err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
reporter := NewClusterReporter(clientset)
ctx := context.Background()
// Generate and print cluster-wide report
reports, err := reporter.GenerateClusterReport(ctx)
if err != nil {
panic(err)
}
fmt.Println("=== CLUSTER RESOURCE REPORT ===")
for _, report := range reports {
if report.PodCount > 0 {
reporter.PrintReport(report)
}
}
}
Synchronize ConfigMaps across namespaces.
package main
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
type ConfigMapSynchronizer struct {
clientset *kubernetes.Clientset
}
func NewConfigMapSynchronizer(clientset *kubernetes.Clientset) *ConfigMapSynchronizer {
return &ConfigMapSynchronizer{clientset: clientset}
}
func (s *ConfigMapSynchronizer) SyncConfigMap(ctx context.Context,
sourceNamespace, sourceName, targetNamespace, targetName string) error {
// Get source ConfigMap
sourceConfigMap, err := s.clientset.CoreV1().ConfigMaps(sourceNamespace).Get(
ctx, sourceName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get source ConfigMap: %w", err)
}
// Create target ConfigMap
targetConfigMap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: targetName,
Namespace: targetNamespace,
Labels: map[string]string{
"synced-from": fmt.Sprintf("%s/%s", sourceNamespace, sourceName),
},
},
Data: sourceConfigMap.Data,
BinaryData: sourceConfigMap.BinaryData,
}
// Try to create or update
existing, err := s.clientset.CoreV1().ConfigMaps(targetNamespace).Get(
ctx, targetName, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
// Create new ConfigMap
_, err = s.clientset.CoreV1().ConfigMaps(targetNamespace).Create(
ctx, targetConfigMap, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create ConfigMap: %w", err)
}
fmt.Printf("Created ConfigMap %s/%s\n", targetNamespace, targetName)
} else if err != nil {
return fmt.Errorf("failed to get existing ConfigMap: %w", err)
} else {
// Update existing ConfigMap
existing.Data = sourceConfigMap.Data
existing.BinaryData = sourceConfigMap.BinaryData
_, err = s.clientset.CoreV1().ConfigMaps(targetNamespace).Update(
ctx, existing, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update ConfigMap: %w", err)
}
fmt.Printf("Updated ConfigMap %s/%s\n", targetNamespace, targetName)
}
return nil
}
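// Alternative (sketch): create-or-update in a single call with server-side
// apply. Assumes the generated apply configurations package is imported as
// corev1apply "k8s.io/client-go/applyconfigurations/core/v1"; the field manager
// name "configmap-sync" is an arbitrary example.
func (s *ConfigMapSynchronizer) ApplyConfigMap(ctx context.Context,
    targetNamespace, targetName string, data map[string]string) error {
    cm := corev1apply.ConfigMap(targetName, targetNamespace).WithData(data)
    _, err := s.clientset.CoreV1().ConfigMaps(targetNamespace).Apply(
        ctx, cm, metav1.ApplyOptions{FieldManager: "configmap-sync", Force: true})
    return err
}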
func (s *ConfigMapSynchronizer) SyncToMultipleNamespaces(ctx context.Context,
sourceNamespace, sourceName string, targetNamespaces []string) error {
for _, targetNS := range targetNamespaces {
if err := s.SyncConfigMap(ctx, sourceNamespace, sourceName, targetNS, sourceName); err != nil {
fmt.Printf("Failed to sync to %s: %v\n", targetNS, err)
continue
}
}
return nil
}
func main() {
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
panic(err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
syncer := NewConfigMapSynchronizer(clientset)
ctx := context.Background()
// Sync a ConfigMap from default to multiple namespaces
targetNamespaces := []string{"dev", "staging", "production"}
if err := syncer.SyncToMultipleNamespaces(ctx, "default", "app-config", targetNamespaces); err != nil {
panic(err)
}
}
Simplified custom operator implementation.
package main
import (
"context"
"fmt"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
)
// Simple operator that ensures pods with a specific annotation have a sidecar container
type PodOperator struct {
clientset *kubernetes.Clientset
informer cache.SharedIndexInformer
queue workqueue.RateLimitingInterface
sidecarImage string
annotationKey string
}
func NewPodOperator(clientset *kubernetes.Clientset) *PodOperator {
informer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return clientset.CoreV1().Pods("").List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return clientset.CoreV1().Pods("").Watch(context.TODO(), options)
},
},
&corev1.Pod{},
time.Minute*10,
cache.Indexers{},
)
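// Note: recent client-go releases deprecate the untyped workqueue helpers used
// here in favor of generic typed variants (workqueue.TypedRateLimitingInterface[string]
// and friends); the untyped forms still compile and work.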
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
operator := &PodOperator{
clientset: clientset,
informer: informer,
queue: queue,
sidecarImage: "logging-sidecar:latest",
annotationKey: "inject-sidecar",
}
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(newObj)
if err == nil {
queue.Add(key)
}
},
})
return operator
}
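// Aside (sketch): the same pod informer is more commonly obtained from the
// generated informer factory (assuming "k8s.io/client-go/informers" is imported):
//
//     factory := informers.NewSharedInformerFactory(clientset, 10*time.Minute)
//     podInformer := factory.Core().V1().Pods().Informer()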
func (o *PodOperator) Run(ctx context.Context, workers int) {
defer o.queue.ShutDown()
go o.informer.Run(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), o.informer.HasSynced) {
fmt.Println("Failed to sync cache")
return
}
for i := 0; i < workers; i++ {
go o.runWorker(ctx)
}
<-ctx.Done()
}
func (o *PodOperator) runWorker(ctx context.Context) {
for o.processNextItem(ctx) {
}
}
func (o *PodOperator) processNextItem(ctx context.Context) bool {
key, quit := o.queue.Get()
if quit {
return false
}
defer o.queue.Done(key)
err := o.syncHandler(ctx, key.(string))
if err == nil {
o.queue.Forget(key)
return true
}
o.queue.AddRateLimited(key)
fmt.Printf("Error processing %s: %v\n", key, err)
return true
}
func (o *PodOperator) syncHandler(ctx context.Context, key string) error {
obj, exists, err := o.informer.GetIndexer().GetByKey(key)
if err != nil {
return err
}
if !exists {
return nil
}
pod := obj.(*corev1.Pod)
// Check if pod needs sidecar injection
if pod.Annotations[o.annotationKey] != "true" {
return nil
}
// Check if sidecar already exists
for _, container := range pod.Spec.Containers {
if container.Image == o.sidecarImage {
return nil // Sidecar already injected
}
}
fmt.Printf("Pod %s/%s needs sidecar injection (annotation present)\n",
pod.Namespace, pod.Name)
// In a real operator, you would modify the pod spec via a mutating webhook
// This is just for demonstration
return nil
}
func main() {
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
panic(err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
operator := NewPodOperator(clientset)
ctx := context.Background()
operator.Run(ctx, 2)
}
Clean up old or unused resources.
package main
import (
"context"
"fmt"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
type ResourceCleaner struct {
clientset *kubernetes.Clientset
dryRun bool
}
func NewResourceCleaner(clientset *kubernetes.Clientset, dryRun bool) *ResourceCleaner {
return &ResourceCleaner{
clientset: clientset,
dryRun: dryRun,
}
}
func (c *ResourceCleaner) CleanupCompletedJobs(ctx context.Context, namespace string, olderThan time.Duration) error {
jobs, err := c.clientset.BatchV1().Jobs(namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return err
}
cutoffTime := time.Now().Add(-olderThan)
for _, job := range jobs.Items {
if job.Status.CompletionTime == nil {
continue // Job not completed
}
if job.Status.CompletionTime.Time.Before(cutoffTime) {
fmt.Printf("Deleting completed job: %s/%s (completed %v ago)\n",
job.Namespace, job.Name, time.Since(job.Status.CompletionTime.Time))
if !c.dryRun {
deletePolicy := metav1.DeletePropagationBackground
// Use the job's own namespace so deletion also works when listing across all namespaces
err := c.clientset.BatchV1().Jobs(job.Namespace).Delete(ctx, job.Name, metav1.DeleteOptions{
PropagationPolicy: &deletePolicy,
})
if err != nil {
fmt.Printf("Failed to delete job %s: %v\n", job.Name, err)
}
}
}
}
return nil
}
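// Aside: Kubernetes can expire finished Jobs automatically when
// .spec.ttlSecondsAfterFinished is set; a cleaner like this is mainly useful
// when that field is not set or when extra conditions are needed.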
func (c *ResourceCleaner) CleanupFailedPods(ctx context.Context, namespace string, olderThan time.Duration) error {
pods, err := c.clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
FieldSelector: "status.phase=Failed",
})
if err != nil {
return err
}
cutoffTime := time.Now().Add(-olderThan)
for _, pod := range pods.Items {
if pod.CreationTimestamp.Time.Before(cutoffTime) {
fmt.Printf("Deleting failed pod: %s/%s (created %v ago)\n",
pod.Namespace, pod.Name, time.Since(pod.CreationTimestamp.Time))
if !c.dryRun {
err := c.clientset.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
if err != nil {
fmt.Printf("Failed to delete pod %s: %v\n", pod.Name, err)
}
}
}
}
return nil
}
func main() {
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
panic(err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
cleaner := NewResourceCleaner(clientset, true) // dry-run mode
ctx := context.Background()
// Clean up completed jobs older than 7 days (empty namespace = all namespaces)
if err := cleaner.CleanupCompletedJobs(ctx, "", 7*24*time.Hour); err != nil {
fmt.Printf("Error cleaning jobs: %v\n", err)
}
// Clean up failed pods older than 1 day
if err := cleaner.CleanupFailedPods(ctx, "", 24*time.Hour); err != nil {
fmt.Printf("Error cleaning pods: %v\n", err)
}
}
Install with Tessl CLI
npx tessl i tessl/golang-k8s-io--client-go@0.35.0