Testing utilities for Apache Flink connectors providing test frameworks, assertion utilities, and abstractions for comprehensive connector testing
—
The external system integration framework provides abstractions for connectors to interact with their respective external systems during testing. This includes data generation, reading, writing, and connector instantiation.
Foundation interface for all external system integrations.
/**
 * Base interface for external system integration in testing framework.
 *
 * <p>An implementation encapsulates everything a test needs in order to talk
 * to the external system under test. It is {@link AutoCloseable} so each test
 * can release external resources deterministically when it finishes.
 */
public interface ExternalContext extends AutoCloseable {
/**
 * Get connector JAR URLs for job submission.
 *
 * @return List of connector JAR URLs to attach to Flink jobs
 *     (presumably empty rather than {@code null} when no extra JARs are
 *     needed — confirm against framework callers)
 */
List<URL> getConnectorJarPaths();
}
/**
 * Factory for creating external context instances.
 *
 * <p>This is a single-abstract-method interface, so a lambda or constructor
 * reference can be registered directly (see usage examples below).
 *
 * @param <C> Type of external context to create
 */
@FunctionalInterface
public interface ExternalContextFactory<C extends ExternalContext> {
    /**
     * Create external context instance for test.
     *
     * @param testName Name of the current test for resource isolation
     * @return External context instance
     */
    C createExternalContext(String testName);
}
Usage Examples:
// External context factory registration — a constructor reference satisfies
// the single-method factory interface without a wrapping lambda
@TestContext
ExternalContextFactory<MyExternalContext> contextFactory = MyExternalContext::new;
// Custom external context implementation
public class MyExternalContext implements ExternalContext {
    private final String testName;

    public MyExternalContext(String testName) {
        this.testName = testName;
    }

    @Override
    public List<URL> getConnectorJarPaths() {
        try {
            return Arrays.asList(new File("target/my-connector.jar").toURI().toURL());
        } catch (java.net.MalformedURLException e) {
            // URI#toURL() declares a checked exception the interface method
            // cannot propagate; a broken JAR path is a test-setup error, so
            // fail fast with an unchecked exception that preserves the cause.
            throw new IllegalStateException("Invalid connector JAR path", e);
        }
    }

    @Override
    public void close() throws Exception {
        // Cleanup external resources
    }
}
Abstract base class for sink connector external system integration.
/**
 * External context for DataStream sink testing.
 *
 * <p>Note: {@code ExternalContext} is an interface, so a class must use
 * {@code implements}, not {@code extends}.
 *
 * @param <T> Type of data elements handled by the sink
 */
public abstract class DataStreamSinkExternalContext<T> implements ExternalContext {
    /**
     * Create sink instance for testing.
     *
     * @param sinkSettings Configuration settings for the sink
     * @return Configured sink instance
     * @throws UnsupportedOperationException if settings combination not supported
     */
    public abstract Sink<T> createSink(TestingSinkSettings sinkSettings);

    /**
     * Generate test data for sink validation.
     *
     * @param sinkSettings Sink configuration settings
     * @param seed Random seed for reproducible data generation
     * @return List of test data elements
     */
    public abstract List<T> generateTestData(TestingSinkSettings sinkSettings, long seed);

    /**
     * Create reader for validating data written to external system.
     *
     * @param sinkSettings Sink configuration settings
     * @return Data reader for external system validation
     */
    public abstract ExternalSystemDataReader<T> createSinkDataReader(TestingSinkSettings sinkSettings);

    /**
     * Get type information for data elements.
     *
     * @return TypeInformation for proper serialization
     */
    public abstract TypeInformation<T> getProducedType();
}
/**
 * External context specifically for Sink V2 API.
 *
 * <p>Adds no new abstract methods; it exists to distinguish contexts whose
 * {@code createSink} produces {@code org.apache.flink.api.connector.sink2.Sink}
 * implementations from the legacy sink variant.
 *
 * @param <T> Type of data elements
 */
public abstract class DataStreamSinkV2ExternalContext<T> extends DataStreamSinkExternalContext<T> {
// Inherits all methods from DataStreamSinkExternalContext
// Specifically for org.apache.flink.api.connector.sink2.Sink implementations
}
/**
 * External context for Table API sink testing.
 *
 * <p>Note: {@code ExternalContext} is an interface, so a class must use
 * {@code implements}, not {@code extends}.
 *
 * @param <T> Type of data elements
 */
public abstract class TableSinkExternalContext<T> implements ExternalContext {
    // Table-specific sink testing methods
}
Usage Examples:
public class KafkaSinkExternalContext extends DataStreamSinkV2ExternalContext<String> {
    private final String topicName;
    private final KafkaContainer kafkaContainer;

