Testing utilities for Apache Flink connectors providing test frameworks, assertion utilities, and abstractions for comprehensive connector testing
npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-test-utils@2.1.0

The Apache Flink Connector Test Utils library provides a comprehensive testing framework for Apache Flink connectors. It enables developers to create standardized, robust tests for both source and sink connectors, with support for various testing scenarios including failover, scaling, metrics validation, and external system integration.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-utils</artifactId>
<version>2.1.0</version>
<scope>test</scope>
</dependency>

// Test suite base classes
import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
// JUnit annotations
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
// Test utilities
import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions;
import org.apache.flink.connector.testframe.utils.MetricQuerier;
// External context interfaces
import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;

import java.net.URL;
import java.util.Collections;
/**
 * Example sink test suite.
 *
 * <p>The class body only wires up the test environment and the external-context factory;
 * the test cases themselves are inherited from {@code SinkTestSuiteBase}.
 */
@ExtendWith(ConnectorTestingExtension.class)
public class MySinkTestSuite extends SinkTestSuiteBase<String> {

    /** Flink environment the suite runs against, marked with {@code @TestEnv}. */
    @TestEnv
    MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();

    /** Builds a {@code MySinkExternalContext} from the test name it is handed. */
    @TestContext
    ExternalContextFactory<MySinkExternalContext> sinkContextFactory = MySinkExternalContext::new;

    // No test methods are declared here: SinkTestSuiteBase supplies them,
    // e.g. testBasicSink, testStartFromSavepoint, testScaleUp, etc.
}
// External context implementation
/**
 * Example external context for {@code MySink}.
 *
 * <p>In addition to the three sink-specific factory methods, this example implements the
 * remaining abstract members ({@code getProducedType}, {@code getConnectorJarPaths} and
 * {@code close}) and provides the single-{@code String} constructor that the
 * {@code ExternalContextFactory} declared in {@code MySinkTestSuite} invokes.
 */
public class MySinkExternalContext extends DataStreamSinkV2ExternalContext<String> {

    /** Name of the test case this context was created for. */
    private final String testName;

    /**
     * Creates a context bound to one test case.
     *
     * @param testName test name passed in by the external-context factory; typically useful
     *     for isolating external resources per test (adapt to your external system)
     */
    public MySinkExternalContext(String testName) {
        this.testName = testName;
    }

    @Override
    public Sink<String> createSink(TestingSinkSettings sinkSettings) {
        // Return your sink implementation
        return new MySink(/* configuration */);
    }

    @Override
    public List<String> generateTestData(TestingSinkSettings sinkSettings, long seed) {
        // Generate test data for your sink
        return Arrays.asList("test1", "test2", "test3");
    }

    @Override
    public ExternalSystemDataReader<String> createSinkDataReader(TestingSinkSettings sinkSettings) {
        // Return a reader to validate data written to external system
        return new MySinkDataReader(/* configuration */);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        // Type of the records returned by generateTestData
        return TypeInformation.of(String.class);
    }

    @Override
    public List<URL> getConnectorJarPaths() {
        // Placeholder: return the URLs of your connector jars here if your setup needs them
        return Collections.emptyList();
    }

    @Override
    public void close() throws Exception {
        // Release any connections or resources opened against the external system
    }
}

import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
/**
 * Example source test suite.
 *
 * <p>Only the test environment and the external-context factory are declared; the test cases
 * are inherited from {@code SourceTestSuiteBase}.
 */
public class MySourceTestSuite extends SourceTestSuiteBase<String> {

    /** Flink environment the suite runs against, marked with {@code @TestEnv}. */
    @TestEnv
    MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();

    /** Builds a {@code MySourceExternalContext} from the test name it is handed. */
    @TestContext
    ExternalContextFactory<MySourceExternalContext> sourceContextFactory =
            MySourceExternalContext::new;

    // No test methods are declared here: SourceTestSuiteBase supplies them,
    // e.g. testSourceSingleSplit, testMultipleSplits, testSavepoint, etc.
}

The testing framework is built around several key components:
Core test suite base classes providing standardized test scenarios for connector validation. Supports both sink and source connectors with comprehensive test coverage.
/**
 * Base class for sink connector test suites (API summary: signatures only, bodies omitted).
 *
 * <p>Every listed test method takes the same three arguments: the test environment, the sink
 * external context, and the checkpointing mode to run with.
 *
 * @param <T> record type handled by the sink under test; bounded by {@code Comparable<T>}
 */
