or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

builder.mdclient-cache.mdcore-framework.mdevent-handling.mdindex.mdsupporting-services.mdtesting.mdutilities.mdwebhooks.md
tile.json

event-handling.mddocs/

Event Handling

The event handling subsystem provides flexible event routing through sources, handlers, and predicates. This enables fine-grained control over which events trigger reconciliation and how they are processed.

Overview

Event handling in controller-runtime consists of three main components:

  1. Sources - Generate streams of events from various origins (watches, channels, etc.)
  2. Handlers - Transform events into reconcile requests
  3. Predicates - Filter events before they reach handlers

Event Package

Import Path: sigs.k8s.io/controller-runtime/pkg/event

The event package defines event types for Create, Update, Delete, and Generic operations.

Event Types

package event

import (
    "sigs.k8s.io/controller-runtime/pkg/client"
)

// CreateEvent is an event where a Kubernetes object was created
type CreateEvent = TypedCreateEvent[client.Object]

// TypedCreateEvent is a generic create event
type TypedCreateEvent[object any] struct {
    // Object is the object that was created
    Object object

    // IsInInitialList indicates if this event is for an object that was part of the initial list
    IsInInitialList bool
}
// DeleteEvent is an event where a Kubernetes object was deleted
type DeleteEvent = TypedDeleteEvent[client.Object]

// TypedDeleteEvent is a generic delete event
type TypedDeleteEvent[object any] struct {
    // Object is the object that was deleted
    Object object

    // DeleteStateUnknown is true if the object's deletion state is unknown
    DeleteStateUnknown bool
}
// UpdateEvent is an event where a Kubernetes object was updated
type UpdateEvent = TypedUpdateEvent[client.Object]

// TypedUpdateEvent is a generic update event
type TypedUpdateEvent[object any] struct {
    // ObjectOld is the object before the update
    ObjectOld object

    // ObjectNew is the object after the update
    ObjectNew object
}
// GenericEvent is an event where the operation type is unknown
type GenericEvent = TypedGenericEvent[client.Object]

// TypedGenericEvent is a generic event type
type TypedGenericEvent[object any] struct {
    // Object is the object for the event
    Object object
}

Handler Package

Import Path: sigs.k8s.io/controller-runtime/pkg/handler

Handlers transform events into reconcile requests that are enqueued for processing.

EventHandler Interface

package handler

import (
    "context"

    "k8s.io/client-go/util/workqueue"
    "k8s.io/apimachinery/pkg/api/meta"
    "k8s.io/apimachinery/pkg/runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/event"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// EventHandler enqueues reconcile.Requests in response to events
type EventHandler = TypedEventHandler[client.Object, reconcile.Request]

// TypedEventHandler is a generic event handler interface
type TypedEventHandler[object any, request comparable] interface {
    // Create is called in response to a create event
    Create(context.Context, event.TypedCreateEvent[object], workqueue.TypedRateLimitingInterface[request])

    // Update is called in response to an update event
    Update(context.Context, event.TypedUpdateEvent[object], workqueue.TypedRateLimitingInterface[request])

    // Delete is called in response to a delete event
    Delete(context.Context, event.TypedDeleteEvent[object], workqueue.TypedRateLimitingInterface[request])

    // Generic is called in response to a generic event
    Generic(context.Context, event.TypedGenericEvent[object], workqueue.TypedRateLimitingInterface[request])
}

Built-in Handlers

EnqueueRequestForObject

// EnqueueRequestForObject enqueues a Request containing the Name and Namespace of the object
type EnqueueRequestForObject = TypedEnqueueRequestForObject[client.Object]

type TypedEnqueueRequestForObject[object client.Object] struct{}

func (e *TypedEnqueueRequestForObject[T]) Create(ctx context.Context, evt event.TypedCreateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request])
func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request])
func (e *TypedEnqueueRequestForObject[T]) Delete(ctx context.Context, evt event.TypedDeleteEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request])
func (e *TypedEnqueueRequestForObject[T]) Generic(ctx context.Context, evt event.TypedGenericEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request])

EnqueueRequestForOwner

// EnqueueRequestForOwner enqueues Requests for the Owners of an object
func EnqueueRequestForOwner(scheme *runtime.Scheme, mapper meta.RESTMapper, ownerType client.Object, opts ...OwnerOption) EventHandler

