Supporting services include structured logging with logr, Prometheus metrics integration, health checks, leader election for high availability, and signal handling for graceful shutdown.
Controller-runtime provides the following supporting services; each is described in its own section below.
Import Path: sigs.k8s.io/controller-runtime/pkg/log
The log package provides utilities for structured logging.
package log
import (
"context"
"github.com/go-logr/logr"
)
// Log is the base logger used by controller-runtime
var Log logr.Logger
// SetLogger sets the logger implementation for the package
func SetLogger(l logr.Logger)
// FromContext returns a logger from the context
func FromContext(ctx context.Context, keysAndValues ...interface{}) logr.Logger
// IntoContext returns a context with the logger attached
func IntoContext(ctx context.Context, log logr.Logger) context.ContextNote: The LoggerInto variable in the root controllerruntime package is an alias for log.IntoContext:
// LoggerInto takes a context and sets the logger as one of its keys
var LoggerInto = log.IntoContext// NullLogSink is a logr.LogSink that does nothing
type NullLogSink struct{}
func (NullLogSink) Enabled(level int) bool
func (NullLogSink) Error(_ error, _ string, _ ...interface{})
func (NullLogSink) Info(_ int, _ string, _ ...interface{})
func (log NullLogSink) Init(logr.RuntimeInfo)
func (log NullLogSink) WithName(_ string) logr.LogSink
func (log NullLogSink) WithValues(_ ...interface{}) logr.LogSink// KubeAPIWarningLogger logs warnings from the Kubernetes API
type KubeAPIWarningLogger struct {
// Has unexported fields
}
// NewKubeAPIWarningLogger creates a new warning logger
func NewKubeAPIWarningLogger(opts KubeAPIWarningLoggerOptions) *KubeAPIWarningLogger
type KubeAPIWarningLoggerOptions struct {
// Deduplicate indicates whether to deduplicate warnings
Deduplicate bool
}
func (l *KubeAPIWarningLogger) HandleWarningHeaderWithContext(ctx context.Context, code int, _ string, message string)package main
import (
"context"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)
func main() {
// Setup logger
ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
// Get logger
logger := ctrl.Log.WithName("main")
logger.Info("starting application")
// Use logger in context
ctx := log.IntoContext(context.Background(), logger)
// Retrieve logger from context
log := log.FromContext(ctx)
log.Info("operation completed", "result", "success")
}Import Path: sigs.k8s.io/controller-runtime/pkg/log/zap
Integration with Uber's Zap logging library.
package zap
import (
"flag"
"io"
"github.com/go-logr/logr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// New creates a new logr.Logger using zap
func New(opts ...Opts) logr.Logger
// NewRaw creates a new zap.Logger
func NewRaw(opts ...Opts) *zap.Logger
// UseDevMode sets development mode
func UseDevMode(enabled bool) Opts
// WriteTo sets the output destination
func WriteTo(out io.Writer) Opts
// UseFlagOptions applies flag-provided options
func UseFlagOptions(in *Options) Opts
// Level sets the minimum log level
func Level(level zapcore.LevelEnabler) func(o *Options)
// StacktraceLevel sets the level at which to include stack traces
func StacktraceLevel(stacktraceLevel zapcore.LevelEnabler) func(o *Options)
// Encoder sets the encoder
func Encoder(encoder zapcore.Encoder) func(o *Options)
// JSONEncoder sets JSON encoding
func JSONEncoder(opts ...EncoderConfigOption) func(o *Options)
// ConsoleEncoder sets console encoding
func ConsoleEncoder(opts ...EncoderConfigOption) func(o *Options)
// RawZapOpts allows passing raw zap options
func RawZapOpts(zapOpts ...zap.Option) func(o *Options)// Opts is a functional option for configuring zap
type Opts func(*Options)
// Options contains zap configuration
type Options struct {
// Development puts the logger in development mode
Development bool
// Encoder is the encoder to use
Encoder zapcore.Encoder
// EncoderConfigOptions are options for the encoder config
EncoderConfigOptions []EncoderConfigOption
// NewEncoder is a function to create a new encoder
NewEncoder NewEncoderFunc
// DestWriter is the destination for log output
DestWriter io.Writer
// Level is the minimum enabled logging level
Level zapcore.LevelEnabler
// StacktraceLevel is the level at which stacktraces are recorded
StacktraceLevel zapcore.LevelEnabler
// ZapOpts are additional zap options
ZapOpts []zap.Option
// TimeEncoder is the time encoder
TimeEncoder zapcore.TimeEncoder
}
func (o *Options) BindFlags(fs *flag.FlagSet)// EncoderConfigOption is an option for encoder config
type EncoderConfigOption func(*zapcore.EncoderConfig)
// NewEncoderFunc is a function to create an encoder
type NewEncoderFunc func(...EncoderConfigOption) zapcore.Encoder// KubeAwareEncoder wraps a zapcore.Encoder with Kubernetes-specific formatting
type KubeAwareEncoder struct {
zapcore.Encoder
Verbose bool
}
func (k *KubeAwareEncoder) Clone() zapcore.Encoder
func (k *KubeAwareEncoder) EncodeEntry(entry zapcore.Entry, fields []zapcore.Field) (*buffer.Buffer, error)package main
import (
"flag"
"os"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"go.uber.org/zap/zapcore"
)
func main() {
var enableDevMode bool
flag.BoolVar(&enableDevMode, "dev", false, "Enable development mode")
flag.Parse()
// Create logger with options
opts := zap.Options{
Development: enableDevMode,
Level: zapcore.InfoLevel,
TimeEncoder: zapcore.ISO8601TimeEncoder,
}
opts.BindFlags(flag.CommandLine)
logger := zap.New(
zap.UseFlagOptions(&opts),
zap.WriteTo(os.Stdout),
)
ctrl.SetLogger(logger)
logger.Info("application started",
"version", "1.0.0",
"port", 8080,
)
}Import Path: sigs.k8s.io/controller-runtime/pkg/metrics
Prometheus metrics integration.
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
)
// RegistererGatherer combines Registerer and Gatherer interfaces
type RegistererGatherer interface {
prometheus.Registerer
prometheus.Gatherer
}
// Registry is the default prometheus registry for controller-runtime
var Registry RegistererGatherer = prometheus.NewRegistry()const (
// WorkQueue metrics subsystem
WorkQueueSubsystem = "workqueue"
DepthKey = "depth"
AddsKey = "adds_total"
QueueLatencyKey = "queue_duration_seconds"
WorkDurationKey = "work_duration_seconds"
UnfinishedWorkKey = "unfinished_work_seconds"
LongestRunningProcessorKey = "longest_running_processor_seconds"
RetriesKey = "retries_total"
)package main
import (
"github.com/prometheus/client_golang/prometheus"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)
var (
reconcileCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "mycontroller_reconcile_total",
Help: "Total number of reconciliations",
},
[]string{"result"},
)
reconcileDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "mycontroller_reconcile_duration_seconds",
Help: "Time spent in reconciliation",
},
[]string{"result"},
)
)
func init() {
// Register metrics with controller-runtime registry
metrics.Registry.MustRegister(reconcileCounter, reconcileDuration)
}
func main() {
// Metrics will be automatically exposed by the manager
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Metrics: metricsserver.Options{
BindAddress: ":8080",
},
})
// ...
}Import Path: sigs.k8s.io/controller-runtime/pkg/metrics/server
HTTP server for serving Prometheus metrics.
package server
import (
"context"
"crypto/tls"
"net"
"net/http"
"github.com/go-logr/logr"
"k8s.io/client-go/rest"
)
var DefaultBindAddress = ":8080"
// Server serves metrics over HTTP
type Server interface {
// AddExtraHandler adds an extra handler served on path
AddExtraHandler(path string, handler http.Handler) error
// NeedLeaderElection returns true if the metrics server needs leader election
NeedLeaderElection() bool
// Start starts the metrics server
Start(ctx context.Context) error
}
// NewServer creates a new metrics server
func NewServer(o Options, config *rest.Config, httpClient *http.Client) (Server, error)type Options struct {
// SecureServing enables secure serving with TLS
SecureServing bool
// BindAddress is the TCP address to bind to
BindAddress string
// ExtraHandlers contains extra handlers to register
ExtraHandlers map[string]http.Handler
// FilterProvider provides a filter for requests
FilterProvider func(c *rest.Config, httpClient *http.Client) (Filter, error)
// CertDir is the directory containing TLS certificates
CertDir string
// CertName is the name of the TLS certificate file
CertName string
// KeyName is the name of the TLS key file
KeyName string
// TLSOpts are TLS configuration options
TLSOpts []func(*tls.Config)
// ListenConfig allows customizing the network listener
ListenConfig net.ListenConfig
}// Filter wraps an http.Handler to provide filtering
type Filter func(log logr.Logger, handler http.Handler) (http.Handler, error)Import Path: sigs.k8s.io/controller-runtime/pkg/metrics/filters
Authentication and authorization filters for metrics endpoints.
package filters
import (
"net/http"
"k8s.io/client-go/rest"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
)
// WithAuthenticationAndAuthorization creates a filter that performs authentication and authorization
func WithAuthenticationAndAuthorization(config *rest.Config, httpClient *http.Client) (metricsserver.Filter, error)Import Path: sigs.k8s.io/controller-runtime/pkg/healthz
Health check endpoints for liveness and readiness probes.
package healthz
import (
"net/http"
)
// Checker is a function that performs a health check
type Checker func(req *http.Request) error
var Ping Checker = func(_ *http.Request) error { return nil }// Handler serves health check endpoints
type Handler struct {
Checks map[string]Checker
}
func (h *Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request)// CheckHandler serves a single health check
type CheckHandler struct {
Checker
}
func (h CheckHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request)package main
import (
"net/http"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
)
// Custom health check
func databaseHealthCheck(req *http.Request) error {
// Check database connectivity
if !isDBConnected() {
return fmt.Errorf("database is not connected")
}
return nil
}
func main() {
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
HealthProbeBindAddress: ":8081",
})
if err != nil {
panic(err)
}
// Add health checks
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
panic(err)
}
if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
panic(err)
}
// Add custom health check
if err := mgr.AddHealthzCheck("database", databaseHealthCheck); err != nil {
panic(err)
}
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
panic(err)
}
}Import Path: sigs.k8s.io/controller-runtime/pkg/leaderelection
Leader election for high availability.
package leaderelection
import (
"time"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"sigs.k8s.io/controller-runtime/pkg/recorder"
)
// NewResourceLock creates a new resource lock for leader election
func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, options Options) (resourcelock.Interface, error)type Options struct {
// LeaderElection determines whether to use leader election
LeaderElection bool
// LeaderElectionResourceLock determines which resource lock to use
LeaderElectionResourceLock string
// LeaderElectionNamespace determines the namespace for the leader election resource
LeaderElectionNamespace string
// LeaderElectionID is the name of the resource for leader election
LeaderElectionID string
// RenewDeadline is how long the leader will retry refreshing leadership before giving up
RenewDeadline time.Duration
// LeaderLabels are labels to apply to the leader election resource
LeaderLabels map[string]string
}package main
import (
"time"
ctrl "sigs.k8s.io/controller-runtime"
)
func main() {
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
// Enable leader election
LeaderElection: true,
LeaderElectionID: "my-controller-leader-election",
// Optional: Configure election parameters
LeaderElectionNamespace: "default",
LeaderElectionReleaseOnCancel: true,
LeaseDuration: ptrDuration(15 * time.Second),
RenewDeadline: ptrDuration(10 * time.Second),
RetryPeriod: ptrDuration(2 * time.Second),
})
if err != nil {
panic(err)
}
// Check if this instance is the leader
go func() {
<-mgr.Elected()
log.Info("this instance is now the leader")
}()
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
panic(err)
}
}
func ptrDuration(d time.Duration) *time.Duration {
return &d
}Import Path: sigs.k8s.io/controller-runtime/pkg/recorder
Event recorder provider interface.
package recorder
import (
"k8s.io/client-go/tools/record"
)
// Provider knows how to generate new event recorders
type Provider interface {
// GetEventRecorderFor returns an EventRecorder for the given name
GetEventRecorderFor(name string) record.EventRecorder
}package controllers
import (
"context"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
type MyReconciler struct {
client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder
}
func (r *MyReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
var obj corev1.Pod
if err := r.Get(ctx, req.NamespacedName, &obj); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}
// Record events
r.Recorder.Event(&obj, corev1.EventTypeNormal, "Reconciling", "Starting reconciliation")
// Do work...
r.Recorder.Event(&obj, corev1.EventTypeNormal, "Reconciled", "Reconciliation completed successfully")
return reconcile.Result{}, nil
}
func (r *MyReconciler) SetupWithManager(mgr ctrl.Manager) error {
// Get event recorder from manager
r.Recorder = mgr.GetEventRecorderFor("my-controller")
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Pod{}).
Complete(r)
}Import Path: sigs.k8s.io/controller-runtime/pkg/manager/signals
Signal handling for graceful shutdown.
package signals
import (
"context"
)
// SetupSignalHandler registers for SIGTERM and SIGINT and returns a context that is cancelled on either signal
func SetupSignalHandler() context.Contextpackage main
import (
"os"
ctrl "sigs.k8s.io/controller-runtime"
)
func main() {
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
if err != nil {
os.Exit(1)
}
// Setup signal handler for graceful shutdown
ctx := ctrl.SetupSignalHandler()
// Start manager - will shut down gracefully on SIGTERM/SIGINT
if err := mgr.Start(ctx); err != nil {
os.Exit(1)
}
}package main
import (
"os"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
)
func main() {
// Setup logging
ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
log := ctrl.Log.WithName("main")
// Create scheme
scheme := runtime.NewScheme()
_ = clientgoscheme.AddToScheme(scheme)
// Create manager with all supporting services configured
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
// Metrics server
Metrics: metricsserver.Options{
BindAddress: ":8080",
},
// Health probes
HealthProbeBindAddress: ":8081",
// Leader election
LeaderElection: true,
LeaderElectionID: "my-app-leader-election",
LeaseDuration: ptrDuration(15 * time.Second),
RenewDeadline: ptrDuration(10 * time.Second),
RetryPeriod: ptrDuration(2 * time.Second),
// Graceful shutdown
GracefulShutdownTimeout: ptrDuration(30 * time.Second),
})
if err != nil {
log.Error(err, "unable to create manager")
os.Exit(1)
}
// Add health checks
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
log.Error(err, "unable to set up health check")
os.Exit(1)
}
if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
log.Error(err, "unable to set up ready check")
os.Exit(1)
}
// Wait for leader election
go func() {
<-mgr.Elected()
log.Info("elected as leader")
}()
// Start manager with signal handler
log.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
log.Error(err, "problem running manager")
os.Exit(1)
}
}
func ptrDuration(d time.Duration) *time.Duration {
return &d
}