public abstract class SinkTestSuiteBase<T extends Comparable<T>> {
    // Test methods provided automatically via JUnit @TestTemplate
    void testBasicSink(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic);
    void testStartFromSavepoint(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic);
    void testScaleUp(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic);
    void testScaleDown(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic);
    void testMetrics(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic);
}
/**
 * Base class for source connector test suites (API summary: signatures only, bodies omitted).
 *
 * <p>The listed test methods take the test environment, the source external context, and the
 * checkpointing mode; {@code testTaskManagerFailure} additionally takes a
 * {@code ClusterControllable}.
 *
 * @param <T> record type produced by the source under test
 */
public abstract class SourceTestSuiteBase<T> {
    // Test methods provided automatically via JUnit @TestTemplate
    void testSourceSingleSplit(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic);
    void testMultipleSplits(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic);
    void testSavepoint(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic);
    void testScaleUp(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic);
    void testScaleDown(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic);
    void testSourceMetrics(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic);
    void testIdleReader(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic);
    // Only test in this listing that receives a ClusterControllable in addition to the environment
    void testTaskManagerFailure(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, ClusterControllable controller, CheckpointingMode semantic);
}

Framework for integrating with external systems, providing abstractions for source and sink connectors to interact with their respective external systems.
/**
 * Root interface of the external-context hierarchy; it is {@code AutoCloseable}, so
 * implementations also provide {@code close()}.
 */
public interface ExternalContext extends AutoCloseable {
    /** Returns connector jar locations as URLs (presumably the jars a job needs; confirm upstream). */
    List<URL> getConnectorJarPaths();
}
/**
 * Factory for external contexts.
 *
 * @param <C> concrete {@code ExternalContext} type this factory creates
 */
public interface ExternalContextFactory<C extends ExternalContext> {
    /**
     * Creates an external context for the given test.
     *
     * @param testName name of the test the context is created for
     * @return the created context
     */
    C createExternalContext(String testName);
}
/**
 * External context for testing a sink: creates the sink, generates test data, and creates a
 * reader for the data written to the external system.
 *
 * <p>{@code ExternalContext} is an interface, so this class {@code implements} it.
 * NOTE(review): the upstream Flink type may itself be declared as an interface; confirm
 * against the Flink Javadoc.
 *
 * @param <T> record type written by the sink
 */
public abstract class DataStreamSinkV2ExternalContext<T> implements ExternalContext {
    /** Creates the sink under test for the given settings. */
    public abstract Sink<T> createSink(TestingSinkSettings sinkSettings);

    /** Generates the records to send through the sink; {@code seed} is supplied by the caller. */
    public abstract List<T> generateTestData(TestingSinkSettings sinkSettings, long seed);

    /** Creates a reader over the external system the sink writes to. */
    public abstract ExternalSystemDataReader<T> createSinkDataReader(TestingSinkSettings sinkSettings);

    /** Returns the type information for {@code T}. */
    public abstract TypeInformation<T> getProducedType();
}
/**
 * External context for testing a source: creates the source, generates per-split test data,
 * and creates a writer that puts records into the external system.
 *
 * <p>{@code ExternalContext} is an interface, so this class {@code implements} it.
 * NOTE(review): the upstream Flink type may itself be declared as an interface; confirm
 * against the Flink Javadoc.
 *
 * @param <T> record type produced by the source
 */
public abstract class DataStreamSourceExternalContext<T> implements ExternalContext {
    /** Creates the source under test for the given settings. */
    public abstract Source<T, ?, ?> createSource(TestingSourceSettings sourceSettings);

    /** Generates the records for one split; {@code splitIndex} and {@code seed} are supplied by the caller. */
    public abstract List<T> generateTestData(TestingSourceSettings sourceSettings, int splitIndex, long seed);

    /** Creates a writer for feeding records into the external system. */
    public abstract ExternalSystemSplitDataWriter<T> createSourceSplitDataWriter(TestingSourceSettings sourceSettings);

    /** Returns the type information for {@code T}. */
    public abstract TypeInformation<T> getProducedType();
}

Test environment abstractions for managing Flink cluster lifecycle, supporting both MiniCluster and containerized deployments.
/**
 * A test environment that is also a {@code TestResource}
 * (API summary: signatures only, bodies omitted).
 */
