The core framework provides the foundational abstractions for building Kubernetes controllers: the Manager for lifecycle management, the Controller for event processing, and the Reconciler for implementing business logic.
Together these implement the standard Kubernetes controller pattern: a Manager runs one or more Controllers, each Controller watches event sources and enqueues reconcile requests, and a Reconciler processes each request to drive the cluster's actual state toward the desired state.
Import Path: sigs.k8s.io/controller-runtime/pkg/manager
The Manager is the central component that manages the lifecycle of all controllers, caches, and webhooks. It provides shared dependencies and coordinates startup/shutdown.
package manager
import (
"context"
"net"
"net/http"
"time"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/config"
"sigs.k8s.io/controller-runtime/pkg/healthz"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/webhook"
)
// Manager is required to create Controllers and provides shared dependencies
type Manager interface {
// Cluster provides access to clients, caches, and schemes
cluster.Cluster
// Add adds a Runnable to the Manager
Add(Runnable) error
// Elected returns a channel that is closed when this manager is elected leader
Elected() <-chan struct{}
// AddMetricsServerExtraHandler adds an extra handler to the metrics server
AddMetricsServerExtraHandler(path string, handler http.Handler) error
// AddHealthzCheck adds a health check
AddHealthzCheck(name string, check healthz.Checker) error
// AddReadyzCheck adds a readiness check
AddReadyzCheck(name string, check healthz.Checker) error
// Start starts all registered Controllers and blocks until the context is cancelled
Start(ctx context.Context) error
// GetWebhookServer returns the webhook server
GetWebhookServer() webhook.Server
// GetLogger returns the logger
GetLogger() logr.Logger
// GetControllerOptions returns controller-specific configuration options
GetControllerOptions() config.Controller
}

// New creates a new Manager for managing Controllers
func New(config *rest.Config, options Options) (Manager, error)

// Options are the arguments for creating a new Manager
type Options struct {
// Scheme is the scheme used to resolve runtime.Objects to GroupVersionKinds
Scheme *runtime.Scheme
// MapperProvider provides the RESTMapper used to map Go types to Kubernetes APIs
MapperProvider func(c *rest.Config, httpClient *http.Client) (meta.RESTMapper, error)
// Cache configuration for the Manager's cache
Cache cache.Options
// NewCache is a function to create a new cache
NewCache cache.NewCacheFunc
// Client configuration
Client client.Options
// NewClient is a function to create a new client
NewClient client.NewClientFunc
// Logger is the logger that should be used by this manager
Logger logr.Logger
// LeaderElection determines whether to use leader election
LeaderElection bool
// LeaderElectionResourceLock determines which resource lock to use for leader election
LeaderElectionResourceLock string
// LeaderElectionNamespace determines the namespace in which the leader election resource will be created
LeaderElectionNamespace string
// LeaderElectionID determines the name of the resource that leader election will use for holding the leader lock
LeaderElectionID string
// LeaderElectionConfig can be specified to override the default config used for leader election
LeaderElectionConfig *rest.Config
// LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily on shutdown
LeaderElectionReleaseOnCancel bool
// LeaderElectionLabels are labels to apply to the leader election resource
LeaderElectionLabels map[string]string
// LeaderElectionResourceLockInterface allows providing a custom resourcelock.Interface
LeaderElectionResourceLockInterface resourcelock.Interface
// LeaseDuration is the duration that non-leader candidates will wait to force acquire leadership
LeaseDuration *time.Duration
// RenewDeadline is the duration that the acting leader will retry refreshing leadership before giving up
RenewDeadline *time.Duration
// RetryPeriod is the duration the LeaderElector clients should wait between tries of actions
RetryPeriod *time.Duration
// Metrics contains options for configuring the metrics server
Metrics metricsserver.Options
// HealthProbeBindAddress is the TCP address that the controller should bind to for serving health probes
HealthProbeBindAddress string
// ReadinessEndpointName is the name of the readiness endpoint
ReadinessEndpointName string
// LivenessEndpointName is the name of the liveness endpoint
LivenessEndpointName string
// PprofBindAddress is the TCP address that the controller should bind to for serving pprof
PprofBindAddress string
// WebhookServer is the webhook server
WebhookServer webhook.Server
// BaseContext is the base context to use for all requests
BaseContext BaseContextFunc
// GracefulShutdownTimeout is the duration given to runnable to stop before the manager actually returns on stop
GracefulShutdownTimeout *time.Duration
// Controller contains global controller options
Controller config.Controller
}

// Runnable allows a component to be started
type Runnable interface {
// Start starts running the component. The component will stop when the context is closed
Start(context.Context) error
}

// RunnableFunc implements Runnable using a function
type RunnableFunc func(context.Context) error
func (r RunnableFunc) Start(ctx context.Context) error
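As a minimal sketch (assuming an existing Manager named mgr and the usual imports; the helper name and 30-second interval are illustrative), a background task can be registered with the Manager as a RunnableFunc. It starts when the Manager starts and stops when the Manager's context is cancelled:

// addMaintenanceTask is a hypothetical helper that runs a periodic task under the Manager.
func addMaintenanceTask(mgr manager.Manager) error {
	return mgr.Add(manager.RunnableFunc(func(ctx context.Context) error {
		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				// Manager is shutting down; return to allow a clean stop.
				return nil
			case <-ticker.C:
				mgr.GetLogger().Info("periodic maintenance tick")
			}
		}
	}))
}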
// LeaderElectionRunnable knows whether it should be run with leader election
type LeaderElectionRunnable interface {
// NeedLeaderElection returns true if the Runnable needs to be run in the leader election mode
NeedLeaderElection() bool
}

// BaseContextFunc is used to provide a base Context to Runnables managed by a Manager
type BaseContextFunc func() context.Context

// Server is a general purpose HTTP server
type Server struct {
// Name is a unique name for this server
Name string
// Server is the HTTP server
Server *http.Server
// Listener is the network listener
Listener net.Listener
// OnlyServeWhenLeader determines if the server should only be started when the manager is the leader
OnlyServeWhenLeader bool
// ShutdownTimeout is the timeout for server shutdown
ShutdownTimeout *time.Duration
}
func (s *Server) NeedLeaderElection() bool
func (s *Server) Start(ctx context.Context) error
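A sketch of exposing an extra HTTP endpoint through the Server runnable; the listener address, handler, and helper name are illustrative assumptions:

// addDebugServer is a hypothetical helper that serves a debug endpoint under the Manager.
func addDebugServer(mgr manager.Manager) error {
	ln, err := net.Listen("tcp", ":8082")
	if err != nil {
		return err
	}
	mux := http.NewServeMux()
	mux.HandleFunc("/debug/info", func(w http.ResponseWriter, _ *http.Request) {
		_, _ = w.Write([]byte("ok"))
	})
	return mgr.Add(&manager.Server{
		Name:                "debug", // unique name used in logs
		Server:              &http.Server{Handler: mux},
		Listener:            ln,
		OnlyServeWhenLeader: false, // serve on every replica, not just the leader
	})
}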
The following complete example creates a Manager with leader election and health probes, then starts it:

package main

import (
"os"
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"
)
func main() {
// Setup logger
ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
// Create scheme
scheme := runtime.NewScheme()
_ = clientgoscheme.AddToScheme(scheme)
// Create manager
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), manager.Options{
Scheme: scheme,
LeaderElection: true,
LeaderElectionID: "my-controller-leader-election",
LeaderElectionNamespace: "default",
HealthProbeBindAddress: ":8081",
})
if err != nil {
ctrl.Log.Error(err, "unable to create manager")
os.Exit(1)
}
// Add health checks
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
ctrl.Log.Error(err, "unable to set up health check")
os.Exit(1)
}
if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
ctrl.Log.Error(err, "unable to set up ready check")
os.Exit(1)
}
// Start manager
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
ctrl.Log.Error(err, "problem running manager")
os.Exit(1)
}
}

Import Path: sigs.k8s.io/controller-runtime/pkg/controller
Controllers implement Kubernetes APIs by responding to events and ensuring the actual state matches the desired state.
package controller
import (
"context"
"time"
"github.com/go-logr/logr"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/config"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)
// Controller implements a Kubernetes API
type Controller interface {
// reconcile.Reconciler processes reconcile requests
reconcile.Reconciler
// Watch watches a source of events and enqueues reconcile requests
Watch(src source.Source) error
// Start starts the controller
Start(ctx context.Context) error
// GetLogger returns the logger
GetLogger() logr.Logger
}

// New creates a new Controller registered with the Manager
func New(name string, mgr manager.Manager, options Options) (Controller, error)
// NewUnmanaged creates a new Controller that is not registered with a Manager
func NewUnmanaged(name string, options Options) (Controller, error)

// TypedController is a generic controller that works with custom request types
type TypedController[request comparable] interface {
reconcile.TypedReconciler[request]
Watch(src source.TypedSource[request]) error
Start(ctx context.Context) error
GetLogger() logr.Logger
}
func NewTyped[request comparable](name string, mgr manager.Manager, options TypedOptions[request]) (TypedController[request], error)
func NewTypedUnmanaged[request comparable](name string, options TypedOptions[request]) (TypedController[request], error)

// Options are the arguments for creating a new Controller
type Options struct {
// SkipNameValidation allows skipping controller name validation
SkipNameValidation *bool
// MaxConcurrentReconciles is the maximum number of concurrent Reconciles
MaxConcurrentReconciles int
// CacheSyncTimeout is the timeout for waiting for cache to sync
CacheSyncTimeout time.Duration
// RecoverPanic indicates whether the controller should recover from panics
RecoverPanic *bool
// NeedLeaderElection indicates whether this controller needs leader election
NeedLeaderElection *bool
// Reconciler is the reconcile.Reconciler to call with enqueued reconcile.Requests
Reconciler reconcile.Reconciler
// RateLimiter is used to limit how frequently requests may be queued
RateLimiter workqueue.RateLimiter
// NewQueue constructs the queue for this controller
NewQueue func(controllerName string, rateLimiter workqueue.RateLimiter) workqueue.RateLimitingInterface
// Logger is the logger for this controller
Logger logr.Logger
// LogConstructor constructs a logger for a reconcile request
LogConstructor func(request *reconcile.Request) logr.Logger
// UsePriorityQueue indicates whether to use a priority queue
UsePriorityQueue *bool
// EnableWarmup enables controller warmup on startup
EnableWarmup *bool
// ReconciliationTimeout is the maximum duration for a single reconcile
ReconciliationTimeout time.Duration
}
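As an illustrative sketch, these options are usually supplied through the builder when wiring a reconciler (assuming an existing Manager mgr, a reconcile.Reconciler r, and the usual ctrl and corev1 imports; the concrete values below are assumptions):

// setupPodController is a hypothetical helper that tunes per-controller options.
func setupPodController(mgr manager.Manager, r reconcile.Reconciler) error {
	recoverPanic := true
	return ctrl.NewControllerManagedBy(mgr).
		For(&corev1.Pod{}).
		WithOptions(controller.Options{
			MaxConcurrentReconciles: 4,             // reconcile up to 4 Pods in parallel
			RecoverPanic:            &recoverPanic, // turn reconciler panics into returned errors
		}).
		Complete(r)
}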
// TypedOptions mirrors Options for controllers that use a custom request type
type TypedOptions[request comparable] struct {
SkipNameValidation *bool
MaxConcurrentReconciles int
CacheSyncTimeout time.Duration
RecoverPanic *bool
NeedLeaderElection *bool
Reconciler reconcile.TypedReconciler[request]
RateLimiter workqueue.TypedRateLimiter[request]
NewQueue func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request]
Logger logr.Logger
LogConstructor func(request *request) logr.Logger
UsePriorityQueue *bool
EnableWarmup *bool
ReconciliationTimeout time.Duration
}
func (options *TypedOptions[request]) DefaultFromConfig(config config.Controller)

// ReconcileIDFromContext gets the reconcileID from the current context
var ReconcileIDFromContext func(context.Context) (string, bool)

Import Path: sigs.k8s.io/controller-runtime/pkg/reconcile
The reconcile package defines the Reconciler interface for implementing Kubernetes API controllers.
package reconcile
import (
"context"
"time"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// Reconciler implements a Kubernetes API for a specific Resource by Creating, Updating or Deleting Kubernetes objects
type Reconciler interface {
// Reconcile performs a full reconciliation for the object referred to by the Request
Reconcile(context.Context, Request) (Result, error)
}

// TypedReconciler is a generic reconciler interface
type TypedReconciler[request comparable] interface {
Reconcile(context.Context, request) (Result, error)
}

// Request contains the information necessary to reconcile a Kubernetes object
type Request struct {
// NamespacedName contains the name and namespace of the object to reconcile
types.NamespacedName
}

// Result contains the result of a Reconciler invocation
type Result struct {
// Requeue tells the Controller to requeue the reconcile key
// Deprecated: use RequeueAfter instead
Requeue bool
// RequeueAfter if greater than 0, tells the Controller to requeue the reconcile key after the Duration
RequeueAfter time.Duration
}
func (r *Result) IsZero() bool
// Func is a function that implements Reconciler
type Func func(context.Context, Request) (Result, error)
func (r Func) Reconcile(ctx context.Context, req Request) (Result, error)
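A minimal sketch of a Reconciler built from a plain function (assuming the pkg/log and time imports); the 30-second requeue interval is an illustrative choice:

// periodicReconciler is a hypothetical function-based reconciler that re-checks each object on a timer.
var periodicReconciler reconcile.Reconciler = reconcile.Func(func(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
	log.FromContext(ctx).Info("reconciling", "namespacedName", req.NamespacedName)
	// Ask the controller to requeue this key again after 30 seconds.
	return reconcile.Result{RequeueAfter: 30 * time.Second}, nil
})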
// TypedFunc is a typed function that implements TypedReconciler
type TypedFunc[request comparable] func(context.Context, request) (Result, error)
func (r TypedFunc[request]) Reconcile(ctx context.Context, req request) (Result, error)

// ObjectReconciler is a specialized reconciler that acts on instances of client.Object
type ObjectReconciler[object client.Object] interface {
Reconcile(context.Context, object) (Result, error)
}
// AsReconciler converts an ObjectReconciler to a standard Reconciler
func AsReconciler[object client.Object](client client.Client, rec ObjectReconciler[object]) Reconciler

// TerminalError wraps an error to indicate that reconciliation should not be retried
func TerminalError(wrapped error) error
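A sketch of an ObjectReconciler for ConfigMaps, converted to a standard Reconciler with AsReconciler; the type name, helper name, and the "missing data" rule are illustrative assumptions (the usual corev1, ctrl, and fmt imports are assumed):

// ConfigMapReconciler is a hypothetical ObjectReconciler that receives the fetched object directly.
type ConfigMapReconciler struct{}

func (r *ConfigMapReconciler) Reconcile(ctx context.Context, cm *corev1.ConfigMap) (reconcile.Result, error) {
	if len(cm.Data) == 0 {
		// Illustrative unrecoverable condition: wrap with TerminalError so this object is not retried.
		return reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("configmap %s has no data", cm.Name))
	}
	return reconcile.Result{}, nil
}

// setupConfigMapController wires the ObjectReconciler through AsReconciler, which
// fetches the object for each Request before calling Reconcile.
func setupConfigMapController(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&corev1.ConfigMap{}).
		Complete(reconcile.AsReconciler(mgr.GetClient(), &ConfigMapReconciler{}))
}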
The following complete example implements a simple Pod reconciler and registers it with a Manager:

package controllers

import (
"context"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
type PodReconciler struct {
client.Client
Scheme *runtime.Scheme
}
func (r *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
log := log.FromContext(ctx)
// Fetch the Pod instance
var pod corev1.Pod
if err := r.Get(ctx, req.NamespacedName, &pod); err != nil {
if errors.IsNotFound(err) {
// Object not found, could have been deleted after reconcile request
log.Info("Pod not found, likely deleted")
return reconcile.Result{}, nil
}
// Error reading the object - requeue the request
log.Error(err, "Failed to get Pod")
return reconcile.Result{}, err
}
// Implement your reconciliation logic here
log.Info("Reconciling Pod", "name", pod.Name, "namespace", pod.Namespace)
// Add a label if it doesn't exist
if pod.Labels == nil {
pod.Labels = make(map[string]string)
}
if _, exists := pod.Labels["reconciled"]; !exists {
pod.Labels["reconciled"] = "true"
if err := r.Update(ctx, &pod); err != nil {
log.Error(err, "Failed to update Pod")
return reconcile.Result{}, err
}
log.Info("Updated Pod with reconciled label")
}
// Reconciliation successful
return reconcile.Result{}, nil
}
// SetupWithManager sets up the controller with the Manager
func (r *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Pod{}).
Complete(r)
}

Import Path: sigs.k8s.io/controller-runtime/pkg/cluster
The Cluster interface provides access to various components for interacting with a Kubernetes cluster.
package cluster
import (
"context"
"net/http"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// Cluster provides various methods to interact with a cluster
type Cluster interface {
// GetHTTPClient returns the HTTP client configured for this cluster
GetHTTPClient() *http.Client
// GetConfig returns the rest.Config for this cluster
GetConfig() *rest.Config
// GetCache returns the cache for this cluster
GetCache() cache.Cache
// GetScheme returns the scheme for this cluster
GetScheme() *runtime.Scheme
// GetClient returns a client configured for this cluster
GetClient() client.Client
// GetFieldIndexer returns the field indexer for this cluster
GetFieldIndexer() client.FieldIndexer
// GetEventRecorderFor returns an EventRecorder for the given name
GetEventRecorderFor(name string) record.EventRecorder
// GetRESTMapper returns the REST mapper for this cluster
GetRESTMapper() meta.RESTMapper
// GetAPIReader returns a reader that reads directly from the API server
GetAPIReader() client.Reader
// Start starts the cluster
Start(ctx context.Context) error
}

// New creates a new Cluster
func New(config *rest.Config, opts ...Option) (Cluster, error)

// Option can be used to manipulate Options
type Option func(*Options)
type Options struct {
// Scheme is the scheme used to resolve runtime.Objects to GroupVersionKinds
Scheme *runtime.Scheme
// MapperProvider provides the RESTMapper
MapperProvider func(c *rest.Config, httpClient *http.Client) (meta.RESTMapper, error)
// Logger is the logger that should be used by this cluster
Logger logr.Logger
// HTTPClient is the HTTP client to use for requests
HTTPClient *http.Client
// Cache is the cache configuration
Cache cache.Options
// NewCache is a function to create a new cache
NewCache cache.NewCacheFunc
// Client is the client configuration
Client client.Options
// NewClient is a function to create a new client
NewClient client.NewClientFunc
}
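A sketch of connecting to a second cluster and running it under an existing Manager so its cache starts and stops with the Manager; remoteCfg, scheme, and the helper name are assumptions:

// addRemoteCluster is a hypothetical helper that registers a second cluster with the Manager.
func addRemoteCluster(mgr manager.Manager, remoteCfg *rest.Config, scheme *runtime.Scheme) (cluster.Cluster, error) {
	remote, err := cluster.New(remoteCfg, func(o *cluster.Options) {
		o.Scheme = scheme // reuse the same scheme as the Manager
	})
	if err != nil {
		return nil, err
	}
	// Cluster implements Runnable, so the Manager starts and stops its cache.
	if err := mgr.Add(remote); err != nil {
		return nil, err
	}
	return remote, nil
}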
Import Path: sigs.k8s.io/controller-runtime/pkg/config
The config package provides configuration types for controllers.
package config
import (
"time"
"github.com/go-logr/logr"
)
// Controller contains configuration options for controllers
type Controller struct {
// SkipNameValidation allows skipping controller name validation
SkipNameValidation *bool
// GroupKindConcurrency is a map from GroupKind to the number of concurrent reconciles allowed for that kind
GroupKindConcurrency map[string]int
// MaxConcurrentReconciles is the maximum number of concurrent Reconciles
MaxConcurrentReconciles int
// CacheSyncTimeout refers to the time limit for caches to sync
CacheSyncTimeout time.Duration
// RecoverPanic indicates whether the controller should recover from panics
RecoverPanic *bool
// NeedLeaderElection indicates whether this controller needs leader election
NeedLeaderElection *bool
// EnableWarmup enables controller warmup on startup
EnableWarmup *bool
// UsePriorityQueue indicates whether to use a priority queue
UsePriorityQueue *bool
// Logger is the logger for this controller
Logger logr.Logger
// ReconciliationTimeout is the maximum duration for a single reconcile
ReconciliationTimeout time.Duration
}
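As a sketch, these defaults are typically set once on the Manager and inherited by every controller it runs; the helper name and the particular values are illustrative:

// newDefaultedManager is a hypothetical constructor that applies shared controller defaults.
func newDefaultedManager() (manager.Manager, error) {
	return ctrl.NewManager(ctrl.GetConfigOrDie(), manager.Options{
		Controller: config.Controller{
			MaxConcurrentReconciles: 2,               // default concurrency for every controller
			CacheSyncTimeout:        2 * time.Minute, // fail startup if caches do not sync in time
		},
	})
}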
Import Path: sigs.k8s.io/controller-runtime

// GroupResource specifies a Group and a Resource, but does not force a version
type GroupResource = schema.GroupResource