Official Go client library for the Kubernetes API - typed clients, controllers, and cluster interaction tools
This document covers advanced scenarios, edge cases, and troubleshooting patterns when using k8s.io/client-go.
When updating resources, you may encounter HTTP 409 Conflict errors due to Kubernetes' optimistic concurrency control: every object carries a resourceVersion, and an update is rejected if another writer modified the object after you read it. The standard remedy is to re-fetch the latest object and retry the update.
package main
import (
"context"
"fmt"
"time"
appsv1 "k8s.io/api/apps/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
)
// UpdateWithRetry retries an update operation on conflict
func UpdateWithRetry(clientset *kubernetes.Clientset, namespace, deploymentName string,
updateFunc func(*appsv1.Deployment)) error {
return wait.ExponentialBackoff(wait.Backoff{
Duration: 10 * time.Millisecond,
Factor: 2.0,
Steps: 5,
Cap: 1 * time.Second,
}, func() (bool, error) {
// Get latest version
deployment, err := clientset.AppsV1().Deployments(namespace).Get(
context.TODO(), deploymentName, metav1.GetOptions{})
if err != nil {
return false, err
}
// Apply changes
updateFunc(deployment)
// Try to update
_, err = clientset.AppsV1().Deployments(namespace).Update(
context.TODO(), deployment, metav1.UpdateOptions{})
if err != nil {
if apierrors.IsConflict(err) {
// Conflict, retry
return false, nil
}
// Other error, fail
return false, err
}
// Success
return true, nil
})
}
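// Note: client-go ships a canonical helper for exactly this pattern:
// retry.RetryOnConflict from k8s.io/client-go/util/retry (import assumed,
// not shown above). A minimal sketch of the same update using it:
func UpdateWithRetryHelper(clientset *kubernetes.Clientset, namespace, deploymentName string,
updateFunc func(*appsv1.Deployment)) error {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
// Re-fetch the latest object on every attempt
deployment, err := clientset.AppsV1().Deployments(namespace).Get(
context.TODO(), deploymentName, metav1.GetOptions{})
if err != nil {
return err
}
updateFunc(deployment)
_, err = clientset.AppsV1().Deployments(namespace).Update(
context.TODO(), deployment, metav1.UpdateOptions{})
return err
})
}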
// Example usage
func Example_UpdateWithRetry(clientset *kubernetes.Clientset) {
err := UpdateWithRetry(clientset, "default", "my-deployment",
func(deployment *appsv1.Deployment) {
replicas := int32(5)
deployment.Spec.Replicas = &replicas
})
if err != nil {
fmt.Printf("Failed to update: %v\n", err)
}
}

Use pagination to efficiently handle large lists.
package main
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
// ListPodsWithPagination lists all pods using pagination
func ListPodsWithPagination(clientset *kubernetes.Clientset, namespace string, pageSize int64) ([]corev1.Pod, error) {
var allPods []corev1.Pod
var continueToken string
for {
listOptions := metav1.ListOptions{
Limit: pageSize,
Continue: continueToken,
}
pods, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), listOptions)
if err != nil {
return nil, err
}
allPods = append(allPods, pods.Items...)
continueToken = pods.Continue
if continueToken == "" {
// No more pages
break
}
fmt.Printf("Fetched %d pods so far...\n", len(allPods))
}
return allPods, nil
}
// StreamingProcessor processes large lists without loading everything into memory
func StreamingProcessor(clientset *kubernetes.Clientset, namespace string) error {
var continueToken string
pageSize := int64(100)
for {
pods, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{
Limit: pageSize,
Continue: continueToken,
})
if err != nil {
return err
}
// Process each pod individually
for _, pod := range pods.Items {
processPod(&pod)
}
continueToken = pods.Continue
if continueToken == "" {
break
}
}
return nil
}
func processPod(pod *corev1.Pod) {
// Process pod...
fmt.Printf("Processing pod: %s\n", pod.Name)
}

Implement robust retry logic for transient failures.
package main
import (
"context"
"fmt"
"time"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
)
type RetryConfig struct {
MaxRetries int
InitialWait time.Duration
MaxWait time.Duration
Multiplier float64
}
// RetryOnError retries an operation with exponential backoff
func RetryOnError(config RetryConfig, operation func() error) error {
backoff := wait.Backoff{
Duration: config.InitialWait,
Factor: config.Multiplier,
Steps: config.MaxRetries,
Cap: config.MaxWait,
}
return wait.ExponentialBackoff(backoff, func() (bool, error) {
err := operation()
if err == nil {
return true, nil // Success
}
// Check if error is retryable
if isRetryableError(err) {
fmt.Printf("Retryable error: %v, retrying...\n", err)
return false, nil // Retry
}
// Non-retryable error
return false, err
})
}
func isRetryableError(err error) bool {
// Retry on server errors, timeouts, and conflicts
return apierrors.IsServerTimeout(err) ||
apierrors.IsServiceUnavailable(err) ||
apierrors.IsTimeout(err) ||
apierrors.IsTooManyRequests(err) ||
apierrors.IsInternalError(err) ||
apierrors.IsConflict(err)
}
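// Note: client-go also provides retry.OnError in k8s.io/client-go/util/retry
// (import assumed, not shown above), which takes a retriable-error predicate
// directly. A minimal sketch reusing the isRetryableError predicate above:
func CreatePodWithOnError(clientset *kubernetes.Clientset, pod *corev1.Pod) error {
return retry.OnError(retry.DefaultBackoff, isRetryableError, func() error {
_, err := clientset.CoreV1().Pods(pod.Namespace).Create(
context.TODO(), pod, metav1.CreateOptions{})
return err
})
}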
// Example: Create pod with retry
func CreatePodWithRetry(clientset *kubernetes.Clientset, pod *corev1.Pod) error {
config := RetryConfig{
MaxRetries: 5,
InitialWait: 100 * time.Millisecond,
MaxWait: 30 * time.Second,
Multiplier: 2.0,
}
return RetryOnError(config, func() error {
_, err := clientset.CoreV1().Pods(pod.Namespace).Create(
context.TODO(), pod, metav1.CreateOptions{})
return err
})
}

Working with CRDs using the dynamic client.
package main
import (
"context"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
)
type CRDManager struct {
dynamicClient dynamic.Interface
}
func NewCRDManager(config *rest.Config) (*CRDManager, error) {
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
return nil, err
}
return &CRDManager{dynamicClient: dynamicClient}, nil
}
// CreateCustomResource creates a custom resource
func (m *CRDManager) CreateCustomResource(ctx context.Context,
group, version, resource, namespace string, obj map[string]interface{}) error {
gvr := schema.GroupVersionResource{
Group: group,
Version: version,
Resource: resource,
}
unstructuredObj := &unstructured.Unstructured{
Object: obj,
}
_, err := m.dynamicClient.Resource(gvr).Namespace(namespace).Create(
ctx, unstructuredObj, metav1.CreateOptions{})
return err
}
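// Example usage of CreateCustomResource. The "stable.example.com" group,
// "CronTab" kind, and "crontabs" resource are hypothetical placeholders
// for your own CRD.
func Example_CreateCustomResource(m *CRDManager) error {
obj := map[string]interface{}{
"apiVersion": "stable.example.com/v1",
"kind": "CronTab",
"metadata": map[string]interface{}{
"name": "my-crontab",
},
"spec": map[string]interface{}{
"cronSpec": "*/5 * * * *",
},
}
return m.CreateCustomResource(context.TODO(),
"stable.example.com", "v1", "crontabs", "default", obj)
}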
// WatchCustomResource watches changes to custom resources
func (m *CRDManager) WatchCustomResource(ctx context.Context,
group, version, resource, namespace string) error {
gvr := schema.GroupVersionResource{
Group: group,
Version: version,
Resource: resource,
}
watcher, err := m.dynamicClient.Resource(gvr).Namespace(namespace).Watch(
ctx, metav1.ListOptions{})
if err != nil {
return err
}
defer watcher.Stop()
for {
select {
case event, ok := <-watcher.ResultChan():
if !ok {
// The server closed the watch (e.g. timeout); callers should
// re-establish it rather than spin on a closed channel
return fmt.Errorf("watch channel closed")
}
obj, ok := event.Object.(*unstructured.Unstructured)
if !ok {
return fmt.Errorf("unexpected object type %T", event.Object)
}
fmt.Printf("Event: %s, Object: %s\n", event.Type, obj.GetName())
case <-ctx.Done():
return ctx.Err()
}
}
}
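// SetCustomResourcePhase sketches a status write-back via the dynamic
// client's UpdateStatus. It assumes the CRD enables the /status
// subresource and that status.phase is a valid field for your type.
func (m *CRDManager) SetCustomResourcePhase(ctx context.Context,
gvr schema.GroupVersionResource, namespace string,
obj *unstructured.Unstructured, phase string) error {
if err := unstructured.SetNestedField(obj.Object, phase, "status", "phase"); err != nil {
return err
}
_, err := m.dynamicClient.Resource(gvr).Namespace(namespace).UpdateStatus(
ctx, obj, metav1.UpdateOptions{})
return err
}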
// GetCustomResourceStatus retrieves the status of a custom resource
func (m *CRDManager) GetCustomResourceStatus(ctx context.Context,
group, version, resource, namespace, name string) (map[string]interface{}, error) {
gvr := schema.GroupVersionResource{
Group: group,
Version: version,
Resource: resource,
}
obj, err := m.dynamicClient.Resource(gvr).Namespace(namespace).Get(
ctx, name, metav1.GetOptions{})
if err != nil {
return nil, err
}
status, found, err := unstructured.NestedMap(obj.Object, "status")
if err != nil {
return nil, fmt.Errorf("reading status: %w", err)
}
if !found {
return nil, fmt.Errorf("status not found")
}
return status, nil
}

Managing resources across multiple namespaces safely.
package main
import (
"context"
"fmt"
"sync"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
type MultiTenantManager struct {
clientset *kubernetes.Clientset
namespaces map[string]*TenantConfig
mu sync.RWMutex
}
type TenantConfig struct {
Namespace string
ResourceQuota *corev1.ResourceQuota
LimitRange *corev1.LimitRange
}
func NewMultiTenantManager(clientset *kubernetes.Clientset) *MultiTenantManager {
return &MultiTenantManager{
clientset: clientset,
namespaces: make(map[string]*TenantConfig),
}
}
// CreateTenant creates a new tenant with resource limits
func (m *MultiTenantManager) CreateTenant(ctx context.Context, tenantName string,
cpuQuota, memoryQuota string) error {
m.mu.Lock()
defer m.mu.Unlock()
// Create namespace
ns := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: tenantName,
Labels: map[string]string{
"tenant": tenantName,
},
},
}
_, err := m.clientset.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create namespace: %w", err)
}
// Create resource quota
quota := &corev1.ResourceQuota{
ObjectMeta: metav1.ObjectMeta{
Name: "tenant-quota",
Namespace: tenantName,
},
Spec: corev1.ResourceQuotaSpec{
Hard: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse(cpuQuota),
corev1.ResourceMemory: resource.MustParse(memoryQuota),
corev1.ResourcePods: resource.MustParse("100"),
},
},
}
_, err = m.clientset.CoreV1().ResourceQuotas(tenantName).Create(ctx, quota, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create resource quota: %w", err)
}
m.namespaces[tenantName] = &TenantConfig{
Namespace: tenantName,
ResourceQuota: quota,
}
fmt.Printf("Created tenant: %s\n", tenantName)
return nil
}
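// DeleteTenant removes a tenant namespace; the API server cascades the
// delete to the quota and all other namespaced resources inside it.
// A minimal sketch mirroring CreateTenant:
func (m *MultiTenantManager) DeleteTenant(ctx context.Context, tenantName string) error {
m.mu.Lock()
defer m.mu.Unlock()
if err := m.clientset.CoreV1().Namespaces().Delete(
ctx, tenantName, metav1.DeleteOptions{}); err != nil {
return fmt.Errorf("failed to delete namespace: %w", err)
}
delete(m.namespaces, tenantName)
return nil
}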
// ListTenantsResourceUsage lists resource usage for all tenants
func (m *MultiTenantManager) ListTenantsResourceUsage(ctx context.Context) error {
m.mu.RLock()
defer m.mu.RUnlock()
for tenantName := range m.namespaces {
quota, err := m.clientset.CoreV1().ResourceQuotas(tenantName).Get(
ctx, "tenant-quota", metav1.GetOptions{})
if err != nil {
fmt.Printf("Error getting quota for %s: %v\n", tenantName, err)
continue
}
fmt.Printf("\nTenant: %s\n", tenantName)
fmt.Printf(" CPU: %s / %s\n",
quota.Status.Used[corev1.ResourceCPU],
quota.Status.Hard[corev1.ResourceCPU])
fmt.Printf(" Memory: %s / %s\n",
quota.Status.Used[corev1.ResourceMemory],
quota.Status.Hard[corev1.ResourceMemory])
fmt.Printf(" Pods: %s / %s\n",
quota.Status.Used[corev1.ResourcePods],
quota.Status.Hard[corev1.ResourcePods])
}
return nil
}

Properly configure and handle rate limiting.
package main
import (
"context"
"fmt"
"sync"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/flowcontrol"
)
// CreateRateLimitedClient creates a client with custom rate limits
func CreateRateLimitedClient(config *rest.Config, qps float32, burst int) (*kubernetes.Clientset, error) {
// Configure rate limiting
config.QPS = qps
config.Burst = burst
// Optional: Create custom rate limiter
config.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst)
return kubernetes.NewForConfig(config)
}
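// Example usage: client-go's defaults (QPS 5, Burst 10) are conservative
// for controllers that issue many requests; the values below are
// illustrative, not recommendations.
func Example_CreateRateLimitedClient(config *rest.Config) (*kubernetes.Clientset, error) {
return CreateRateLimitedClient(config, 100, 200)
}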
// AdaptiveRateLimiter adjusts rate based on server responses
type AdaptiveRateLimiter struct {
clientset *kubernetes.Clientset
currentQPS float32
minQPS float32
maxQPS float32
adjustmentMu sync.Mutex
}
func NewAdaptiveRateLimiter(clientset *kubernetes.Clientset) *AdaptiveRateLimiter {
return &AdaptiveRateLimiter{
clientset: clientset,
currentQPS: 50.0,
minQPS: 10.0,
maxQPS: 200.0,
}
}
func (a *AdaptiveRateLimiter) ExecuteWithBackoff(ctx context.Context, operation func() error) error {
backoff := 100 * time.Millisecond
for {
err := operation()
if err == nil {
// Success, gradually increase rate
a.increaseRate()
return nil
}
if apierrors.IsTooManyRequests(err) {
// Rate limited, decrease rate and retry
a.decreaseRate()
fmt.Printf("Rate limited, backing off for %v\n", backoff)
// Honor context cancellation while waiting out the backoff
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(backoff):
}
backoff *= 2
if backoff > 30*time.Second {
backoff = 30 * time.Second
}
continue
}
// Other error, return
return err
}
}
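// Example usage: wrap a List call so HTTP 429 responses trigger backoff.
func Example_ExecuteWithBackoff(ctx context.Context, a *AdaptiveRateLimiter) error {
return a.ExecuteWithBackoff(ctx, func() error {
_, err := a.clientset.CoreV1().Pods("default").List(ctx, metav1.ListOptions{})
return err
})
}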
func (a *AdaptiveRateLimiter) increaseRate() {
a.adjustmentMu.Lock()
defer a.adjustmentMu.Unlock()
newQPS := a.currentQPS * 1.1
if newQPS > a.maxQPS {
newQPS = a.maxQPS
}
a.currentQPS = newQPS
}
func (a *AdaptiveRateLimiter) decreaseRate() {
a.adjustmentMu.Lock()
defer a.adjustmentMu.Unlock()
newQPS := a.currentQPS * 0.5
if newQPS < a.minQPS {
newQPS = a.minQPS
}
a.currentQPS = newQPS
}

Detect and handle deprecated API versions.
package main
import (
"context"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/discovery"
"k8s.io/client-go/kubernetes"
)
type DeprecationChecker struct {
// clientset.Discovery() returns the DiscoveryInterface, so store the
// interface rather than the concrete *discovery.DiscoveryClient
discoveryClient discovery.DiscoveryInterface
}
func NewDeprecationChecker(clientset *kubernetes.Clientset) *DeprecationChecker {
return &DeprecationChecker{
discoveryClient: clientset.Discovery(),
}
}
// CheckAPIVersion checks if an API version is available
func (c *DeprecationChecker) CheckAPIVersion(group, version string) (bool, error) {
groups, err := c.discoveryClient.ServerGroups()
if err != nil {
return false, err
}
for _, g := range groups.Groups {
if g.Name == group {
for _, v := range g.Versions {
if v.Version == version {
return true, nil
}
}
}
}
return false, nil
}
// FindPreferredVersion finds the preferred version for a resource
func (c *DeprecationChecker) FindPreferredVersion(group string) (string, error) {
groups, err := c.discoveryClient.ServerGroups()
if err != nil {
return "", err
}
for _, g := range groups.Groups {
if g.Name == group {
return g.PreferredVersion.Version, nil
}
}
return "", fmt.Errorf("group %s not found", group)
}
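// Example usage: probe for a legacy API and discover the preferred
// version of its replacement group (the values are illustrative).
func Example_DeprecationChecker(c *DeprecationChecker) {
if exists, err := c.CheckAPIVersion("extensions", "v1beta1"); err == nil && exists {
fmt.Println("Deprecated extensions/v1beta1 is still served")
}
if preferred, err := c.FindPreferredVersion("networking.k8s.io"); err == nil {
fmt.Printf("Preferred networking.k8s.io version: %s\n", preferred)
}
}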
// MigrateAPIVersion helps migrate from deprecated to newer API versions
func (c *DeprecationChecker) MigrateAPIVersion(ctx context.Context,
oldGroup, oldVersion, newGroup, newVersion, resource string) error {
// Check if old version still exists
oldExists, err := c.CheckAPIVersion(oldGroup, oldVersion)
if err != nil {
return err
}
// Check if new version exists
newExists, err := c.CheckAPIVersion(newGroup, newVersion)
if err != nil {
return err
}
if !oldExists && !newExists {
return fmt.Errorf("neither old nor new API version exists")
}
if !newExists {
return fmt.Errorf("new API version not available, cannot migrate")
}
if oldExists {
fmt.Printf("Warning: Using deprecated API %s/%s, should migrate to %s/%s\n",
oldGroup, oldVersion, newGroup, newVersion)
}
return nil
}

Handle different Kubernetes versions.
package main
import (
"context"
"fmt"
"strconv"
"strings"
"k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/kubernetes"
)
type VersionCompatibilityChecker struct {
clientset *kubernetes.Clientset
serverVersion *version.Info
}
func NewVersionCompatibilityChecker(clientset *kubernetes.Clientset) (*VersionCompatibilityChecker, error) {
serverVersion, err := clientset.Discovery().ServerVersion()
if err != nil {
return nil, err
}
return &VersionCompatibilityChecker{
clientset: clientset,
serverVersion: serverVersion,
}, nil
}
// SupportsFeature checks if a feature is supported in the server version.
// Major and Minor are strings (Minor may carry a "+" suffix on some
// distributions), so compare them numerically rather than lexically.
func (c *VersionCompatibilityChecker) SupportsFeature(feature string) bool {
switch feature {
case "server-side-apply":
return c.versionAtLeast(1, 18)
case "ephemeral-containers":
return c.versionAtLeast(1, 23)
case "pod-security-admission":
return c.versionAtLeast(1, 25)
default:
return false
}
}
// versionAtLeast reports whether the server version is >= major.minor
func (c *VersionCompatibilityChecker) versionAtLeast(major, minor int) bool {
srvMajor, err := strconv.Atoi(strings.TrimSuffix(c.serverVersion.Major, "+"))
if err != nil {
return false
}
srvMinor, err := strconv.Atoi(strings.TrimSuffix(c.serverVersion.Minor, "+"))
if err != nil {
return false
}
return srvMajor > major || (srvMajor == major && srvMinor >= minor)
}
// GetServerVersion returns the Kubernetes server version
func (c *VersionCompatibilityChecker) GetServerVersion() string {
return fmt.Sprintf("%s.%s", c.serverVersion.Major, c.serverVersion.Minor)
}
// Example: Conditional feature usage
func (c *VersionCompatibilityChecker) CreateResourceSafely(ctx context.Context) error {
if c.SupportsFeature("server-side-apply") {
fmt.Println("Using server-side apply (supported)")
// Use server-side apply
} else {
fmt.Println("Falling back to client-side apply (server-side not supported)")
// Use traditional update
}
return nil
}

Set explicit client timeouts so requests fail fast instead of hanging on an unresponsive API server.

package main
import (
"net"
"time"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
func CreateClientWithTimeouts(config *rest.Config) (*kubernetes.Clientset, error) {
// Set appropriate timeouts
config.Timeout = 30 * time.Second
config.QPS = 50
config.Burst = 100
// Configure dial timeout
config.Dial = (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext
return kubernetes.NewForConfig(config)
}

Always stop watchers and honor context cancellation so long-lived watches don't leak connections or goroutines.

package main
import (
"context"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
func WatchWithProperCleanup(ctx context.Context, clientset *kubernetes.Clientset) {
watcher, err := clientset.CoreV1().Pods("default").Watch(ctx, metav1.ListOptions{})
if err != nil {
panic(err)
}
// CRITICAL: Always stop the watcher
defer watcher.Stop()
// Use context for graceful shutdown
for {
select {
case event, ok := <-watcher.ResultChan():
if !ok {
// Channel closed (e.g. server-side timeout); in production code,
// re-establish the watch instead of spinning on a closed channel
return
}
// Handle event
_ = event
case <-ctx.Done():
// Context cancelled, return
return
}
}
}

Install with Tessl CLI
npx tessl i tessl/golang-k8s-io--client-go@0.35.0