
tessl/maven-org-apache-flink--flink-connector-test-utils

Testing utilities for Apache Flink connectors providing test frameworks, assertion utilities, and abstractions for comprehensive connector testing

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.flink/flink-connector-test-utils@2.1.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-test-utils@2.1.0


Apache Flink Connector Test Utils

The Apache Flink Connector Test Utils library is a testing framework for Apache Flink connectors. It provides standardized test suites for source and sink connectors, covering scenarios such as failover, scaling, metrics validation, and integration with external systems.

Package Information

  • Package Name: flink-connector-test-utils
  • Package Type: maven
  • Group ID: org.apache.flink
  • Artifact ID: flink-connector-test-utils
  • Language: Java
  • Installation:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-test-utils</artifactId>
      <version>2.1.0</version>
      <scope>test</scope>
    </dependency>

Core Imports

// Test suite base classes
import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;

// JUnit annotations
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import org.apache.flink.connector.testframe.junit.annotations.TestContext;

// Test utilities
import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions;
import org.apache.flink.connector.testframe.utils.MetricQuerier;

// External context interfaces
import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;

Basic Usage

Creating a Sink Test Suite

import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
import org.apache.flink.connector.testframe.external.ExternalContextFactory;
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension;
import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(ConnectorTestingExtension.class)
public class MySinkTestSuite extends SinkTestSuiteBase<String> {

    // Flink cluster that the inherited test cases run against
    @TestEnv
    MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();

    // Factory the framework calls with the test name to create the external context
    @TestContext
    ExternalContextFactory<MySinkExternalContext> sinkContextFactory =
        (testName) -> new MySinkExternalContext(testName);

    // Test methods are inherited from SinkTestSuiteBase,
    // including testBasicSink, testStartFromSavepoint, and testScaleUp.
}

// External context implementation
public class MySinkExternalContext implements DataStreamSinkV2ExternalContext<String> {

    private final String testName;

    // Matches the factory lambda in MySinkTestSuite
    public MySinkExternalContext(String testName) {
        this.testName = testName;
    }

    @Override
    public Sink<String> createSink(TestingSinkSettings sinkSettings) {
        // Return your sink implementation
        return new MySink(/* configuration */);
    }

    @Override
    public List<String> generateTestData(TestingSinkSettings sinkSettings, long seed) {
        // Generate test data for your sink
        return Arrays.asList("test1", "test2", "test3");
    }

    @Override
    public ExternalSystemDataReader<String> createSinkDataReader(TestingSinkSettings sinkSettings) {
        // Return a reader to validate the data written to the external system
        return new MySinkDataReader(/* configuration */);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }

    // getConnectorJarPaths() and close() must also be implemented (omitted here)
}

Creating a Source Test Suite

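import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
import org.apache.flink.connector.testframe.external.ExternalContextFactory;
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;

public class MySourceTestSuite extends SourceTestSuiteBase<String> {

    // Flink cluster that the inherited test cases run against
    @TestEnv
    MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();

    // Factory the framework calls with the test name to create the external context
    @TestContext
    ExternalContextFactory<MySourceExternalContext> sourceContextFactory =
        (testName) -> new MySourceExternalContext(testName);

    // Test methods are inherited from SourceTestSuiteBase,
    // including testSourceSingleSplit, testMultipleSplits, and testSavepoint.
}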

Architecture

The testing framework is built around several key components:

  • Test Suite Base Classes: Provide pre-built test cases for common scenarios (basic functionality, failover, scaling, metrics)
  • External Context Framework: Abstracts external system interactions for both sources and sinks
  • Test Environment: Manages Flink cluster lifecycle (MiniCluster or containerized)
  • JUnit Integration: Annotation-driven configuration with resource lifecycle management
  • Assertion Utilities: Specialized assertions for validating connector behavior with different semantic guarantees
  • Container Support: Testcontainers integration for isolated testing environments

Capabilities

Test Suite Framework

Base classes that supply standardized test scenarios for connector validation. There is one suite for sink connectors and one for source connectors; a subclass inherits every test case of the suite it extends.

public abstract class SinkTestSuiteBase<T extends Comparable<T>> {
    // Test methods provided automatically via JUnit @TestTemplate
    void testBasicSink(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic);
    void testStartFromSavepoint(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic);
    void testScaleUp(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic);
    void testScaleDown(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic);
    void testMetrics(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic);
}

public abstract class SourceTestSuiteBase<T> {
    // Test methods provided automatically via JUnit @TestTemplate
    void testSourceSingleSplit(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic);
    void testMultipleSplits(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic);
    void testSavepoint(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic);
    void testScaleUp(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic);
    void testScaleDown(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic);
    void testSourceMetrics(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic);
    void testIdleReader(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic);
    void testTaskManagerFailure(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, ClusterControllable controller, CheckpointingMode semantic);
}

Full reference: test-suites.md

External System Integration

Framework for integrating with external systems, providing abstractions for source and sink connectors to interact with their respective external systems.

public interface ExternalContext extends AutoCloseable {
    List<URL> getConnectorJarPaths();
}

public interface ExternalContextFactory<C extends ExternalContext> {
    C createExternalContext(String testName);
}

public interface DataStreamSinkV2ExternalContext<T> extends ExternalContext {
    Sink<T> createSink(TestingSinkSettings sinkSettings);
    List<T> generateTestData(TestingSinkSettings sinkSettings, long seed);
    ExternalSystemDataReader<T> createSinkDataReader(TestingSinkSettings sinkSettings);
    TypeInformation<T> getProducedType();
}

public interface DataStreamSourceExternalContext<T> extends ExternalContext {
    Source<T, ?, ?> createSource(TestingSourceSettings sourceSettings);
    List<T> generateTestData(TestingSourceSettings sourceSettings, int splitIndex, long seed);
    ExternalSystemSplitDataWriter<T> createSourceSplitDataWriter(TestingSourceSettings sourceSettings);
    TypeInformation<T> getProducedType();
}

Full reference: external-systems.md
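
Both generateTestData signatures above take a seed. One plausible reading is that a context should derive its records deterministically from that seed, so that a failing run can be reproduced. The following is a minimal sketch of that idea which compiles without Flink on the classpath. The class name SeededDataSketch, the record format, and the range of 5 to 10 records are assumptions made for illustration and are not part of the library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Stand-alone illustration; not part of flink-connector-test-utils. */
class SeededDataSketch {

    /**
     * Mirrors the shape of generateTestData(settings, splitIndex, seed),
     * with the settings argument left out to keep the sketch Flink-free.
     */
    static List<String> generateTestData(int splitIndex, long seed) {
        // Seeding the generator makes the output a pure function of the arguments.
        Random random = new Random(seed);
        int recordCount = 5 + random.nextInt(6); // between 5 and 10 records
        List<String> records = new ArrayList<>(recordCount);
        for (int i = 0; i < recordCount; i++) {
            records.add("split-" + splitIndex + "-record-" + i + "-" + random.nextInt(1000));
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> firstRun = generateTestData(0, 42L);
        List<String> secondRun = generateTestData(0, 42L);
        // Same split index and seed, so both runs produce identical records.
        System.out.println(firstRun.equals(secondRun)); // prints: true
    }
}
```

A real context would also write these records to, or read them back from, the external system; that part depends entirely on the connector under test.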

Test Environment Management

Test environment abstractions for managing Flink cluster lifecycle, supporting both MiniCluster and containerized deployments.

public interface TestEnvironment extends TestResource {
    StreamExecutionEnvironment createExecutionEnvironment(TestEnvironmentSettings envOptions);
    Endpoint getRestEndpoint();
    String getCheckpointUri();
    
    class Endpoint {
        public Endpoint(String address, int port);
        public String getAddress();
        public int getPort();
    }
}

public interface TestResource {
    void startUp() throws Exception;
    void tearDown() throws Exception;
}

Full reference: test-environments.md
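
TestResource declares only startUp and tearDown. The sketch below shows the lifecycle contract such a pair implies: tearDown runs whether or not the test body fails. It uses local stand-in types (Resource, TestBody, RecordingResource), all hypothetical, so it runs without Flink. How the framework's JUnit extension actually sequences these calls is not shown in this document.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-alone illustration; not part of flink-connector-test-utils. */
class LifecycleSketch {

    /** Local stand-in with the same two methods as TestResource above. */
    interface Resource {
        void startUp() throws Exception;
        void tearDown() throws Exception;
    }

    /** A test body that may fail. */
    interface TestBody {
        void run() throws Exception;
    }

    /** Hypothetical resource that records the order of lifecycle calls. */
    static class RecordingResource implements Resource {
        final List<String> events = new ArrayList<>();

        @Override
        public void startUp() {
            events.add("startUp");
        }

        @Override
        public void tearDown() {
            events.add("tearDown");
        }
    }

    /** Runs the body between startUp and tearDown; returns true if nothing threw. */
    static boolean runWithResource(Resource resource, TestBody body) {
        try {
            resource.startUp();
            try {
                body.run();
            } finally {
                // Executed on success and on failure, so the resource is always released.
                resource.tearDown();
            }
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        RecordingResource resource = new RecordingResource();
        boolean passed = runWithResource(resource, () -> {
            throw new IllegalStateException("simulated test failure");
        });
        System.out.println(passed + " " + resource.events); // prints: false [startUp, tearDown]
    }
}
```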

Assertion and Validation Utilities

Specialized assertion utilities for validating connector behavior with support for different semantic guarantees (exactly-once, at-least-once).

public final class CollectIteratorAssertions {
    public static <T> CollectIteratorAssert<T> assertThat(Iterator<T> actual);
    public static <T> UnorderedCollectIteratorAssert<T> assertUnordered(Iterator<T> actual);
}

public class CollectIteratorAssert<T> {
    public CollectIteratorAssert<T> matchesRecordsFromSource(List<List<T>> expected, CheckpointingMode semantic);
    public CollectIteratorAssert<T> withNumRecordsLimit(int limit);
}

Full reference: assertions.md
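
To make the difference between the two semantic guarantees concrete, here is a simplified, stand-alone sketch. Under exactly-once, every expected record must appear exactly as often as expected; under at-least-once, duplicates are tolerated but no record may be missing. This is not the library's implementation: the Semantic enum stands in for CheckpointingMode, and the sketch ignores record order and splits, which the real assertions may take into account.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Stand-alone illustration; not part of flink-connector-test-utils. */
class SemanticCheckSketch {

    /** Hypothetical stand-in for Flink's CheckpointingMode. */
    enum Semantic { EXACTLY_ONCE, AT_LEAST_ONCE }

    /** Counts how often each record occurs. */
    static <T> Map<T, Integer> countRecords(List<T> records) {
        Map<T, Integer> counts = new HashMap<>();
        for (T record : records) {
            counts.merge(record, 1, Integer::sum);
        }
        return counts;
    }

    /** Compares actual against expected under the given guarantee, ignoring order. */
    static <T> boolean matches(List<T> expected, List<T> actual, Semantic semantic) {
        Map<T, Integer> expectedCounts = countRecords(expected);
        Map<T, Integer> actualCounts = countRecords(actual);
        if (semantic == Semantic.EXACTLY_ONCE) {
            // No record may be missing, duplicated, or unexpected.
            return expectedCounts.equals(actualCounts);
        }
        // AT_LEAST_ONCE: the same distinct records, each at least as often as expected.
        if (!expectedCounts.keySet().equals(actualCounts.keySet())) {
            return false;
        }
        for (Map.Entry<T, Integer> entry : expectedCounts.entrySet()) {
            if (actualCounts.get(entry.getKey()) < entry.getValue()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> expected = List.of("a", "b", "c");
        List<String> withDuplicate = List.of("a", "b", "b", "c");
        System.out.println(matches(expected, withDuplicate, Semantic.EXACTLY_ONCE));  // prints: false
        System.out.println(matches(expected, withDuplicate, Semantic.AT_LEAST_ONCE)); // prints: true
    }
}
```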

Metrics and Monitoring

Utilities for querying and validating Flink job metrics via REST API, enabling performance and behavior validation.

public class MetricQuerier {
    public MetricQuerier(Configuration configuration) throws ConfigurationException;
    
    public static JobDetailsInfo getJobDetails(RestClient client, TestEnvironment.Endpoint endpoint, JobID jobId) throws Exception;
    
    public Double getAggregatedMetricsByRestAPI(
        TestEnvironment.Endpoint endpoint,
        JobID jobId,
        String sourceOrSinkName,
        String metricName,
        String filter
    ) throws Exception;
    
    public AggregatedMetricsResponseBody getMetricList(TestEnvironment.Endpoint endpoint, JobID jobId, JobVertexID vertexId) throws Exception;
    public AggregatedMetricsResponseBody getMetrics(TestEnvironment.Endpoint endpoint, JobID jobId, JobVertexID vertexId, String filters) throws Exception;
}

Full reference: metrics.md
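
For orientation, the sketch below builds the kind of URL involved when aggregated subtask metrics are read over REST. It assumes the aggregated-subtask-metrics endpoint described in Flink's REST API documentation (/jobs/:jobid/vertices/:vertexid/subtasks/metrics with a get query parameter). Whether MetricQuerier issues exactly this request is an assumption; the helper name and the placeholder IDs are hypothetical.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

/** Stand-alone illustration; not part of flink-connector-test-utils. */
class MetricsUrlSketch {

    /** Builds the request URL for the given endpoint, job, vertex, and metric names. */
    static String aggregatedSubtaskMetricsUrl(
            String address, int port, String jobId, String vertexId, List<String> metricNames) {
        // Metric names are passed as a comma-separated list in the "get" parameter.
        String getParam = metricNames.stream()
                .map(name -> URLEncoder.encode(name, StandardCharsets.UTF_8))
                .collect(Collectors.joining(","));
        return "http://" + address + ":" + port
                + "/jobs/" + jobId
                + "/vertices/" + vertexId
                + "/subtasks/metrics?get=" + getParam;
    }

    public static void main(String[] args) {
        // "a1b2" and "c3d4" are placeholder IDs, not real Flink identifiers.
        System.out.println(aggregatedSubtaskMetricsUrl(
                "localhost", 8081, "a1b2", "c3d4", List.of("numRecordsIn", "numRecordsOut")));
    }
}
```

In a test, the address and port would come from TestEnvironment.getRestEndpoint() rather than being hard-coded.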

JUnit Integration

Annotation-driven JUnit 5 integration with automatic resource lifecycle management and test parameterization.

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TestEnv {}

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TestContext {}

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TestExternalSystem {}

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TestSemantics {}

public class ConnectorTestingExtension implements BeforeAllCallback, AfterAllCallback, 
    TestTemplateInvocationContextProvider, ParameterResolver {
    // Automatic lifecycle management and parameter injection
}

Full reference: junit-integration.md
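
Annotation-driven configuration of this kind generally rests on reflection: an extension scans the test instance for fields carrying a runtime-retained marker annotation and reads their values. The sketch below shows that mechanism in isolation. The Env annotation, FakeEnvironment, ExampleSuite, and findAnnotated are hypothetical stand-ins; the code makes no claim about how ConnectorTestingExtension is implemented.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

/** Stand-alone illustration; not part of flink-connector-test-utils. */
class AnnotatedFieldSketch {

    /** Hypothetical marker shaped like the @TestEnv declaration above. */
    @Target(ElementType.FIELD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Env {}

    /** Hypothetical stand-in for a test environment. */
    static class FakeEnvironment {}

    /** Hypothetical test class with one annotated and one plain field. */
    static class ExampleSuite {
        @Env
        FakeEnvironment env = new FakeEnvironment();

        String unrelated = "ignored";
    }

    /** Returns the values of all fields on the instance that carry the marker. */
    static List<Object> findAnnotated(Object instance, Class<? extends Annotation> marker) {
        List<Object> values = new ArrayList<>();
        for (Field field : instance.getClass().getDeclaredFields()) {
            if (field.isAnnotationPresent(marker)) {
                field.setAccessible(true);
                try {
                    values.add(field.get(instance));
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException("Cannot read field " + field.getName(), e);
                }
            }
        }
        return values;
    }

    public static void main(String[] args) {
        List<Object> found = findAnnotated(new ExampleSuite(), Env.class);
        System.out.println(found.size() + " " + (found.get(0) instanceof FakeEnvironment)); // prints: 1 true
    }
}
```

RetentionPolicy.RUNTIME is what makes the marker visible to reflection; with the default class-file retention, isAnnotationPresent would return false.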

Container Support

Testcontainers integration for running tests in isolated containerized environments with custom Flink clusters.

public class FlinkContainers {
    public static FlinkContainer jobManager();
    public static FlinkContainer taskManager();
    public static FlinkContainer cluster();
}

public class FlinkContainerTestEnvironment implements TestEnvironment {
    public FlinkContainerTestEnvironment(FlinkContainersSettings settings);
    // Implements TestEnvironment methods
}

public class FlinkImageBuilder {
    public static DockerImageName buildImage(List<URL> jarPaths) throws ImageBuildException;
}

Full reference: containers.md

Types

Core Configuration Types

public class TestEnvironmentSettings {
    public static Builder builder();
    
    public static class Builder {
        public Builder setConnectorJarPaths(List<URL> connectorJarPaths);
        public Builder setSavepointRestorePath(String savepointRestorePath);
        public TestEnvironmentSettings build();
    }
}

public class TestingSinkSettings {
    public static Builder builder();
    
    public static class Builder {
        public Builder setCheckpointingMode(CheckpointingMode checkpointingMode);
        public TestingSinkSettings build();
    }
}

public class TestingSourceSettings {
    public static Builder builder();
    
    public static class Builder {
        public Builder setBoundedness(Boundedness boundedness);
        public Builder setCheckpointingMode(CheckpointingMode checkpointingMode);
        public TestingSourceSettings build();
    }
}

Data Interface Types

public interface ExternalSystemDataReader<T> {
    List<T> poll(Duration timeout);
}

public interface ExternalSystemSplitDataWriter<T> {
    void writeRecords(List<T> records);
}
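
Because poll takes a timeout and returns whatever is available, a caller typically has to poll repeatedly until it has seen enough records or gives up. A minimal sketch of such a loop follows. DataReader is a local stand-in mirroring the poll(Duration) signature above, and drain, inMemoryReader, and the 50 ms per-poll timeout are hypothetical choices for illustration.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Stand-alone illustration; not part of flink-connector-test-utils. */
class PollingSketch {

    /** Local stand-in mirroring the poll(Duration) signature above. */
    interface DataReader<T> {
        List<T> poll(Duration timeout);
    }

    /** Polls until expectedCount records have arrived or the overall deadline has passed. */
    static <T> List<T> drain(DataReader<T> reader, int expectedCount, Duration overallTimeout) {
        List<T> collected = new ArrayList<>();
        long deadline = System.nanoTime() + overallTimeout.toNanos();
        while (collected.size() < expectedCount && System.nanoTime() < deadline) {
            collected.addAll(reader.poll(Duration.ofMillis(50)));
        }
        return collected;
    }

    /** Hypothetical in-memory reader that hands out records in small batches. */
    static DataReader<String> inMemoryReader(List<String> records, int batchSize) {
        Queue<String> pending = new ArrayDeque<>(records);
        return timeout -> {
            List<String> batch = new ArrayList<>();
            while (batch.size() < batchSize && !pending.isEmpty()) {
                batch.add(pending.poll());
            }
            return batch;
        };
    }

    public static void main(String[] args) {
        DataReader<String> reader = inMemoryReader(List.of("test1", "test2", "test3"), 2);
        System.out.println(drain(reader, 3, Duration.ofSeconds(1))); // prints: [test1, test2, test3]
    }
}
```

The in-memory reader returns immediately, so the loop spins until the deadline when records are missing; a reader backed by a real system would normally block for up to the given timeout instead.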

Container Configuration Types

public class FlinkContainersSettings {
    public static Builder builder();
    
    public static class Builder {
        public Builder setNumTaskManagers(int numTaskManagers);
        public Builder setNumSlotsPerTaskManager(int numSlotsPerTaskManager);
        public Builder setJobManagerMemory(String jobManagerMemory);
        public Builder setTaskManagerMemory(String taskManagerMemory);
        public FlinkContainersSettings build();
    }
}

public class TestcontainersSettings {
    public static Builder builder();
    
    public static class Builder {
        public Builder setNetwork(Network network);
        public Builder setLogConsumers(Map<String, Consumer<OutputFrame>> logConsumers);
        public TestcontainersSettings build();
    }
}