The official Go client library for the Kubernetes API: typed clients, controllers, and cluster interaction tools.
This document provides common usage patterns and practical examples for working with k8s.io/client-go across different scenarios.
package main
import (
"context"
"fmt"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func main() {
// Get in-cluster configuration
config, err := rest.InClusterConfig()
if err != nil {
panic(err)
}
// Create clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
// List pods in current namespace
pods, err := clientset.CoreV1().Pods("").List(
context.TODO(),
metav1.ListOptions{})
if err != nil {
panic(err)
}
fmt.Printf("Found %d pods\\n", len(pods.Items))
}package main
import (
"context"
"flag"
"fmt"
"path/filepath"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func main() {
var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig",
filepath.Join(home, ".kube", "config"),
"(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()
// Build config from kubeconfig file
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
panic(err)
}
// Create clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
// Get server version
version, err := clientset.Discovery().ServerVersion()
if err != nil {
panic(err)
}
fmt.Printf("Kubernetes version: %s\\n", version.GitVersion)
}import (
"context"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/api/errors"
)
// Create
configMap := &corev1.ConfigMap{
	ObjectMeta: metav1.ObjectMeta{
		Name:      "my-config",
		Namespace: "default",
	},
	Data: map[string]string{
		"key": "value",
	},
}
created, err := clientset.CoreV1().ConfigMaps("default").Create(
	context.TODO(), configMap, metav1.CreateOptions{})
if err != nil {
	panic(err)
}
fmt.Println("Created ConfigMap", created.Name)

// Read (Get)
retrieved, err := clientset.CoreV1().ConfigMaps("default").Get(
	context.TODO(), "my-config", metav1.GetOptions{})
if err != nil {
	// retrieved must not be touched when Get fails
	panic(err)
}

// Update
retrieved.Data["key"] = "new-value"
updated, err := clientset.CoreV1().ConfigMaps("default").Update(
	context.TODO(), retrieved, metav1.UpdateOptions{})
if err != nil {
	panic(err)
}
fmt.Println("Updated ConfigMap", updated.Name)

// Delete
err = clientset.CoreV1().ConfigMaps("default").Delete(
	context.TODO(), "my-config", metav1.DeleteOptions{})

// Check if not found; any other delete failure is fatal
switch {
case errors.IsNotFound(err):
	fmt.Println("ConfigMap not found")
case err != nil:
	panic(err)
}
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// List pods with specific labels
pods, err := clientset.CoreV1().Pods("default").List(
	context.TODO(),
	metav1.ListOptions{
		LabelSelector: "app=myapp,tier=frontend",
	})
if err != nil {
	panic(err)
}

// List with field selector
pods, err = clientset.CoreV1().Pods("default").List(
	context.TODO(),
	metav1.ListOptions{
		FieldSelector: "status.phase=Running",
	})
if err != nil {
	panic(err)
}

// Combined selectors
pods, err = clientset.CoreV1().Pods("default").List(
	context.TODO(),
	metav1.ListOptions{
		LabelSelector: "app=myapp",
		FieldSelector: "status.phase=Running",
		Limit:         100,
	})
if err != nil {
	panic(err)
}
import (
"k8s.io/apimachinery/pkg/types"
)
// JSON Patch
jsonPatch := []byte(`[{"op": "replace", "path": "/data/key", "value": "patched-value"}]`)
patched, err := clientset.CoreV1().ConfigMaps("default").Patch(
	context.TODO(),
	"my-config",
	types.JSONPatchType,
	jsonPatch,
	metav1.PatchOptions{})
if err != nil {
	panic(err)
}

// Merge Patch
mergePatch := []byte(`{"data": {"key": "merged-value"}}`)
patched, err = clientset.CoreV1().ConfigMaps("default").Patch(
	context.TODO(),
	"my-config",
	types.MergePatchType,
	mergePatch,
	metav1.PatchOptions{})
if err != nil {
	panic(err)
}

// Strategic Merge Patch
strategicPatch := []byte(`{"data": {"new-key": "new-value"}}`)
patched, err = clientset.CoreV1().ConfigMaps("default").Patch(
	context.TODO(),
	"my-config",
	types.StrategicMergePatchType,
	strategicPatch,
	metav1.PatchOptions{})
if err != nil {
	panic(err)
}
import (
"k8s.io/client-go/dynamic"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
)
// Create dynamic client
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
	panic(err)
}

// Define CRD GroupVersionResource
crontabGVR := schema.GroupVersionResource{
	Group:    "stable.example.com",
	Version:  "v1",
	Resource: "crontabs",
}

