Comprehensive testing utilities for controller-runtime including envtest for running integration tests against a real API server, fake clients for unit testing, and utilities for testing with Gomega matchers.
Controller-runtime provides several testing tools:
Import Path: sigs.k8s.io/controller-runtime/pkg/envtest
EnvTest provides a local Kubernetes control plane for integration testing.
package envtest
import (
"context"
"time"
admissionv1 "k8s.io/api/admissionregistration/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/envtest/internal/controlplane"
)
// Environment provides a test control plane with etcd and kube-apiserver
type Environment struct {
// ControlPlane is the control plane components
ControlPlane controlplane.ControlPlane
// Scheme is the scheme to use for this environment
Scheme *runtime.Scheme
// Config is the rest.Config for connecting to the test cluster
Config *rest.Config
// KubeConfig is the raw kubeconfig bytes
KubeConfig []byte
// CRDInstallOptions are options for CRD installation
CRDInstallOptions CRDInstallOptions
// WebhookInstallOptions are options for webhook installation
WebhookInstallOptions WebhookInstallOptions
// ErrorIfCRDPathMissing determines whether missing CRD paths should error
ErrorIfCRDPathMissing bool
// CRDs is a list of CRDs to install
CRDs []*apiextensionsv1.CustomResourceDefinition
// CRDDirectoryPaths is a list of paths containing CRD yaml files
CRDDirectoryPaths []string
// DownloadBinaryAssets indicates whether to download binary assets
DownloadBinaryAssets bool
// DownloadBinaryAssetsVersion is the version of binaries to download
DownloadBinaryAssetsVersion string
// DownloadBinaryAssetsIndexURL is the URL for the binary assets index
DownloadBinaryAssetsIndexURL string
// BinaryAssetsDirectory is where binary assets are stored
BinaryAssetsDirectory string
// UseExistingCluster indicates whether to use an existing cluster
UseExistingCluster *bool
// ControlPlaneStartTimeout is the maximum time to wait for the control plane to start
ControlPlaneStartTimeout time.Duration
// ControlPlaneStopTimeout is the maximum time to wait for the control plane to stop
ControlPlaneStopTimeout time.Duration
// AttachControlPlaneOutput attaches control plane output to test output
AttachControlPlaneOutput bool
}// Start starts the test environment
func (te *Environment) Start() (*rest.Config, error)
// Stop stops the test environment
func (te *Environment) Stop() error
// AddUser adds a user with specific permissions
func (te *Environment) AddUser(user User, baseConfig *rest.Config) (*AuthenticatedUser, error)const (
StartTimeout = 60 // seconds
StopTimeout = 60 // seconds
)var (
// DefaultBinaryAssetsIndexURL is the default URL for downloading test binaries
DefaultBinaryAssetsIndexURL = "https://raw.githubusercontent.com/kubernetes-sigs/controller-tools/HEAD/envtest-releases.yaml"
// EmptyArguments represents empty command-line arguments
EmptyArguments = process.EmptyArguments
)// CRDInstallOptions configures CRD installation
type CRDInstallOptions struct {
// Scheme is the scheme to use
Scheme *runtime.Scheme
// Paths is a list of paths to CRD files or directories
Paths []string
// CRDs is a list of CRD objects to install
CRDs []*apiextensionsv1.CustomResourceDefinition
// ErrorIfPathMissing determines whether to error if a path is missing
ErrorIfPathMissing bool
// MaxTime is the maximum time to wait for CRDs to become ready
MaxTime time.Duration
// PollInterval is how often to poll for CRD readiness
PollInterval time.Duration
// CleanUpAfterUse determines whether to clean up CRDs after use
CleanUpAfterUse bool
// WebhookOptions are options for webhook installation
WebhookOptions WebhookInstallOptions
}// InstallCRDs installs CRDs into the cluster
func InstallCRDs(config *rest.Config, options CRDInstallOptions) ([]*apiextensionsv1.CustomResourceDefinition, error)
// UninstallCRDs uninstalls CRDs from the cluster
func UninstallCRDs(config *rest.Config, options CRDInstallOptions) error
// WaitForCRDs waits for CRDs to become ready
func WaitForCRDs(config *rest.Config, crds []*apiextensionsv1.CustomResourceDefinition, options CRDInstallOptions) error
// CreateCRDs creates CRDs in the cluster
func CreateCRDs(config *rest.Config, crds []*apiextensionsv1.CustomResourceDefinition) error
// ReadCRDFiles reads CRD files from the file system
func ReadCRDFiles(options *CRDInstallOptions) error// WebhookInstallOptions configures webhook installation
type WebhookInstallOptions struct {
// Paths is a list of paths to webhook configuration files
Paths []string
// MutatingWebhooks is a list of mutating webhook configurations
MutatingWebhooks []*admissionv1.MutatingWebhookConfiguration
// ValidatingWebhooks is a list of validating webhook configurations
ValidatingWebhooks []*admissionv1.ValidatingWebhookConfiguration
// IgnoreSchemeConvertible ignores scheme convertible errors
IgnoreSchemeConvertible bool
// IgnoreErrorIfPathMissing ignores errors if paths are missing
IgnoreErrorIfPathMissing bool
// LocalServingHost is the host for the local webhook server
LocalServingHost string
// LocalServingPort is the port for the local webhook server
LocalServingPort int
// LocalServingCertDir is the directory for TLS certificates
LocalServingCertDir string
// LocalServingCAData is the CA data for TLS
LocalServingCAData []byte
// LocalServingHostExternalName is the external hostname
LocalServingHostExternalName string
// MaxTime is the maximum time to wait for webhooks to become ready
MaxTime time.Duration
// PollInterval is how often to poll for webhook readiness
PollInterval time.Duration
}func (o *WebhookInstallOptions) Install(config *rest.Config) error
func (o *WebhookInstallOptions) Cleanup() error
func (o *WebhookInstallOptions) ModifyWebhookDefinitions() error
func (o *WebhookInstallOptions) PrepWithoutInstalling() error// WaitForWebhooks waits for webhooks to become ready
func WaitForWebhooks(config *rest.Config,
mutatingWebhooks []*admissionv1.MutatingWebhookConfiguration,
validatingWebhooks []*admissionv1.ValidatingWebhookConfiguration,
options WebhookInstallOptions,
) error// SetupEnvtestDefaultBinaryAssetsDirectory sets up the default directory for binaries
func SetupEnvtestDefaultBinaryAssetsDirectory() (string, error)type APIServer = controlplane.APIServer
type Arg = process.Arg
type Arguments = process.Arguments
type AuthenticatedUser = controlplane.AuthenticatedUser
type Authn = controlplane.Authn
type ControlPlane = controlplane.ControlPlane
type Etcd = controlplane.Etcd
type ListenAddr = process.ListenAddr
type SecureServing = controlplane.SecureServing
type User = controlplane.Userpackage controllers_test
import (
"context"
"path/filepath"
"testing"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
myapi "example.com/myapp/api/v1"
"example.com/myapp/controllers"
)
var cfg *rest.Config
var k8sClient client.Client
var testEnv *envtest.Environment
var ctx context.Context
var cancel context.CancelFunc
func TestControllers(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Controller Suite")
}
var _ = BeforeSuite(func() {
logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))
ctx, cancel = context.WithCancel(context.TODO())
By("bootstrapping test environment")
testEnv = &envtest.Environment{
CRDDirectoryPaths: []string{filepath.Join("..", "config", "crd", "bases")},
ErrorIfCRDPathMissing: true,
}
var err error
cfg, err = testEnv.Start()
Expect(err).NotTo(HaveOccurred())
Expect(cfg).NotTo(BeNil())
err = myapi.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())
k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
Expect(err).NotTo(HaveOccurred())
Expect(k8sClient).NotTo(BeNil())
// Start the manager
k8sManager, err := ctrl.NewManager(cfg, ctrl.Options{
Scheme: scheme.Scheme,
})
Expect(err).ToNot(HaveOccurred())
err = (&controllers.MyReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
}).SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred())
go func() {
defer GinkgoRecover()
err = k8sManager.Start(ctx)
Expect(err).ToNot(HaveOccurred(), "failed to run manager")
}()
})
var _ = AfterSuite(func() {
cancel()
By("tearing down the test environment")
err := testEnv.Stop()
Expect(err).NotTo(HaveOccurred())
})Import Path: sigs.k8s.io/controller-runtime/pkg/client/fake
Fake clients provide in-memory implementations for unit testing.
package fake
import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/testing"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"
)
// NewFakeClient creates a new fake client
func NewFakeClient(initObjs ...runtime.Object) client.WithWatch
// AddIndex adds a field index to the fake client
func AddIndex(c client.Client, obj runtime.Object, field string, extractValue client.IndexerFunc) error// ClientBuilder builds fake clients with configuration
type ClientBuilder struct {
// Has unexported fields
}
// NewClientBuilder creates a new ClientBuilder
func NewClientBuilder() *ClientBuilder// Build builds the fake client
func (f *ClientBuilder) Build() client.WithWatch
// WithObjects initializes the fake client with objects
func (f *ClientBuilder) WithObjects(initObjs ...client.Object) *ClientBuilder
// WithLists initializes the fake client with lists
func (f *ClientBuilder) WithLists(initLists ...client.ObjectList) *ClientBuilder
// WithRuntimeObjects initializes with runtime objects
func (f *ClientBuilder) WithRuntimeObjects(initRuntimeObjs ...runtime.Object) *ClientBuilder
// WithScheme sets the scheme to use
func (f *ClientBuilder) WithScheme(scheme *runtime.Scheme) *ClientBuilder
// WithRESTMapper sets the REST mapper
func (f *ClientBuilder) WithRESTMapper(restMapper meta.RESTMapper) *ClientBuilder
// WithStatusSubresource indicates which objects have a status subresource
func (f *ClientBuilder) WithStatusSubresource(o ...client.Object) *ClientBuilder
// WithIndex adds a field index
func (f *ClientBuilder) WithIndex(obj runtime.Object, field string, extractValue client.IndexerFunc) *ClientBuilder
// WithObjectTracker sets a custom object tracker
func (f *ClientBuilder) WithObjectTracker(ot testing.ObjectTracker) *ClientBuilder
// WithInterceptorFuncs sets interceptor functions
func (f *ClientBuilder) WithInterceptorFuncs(interceptorFuncs interceptor.Funcs) *ClientBuilder
// WithReturnManagedFields indicates that managed fields should be returned
func (f *ClientBuilder) WithReturnManagedFields() *ClientBuilderpackage controllers_test
import (
"context"
"testing"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"example.com/myapp/controllers"
)
var _ = Describe("MyReconciler", func() {
var (
reconciler *controllers.MyReconciler
ctx context.Context
)
BeforeEach(func() {
ctx = context.Background()
// Create fake client with initial objects
scheme := runtime.NewScheme()
_ = clientgoscheme.AddToScheme(scheme)
initialObjects := []runtime.Object{
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Namespace: "default",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "nginx",
Image: "nginx:latest",
},
},
},
},
}
fakeClient := fake.NewClientBuilder().
WithScheme(scheme).
WithRuntimeObjects(initialObjects...).
Build()
reconciler = &controllers.MyReconciler{
Client: fakeClient,
Scheme: scheme,
}
})
It("should reconcile successfully", func() {
req := reconcile.Request{
NamespacedName: types.NamespacedName{
Name: "test-pod",
Namespace: "default",
},
}
result, err := reconciler.Reconcile(ctx, req)
Expect(err).NotTo(HaveOccurred())
Expect(result.Requeue).To(BeFalse())
// Verify the pod was modified
var pod corev1.Pod
err = reconciler.Client.Get(ctx, req.NamespacedName, &pod)
Expect(err).NotTo(HaveOccurred())
Expect(pod.Labels).To(HaveKey("reconciled"))
})
})Import Path: sigs.k8s.io/controller-runtime/pkg/envtest/komega
Komega provides Gomega matchers integrated with controller-runtime client.
package komega
import (
"context"
"github.com/onsi/gomega/types"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// SetClient sets the default client for komega operations
func SetClient(c client.Client)
// SetContext sets the default context for komega operations
func SetContext(c context.Context)
// Get returns a function that gets an object
func Get(obj client.Object) func() error
// List returns a function that lists objects
func List(list client.ObjectList, opts ...client.ListOption) func() error
// Object returns a function that gets an object and returns it
func Object(obj client.Object) func() (client.Object, error)
// ObjectList returns a function that lists objects and returns the list
func ObjectList(list client.ObjectList, opts ...client.ListOption) func() (client.ObjectList, error)
// Update returns a function that gets, modifies, and updates an object
func Update(obj client.Object, f func(), opts ...client.UpdateOption) func() error
// UpdateStatus returns a function that gets, modifies, and updates an object's status
func UpdateStatus(obj client.Object, f func(), opts ...client.SubResourceUpdateOption) func() error
// EqualObject returns a Gomega matcher for comparing objects
func EqualObject(original runtime.Object, opts ...EqualObjectOption) types.GomegaMatcher// Komega provides methods for testing with Gomega
type Komega interface {
Get(client.Object) func() error
List(client.ObjectList, ...client.ListOption) func() error
Update(client.Object, func(), ...client.UpdateOption) func() error
UpdateStatus(client.Object, func(), ...client.SubResourceUpdateOption) func() error
Object(client.Object) func() (client.Object, error)
ObjectList(client.ObjectList, ...client.ListOption) func() (client.ObjectList, error)
WithContext(context.Context) Komega
}
// New creates a new Komega instance
func New(c client.Client) Komega// EqualObjectOption modifies EqualObject behavior
type EqualObjectOption interface {
ApplyToEqualObjectMatcher(options *EqualObjectOptions)
}
type EqualObjectOptions struct {
// Has unexported fields
}
func (o *EqualObjectOptions) ApplyOptions(opts []EqualObjectOption) *EqualObjectOptions// IgnorePaths ignores specific paths when comparing objects
type IgnorePaths []string
func (i IgnorePaths) ApplyToEqualObjectMatcher(opts *EqualObjectOptions)// MatchPaths only matches specific paths when comparing objects
type MatchPaths []string
func (i MatchPaths) ApplyToEqualObjectMatcher(opts *EqualObjectOptions)var (
// IgnoreAutogeneratedMetadata ignores auto-generated metadata fields
IgnoreAutogeneratedMetadata = IgnorePaths{
"metadata.uid",
"metadata.generation",
"metadata.creationTimestamp",
"metadata.resourceVersion",
"metadata.managedFields",
"metadata.deletionGracePeriodSeconds",
"metadata.deletionTimestamp",
"metadata.selfLink",
"metadata.generateName",
}
)package controllers_test
import (
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest/komega"
)
var _ = Describe("Pod Controller", func() {
var pod *corev1.Pod
BeforeEach(func() {
// Setup komega with the test client
komega.SetClient(k8sClient)
komega.SetContext(ctx)
pod = &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Namespace: "default",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "nginx",
Image: "nginx:latest",
},
},
},
}
Expect(k8sClient.Create(ctx, pod)).To(Succeed())
})
It("should eventually become running", func() {
// Use Eventually with komega.Object to wait for status update
Eventually(komega.Object(pod)).
WithTimeout(time.Minute).
WithPolling(time.Second).
Should(HaveField("Status.Phase", corev1.PodRunning))
})
It("should have the correct labels", func() {
// Use Eventually with komega.Get to wait for reconciliation
Eventually(komega.Get(pod)).Should(Succeed())
Expect(pod.Labels).To(HaveKeyWithValue("app", "test"))
})
It("should list pods correctly", func() {
list := &corev1.PodList{}
Eventually(komega.ObjectList(list, client.InNamespace("default"))).
Should(HaveField("Items", HaveLen(1)))
})
})Import Path: sigs.k8s.io/controller-runtime/pkg/controller/controllertest
Test utilities for controllers.
package controllertest
import (
"context"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
// FakeInformer is a fake implementation of an informer for testing
type FakeInformer struct {
Synced bool
RunCount int
// Has unexported fields
}
func (f *FakeInformer) Add(obj metav1.Object)
func (f *FakeInformer) Update(oldObj, newObj metav1.Object)
func (f *FakeInformer) Delete(obj metav1.Object)
func (f *FakeInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error)
func (f *FakeInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, _ time.Duration) (cache.ResourceEventHandlerRegistration, error)
func (f *FakeInformer) AddEventHandlerWithOptions(handler cache.ResourceEventHandler, _ cache.HandlerOptions) (cache.ResourceEventHandlerRegistration, error)
func (f *FakeInformer) RemoveEventHandler(handle cache.ResourceEventHandlerRegistration) error
func (f *FakeInformer) AddIndexers(indexers cache.Indexers) error
func (f *FakeInformer) HasSynced() bool
func (f *FakeInformer) IsStopped() bool// Queue is a fake queue for testing
type Queue = TypedQueue[reconcile.Request]
type TypedQueue[request comparable] struct {
workqueue.TypedInterface[request]
AddedRateLimitedLock sync.Mutex
AddedRatelimited []any
}
func (q *TypedQueue[request]) AddRateLimited(item request)
func (q *TypedQueue[request]) AddAfter(item request, duration time.Duration)
func (q *TypedQueue[request]) Forget(item request)
func (q *TypedQueue[request]) NumRequeues(item request) int// ErrorType is a test type that errors when used
type ErrorType struct{}
func (ErrorType) DeepCopyObject() runtime.Object
func (ErrorType) GetObjectKind() schema.ObjectKind// UnconventionalListType is a test type with non-standard list structure
type UnconventionalListType struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec string `json:"spec,omitempty"`
}
type UnconventionalListTypeList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []*UnconventionalListType `json:"items"`
}