// TypedEnqueueRequestForOwner is the typed version
func TypedEnqueueRequestForOwner[object client.Object](scheme *runtime.Scheme, mapper meta.RESTMapper, ownerType client.Object, opts ...OwnerOption) TypedEventHandler[object, reconcile.Request]
// OwnerOption modifies EnqueueRequestForOwner behavior
type OwnerOption func(e enqueueRequestForOwnerInterface)

// OnlyControllerOwner filters to only enqueue requests for controller owners
func OnlyControllerOwner() OwnerOption

EnqueueRequestsFromMapFunc

// EnqueueRequestsFromMapFunc enqueues Requests by running a mapping function
func EnqueueRequestsFromMapFunc(fn MapFunc) EventHandler

// TypedEnqueueRequestsFromMapFunc is the typed version
func TypedEnqueueRequestsFromMapFunc[object any, request comparable](fn TypedMapFunc[object, request]) TypedEventHandler[object, request]
// MapFunc is a function that maps an object to a list of reconcile requests
type MapFunc = TypedMapFunc[client.Object, reconcile.Request]

type TypedMapFunc[object any, request comparable] func(context.Context, object) []request

WithLowPriorityWhenUnchanged

// WithLowPriorityWhenUnchanged sets low priority for update events where the object hasn't changed
func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u TypedEventHandler[object, request]) TypedEventHandler[object, request]

const LowPriority = -100

Custom Handlers with Funcs

// Funcs is an EventHandler that allows users to provide Funcs for each event type
type Funcs = TypedFuncs[client.Object, reconcile.Request]

type TypedFuncs[object any, request comparable] struct {
    // CreateFunc is called in response to a create event
    CreateFunc func(context.Context, event.TypedCreateEvent[object], workqueue.TypedRateLimitingInterface[request])

    // UpdateFunc is called in response to an update event
    UpdateFunc func(context.Context, event.TypedUpdateEvent[object], workqueue.TypedRateLimitingInterface[request])

    // DeleteFunc is called in response to a delete event
    DeleteFunc func(context.Context, event.TypedDeleteEvent[object], workqueue.TypedRateLimitingInterface[request])

    // GenericFunc is called in response to a generic event
    GenericFunc func(context.Context, event.TypedGenericEvent[object], workqueue.TypedRateLimitingInterface[request])
}

func (h TypedFuncs[object, request]) Create(ctx context.Context, e event.TypedCreateEvent[object], q workqueue.TypedRateLimitingInterface[request])
func (h TypedFuncs[object, request]) Update(ctx context.Context, e event.TypedUpdateEvent[object], q workqueue.TypedRateLimitingInterface[request])
func (h TypedFuncs[object, request]) Delete(ctx context.Context, e event.TypedDeleteEvent[object], q workqueue.TypedRateLimitingInterface[request])
func (h TypedFuncs[object, request]) Generic(ctx context.Context, e event.TypedGenericEvent[object], q workqueue.TypedRateLimitingInterface[request])

Handler Usage Examples

Basic Handler Usage

package example

import (
    "context"

    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/handler"
)

func SetupController(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&MyResource{}).
        // Use default handler - enqueues the object itself
        Complete(reconciler)
}

Owner Reference Handler

package example

import (
    corev1 "k8s.io/api/core/v1"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/handler"
)

func SetupControllerWithOwns(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&MyResource{}).
        // Watch Pods owned by MyResource
        Owns(&corev1.Pod{}, builder.WithOwnerOptions(
            handler.OnlyControllerOwner(),
        )).
        Complete(reconciler)
}

Map Function Handler

package example

import (
    "context"

    corev1 "k8s.io/api/core/v1"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/handler"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func configMapToDeployment(ctx context.Context, obj client.Object) []reconcile.Request {
    configMap := obj.(*corev1.ConfigMap)

    // Map ConfigMap to all Deployments in the same namespace
    var deployments appsv1.DeploymentList
    if err := client.List(ctx, &deployments, client.InNamespace(configMap.Namespace)); err != nil {
        return []reconcile.Request{}
    }

    requests := make([]reconcile.Request, len(deployments.Items))
    for i, deployment := range deployments.Items {
        requests[i] = reconcile.Request{
            NamespacedName: client.ObjectKeyFromObject(&deployment),
        }
    }
    return requests
}

