CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-test-utils-parent

Comprehensive testing utilities for Apache Flink stream processing framework

Pending
Overview
Eval results
Files

connector-testing.mddocs/

Connector Testing Framework

Comprehensive testing framework for Flink connectors with support for external systems, multiple test environments, and automated test suites. This framework enables thorough testing of both source and sink connectors with real external systems and controlled test environments.

Capabilities

Test Framework Core

ConnectorTestingExtension

JUnit 5 extension that provides comprehensive testing infrastructure for Flink connectors.

/**
 * JUnit 5 extension for connector testing
 * Manages test lifecycle, external systems, and test environments
 */
@ExtendWith(ConnectorTestingExtension.class)
class ConnectorTestingExtension implements BeforeAllCallback, AfterAllCallback {
    void beforeAll(ExtensionContext context) throws Exception;
    void afterAll(ExtensionContext context) throws Exception;
}

Usage Examples:

import org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(ConnectorTestingExtension.class)
public class MyConnectorTest {
    // Test methods will have access to managed test infrastructure
    
    @Test
    public void testSourceConnector() {
        // Test implementation
    }
}

Test Environment Management

Core interfaces and implementations for managing test execution environments.

TestEnvironment

Base interface for test execution environments that can run Flink jobs.

/**
 * Test execution environment interface
 * Provides abstraction over different Flink execution environments
 */
interface TestEnvironment extends TestResource {
    /** Submit and execute a Flink job */
    JobExecutionResult executeJob(JobGraph job) throws Exception;
    
    /** Get cluster information */
    ClusterClient<?> getClusterClient();
    
    /** Get job manager REST address */
    String getRestAddress(); 
    
    /** Get web UI URL */
    String getWebUIUrl();
}

/**
 * Base interface for test resources with lifecycle management
 */
interface TestResource extends AutoCloseable {
    /** Start/initialize the resource */
    void startUp() throws Exception;
    
    /** Stop/cleanup the resource */  
    void tearDown() throws Exception;
    
    void close() throws Exception;
}

MiniClusterTestEnvironment

Test environment based on Flink's embedded MiniCluster for fast, lightweight testing.

/**
 * MiniCluster-based test environment
 * Provides fast in-memory Flink execution for testing
 */
class MiniClusterTestEnvironment implements TestEnvironment, ClusterControllable {
    /** Create with default configuration */
    MiniClusterTestEnvironment();
    
    /** Create with custom configuration */
    MiniClusterTestEnvironment(MiniClusterConfiguration config);
    
    /** Create builder for configuration */
    static Builder builder();
    
    JobExecutionResult executeJob(JobGraph job) throws Exception;
    void triggerCheckpoint(long checkpointId) throws Exception;
    void cancelJob(JobID jobId) throws Exception;
    
    static class Builder {
        Builder setParallelism(int parallelism);
        Builder setCheckpointingEnabled(boolean enabled);
        Builder setCheckpointInterval(Duration interval);
        MiniClusterTestEnvironment build();
    }
}

FlinkContainerTestEnvironment

Test environment using Docker containers for testing with a real Flink cluster.

/**
 * Docker container-based test environment
 * Provides realistic Flink cluster environment for integration testing
 */
class FlinkContainerTestEnvironment implements TestEnvironment, ClusterControllable {
    /** Create with default Flink image */
    FlinkContainerTestEnvironment();
    
    /** Create with custom Flink image */
    FlinkContainerTestEnvironment(String flinkImageName);
    
    /** Create builder for configuration */
    static Builder builder();
    
    JobExecutionResult executeJob(JobGraph job) throws Exception;
    void restartTaskManager(int taskManagerIndex) throws Exception;
    void stopTaskManager(int taskManagerIndex) throws Exception;
    
    static class Builder {
        Builder withFlinkImage(String imageName);
        Builder withTaskManagers(int count);
        Builder withTaskSlots(int slots);
        Builder withJobManagerMemory(String memory);
        Builder withTaskManagerMemory(String memory);
        FlinkContainerTestEnvironment build();
    }
}

Usage Examples:

import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
import org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment;

// MiniCluster environment for fast tests
MiniClusterTestEnvironment miniEnv = MiniClusterTestEnvironment.builder()
    .setParallelism(4)
    .setCheckpointingEnabled(true)
    .setCheckpointInterval(Duration.ofSeconds(1))
    .build();

// Container environment for integration tests
FlinkContainerTestEnvironment containerEnv = FlinkContainerTestEnvironment.builder()
    .withFlinkImage("flink:1.17")
    .withTaskManagers(2)
    .withTaskSlots(4)
    .withJobManagerMemory("1g")
    .withTaskManagerMemory("2g")
    .build();

