
tessl/maven-org-apache-flink--flink-connector-test-utils

Testing utilities for Apache Flink connectors providing test frameworks, assertion utilities, and abstractions for comprehensive connector testing


docs/external-systems.md

External System Integration

The external system integration framework provides abstractions for connectors to interact with their respective external systems during testing. This includes data generation, reading, writing, and connector instantiation.

Capabilities

Base External Context

Foundation interface for all external system integrations.

/**
 * Base interface for external system integration in testing framework
 */
public interface ExternalContext extends AutoCloseable {
    /**
     * Get connector JAR URLs for job submission
     * @return List of connector JAR URLs to attach to Flink jobs
     */
    List<URL> getConnectorJarPaths();
}

/**
 * Factory for creating external context instances
 * @param <C> Type of external context to create
 */
public interface ExternalContextFactory<C extends ExternalContext> {
    /**
     * Create external context instance for test
     * @param testName Name of the current test for resource isolation
     * @return External context instance
     */
    C createExternalContext(String testName);
}

Usage Examples:

// External context factory registration
@TestContext
ExternalContextFactory<MyExternalContext> contextFactory = 
    testName -> new MyExternalContext(testName);

// Custom external context implementation
public class MyExternalContext implements ExternalContext {
    private final String testName;
    
    public MyExternalContext(String testName) {
        this.testName = testName;
    }
    
    @Override
    public List<URL> getConnectorJarPaths() {
        // toURI().toURL() throws the checked MalformedURLException, so wrap it
        try {
            return Collections.singletonList(
                new File("target/my-connector.jar").toURI().toURL());
        } catch (MalformedURLException e) {
            throw new IllegalStateException("Invalid connector JAR path", e);
        }
    }
    
    @Override
    public void close() throws Exception {
        // Cleanup external resources
    }
}

Sink External Context

Abstract base class for sink connector external system integration.

/**
 * External context for DataStream sink testing
 * @param <T> Type of data elements handled by the sink
 */
public abstract class DataStreamSinkExternalContext<T> implements ExternalContext {
    
    /**
     * Create sink instance for testing
     * @param sinkSettings Configuration settings for the sink
     * @return Configured sink instance
     * @throws UnsupportedOperationException if settings combination not supported
     */
    public abstract Sink<T> createSink(TestingSinkSettings sinkSettings);
    
    /**
     * Generate test data for sink validation
     * @param sinkSettings Sink configuration settings
     * @param seed Random seed for reproducible data generation
     * @return List of test data elements
     */
    public abstract List<T> generateTestData(TestingSinkSettings sinkSettings, long seed);
    
    /**
     * Create reader for validating data written to external system
     * @param sinkSettings Sink configuration settings
     * @return Data reader for external system validation
     */
    public abstract ExternalSystemDataReader<T> createSinkDataReader(TestingSinkSettings sinkSettings);
    
    /**
     * Get type information for data elements
     * @return TypeInformation for proper serialization
     */
    public abstract TypeInformation<T> getProducedType();
}

/**
 * External context specifically for Sink V2 API
 * @param <T> Type of data elements
 */
public abstract class DataStreamSinkV2ExternalContext<T> extends DataStreamSinkExternalContext<T> {
    // Inherits all methods from DataStreamSinkExternalContext
    // Specifically for org.apache.flink.api.connector.sink2.Sink implementations
}

/**
 * External context for Table API sink testing
 * @param <T> Type of data elements
 */
public abstract class TableSinkExternalContext<T> implements ExternalContext {
    // Table-specific sink testing methods
}

Usage Examples:

public class KafkaSinkExternalContext extends DataStreamSinkV2ExternalContext<String> {
    
    private final String topicName;
    private final KafkaContainer kafkaContainer;
    
    public KafkaSinkExternalContext(String testName) {
        this.topicName = "test-topic-" + testName;
        this.kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));
        this.kafkaContainer.start();
    }
    
    // getConnectorJarPaths() is omitted here for brevity; see ExternalContext above
    
    @Override
    public Sink<String> createSink(TestingSinkSettings sinkSettings) {
        return KafkaSink.<String>builder()
            .setBootstrapServers(kafkaContainer.getBootstrapServers())
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(topicName)
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
            .build();
    }
    
    @Override
    public List<String> generateTestData(TestingSinkSettings sinkSettings, long seed) {
        Random random = new Random(seed);
        return IntStream.range(0, 100)
            .mapToObj(i -> "test-record-" + i + "-" + random.nextInt(1000))
            .collect(Collectors.toList());
    }
    
    @Override
    public ExternalSystemDataReader<String> createSinkDataReader(TestingSinkSettings sinkSettings) {
        return new KafkaDataReader(kafkaContainer.getBootstrapServers(), topicName);
    }
    
    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
    
    @Override
    public void close() throws Exception {
        kafkaContainer.stop();
    }
}

Source External Context

Abstract base class for source connector external system integration.

/**
 * External context for DataStream source testing
 * @param <T> Type of data elements produced by the source
 */
public abstract class DataStreamSourceExternalContext<T> implements ExternalContext {
    
    /**
     * Create source instance for testing
     * @param sourceSettings Configuration settings for the source
     * @return Configured source instance
     * @throws UnsupportedOperationException if settings combination not supported
     */
    public abstract Source<T, ?, ?> createSource(TestingSourceSettings sourceSettings);
    
    /**
     * Generate test data for specific split
     * @param sourceSettings Source configuration settings
     * @param splitIndex Index of the split (for multiple split scenarios)
     * @param seed Random seed for reproducible data generation
     * @return List of test data elements for the split
     */
    public abstract List<T> generateTestData(TestingSourceSettings sourceSettings, int splitIndex, long seed);
    
    /**
     * Create writer for sending test data to external system split
     * @param sourceSettings Source configuration settings
     * @return Split data writer for external system
     */
    public abstract ExternalSystemSplitDataWriter<T> createSourceSplitDataWriter(TestingSourceSettings sourceSettings);
    
    /**
     * Get type information for data elements
     * @return TypeInformation for proper deserialization
     */
    public abstract TypeInformation<T> getProducedType();
}

/**
 * External context for Table API source testing
 * @param <T> Type of data elements
 */
public abstract class TableSourceExternalContext<T> implements ExternalContext {
    // Table-specific source testing methods
}

Usage Examples:

public class KafkaSourceExternalContext extends DataStreamSourceExternalContext<String> {
    
    private final KafkaContainer kafkaContainer;
    private final Map<Integer, String> splitTopics = new HashMap<>();
    
    public KafkaSourceExternalContext(String testName) {
        this.kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));
        this.kafkaContainer.start();
    }
    
    @Override
    public Source<String, ?, ?> createSource(TestingSourceSettings sourceSettings) {
        KafkaSourceBuilder<String> builder = KafkaSource.<String>builder()
            .setBootstrapServers(kafkaContainer.getBootstrapServers())
            .setTopics(new ArrayList<>(splitTopics.values()))
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .setStartingOffsets(OffsetsInitializer.earliest());
        // setBounded() must not receive null; only call it for bounded reads
        if (sourceSettings.getBoundedness() == Boundedness.BOUNDED) {
            builder.setBounded(OffsetsInitializer.latest());
        }
        return builder.build();
    }
    
    @Override
    public List<String> generateTestData(TestingSourceSettings sourceSettings, int splitIndex, long seed) {
        Random random = new Random(seed);
        return IntStream.range(0, 50)
            .mapToObj(i -> "split-" + splitIndex + "-record-" + i + "-" + random.nextInt(1000))
            .collect(Collectors.toList());
    }
    
    @Override
    public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter(TestingSourceSettings sourceSettings) {
        return new KafkaSplitDataWriter(kafkaContainer.getBootstrapServers());
    }
    
    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
    
    @Override
    public void close() throws Exception {
        kafkaContainer.stop();
    }
}

Data Access Interfaces

Interfaces for reading and writing data to external systems.

/**
 * Interface for reading data from external systems (used by sinks for validation)
 * @param <T> Type of data elements
 */
public interface ExternalSystemDataReader<T> {
    /**
     * Poll for available data from external system
     * @param timeout Maximum time to wait for data
     * @return List of available data elements (may be empty)
     */
    List<T> poll(Duration timeout);
}

/**
 * Interface for writing data to external system splits (used by sources for test data setup)
 * @param <T> Type of data elements
 */
public interface ExternalSystemSplitDataWriter<T> {
    /**
     * Write records to external system split
     * @param records List of records to write
     */
    void writeRecords(List<T> records);
}

Usage Examples:

public class KafkaDataReader implements ExternalSystemDataReader<String> {
    private final KafkaConsumer<String, String> consumer;
    
    public KafkaDataReader(String bootstrapServers, String topic) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", "test-consumer-" + UUID.randomUUID());
        props.setProperty("key.deserializer", StringDeserializer.class.getName());
        props.setProperty("value.deserializer", StringDeserializer.class.getName());
        props.setProperty("auto.offset.reset", "earliest");
        
        this.consumer = new KafkaConsumer<>(props);
        this.consumer.subscribe(Arrays.asList(topic));
    }
    
    @Override
    public List<String> poll(Duration timeout) {
        ConsumerRecords<String, String> records = consumer.poll(timeout);
        return StreamSupport.stream(records.spliterator(), false)
            .map(ConsumerRecord::value)
            .collect(Collectors.toList());
    }
}

public class KafkaSplitDataWriter implements ExternalSystemSplitDataWriter<String> {
    private final KafkaProducer<String, String> producer;
    private final String topicPrefix;
    
    public KafkaSplitDataWriter(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer", StringSerializer.class.getName());
        props.setProperty("value.serializer", StringSerializer.class.getName());
        
        this.producer = new KafkaProducer<>(props);
        this.topicPrefix = "test-source-";
    }
    
    @Override
    public void writeRecords(List<String> records) {
        String topic = topicPrefix + Thread.currentThread().getId();
        records.forEach(record -> {
            producer.send(new ProducerRecord<>(topic, record));
        });
        producer.flush();
    }
}

Containerized External Systems

Base abstraction for external systems that run in Docker containers managed with Testcontainers.

/**
 * Abstract base class for containerized external systems
 */
public abstract class DefaultContainerizedExternalSystem {
    
    /**
     * Start containers and prepare external system
     */
    protected abstract void startContainers();
    
    /**
     * Stop containers and cleanup resources
     */
    protected abstract void stopContainers();
    
    /**
     * Get connection configuration for connectors
     * @return Configuration properties for connector
     */
    protected abstract Properties getConnectionProperties();
}
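As a self-contained sketch of how a subclass might satisfy this contract, the abstract class is restated locally and the in-memory "container" is a stand-in for a real Testcontainers instance; `FakeMessageBrokerSystem` and the `bootstrap.servers` value are illustrative only.

```java
import java.util.Properties;

public class ContainerizedSystemDemo {

    /** Restates the doc's abstract base class so this sketch compiles on its own. */
    abstract static class DefaultContainerizedExternalSystem {
        protected abstract void startContainers();
        protected abstract void stopContainers();
        protected abstract Properties getConnectionProperties();
    }

    /** Hypothetical subclass backed by an in-memory flag instead of a Docker container. */
    static class FakeMessageBrokerSystem extends DefaultContainerizedExternalSystem {
        private boolean running;

        @Override
        protected void startContainers() {
            // A real implementation would call start() on a Testcontainers container here
            running = true;
        }

        @Override
        protected void stopContainers() {
            running = false;
        }

        @Override
        protected Properties getConnectionProperties() {
            Properties props = new Properties();
            // A real implementation would expose the container's mapped host and port
            props.setProperty("bootstrap.servers", running ? "localhost:9092" : "");
            return props;
        }
    }

    /** Drives the start -> configure -> stop sequence and returns the connection string. */
    static String demoRun() {
        FakeMessageBrokerSystem system = new FakeMessageBrokerSystem();
        system.startContainers();
        String servers = system.getConnectionProperties().getProperty("bootstrap.servers");
        system.stopContainers();
        return servers;
    }

    public static void main(String[] args) {
        System.out.println(demoRun());
    }
}
```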

Configuration Types

/**
 * Configuration settings for sink testing
 */
public class TestingSinkSettings {
    public static Builder builder();
    
    public static class Builder {
        public Builder setCheckpointingMode(CheckpointingMode checkpointingMode);
        public TestingSinkSettings build();
    }
    
    public CheckpointingMode getCheckpointingMode();
}

/**
 * Configuration settings for source testing
 */
public class TestingSourceSettings {
    public static Builder builder();
    
    public static class Builder {
        public Builder setBoundedness(Boundedness boundedness);
        public Builder setCheckpointingMode(CheckpointingMode checkpointingMode);
        public TestingSourceSettings build();
    }
    
    public Boundedness getBoundedness();
    public CheckpointingMode getCheckpointingMode();
}
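The builders above can be exercised as follows; this fragment assumes Flink's `CheckpointingMode` and `Boundedness` enums together with the builder methods listed here.

```java
// Sink settings: validate exactly-once delivery
TestingSinkSettings sinkSettings = TestingSinkSettings.builder()
    .setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    .build();

// Source settings: bounded read with at-least-once checkpointing
TestingSourceSettings sourceSettings = TestingSourceSettings.builder()
    .setBoundedness(Boundedness.BOUNDED)
    .setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
    .build();
```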

Integration Patterns

Lifecycle Management

External contexts follow a predictable lifecycle:

  1. Creation: Context created via factory for each test case
  2. Setup: External system resources initialized (containers, topics, etc.)
  3. Test Execution: Context used throughout test execution
  4. Cleanup: Context closed automatically after test completion
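The four phases can be sketched with minimal stand-ins for the interfaces above; `RecordingContext` and the recorded phase strings are illustrative only, and try-with-resources plays the role of the framework's automatic cleanup.

```java
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class LifecycleDemo {

    /** Minimal stand-in for the framework's ExternalContext interface. */
    interface ExternalContext extends AutoCloseable {
        List<URL> getConnectorJarPaths();
    }

    /** Minimal stand-in for ExternalContextFactory. */
    interface ExternalContextFactory<C extends ExternalContext> {
        C createExternalContext(String testName);
    }

    /** Toy context that records its lifecycle phases instead of touching a real system. */
    static class RecordingContext implements ExternalContext {
        final List<String> phases;

        RecordingContext(String testName, List<String> phases) {
            this.phases = phases;
            phases.add("created:" + testName);   // 1. Creation + 2. Setup
        }

        @Override
        public List<URL> getConnectorJarPaths() {
            phases.add("used");                  // 3. Test execution
            return Collections.emptyList();
        }

        @Override
        public void close() {
            phases.add("closed");                // 4. Cleanup
        }
    }

    /** Drives one test case through the full lifecycle and returns the recorded phases. */
    static List<String> runOnce(String testName) {
        List<String> phases = new ArrayList<>();
        ExternalContextFactory<RecordingContext> factory =
                name -> new RecordingContext(name, phases);
        // try-with-resources guarantees close() even if the test body throws
        try (RecordingContext context = factory.createExternalContext(testName)) {
            context.getConnectorJarPaths();
        }
        return phases;
    }

    public static void main(String[] args) {
        System.out.println(runOnce("myTest"));
    }
}
```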

Resource Isolation

Each test case gets its own external context instance, ensuring:

  • Test Isolation: No interference between test cases
  • Resource Cleanup: Automatic cleanup of external resources
  • Parallel Execution: Tests can run in parallel safely

Error Handling

External contexts should handle common error scenarios:

  • Resource Unavailability: Throw clear exceptions when external systems are not available
  • Configuration Errors: Validate configuration and provide helpful error messages
  • Cleanup Failures: Log warnings but don't fail tests on cleanup issues
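A minimal sketch of the first and last points, assuming nothing beyond the JDK; `requireAvailable` and `closeQuietly` are hypothetical helper names, not part of the framework.

```java
public class SafeCleanupDemo {

    /** Resource unavailability: fail fast with a clear, actionable message. */
    static void requireAvailable(boolean reachable, String systemName) {
        if (!reachable) {
            throw new IllegalStateException(
                systemName + " is not available; is the container running?");
        }
    }

    /** Cleanup failures: log a warning, never fail the test. Returns true on success. */
    static boolean closeQuietly(AutoCloseable resource) {
        try {
            resource.close();
            return true;
        } catch (Exception e) {
            // Swallow and log so a flaky teardown cannot mask the real test result
            System.err.println("WARN: cleanup failed: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        requireAvailable(true, "Kafka");
        System.out.println(closeQuietly(() -> {}));
        System.out.println(closeQuietly(() -> {
            throw new IllegalStateException("broker already gone");
        }));
    }
}
```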

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-test-utils