func SetupControllerWithMapFunc(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&appsv1.Deployment{}).
        Watches(
            &corev1.ConfigMap{},
            handler.EnqueueRequestsFromMapFunc(configMapToDeployment),
        ).
        Complete(reconciler)
}

Custom Funcs Handler

package example

import (
    "context"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/util/workqueue"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/event"
    "sigs.k8s.io/controller-runtime/pkg/handler"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func SetupCustomHandler(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&corev1.Pod{}).
        Watches(
            &corev1.ConfigMap{},
            handler.Funcs{
                CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
                    // Custom logic for create events
                    q.Add(reconcile.Request{NamespacedName: client.ObjectKeyFromObject(e.Object)})
                },
                UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
                    // Custom logic for update events
                    q.Add(reconcile.Request{NamespacedName: client.ObjectKeyFromObject(e.ObjectNew)})
                },
                DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) {
                    // Custom logic for delete events
                },
                GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) {
                    // Custom logic for generic events
                },
            },
        ).
        Complete(reconciler)
}

Predicate Package

Import Path: sigs.k8s.io/controller-runtime/pkg/predicate

Predicates filter events before they are provided to EventHandlers.

Predicate Interface

package predicate

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/event"
)

// Predicate filters events before enqueuing
type Predicate = TypedPredicate[client.Object]

// TypedPredicate is a generic predicate interface
type TypedPredicate[object any] interface {
    // Create returns true if the Create event should be processed
    Create(event.TypedCreateEvent[object]) bool

    // Delete returns true if the Delete event should be processed
    Delete(event.TypedDeleteEvent[object]) bool

    // Update returns true if the Update event should be processed
    Update(event.TypedUpdateEvent[object]) bool

    // Generic returns true if the Generic event should be processed
    Generic(event.TypedGenericEvent[object]) bool
}

Predicate Combinators

// And returns a predicate that ANDs all given predicates
func And[object any](predicates ...TypedPredicate[object]) TypedPredicate[object]

// Or returns a predicate that ORs all given predicates
func Or[object any](predicates ...TypedPredicate[object]) TypedPredicate[object]

// Not returns a predicate that negates the given predicate
func Not[object any](predicate TypedPredicate[object]) TypedPredicate[object]

Built-in Predicates

GenerationChangedPredicate

// GenerationChangedPredicate filters update events where Generation hasn't changed
type GenerationChangedPredicate = TypedGenerationChangedPredicate[client.Object]

type TypedGenerationChangedPredicate[object metav1.Object] struct {
    TypedFuncs[object]
}

func (TypedGenerationChangedPredicate[object]) Update(e event.TypedUpdateEvent[object]) bool

ResourceVersionChangedPredicate

// ResourceVersionChangedPredicate filters update events where ResourceVersion hasn't changed
type ResourceVersionChangedPredicate = TypedResourceVersionChangedPredicate[client.Object]

type TypedResourceVersionChangedPredicate[T metav1.Object] struct {
    TypedFuncs[T]
}

func (TypedResourceVersionChangedPredicate[T]) Update(e event.TypedUpdateEvent[T]) bool

LabelChangedPredicate

// LabelChangedPredicate filters update events where Labels haven't changed
type LabelChangedPredicate = TypedLabelChangedPredicate[client.Object]

type TypedLabelChangedPredicate[object metav1.Object] struct {
    TypedFuncs[object]
}

func (TypedLabelChangedPredicate[object]) Update(e event.TypedUpdateEvent[object]) bool

AnnotationChangedPredicate

// AnnotationChangedPredicate filters update events where Annotations haven't changed
type AnnotationChangedPredicate = TypedAnnotationChangedPredicate[client.Object]

type TypedAnnotationChangedPredicate[object metav1.Object] struct {
    TypedFuncs[object]
}

func (TypedAnnotationChangedPredicate[object]) Update(e event.TypedUpdateEvent[object]) bool

Label Selector Predicate

// LabelSelectorPredicate filters events based on a label selector
func LabelSelectorPredicate(s metav1.LabelSelector) (Predicate, error)

Custom Predicates

// Funcs allows users to provide Funcs for each event type
type Funcs = TypedFuncs[client.Object]