External System Testing

Framework for testing connectors with real external systems like databases, message queues, etc.

ExternalContext

Base interface for managing external system lifecycle and interaction during tests.

/**
 * Context for external system interaction
 * Manages lifecycle and provides access to external systems
 */
interface ExternalContext extends AutoCloseable {
    /** Initialize external system for testing */
    void setUp() throws Exception;
    
    /** Clean up external system after testing */
    void tearDown() throws Exception;
    
    /** Get connection information for Flink connectors */
    Properties getConnectionProperties();
    
    /** Generate unique identifier for this test run */
    String generateTestId();
}

/**
 * Factory for creating external contexts
 */
interface ExternalContextFactory<C extends ExternalContext> {
    /** Create external context for testing */
    C createExternalContext(String testName);
    
    /** Get display name for this external system */
    String getDisplayName();
}

Data Reading and Writing

Interfaces for reading and writing test data to external systems.

/**
 * Read data from external systems for verification
 */
interface ExternalSystemDataReader<T> extends AutoCloseable {
    /** Read all data from external system */
    List<T> readData() throws Exception;
    
    /** Read data with timeout */
    List<T> readData(Duration timeout) throws Exception;
    
    /** Read data matching criteria */
    List<T> readData(Predicate<T> filter) throws Exception;
    
    void close() throws Exception;
}

/**
 * Write data to external systems in splits for parallel testing
 */
interface ExternalSystemSplitDataWriter<T> extends AutoCloseable {
    /** Write split of data to external system */
    void writeSplit(List<T> data, int splitIndex) throws Exception;
    
    /** Write all splits and finalize */
    void writeAndFinalize(List<List<T>> splits) throws Exception;
    
    /** Get number of supported splits */
    int getMaxParallelism();
    
    void close() throws Exception;
}

Usage Examples:

import org.apache.flink.connector.testframe.external.ExternalContext;
import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;

// External context implementation
public class DatabaseExternalContext implements ExternalContext {
    private Connection connection;
    
    @Override
    public void setUp() throws Exception {
        connection = DriverManager.getConnection(getConnectionUrl());
        // Initialize test database
    }
    
    @Override
    public Properties getConnectionProperties() {
        Properties props = new Properties();
        props.setProperty("url", getConnectionUrl());
        return props;
    }
}

// Reading test data
ExternalSystemDataReader<MyRecord> reader = createDataReader();
List<MyRecord> results = reader.readData(Duration.ofSeconds(30));
assertEquals(expectedRecords, results);

Source Testing Framework

Specialized testing framework for Flink source connectors.

Source Test Contexts

Contexts specifically designed for testing source connectors with different APIs.

/**
 * Context for DataStream source testing
 * Provides source creation and data verification capabilities
 */
interface DataStreamSourceExternalContext<T> extends ExternalContext {
    /** Create source for DataStream API */
    SourceFunction<T> createSource(SourceSplitSerializer<?> splitSerializer);
    
    /** Get data reader for verification */
    ExternalSystemDataReader<T> createDataReader();
    
    /** Generate test data splits */
    List<List<T>> generateTestDataSplits(int numSplits);
}

/**
 * Context for Table API source testing
 */
interface TableSourceExternalContext extends ExternalContext {
    /** Create table source for Table API */
    DynamicTableSource createTableSource(TableDescriptor descriptor);
    
    /** Get table descriptor for source */
    TableDescriptor getTableDescriptor();
    
    /** Create catalog for table registration */
    Catalog createCatalog();
}

SourceTestSuiteBase

Base class providing comprehensive test suite for source connectors.

/**
 * Base class for source test suites
 * Provides standard test methods for source connector validation
 */
abstract class SourceTestSuiteBase<T> {
    /** Test basic source functionality */
    @TestTemplate
    void testSourceReading() throws Exception;
    
    /** Test source with checkpointing */
    @TestTemplate
    void testSourceWithCheckpointing() throws Exception;
    
    /** Test source restart from checkpoint */
    @TestTemplate
    void testSourceRestartFromCheckpoint() throws Exception;
    
    /** Test source parallelism handling */
    @TestTemplate
    void testSourceParallelism() throws Exception;
    
    /** Test source idempotency */
    @TestTemplate
    void testSourceIdempotency() throws Exception;
    
    /** Get external context for testing */
    protected abstract DataStreamSourceExternalContext<T> getExternalContext();
    
    /** Get test environment */
    protected abstract TestEnvironment getTestEnvironment();
}

Usage Examples:

import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;

@ExtendWith(ConnectorTestingExtension.class)
public class MySourceConnectorTest extends SourceTestSuiteBase<MyRecord> {
    
