A set of Go libraries for building Kubernetes controllers, with structured abstractions for managers, reconcilers, clients, caches, webhooks, and testing.
The core framework provides the foundational abstractions for building Kubernetes controllers. It includes the Manager for lifecycle management, Controller for event processing, and Reconciler for implementing business logic.
The controller-runtime core framework implements the standard Kubernetes controller pattern.
Import Path: sigs.k8s.io/controller-runtime/pkg/manager
The Manager is the central component that manages the lifecycle of all controllers, caches, and webhooks. It provides shared dependencies and coordinates startup/shutdown.
package manager
import (
"context"
"net/http"
"github.com/go-logr/logr"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/config"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/webhook"
)
// Manager is required to create Controllers and provides shared dependencies.
// It embeds cluster.Cluster and adds Runnable registration, a leader-election
// signal, health and readiness checks, an extra metrics handler hook, and
// accessors for the webhook server, logger, and controller options.
type Manager interface {
	// Cluster provides access to clients, caches, and schemes
	cluster.Cluster

	// Add adds a Runnable to the Manager
	Add(Runnable) error

	// Elected returns a channel that is closed when this manager is elected leader
	Elected() <-chan struct{}

	// AddMetricsServerExtraHandler adds an extra handler to the metrics server
	AddMetricsServerExtraHandler(path string, handler http.Handler) error

	// AddHealthzCheck adds a health check
	AddHealthzCheck(name string, check healthz.Checker) error

	// AddReadyzCheck adds a readiness check
	AddReadyzCheck(name string, check healthz.Checker) error

	// Start starts all registered Controllers and blocks until the context is cancelled
	Start(ctx context.Context) error

	// GetWebhookServer returns the webhook server
	GetWebhookServer() webhook.Server

	// GetLogger returns the logger
	GetLogger() logr.Logger

	// GetControllerOptions returns controller-specific configuration options
	GetControllerOptions() config.Controller
}

// New creates a new Manager for managing Controllers
func New(config *rest.Config, options Options) (Manager, error)

// Options are the arguments accepted by New when creating a Manager.
type Options struct {
	// Scheme is the scheme used to resolve runtime.Objects to GroupVersionKinds
	Scheme *runtime.Scheme

	// MapperProvider provides the RESTMapper used to map Go types to Kubernetes APIs
	MapperProvider func(c *rest.Config, httpClient *http.Client) (meta.RESTMapper, error)

	// Cache configuration for the Manager's cache
	Cache cache.Options

	// NewCache is a function to create a new cache
	NewCache cache.NewCacheFunc

	// Client configuration
	Client client.Options

	// NewClient is a function to create a new client
	NewClient client.NewClientFunc

	// Logger is the logger that should be used by this manager
	Logger logr.Logger

	// LeaderElection determines whether to use leader election
	LeaderElection bool

	// LeaderElectionResourceLock determines which resource lock to use for leader election
	LeaderElectionResourceLock string

	// LeaderElectionNamespace determines the namespace in which the leader election resource will be created
	LeaderElectionNamespace string

	// LeaderElectionID determines the name of the resource that leader election will use for holding the leader lock
	LeaderElectionID string

	// LeaderElectionConfig can be specified to override the default config used for leader election
	LeaderElectionConfig *rest.Config

	// LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily on shutdown
	LeaderElectionReleaseOnCancel bool

	// LeaderElectionLabels are labels to apply to the leader election resource
	LeaderElectionLabels map[string]string

	// LeaderElectionResourceLockInterface allows providing a custom resourcelock.Interface
	LeaderElectionResourceLockInterface resourcelock.Interface

	// LeaseDuration is the duration that non-leader candidates will wait to force acquire leadership
	LeaseDuration *time.Duration

	// RenewDeadline is the duration that the acting leader will retry refreshing leadership before giving up
	RenewDeadline *time.Duration

	// RetryPeriod is the duration the LeaderElector clients should wait between tries of actions
	RetryPeriod *time.Duration

	// Metrics contains options for configuring the metrics server
	Metrics metricsserver.Options

	// HealthProbeBindAddress is the TCP address that the controller should bind to for serving health probes
	HealthProbeBindAddress string

	// ReadinessEndpointName is the name of the readiness endpoint
	ReadinessEndpointName string

	// LivenessEndpointName is the name of the liveness endpoint
	LivenessEndpointName string

	// PprofBindAddress is the TCP address that the controller should bind to for serving pprof
	PprofBindAddress string

	// WebhookServer is the webhook server
	WebhookServer webhook.Server

	// BaseContext is the base context to use for all requests
	BaseContext BaseContextFunc

	// GracefulShutdownTimeout is the duration given to runnable to stop before the manager actually returns on stop
	GracefulShutdownTimeout *time.Duration

	// Controller contains global controller options
	Controller config.Controller
}

