tessl/maven-org-apache-flink--flink-connector-test-utils

Testing utilities for Apache Flink connectors providing test frameworks, assertion utilities, and abstractions for comprehensive connector testing

Test Suites

The test suite framework provides base classes that implement a standard set of test scenarios for connector validation. Each scenario is a JUnit 5 @TestTemplate method, so the framework can run it several times, once for each combination of external context and checkpointing mode.

Capabilities

Sink Test Suite Base

Base class for testing sink connectors with comprehensive scenarios including basic functionality, savepoint restart, scaling, and metrics validation.

/**
 * Base class for sink test suite providing comprehensive test scenarios
 * @param <T> Type of data elements; must be Comparable so written and expected records can be sorted before comparison
 */
public abstract class SinkTestSuiteBase<T extends Comparable<T>> {
    
    /**
     * Test basic sink functionality with data writing and validation
     * @param testEnv The test environment (injected by framework)
     * @param externalContext External system context (injected by framework)
     * @param semantic Checkpointing semantic (injected by framework)
     */
    @TestTemplate
    @DisplayName("Test data stream sink")
    public void testBasicSink(
        TestEnvironment testEnv,
        DataStreamSinkExternalContext<T> externalContext,
        CheckpointingMode semantic
    ) throws Exception;
    
    /**
     * Test sink restart from savepoint with same parallelism
     * @param testEnv The test environment
     * @param externalContext External system context
     * @param semantic Checkpointing semantic
     */
    @TestTemplate
    @DisplayName("Test sink restarting from a savepoint")
    public void testStartFromSavepoint(
        TestEnvironment testEnv,
        DataStreamSinkExternalContext<T> externalContext,
        CheckpointingMode semantic
    ) throws Exception;
    
    /**
     * Test sink restart with higher parallelism (scale up)
     * @param testEnv The test environment
     * @param externalContext External system context
     * @param semantic Checkpointing semantic
     */
    @TestTemplate
    @DisplayName("Test sink restarting with a higher parallelism")
    public void testScaleUp(
        TestEnvironment testEnv,
        DataStreamSinkExternalContext<T> externalContext,
        CheckpointingMode semantic
    ) throws Exception;
    
    /**
     * Test sink restart with lower parallelism (scale down)
     * @param testEnv The test environment
     * @param externalContext External system context
     * @param semantic Checkpointing semantic
     */
    @TestTemplate
    @DisplayName("Test sink restarting with a lower parallelism")
    public void testScaleDown(
        TestEnvironment testEnv,
        DataStreamSinkExternalContext<T> externalContext,
        CheckpointingMode semantic
    ) throws Exception;
    
    /**
     * Test sink metrics reporting (e.g., numRecordsOut)
     * @param testEnv The test environment
     * @param externalContext External system context
     * @param semantic Checkpointing semantic
     */
    @TestTemplate
    @DisplayName("Test sink metrics")
    public void testMetrics(
        TestEnvironment testEnv,
        DataStreamSinkExternalContext<T> externalContext,
        CheckpointingMode semantic
    ) throws Exception;
}

Usage Examples:

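import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
import org.apache.flink.connector.testframe.external.ExternalContextFactory;
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;

public class KafkaSinkTestSuite extends SinkTestSuiteBase<String> {
    
    @TestEnv
    MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();
    
    // KafkaSinkExternalContext stands for the connector's own implementation of
    // DataStreamSinkExternalContext<String>; the constructor shown is illustrative.
    @TestContext
    ExternalContextFactory<KafkaSinkExternalContext> contextFactory =
        testName -> new KafkaSinkExternalContext(testName);
    
    // All test methods are inherited and run automatically:
    // testBasicSink, testStartFromSavepoint, testScaleUp, testScaleDown, testMetrics
}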
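The T extends Comparable<T> bound on SinkTestSuiteBase exists because a sink usually runs with several parallel writers, so records can reach the external system in a different order than they were generated; sorting both sides makes the comparison order-insensitive. The sketch below illustrates that idea with plain JDK collections. It is not the framework's checkResultWithSemantic: SemanticCheckSketch and its nested Mode enum are hypothetical, and the sketch assumes EXACTLY_ONCE means "the same records, each exactly once" while AT_LEAST_ONCE tolerates duplicates but no missing or unexpected records.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;

// Hypothetical, JDK-only illustration of semantic-aware result checking for sinks.
final class SemanticCheckSketch {

    // Hypothetical stand-in for Flink's CheckpointingMode.
    enum Mode { EXACTLY_ONCE, AT_LEAST_ONCE }

    // Sort a copy so the comparison ignores the order in which parallel
    // writers delivered records; this is what requires T to be Comparable.
    static <T extends Comparable<T>> List<T> sorted(List<T> records) {
        List<T> copy = new ArrayList<>(records);
        Collections.sort(copy);
        return copy;
    }

    static <T extends Comparable<T>> boolean matches(List<T> expected, List<T> actual, Mode mode) {
        if (mode == Mode.EXACTLY_ONCE) {
            // Same records with the same multiplicities.
            return sorted(expected).equals(sorted(actual));
        }
        // AT_LEAST_ONCE (assumed): duplicates allowed, but nothing missing or unexpected.
        return new HashSet<>(actual).equals(new HashSet<>(expected));
    }
}
```

With expected records a, b, c, the output c, a, b, a fails the EXACTLY_ONCE check because of the duplicate a, but passes the AT_LEAST_ONCE check.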

Source Test Suite Base

Base class for testing source connectors with scenarios including single/multiple splits, savepoint restart, scaling, idle readers, and failover testing.

/**
 * Base class for source test suite providing comprehensive test scenarios
 * @param <T> Type of data elements produced by the source
 */
public abstract class SourceTestSuiteBase<T> {
    
    /**
     * Test source with single split (bounded source required)
     * @param testEnv The test environment
     * @param externalContext External system context
     * @param semantic Checkpointing semantic
     */
    @TestTemplate
    @DisplayName("Test source with single split")
    public void testSourceSingleSplit(
        TestEnvironment testEnv,
        DataStreamSourceExternalContext<T> externalContext,
        CheckpointingMode semantic
    ) throws Exception;
    
    /**
     * Test source with multiple splits (bounded source required)
     * @param testEnv The test environment
     * @param externalContext External system context
     * @param semantic Checkpointing semantic
     */
    @TestTemplate
    @DisplayName("Test source with multiple splits")
    public void testMultipleSplits(
        TestEnvironment testEnv,
        DataStreamSourceExternalContext<T> externalContext,
        CheckpointingMode semantic
    ) throws Exception;
    
    /**
     * Test source restart from savepoint
     * @param testEnv The test environment
     * @param externalContext External system context
     * @param semantic Checkpointing semantic
     */
    @TestTemplate
    @DisplayName("Test source restarting from a savepoint")
    public void testSavepoint(
        TestEnvironment testEnv,
        DataStreamSourceExternalContext<T> externalContext,
        CheckpointingMode semantic
    ) throws Exception;
    
    /**
     * Test source scale up (restart with higher parallelism)
     * @param testEnv The test environment
     * @param externalContext External system context
     * @param semantic Checkpointing semantic
     */
    @TestTemplate
    @DisplayName("Test source restarting with a higher parallelism")
    public void testScaleUp(
        TestEnvironment testEnv,
        DataStreamSourceExternalContext<T> externalContext,
        CheckpointingMode semantic
    ) throws Exception;
    
    /**
     * Test source scale down (restart with lower parallelism)
     * @param testEnv The test environment
     * @param externalContext External system context
     * @param semantic Checkpointing semantic
     */
    @TestTemplate
    @DisplayName("Test source restarting with a lower parallelism")
    public void testScaleDown(
        TestEnvironment testEnv,
        DataStreamSourceExternalContext<T> externalContext,
        CheckpointingMode semantic
    ) throws Exception;
    
    /**
     * Test source metrics reporting (e.g., numRecordsIn)
     * @param testEnv The test environment
     * @param externalContext External system context
     * @param semantic Checkpointing semantic
     */
    @TestTemplate
    @DisplayName("Test source metrics")
    public void testSourceMetrics(
        TestEnvironment testEnv,
        DataStreamSourceExternalContext<T> externalContext,
        CheckpointingMode semantic
    ) throws Exception;
    
    /**
     * Test source with idle readers (bounded source required)
     * Runs with more readers than splits, so at least one reader receives no split;
     * the bounded job must still finish
     * @param testEnv The test environment
     * @param externalContext External system context
     * @param semantic Checkpointing semantic
     */
    @TestTemplate
    @DisplayName("Test source with at least one idle parallelism")
    public void testIdleReader(
        TestEnvironment testEnv,
        DataStreamSourceExternalContext<T> externalContext,
        CheckpointingMode semantic
    ) throws Exception;
    
    /**
     * Test source with TaskManager failover (unbounded source required)
     * @param testEnv The test environment
     * @param externalContext External system context
     * @param controller Cluster controller for triggering failures
     * @param semantic Checkpointing semantic
     */
    @TestTemplate
    @DisplayName("Test TaskManager failure")
    public void testTaskManagerFailure(
        TestEnvironment testEnv,
        DataStreamSourceExternalContext<T> externalContext,
        ClusterControllable controller,
        CheckpointingMode semantic
    ) throws Exception;
}
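testIdleReader covers the case where the job runs with more source readers than there are splits. The arithmetic is easy to see in a small sketch that assumes round-robin split assignment (real split enumerators decide assignment themselves); IdleReaderSketch is a hypothetical helper, not part of the framework. Any reader left without a split is idle, and for a bounded source the job can only finish if such a reader learns that no more splits will arrive.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: which splits each reader subtask would get.
final class IdleReaderSketch {

    // Round-robin assignment of split indices to readers (an assumption;
    // real enumerators may assign by hash, by locality, or on demand).
    static List<List<Integer>> assign(int numSplits, int parallelism) {
        List<List<Integer>> perReader = new ArrayList<>();
        for (int r = 0; r < parallelism; r++) {
            perReader.add(new ArrayList<>());
        }
        for (int split = 0; split < numSplits; split++) {
            perReader.get(split % parallelism).add(split);
        }
        return perReader;
    }

    // Readers that received no split at all are idle.
    static long idleReaders(int numSplits, int parallelism) {
        return assign(numSplits, parallelism).stream().filter(List::isEmpty).count();
    }
}
```

For example, four splits read with parallelism five leave one reader idle, while the same four splits at parallelism two leave none.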

Usage Examples:

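import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
import org.apache.flink.connector.testframe.external.ExternalContextFactory;
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;

public class KafkaSourceTestSuite extends SourceTestSuiteBase<String> {
    
    @TestEnv
    MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();
    
    // KafkaSourceExternalContext stands for the connector's own implementation of
    // DataStreamSourceExternalContext<String>; the constructor shown is illustrative.
    @TestContext
    ExternalContextFactory<KafkaSourceExternalContext> contextFactory =
        testName -> new KafkaSourceExternalContext(testName);
    
    // All test methods are inherited:
    // testSourceSingleSplit, testMultipleSplits, testSavepoint,
    // testScaleUp, testScaleDown, testSourceMetrics, testIdleReader, testTaskManagerFailure
}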
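Because the @TestContext field holds a factory that receives the test name, the framework can create a separate context for each test invocation and release it afterwards, which keeps invocations isolated from each other. A JDK-only sketch of that lifecycle follows; ContextFactory, FakeContext, and ContextLifecycleSketch are hypothetical stand-ins for ExternalContextFactory and a connector's context, and the sketch assumes the context is AutoCloseable.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-method factory, so a lambda such as
// testName -> new FakeContext(testName) can implement it.
@FunctionalInterface
interface ContextFactory<C extends AutoCloseable> {
    C create(String testName);
}

// Hypothetical context deriving an isolated resource name from the test name.
final class FakeContext implements AutoCloseable {
    static final List<String> EVENTS = new ArrayList<>(); // lifecycle log for the demo
    final String resourceName;

    FakeContext(String testName) {
        this.resourceName = "topic-" + testName;
        EVENTS.add("create " + resourceName);
    }

    @Override
    public void close() {
        EVENTS.add("close " + resourceName);
    }
}

final class ContextLifecycleSketch {
    // One fresh context per invocation; try-with-resources closes it even if
    // the test body throws.
    static void runInvocation(ContextFactory<FakeContext> factory, String testName) {
        try (FakeContext ctx = factory.create(testName)) {
            FakeContext.EVENTS.add("run with " + ctx.resourceName);
        }
    }
}
```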

Helper Methods

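Both base classes provide protected helper methods for generating test data and validating results. SourceTestSuiteBase also exposes helpers for creating the source under test and submitting the Flink job.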

// SinkTestSuiteBase helper methods
public abstract class SinkTestSuiteBase<T extends Comparable<T>> {
    
    /**
     * Generate test data for sink testing
     * @param testingSinkSettings Settings for the sink test
     * @param externalContext External context for data generation
     * @return List of generated test records
     */
    protected List<T> generateTestData(
        TestingSinkSettings testingSinkSettings,
        DataStreamSinkExternalContext<T> externalContext
    );
    
    /**
     * Validate sink results with semantic guarantees
     * @param reader Reader for external system data
     * @param testData Expected test data
     * @param semantic Semantic guarantee to validate against
     */
    protected void checkResultWithSemantic(
        ExternalSystemDataReader<T> reader,
        List<T> testData,
        CheckpointingMode semantic
    ) throws Exception;
}

// SourceTestSuiteBase helper methods
public abstract class SourceTestSuiteBase<T> {
    
    /**
     * Generate test data and write to external system
     * @param splitIndex Index of the split to write to
     * @param externalContext External context
     * @param testingSourceSettings Source settings
     * @return List of generated test records
     */
    protected List<T> generateAndWriteTestData(
        int splitIndex,
        DataStreamSourceExternalContext<T> externalContext,
        TestingSourceSettings testingSourceSettings
    );
    
    /**
     * Validate source results with semantic guarantees
     * @param resultIterator Iterator over result data
     * @param testData Expected test data (list of splits)
     * @param semantic Semantic guarantee to validate against
     * @param limit Maximum number of records to check, or null to check all (used for unbounded sources)
     */
    protected void checkResultWithSemantic(
        CloseableIterator<T> resultIterator,
        List<List<T>> testData,
        CheckpointingMode semantic,
        Integer limit
    );
    
    /**
     * Create source instance for testing
     * @param externalContext External context
     * @param sourceOptions Source configuration
     * @return Source instance
     */
    protected Source<T, ?, ?> tryCreateSource(
        DataStreamSourceExternalContext<T> externalContext,
        TestingSourceSettings sourceOptions
    );
    
    /**
     * Submit Flink job for testing
     * @param env Stream execution environment
     * @param jobName Name for the job
     * @return JobClient for managing the job
     */
    protected JobClient submitJob(StreamExecutionEnvironment env, String jobName) throws Exception;
}
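Unlike the sink helper, the source helper receives the expected data as List<List<T>>, one inner list per split. One way to use that structure is sketched below, assuming a source keeps record order within each split but may interleave splits arbitrarily: for EXACTLY_ONCE the results must be an interleaving of the splits in which every record appears exactly once, and an optional limit stops the check after that many records, so an unbounded source can be validated without waiting for it to end. SplitOrderCheck is hypothetical, uses a plain Iterator where the real signature takes a CloseableIterator, and also assumes records are unique across splits so a greedy match is unambiguous.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical exactly-once check that respects per-split ordering.
final class SplitOrderCheck {

    // True if the first `limit` results (all results when limit is null) form an
    // interleaving of the splits that keeps each split's order and uses every
    // record at most once. Assumes records are unique across splits.
    static <T> boolean matchesExactlyOnce(Iterator<T> results, List<List<T>> splits, Integer limit) {
        int[] cursor = new int[splits.size()]; // next expected position in each split
        int total = splits.stream().mapToInt(List::size).sum();
        int target = limit == null ? total : Math.min(limit, total);
        for (int consumed = 0; consumed < target; consumed++) {
            if (!results.hasNext()) {
                return false; // fewer records than expected
            }
            T record = results.next();
            boolean matched = false;
            for (int s = 0; s < splits.size() && !matched; s++) {
                List<T> split = splits.get(s);
                if (cursor[s] < split.size() && split.get(cursor[s]).equals(record)) {
                    cursor[s]++;
                    matched = true;
                }
            }
            if (!matched) {
                return false; // duplicate, out-of-order, or unknown record
            }
        }
        // Without a limit, nothing may follow the expected records.
        return limit != null || !results.hasNext();
    }
}
```

With splits [1, 2] and [3, 4], the result order 1, 3, 2, 4 passes, while 2, 1, 3, 4 fails because record 2 arrives before record 1 of the same split.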

Test Execution Flow

Sink Test Flow

  1. Setup Phase: Framework injects test environment, external context, and semantic mode
  2. Data Generation: Generate test data using external context
  3. Job Creation: Create Flink job with sink connected to external system
  4. Job Execution: Execute job and wait for completion
  5. Validation: Read data from external system and validate against expected results
  6. Cleanup: Framework handles resource cleanup
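The six steps fit a template-method shape: the base class fixes the order, and a concrete test supplies only the connector-specific parts. The sketch below is hypothetical (SinkFlowSketch, produceRecords, and runSinkJob are illustrative names, not framework API) and replaces the Flink job with a direct write into an in-memory list standing in for the external system. Putting cleanup in a finally block reflects step 6: resources are released even when validation throws.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical skeleton of the sink test flow; not framework API.
abstract class SinkFlowSketch {
    final List<String> externalSystem = new ArrayList<>(); // stand-in external system
    final List<String> steps = new ArrayList<>();          // which steps ran, in order

    // Connector-specific hooks a concrete test would supply.
    abstract List<String> produceRecords();
    abstract void runSinkJob(List<String> records); // stand-in for building and running the job

    // Step 1 (injecting environment, context, and semantic) is left to the framework.
    final boolean run() {
        try {
            steps.add("generate");                         // step 2
            List<String> expected = new ArrayList<>(produceRecords());
            steps.add("execute");                          // steps 3-4
            runSinkJob(expected);
            steps.add("validate");                         // step 5
            List<String> actual = new ArrayList<>(externalSystem);
            Collections.sort(expected);
            Collections.sort(actual);
            return expected.equals(actual);
        } finally {
            steps.add("cleanup");                          // step 6, runs even on failure
            externalSystem.clear();
        }
    }
}
```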

Source Test Flow

  1. Setup Phase: Framework injects test environment, external context, and semantic mode
  2. Data Preparation: Write test data to external system splits
  3. Job Creation: Create Flink job with source reading from external system
  4. Job Execution: Execute job and collect results
  5. Validation: Validate collected results against written test data
  6. Cleanup: Framework handles resource cleanup
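For sources the data moves the other way: it is written to the external system split by split before the job starts, and the job's output can interleave records from different splits. The hypothetical in-memory sketch below (SourceFlowSketch and its methods are illustrative, loosely modeled on generateAndWriteTestData) makes that interleaving visible by reading the splits round-robin, one record at a time. A real source decides its own read order; the sketch only assumes that records within one split keep their written order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory walk-through of the source test flow.
final class SourceFlowSketch {
    // Stand-in external system: split index -> records stored in that split.
    final Map<Integer, List<String>> externalSystem = new TreeMap<>();

    // Step 2: generate records for one split and write them to the external system.
    List<String> generateAndWrite(int splitIndex, int count) {
        List<String> records = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            records.add("split" + splitIndex + "-" + i);
        }
        externalSystem.put(splitIndex, new ArrayList<>(records));
        return records;
    }

    // Steps 3-4: a stand-in "job" that reads the splits round-robin, so records
    // from different splits interleave while each split keeps its own order.
    List<String> runSourceJob() {
        List<String> collected = new ArrayList<>();
        List<List<String>> splits = new ArrayList<>(externalSystem.values());
        int[] pos = new int[splits.size()];
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            for (int s = 0; s < splits.size(); s++) {
                if (pos[s] < splits.get(s).size()) {
                    collected.add(splits.get(s).get(pos[s]++));
                    progressed = true;
                }
            }
        }
        return collected;
    }
}
```

Step 5 then compares the collected records with the per-split lists returned by generateAndWrite.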

Test Configuration

Tests are automatically parameterized across different configurations:

  • Checkpointing Modes: EXACTLY_ONCE and/or AT_LEAST_ONCE, as declared in a CheckpointingMode[] field annotated with @TestSemantics
  • External Contexts: All contexts provided via @TestContext annotations
  • Test Environments: Environment provided via @TestEnv annotation

The framework invokes each test method once for every combination of external context and checkpointing mode, using the environment provided via @TestEnv.
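The number of invocations therefore multiplies. A minimal sketch of the enumeration, assuming one invocation per (external context factory, checkpointing mode) pair; InvocationMatrixSketch and the context names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical enumeration of test-template invocations.
final class InvocationMatrixSketch {
    // One invocation per (context factory, checkpointing mode) pair.
    static List<String> invocations(String testMethod, List<String> contexts, List<String> semantics) {
        List<String> result = new ArrayList<>();
        for (String context : contexts) {
            for (String semantic : semantics) {
                result.add(testMethod + "[" + context + ", " + semantic + "]");
            }
        }
        return result;
    }
}
```

With the five sink test methods, two context factories, and both checkpointing modes, a sink suite would run 5 × 2 × 2 = 20 invocations.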

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-test-utils