// Create custom resource
crontab := &unstructured.Unstructured{
	Object: map[string]interface{}{
		"apiVersion": "stable.example.com/v1",
		"kind":       "CronTab",
		"metadata": map[string]interface{}{
			"name": "my-crontab",
		},
		"spec": map[string]interface{}{
			"cronSpec": "*/5 * * * *",
			"image":    "my-cron-image:latest",
			"replicas": 1,
		},
	},
}
result, err := dynamicClient.Resource(crontabGVR).
	Namespace("default").
	Create(context.TODO(), crontab, metav1.CreateOptions{})
if err != nil {
	panic(err)
}
fmt.Printf("Created CronTab %s\n", result.GetName())

// List custom resources
list, err := dynamicClient.Resource(crontabGVR).
	Namespace("default").
	List(context.TODO(), metav1.ListOptions{})
if err != nil {
	panic(err)
}
for _, item := range list.Items {
	// NestedString reports a missing or non-string field through its
	// return values instead of panicking like a bare type assertion.
	cronSpec, found, err := unstructured.NestedString(item.Object, "spec", "cronSpec")
	if err != nil || !found {
		continue
	}
	fmt.Printf("CronTab %s: %s\n", item.GetName(), cronSpec)
}
import (
"context"
"fmt"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
// Controller bundles the pieces NewController wires together: an API client,
// a pod informer with its lister, and a rate-limited queue of string keys.
type Controller struct {
// clientset is the API client passed to NewController.
clientset kubernetes.Interface
// podInformer is the shared informer started by Run and read by syncHandler.
podInformer cache.SharedIndexInformer
// podLister is stored by NewController; no method in this file reads it.
podLister cache.GenericLister
// queue holds the keys added by the informer's event handlers.
queue workqueue.TypedRateLimitingInterface[string]
}
// NewController builds a Controller whose pod informer (10-minute resync)
// enqueues the key of every added, updated, or deleted pod on a rate-limited
// work queue.
//
// It panics if the event handlers cannot be registered.
func NewController(clientset kubernetes.Interface) *Controller {
	// Create informer factory
	factory := informers.NewSharedInformerFactory(clientset, time.Minute*10)
	podInformer := factory.Core().V1().Pods().Informer()

	// Pods().Lister() returns a typed PodLister, which does not satisfy the
	// cache.GenericLister field on Controller. Build a generic lister over
	// the informer's indexer instead.
	podLister := cache.NewGenericLister(podInformer.GetIndexer(), corev1.Resource("pods"))

	// Create work queue
	queue := workqueue.NewTypedRateLimitingQueue(
		workqueue.DefaultTypedControllerRateLimiter[string]())

	// enqueue adds the key computed by keyFunc to the queue; objects whose
	// key cannot be computed are skipped.
	enqueue := func(keyFunc func(obj interface{}) (string, error), obj interface{}) {
		if key, err := keyFunc(obj); err == nil {
			queue.Add(key)
		}
	}

	// Register event handlers
	_, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			enqueue(cache.MetaNamespaceKeyFunc, obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			enqueue(cache.MetaNamespaceKeyFunc, newObj)
		},
		DeleteFunc: func(obj interface{}) {
			// Deletions use the deletion-aware key function, as before.
			enqueue(cache.DeletionHandlingMetaNamespaceKeyFunc, obj)
		},
	})
	if err != nil {
		panic(err)
	}

	return &Controller{
		clientset:   clientset,
		podInformer: podInformer,
		podLister:   podLister,
		queue:       queue,
	}
}
// Run starts the pod informer, waits for its cache to sync, launches the
// requested number of worker goroutines, and then blocks until ctx is done.
// The work queue is shut down when Run returns. An error is returned only
// when the cache does not sync.
func (c *Controller) Run(ctx context.Context, workers int) error {
	defer c.queue.ShutDown()

	stopCh := ctx.Done()

	// Informer runs in the background until the context is done
	go c.podInformer.Run(stopCh)

	// Block until the informer's cache is populated
	synced := cache.WaitForCacheSync(stopCh, c.podInformer.HasSynced)
	if !synced {
		return fmt.Errorf("failed to sync cache")
	}

	// Launch the workers
	for started := 0; started < workers; started++ {
		go c.runWorker(ctx)
	}

	<-stopCh
	return nil
}
// runWorker processes queue items one at a time until processNextItem
// reports that there is nothing more to do.
func (c *Controller) runWorker(ctx context.Context) {
	for {
		if more := c.processNextItem(ctx); !more {
			return
		}
	}
}
// processNextItem takes one key from the queue, runs syncHandler on it, and
// passes the outcome to handleErr. It returns false once the queue reports
// that it is shutting down, and true otherwise.
func (c *Controller) processNextItem(ctx context.Context) bool {
	item, shutdown := c.queue.Get()
	if shutdown {
		return false
	}
	// Mark the item as finished on the way out, whatever the sync outcome.
	defer c.queue.Done(item)

	c.handleErr(c.syncHandler(ctx, item), item)
	return true
}
// syncHandler reconciles the pod identified by key (a namespace/name key).
// It looks the pod up in the informer's indexer, reports a deletion when the
// key is no longer present, and otherwise prints the pod's phase.
//
// It returns an error when the key is malformed, the lookup fails, or the
// cached object is not a *corev1.Pod.
func (c *Controller) syncHandler(ctx context.Context, key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}

	// Get pod from cache
	obj, exists, err := c.podInformer.GetIndexer().GetByKey(key)
	if err != nil {
		return err
	}
	if !exists {
		fmt.Printf("Pod %s/%s deleted\n", namespace, name)
		return nil
	}

	// Checked assertion: an unexpected type becomes an error, not a panic
	// inside a worker goroutine.
	pod, ok := obj.(*corev1.Pod)
	if !ok {
		return fmt.Errorf("unexpected object type %T for key %q", obj, key)
	}
	fmt.Printf("Processing pod %s/%s in phase %s\n",
		pod.Namespace, pod.Name, pod.Status.Phase)

	// Your reconciliation logic here
	return nil
}
func (c *Controller) handleErr(err error, key string) {
if err == nil {
c.queue.Forget(key)
return
}
if c.queue.NumRequeues(key) < 5 {
fmt.Printf("Error syncing pod %s: %v\\n", key, err)
c.queue.AddRateLimited(key)
return
}
c.queue.Forget(key)
fmt.Printf("Dropping pod %s out of queue: %v\\n", key, err)
}import (
"k8s.io/apimachinery/pkg/watch"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// Watch for pod changes
watcher, err := clientset.CoreV1().Pods("default").Watch(
	context.TODO(),
	metav1.ListOptions{
		LabelSelector: "app=myapp",
	})
if err != nil {
	panic(err)
}
defer watcher.Stop()

// Process watch events
for event := range watcher.ResultChan() {
	// Error events do not carry a Pod (usually a status object), so they
	// must be handled before the type assertion below. Otherwise they are
	// silently skipped and never reported.
	if event.Type == watch.Error {
		fmt.Printf("Watch error: %v\n", event.Object)
		continue
	}

	pod, ok := event.Object.(*corev1.Pod)
	if !ok {
		continue
	}

	switch event.Type {
	case watch.Added:
		fmt.Printf("Pod added: %s\n", pod.Name)
	case watch.Modified:
		fmt.Printf("Pod modified: %s (phase: %s)\n",
			pod.Name, pod.Status.Phase)
	case watch.Deleted:
		fmt.Printf("Pod deleted: %s\n", pod.Name)
	}
}
import (
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// Get deployment
deployment, err := clientset.AppsV1().Deployments("default").Get(
	context.TODO(), "my-deployment", metav1.GetOptions{})
if err != nil {
	panic(err)
}

// Modify status (usually done by controller)
deployment.Status.Replicas = 3
deployment.Status.ReadyReplicas = 3
deployment.Status.AvailableReplicas = 3

// Update status subresource
updatedDeployment, err := clientset.AppsV1().Deployments("default").
	UpdateStatus(context.TODO(), deployment, metav1.UpdateOptions{})
if err != nil {
	panic(err)
}
// Keep working with the object the server returned
deployment = updatedDeployment
import (
"bytes"
"io"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/remotecommand"
)
// Execute command in pod
req := clientset.CoreV1().RESTClient().
	Post().
	Namespace("default").
	Resource("pods").
	Name("my-pod").
	SubResource("exec").
	VersionedParams(&corev1.PodExecOptions{
		Container: "main",
		Command:   []string{"sh", "-c", "echo hello"},
		Stdout:    true,
		Stderr:    true,
	}, scheme.ParameterCodec)

exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
if err != nil {
	panic(err)
}

// Capture both output streams of the remote command
var stdout, stderr bytes.Buffer
err = exec.StreamWithContext(context.TODO(), remotecommand.StreamOptions{
	Stdout: &stdout,
	Stderr: &stderr,
})
if err != nil {
	panic(err)
}
fmt.Printf("Output: %s\n", stdout.String())
// Report anything the command wrote to stderr instead of discarding it
if stderr.Len() > 0 {
	fmt.Printf("Stderr: %s\n", stderr.String())
}
import (
"net/http"
"net/url"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
)
// Port forward to pod
path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward",
"default", "my-pod")
hostIP := strings.TrimLeft(config.Host, "htps:/")
transport, upgrader, err := spdy.RoundTripperFor(config)
if err != nil {
panic(err)
}
url := &url.URL{
Scheme: "https",
Path: path,
Host: hostIP,
}
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport},
http.MethodPost, url)
stopChan := make(chan struct{}, 1)
readyChan := make(chan struct{}, 1)
ports := []string{"8080:80"} // local:remote
forwarder, err := portforward.New(dialer, ports, stopChan, readyChan,
os.Stdout, os.Stderr)
if err != nil {
panic(err)
}
go func() {
if err := forwarder.ForwardPorts(); err != nil {
panic(err)
}
}()
<-readyChan
fmt.Println("Port forward ready")
// Now you can access localhost:8080import (
"context"
"time"
)
// Context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