    @Override
    protected DataStreamSourceExternalContext<MyRecord> getExternalContext() {
        return new MySourceExternalContext();
    }
    
    @Override
    protected TestEnvironment getTestEnvironment() {
        return MiniClusterTestEnvironment.builder()
            .setParallelism(2)
            .build();
    }
    
    // All standard source tests are inherited and will run automatically
    // testSourceReading(), testSourceWithCheckpointing(), etc.
}

Sink Testing Framework

Specialized testing framework for Flink sink connectors.

Sink Test Contexts

Contexts for testing sink connectors with different APIs and semantics.

/**
 * Context for DataStream sink testing  
 * Supports both legacy SinkFunction and new Sink API
 */
interface DataStreamSinkExternalContext<T> extends ExternalContext, ResultTypeQueryable<T> {
    /** Create sink function for legacy API */
    SinkFunction<T> createSinkFunction();
    
    /** Create data reader for result verification */
    ExternalSystemDataReader<T> createDataReader();
    
    /** Generate test data for sink testing */
    List<T> generateTestData(int numRecords);
    
    /** Get type information for records */
    TypeInformation<T> getProducedType();
}

/**
 * Context for Sink V2 API testing
 */
interface DataStreamSinkV2ExternalContext<T> extends DataStreamSinkExternalContext<T> {
    /** Create sink for new Sink API */
    Sink<T> createSink();
    
    /** Test exactly-once semantics support */
    boolean supportsExactlyOnce();
    
    /** Test at-least-once semantics support */
    boolean supportsAtLeastOnce();
}

/**
 * Context for Table API sink testing
 */
interface TableSinkExternalContext extends ExternalContext {
    /** Create table sink for Table API */
    DynamicTableSink createTableSink(TableDescriptor descriptor);
    
    /** Get table descriptor for sink */
    TableDescriptor getTableDescriptor();
    
    /** Create catalog for table registration */
    Catalog createCatalog();
}

SinkTestSuiteBase

Base class providing comprehensive test suite for sink connectors.

/**
 * Base class for sink test suites
 * Provides standard test methods for sink connector validation
 */
abstract class SinkTestSuiteBase<T> {
    /** Test basic sink writing */
    @TestTemplate
    void testSinkWriting() throws Exception;
    
    /** Test sink with checkpointing */
    @TestTemplate
    void testSinkWithCheckpointing() throws Exception;
    
    /** Test sink exactly-once semantics */
    @TestTemplate
    void testSinkExactlyOnce() throws Exception;
    
    /** Test sink at-least-once semantics */
    @TestTemplate
    void testSinkAtLeastOnce() throws Exception;
    
    /** Test sink with multiple writers */
    @TestTemplate
    void testSinkParallelism() throws Exception;
    
    /** Test sink failure recovery */
    @TestTemplate
    void testSinkFailureRecovery() throws Exception;
    
    /** Get external context for testing */
    protected abstract DataStreamSinkExternalContext<T> getExternalContext();
    
    /** Get test environment */
    protected abstract TestEnvironment getTestEnvironment();
}

Usage Examples:

import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;

@ExtendWith(ConnectorTestingExtension.class)
public class MySinkConnectorTest extends SinkTestSuiteBase<MyRecord> {
    
    @Override
    protected DataStreamSinkV2ExternalContext<MyRecord> getExternalContext() {
        return new MySinkExternalContext();
    }
    
    @Override
    protected TestEnvironment getTestEnvironment() {
        return FlinkContainerTestEnvironment.builder()
            .withTaskManagers(2)
            .build();
    }
    
    // All standard sink tests are inherited
    // testSinkWriting(), testSinkExactlyOnce(), etc.
}

Test Configuration Annotations

Annotations for configuring connector test behavior and requirements.

/**
 * Configure test context
 */
@interface TestContext {
    /** External context factory class */
    Class<? extends ExternalContextFactory<?>> value();
}

/**
 * Configure test environment
 */  
@interface TestEnv {
    /** Test environment class */
    Class<? extends TestEnvironment> value();
}

/**
 * Configure external system for testing
 */
@interface TestExternalSystem {
    /** External system identifier */
    String value();
}

/**
 * Configure test semantics requirements
 */
@interface TestSemantics {
    /** Required delivery guarantees */
    DeliveryGuarantee[] value();
}

Usage Examples:

import org.apache.flink.connector.testframe.junit.annotations.*;

@ExtendWith(ConnectorTestingExtension.class)
@TestContext(MyConnectorExternalContextFactory.class)
@TestEnv(MiniClusterTestEnvironment.class)
@TestExternalSystem("kafka")
@TestSemantics({DeliveryGuarantee.EXACTLY_ONCE, DeliveryGuarantee.AT_LEAST_ONCE})
public class ConfiguredConnectorTest extends SourceTestSuiteBase<String> {
    // Test configuration provided by annotations
}