public interface TestEnvironment extends TestResource {
    /** Creates a {@code StreamExecutionEnvironment} configured from the given settings. */
    StreamExecutionEnvironment createExecutionEnvironment(TestEnvironmentSettings envOptions);

    /** Returns the REST endpoint (address and port) of the environment. */
    Endpoint getRestEndpoint();

    /** Returns the checkpoint URI as a string. */
    String getCheckpointUri();

    /** Address/port pair describing an endpoint. */
    class Endpoint {
        public Endpoint(String address, int port);
        public String getAddress();
        public int getPort();
    }
}
/** A resource with an explicit start/stop lifecycle; both operations may throw. */
public interface TestResource {
    /** Starts the resource. */
    void startUp() throws Exception;

    /** Tears the resource down. */
    void tearDown() throws Exception;
}

Specialized assertion utilities for validating connector behavior with support for different semantic guarantees (exactly-once, at-least-once).
/** Static entry points that wrap an {@code Iterator} in an assertion object. */
public final class CollectIteratorAssertions {
    /** Wraps {@code actual} in a {@code CollectIteratorAssert}. */
    public static <T> CollectIteratorAssert<T> assertThat(Iterator<T> actual);

    /** Wraps {@code actual} in an {@code UnorderedCollectIteratorAssert}. */
    public static <T> UnorderedCollectIteratorAssert<T> assertUnordered(Iterator<T> actual);
}
/**
 * Assertion object over an iterator of records; both methods return
 * {@code CollectIteratorAssert<T>}, so calls can be chained.
 *
 * @param <T> record type
 */
public class CollectIteratorAssert<T> {
    /**
     * Checks the iterated records against {@code expected} under the given checkpointing mode.
     * NOTE(review): each inner list presumably holds the records of one split; confirm upstream.
     */
    public CollectIteratorAssert<T> matchesRecordsFromSource(List<List<T>> expected, CheckpointingMode semantic);

    /** Sets a limit on the number of records (exact effect not shown in this listing). */
    public CollectIteratorAssert<T> withNumRecordsLimit(int limit);
}

Utilities for querying and validating Flink job metrics via REST API, enabling performance and behavior validation.
/**
 * Queries job details and metrics for a job reachable through a
 * {@code TestEnvironment.Endpoint} (API summary: signatures only, bodies omitted).
 */
public class MetricQuerier {
    /** Creates a querier from a {@code Configuration}; may throw {@code ConfigurationException}. */
    public MetricQuerier(Configuration configuration) throws ConfigurationException;

    /** Static helper: fetches the details of {@code jobId} using the supplied {@code RestClient}. */
    public static JobDetailsInfo getJobDetails(RestClient client, TestEnvironment.Endpoint endpoint, JobID jobId) throws Exception;

    /**
     * Returns one aggregated metric value as a {@code Double}.
     * NOTE(review): {@code sourceOrSinkName} presumably selects the operator and {@code filter}
     * narrows the metric lookup; confirm the exact matching rules upstream.
     */
    public Double getAggregatedMetricsByRestAPI(
        TestEnvironment.Endpoint endpoint,
        JobID jobId,
        String sourceOrSinkName,
        String metricName,
        String filter
    ) throws Exception;

    /** Returns the metrics response for one job vertex. */
    public AggregatedMetricsResponseBody getMetricList(TestEnvironment.Endpoint endpoint, JobID jobId, JobVertexID vertexId) throws Exception;

    /** Like {@code getMetricList}, but with an additional {@code filters} string. */
    public AggregatedMetricsResponseBody getMetrics(TestEnvironment.Endpoint endpoint, JobID jobId, JobVertexID vertexId, String filters) throws Exception;
}

Annotation-driven JUnit 5 integration with automatic resource lifecycle management and test parameterization.
/** Field-level marker annotation, retained at runtime; the examples put it on the test environment field. */
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TestEnv {}

/** Field-level marker annotation, retained at runtime; the examples put it on the external-context factory field. */
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TestContext {}

/** Field-level marker annotation, retained at runtime (presumably marks an external system resource; confirm upstream). */
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TestExternalSystem {}