pods, err := clientset.CoreV1().Pods("default").List(ctx, metav1.ListOptions{})
if err != nil {
	// Tell a deadline expiry apart from other failures before aborting
	if ctx.Err() == context.DeadlineExceeded {
		fmt.Println("Request timed out")
	}
	panic(err)
}
fmt.Println("Pods listed:", len(pods.Items))
// Context with cancellation
ctx, cancel := context.WithCancel(context.Background())
// Start long-running operation
go func() {
watcher, err := clientset.CoreV1().Pods("default").Watch(ctx, metav1.ListOptions{})
if err != nil {
return
}
for event := range watcher.ResultChan() {
// Process events
_ = event
}
}()
// Cancel after some time
time.Sleep(5 * time.Minute)
cancel() // This stops the watchimport (
"k8s.io/client-go/rest"
)
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
	panic(err)
}

// Configure rate limiting
config.QPS = 50.0  // 50 queries per second
config.Burst = 100 // Burst up to 100 requests

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
	panic(err)
}
config.Timeout = 30 * time.Second
// Per-request timeout
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

pod, err := clientset.CoreV1().Pods("default").Get(
	ctx, "my-pod", metav1.GetOptions{})
if err != nil {
	panic(err)
}
config.UserAgent = "my-app/v1.0.0"
clientset, err := kubernetes.NewForConfig(config)import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
)
pod, err := clientset.CoreV1().Pods("default").Get(
	context.TODO(), "my-pod", metav1.GetOptions{})
