A set of Go libraries for building Kubernetes controllers with structured abstractions for managers, reconcilers, clients, caches, webhooks, and testing.
The event handling subsystem provides flexible event routing through sources, handlers, and predicates. This enables fine-grained control over which events trigger reconciliation and how they are processed.
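Event handling in controller-runtime consists of three main components: sources, which provide streams of events to controllers; handlers, which transform events into reconcile requests; and predicates, which filter events before they reach a handler.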
Import Path: sigs.k8s.io/controller-runtime/pkg/event
The event package defines event types for Create, Update, Delete, and Generic operations.
package event
import (
"sigs.k8s.io/controller-runtime/pkg/client"
)
// CreateEvent is an event where a Kubernetes object was created.
// It is TypedCreateEvent instantiated with client.Object.
type CreateEvent = TypedCreateEvent[client.Object]
// TypedCreateEvent is the generic form of a create event, parameterized by
// the type of the created object.
type TypedCreateEvent[object any] struct {
// Object is the object that was created.
Object object
// IsInInitialList indicates whether this event is for an object that was
// part of the initial list.
IsInInitialList bool
}
// DeleteEvent is an event where a Kubernetes object was deleted.
type DeleteEvent = TypedDeleteEvent[client.Object]
// TypedDeleteEvent is the generic form of a delete event, parameterized by
// the type of the deleted object.
type TypedDeleteEvent[object any] struct {
// Object is the object that was deleted.
Object object
// DeleteStateUnknown is true if the object's deletion state is unknown.
DeleteStateUnknown bool
}
// UpdateEvent is an event where a Kubernetes object was updated.
type UpdateEvent = TypedUpdateEvent[client.Object]
// TypedUpdateEvent is the generic form of an update event; it carries both
// the previous and the current state of the object.
type TypedUpdateEvent[object any] struct {
// ObjectOld is the object before the update.
ObjectOld object
// ObjectNew is the object after the update.
ObjectNew object
}
// GenericEvent is an event where the operation type is unknown.
type GenericEvent = TypedGenericEvent[client.Object]
// TypedGenericEvent is the generic form of GenericEvent, parameterized by the
// type of the object the event refers to.
type TypedGenericEvent[object any] struct {
// Object is the object for the event.
Object object
}Import Path: sigs.k8s.io/controller-runtime/pkg/handler
Handlers transform events into reconcile requests that are enqueued for processing.
package handler
import (
"context"
"k8s.io/client-go/util/workqueue"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
// EventHandler enqueues reconcile.Requests in response to events.
// It is TypedEventHandler instantiated with client.Object and reconcile.Request.
type EventHandler = TypedEventHandler[client.Object, reconcile.Request]
// TypedEventHandler is the generic event handler interface. The object type
// parameter is the type carried by the events; the request type parameter is
// the element type of the work queue each method receives.
type TypedEventHandler[object any, request comparable] interface {
// Create is called in response to a create event.
Create(context.Context, event.TypedCreateEvent[object], workqueue.TypedRateLimitingInterface[request])
// Update is called in response to an update event.
Update(context.Context, event.TypedUpdateEvent[object], workqueue.TypedRateLimitingInterface[request])
// Delete is called in response to a delete event.
Delete(context.Context, event.TypedDeleteEvent[object], workqueue.TypedRateLimitingInterface[request])
// Generic is called in response to a generic event.
Generic(context.Context, event.TypedGenericEvent[object], workqueue.TypedRateLimitingInterface[request])
}
// EnqueueRequestForObject enqueues a Request containing the Name and Namespace of the object.
type EnqueueRequestForObject = TypedEnqueueRequestForObject[client.Object]
// TypedEnqueueRequestForObject is the generic form of EnqueueRequestForObject,
// parameterized by the watched object type. Its four methods below have the
// TypedEventHandler method signatures with reconcile.Request as the request type.
type TypedEnqueueRequestForObject[object client.Object] struct{}
// Create handles a create event.
func (e *TypedEnqueueRequestForObject[T]) Create(ctx context.Context, evt event.TypedCreateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request])
// Update handles an update event.
func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request])
// Delete handles a delete event.
func (e *TypedEnqueueRequestForObject[T]) Delete(ctx context.Context, evt event.TypedDeleteEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request])
// Generic handles a generic event.
func (e *TypedEnqueueRequestForObject[T]) Generic(ctx context.Context, evt event.TypedGenericEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request])
// EnqueueRequestForOwner enqueues Requests for the Owners of an object.
func EnqueueRequestForOwner(scheme *runtime.Scheme, mapper meta.RESTMapper, ownerType client.Object, opts ...OwnerOption) EventHandler
// TypedEnqueueRequestForOwner is the typed version of EnqueueRequestForOwner:
// it takes the same arguments and returns a handler for events carrying the
// given object type.
func TypedEnqueueRequestForOwner[object client.Object](scheme *runtime.Scheme, mapper meta.RESTMapper, ownerType client.Object, opts ...OwnerOption) TypedEventHandler[object, reconcile.Request]
// OwnerOption modifies EnqueueRequestForOwner behavior.
type OwnerOption func(e enqueueRequestForOwnerInterface)
// OnlyControllerOwner filters to only enqueue requests for controller owners.
func OnlyControllerOwner() OwnerOption
// EnqueueRequestsFromMapFunc enqueues Requests by running a mapping function.
func EnqueueRequestsFromMapFunc(fn MapFunc) EventHandler
// TypedEnqueueRequestsFromMapFunc is the typed version of
// EnqueueRequestsFromMapFunc, generic over both the object and request types.
func TypedEnqueueRequestsFromMapFunc[object any, request comparable](fn TypedMapFunc[object, request]) TypedEventHandler[object, request]
// MapFunc is a function that maps an object to a list of reconcile requests.
type MapFunc = TypedMapFunc[client.Object, reconcile.Request]
// TypedMapFunc is the generic form of MapFunc: it receives a context and an
// object and returns a slice of requests.
type TypedMapFunc[object any, request comparable] func(context.Context, object) []request
// WithLowPriorityWhenUnchanged sets low priority for update events where the object hasn't changed.
// It wraps the given handler and returns a handler of the same type.
func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u TypedEventHandler[object, request]) TypedEventHandler[object, request]
// LowPriority is a priority value of -100.
// NOTE(review): presumably the priority WithLowPriorityWhenUnchanged assigns — confirm against the package documentation.
const LowPriority = -100
// Funcs is an EventHandler that allows users to provide Funcs for each event type.
type Funcs = TypedFuncs[client.Object, reconcile.Request]
// TypedFuncs is the generic form of Funcs. Each field holds the callback for
// one event type; the methods declared after the struct have the
// TypedEventHandler method signatures.
type TypedFuncs[object any, request comparable] struct {
// CreateFunc is called in response to a create event.
CreateFunc func(context.Context, event.TypedCreateEvent[object], workqueue.TypedRateLimitingInterface[request])
// UpdateFunc is called in response to an update event.
UpdateFunc func(context.Context, event.TypedUpdateEvent[object], workqueue.TypedRateLimitingInterface[request])
// DeleteFunc is called in response to a delete event.
DeleteFunc func(context.Context, event.TypedDeleteEvent[object], workqueue.TypedRateLimitingInterface[request])
// GenericFunc is called in response to a generic event.
GenericFunc func(context.Context, event.TypedGenericEvent[object], workqueue.TypedRateLimitingInterface[request])
}
// Create handles a create event. Presumably delegates to CreateFunc when set — confirm in the package source.
func (h TypedFuncs[object, request]) Create(ctx context.Context, e event.TypedCreateEvent[object], q workqueue.TypedRateLimitingInterface[request])
// Update handles an update event. Presumably delegates to UpdateFunc when set — confirm in the package source.
func (h TypedFuncs[object, request]) Update(ctx context.Context, e event.TypedUpdateEvent[object], q workqueue.TypedRateLimitingInterface[request])
// Delete handles a delete event. Presumably delegates to DeleteFunc when set — confirm in the package source.
func (h TypedFuncs[object, request]) Delete(ctx context.Context, e event.TypedDeleteEvent[object], q workqueue.TypedRateLimitingInterface[request])
// Generic handles a generic event. Presumably delegates to GenericFunc when set — confirm in the package source.
func (h TypedFuncs[object, request]) Generic(ctx context.Context, e event.TypedGenericEvent[object], q workqueue.TypedRateLimitingInterface[request])package example
import (
"context"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/handler"
)
func SetupController(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&MyResource{}).
// Use default handler - enqueues the object itself
Complete(reconciler)
}package example
import (
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/handler"
)
func SetupControllerWithOwns(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&MyResource{}).
// Watch Pods owned by MyResource
Owns(&corev1.Pod{}, builder.WithOwnerOptions(
handler.OnlyControllerOwner(),
)).
Complete(reconciler)
}package example
import (
	"context"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
func configMapToDeployment(ctx context.Context, obj client.Object) []reconcile.Request {
configMap := obj.(*corev1.ConfigMap)
// Map ConfigMap to all Deployments in the same namespace
var deployments appsv1.DeploymentList
if err := client.List(ctx, &deployments, client.InNamespace(configMap.Namespace)); err != nil {
return []reconcile.Request{}
}
requests := make([]reconcile.Request, len(deployments.Items))
for i, deployment := range deployments.Items {
requests[i] = reconcile.Request{
NamespacedName: client.ObjectKeyFromObject(&deployment),
}
}
return requests
}
func SetupControllerWithMapFunc(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&appsv1.Deployment{}).
Watches(
&corev1.ConfigMap{},
handler.EnqueueRequestsFromMapFunc(configMapToDeployment),
).
Complete(reconciler)
}package example
import (
"context"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
func SetupCustomHandler(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Pod{}).
Watches(
&corev1.ConfigMap{},
handler.Funcs{
CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
// Custom logic for create events
q.Add(reconcile.Request{NamespacedName: client.ObjectKeyFromObject(e.Object)})
},
UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
// Custom logic for update events
q.Add(reconcile.Request{NamespacedName: client.ObjectKeyFromObject(e.ObjectNew)})
},
DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) {
// Custom logic for delete events
},
GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) {
// Custom logic for generic events
},
},
).
Complete(reconciler)
}Import Path: sigs.k8s.io/controller-runtime/pkg/predicate
Predicates filter events before they are provided to EventHandlers.
package predicate
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
)
// Predicate filters events before enqueuing.
// It is TypedPredicate instantiated with client.Object.
type Predicate = TypedPredicate[client.Object]
// TypedPredicate is the generic predicate interface, with one method per
// event type. Each method returns true if the event should be processed.
type TypedPredicate[object any] interface {
// Create returns true if the Create event should be processed.
Create(event.TypedCreateEvent[object]) bool
// Delete returns true if the Delete event should be processed.
Delete(event.TypedDeleteEvent[object]) bool
// Update returns true if the Update event should be processed.
Update(event.TypedUpdateEvent[object]) bool
// Generic returns true if the Generic event should be processed.
Generic(event.TypedGenericEvent[object]) bool
}
// And returns a predicate that ANDs all given predicates.
func And[object any](predicates ...TypedPredicate[object]) TypedPredicate[object]
// Or returns a predicate that ORs all given predicates.
func Or[object any](predicates ...TypedPredicate[object]) TypedPredicate[object]
// Not returns a predicate that negates the given predicate.
func Not[object any](predicate TypedPredicate[object]) TypedPredicate[object]
// GenerationChangedPredicate filters update events where Generation hasn't changed.
type GenerationChangedPredicate = TypedGenerationChangedPredicate[client.Object]
// TypedGenerationChangedPredicate is the generic form of
// GenerationChangedPredicate. It embeds TypedFuncs and declares its own Update.
type TypedGenerationChangedPredicate[object metav1.Object] struct {
TypedFuncs[object]
}
// Update reports whether the update event should be processed.
func (TypedGenerationChangedPredicate[object]) Update(e event.TypedUpdateEvent[object]) bool
// ResourceVersionChangedPredicate filters update events where ResourceVersion hasn't changed.
type ResourceVersionChangedPredicate = TypedResourceVersionChangedPredicate[client.Object]
// TypedResourceVersionChangedPredicate is the generic form of
// ResourceVersionChangedPredicate. It embeds TypedFuncs and declares its own Update.
type TypedResourceVersionChangedPredicate[T metav1.Object] struct {
TypedFuncs[T]
}
// Update reports whether the update event should be processed.
func (TypedResourceVersionChangedPredicate[T]) Update(e event.TypedUpdateEvent[T]) bool
// LabelChangedPredicate filters update events where Labels haven't changed.
type LabelChangedPredicate = TypedLabelChangedPredicate[client.Object]
// TypedLabelChangedPredicate is the generic form of LabelChangedPredicate.
// It embeds TypedFuncs and declares its own Update.
type TypedLabelChangedPredicate[object metav1.Object] struct {
TypedFuncs[object]
}
// Update reports whether the update event should be processed.
func (TypedLabelChangedPredicate[object]) Update(e event.TypedUpdateEvent[object]) bool
// AnnotationChangedPredicate filters update events where Annotations haven't changed.
type AnnotationChangedPredicate = TypedAnnotationChangedPredicate[client.Object]
// TypedAnnotationChangedPredicate is the generic form of
// AnnotationChangedPredicate. It embeds TypedFuncs and declares its own Update.
type TypedAnnotationChangedPredicate[object metav1.Object] struct {
TypedFuncs[object]
}
// Update reports whether the update event should be processed.
func (TypedAnnotationChangedPredicate[object]) Update(e event.TypedUpdateEvent[object]) bool
// LabelSelectorPredicate filters events based on a label selector.
func LabelSelectorPredicate(s metav1.LabelSelector) (Predicate, error)
// Funcs allows users to provide Funcs for each event type.
// It is TypedFuncs instantiated with client.Object.
type Funcs = TypedFuncs[client.Object]
// TypedFuncs is the generic form of Funcs. Each field holds the filter for one
// event type; the methods declared after the struct have the TypedPredicate
// method signatures.
type TypedFuncs[object any] struct {
// CreateFunc returns true if the Create event should be processed.
CreateFunc func(event.TypedCreateEvent[object]) bool
// DeleteFunc returns true if the Delete event should be processed.
DeleteFunc func(event.TypedDeleteEvent[object]) bool
// UpdateFunc returns true if the Update event should be processed.
UpdateFunc func(event.TypedUpdateEvent[object]) bool
// GenericFunc returns true if the Generic event should be processed.
GenericFunc func(event.TypedGenericEvent[object]) bool
}
// Create reports whether the create event should be processed.
func (p TypedFuncs[object]) Create(e event.TypedCreateEvent[object]) bool
// Delete reports whether the delete event should be processed.
func (p TypedFuncs[object]) Delete(e event.TypedDeleteEvent[object]) bool
// Update reports whether the update event should be processed.
func (p TypedFuncs[object]) Update(e event.TypedUpdateEvent[object]) bool
// Generic reports whether the generic event should be processed.
func (p TypedFuncs[object]) Generic(e event.TypedGenericEvent[object]) bool
// NewPredicateFuncs returns a predicate that applies the given filter function.
func NewPredicateFuncs(filter func(object client.Object) bool) Funcs
// NewTypedPredicateFuncs is the typed version of NewPredicateFuncs: it takes a
// filter over the given object type and returns a TypedFuncs for that type.
func NewTypedPredicateFuncs[object any](filter func(object object) bool) TypedFuncs[object]package example
import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
func SetupWithGenerationPredicate(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&MyResource{}, builder.WithPredicates(
predicate.GenerationChangedPredicate{},
)).
Complete(reconciler)
}package example
import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
func SetupWithCombinedPredicates(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&MyResource{}, builder.WithPredicates(
predicate.Or(
predicate.GenerationChangedPredicate{},
predicate.AnnotationChangedPredicate{},
),
)).
Complete(reconciler)
}package example
import (
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
func SetupWithCustomPredicate(mgr ctrl.Manager) error {
// Only process pods in Running phase
runningPodPredicate := predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
pod := e.Object.(*corev1.Pod)
return pod.Status.Phase == corev1.PodRunning
},
UpdateFunc: func(e event.UpdateEvent) bool {
newPod := e.ObjectNew.(*corev1.Pod)
return newPod.Status.Phase == corev1.PodRunning
},
DeleteFunc: func(e event.DeleteEvent) bool {
return true // Process all deletes
},
GenericFunc: func(e event.GenericEvent) bool {
return false // Ignore generic events
},
}
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Pod{}, builder.WithPredicates(runningPodPredicate)).
Complete(reconciler)
}package example
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
func SetupWithLabelSelector(mgr ctrl.Manager) error {
labelPredicate, err := predicate.LabelSelectorPredicate(metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "my-app",
},
})
if err != nil {
return err
}
return ctrl.NewControllerManagedBy(mgr).
For(&MyResource{}, builder.WithPredicates(labelPredicate)).
Complete(reconciler)
}Import Path: sigs.k8s.io/controller-runtime/pkg/source
Sources provide streams of events to controllers.
package source
import (
"context"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
// Source is a source of events.
// It is TypedSource instantiated with reconcile.Request.
type Source = TypedSource[reconcile.Request]
// TypedSource is the generic source-of-events interface, parameterized by the
// element type of the work queue passed to Start.
type TypedSource[request comparable] interface {
// Start starts the source. It receives a context and the work queue.
Start(context.Context, workqueue.TypedRateLimitingInterface[request]) error
}
// SyncingSource is a source that needs syncing prior to being usable.
type SyncingSource = TypedSyncingSource[reconcile.Request]
// TypedSyncingSource is the generic form of SyncingSource: a TypedSource that
// additionally exposes WaitForSync.
type TypedSyncingSource[request comparable] interface {
TypedSource[request]
// WaitForSync takes a context and returns an error.
// NOTE(review): presumably blocks until the source has synced or ctx ends — confirm.
WaitForSync(ctx context.Context) error
}
// Kind creates a Source for a specific object type.
func Kind[object client.Object](
cache cache.Cache,
obj object,
handler handler.TypedEventHandler[object, reconcile.Request],
predicates ...predicate.TypedPredicate[object],
) SyncingSource
// TypedKind is the typed version of Kind: it takes the same arguments but is
// also generic over the request type and returns a TypedSyncingSource.
func TypedKind[object client.Object, request comparable](
cache cache.Cache,
obj object,
handler handler.TypedEventHandler[object, request],
predicates ...predicate.TypedPredicate[object],
) TypedSyncingSource[request]
// Channel creates a Source from a Go channel.
func Channel[object any](
source <-chan event.TypedGenericEvent[object],
handler handler.TypedEventHandler[object, reconcile.Request],
opts ...ChannelOpt[object, reconcile.Request],
) Source
// TypedChannel is the typed version of Channel: it takes the same arguments
// but is also generic over the request type and returns a TypedSource.
func TypedChannel[object any, request comparable](
source <-chan event.TypedGenericEvent[object],
handler handler.TypedEventHandler[object, request],
opts ...ChannelOpt[object, request],
) TypedSource[request]
// ChannelOpt configures a channel source.
type ChannelOpt[object any, request comparable] func(*channel[object, request])
// WithBufferSize sets the buffer size for the channel source.
func WithBufferSize[object any, request comparable](bufferSize int) ChannelOpt[object, request]
// WithPredicates sets predicates for the channel source.
func WithPredicates[object any, request comparable](p ...predicate.TypedPredicate[object]) ChannelOpt[object, request]
// Func is a function that implements Source.
type Func = TypedFunc[reconcile.Request]
// TypedFunc is the generic form of Func: a function taking a context and a
// work queue and returning an error.
type TypedFunc[request comparable] func(context.Context, workqueue.TypedRateLimitingInterface[request]) error
// Start has the TypedSource.Start signature. Presumably it calls f — confirm in the package source.
func (f TypedFunc[request]) Start(ctx context.Context, queue workqueue.TypedRateLimitingInterface[request]) error
// String returns a string describing f.
func (f TypedFunc[request]) String() string
// Informer creates a source from an informer.
type Informer struct {
// Informer is the controller-runtime informer.
Informer cache.Informer
// Handler transforms events into reconcile requests.
Handler handler.EventHandler
// Predicates filter events.
Predicates []predicate.Predicate
}
// Start has the Source.Start signature: it takes a context and the reconcile.Request work queue.
func (is *Informer) Start(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error
// String returns a string describing the source.
func (is *Informer) String() stringpackage example
import (
"context"
"time"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/source"
)
func SetupWithChannelSource(mgr ctrl.Manager) error {
// Create a channel for events
eventChannel := make(chan event.GenericEvent)
// Start a goroutine to send events
go func() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
eventChannel <- event.GenericEvent{Object: &MyResource{}}
}
}()
return ctrl.NewControllerManagedBy(mgr).
For(&MyResource{}).
WatchesRawSource(
source.Channel(
eventChannel,
&handler.EnqueueRequestForObject{},
),
).
Complete(reconciler)
}package example
import (
"context"
"time"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)
// customSource is a start function usable as a source.Func: once a minute it
// enqueues a reconcile.Request for the object "my-resource" in namespace
// "default".
//
// The queue parameter is workqueue.TypedRateLimitingInterface[reconcile.Request],
// the type source.Func requires. The ticker loop runs in its own goroutine so
// that customSource returns immediately, as the controller expects a source's
// Start to be non-blocking. The goroutine exits when ctx is cancelled.
func customSource(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
	// Namespace and Name are promoted fields of reconcile.Request, so they are
	// set directly and no package beyond this example's imports is needed.
	var req reconcile.Request
	req.Namespace = "default"
	req.Name = "my-resource"

	go func() {
		ticker := time.NewTicker(1 * time.Minute)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// Enqueue the resource for reconciliation.
				queue.Add(req)
			}
		}
	}()
	return nil
}
func SetupWithCustomSource(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&MyResource{}).
WatchesRawSource(source.Func(customSource)).
Complete(reconciler)
}Install with Tessl CLI
npx tessl i tessl/golang-sigs-k8s-io--controller-runtime