type TypedFuncs[object any] struct {
    // CreateFunc returns true if the Create event should be processed
    CreateFunc func(event.TypedCreateEvent[object]) bool

    // DeleteFunc returns true if the Delete event should be processed
    DeleteFunc func(event.TypedDeleteEvent[object]) bool

    // UpdateFunc returns true if the Update event should be processed
    UpdateFunc func(event.TypedUpdateEvent[object]) bool

    // GenericFunc returns true if the Generic event should be processed
    GenericFunc func(event.TypedGenericEvent[object]) bool
}

func (p TypedFuncs[object]) Create(e event.TypedCreateEvent[object]) bool
func (p TypedFuncs[object]) Delete(e event.TypedDeleteEvent[object]) bool
func (p TypedFuncs[object]) Update(e event.TypedUpdateEvent[object]) bool
func (p TypedFuncs[object]) Generic(e event.TypedGenericEvent[object]) bool
// NewPredicateFuncs returns a predicate that applies the given filter function
func NewPredicateFuncs(filter func(object client.Object) bool) Funcs

// NewTypedPredicateFuncs returns a typed predicate
func NewTypedPredicateFuncs[object any](filter func(object object) bool) TypedFuncs[object]

Predicate Usage Examples

Using Built-in Predicates

package example

import (
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/builder"
    "sigs.k8s.io/controller-runtime/pkg/predicate"
)

func SetupWithGenerationPredicate(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&MyResource{}, builder.WithPredicates(
            predicate.GenerationChangedPredicate{},
        )).
        Complete(reconciler)
}

Combining Predicates

package example

import (
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/builder"
    "sigs.k8s.io/controller-runtime/pkg/predicate"
)

func SetupWithCombinedPredicates(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&MyResource{}, builder.WithPredicates(
            predicate.Or(
                predicate.GenerationChangedPredicate{},
                predicate.AnnotationChangedPredicate{},
            ),
        )).
        Complete(reconciler)
}

Custom Predicate with Funcs

package example

import (
    corev1 "k8s.io/api/core/v1"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/builder"
    "sigs.k8s.io/controller-runtime/pkg/event"
    "sigs.k8s.io/controller-runtime/pkg/predicate"
)

func SetupWithCustomPredicate(mgr ctrl.Manager) error {
    // Only process pods in Running phase
    runningPodPredicate := predicate.Funcs{
        CreateFunc: func(e event.CreateEvent) bool {
            pod := e.Object.(*corev1.Pod)
            return pod.Status.Phase == corev1.PodRunning
        },
        UpdateFunc: func(e event.UpdateEvent) bool {
            newPod := e.ObjectNew.(*corev1.Pod)
            return newPod.Status.Phase == corev1.PodRunning
        },
        DeleteFunc: func(e event.DeleteEvent) bool {
            return true // Process all deletes
        },
        GenericFunc: func(e event.GenericEvent) bool {
            return false // Ignore generic events
        },
    }

    return ctrl.NewControllerManagedBy(mgr).
        For(&corev1.Pod{}, builder.WithPredicates(runningPodPredicate)).
        Complete(reconciler)
}

Label Selector Predicate

package example

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/builder"
    "sigs.k8s.io/controller-runtime/pkg/predicate"
)

func SetupWithLabelSelector(mgr ctrl.Manager) error {
    labelPredicate, err := predicate.LabelSelectorPredicate(metav1.LabelSelector{
        MatchLabels: map[string]string{
            "app": "my-app",
        },
    })
    if err != nil {
        return err
    }

    return ctrl.NewControllerManagedBy(mgr).
        For(&MyResource{}, builder.WithPredicates(labelPredicate)).
        Complete(reconciler)
}

Source Package

Import Path: sigs.k8s.io/controller-runtime/pkg/source

Sources provide streams of events to controllers.

Source Interface

package source