if err != nil {
	// Classify the API error with the apierrors predicates
	switch {
	case apierrors.IsNotFound(err):
		fmt.Println("Pod not found - create it")
	case apierrors.IsAlreadyExists(err):
		fmt.Println("Pod already exists")
	case apierrors.IsConflict(err):
		fmt.Println("Resource version conflict - retry")
	case apierrors.IsForbidden(err):
		fmt.Println("Access forbidden - check RBAC")
	case apierrors.IsUnauthorized(err):
		fmt.Println("Authentication failed")
	case apierrors.IsTimeout(err):
		fmt.Println("Request timed out")
	case apierrors.IsServerTimeout(err):
		fmt.Println("Server timeout")
	case apierrors.IsInvalid(err):
		fmt.Println("Invalid request:", err)
	case apierrors.IsBadRequest(err):
		fmt.Println("Bad request:", err)
	default:
		fmt.Printf("Unexpected error: %v\n", err)
	}
	return
}
import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
)
// Retry the read-modify-write cycle when the update hits a conflict
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
	// Get the latest version
	pod, err := clientset.CoreV1().Pods("default").Get(
		context.TODO(), "my-pod", metav1.GetOptions{})
	if err != nil {
		return err
	}

	// Modify pod. Labels is nil for a pod without labels, and writing to
	// a nil map panics, so allocate it first.
	if pod.Labels == nil {
		pod.Labels = map[string]string{}
	}
	pod.Labels["updated"] = "true"

	// Update
	_, err = clientset.CoreV1().Pods("default").Update(
		context.TODO(), pod, metav1.UpdateOptions{})
	return err
})
if err != nil {
	fmt.Printf("Update failed: %v\n", err)
}
// Get current resource
configMap, err := clientset.CoreV1().ConfigMaps("default").Get(
	context.TODO(), "my-config", metav1.GetOptions{})
if err != nil {
	panic(err)
}

// Store resource version
resourceVersion := configMap.ResourceVersion

// Modify; Data is nil for a ConfigMap without entries
if configMap.Data == nil {
	configMap.Data = map[string]string{}
}
configMap.Data["key"] = "new-value"

// Update will fail if resource was modified by another client
updated, err := clientset.CoreV1().ConfigMaps("default").Update(
	context.TODO(), configMap, metav1.UpdateOptions{})
switch {
case apierrors.IsConflict(err):
	fmt.Println("Conflict: resource was modified, retry")
case err != nil:
	// Any other failure is not a concurrency conflict; do not ignore it
	panic(err)
default:
	fmt.Println("Resource version changed from", resourceVersion, "to", updated.ResourceVersion)
}
apierrors package to identify specific error types
Install with Tessl CLI
npx tessl i tessl/golang-k8s-io--client-go