// Runnable allows a component to be started
type Runnable interface {
	// Start starts running the component. The component will stop when the context is closed
	Start(context.Context) error
}

// RunnableFunc implements Runnable using a function
type RunnableFunc func(context.Context) error

// Start has the signature required by Runnable, so a RunnableFunc value can
// be passed wherever a Runnable is expected.
func (r RunnableFunc) Start(ctx context.Context) error

// LeaderElectionRunnable knows whether it should be run with leader election
type LeaderElectionRunnable interface {
	// NeedLeaderElection returns true if the Runnable needs to be run in the leader election mode
	NeedLeaderElection() bool
}

// BaseContextFunc is used to provide a base Context to Runnables managed by a Manager
type BaseContextFunc func() context.Context

// Server is a general purpose HTTP server
type Server struct {
// Name is a unique name for this server
Name string
// Server is the HTTP server
Server *http.Server
// Listener is the network listener
Listener net.Listener
// OnlyServeWhenLeader determines if the server should only be started when the manager is the leader
OnlyServeWhenLeader bool
// ShutdownTimeout is the timeout for server shutdown
ShutdownTimeout *time.Duration
}
func (s *Server) NeedLeaderElection() bool
func (s *Server) Start(ctx context.Context) errorpackage main
import (
	"context"
	"os"

	"k8s.io/apimachinery/pkg/runtime"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/healthz"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)
func main() {
// Setup logger
ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
// Create scheme
scheme := runtime.NewScheme()
_ = clientgoscheme.AddToScheme(scheme)
// Create manager
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), manager.Options{
Scheme: scheme,
LeaderElection: true,
LeaderElectionID: "my-controller-leader-election",
LeaderElectionNamespace: "default",
HealthProbeBindAddress: ":8081",
})
if err != nil {
ctrl.Log.Error(err, "unable to create manager")
os.Exit(1)
}
// Add health checks
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
ctrl.Log.Error(err, "unable to set up health check")
os.Exit(1)
}
if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
ctrl.Log.Error(err, "unable to set up ready check")
os.Exit(1)
}
// Start manager
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
ctrl.Log.Error(err, "problem running manager")
os.Exit(1)
}
}Import Path: sigs.k8s.io/controller-runtime/pkg/controller
Controllers implement Kubernetes APIs by responding to events and ensuring the actual state matches the desired state.
package controller
import (
"context"
"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)
// Controller implements a Kubernetes API. It is itself a reconcile.Reconciler
// and additionally exposes Watch, Start, and GetLogger.
type Controller interface {
	// reconcile.Reconciler processes reconcile requests
	reconcile.Reconciler

	// Watch watches a source of events and enqueues reconcile requests
	Watch(src source.Source) error

	// Start starts the controller
	Start(ctx context.Context) error

	// GetLogger returns the logger
	GetLogger() logr.Logger
}

// New creates a new Controller registered with the Manager
func New(name string, mgr manager.Manager, options Options) (Controller, error)

// NewUnmanaged creates a new Controller that is not registered with a Manager
func NewUnmanaged(name string, options Options) (Controller, error)

// TypedController is a generic controller that works with custom request types
type TypedController[request comparable] interface {
	// reconcile.TypedReconciler processes reconcile requests of type request
	reconcile.TypedReconciler[request]

	// Watch watches a typed source of events and enqueues reconcile requests
	Watch(src source.TypedSource[request]) error

	// Start starts the controller
	Start(ctx context.Context) error

	// GetLogger returns the logger
	GetLogger() logr.Logger
}

// NewTyped is the generic counterpart of New: it takes a name, a Manager, and
// TypedOptions, and returns a TypedController for the given request type.
func NewTyped[request comparable](name string, mgr manager.Manager, options TypedOptions[request]) (TypedController[request], error)
func NewTypedUnmanaged[request comparable](name string, options TypedOptions[request]) (TypedController[request], error)

// Options are the arguments accepted by New and NewUnmanaged when creating a Controller.
type Options struct {
	// SkipNameValidation allows skipping controller name validation
	SkipNameValidation *bool

	// MaxConcurrentReconciles is the maximum number of concurrent Reconciles
	MaxConcurrentReconciles int

	// CacheSyncTimeout is the timeout for waiting for cache to sync
	CacheSyncTimeout time.Duration

	// RecoverPanic indicates whether the controller should recover from panics
	RecoverPanic *bool

	// NeedLeaderElection indicates whether this controller needs leader election
	NeedLeaderElection *bool

	// Reconciler is the reconcile.Reconciler to call with enqueued reconcile.Requests
	Reconciler reconcile.Reconciler

	// RateLimiter is used to limit how frequently requests may be queued
	RateLimiter workqueue.RateLimiter

	// NewQueue constructs the queue for this controller
	NewQueue func(controllerName string, rateLimiter workqueue.RateLimiter) workqueue.RateLimitingInterface

	// Logger is the logger for this controller
	Logger logr.Logger

	// LogConstructor constructs a logger for a reconcile request
	LogConstructor func(request *reconcile.Request) logr.Logger

	// UsePriorityQueue indicates whether to use a priority queue
	UsePriorityQueue *bool

	// EnableWarmup enables controller warmup on startup
	EnableWarmup *bool

	// ReconciliationTimeout is the maximum duration for a single reconcile
	ReconciliationTimeout time.Duration
}

// TypedOptions is the generic counterpart of Options, accepted by NewTyped and
// NewTypedUnmanaged. Its fields parallel those of Options, with the reconciler,
// rate limiter, queue, and log constructor parameterized by the request type.
type TypedOptions[request comparable] struct {
	// SkipNameValidation allows skipping controller name validation
	SkipNameValidation *bool

	// MaxConcurrentReconciles is the maximum number of concurrent Reconciles
	MaxConcurrentReconciles int

	// CacheSyncTimeout is the timeout for waiting for cache to sync
	CacheSyncTimeout time.Duration

	// RecoverPanic indicates whether the controller should recover from panics
	RecoverPanic *bool

	// NeedLeaderElection indicates whether this controller needs leader election
	NeedLeaderElection *bool

	// Reconciler is the reconcile.TypedReconciler to call with enqueued requests
	Reconciler reconcile.TypedReconciler[request]

	// RateLimiter is used to limit how frequently requests may be queued
	RateLimiter workqueue.TypedRateLimiter[request]

	// NewQueue constructs the queue for this controller
	NewQueue func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request]

	// Logger is the logger for this controller
	Logger logr.Logger

	// LogConstructor constructs a logger for a reconcile request
	LogConstructor func(request *request) logr.Logger

	// UsePriorityQueue indicates whether to use a priority queue
	UsePriorityQueue *bool

	// EnableWarmup enables controller warmup on startup
	EnableWarmup *bool

	// ReconciliationTimeout is the maximum duration for a single reconcile
	ReconciliationTimeout time.Duration
}