    public KafkaSinkExternalContext(String testName) {
        // Per-test topic name keeps concurrent tests isolated from each other
        this.topicName = "test-topic-" + testName;
        this.kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));
        this.kafkaContainer.start();
    }

    @Override
    public Sink<String> createSink(TestingSinkSettings sinkSettings) {
        // The builder receives the bootstrap servers directly; no separate
        // Properties object is needed here.
        return KafkaSink.<String>builder()
                .setBootstrapServers(kafkaContainer.getBootstrapServers())
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(topicName)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build();
    }

    @Override
    public List<String> generateTestData(TestingSinkSettings sinkSettings, long seed) {
        // Seeded Random makes the generated data reproducible across runs
        Random random = new Random(seed);
        return IntStream.range(0, 100)
                .mapToObj(i -> "test-record-" + i + "-" + random.nextInt(1000))
                .collect(Collectors.toList());
    }

    @Override
    public ExternalSystemDataReader<String> createSinkDataReader(TestingSinkSettings sinkSettings) {
        return new KafkaDataReader(kafkaContainer.getBootstrapServers(), topicName);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }

    @Override
    public void close() throws Exception {
        // Release the Docker container when the test is done
        kafkaContainer.stop();
    }
}
Abstract base class for source connector external system integration.
/**
 * External context for DataStream source testing.
 *
 * <p>Note: {@code ExternalContext} is an interface, so a class must use
 * {@code implements}, not {@code extends}.
 *
 * @param <T> Type of data elements produced by the source
 */
public abstract class DataStreamSourceExternalContext<T> implements ExternalContext {
    /**
     * Create source instance for testing.
     *
     * @param sourceSettings Configuration settings for the source
     * @return Configured source instance
     * @throws UnsupportedOperationException if settings combination not supported
     */
    public abstract Source<T, ?, ?> createSource(TestingSourceSettings sourceSettings);

    /**
     * Generate test data for specific split.
     *
     * @param sourceSettings Source configuration settings
     * @param splitIndex Index of the split (for multiple split scenarios)
     * @param seed Random seed for reproducible data generation
     * @return List of test data elements for the split
     */
    public abstract List<T> generateTestData(TestingSourceSettings sourceSettings, int splitIndex, long seed);

    /**
     * Create writer for sending test data to external system split.
     *
     * @param sourceSettings Source configuration settings
     * @return Split data writer for external system
     */
    public abstract ExternalSystemSplitDataWriter<T> createSourceSplitDataWriter(TestingSourceSettings sourceSettings);

    /**
     * Get type information for data elements.
     *
     * @return TypeInformation for proper deserialization
     */
    public abstract TypeInformation<T> getProducedType();
}
/**
 * External context for Table API source testing.
 *
 * <p>Note: {@code ExternalContext} is an interface, so a class must use
 * {@code implements}, not {@code extends}.
 *
 * @param <T> Type of data elements
 */
public abstract class TableSourceExternalContext<T> implements ExternalContext {
    // Table-specific source testing methods
}
Usage Examples:
public class KafkaSourceExternalContext extends DataStreamSourceExternalContext<String> {
    private final KafkaContainer kafkaContainer;
    private final Map<Integer, String> splitTopics = new HashMap<>();

    public KafkaSourceExternalContext(String testName) {
        this.kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));
        this.kafkaContainer.start();
    }

    @Override
    public Source<String, ?, ?> createSource(TestingSourceSettings sourceSettings) {
        KafkaSourceBuilder<String> builder = KafkaSource.<String>builder()
                .setBootstrapServers(kafkaContainer.getBootstrapServers())
                .setTopics(splitTopics.values())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.earliest());
        // Only configure bounded mode when requested; passing null to
        // setBounded() for the unbounded case (as before) is invalid.
        if (sourceSettings.getBoundedness() == Boundedness.BOUNDED) {
            builder.setBounded(OffsetsInitializer.latest());
        }
        return builder.build();
    }

    @Override
    public List<String> generateTestData(TestingSourceSettings sourceSettings, int splitIndex, long seed) {
        // Seeded Random makes the generated data reproducible across runs
        Random random = new Random(seed);
        return IntStream.range(0, 50)
                .mapToObj(i -> "split-" + splitIndex + "-record-" + i + "-" + random.nextInt(1000))
                .collect(Collectors.toList());
    }

    @Override
    public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter(TestingSourceSettings sourceSettings) {
        // Return type must be bound to String — the type variable T is not in
        // scope in this concrete class.
        return new KafkaSplitDataWriter(kafkaContainer.getBootstrapServers());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }

    @Override
    public void close() throws Exception {
        // Release the Docker container; omitting this leaked the container
        // since ExternalContext is AutoCloseable and tests rely on close().
        kafkaContainer.stop();
    }
}
Interfaces for reading and writing data to external systems.
/**
 * Interface for reading data from external systems (used by sinks for validation).
 *
 * <p>NOTE(review): implementations in this file hold live client connections
 * (e.g. a Kafka consumer); confirm whether this interface is intended to
 * extend {@link AutoCloseable} so callers can release them.
 *
 * @param <T> Type of data elements
 */
