Official Go client library for Kubernetes API - typed clients, controllers, and cluster interaction tools
This document covers advanced features and low-level APIs: OpenAPI schema access, REST mapping, HTTP transport customization, feature gates, and testing utilities.
The OpenAPI packages provide access to Kubernetes API OpenAPI specifications, which describe the structure and validation rules of all API resources.
k8s.io/client-go/openapik8s.io/client-go/openapi3import (
"k8s.io/client-go/openapi"
"k8s.io/client-go/openapi3"
"k8s.io/client-go/discovery"
)import (
"k8s.io/client-go/openapi"
"k8s.io/client-go/discovery"
)
// Create discovery client
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
// Get OpenAPI V2 spec
openapiClient := openapi.NewOpenAPIGetter(discoveryClient)
spec, err := openapiClient.OpenAPISchema()
if err != nil {
panic(err)
}
// Access API definitions
for path, pathItem := range spec.Paths.Paths {
fmt.Printf("Path: %s\\n", path)
}
// Access model definitions
for defName, schema := range spec.Definitions {
fmt.Printf("Definition: %s\\n", defName)
fmt.Printf(" Description: %s\\n", schema.Description)
}import (
"k8s.io/client-go/openapi3"
)
// Create OpenAPI V3 root client
root := openapi3.NewRoot(discoveryClient)
// List available group versions
gvs, err := root.GroupVersions()
if err != nil {
panic(err)
}
for _, gv := range gvs {
fmt.Printf("Group/Version: %s\\n", gv)
// Get spec for specific group/version
spec, err := root.GVSpec(gv)
if err != nil {
continue
}
// Access paths in this group/version
for path := range spec.Paths {
fmt.Printf(" Path: %s\\n", path)
}
}1. Schema Validation
// Get schema for resource type
spec, err := openapiClient.OpenAPISchema()
podSchema, exists := spec.Definitions["io.k8s.api.core.v1.Pod"]
if exists {
// Access validation rules
for propName, propSchema := range podSchema.Properties {
if propSchema.Required {
fmt.Printf("Required field: %s\\n", propName)
}
}
}2. API Discovery
// Discover all available operations
spec, _ := openapiClient.OpenAPISchema()
for path, pathItem := range spec.Paths.Paths {
if pathItem.Get != nil {
fmt.Printf("GET %s\\n", path)
}
if pathItem.Post != nil {
fmt.Printf("POST %s\\n", path)
}
if pathItem.Put != nil {
fmt.Printf("PUT %s\\n", path)
}
if pathItem.Delete != nil {
fmt.Printf("DELETE %s\\n", path)
}
}3. Documentation Generation
// Extract field documentation
podSchema, _ := spec.Definitions["io.k8s.api.core.v1.Pod"]
specProp, _ := podSchema.Properties["spec"]
fmt.Printf("Pod.spec description: %s\\n", specProp.Description)
// Recurse into nested schemas for complete documentationRESTMapper translates between GroupVersionKind (GVK) and GroupVersionResource (GVR), and provides information about resource REST endpoints.
k8s.io/client-go/restmapperk8s.io/apimachinery/pkg/api/metaimport (
"k8s.io/client-go/restmapper"
"k8s.io/client-go/discovery"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/api/meta"
)import (
"k8s.io/client-go/discovery"
"k8s.io/client-go/restmapper"
)
// Create discovery client
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
panic(err)
}
// Get API group resources
groupResources, err := restmapper.GetAPIGroupResources(discoveryClient)
if err != nil {
panic(err)
}
// Create REST mapper
mapper := restmapper.NewDiscoveryRESTMapper(groupResources)type RESTMapper interface {
// GVK to GVR conversion
RESTMapping(gk schema.GroupKind, versions ...string) (*RESTMapping, error)
RESTMappings(gk schema.GroupKind, versions ...string) ([]*RESTMapping, error)
// GVR to GVK conversion
KindFor(resource schema.GroupVersionResource) (schema.GroupVersionKind, error)
KindsFor(resource schema.GroupVersionResource) ([]schema.GroupVersionKind, error)
// Resource information
ResourceFor(input schema.GroupVersionResource) (schema.GroupVersionResource, error)
ResourcesFor(input schema.GroupVersionResource) ([]schema.GroupVersionResource, error)
// Singular resource name
ResourceSingularizer(resource string) (singular string, err error)
}
type RESTMapping struct {
// Resource information
Resource schema.GroupVersionResource
GroupVersionKind schema.GroupVersionKind
// REST scope
Scope meta.RESTScope
}import (
"k8s.io/apimachinery/pkg/runtime/schema"
)
// Define GVK (GroupVersionKind)
gvk := schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "Pod",
}
// Get REST mapping
mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
panic(err)
}
// Access GVR (GroupVersionResource)
gvr := mapping.Resource
fmt.Printf("GVR: %s/%s/%s\\n", gvr.Group, gvr.Version, gvr.Resource)
// Output: GVR: /v1/pods
// Check scope
if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
fmt.Println("Resource is namespaced")
} else {
fmt.Println("Resource is cluster-scoped")
}// Define GVR
gvr := schema.GroupVersionResource{
Group: "apps",
Version: "v1",
Resource: "deployments",
}
// Get GVK
gvk, err := mapper.KindFor(gvr)
if err != nil {
panic(err)
}
fmt.Printf("GVK: %s/%s/%s\\n", gvk.Group, gvk.Version, gvk.Kind)
// Output: GVK: apps/v1/Deployment// Plural to singular
singular, err := mapper.ResourceSingularizer("pods")
fmt.Println(singular) // Output: pod
singular, err = mapper.ResourceSingularizer("deployments")
fmt.Println(singular) // Output: deploymentimport "k8s.io/client-go/restmapper"
// Create shortcut expander
expander := restmapper.NewShortcutExpander(mapper, discoveryClient, nil)
// Expand shortcut to full GVR
gvr, err := expander.Expand("po") // "po" is shortcut for "pods"
// Returns: schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
gvr, err = expander.Expand("deploy") // "deploy" is shortcut for "deployments"
// Returns: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}1. Dynamic Resource Access
func getResource(kind, apiVersion, name, namespace string) (interface{}, error) {
// Parse API version
gv, err := schema.ParseGroupVersion(apiVersion)
if err != nil {
return nil, err
}
// Get GVR from GVK
gvk := gv.WithKind(kind)
mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, err
}
// Use dynamic client with GVR
return dynamicClient.Resource(mapping.Resource).
Namespace(namespace).
Get(context.TODO(), name, metav1.GetOptions{})
}2. Resource Scope Detection
func isNamespaced(gvk schema.GroupVersionKind) (bool, error) {
mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return false, err
}
return mapping.Scope.Name() == meta.RESTScopeNameNamespace, nil
}3. Resource Validation
func resourceExists(gvr schema.GroupVersionResource) bool {
_, err := mapper.KindFor(gvr)
return err == nil
}The transport package provides low-level HTTP transport configuration and customization.
k8s.io/client-go/transportimport (
"k8s.io/client-go/transport"
"k8s.io/client-go/rest"
)type Config struct {
// TLS configuration
TLS TLSConfig
// Authentication
Username string
Password string
BearerToken string
// Impersonation
Impersonate ImpersonateConfig
// Custom wrappers
WrapTransport transport.WrapperFunc
// User agent
UserAgent string
// Dial function
Dial func(ctx context.Context, network, address string) (net.Conn, error)
}
type TLSConfig struct {
CAFile string
CertFile string
KeyFile string
CAData []byte
CertData []byte
KeyData []byte
ServerName string
Insecure bool
NextProtos []string
}import (
"net/http"
"k8s.io/client-go/transport"
)
// Create transport config
tlsConfig := &transport.Config{
TLS: transport.TLSConfig{
CAFile: "/path/to/ca.crt",
CertFile: "/path/to/client.crt",
KeyFile: "/path/to/client.key",
},
BearerToken: "my-token",
UserAgent: "my-app/v1.0.0",
}
// Build HTTP transport
httpTransport, err := transport.New(tlsConfig)
if err != nil {
panic(err)
}
// Use with REST client
config := &rest.Config{
Host: "https://kubernetes.example.com",
Transport: httpTransport,
}type WrapperFunc func(http.RoundTripper) http.RoundTripper
// Custom logging wrapper
func loggingWrapper(rt http.RoundTripper) http.RoundTripper {
return &loggingRoundTripper{
base: rt,
}
}
type loggingRoundTripper struct {
base http.RoundTripper
}
func (rt *loggingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
fmt.Printf("Request: %s %s\\n", req.Method, req.URL)
resp, err := rt.base.RoundTrip(req)
if err == nil {
fmt.Printf("Response: %d\\n", resp.StatusCode)
} else {
fmt.Printf("Error: %v\\n", err)
}
return resp, err
}
// Apply wrapper to config
config.WrapTransport = loggingWrapper1. Metrics Collection
func metricsWrapper(rt http.RoundTripper) http.RoundTripper {
return &metricsRoundTripper{base: rt}
}
type metricsRoundTripper struct {
base http.RoundTripper
}
func (rt *metricsRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
start := time.Now()
resp, err := rt.base.RoundTrip(req)
duration := time.Since(start)
// Record metrics: method, path, status, duration
recordMetric(req.Method, req.URL.Path, resp.StatusCode, duration)
return resp, err
}2. Request Retry
func retryWrapper(rt http.RoundTripper, maxRetries int) http.RoundTripper {
return &retryRoundTripper{
base: rt,
maxRetries: maxRetries,
}
}
type retryRoundTripper struct {
base http.RoundTripper
maxRetries int
}
func (rt *retryRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
var resp *http.Response
var err error
for i := 0; i <= rt.maxRetries; i++ {
resp, err = rt.base.RoundTrip(req)
if err == nil && resp.StatusCode < 500 {
return resp, nil
}
if i < rt.maxRetries {
time.Sleep(time.Second * time.Duration(i+1))
}
}
return resp, err
}3. Request Tracing
func tracingWrapper(rt http.RoundTripper) http.RoundTripper {
return &tracingRoundTripper{base: rt}
}
type tracingRoundTripper struct {
base http.RoundTripper
}
func (rt *tracingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
// Start trace span
span := startSpan(req.Context(), req.Method, req.URL.Path)
defer span.Finish()
// Add trace headers
req.Header.Set("X-Trace-ID", span.TraceID())
resp, err := rt.base.RoundTrip(req)
// Record trace data
if err != nil {
span.SetError(err)
} else {
span.SetStatus(resp.StatusCode)
}
return resp, err
}Feature gates allow enabling/disabling experimental or alpha features in client-go.
k8s.io/client-go/featuresimport (
"k8s.io/client-go/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
)const (
// Enable CBOR content type support
ClientsAllowCBOR utilfeature.Feature = "ClientsAllowCBOR"
// Prefer CBOR over JSON when available
ClientsPreferCBOR utilfeature.Feature = "ClientsPreferCBOR"
// In-order delivery guarantee for informers
InOrderInformers utilfeature.Feature = "InOrderInformers"
// Informer resource version handling
InformerResourceVersion utilfeature.Feature = "InformerResourceVersion"
// Watch list support
WatchListClient utilfeature.Feature = "WatchListClient"
)import "k8s.io/client-go/features"
// Check if feature is enabled
if features.FeatureGates().Enabled(features.ClientsAllowCBOR) {
fmt.Println("CBOR support is enabled")
}
// Check multiple features
cbor := features.FeatureGates().Enabled(features.ClientsAllowCBOR)
preferCBOR := features.FeatureGates().Enabled(features.ClientsPreferCBOR)Feature gates are typically set via environment variables or command-line flags:
// Programmatic enablement (for testing)
import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/featuregate"
)
// Enable specific feature
err := features.FeatureGates().Set("ClientsAllowCBOR=true")
if err != nil {
panic(err)
}
// Enable multiple features
err = features.FeatureGates().Set("ClientsAllowCBOR=true,InOrderInformers=true")ClientsAllowCBOR:
ClientsPreferCBOR:
InOrderInformers:
InformerResourceVersion:
WatchListClient:
client-go provides fake implementations and testing utilities for writing tests without a real Kubernetes cluster.
k8s.io/client-go/kubernetes/fakek8s.io/client-go/dynamic/fakek8s.io/client-go/testingimport (
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/testing"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)import (
"testing"
"k8s.io/client-go/kubernetes/fake"
corev1 "k8s.io/api/core/v1"
)
func TestMyController(t *testing.T) {
// Create initial objects
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Namespace: "default",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{Name: "nginx", Image: "nginx:latest"},
},
},
}
// Create fake clientset with initial objects
clientset := fake.NewSimpleClientset(pod)
// Use clientset in tests
pods, err := clientset.CoreV1().Pods("default").List(
context.TODO(),
metav1.ListOptions{})
if err != nil {
t.Fatalf("Error listing pods: %v", err)
}
if len(pods.Items) != 1 {
t.Errorf("Expected 1 pod, got %d", len(pods.Items))
}
// Create new pod
newPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod-2",
Namespace: "default",
},
}
created, err := clientset.CoreV1().Pods("default").Create(
context.TODO(),
newPod,
metav1.CreateOptions{})
if err != nil {
t.Fatalf("Error creating pod: %v", err)
}
}Reactors allow customizing fake client behavior:
import "k8s.io/client-go/testing"
func TestWithReactor(t *testing.T) {
clientset := fake.NewSimpleClientset()
// Add reactor for specific action
clientset.PrependReactor("create", "pods", func(action testing.Action) (handled bool, ret runtime.Object, err error) {
createAction := action.(testing.CreateAction)
pod := createAction.GetObject().(*corev1.Pod)
// Custom logic: reject pods without labels
if len(pod.Labels) == 0 {
return true, nil, errors.New("pods must have labels")
}
// Return false to let default handler process
return false, nil, nil
})
// Test will use custom reactor
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Namespace: "default",
// No labels - will be rejected
},
}
_, err := clientset.CoreV1().Pods("default").Create(
context.TODO(), pod, metav1.CreateOptions{})
if err == nil {
t.Error("Expected error for pod without labels")
}
}func TestActionTracking(t *testing.T) {
clientset := fake.NewSimpleClientset()
// Perform operations
clientset.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{})
clientset.CoreV1().Pods("default").Create(context.TODO(), &corev1.Pod{}, metav1.CreateOptions{})
// Verify actions
actions := clientset.Actions()
if len(actions) != 2 {
t.Errorf("Expected 2 actions, got %d", len(actions))
}
// Check action types
listAction := actions[0]
if listAction.GetVerb() != "list" {
t.Errorf("Expected list action, got %s", listAction.GetVerb())
}
createAction := actions[1]
if createAction.GetVerb() != "create" {
t.Errorf("Expected create action, got %s", createAction.GetVerb())
}
}import (
"k8s.io/client-go/dynamic/fake"
"k8s.io/apimachinery/pkg/runtime"
)
func TestDynamicClient(t *testing.T) {
scheme := runtime.NewScheme()
// Register types if needed
// Create fake dynamic client
dynamicClient := fake.NewSimpleDynamicClient(scheme, initialObjects...)
// Use like regular dynamic client
gvr := schema.GroupVersionResource{
Group: "apps",
Version: "v1",
Resource: "deployments",
}
obj, err := dynamicClient.Resource(gvr).
Namespace("default").
Get(context.TODO(), "my-deployment", metav1.GetOptions{})
}import (
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)
func TestInformer(t *testing.T) {
clientset := fake.NewSimpleClientset()
// Create informer factory
factory := informers.NewSharedInformerFactory(clientset, 0)
podInformer := factory.Core().V1().Pods().Informer()
// Add event handler
called := false
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
called = true
},
})
// Start informer
stopCh := make(chan struct{})
defer close(stopCh)
factory.Start(stopCh)
factory.WaitForCacheSync(stopCh)
// Trigger event by creating pod
clientset.CoreV1().Pods("default").Create(
context.TODO(),
&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test"}},
metav1.CreateOptions{})
// Give informer time to process
time.Sleep(100 * time.Millisecond)
if !called {
t.Error("Event handler was not called")
}
}import "k8s.io/client-go/testing"
func TestObjectTracker(t *testing.T) {
scheme := runtime.NewScheme()
corev1.AddToScheme(scheme)
// Create object tracker
tracker := testing.NewObjectTracker(scheme, codecs.UniversalDecoder())
// Add objects
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Namespace: "default",
},
}
err := tracker.Add(pod)
if err != nil {
t.Fatalf("Error adding pod: %v", err)
}
// Retrieve object
gvr := corev1.SchemeGroupVersion.WithResource("pods")
obj, err := tracker.Get(gvr, "default", "test-pod")
retrieved := obj.(*corev1.Pod)
if retrieved.Name != "test-pod" {
t.Errorf("Expected pod name test-pod, got %s", retrieved.Name)
}
}1. Use Fake Clients
2. Verify Actions
3. Test Event Handlers
4. Mock External Dependencies
5. Use Table-Driven Tests
func TestScaling(t *testing.T) {
tests := []struct {
name string
initialReplicas int32
desiredReplicas int32
expectError bool
}{
{"scale up", 1, 3, false},
{"scale down", 5, 2, false},
{"scale to zero", 3, 0, false},
{"negative replicas", 3, -1, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Test implementation
})
}
}Install with Tessl CLI
npx tessl i tessl/golang-k8s-io--client-go