import (
    "context"

    "k8s.io/client-go/util/workqueue"
    "sigs.k8s.io/controller-runtime/pkg/cache"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/event"
    "sigs.k8s.io/controller-runtime/pkg/handler"
    "sigs.k8s.io/controller-runtime/pkg/predicate"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// Source is a source of events
type Source = TypedSource[reconcile.Request]

// TypedSource is a generic source of events
type TypedSource[request comparable] interface {
    // Start starts the source
    Start(context.Context, workqueue.TypedRateLimitingInterface[request]) error
}

SyncingSource

// SyncingSource is a source that needs syncing prior to being usable
type SyncingSource = TypedSyncingSource[reconcile.Request]

type TypedSyncingSource[request comparable] interface {
    TypedSource[request]
    WaitForSync(ctx context.Context) error
}

Creating Sources

Kind Source

// Kind creates a Source for a specific object type
func Kind[object client.Object](
    cache cache.Cache,
    obj object,
    handler handler.TypedEventHandler[object, reconcile.Request],
    predicates ...predicate.TypedPredicate[object],
) SyncingSource

// TypedKind creates a typed source
func TypedKind[object client.Object, request comparable](
    cache cache.Cache,
    obj object,
    handler handler.TypedEventHandler[object, request],
    predicates ...predicate.TypedPredicate[object],
) TypedSyncingSource[request]

Channel Source

// Channel creates a Source from a Go channel
func Channel[object any](
    source <-chan event.TypedGenericEvent[object],
    handler handler.TypedEventHandler[object, reconcile.Request],
    opts ...ChannelOpt[object, reconcile.Request],
) Source

// TypedChannel creates a typed channel source
func TypedChannel[object any, request comparable](
    source <-chan event.TypedGenericEvent[object],
    handler handler.TypedEventHandler[object, request],
    opts ...ChannelOpt[object, request],
) TypedSource[request]
// ChannelOpt configures a channel source
type ChannelOpt[object any, request comparable] func(*channel[object, request])

// WithBufferSize sets the buffer size for the channel source
func WithBufferSize[object any, request comparable](bufferSize int) ChannelOpt[object, request]

// WithPredicates sets predicates for the channel source
func WithPredicates[object any, request comparable](p ...predicate.TypedPredicate[object]) ChannelOpt[object, request]

Function Source

// Func is a function that implements Source
type Func = TypedFunc[reconcile.Request]

type TypedFunc[request comparable] func(context.Context, workqueue.TypedRateLimitingInterface[request]) error

func (f TypedFunc[request]) Start(ctx context.Context, queue workqueue.TypedRateLimitingInterface[request]) error
func (f TypedFunc[request]) String() string

Informer Source

// Informer creates a source from an informer
type Informer struct {
    // Informer is the controller-runtime informer
    Informer cache.Informer

    // Handler transforms events into reconcile requests
    Handler handler.EventHandler

    // Predicates filter events
    Predicates []predicate.Predicate
}

func (is *Informer) Start(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error
func (is *Informer) String() string

Source Usage Examples

Using Channel Source

package example

import (
    "context"
    "time"

    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/event"
    "sigs.k8s.io/controller-runtime/pkg/handler"
    "sigs.k8s.io/controller-runtime/pkg/source"
)

func SetupWithChannelSource(mgr ctrl.Manager) error {
    // Create a channel for events
    eventChannel := make(chan event.GenericEvent)

    // Start a goroutine to send events
    go func() {
        ticker := time.NewTicker(30 * time.Second)
        defer ticker.Stop()

        for range ticker.C {
            eventChannel <- event.GenericEvent{Object: &MyResource{}}
        }
    }()

    return ctrl.NewControllerManagedBy(mgr).
        For(&MyResource{}).
        WatchesRawSource(
            source.Channel(
                eventChannel,
                &handler.EnqueueRequestForObject{},
            ),
        ).
        Complete(reconciler)
}

Custom Function Source

package example

import (
    "context"
    "time"

    "k8s.io/client-go/util/workqueue"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"
    "sigs.k8s.io/controller-runtime/pkg/source"
)

func customSource(ctx context.Context, queue workqueue.RateLimitingInterface) error {
    ticker := time.NewTicker(1 * time.Minute)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return nil
        case <-ticker.C:
            // Enqueue all resources for reconciliation
            queue.Add(reconcile.Request{
                NamespacedName: types.NamespacedName{
                    Namespace: "default",
                    Name:      "my-resource",
                },
            })
        }
    }
}

func SetupWithCustomSource(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&MyResource{}).
        WatchesRawSource(source.Func(customSource)).
        Complete(reconciler)
}