public interface ExternalSystemDataReader<T> {
/**
 * Poll for available data from external system.
 *
 * @param timeout Maximum time to wait for data
 * @return List of available data elements (may be empty)
 */
List<T> poll(Duration timeout);
}
/**
 * Interface for writing data to external system splits (used by sources for test data setup).
 *
 * @param <T> Type of data elements
 */
public interface ExternalSystemSplitDataWriter<T> {
    /**
     * Write records to external system split.
     *
     * @param records List of records to write
     */
    void writeRecords(List<T> records);
}
Usage Examples:
public class KafkaDataReader implements ExternalSystemDataReader<String> {
    private final KafkaConsumer<String, String> consumer;

    public KafkaDataReader(String bootstrapServers, String topic) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Unique group id so repeated test runs always re-read from earliest
        props.setProperty("group.id", "test-consumer-" + UUID.randomUUID());
        props.setProperty("key.deserializer", StringDeserializer.class.getName());
        props.setProperty("value.deserializer", StringDeserializer.class.getName());
        props.setProperty("auto.offset.reset", "earliest");
        this.consumer = new KafkaConsumer<>(props);
        this.consumer.subscribe(Arrays.asList(topic));
    }

    @Override
    public List<String> poll(Duration timeout) {
        ConsumerRecords<String, String> records = consumer.poll(timeout);
        return StreamSupport.stream(records.spliterator(), false)
                .map(ConsumerRecord::value)
                .collect(Collectors.toList());
    }

    /** Releases the underlying Kafka consumer connection. */
    public void close() {
        consumer.close();
    }
}
public class KafkaSplitDataWriter implements ExternalSystemSplitDataWriter<String> {
    private final KafkaProducer<String, String> producer;
    private final String topicPrefix;

    public KafkaSplitDataWriter(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer", StringSerializer.class.getName());
        props.setProperty("value.serializer", StringSerializer.class.getName());
        this.producer = new KafkaProducer<>(props);
        this.topicPrefix = "test-source-";
    }

    @Override
    public void writeRecords(List<String> records) {
        // NOTE(review): deriving the topic name from the writer thread id is
        // fragile — the reading side must reconstruct the same name; confirm
        // this matches the topics registered with the source under test.
        String topic = topicPrefix + Thread.currentThread().getId();
        records.forEach(record -> producer.send(new ProducerRecord<>(topic, record)));
        // Flush so all records are visible before the test proceeds
        producer.flush();
    }

    /** Releases the underlying Kafka producer connection. */
    public void close() {
        producer.close();
    }
}
Default implementation for containerized external systems using TestContainers.
/**
 * Abstract base class for containerized external systems.
 *
 * <p>Subclasses own the TestContainers lifecycle for the external system:
 * starting it, stopping it, and exposing connection configuration.
 */
public abstract class DefaultContainerizedExternalSystem {
    /**
     * Start containers and prepare external system.
     */
    protected abstract void startContainers();

    /**
     * Stop containers and cleanup resources.
     */
    protected abstract void stopContainers();

    /**
     * Get connection configuration for connectors.
     *
     * @return Configuration properties for connector
     */
    protected abstract Properties getConnectionProperties();
}
/**
* Configuration settings for sink testing.
*
* <p>Built via the nested {@link Builder}; the only configurable option is the
* checkpointing mode the sink test should run with.
*/
public class TestingSinkSettings {
public static Builder builder();
public static class Builder {
public Builder setCheckpointingMode(CheckpointingMode checkpointingMode);
public TestingSinkSettings build();
}
public CheckpointingMode getCheckpointingMode();
}
/**
 * Configuration settings for source testing.
 *
 * <p>Built via the nested {@link Builder}; configures the boundedness of the
 * source under test and the checkpointing mode of the test job.
 */
public class TestingSourceSettings {
public static Builder builder();
public static class Builder {
public Builder setBoundedness(Boundedness boundedness);
public Builder setCheckpointingMode(CheckpointingMode checkpointingMode);
public TestingSourceSettings build();
}
public Boundedness getBoundedness();
public CheckpointingMode getCheckpointingMode();
}
External contexts follow a predictable lifecycle:
Each test case gets its own external context instance, ensuring that external-system resources (topics, containers, temporary state) are isolated between tests and can be cleaned up independently. External contexts should also handle common error scenarios — such as failures while starting the external system or during cleanup — without masking the underlying test failure.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-test-utils