CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/golang-k8s-io--client-go

Official Go client library for Kubernetes API - typed clients, controllers, and cluster interaction tools

Overview
Eval results
Files

docs/reference/advanced-topics.md

Advanced Topics

Back to Index

This document covers advanced features and low-level APIs: OpenAPI schema access, REST mapping, HTTP transport customization, feature gates, and testing utilities.

OpenAPI Schema Access

Overview

The OpenAPI packages provide access to Kubernetes API OpenAPI specifications, which describe the structure and validation rules of all API resources.

Package Information

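  • OpenAPI V2: served through the discovery client (k8s.io/client-go/discovery, OpenAPISchema)
  • OpenAPI V3: k8s.io/client-go/openapi (client) and k8s.io/client-go/openapi3 (root/helper types)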

Core Imports

import (
    "k8s.io/client-go/openapi"
    "k8s.io/client-go/openapi3"
    "k8s.io/client-go/discovery"
)

OpenAPI V2 Client

import (
    "k8s.io/client-go/openapi"
    "k8s.io/client-go/discovery"
)

// Create discovery client
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
    panic(err)
}

// Get OpenAPI V2 spec
// NOTE(review): confirm this constructor exists in the client-go version you
// target; the V2 document is also available via discoveryClient.OpenAPISchema().
openapiClient := openapi.NewOpenAPIGetter(discoveryClient)
spec, err := openapiClient.OpenAPISchema()
if err != nil {
    panic(err)
}

// Access API definitions (only the path key is needed here)
for path := range spec.Paths.Paths {
    fmt.Printf("Path: %s\n", path)
}

// Access model definitions
for defName, schema := range spec.Definitions {
    fmt.Printf("Definition: %s\n", defName)
    fmt.Printf("  Description: %s\n", schema.Description)
}

OpenAPI V3 Client

import (
    "k8s.io/client-go/openapi3"
)

// Create OpenAPI V3 root client. NewRoot expects an openapi.Client, which the
// discovery client exposes through OpenAPIV3().
root := openapi3.NewRoot(discoveryClient.OpenAPIV3())

// List available group versions
gvs, err := root.GroupVersions()
if err != nil {
    panic(err)
}

for _, gv := range gvs {
    fmt.Printf("Group/Version: %s\n", gv)

    // Get spec for specific group/version
    spec, err := root.GVSpec(gv)
    if err != nil {
        // Report and move on so one broken group/version does not hide the rest.
        fmt.Printf("  skipping %s: %v\n", gv, err)
        continue
    }
    if spec.Paths == nil {
        continue
    }

    // Access paths in this group/version
    for path := range spec.Paths.Paths {
        fmt.Printf("  Path: %s\n", path)
    }
}

Use Cases

1. Schema Validation

// Get schema for resource type
spec, err := openapiClient.OpenAPISchema()
if err != nil {
    panic(err)
}
podSchema, exists := spec.Definitions["io.k8s.api.core.v1.Pod"]

if exists {
    // In OpenAPI v2 the "required" keyword is a list of property names on the
    // object schema itself, not a boolean on each property.
    for _, propName := range podSchema.Required {
        fmt.Printf("Required field: %s\n", propName)
    }
}

2. API Discovery

// Discover all available operations
spec, err := openapiClient.OpenAPISchema()
if err != nil {
    // Without a spec there is nothing to iterate; fail loudly instead of
    // dereferencing a nil document below.
    panic(err)
}

for path, pathItem := range spec.Paths.Paths {
    if pathItem.Get != nil {
        fmt.Printf("GET %s\n", path)
    }
    if pathItem.Post != nil {
        fmt.Printf("POST %s\n", path)
    }
    if pathItem.Put != nil {
        fmt.Printf("PUT %s\n", path)
    }
    if pathItem.Delete != nil {
        fmt.Printf("DELETE %s\n", path)
    }
}

3. Documentation Generation

// Extract field documentation (spec is the OpenAPI document fetched earlier)
podSchema, ok := spec.Definitions["io.k8s.api.core.v1.Pod"]
if !ok {
    panic("OpenAPI spec has no definition for io.k8s.api.core.v1.Pod")
}
specProp, ok := podSchema.Properties["spec"]
if !ok {
    panic("Pod definition has no \"spec\" property")
}

fmt.Printf("Pod.spec description: %s\n", specProp.Description)

// Recurse into nested schemas for complete documentation

REST Mapping

Overview

RESTMapper translates between GroupVersionKind (GVK) and GroupVersionResource (GVR), and provides information about resource REST endpoints.

Package Information

  • Package: k8s.io/client-go/restmapper
  • Related: k8s.io/apimachinery/pkg/api/meta

Core Imports

import (
    "k8s.io/client-go/restmapper"
    "k8s.io/client-go/discovery"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/apimachinery/pkg/api/meta"
)

Creating a RESTMapper

import (
    "k8s.io/client-go/discovery"
    "k8s.io/client-go/restmapper"
)

// Create discovery client from an existing client configuration
// (config is assumed to be a *rest.Config built elsewhere).
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
    panic(err)
}

// Get API group resources: one discovery pass over the server's groups.
groupResources, err := restmapper.GetAPIGroupResources(discoveryClient)
if err != nil {
    panic(err)
}

// Create REST mapper from the resources fetched above.
// NOTE(review): the mapper only receives groupResources, so it presumably does
// not see resources registered later (e.g. new CRDs) — re-run discovery if needed.
mapper := restmapper.NewDiscoveryRESTMapper(groupResources)

RESTMapper Interface

// RESTMapper converts between kinds (GVK) and resources (GVR).
type RESTMapper interface {
    // GVK to GVR conversion: RESTMapping returns a single mapping for the
    // group/kind, optionally restricted to the given versions; RESTMappings
    // returns every matching mapping.
    RESTMapping(gk schema.GroupKind, versions ...string) (*RESTMapping, error)
    RESTMappings(gk schema.GroupKind, versions ...string) ([]*RESTMapping, error)

    // GVR to GVK conversion: KindFor returns one kind for the resource,
    // KindsFor returns all candidates.
    KindFor(resource schema.GroupVersionResource) (schema.GroupVersionKind, error)
    KindsFor(resource schema.GroupVersionResource) ([]schema.GroupVersionKind, error)

    // Resource information: resolve an input resource (which may be only
    // partially specified) to one or all matching resources.
    ResourceFor(input schema.GroupVersionResource) (schema.GroupVersionResource, error)
    ResourcesFor(input schema.GroupVersionResource) ([]schema.GroupVersionResource, error)

    // Singular resource name for a resource string, e.g. "pods" -> "pod".
    ResourceSingularizer(resource string) (singular string, err error)
}

// RESTMapping is the result of mapping a kind to its REST resource.
type RESTMapping struct {
    // Resource information: the resource and the kind it corresponds to.
    Resource         schema.GroupVersionResource
    GroupVersionKind schema.GroupVersionKind

    // REST scope; Scope.Name() is compared against meta.RESTScopeNameNamespace
    // elsewhere on this page to tell namespaced from cluster-scoped resources.
    Scope meta.RESTScope
}

GVK to GVR Mapping

import (
    "k8s.io/apimachinery/pkg/runtime/schema"
)

// Define GVK (GroupVersionKind); the core API group is the empty string
gvk := schema.GroupVersionKind{
    Group:   "",
    Version: "v1",
    Kind:    "Pod",
}

// Get REST mapping
mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
    panic(err)
}

// Access GVR (GroupVersionResource)
gvr := mapping.Resource
fmt.Printf("GVR: %s/%s/%s\n", gvr.Group, gvr.Version, gvr.Resource)
// Output: GVR: /v1/pods

// Check scope
if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
    fmt.Println("Resource is namespaced")
} else {
    fmt.Println("Resource is cluster-scoped")
}

GVR to GVK Mapping

// Define GVR
gvr := schema.GroupVersionResource{
    Group:    "apps",
    Version:  "v1",
    Resource: "deployments",
}

// Get GVK
gvk, err := mapper.KindFor(gvr)
if err != nil {
    panic(err)
}

fmt.Printf("GVK: %s/%s/%s\n", gvk.Group, gvk.Version, gvk.Kind)
// Output: GVK: apps/v1/Deployment

Resource Name Conversion

// Plural to singular; the lookup fails for resources the mapper does not know
singular, err := mapper.ResourceSingularizer("pods")
if err != nil {
    panic(err)
}
fmt.Println(singular) // Output: pod

singular, err = mapper.ResourceSingularizer("deployments")
if err != nil {
    panic(err)
}
fmt.Println(singular) // Output: deployment

Shortcut Expansion

import "k8s.io/client-go/restmapper"

// Create shortcut expander. It wraps mapper and is itself a meta.RESTMapper,
// so short names are resolved through the regular mapper methods.
expander := restmapper.NewShortcutExpander(mapper, discoveryClient, nil)

// Expand shortcut to full GVR
gvr, err := expander.ResourceFor(schema.GroupVersionResource{Resource: "po"}) // "po" is shortcut for "pods"
if err != nil {
    panic(err)
}
// Returns: schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}

gvr, err = expander.ResourceFor(schema.GroupVersionResource{Resource: "deploy"}) // "deploy" is shortcut for "deployments"
if err != nil {
    panic(err)
}
// Returns: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}

Use Cases

1. Dynamic Resource Access

// getResource fetches the object of the given kind and apiVersion by name using
// the dynamic client. namespace is only applied when the REST mapping reports a
// namespaced resource; it is ignored for cluster-scoped kinds.
// It returns a nil interface (not a typed nil pointer) whenever err is non-nil.
func getResource(kind, apiVersion, name, namespace string) (interface{}, error) {
    // Parse API version
    gv, err := schema.ParseGroupVersion(apiVersion)
    if err != nil {
        return nil, err
    }

    // Get GVR from GVK
    gvk := gv.WithKind(kind)
    mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
    if err != nil {
        return nil, err
    }

    // Use dynamic client with GVR
    resourceClient := dynamicClient.Resource(mapping.Resource)
    if mapping.Scope.Name() != meta.RESTScopeNameNamespace {
        // Cluster-scoped: a namespace must not become part of the request path.
        obj, err := resourceClient.Get(context.TODO(), name, metav1.GetOptions{})
        if err != nil {
            return nil, err
        }
        return obj, nil
    }

    obj, err := resourceClient.Namespace(namespace).Get(context.TODO(), name, metav1.GetOptions{})
    if err != nil {
        return nil, err
    }
    return obj, nil
}

2. Resource Scope Detection

// isNamespaced reports whether the resource behind gvk is namespace-scoped,
// according to the REST mapping returned by mapper. A failed lookup yields
// (false, err).
func isNamespaced(gvk schema.GroupVersionKind) (bool, error) {
    restMapping, lookupErr := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
    if lookupErr != nil {
        return false, lookupErr
    }

    scopeName := restMapping.Scope.Name()
    namespaced := scopeName == meta.RESTScopeNameNamespace
    return namespaced, nil
}

3. Resource Validation

// resourceExists reports whether mapper can resolve gvr to a kind; any lookup
// error is treated as "does not exist".
func resourceExists(gvr schema.GroupVersionResource) bool {
    if _, lookupErr := mapper.KindFor(gvr); lookupErr != nil {
        return false
    }
    return true
}

HTTP Transport Customization

Overview

The transport package provides low-level HTTP transport configuration and customization.

Package Information

  • Package: k8s.io/client-go/transport

Core Imports

import (
    "k8s.io/client-go/transport"
    "k8s.io/client-go/rest"
)

Transport Configuration

// Config holds the settings used to build an HTTP transport.
// NOTE(review): this is an abridged view; check the transport package for the
// full field list in your client-go version.
type Config struct {
    // TLS configuration
    TLS TLSConfig

    // Authentication: basic-auth credentials and/or a bearer token
    Username    string
    Password    string
    BearerToken string

    // Impersonation
    Impersonate ImpersonateConfig

    // Custom wrappers applied around the built http.RoundTripper
    WrapTransport transport.WrapperFunc

    // User agent
    UserAgent string

    // Dial function used to open connections
    Dial func(ctx context.Context, network, address string) (net.Conn, error)
}

// TLSConfig holds TLS material for the transport, as file paths or inline data.
type TLSConfig struct {
    // File-based material: CA bundle, client certificate and client key.
    CAFile   string
    CertFile string
    KeyFile  string

    // In-memory equivalents of the file fields above.
    // NOTE(review): precedence between *File and *Data is not shown here — confirm.
    CAData   []byte
    CertData []byte
    KeyData  []byte

    // ServerName and Insecure presumably control server certificate checks
    // (expected host name / skipping verification) — confirm before relying on it.
    ServerName string
    Insecure   bool

    // NextProtos presumably lists the protocols offered during TLS negotiation — confirm.
    NextProtos []string
}

Creating Custom Transport

import (
    "net/http"
    "k8s.io/client-go/transport"
)

// Create transport config. Despite its name, tlsConfig is the complete
// transport.Config (TLS material plus bearer token and user agent).
tlsConfig := &transport.Config{
    TLS: transport.TLSConfig{
        CAFile:   "/path/to/ca.crt",
        CertFile: "/path/to/client.crt",
        KeyFile:  "/path/to/client.key",
    },
    BearerToken: "my-token",
    UserAgent:   "my-app/v1.0.0",
}

// Build HTTP transport (an http.RoundTripper) from the config above
httpTransport, err := transport.New(tlsConfig)
if err != nil {
    panic(err)
}

// Use with REST client: the prebuilt transport is handed over as-is, so TLS
// and auth settings come from tlsConfig rather than from rest.Config fields.
config := &rest.Config{
    Host:      "https://kubernetes.example.com",
    Transport: httpTransport,
}

Transport Wrappers

// WrapperFunc wraps an http.RoundTripper with additional behavior.
type WrapperFunc func(http.RoundTripper) http.RoundTripper

// loggingWrapper returns a RoundTripper that logs every request and its outcome
// to standard output before delegating to rt.
func loggingWrapper(rt http.RoundTripper) http.RoundTripper {
    return &loggingRoundTripper{
        base: rt,
    }
}

// loggingRoundTripper decorates base with request/response logging.
type loggingRoundTripper struct {
    base http.RoundTripper
}

// RoundTrip prints the request line, forwards req to the wrapped transport and
// prints either the response status code or the error. The response and error
// are returned unchanged.
func (rt *loggingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
    fmt.Printf("Request: %s %s\n", req.Method, req.URL)

    resp, err := rt.base.RoundTrip(req)

    if err == nil {
        fmt.Printf("Response: %d\n", resp.StatusCode)
    } else {
        fmt.Printf("Error: %v\n", err)
    }

    return resp, err
}

// Apply wrapper to config
config.WrapTransport = loggingWrapper

Common Transport Patterns

1. Metrics Collection

// metricsWrapper returns a RoundTripper that records a metric for every request
// sent through rt.
func metricsWrapper(rt http.RoundTripper) http.RoundTripper {
    return &metricsRoundTripper{base: rt}
}

// metricsRoundTripper decorates base with per-request metrics.
type metricsRoundTripper struct {
    base http.RoundTripper
}

// RoundTrip forwards req to the wrapped transport and records method, path,
// status code and duration. When the round trip fails there is no response, so
// the status is recorded as 0 instead of dereferencing a nil *http.Response.
func (rt *metricsRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
    start := time.Now()

    resp, err := rt.base.RoundTrip(req)

    duration := time.Since(start)

    status := 0
    if resp != nil {
        status = resp.StatusCode
    }
    // Record metrics: method, path, status, duration
    recordMetric(req.Method, req.URL.Path, status, duration)

    return resp, err
}

2. Request Retry

// retryWrapper returns a RoundTripper that retries requests sent through rt up
// to maxRetries additional times when the transport fails or the server answers
// with a 5xx status.
func retryWrapper(rt http.RoundTripper, maxRetries int) http.RoundTripper {
    return &retryRoundTripper{
        base:       rt,
        maxRetries: maxRetries,
    }
}

// retryRoundTripper decorates base with a simple linear-backoff retry loop.
type retryRoundTripper struct {
    base       http.RoundTripper
    maxRetries int
}

// RoundTrip sends req and retries on transport errors and 5xx responses,
// waiting 1s, 2s, ... between attempts. At least one attempt is always made.
//
// The result of the last attempt is returned as-is. Responses that are
// discarded in favor of a retry have their body closed so connections are not
// leaked. Retrying stops early when the request context is done (its error is
// returned) or when the request carries a body that cannot be replayed
// (req.GetBody is nil).
//
// Note: every method is retried, including non-idempotent ones such as POST.
func (rt *retryRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
    attempt := req

    for i := 0; ; i++ {
        resp, err := rt.base.RoundTrip(attempt)
        if err == nil && resp.StatusCode < 500 {
            return resp, nil
        }
        if i >= rt.maxRetries || !bodyReplayable(req) {
            return resp, err
        }

        // This response is being dropped; release its connection.
        if resp != nil && resp.Body != nil {
            resp.Body.Close()
        }

        // Back off, but give up immediately if the caller cancelled.
        timer := time.NewTimer(time.Second * time.Duration(i+1))
        select {
        case <-req.Context().Done():
            timer.Stop()
            return nil, req.Context().Err()
        case <-timer.C:
        }

        // The previous attempt consumed the body; obtain a fresh copy on a
        // clone so the caller's request is left untouched.
        if req.GetBody != nil && req.Body != nil && req.Body != http.NoBody {
            body, bodyErr := req.GetBody()
            if bodyErr != nil {
                return nil, bodyErr
            }
            attempt = req.Clone(req.Context())
            attempt.Body = body
        }
    }
}

// bodyReplayable reports whether req can be sent more than once.
func bodyReplayable(req *http.Request) bool {
    return req.Body == nil || req.Body == http.NoBody || req.GetBody != nil
}

3. Request Tracing

// tracingWrapper returns a RoundTripper that opens a trace span around every
// request sent through rt.
func tracingWrapper(rt http.RoundTripper) http.RoundTripper {
    return &tracingRoundTripper{base: rt}
}

// tracingRoundTripper decorates base with span creation and trace headers.
type tracingRoundTripper struct {
    base http.RoundTripper
}

// RoundTrip starts a span for req, forwards a copy of the request carrying an
// X-Trace-ID header, and records either the error or the response status on the
// span. The caller's request is never modified: the http.RoundTripper contract
// forbids mutating it, and the caller may reuse it concurrently.
func (rt *tracingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
    // Start trace span
    span := startSpan(req.Context(), req.Method, req.URL.Path)
    defer span.Finish()

    // Add trace headers on a clone (Clone deep-copies the header map)
    traced := req.Clone(req.Context())
    if traced.Header == nil {
        traced.Header = make(http.Header)
    }
    traced.Header.Set("X-Trace-ID", span.TraceID())

    resp, err := rt.base.RoundTrip(traced)

    // Record trace data
    if err != nil {
        span.SetError(err)
    } else {
        span.SetStatus(resp.StatusCode)
    }

    return resp, err
}

Feature Gates

Overview

Feature gates allow enabling/disabling experimental or alpha features in client-go.

Package Information

  • Package: k8s.io/client-go/features

Core Imports

import (
    "k8s.io/client-go/features"
    utilfeature "k8s.io/apiserver/pkg/util/feature"
)

Available Feature Gates

const (
    // Enable CBOR content type support
    ClientsAllowCBOR utilfeature.Feature = "ClientsAllowCBOR"

    // Prefer CBOR over JSON when available
    ClientsPreferCBOR utilfeature.Feature = "ClientsPreferCBOR"

    // In-order delivery guarantee for informers
    InOrderInformers utilfeature.Feature = "InOrderInformers"

    // Informer resource version handling
    InformerResourceVersion utilfeature.Feature = "InformerResourceVersion"

    // Watch list support
    WatchListClient utilfeature.Feature = "WatchListClient"
)

Checking Feature Status

import "k8s.io/client-go/features"

// Check if feature is enabled: FeatureGates() returns the active gate set and
// Enabled reports the state of a single gate.
if features.FeatureGates().Enabled(features.ClientsAllowCBOR) {
    fmt.Println("CBOR support is enabled")
}

// Check multiple features; each gate is queried independently
// (use the resulting booleans in your own logic).
cbor := features.FeatureGates().Enabled(features.ClientsAllowCBOR)
preferCBOR := features.FeatureGates().Enabled(features.ClientsPreferCBOR)

Enabling Feature Gates

client-go reads its feature gates from environment variables of the form KUBE_FEATURE_<FeatureName>=true|false; tests can additionally override a gate programmatically:

// Programmatic enablement (for testing)
import (
    utilfeature "k8s.io/apiserver/pkg/util/feature"
    "k8s.io/component-base/featuregate"
)

// Enable specific feature
err := features.FeatureGates().Set("ClientsAllowCBOR=true")
if err != nil {
    panic(err)
}

// Enable multiple features
err = features.FeatureGates().Set("ClientsAllowCBOR=true,InOrderInformers=true")

Feature Gate Descriptions

ClientsAllowCBOR:

  • Allows CBOR (Concise Binary Object Representation) encoding
  • More efficient than JSON for large payloads
  • Requires server support

ClientsPreferCBOR:

  • Prefer CBOR over JSON when both are available
  • Falls back to JSON if server doesn't support CBOR

InOrderInformers:

  • Guarantees in-order delivery of informer events
  • May impact throughput
  • Useful for strict ordering requirements

InformerResourceVersion:

  • Enhanced resource version handling in informers
  • Improved consistency guarantees

WatchListClient:

  • Combines initial list and watch into single operation
  • Reduces API server load
  • More efficient for large resource sets

Testing Utilities

Overview

client-go provides fake implementations and testing utilities for writing tests without a real Kubernetes cluster.

Package Information

  • Main: k8s.io/client-go/kubernetes/fake
  • Dynamic: k8s.io/client-go/dynamic/fake
  • Testing: k8s.io/client-go/testing

Core Imports

import (
    "k8s.io/client-go/kubernetes/fake"
    "k8s.io/client-go/testing"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Fake Clientset

import (
    "testing"
    "k8s.io/client-go/kubernetes/fake"
    corev1 "k8s.io/api/core/v1"
)

func TestMyController(t *testing.T) {
    // Create initial objects
    pod := &corev1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "test-pod",
            Namespace: "default",
        },
        Spec: corev1.PodSpec{
            Containers: []corev1.Container{
                {Name: "nginx", Image: "nginx:latest"},
            },
        },
    }

    // Create fake clientset with initial objects
    clientset := fake.NewSimpleClientset(pod)

    // Use clientset in tests
    pods, err := clientset.CoreV1().Pods("default").List(
        context.TODO(),
        metav1.ListOptions{})

    if err != nil {
        t.Fatalf("Error listing pods: %v", err)
    }

    if len(pods.Items) != 1 {
        t.Errorf("Expected 1 pod, got %d", len(pods.Items))
    }

    // Create new pod
    newPod := &corev1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "test-pod-2",
            Namespace: "default",
        },
    }

    created, err := clientset.CoreV1().Pods("default").Create(
        context.TODO(),
        newPod,
        metav1.CreateOptions{})

    if err != nil {
        t.Fatalf("Error creating pod: %v", err)
    }
}

Custom Reactors

Reactors allow customizing fake client behavior:

import "k8s.io/client-go/testing"

func TestWithReactor(t *testing.T) {
    clientset := fake.NewSimpleClientset()

    // Add reactor for specific action
    clientset.PrependReactor("create", "pods", func(action testing.Action) (handled bool, ret runtime.Object, err error) {
        createAction := action.(testing.CreateAction)
        pod := createAction.GetObject().(*corev1.Pod)

        // Custom logic: reject pods without labels
        if len(pod.Labels) == 0 {
            return true, nil, errors.New("pods must have labels")
        }

        // Return false to let default handler process
        return false, nil, nil
    })

    // Test will use custom reactor
    pod := &corev1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "test-pod",
            Namespace: "default",
            // No labels - will be rejected
        },
    }

    _, err := clientset.CoreV1().Pods("default").Create(
        context.TODO(), pod, metav1.CreateOptions{})

    if err == nil {
        t.Error("Expected error for pod without labels")
    }
}

Tracking Actions

// TestActionTracking performs a list and a create against the fake clientset
// and verifies that both calls were recorded, in order.
func TestActionTracking(t *testing.T) {
    clientset := fake.NewSimpleClientset()

    // Perform operations
    if _, err := clientset.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{}); err != nil {
        t.Fatalf("Error listing pods: %v", err)
    }
    pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}}
    if _, err := clientset.CoreV1().Pods("default").Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil {
        t.Fatalf("Error creating pod: %v", err)
    }

    // Verify actions. Fatalf, not Errorf: indexing below would panic otherwise.
    actions := clientset.Actions()
    if len(actions) != 2 {
        t.Fatalf("Expected 2 actions, got %d", len(actions))
    }

    // Check action types
    listAction := actions[0]
    if listAction.GetVerb() != "list" {
        t.Errorf("Expected list action, got %s", listAction.GetVerb())
    }

    createAction := actions[1]
    if createAction.GetVerb() != "create" {
        t.Errorf("Expected create action, got %s", createAction.GetVerb())
    }
}

Fake Dynamic Client

import (
    "k8s.io/client-go/dynamic/fake"
    "k8s.io/apimachinery/pkg/runtime"
)

// TestDynamicClient reads a Deployment through the fake dynamic client.
// initialObjects is expected to contain the apps/v1 Deployment "my-deployment"
// in namespace "default"; otherwise the Get below fails.
func TestDynamicClient(t *testing.T) {
    scheme := runtime.NewScheme()
    // Register types if needed

    // Create fake dynamic client
    dynamicClient := fake.NewSimpleDynamicClient(scheme, initialObjects...)

    // Use like regular dynamic client
    gvr := schema.GroupVersionResource{
        Group:    "apps",
        Version:  "v1",
        Resource: "deployments",
    }

    obj, err := dynamicClient.Resource(gvr).
        Namespace("default").
        Get(context.TODO(), "my-deployment", metav1.GetOptions{})
    if err != nil {
        t.Fatalf("Error getting deployment: %v", err)
    }

    if obj.GetName() != "my-deployment" {
        t.Errorf("Expected my-deployment, got %s", obj.GetName())
    }
}

Testing Informers

import (
    "k8s.io/client-go/informers"
    "k8s.io/client-go/tools/cache"
)

// TestInformer starts a pod informer backed by the fake clientset and verifies
// that creating a pod triggers the Add handler.
func TestInformer(t *testing.T) {
    clientset := fake.NewSimpleClientset()

    // Create informer factory
    factory := informers.NewSharedInformerFactory(clientset, 0)
    podInformer := factory.Core().V1().Pods().Informer()

    // Add event handler. The handler runs on an informer goroutine, so it
    // signals through a channel instead of writing a variable the test reads.
    added := make(chan struct{}, 1)
    podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            select {
            case added <- struct{}{}:
            default: // already signalled
            }
        },
    })

    // Start informer
    stopCh := make(chan struct{})
    defer close(stopCh)
    factory.Start(stopCh)
    factory.WaitForCacheSync(stopCh)

    // Trigger event by creating pod
    _, err := clientset.CoreV1().Pods("default").Create(
        context.TODO(),
        &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test"}},
        metav1.CreateOptions{})
    if err != nil {
        t.Fatalf("Error creating pod: %v", err)
    }

    // Wait for the handler, bounded by a timeout instead of a fixed sleep
    select {
    case <-added:
    case <-time.After(5 * time.Second):
        t.Error("Event handler was not called")
    }
}

Object Tracker

import "k8s.io/client-go/testing"

func TestObjectTracker(t *testing.T) {
    scheme := runtime.NewScheme()
    corev1.AddToScheme(scheme)

    // Create object tracker
    tracker := testing.NewObjectTracker(scheme, codecs.UniversalDecoder())

    // Add objects
    pod := &corev1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "test-pod",
            Namespace: "default",
        },
    }

    err := tracker.Add(pod)
    if err != nil {
        t.Fatalf("Error adding pod: %v", err)
    }

    // Retrieve object
    gvr := corev1.SchemeGroupVersion.WithResource("pods")
    obj, err := tracker.Get(gvr, "default", "test-pod")

    retrieved := obj.(*corev1.Pod)
    if retrieved.Name != "test-pod" {
        t.Errorf("Expected pod name test-pod, got %s", retrieved.Name)
    }
}

Best Practices for Testing

1. Use Fake Clients

  • Fast - no network calls
  • Deterministic - no timing issues
  • Isolated - no side effects

2. Verify Actions

  • Check all API calls made
  • Verify correct parameters
  • Ensure proper error handling

3. Test Event Handlers

  • Verify handler is called
  • Check handler behavior
  • Test error conditions

4. Mock External Dependencies

  • Use interfaces for dependencies
  • Create test doubles
  • Inject fake implementations

5. Use Table-Driven Tests

// TestScaling is a table-driven skeleton: every case runs as its own subtest
// named after the case.
func TestScaling(t *testing.T) {
    type scalingCase struct {
        name            string
        initialReplicas int32
        desiredReplicas int32
        expectError     bool
    }

    cases := []scalingCase{
        {name: "scale up", initialReplicas: 1, desiredReplicas: 3, expectError: false},
        {name: "scale down", initialReplicas: 5, desiredReplicas: 2, expectError: false},
        {name: "scale to zero", initialReplicas: 3, desiredReplicas: 0, expectError: false},
        {name: "negative replicas", initialReplicas: 3, desiredReplicas: -1, expectError: true},
    }

    for i := range cases {
        tc := cases[i]
        t.Run(tc.name, func(t *testing.T) {
            // Test implementation
        })
    }
}

Install with Tessl CLI

npx tessl i tessl/golang-k8s-io--client-go

docs

index.md

tile.json