/** Field-level marker annotation, retained at runtime (presumably marks the checkpointing modes to test; confirm upstream). */
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TestSemantics {}
/**
 * JUnit 5 extension implementing the before-all/after-all callbacks, the test-template
 * invocation context provider, and the parameter resolver interfaces.
 */
public class ConnectorTestingExtension implements BeforeAllCallback, AfterAllCallback,
        TestTemplateInvocationContextProvider, ParameterResolver {
    // Automatic lifecycle management and parameter injection
}

TestContainers integration for running tests in isolated containerized environments with custom Flink clusters.
/** Static factory methods that each return a {@code FlinkContainer} (signatures only). */
public class FlinkContainers {
    /** Returns a {@code FlinkContainer} for a job manager. */
    public static FlinkContainer jobManager();

    /** Returns a {@code FlinkContainer} for a task manager. */
    public static FlinkContainer taskManager();

    /** Returns a {@code FlinkContainer} for a cluster. */
    public static FlinkContainer cluster();
}
/** {@code TestEnvironment} implementation constructed from {@code FlinkContainersSettings}. */
public class FlinkContainerTestEnvironment implements TestEnvironment {
    /** Creates the environment from the given container settings. */
    public FlinkContainerTestEnvironment(FlinkContainersSettings settings);
    // Implements TestEnvironment methods
}
/** Builds a Docker image from a list of jar URLs (signature only). */
public class FlinkImageBuilder {
    /** Returns the name of the built image; may throw {@code ImageBuildException}. */
    public static DockerImageName buildImage(List<URL> jarPaths) throws ImageBuildException;
}

/** Settings passed to {@code TestEnvironment#createExecutionEnvironment}; created through its {@code Builder}. */
public class TestEnvironmentSettings {
    /** Returns a new builder. */
    public static Builder builder();

    /** Builder with chainable setters (each returns {@code Builder}). */
    public static class Builder {
        public Builder setConnectorJarPaths(List<URL> connectorJarPaths);
        public Builder setSavepointRestorePath(String savepointRestorePath);
        public TestEnvironmentSettings build();
    }
}
/** Settings handed to the sink external-context methods; created through its {@code Builder}. */
public class TestingSinkSettings {
    /** Returns a new builder. */
    public static Builder builder();

    /** Builder whose only listed option is the checkpointing mode. */
    public static class Builder {
        public Builder setCheckpointingMode(CheckpointingMode checkpointingMode);
        public TestingSinkSettings build();
    }
}
/** Settings handed to the source external-context methods; created through its {@code Builder}. */
public class TestingSourceSettings {
    /** Returns a new builder. */
    public static Builder builder();

    /** Builder exposing boundedness and checkpointing mode. */
    public static class Builder {
        public Builder setBoundedness(Boundedness boundedness);
        public Builder setCheckpointingMode(CheckpointingMode checkpointingMode);
        public TestingSourceSettings build();
    }
}

/**
 * Reads records back from an external system.
 *
 * @param <T> record type
 */
public interface ExternalSystemDataReader<T> {
    /** Returns a list of records, given a timeout (behaviour on expiry is not shown in this listing). */
    List<T> poll(Duration timeout);
}
/**
 * Writes records into an external system.
 *
 * @param <T> record type
 */
public interface ExternalSystemSplitDataWriter<T> {
    /** Writes the given records. */
    void writeRecords(List<T> records);
}

/** Settings consumed by {@code FlinkContainerTestEnvironment}; created through its {@code Builder}. */
public class FlinkContainersSettings {
    /** Returns a new builder. */
    public static Builder builder();

    /** Builder for task manager count, slots per task manager, and memory settings (passed as strings). */
    public static class Builder {
        public Builder setNumTaskManagers(int numTaskManagers);
        public Builder setNumSlotsPerTaskManager(int numSlotsPerTaskManager);
        public Builder setJobManagerMemory(String jobManagerMemory);
        public Builder setTaskManagerMemory(String taskManagerMemory);
        public FlinkContainersSettings build();
    }
}
/** Testcontainers-related settings; created through its {@code Builder}. */
public class TestcontainersSettings {
    /** Returns a new builder. */
    public static Builder builder();

    /** Builder exposing the network and a map of log consumers keyed by string. */
    public static class Builder {
        public Builder setNetwork(Network network);
        public Builder setLogConsumers(Map<String, Consumer<OutputFrame>> logConsumers);
        public TestcontainersSettings build();
    }
}