Testing utilities for Apache Flink connectors providing test frameworks, assertion utilities, and abstractions for comprehensive connector testing
—
The JUnit integration provides annotation-driven configuration with automatic resource lifecycle management and test parameterization. It extends JUnit 5 with specialized extensions for connector testing.
Annotations for marking test resources and configuration in test classes.
/**
* Marks field defining TestEnvironment (PER-CLASS lifecycle)
* Only one field per test class can be annotated with @TestEnv
*/
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TestEnv {}
/**
* Marks field defining ExternalContextFactory (PER-CASE lifecycle)
* Multiple fields can be annotated for different external systems
*/
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TestContext {}
/**
* Marks field defining external system configuration
*/
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TestExternalSystem {}
/**
* Marks field defining semantic guarantees for testing
*/
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TestSemantics {}Usage Examples:
import org.apache.flink.connector.testframe.junit.annotations.*;
import org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension;
@ExtendWith(ConnectorTestingExtension.class)
public class MyConnectorTestSuite extends SinkTestSuiteBase<String> {
// Test environment (PER-CLASS lifecycle)
@TestEnv
MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();
// External context factory (PER-CASE lifecycle)
@TestContext
ExternalContextFactory<MySinkExternalContext> sinkContextFactory =
testName -> new MySinkExternalContext(testName);
// Multiple contexts supported
@TestContext
ExternalContextFactory<MyOtherExternalContext> otherContextFactory =
testName -> new MyOtherExternalContext(testName);
// External system configuration
@TestExternalSystem
MyExternalSystemConfig externalConfig = new MyExternalSystemConfig();
// Semantic configuration
@TestSemantics
List<CheckpointingMode> semantics = Arrays.asList(
CheckpointingMode.EXACTLY_ONCE,
CheckpointingMode.AT_LEAST_ONCE
);
}Main JUnit 5 extension that orchestrates the testing framework.
/**
* Main JUnit 5 extension for connector testing framework
* Provides automatic lifecycle management and parameter injection
*/
public class ConnectorTestingExtension implements
BeforeAllCallback, // Initialize PER-CLASS resources
AfterAllCallback, // Cleanup PER-CLASS resources
TestTemplateInvocationContextProvider, // Provide test parameterization
ParameterResolver // Inject parameters into test methods
{
/**
* Initialize test environment and PER-CLASS resources
* Called once before all test methods in the class
*/
@Override
public void beforeAll(ExtensionContext context) throws Exception;
/**
* Cleanup test environment and PER-CLASS resources
* Called once after all test methods in the class
*/
@Override
public void afterAll(ExtensionContext context) throws Exception;
/**
* Provide test invocation contexts for parameterized testing
* Generates combinations of external contexts and semantic modes
*/
@Override
public Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContexts(ExtensionContext context);
/**
* Resolve parameters for test method injection
* Supports TestEnvironment, ExternalContext, CheckpointingMode, ClusterControllable
*/
@Override
public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext);
@Override
public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext);
}Automatic Features:
@TestTemplate methodsProvides context for individual test case invocations with proper parameter resolution.
/**
* Provides test case invocation contexts with parameter resolution
*/
public class TestCaseInvocationContextProvider implements TestTemplateInvocationContextProvider {
/**
* Check if extension supports the test method
* @param context Extension context
* @return true if method uses @TestTemplate with supported parameters
*/
@Override
public boolean supportsTestTemplate(ExtensionContext context);
/**
* Provide invocation contexts for test template
* @param context Extension context
* @return Stream of invocation contexts for parameter combinations
*/
@Override
public Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContexts(ExtensionContext context);
}Test methods can declare parameters that will be automatically injected:
// Supported parameter types for @TestTemplate methods:
/**
* Test environment instance (PER-CLASS lifecycle)
*/
TestEnvironment testEnv
/**
* External context for sink testing (PER-CASE lifecycle)
*/
DataStreamSinkExternalContext<T> externalContext
/**
* External context for source testing (PER-CASE lifecycle)
*/
DataStreamSourceExternalContext<T> externalContext
/**
* Checkpointing semantic mode (injected per test variation)
*/
CheckpointingMode semantic
/**
* Cluster controller for failure testing (when supported by environment)
*/
ClusterControllable controllerExample Method Signatures:
// Sink test method
@TestTemplate
public void testBasicSink(
TestEnvironment testEnv,
DataStreamSinkExternalContext<String> externalContext,
CheckpointingMode semantic
) throws Exception {
// Test implementation
}
// Source test method with failure testing
@TestTemplate
public void testTaskManagerFailure(
TestEnvironment testEnv,
DataStreamSourceExternalContext<String> externalContext,
ClusterControllable controller,
CheckpointingMode semantic
) throws Exception {
// Test implementation
}
// Custom test method
@TestTemplate
public void testCustomScenario(
TestEnvironment testEnv,
DataStreamSinkExternalContext<String> sinkContext,
DataStreamSourceExternalContext<String> sourceContext,
CheckpointingMode semantic
) throws Exception {
// Custom test with multiple contexts
}Resources with PER-CLASS lifecycle are shared across all test methods in the class.
@TestEnv
MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();
// Lifecycle:
// 1. Created when test class is instantiated
// 2. Started before first test method (@BeforeAll phase)
// 3. Shared by all test methods in the class
// 4. Stopped after last test method (@AfterAll phase)Benefits:
Resources with PER-CASE lifecycle are created fresh for each test case.
@TestContext
ExternalContextFactory<MySinkExternalContext> contextFactory =
testName -> new MySinkExternalContext(testName);
// Lifecycle:
// 1. Factory created when test class is instantiated
// 2. New context instance created before each test case
// 3. Context used during test execution
// 4. Context closed after test case completionBenefits:
// Class initialization
TestClass instance = new TestClass();
// Before all tests
@BeforeAll
testEnv.startUp(); // Start test environment
// For each test method
for (TestMethod method : testMethods) {
// For each parameter combination
for (ParameterCombination params : combinations) {
// Before each test case
ExternalContext context = contextFactory.createExternalContext(testName);
try {
// Execute test
method.invoke(testEnv, context, semantic);
} finally {
// After each test case
context.close(); // Cleanup external resources
}
}
}
// After all tests
@AfterAll
testEnv.tearDown(); // Stop test environmentThe framework automatically generates test parameter combinations:
// Configuration
@TestContext
ExternalContextFactory<Context1> context1Factory = ...;
@TestContext
ExternalContextFactory<Context2> context2Factory = ...;
@TestSemantics
List<CheckpointingMode> semantics = Arrays.asList(
CheckpointingMode.EXACTLY_ONCE,
CheckpointingMode.AT_LEAST_ONCE
);
// Generated combinations:
// 1. testMethod(testEnv, context1, EXACTLY_ONCE)
// 2. testMethod(testEnv, context1, AT_LEAST_ONCE)
// 3. testMethod(testEnv, context2, EXACTLY_ONCE)
// 4. testMethod(testEnv, context2, AT_LEAST_ONCE)Test cases get descriptive display names based on parameters:
// Generated display names:
// ✓ testBasicSink[Context1, EXACTLY_ONCE]
// ✓ testBasicSink[Context1, AT_LEAST_ONCE]
// ✓ testBasicSink[Context2, EXACTLY_ONCE]
// ✓ testBasicSink[Context2, AT_LEAST_ONCE]Override default parameterization for specific test methods:
@TestTemplate
@ParameterizedTest
@EnumSource(CheckpointingMode.class)
public void testCustomParameterization(
TestEnvironment testEnv,
DataStreamSinkExternalContext<String> externalContext,
CheckpointingMode semantic
) throws Exception {
// Custom parameterization using JUnit 5 @ParameterizedTest
}Test with multiple external systems simultaneously:
@TestContext
ExternalContextFactory<DatabaseExternalContext> dbContextFactory =
testName -> new DatabaseExternalContext(testName);
@TestContext
ExternalContextFactory<MessageQueueExternalContext> mqContextFactory =
testName -> new MessageQueueExternalContext(testName);
@TestTemplate
public void testDatabaseToMessageQueue(
TestEnvironment testEnv,
DatabaseExternalContext dbContext,
MessageQueueExternalContext mqContext,
CheckpointingMode semantic
) throws Exception {
// Test data flow from database to message queue
}Skip tests based on environment or configuration:
@TestTemplate
public void testContainerizedEnvironment(
TestEnvironment testEnv,
DataStreamSinkExternalContext<String> externalContext,
CheckpointingMode semantic
) throws Exception {
assumeTrue(testEnv instanceof FlinkContainerTestEnvironment,
"Test requires containerized environment");
// Test implementation
}
@TestTemplate
public void testExactlyOnceOnly(
TestEnvironment testEnv,
DataStreamSinkExternalContext<String> externalContext,
CheckpointingMode semantic
) throws Exception {
assumeTrue(semantic == CheckpointingMode.EXACTLY_ONCE,
"Test only valid for exactly-once semantic");
// Test implementation
}Configure error handling behavior:
@ExtendWith(ConnectorTestingExtension.class)
@TestMethodOrder(OrderAnnotation.class)
public class MyConnectorTestSuite extends SinkTestSuiteBase<String> {
@TestEnv
MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();
@TestContext
ExternalContextFactory<MySinkExternalContext> contextFactory = testName -> {
try {
return new MySinkExternalContext(testName);
} catch (Exception e) {
// Convert to TestAbortedException to skip test instead of failing
throw new TestAbortedException("External system not available", e);
}
};
@Order(1)
@TestTemplate
public void testPrerequisites(TestEnvironment testEnv) {
// Verify prerequisites before running main tests
assumeTrue(checkExternalSystemAvailability(),
"External system not available");
}
}@TestTemplate methods<!-- Maven Surefire configuration -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M9</version>
<configuration>
<includes>
<include>**/*TestSuite.java</include>
</includes>
<!-- Enable parallel execution -->
<parallel>methods</parallel>
<threadCount>4</threadCount>
</configuration>
</plugin>// Gradle test configuration
test {
useJUnitPlatform()
// Enable parallel execution
maxParallelForks = 4
// Configure test logging
testLogging {
events "passed", "skipped", "failed"
showStandardStreams = true
}
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-test-utils