A set of go libraries for building Kubernetes controllers with structured abstractions for managers, reconcilers, clients, caches, webhooks, and testing
Supporting services include structured logging with logr, Prometheus metrics integration, health checks, leader election for high availability, and signal handling for graceful shutdown.
Controller-runtime provides several supporting services:
Import Path: sigs.k8s.io/controller-runtime/pkg/log
The log package provides utilities for structured logging.
package log
import (
"context"
"github.com/go-logr/logr"
)
// Log is the base logger used by controller-runtime
var Log logr.Logger
// SetLogger sets the logger implementation for the package
func SetLogger(l logr.Logger)
// FromContext returns a logger from the context
func FromContext(ctx context.Context, keysAndValues ...interface{}) logr.Logger
// IntoContext returns a context with the logger attached
func IntoContext(ctx context.Context, log logr.Logger) context.ContextNote: The LoggerInto variable in the root controllerruntime package is an alias for log.IntoContext:
// LoggerInto takes a context and sets the logger as one of its keys
var LoggerInto = log.IntoContext
// NullLogSink is a logr.LogSink that does nothing
type NullLogSink struct{}
func (NullLogSink) Enabled(level int) bool
func (NullLogSink) Error(_ error, _ string, _ ...interface{})
func (NullLogSink) Info(_ int, _ string, _ ...interface{})
func (log NullLogSink) Init(logr.RuntimeInfo)
func (log NullLogSink) WithName(_ string) logr.LogSink
func (log NullLogSink) WithValues(_ ...interface{}) logr.LogSink
// KubeAPIWarningLogger logs warnings from the Kubernetes API
type KubeAPIWarningLogger struct {
// Has unexported fields
}
// NewKubeAPIWarningLogger creates a new warning logger
func NewKubeAPIWarningLogger(opts KubeAPIWarningLoggerOptions) *KubeAPIWarningLogger
type KubeAPIWarningLoggerOptions struct {
// Deduplicate indicates whether to deduplicate warnings
Deduplicate bool
}
func (l *KubeAPIWarningLogger) HandleWarningHeaderWithContext(ctx context.Context, code int, _ string, message string)
package main
import (
"context"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)
func main() {
// Setup logger
ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
// Get logger
logger := ctrl.Log.WithName("main")
logger.Info("starting application")
// Use logger in context
ctx := log.IntoContext(context.Background(), logger)
// Retrieve logger from context
log := log.FromContext(ctx)
log.Info("operation completed", "result", "success")
}Import Path: sigs.k8s.io/controller-runtime/pkg/log/zap
Integration with Uber's Zap logging library.
package zap
import (
"flag"
"io"
"github.com/go-logr/logr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// New creates a new logr.Logger using zap
func New(opts ...Opts) logr.Logger
// NewRaw creates a new zap.Logger
func NewRaw(opts ...Opts) *zap.Logger
// UseDevMode sets development mode
func UseDevMode(enabled bool) Opts
// WriteTo sets the output destination
func WriteTo(out io.Writer) Opts
// UseFlagOptions applies flag-provided options
func UseFlagOptions(in *Options) Opts
// Level sets the minimum log level
func Level(level zapcore.LevelEnabler) func(o *Options)
// StacktraceLevel sets the level at which to include stack traces
func StacktraceLevel(stacktraceLevel zapcore.LevelEnabler) func(o *Options)
// Encoder sets the encoder
func Encoder(encoder zapcore.Encoder) func(o *Options)
// JSONEncoder sets JSON encoding
func JSONEncoder(opts ...EncoderConfigOption) func(o *Options)
// ConsoleEncoder sets console encoding
func ConsoleEncoder(opts ...EncoderConfigOption) func(o *Options)
// RawZapOpts allows passing raw zap options
func RawZapOpts(zapOpts ...zap.Option) func(o *Options)// Opts is a functional option for configuring zap
type Opts func(*Options)
// Options contains zap configuration
type Options struct {
// Development puts the logger in development mode
Development bool
// Encoder is the encoder to use
Encoder zapcore.Encoder
// EncoderConfigOptions are options for the encoder config
EncoderConfigOptions []EncoderConfigOption
// NewEncoder is a function to create a new encoder
NewEncoder NewEncoderFunc
// DestWriter is the destination for log output
DestWriter io.Writer
// Level is the minimum enabled logging level
Level zapcore.LevelEnabler
// StacktraceLevel is the level at which stacktraces are recorded
StacktraceLevel zapcore.LevelEnabler
// ZapOpts are additional zap options
ZapOpts []zap.Option
// TimeEncoder is the time encoder
TimeEncoder zapcore.TimeEncoder
}
func (o *Options) BindFlags(fs *flag.FlagSet)
// EncoderConfigOption is an option for encoder config
type EncoderConfigOption func(*zapcore.EncoderConfig)
// NewEncoderFunc is a function to create an encoder
type NewEncoderFunc func(...EncoderConfigOption) zapcore.Encoder
// KubeAwareEncoder wraps a zapcore.Encoder with Kubernetes-specific formatting
type KubeAwareEncoder struct {
zapcore.Encoder
Verbose bool
}
func (k *KubeAwareEncoder) Clone() zapcore.Encoder
func (k *KubeAwareEncoder) EncodeEntry(entry zapcore.Entry, fields []zapcore.Field) (*buffer.Buffer, error)
package main
import (
"flag"
"os"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"go.uber.org/zap/zapcore"
)
func main() {
var enableDevMode bool
flag.BoolVar(&enableDevMode, "dev", false, "Enable development mode")
flag.Parse()
// Create logger with options
opts := zap.Options{
Development: enableDevMode,
Level: zapcore.InfoLevel,
TimeEncoder: zapcore.ISO8601TimeEncoder,
}
opts.BindFlags(flag.CommandLine)
logger := zap.New(
zap.UseFlagOptions(&opts),
zap.WriteTo(os.Stdout),
)
ctrl.SetLogger(logger)
logger.Info("application started",
"version", "1.0.0",
"port", 8080,
)
}Import Path: sigs.k8s.io/controller-runtime/pkg/metrics
Prometheus metrics integration.
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
)
// RegistererGatherer combines Registerer and Gatherer interfaces
type RegistererGatherer interface {
prometheus.Registerer
prometheus.Gatherer
}
// Registry is the default prometheus registry for controller-runtime
var Registry RegistererGatherer = prometheus.NewRegistry()
const (
// WorkQueue metrics subsystem
WorkQueueSubsystem = "workqueue"
DepthKey = "depth"
AddsKey = "adds_total"
QueueLatencyKey = "queue_duration_seconds"
WorkDurationKey = "work_duration_seconds"
UnfinishedWorkKey = "unfinished_work_seconds"
LongestRunningProcessorKey = "longest_running_processor_seconds"
RetriesKey = "retries_total"
)
package main
import (
	"github.com/prometheus/client_golang/prometheus"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/metrics"
	metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
)
// Custom controller metrics. Registering them with metrics.Registry
// (below) makes them appear on the manager's /metrics endpoint alongside
// the built-in controller-runtime metrics.
var (
// reconcileCounter counts reconciliations, labelled by outcome.
reconcileCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "mycontroller_reconcile_total",
Help: "Total number of reconciliations",
},
[]string{"result"},
)
// reconcileDuration observes reconciliation latency, labelled by outcome.
reconcileDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "mycontroller_reconcile_duration_seconds",
Help: "Time spent in reconciliation",
},
[]string{"result"},
)
)
func init() {
// Register metrics with controller-runtime registry
// MustRegister panics on duplicate registration, which surfaces
// misconfiguration at startup rather than silently dropping metrics.
metrics.Registry.MustRegister(reconcileCounter, reconcileDuration)
}
func main() {
// Metrics will be automatically exposed by the manager
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Metrics: metricsserver.Options{
BindAddress: ":8080",
},
})
// ...
}Import Path: sigs.k8s.io/controller-runtime/pkg/metrics/server
HTTP server for serving Prometheus metrics.
package server
import (
"context"
"crypto/tls"
"net"
"net/http"
"github.com/go-logr/logr"
"k8s.io/client-go/rest"
)
var DefaultBindAddress = ":8080"
// Server serves metrics over HTTP
type Server interface {
// AddExtraHandler adds an extra handler served on path
AddExtraHandler(path string, handler http.Handler) error
// NeedLeaderElection returns true if the metrics server needs leader election
NeedLeaderElection() bool
// Start starts the metrics server
Start(ctx context.Context) error
}
// NewServer creates a new metrics server
func NewServer(o Options, config *rest.Config, httpClient *http.Client) (Server, error)
type Options struct {
// SecureServing enables secure serving with TLS
SecureServing bool
// BindAddress is the TCP address to bind to
BindAddress string
// ExtraHandlers contains extra handlers to register
ExtraHandlers map[string]http.Handler
// FilterProvider provides a filter for requests
FilterProvider func(c *rest.Config, httpClient *http.Client) (Filter, error)
// CertDir is the directory containing TLS certificates
CertDir string
// CertName is the name of the TLS certificate file
CertName string
// KeyName is the name of the TLS key file
KeyName string
// TLSOpts are TLS configuration options
TLSOpts []func(*tls.Config)
// ListenConfig allows customizing the network listener
ListenConfig net.ListenConfig
}
// Filter wraps an http.Handler to provide filtering
type Filter func(log logr.Logger, handler http.Handler) (http.Handler, error)
Import Path: sigs.k8s.io/controller-runtime/pkg/metrics/filters
Authentication and authorization filters for metrics endpoints.
package filters
import (
"net/http"
"k8s.io/client-go/rest"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
)
// WithAuthenticationAndAuthorization creates a filter that performs authentication and authorization
func WithAuthenticationAndAuthorization(config *rest.Config, httpClient *http.Client) (metricsserver.Filter, error)
Import Path: sigs.k8s.io/controller-runtime/pkg/healthz
Health check endpoints for liveness and readiness probes.
package healthz
import (
"net/http"
)
// Checker is a function that performs a health check
type Checker func(req *http.Request) error
var Ping Checker = func(_ *http.Request) error { return nil }
// Handler serves health check endpoints
type Handler struct {
Checks map[string]Checker
}
func (h *Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request)
// CheckHandler serves a single health check
type CheckHandler struct {
Checker
}
func (h CheckHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request)
package main
import (
"net/http"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
)
// Custom health check
func databaseHealthCheck(req *http.Request) error {
// Check database connectivity
if !isDBConnected() {
return fmt.Errorf("database is not connected")
}
return nil
}
func main() {
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
HealthProbeBindAddress: ":8081",
})
if err != nil {
panic(err)
}
// Add health checks
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
panic(err)
}
if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
panic(err)
}
// Add custom health check
if err := mgr.AddHealthzCheck("database", databaseHealthCheck); err != nil {
panic(err)
}
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
panic(err)
}
}Import Path: sigs.k8s.io/controller-runtime/pkg/leaderelection
Leader election for high availability.
package leaderelection
import (
"time"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"sigs.k8s.io/controller-runtime/pkg/recorder"
)
// NewResourceLock creates a new resource lock for leader election
func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, options Options) (resourcelock.Interface, error)
type Options struct {
// LeaderElection determines whether to use leader election
LeaderElection bool
// LeaderElectionResourceLock determines which resource lock to use
LeaderElectionResourceLock string
// LeaderElectionNamespace determines the namespace for the leader election resource
LeaderElectionNamespace string
// LeaderElectionID is the name of the resource for leader election
LeaderElectionID string
// RenewDeadline is how long the leader will retry refreshing leadership before giving up
RenewDeadline time.Duration
// LeaderLabels are labels to apply to the leader election resource
LeaderLabels map[string]string
}
package main
import (
"time"
ctrl "sigs.k8s.io/controller-runtime"
)
func main() {
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		// Enable leader election so only one replica reconciles at a time.
		LeaderElection:   true,
		LeaderElectionID: "my-controller-leader-election",
		// Optional: Configure election parameters
		LeaderElectionNamespace:       "default",
		LeaderElectionReleaseOnCancel: true,
		LeaseDuration:                 ptrDuration(15 * time.Second),
		RenewDeadline:                 ptrDuration(10 * time.Second),
		RetryPeriod:                   ptrDuration(2 * time.Second),
	})
	if err != nil {
		panic(err)
	}

	// Check if this instance is the leader. Use the package-level base
	// logger; the original snippet referenced an undefined "log"
	// identifier that was neither imported nor declared.
	go func() {
		<-mgr.Elected()
		ctrl.Log.WithName("main").Info("this instance is now the leader")
	}()

	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		panic(err)
	}
}
func ptrDuration(d time.Duration) *time.Duration {
return &d
}Import Path: sigs.k8s.io/controller-runtime/pkg/recorder
Event recorder provider interface.
package recorder
import (
"k8s.io/client-go/tools/record"
)
// Provider knows how to generate new event recorders
type Provider interface {
// GetEventRecorderFor returns an EventRecorder for the given name
GetEventRecorderFor(name string) record.EventRecorder
}
package controllers
import (
"context"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
type MyReconciler struct {
client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder
}
// Reconcile fetches the Pod named in the request and emits events around
// the reconciliation work.
func (r *MyReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
	var pod corev1.Pod
	err := r.Get(ctx, req.NamespacedName, &pod)
	if err != nil {
		// A NotFound error means the object was deleted; nothing to do.
		return reconcile.Result{}, client.IgnoreNotFound(err)
	}

	// Record events
	r.Recorder.Event(&pod, corev1.EventTypeNormal, "Reconciling", "Starting reconciliation")
	// Do work...
	r.Recorder.Event(&pod, corev1.EventTypeNormal, "Reconciled", "Reconciliation completed successfully")
	return reconcile.Result{}, nil
}
func (r *MyReconciler) SetupWithManager(mgr ctrl.Manager) error {
// Get event recorder from manager
r.Recorder = mgr.GetEventRecorderFor("my-controller")
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Pod{}).
Complete(r)
}Import Path: sigs.k8s.io/controller-runtime/pkg/manager/signals
Signal handling for graceful shutdown.
package signals
import (
"context"
)
// SetupSignalHandler registers for SIGTERM and SIGINT and returns a context that is cancelled on either signal
func SetupSignalHandler() context.Context
package main
import (
"os"
ctrl "sigs.k8s.io/controller-runtime"
)
func main() {
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
if err != nil {
os.Exit(1)
}
// Setup signal handler for graceful shutdown
ctx := ctrl.SetupSignalHandler()
// Start manager - will shut down gracefully on SIGTERM/SIGINT
if err := mgr.Start(ctx); err != nil {
os.Exit(1)
}
}package main
import (
"os"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
)
func main() {
	// Install a development-mode zap logger as the global base logger.
	ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
	setupLog := ctrl.Log.WithName("main")

	// Build the runtime scheme with the core client-go types.
	scheme := runtime.NewScheme()
	_ = clientgoscheme.AddToScheme(scheme)

	// Create manager with all supporting services configured.
	manager, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		Scheme: scheme,
		// Metrics server
		Metrics: metricsserver.Options{
			BindAddress: ":8080",
		},
		// Health probes
		HealthProbeBindAddress: ":8081",
		// Leader election
		LeaderElection:   true,
		LeaderElectionID: "my-app-leader-election",
		LeaseDuration:    ptrDuration(15 * time.Second),
		RenewDeadline:    ptrDuration(10 * time.Second),
		RetryPeriod:      ptrDuration(2 * time.Second),
		// Graceful shutdown
		GracefulShutdownTimeout: ptrDuration(30 * time.Second),
	})
	if err != nil {
		setupLog.Error(err, "unable to create manager")
		os.Exit(1)
	}

	// Register liveness and readiness probes.
	if err := manager.AddHealthzCheck("healthz", healthz.Ping); err != nil {
		setupLog.Error(err, "unable to set up health check")
		os.Exit(1)
	}
	if err := manager.AddReadyzCheck("readyz", healthz.Ping); err != nil {
		setupLog.Error(err, "unable to set up ready check")
		os.Exit(1)
	}

	// Report when this replica wins the leader election.
	go func() {
		<-manager.Elected()
		setupLog.Info("elected as leader")
	}()

	// Run until a shutdown signal is received.
	setupLog.Info("starting manager")
	if err := manager.Start(ctrl.SetupSignalHandler()); err != nil {
		setupLog.Error(err, "problem running manager")
		os.Exit(1)
	}
}
func ptrDuration(d time.Duration) *time.Duration {
return &d
}Install with Tessl CLI
npx tessl i tessl/golang-sigs-k8s-io--controller-runtime