// DefaultFromConfig takes a config.Controller and operates on the receiver's options.
// NOTE(review): presumably fills unset options from config — confirm against the implementation.
func (options *TypedOptions[request]) DefaultFromConfig(config config.Controller)

// ReconcileIDFromContext gets the reconcileID from the current context
var ReconcileIDFromContext func(context.Context) (string, bool)
Import Path: sigs.k8s.io/controller-runtime/pkg/reconcile
The reconcile package defines the Reconciler interface for implementing Kubernetes API controllers.
package reconcile
import (
"context"
"time"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// Reconciler implements a Kubernetes API for a specific Resource by Creating, Updating or Deleting Kubernetes objects
type Reconciler interface {
	// Reconcile performs a full reconciliation for the object referred to by the Request
	Reconcile(context.Context, Request) (Result, error)
}

// TypedReconciler is a generic reconciler interface
type TypedReconciler[request comparable] interface {
	// Reconcile performs a reconciliation for the given request value
	Reconcile(context.Context, request) (Result, error)
}

// Request contains the information necessary to reconcile a Kubernetes object
type Request struct {
	// NamespacedName contains the name and namespace of the object to reconcile
	types.NamespacedName
}

// Result contains the result of a Reconciler invocation
type Result struct {
	// Requeue tells the Controller to requeue the reconcile key
	//
	// Deprecated: use RequeueAfter instead
	Requeue bool

	// RequeueAfter if greater than 0, tells the Controller to requeue the reconcile key after the Duration
	RequeueAfter time.Duration
}

// IsZero returns a bool for the receiver Result.
// NOTE(review): presumably true when the Result is the zero value — confirm against the implementation.
func (r *Result) IsZero() bool

// Func is a function that implements Reconciler
type Func func(context.Context, Request) (Result, error)

// Reconcile has the signature required by Reconciler, so a Func can be used as one.
func (r Func) Reconcile(ctx context.Context, req Request) (Result, error)

// TypedFunc is a typed function that implements TypedReconciler
type TypedFunc[request comparable] func(context.Context, request) (Result, error)

// Reconcile has the signature required by TypedReconciler, so a TypedFunc can be used as one.
func (r TypedFunc[request]) Reconcile(ctx context.Context, req request) (Result, error)

// ObjectReconciler is a specialized reconciler that acts on instances of client.Object
type ObjectReconciler[object client.Object] interface {
Reconcile(context.Context, object) (Result, error)
}
// AsReconciler converts an ObjectReconciler to a standard Reconciler
func AsReconciler[object client.Object](client client.Client, rec ObjectReconciler[object]) Reconciler// TerminalError wraps an error to indicate that reconciliation should not be retried
func TerminalError(wrapped error) errorpackage controllers
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
// PodReconciler reconciles corev1.Pod objects. It embeds client.Client, so the
// client's methods (such as Get and Update) are callable directly on the reconciler.
type PodReconciler struct {
	client.Client
	// Scheme is the runtime scheme made available to the reconciler
	Scheme *runtime.Scheme
}
// Reconcile fetches the Pod named by req and ensures it carries the
// "reconciled" label, setting it to "true" when it is absent.
//
// A Pod that no longer exists is treated as success, since there is nothing
// left to do. Any other read or update failure is returned wrapped with
// context instead of also being logged here, so each error is handled at
// exactly one layer.
func (r *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
	// Named logger, not log, so the imported log package is not shadowed.
	logger := log.FromContext(ctx)

	// Fetch the Pod instance
	var pod corev1.Pod
	if err := r.Get(ctx, req.NamespacedName, &pod); err != nil {
		if errors.IsNotFound(err) {
			// Object not found, could have been deleted after reconcile request
			logger.Info("Pod not found, likely deleted")
			return reconcile.Result{}, nil
		}
		// Error reading the object - returning it requeues the request
		return reconcile.Result{}, fmt.Errorf("getting Pod %s: %w", req.NamespacedName, err)
	}

	// Implement your reconciliation logic here
	logger.Info("Reconciling Pod", "name", pod.Name, "namespace", pod.Namespace)

	// Nothing to do when the label is already present (reading a nil map is safe)
	if _, exists := pod.Labels["reconciled"]; exists {
		return reconcile.Result{}, nil
	}

	// Writing to a nil map panics, so allocate it before adding the label
	if pod.Labels == nil {
		pod.Labels = make(map[string]string)
	}
	pod.Labels["reconciled"] = "true"
	if err := r.Update(ctx, &pod); err != nil {
		return reconcile.Result{}, fmt.Errorf("updating Pod %s: %w", req.NamespacedName, err)
	}
	logger.Info("Updated Pod with reconciled label")

	// Reconciliation successful
	return reconcile.Result{}, nil
}
// SetupWithManager sets up the controller with the Manager
func (r *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Pod{}).
Complete(r)
}Import Path: sigs.k8s.io/controller-runtime/pkg/cluster
The Cluster interface provides access to various components for interacting with a Kubernetes cluster.
package cluster
import (
"context"
"net/http"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// Cluster provides various methods to interact with a cluster: accessors for
// its HTTP client, rest.Config, cache, scheme, client, field indexer, event
// recorders, REST mapper, and direct API reader, plus Start.
type Cluster interface {
	// GetHTTPClient returns the HTTP client configured for this cluster
	GetHTTPClient() *http.Client

	// GetConfig returns the rest.Config for this cluster
	GetConfig() *rest.Config

	// GetCache returns the cache for this cluster
	GetCache() cache.Cache

	// GetScheme returns the scheme for this cluster
	GetScheme() *runtime.Scheme

	// GetClient returns a client configured for this cluster
	GetClient() client.Client

	// GetFieldIndexer returns the field indexer for this cluster
	GetFieldIndexer() client.FieldIndexer

	// GetEventRecorderFor returns an EventRecorder for the given name
	GetEventRecorderFor(name string) record.EventRecorder

	// GetRESTMapper returns the REST mapper for this cluster
	GetRESTMapper() meta.RESTMapper

	// GetAPIReader returns a reader that reads directly from the API server
	GetAPIReader() client.Reader

	// Start starts the cluster
	Start(ctx context.Context) error
}

// New creates a new Cluster
func New(config *rest.Config, opts ...Option) (Cluster, error)

// Option can be used to manipulate Options
type Option func(*Options)

// Options are the settings an Option function can modify when a Cluster is created with New.
type Options struct {
	// Scheme is the scheme used to resolve runtime.Objects to GroupVersionKinds
	Scheme *runtime.Scheme

	// MapperProvider provides the RESTMapper
	MapperProvider func(c *rest.Config, httpClient *http.Client) (meta.RESTMapper, error)

	// Logger is the logger that should be used by this cluster
	Logger logr.Logger

	// HTTPClient is the HTTP client to use for requests
	HTTPClient *http.Client

	// Cache is the cache configuration
	Cache cache.Options

	// NewCache is a function to create a new cache
	NewCache cache.NewCacheFunc

	// Client is the client configuration
	Client client.Options

	// NewClient is a function to create a new client
	NewClient client.NewClientFunc
}
Import Path: sigs.k8s.io/controller-runtime/pkg/config
The config package provides configuration types for controllers.
package config
import (
"time"
"github.com/go-logr/logr"
)
// Controller contains configuration options for controllers
type Controller struct {
	// SkipNameValidation allows skipping controller name validation
	SkipNameValidation *bool

	// GroupKindConcurrency is a map from GroupKind to the number of concurrent reconciles allowed for that kind.
	// NOTE(review): keys are plain strings; the expected string form of a GroupKind is not shown here — confirm.
	GroupKindConcurrency map[string]int

	// MaxConcurrentReconciles is the maximum number of concurrent Reconciles
	MaxConcurrentReconciles int

	// CacheSyncTimeout refers to the time limit for caches to sync
	CacheSyncTimeout time.Duration

	// RecoverPanic indicates whether the controller should recover from panics
	RecoverPanic *bool

	// NeedLeaderElection indicates whether this controller needs leader election
	NeedLeaderElection *bool

	// EnableWarmup enables controller warmup on startup
	EnableWarmup *bool

	// UsePriorityQueue indicates whether to use a priority queue
	UsePriorityQueue *bool

	// Logger is the logger for this controller
	Logger logr.Logger

	// ReconciliationTimeout is the maximum duration for a single reconcile
	ReconciliationTimeout time.Duration
}
Import Path: sigs.k8s.io/controller-runtime
// GroupResource specifies a Group and a Resource, but does not force a version
type GroupResource = schema.GroupResourceInstall with Tessl CLI
npx tessl i tessl/golang-sigs-k8s-io--controller-runtime