The event handling subsystem provides flexible event routing through sources, handlers, and predicates. This enables fine-grained control over which events trigger reconciliation and how they are processed.
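Event handling in controller-runtime consists of three main components: sources, which provide streams of events to controllers; handlers, which transform events into reconcile requests; and predicates, which filter events before they reach handlers. The event types shared by all three are defined in the event package.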
Import Path: sigs.k8s.io/controller-runtime/pkg/event
The event package defines event types for Create, Update, Delete, and Generic operations.
package event
import (
"sigs.k8s.io/controller-runtime/pkg/client"
)
// CreateEvent is an event where a Kubernetes object was created.
// It is TypedCreateEvent instantiated with client.Object.
type CreateEvent = TypedCreateEvent[client.Object]

// TypedCreateEvent is a generic create event, parameterized on the type of
// the object it carries.
type TypedCreateEvent[object any] struct {
	// Object is the object that was created
	Object object
	// IsInInitialList indicates if this event is for an object that was part of the initial list
	IsInInitialList bool
}

// DeleteEvent is an event where a Kubernetes object was deleted
type DeleteEvent = TypedDeleteEvent[client.Object]

// TypedDeleteEvent is a generic delete event, parameterized on the type of
// the object it carries.
type TypedDeleteEvent[object any] struct {
	// Object is the object that was deleted
	Object object
	// DeleteStateUnknown is true if the object's deletion state is unknown
	DeleteStateUnknown bool
}

// UpdateEvent is an event where a Kubernetes object was updated
type UpdateEvent = TypedUpdateEvent[client.Object]

// TypedUpdateEvent is a generic update event. It carries both the previous
// and the new version of the object, using the same type for each.
type TypedUpdateEvent[object any] struct {
	// ObjectOld is the object before the update
	ObjectOld object
	// ObjectNew is the object after the update
	ObjectNew object
}

// GenericEvent is an event where the operation type is unknown
type GenericEvent = TypedGenericEvent[client.Object]

// TypedGenericEvent is a generic event type; it carries only the object and
// no information about which operation occurred.
type TypedGenericEvent[object any] struct {
	// Object is the object for the event
	Object object
}
Import Path: sigs.k8s.io/controller-runtime/pkg/handler
Handlers transform events into reconcile requests that are enqueued for processing.
package handler
import (
"context"
"k8s.io/client-go/util/workqueue"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
// EventHandler enqueues reconcile.Requests in response to events.
// It is TypedEventHandler instantiated with client.Object and reconcile.Request.
type EventHandler = TypedEventHandler[client.Object, reconcile.Request]

// TypedEventHandler is a generic event handler interface. Each method receives
// the event and the work queue; the methods return nothing, so any resulting
// requests are added to the queue passed in.
type TypedEventHandler[object any, request comparable] interface {
	// Create is called in response to a create event
	Create(context.Context, event.TypedCreateEvent[object], workqueue.TypedRateLimitingInterface[request])
	// Update is called in response to an update event
	Update(context.Context, event.TypedUpdateEvent[object], workqueue.TypedRateLimitingInterface[request])
	// Delete is called in response to a delete event
	Delete(context.Context, event.TypedDeleteEvent[object], workqueue.TypedRateLimitingInterface[request])
	// Generic is called in response to a generic event
	Generic(context.Context, event.TypedGenericEvent[object], workqueue.TypedRateLimitingInterface[request])
}

// EnqueueRequestForObject enqueues a Request containing the Name and Namespace of the object
type EnqueueRequestForObject = TypedEnqueueRequestForObject[client.Object]

// TypedEnqueueRequestForObject is the typed form of EnqueueRequestForObject.
// It has no fields; its methods use a pointer receiver and a
// reconcile.Request queue.
type TypedEnqueueRequestForObject[object client.Object] struct{}

// Create is the handler method for create events.
func (e *TypedEnqueueRequestForObject[T]) Create(ctx context.Context, evt event.TypedCreateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request])

// Update is the handler method for update events.
func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request])

// Delete is the handler method for delete events.
func (e *TypedEnqueueRequestForObject[T]) Delete(ctx context.Context, evt event.TypedDeleteEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request])

// Generic is the handler method for generic events.
func (e *TypedEnqueueRequestForObject[T]) Generic(ctx context.Context, evt event.TypedGenericEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request])

// EnqueueRequestForOwner enqueues Requests for the Owners of an object
func EnqueueRequestForOwner(scheme *runtime.Scheme, mapper meta.RESTMapper, ownerType client.Object, opts ...OwnerOption) EventHandler

// TypedEnqueueRequestForOwner is the typed version. It takes the same
// arguments as EnqueueRequestForOwner and returns a handler for the given
// object type that produces reconcile.Requests.
func TypedEnqueueRequestForOwner[object client.Object](scheme *runtime.Scheme, mapper meta.RESTMapper, ownerType client.Object, opts ...OwnerOption) TypedEventHandler[object, reconcile.Request]

// OwnerOption modifies EnqueueRequestForOwner behavior
type OwnerOption func(e enqueueRequestForOwnerInterface)

// OnlyControllerOwner filters to only enqueue requests for controller owners
func OnlyControllerOwner() OwnerOption

// EnqueueRequestsFromMapFunc enqueues Requests by running a mapping function
func EnqueueRequestsFromMapFunc(fn MapFunc) EventHandler

// TypedEnqueueRequestsFromMapFunc is the typed version; both the object type
// and the request type are type parameters.
func TypedEnqueueRequestsFromMapFunc[object any, request comparable](fn TypedMapFunc[object, request]) TypedEventHandler[object, request]

// MapFunc is a function that maps an object to a list of reconcile requests
type MapFunc = TypedMapFunc[client.Object, reconcile.Request]

// TypedMapFunc is the generic form of MapFunc: given a context and an object
// it returns a slice of requests.
type TypedMapFunc[object any, request comparable] func(context.Context, object) []request

// WithLowPriorityWhenUnchanged sets low priority for update events where the object hasn't changed
func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u TypedEventHandler[object, request]) TypedEventHandler[object, request]

// LowPriority is the constant -100.
// NOTE(review): presumably the priority value applied by
// WithLowPriorityWhenUnchanged — confirm against the implementation.
const LowPriority = -100

// Funcs is an EventHandler that allows users to provide Funcs for each event type
type Funcs = TypedFuncs[client.Object, reconcile.Request]

// TypedFuncs is the generic form of Funcs: one optional callback field per
// event type, each with the same signature as the corresponding
// TypedEventHandler method.
type TypedFuncs[object any, request comparable] struct {
	// CreateFunc is called in response to a create event
	CreateFunc func(context.Context, event.TypedCreateEvent[object], workqueue.TypedRateLimitingInterface[request])
	// UpdateFunc is called in response to an update event
	UpdateFunc func(context.Context, event.TypedUpdateEvent[object], workqueue.TypedRateLimitingInterface[request])
	// DeleteFunc is called in response to a delete event
	DeleteFunc func(context.Context, event.TypedDeleteEvent[object], workqueue.TypedRateLimitingInterface[request])
	// GenericFunc is called in response to a generic event
	GenericFunc func(context.Context, event.TypedGenericEvent[object], workqueue.TypedRateLimitingInterface[request])
}

// Create is the handler method for create events.
// NOTE(review): presumably delegates to CreateFunc when it is set — confirm.
func (h TypedFuncs[object, request]) Create(ctx context.Context, e event.TypedCreateEvent[object], q workqueue.TypedRateLimitingInterface[request])

// Update is the handler method for update events.
func (h TypedFuncs[object, request]) Update(ctx context.Context, e event.TypedUpdateEvent[object], q workqueue.TypedRateLimitingInterface[request])

// Delete is the handler method for delete events.
func (h TypedFuncs[object, request]) Delete(ctx context.Context, e event.TypedDeleteEvent[object], q workqueue.TypedRateLimitingInterface[request])

// Generic is the handler method for generic events.
func (h TypedFuncs[object, request]) Generic(ctx context.Context, e event.TypedGenericEvent[object], q workqueue.TypedRateLimitingInterface[request])
package example
import (
"context"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/handler"
)
func SetupController(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&MyResource{}).
// Use default handler - enqueues the object itself
Complete(reconciler)
}package example
import (
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/handler"
)
func SetupControllerWithOwns(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&MyResource{}).
// Watch Pods owned by MyResource
Owns(&corev1.Pod{}, builder.WithOwnerOptions(
handler.OnlyControllerOwner(),
)).
Complete(reconciler)
}package example
import (
"context"
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
func configMapToDeployment(ctx context.Context, obj client.Object) []reconcile.Request {
configMap := obj.(*corev1.ConfigMap)
// Map ConfigMap to all Deployments in the same namespace
var deployments appsv1.DeploymentList
if err := client.List(ctx, &deployments, client.InNamespace(configMap.Namespace)); err != nil {
return []reconcile.Request{}
}
requests := make([]reconcile.Request, len(deployments.Items))
for i, deployment := range deployments.Items {
requests[i] = reconcile.Request{
NamespacedName: client.ObjectKeyFromObject(&deployment),
}
}
return requests
}
func SetupControllerWithMapFunc(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&appsv1.Deployment{}).
Watches(
&corev1.ConfigMap{},
handler.EnqueueRequestsFromMapFunc(configMapToDeployment),
).
Complete(reconciler)
}package example
import (
"context"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
func SetupCustomHandler(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Pod{}).
Watches(
&corev1.ConfigMap{},
handler.Funcs{
CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
// Custom logic for create events
q.Add(reconcile.Request{NamespacedName: client.ObjectKeyFromObject(e.Object)})
},
UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
// Custom logic for update events
q.Add(reconcile.Request{NamespacedName: client.ObjectKeyFromObject(e.ObjectNew)})
},
DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) {
// Custom logic for delete events
},
GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) {
// Custom logic for generic events
},
},
).
Complete(reconciler)
}Import Path: sigs.k8s.io/controller-runtime/pkg/predicate
Predicates filter events before they are provided to EventHandlers.
package predicate
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
)
// Predicate filters events before enqueuing.
// It is TypedPredicate instantiated with client.Object.
type Predicate = TypedPredicate[client.Object]

// TypedPredicate is a generic predicate interface with one method per event
// type; each method returns whether that event should be processed.
type TypedPredicate[object any] interface {
	// Create returns true if the Create event should be processed
	Create(event.TypedCreateEvent[object]) bool
	// Delete returns true if the Delete event should be processed
	Delete(event.TypedDeleteEvent[object]) bool
	// Update returns true if the Update event should be processed
	Update(event.TypedUpdateEvent[object]) bool
	// Generic returns true if the Generic event should be processed
	Generic(event.TypedGenericEvent[object]) bool
}

// And returns a predicate that ANDs all given predicates
func And[object any](predicates ...TypedPredicate[object]) TypedPredicate[object]

// Or returns a predicate that ORs all given predicates
func Or[object any](predicates ...TypedPredicate[object]) TypedPredicate[object]

// Not returns a predicate that negates the given predicate
func Not[object any](predicate TypedPredicate[object]) TypedPredicate[object]

// GenerationChangedPredicate filters update events where Generation hasn't changed
type GenerationChangedPredicate = TypedGenerationChangedPredicate[client.Object]

// TypedGenerationChangedPredicate is the typed form of
// GenerationChangedPredicate. It embeds TypedFuncs and declares its own
// Update method.
type TypedGenerationChangedPredicate[object metav1.Object] struct {
	TypedFuncs[object]
}

// Update reports whether the update event should be processed.
func (TypedGenerationChangedPredicate[object]) Update(e event.TypedUpdateEvent[object]) bool

// ResourceVersionChangedPredicate filters update events where ResourceVersion hasn't changed
type ResourceVersionChangedPredicate = TypedResourceVersionChangedPredicate[client.Object]

// TypedResourceVersionChangedPredicate is the typed form of
// ResourceVersionChangedPredicate. It embeds TypedFuncs and declares its own
// Update method.
type TypedResourceVersionChangedPredicate[T metav1.Object] struct {
	TypedFuncs[T]
}

// Update reports whether the update event should be processed.
func (TypedResourceVersionChangedPredicate[T]) Update(e event.TypedUpdateEvent[T]) bool

// LabelChangedPredicate filters update events where Labels haven't changed
type LabelChangedPredicate = TypedLabelChangedPredicate[client.Object]

// TypedLabelChangedPredicate is the typed form of LabelChangedPredicate.
// It embeds TypedFuncs and declares its own Update method.
type TypedLabelChangedPredicate[object metav1.Object] struct {
	TypedFuncs[object]
}

// Update reports whether the update event should be processed.
func (TypedLabelChangedPredicate[object]) Update(e event.TypedUpdateEvent[object]) bool

// AnnotationChangedPredicate filters update events where Annotations haven't changed
type AnnotationChangedPredicate = TypedAnnotationChangedPredicate[client.Object]

// TypedAnnotationChangedPredicate is the typed form of
// AnnotationChangedPredicate. It embeds TypedFuncs and declares its own
// Update method.
type TypedAnnotationChangedPredicate[object metav1.Object] struct {
	TypedFuncs[object]
}

// Update reports whether the update event should be processed.
func (TypedAnnotationChangedPredicate[object]) Update(e event.TypedUpdateEvent[object]) bool

// LabelSelectorPredicate filters events based on a label selector
// It takes a metav1.LabelSelector and returns a Predicate, or an error.
func LabelSelectorPredicate(s metav1.LabelSelector) (Predicate, error)

// Funcs allows users to provide Funcs for each event type
type Funcs = TypedFuncs[client.Object]

// TypedFuncs is the generic form of Funcs: one optional filter callback per
// event type, each with the same signature as the corresponding
// TypedPredicate method.
type TypedFuncs[object any] struct {
	// CreateFunc returns true if the Create event should be processed
	CreateFunc func(event.TypedCreateEvent[object]) bool
	// DeleteFunc returns true if the Delete event should be processed
	DeleteFunc func(event.TypedDeleteEvent[object]) bool
	// UpdateFunc returns true if the Update event should be processed
	UpdateFunc func(event.TypedUpdateEvent[object]) bool
	// GenericFunc returns true if the Generic event should be processed
	GenericFunc func(event.TypedGenericEvent[object]) bool
}

// Create is the predicate method for create events.
// NOTE(review): behaviour when CreateFunc is nil is not shown here — confirm.
func (p TypedFuncs[object]) Create(e event.TypedCreateEvent[object]) bool

// Delete is the predicate method for delete events.
func (p TypedFuncs[object]) Delete(e event.TypedDeleteEvent[object]) bool

// Update is the predicate method for update events.
func (p TypedFuncs[object]) Update(e event.TypedUpdateEvent[object]) bool

// Generic is the predicate method for generic events.
func (p TypedFuncs[object]) Generic(e event.TypedGenericEvent[object]) bool

// NewPredicateFuncs returns a predicate that applies the given filter function
func NewPredicateFuncs(filter func(object client.Object) bool) Funcs

// NewTypedPredicateFuncs returns a typed predicate built from a single filter
// function over the object type.
func NewTypedPredicateFuncs[object any](filter func(object object) bool) TypedFuncs[object]
package example
import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
func SetupWithGenerationPredicate(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&MyResource{}, builder.WithPredicates(
predicate.GenerationChangedPredicate{},
)).
Complete(reconciler)
}package example
import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
func SetupWithCombinedPredicates(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&MyResource{}, builder.WithPredicates(
predicate.Or(
predicate.GenerationChangedPredicate{},
predicate.AnnotationChangedPredicate{},
),
)).
Complete(reconciler)
}package example
import (
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
func SetupWithCustomPredicate(mgr ctrl.Manager) error {
// Only process pods in Running phase
runningPodPredicate := predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
pod := e.Object.(*corev1.Pod)
return pod.Status.Phase == corev1.PodRunning
},
UpdateFunc: func(e event.UpdateEvent) bool {
newPod := e.ObjectNew.(*corev1.Pod)
return newPod.Status.Phase == corev1.PodRunning
},
DeleteFunc: func(e event.DeleteEvent) bool {
return true // Process all deletes
},
GenericFunc: func(e event.GenericEvent) bool {
return false // Ignore generic events
},
}
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Pod{}, builder.WithPredicates(runningPodPredicate)).
Complete(reconciler)
}package example
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
func SetupWithLabelSelector(mgr ctrl.Manager) error {
labelPredicate, err := predicate.LabelSelectorPredicate(metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "my-app",
},
})
if err != nil {
return err
}
return ctrl.NewControllerManagedBy(mgr).
For(&MyResource{}, builder.WithPredicates(labelPredicate)).
Complete(reconciler)
}Import Path: sigs.k8s.io/controller-runtime/pkg/source
Sources provide streams of events to controllers.
package source
import (
"context"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
// Source is a source of events.
// It is TypedSource instantiated with reconcile.Request.
type Source = TypedSource[reconcile.Request]

// TypedSource is a generic source of events, parameterized on the request
// type of the work queue it is started with.
type TypedSource[request comparable] interface {
	// Start starts the source
	Start(context.Context, workqueue.TypedRateLimitingInterface[request]) error
}

// SyncingSource is a source that needs syncing prior to being usable
type SyncingSource = TypedSyncingSource[reconcile.Request]

// TypedSyncingSource is the generic form of SyncingSource: a TypedSource that
// additionally exposes WaitForSync.
type TypedSyncingSource[request comparable] interface {
	TypedSource[request]
	// WaitForSync takes a context and returns an error.
	// NOTE(review): presumably blocks until the source has synced — confirm.
	WaitForSync(ctx context.Context) error
}

// Kind creates a Source for a specific object type
// It takes the cache, an object of the watched type, the handler, and
// optional predicates, and returns a SyncingSource.
func Kind[object client.Object](
	cache cache.Cache,
	obj object,
	handler handler.TypedEventHandler[object, reconcile.Request],
	predicates ...predicate.TypedPredicate[object],
) SyncingSource

// TypedKind creates a typed source; it is like Kind but the request type is
// also a type parameter.
func TypedKind[object client.Object, request comparable](
	cache cache.Cache,
	obj object,
	handler handler.TypedEventHandler[object, request],
	predicates ...predicate.TypedPredicate[object],
) TypedSyncingSource[request]

// Channel creates a Source from a Go channel
// The channel is receive-only and carries TypedGenericEvent values; behaviour
// is adjusted through ChannelOpt values.
func Channel[object any](
	source <-chan event.TypedGenericEvent[object],
	handler handler.TypedEventHandler[object, reconcile.Request],
	opts ...ChannelOpt[object, reconcile.Request],
) Source

// TypedChannel creates a typed channel source; it is like Channel but the
// request type is also a type parameter.
func TypedChannel[object any, request comparable](
	source <-chan event.TypedGenericEvent[object],
	handler handler.TypedEventHandler[object, request],
	opts ...ChannelOpt[object, request],
) TypedSource[request]

// ChannelOpt configures a channel source
type ChannelOpt[object any, request comparable] func(*channel[object, request])

// WithBufferSize sets the buffer size for the channel source
func WithBufferSize[object any, request comparable](bufferSize int) ChannelOpt[object, request]

// WithPredicates sets predicates for the channel source
func WithPredicates[object any, request comparable](p ...predicate.TypedPredicate[object]) ChannelOpt[object, request]

// Func is a function that implements Source
type Func = TypedFunc[reconcile.Request]

// TypedFunc is the generic form of Func: a function with the same signature
// as TypedSource.Start.
type TypedFunc[request comparable] func(context.Context, workqueue.TypedRateLimitingInterface[request]) error

// Start has the TypedSource.Start signature.
// NOTE(review): presumably calls f itself — confirm.
func (f TypedFunc[request]) Start(ctx context.Context, queue workqueue.TypedRateLimitingInterface[request]) error

// String returns a string form of the source.
func (f TypedFunc[request]) String() string

// Informer creates a source from an informer
type Informer struct {
// Informer is the controller-runtime informer
Informer cache.Informer
// Handler transforms events into reconcile requests
Handler handler.EventHandler
// Predicates filter events
Predicates []predicate.Predicate
}
func (is *Informer) Start(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error
func (is *Informer) String() stringpackage example
import (
"context"
"time"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/source"
)
func SetupWithChannelSource(mgr ctrl.Manager) error {
// Create a channel for events
eventChannel := make(chan event.GenericEvent)
// Start a goroutine to send events
go func() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
eventChannel <- event.GenericEvent{Object: &MyResource{}}
}
}()
return ctrl.NewControllerManagedBy(mgr).
For(&MyResource{}).
WatchesRawSource(
source.Channel(
eventChannel,
&handler.EnqueueRequestForObject{},
),
).
Complete(reconciler)
}package example
import (
"context"
"time"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)
// customSource enqueues a fixed reconcile request (default/my-resource) once
// per minute until ctx is cancelled, then returns nil.
//
// The queue parameter is workqueue.TypedRateLimitingInterface[reconcile.Request]
// so that the function has the signature required by source.Func.
func customSource(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
	ticker := time.NewTicker(1 * time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			// Enqueue the single hard-coded resource for reconciliation.
			queue.Add(reconcile.Request{
				NamespacedName: types.NamespacedName{
					Namespace: "default",
					Name:      "my-resource",
				},
			})
		}
	}
}
// SetupWithCustomSource registers a MyResource controller that additionally
// watches customSource, converted to a source via source.Func.
func SetupWithCustomSource(mgr ctrl.Manager) error {
	periodic := source.Func(customSource)
	return ctrl.NewControllerManagedBy(mgr).
		For(&MyResource{}).
		WatchesRawSource(periodic).
		Complete(reconciler)
}