Container Testing Support

Docker container management for realistic Flink cluster testing.

FlinkContainers

Utility for managing Flink Docker containers in tests.

/**
 * Manage Flink containers for testing
 * Handles JobManager and TaskManager containers
 */
class FlinkContainers implements BeforeAllCallback, AfterAllCallback {
    /** Create with default Flink image */
    FlinkContainers();
    
    /** Create with custom image */
    FlinkContainers(String flinkImage);
    
    /** Get JobManager container */
    GenericContainer<?> getJobManagerContainer();
    
    /** Get TaskManager containers */
    List<GenericContainer<?>> getTaskManagerContainers();
    
    /** Get Flink REST client */
    RestClusterClient<String> getRestClient();
    
    void beforeAll(ExtensionContext context) throws Exception;
    void afterAll(ExtensionContext context) throws Exception;
}

/**
 * Build custom Flink Docker images for testing
 */
class FlinkImageBuilder {
    /** Create builder with base Flink image */
    static FlinkImageBuilder fromBaseImage(String baseImage);
    
    /** Add JAR file to image */
    FlinkImageBuilder addJar(Path jarPath);
    
    /** Add connector dependencies */
    FlinkImageBuilder addConnectorDependencies(String... coordinates);
    
    /** Set custom configuration */
    FlinkImageBuilder withConfiguration(String key, String value);
    
    /** Build the custom image */
    String build();
}

Usage Examples:

import org.apache.flink.connector.testframe.container.FlinkContainers;
import org.apache.flink.connector.testframe.container.FlinkImageBuilder;

// Custom Flink image with connector
String customImage = FlinkImageBuilder
    .fromBaseImage("flink:1.17")
    .addConnectorDependencies("org.apache.flink:flink-connector-kafka:1.17.0")
    .withConfiguration("state.backend", "filesystem")
    .build();

// Use containers in test
@RegisterExtension
static FlinkContainers flinkContainers = new FlinkContainers(customImage);

@Test
void testWithContainers() {
    RestClusterClient<?> client = flinkContainers.getRestClient();
    // Submit job to container cluster
}

Utility Classes

CollectIteratorAssert

Specialized assertions for collected test results.

/**
 * Assertions for collected test results
 * Provides convenient verification of streaming results
 */
class CollectIteratorAssert<T> extends AbstractIterableAssert<CollectIteratorAssert<T>, Iterable<T>, T, ObjectAssert<T>> {
    /** Assert results match expected values in order */
    CollectIteratorAssert<T> containsExactly(T... expected);
    
    /** Assert results contain expected values in any order */
    CollectIteratorAssert<T> containsExactlyInAnyOrder(T... expected);
    
    /** Assert result count matches expected */
    CollectIteratorAssert<T> hasSize(int expectedSize);
    
    /** Assert all results match predicate */
    CollectIteratorAssert<T> allMatch(Predicate<T> predicate);
}

MetricQuerier

Utility for querying and asserting on Flink metrics during tests.

/**
 * Query and assert on Flink metrics
 * Enables verification of connector behavior through metrics
 */
class MetricQuerier {
    /** Create querier for test environment */
    static MetricQuerier forEnvironment(TestEnvironment environment);
    
    /** Query counter metric value */
    long getCounterValue(String metricName);
    
    /** Query gauge metric value */
    double getGaugeValue(String metricName);
    
    /** Query histogram metric */
    HistogramStatistics getHistogramValue(String metricName);
    
    /** Wait for metric to reach expected value */
    void waitForMetric(String metricName, long expectedValue, Duration timeout);
    
    /** Assert metric has expected value */
    void assertMetricEquals(String metricName, long expectedValue);
}

Usage Examples:

import org.apache.flink.connector.testframe.utils.CollectIteratorAssert;
import org.apache.flink.connector.testframe.utils.MetricQuerier;

// Collect and assert results
CloseableIterator<String> results = // ... collect from job
CollectIteratorAssert.assertThat(results)
    .hasSize(1000)
    .containsExactlyInAnyOrder(expectedResults.toArray(new String[0]));

// Query metrics
MetricQuerier metrics = MetricQuerier.forEnvironment(testEnvironment);
metrics.waitForMetric("numRecordsIn", 1000, Duration.ofSeconds(30));
metrics.assertMetricEquals("numRecordsOut", 1000);

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-test-utils-parent

docs

client-testing.md

connector-testing.md

core-testing.md

index.md

migration-testing.md

table-testing.md

test-environments.md

tile.json