Testing utilities for Apache Flink connectors providing test frameworks, assertion utilities, and abstractions for comprehensive connector testing
—
The test suite framework provides base classes that implement comprehensive test scenarios for connector validation. These base classes use JUnit 5's @TestTemplate mechanism to automatically run multiple test variations with different configurations.
Base class for testing sink connectors with comprehensive scenarios including basic functionality, savepoint restart, scaling, and metrics validation.
/**
* Base class for sink test suite providing comprehensive test scenarios
* @param <T> Type of data elements, must be Comparable for result validation
*/
public abstract class SinkTestSuiteBase<T extends Comparable<T>> {
/**
* Test basic sink functionality with data writing and validation
* @param testEnv The test environment (injected by framework)
* @param externalContext External system context (injected by framework)
* @param semantic Checkpointing semantic (injected by framework)
*/
@TestTemplate
@DisplayName("Test data stream sink")
public void testBasicSink(
TestEnvironment testEnv,
DataStreamSinkExternalContext<T> externalContext,
CheckpointingMode semantic
) throws Exception;
/**
* Test sink restart from savepoint with same parallelism
* @param testEnv The test environment
* @param externalContext External system context
* @param semantic Checkpointing semantic
*/
@TestTemplate
@DisplayName("Test sink restarting from a savepoint")
public void testStartFromSavepoint(
TestEnvironment testEnv,
DataStreamSinkExternalContext<T> externalContext,
CheckpointingMode semantic
) throws Exception;
/**
* Test sink restart with higher parallelism (scale up)
* @param testEnv The test environment
* @param externalContext External system context
* @param semantic Checkpointing semantic
*/
@TestTemplate
@DisplayName("Test sink restarting with a higher parallelism")
public void testScaleUp(
TestEnvironment testEnv,
DataStreamSinkExternalContext<T> externalContext,
CheckpointingMode semantic
) throws Exception;
/**
* Test sink restart with lower parallelism (scale down)
* @param testEnv The test environment
* @param externalContext External system context
* @param semantic Checkpointing semantic
*/
@TestTemplate
@DisplayName("Test sink restarting with a lower parallelism")
public void testScaleDown(
TestEnvironment testEnv,
DataStreamSinkExternalContext<T> externalContext,
CheckpointingMode semantic
) throws Exception;
/**
* Test sink metrics reporting (e.g., numRecordsOut)
* @param testEnv The test environment
* @param externalContext External system context
* @param semantic Checkpointing semantic
*/
@TestTemplate
@DisplayName("Test sink metrics")
public void testMetrics(
TestEnvironment testEnv,
DataStreamSinkExternalContext<T> externalContext,
CheckpointingMode semantic
) throws Exception;
}Usage Examples:
import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
import org.apache.flink.connector.testframe.junit.annotations.*;
public class KafkaSinkTestSuite extends SinkTestSuiteBase<String> {
@TestEnv
MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();
@TestContext
ExternalContextFactory<KafkaSinkExternalContext> contextFactory =
testName -> new KafkaSinkExternalContext(testName);
// All test methods are inherited and run automatically
// testBasicSink, testStartFromSavepoint, testScaleUp, testScaleDown, testMetrics
}Base class for testing source connectors with scenarios including single/multiple splits, savepoint restart, scaling, idle readers, and failover testing.
/**
* Base class for source test suite providing comprehensive test scenarios
* @param <T> Type of data elements produced by the source
*/
public abstract class SourceTestSuiteBase<T> {
/**
* Test source with single split (bounded source required)
* @param testEnv The test environment
* @param externalContext External system context
* @param semantic Checkpointing semantic
*/
@TestTemplate
@DisplayName("Test source with single split")
public void testSourceSingleSplit(
TestEnvironment testEnv,
DataStreamSourceExternalContext<T> externalContext,
CheckpointingMode semantic
) throws Exception;
/**
* Test source with multiple splits (bounded source required)
* @param testEnv The test environment
* @param externalContext External system context
* @param semantic Checkpointing semantic
*/
@TestTemplate
@DisplayName("Test source with multiple splits")
public void testMultipleSplits(
TestEnvironment testEnv,
DataStreamSourceExternalContext<T> externalContext,
CheckpointingMode semantic
) throws Exception;
/**
* Test source restart from savepoint
* @param testEnv The test environment
* @param externalContext External system context
* @param semantic Checkpointing semantic
*/
@TestTemplate
@DisplayName("Test source restarting from a savepoint")
public void testSavepoint(
TestEnvironment testEnv,
DataStreamSourceExternalContext<T> externalContext,
CheckpointingMode semantic
) throws Exception;
/**
* Test source scale up (restart with higher parallelism)
* @param testEnv The test environment
* @param externalContext External system context
* @param semantic Checkpointing semantic
*/
@TestTemplate
@DisplayName("Test source restarting with a higher parallelism")
public void testScaleUp(
TestEnvironment testEnv,
DataStreamSourceExternalContext<T> externalContext,
CheckpointingMode semantic
) throws Exception;
/**
* Test source scale down (restart with lower parallelism)
* @param testEnv The test environment
* @param externalContext External system context
* @param semantic Checkpointing semantic
*/
@TestTemplate
@DisplayName("Test source restarting with a lower parallelism")
public void testScaleDown(
TestEnvironment testEnv,
DataStreamSourceExternalContext<T> externalContext,
CheckpointingMode semantic
) throws Exception;
/**
* Test source metrics reporting (e.g., numRecordsIn)
* @param testEnv The test environment
* @param externalContext External system context
* @param semantic Checkpointing semantic
*/
@TestTemplate
@DisplayName("Test source metrics")
public void testSourceMetrics(
TestEnvironment testEnv,
DataStreamSourceExternalContext<T> externalContext,
CheckpointingMode semantic
) throws Exception;
/**
* Test source with idle readers (bounded source required)
* Tests that sources properly handle idle readers with no assigned splits
* @param testEnv The test environment
* @param externalContext External system context
* @param semantic Checkpointing semantic
*/
@TestTemplate
@DisplayName("Test source with at least one idle parallelism")
public void testIdleReader(
TestEnvironment testEnv,
DataStreamSourceExternalContext<T> externalContext,
CheckpointingMode semantic
) throws Exception;
/**
* Test source with TaskManager failover (unbounded source required)
* @param testEnv The test environment
* @param externalContext External system context
* @param controller Cluster controller for triggering failures
* @param semantic Checkpointing semantic
*/
@TestTemplate
@DisplayName("Test TaskManager failure")
public void testTaskManagerFailure(
TestEnvironment testEnv,
DataStreamSourceExternalContext<T> externalContext,
ClusterControllable controller,
CheckpointingMode semantic
) throws Exception;
}Usage Examples:
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
public class KafkaSourceTestSuite extends SourceTestSuiteBase<String> {
@TestEnv
MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();
@TestContext
ExternalContextFactory<KafkaSourceExternalContext> contextFactory =
testName -> new KafkaSourceExternalContext(testName);
// All test methods are inherited:
// testSourceSingleSplit, testMultipleSplits, testSavepoint,
// testScaleUp, testScaleDown, testSourceMetrics, testIdleReader, testTaskManagerFailure
}Both base classes provide protected helper methods for test data generation and result validation.
// SinkTestSuiteBase helper methods
public abstract class SinkTestSuiteBase<T extends Comparable<T>> {
/**
* Generate test data for sink testing
* @param testingSinkSettings Settings for the sink test
* @param externalContext External context for data generation
* @return List of generated test records
*/
protected List<T> generateTestData(
TestingSinkSettings testingSinkSettings,
DataStreamSinkExternalContext<T> externalContext
);
/**
* Validate sink results with semantic guarantees
* @param reader Reader for external system data
* @param testData Expected test data
* @param semantic Semantic guarantee to validate against
*/
protected void checkResultWithSemantic(
ExternalSystemDataReader<T> reader,
List<T> testData,
CheckpointingMode semantic
) throws Exception;
}
// SourceTestSuiteBase helper methods
public abstract class SourceTestSuiteBase<T> {
/**
* Generate test data and write to external system
* @param splitIndex Index of the split to write to
* @param externalContext External context
* @param testingSourceSettings Source settings
* @return List of generated test records
*/
protected List<T> generateAndWriteTestData(
int splitIndex,
DataStreamSourceExternalContext<T> externalContext,
TestingSourceSettings testingSourceSettings
);
/**
* Validate source results with semantic guarantees
* @param resultIterator Iterator over result data
* @param testData Expected test data (list of splits)
* @param semantic Semantic guarantee to validate against
* @param limit Optional limit for unbounded sources
*/
protected void checkResultWithSemantic(
CloseableIterator<T> resultIterator,
List<List<T>> testData,
CheckpointingMode semantic,
Integer limit
);
/**
* Create source instance for testing
* @param externalContext External context
* @param sourceOptions Source configuration
* @return Source instance
*/
protected Source<T, ?, ?> tryCreateSource(
DataStreamSourceExternalContext<T> externalContext,
TestingSourceSettings sourceOptions
);
/**
* Submit Flink job for testing
* @param env Stream execution environment
* @param jobName Name for the job
* @return JobClient for managing the job
*/
protected JobClient submitJob(StreamExecutionEnvironment env, String jobName) throws Exception;
}Tests are automatically parameterized across different configurations:
EXACTLY_ONCE, AT_LEAST_ONCE@TestContext annotations@TestEnv annotationThe framework automatically generates test combinations and runs each test method multiple times with different parameter combinations.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-test-utils