tessl/maven-org-apache-flink--flink-connector-test-utils

Testing utilities for Apache Flink connectors providing test frameworks, assertion utilities, and abstractions for comprehensive connector testing

JUnit Integration

The JUnit integration provides annotation-driven configuration with automatic resource lifecycle management and test parameterization. It extends JUnit 5 with specialized extensions for connector testing.

Capabilities

Core Annotations

Annotations for marking test resources and configuration in test classes.

/**
 * Marks field defining TestEnvironment (PER-CLASS lifecycle)
 * Only one field per test class can be annotated with @TestEnv
 */
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TestEnv {}

/**
 * Marks field defining ExternalContextFactory (PER-CASE lifecycle)
 * Multiple fields can be annotated for different external systems
 */
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TestContext {}

/**
 * Marks field defining external system configuration
 */
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TestExternalSystem {}

/**
 * Marks field defining semantic guarantees for testing
 */
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TestSemantics {}

Usage Examples:

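import java.util.Arrays;
import java.util.List;

import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
import org.apache.flink.connector.testframe.external.ExternalContextFactory;
import org.apache.flink.connector.testframe.junit.annotations.*;
import org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension;
import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
// Package of CheckpointingMode in Flink 1.x; adjust if your Flink version differs
import org.apache.flink.streaming.api.CheckpointingMode;
import org.junit.jupiter.api.extension.ExtendWith;

// MySinkExternalContext, MyOtherExternalContext and MyExternalSystemConfig
// are user-defined classes that are not shown here.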

@ExtendWith(ConnectorTestingExtension.class)
public class MyConnectorTestSuite extends SinkTestSuiteBase<String> {
    
    // Test environment (PER-CLASS lifecycle)
    @TestEnv
    MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();
    
    // External context factory (PER-CASE lifecycle)
    @TestContext
    ExternalContextFactory<MySinkExternalContext> sinkContextFactory = 
        testName -> new MySinkExternalContext(testName);
    
    // Multiple contexts supported
    @TestContext
    ExternalContextFactory<MyOtherExternalContext> otherContextFactory = 
        testName -> new MyOtherExternalContext(testName);
    
    // External system configuration
    @TestExternalSystem
    MyExternalSystemConfig externalConfig = new MyExternalSystemConfig();
    
    // Semantic configuration
    @TestSemantics
    List<CheckpointingMode> semantics = Arrays.asList(
        CheckpointingMode.EXACTLY_ONCE,
        CheckpointingMode.AT_LEAST_ONCE
    );
}
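
To illustrate how annotation-driven discovery of this kind can work, here is a minimal, self-contained sketch using plain Java reflection. It does not use the Flink test framework: the `Env` annotation, `DemoSuite`, and `AnnotatedFieldScanner` are hypothetical stand-ins, and the real extension may be implemented differently.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a marker annotation such as @TestEnv.
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@interface Env {}

// Hypothetical test class with one annotated field and one plain field.
class DemoSuite {
    @Env
    String environment = "mini-cluster";

    String unrelated = "ignored";
}

class AnnotatedFieldScanner {

    // Returns the values of all fields on the instance that carry @Env.
    static List<Object> findEnvFields(Object testInstance) {
        List<Object> values = new ArrayList<>();
        for (Field field : testInstance.getClass().getDeclaredFields()) {
            if (field.isAnnotationPresent(Env.class)) {
                try {
                    field.setAccessible(true);
                    values.add(field.get(testInstance));
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        return values;
    }

    // Enforces a "exactly one annotated field" rule like the one described for @TestEnv.
    static Object requireSingleEnv(Object testInstance) {
        List<Object> values = findEnvFields(testInstance);
        if (values.size() != 1) {
            throw new IllegalStateException(
                "Expected exactly one @Env field but found " + values.size());
        }
        return values.get(0);
    }

    public static void main(String[] args) {
        System.out.println(requireSingleEnv(new DemoSuite()));
    }
}
```

The annotation needs `RetentionPolicy.RUNTIME`, as in the definitions above, because otherwise reflection would not see it on the field.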

Connector Testing Extension

Main JUnit 5 extension that orchestrates the testing framework.

/**
 * Main JUnit 5 extension for connector testing framework
 * Provides automatic lifecycle management and parameter injection
 */
public class ConnectorTestingExtension implements 
    BeforeAllCallback,           // Initialize PER-CLASS resources
    AfterAllCallback,            // Cleanup PER-CLASS resources  
    TestTemplateInvocationContextProvider,  // Provide test parameterization
    ParameterResolver            // Inject parameters into test methods
{
    
    /**
     * Initialize test environment and PER-CLASS resources
     * Called once before all test methods in the class
     */
    @Override
    public void beforeAll(ExtensionContext context) throws Exception;
    
    /**
     * Cleanup test environment and PER-CLASS resources
     * Called once after all test methods in the class
     */
    @Override
    public void afterAll(ExtensionContext context) throws Exception;
    
    /**
     * Provide test invocation contexts for parameterized testing
     * Generates combinations of external contexts and semantic modes
     */
    @Override
    public Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContexts(ExtensionContext context);
    
    /**
     * Resolve parameters for test method injection
     * Supports TestEnvironment, ExternalContext, CheckpointingMode, ClusterControllable
     */
    @Override
    public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext);
    
    @Override
    public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext);
}

Automatic Features:

  • Resource Lifecycle: Automatic startup/teardown of test environments
  • Parameter Injection: Injects test resources into @TestTemplate methods
  • Test Parameterization: Generates test combinations across contexts and semantics
  • Error Handling: Proper cleanup even when tests fail
  • Parallel Execution: Supports parallel test execution with resource isolation
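
The parameter injection feature can be pictured as a lookup by parameter type. The sketch below is a simplified illustration in plain Java and is not the extension's actual code; `ParameterInjectionSketch`, `register`, `invokeStatic`, and `sampleTest` are hypothetical names, and matching on the exact type is an assumption made to keep the example short.

```java
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

class ParameterInjectionSketch {

    // Resources available for injection, keyed by the exact parameter type.
    private final Map<Class<?>, Object> resources = new LinkedHashMap<>();

    <T> ParameterInjectionSketch register(Class<T> type, T instance) {
        resources.put(type, instance);
        return this;
    }

    // Mirrors the idea of supportsParameter: can this type be injected?
    boolean supports(Class<?> parameterType) {
        return resources.containsKey(parameterType);
    }

    // Mirrors the idea of resolveParameter, applied to a whole method signature.
    Object[] resolveArguments(Method method) {
        Class<?>[] types = method.getParameterTypes();
        Object[] arguments = new Object[types.length];
        for (int i = 0; i < types.length; i++) {
            if (!supports(types[i])) {
                throw new IllegalArgumentException(
                    "Unsupported parameter type: " + types[i].getName());
            }
            arguments[i] = resources.get(types[i]);
        }
        return arguments;
    }

    // Finds a static method of this class by name and calls it with injected arguments.
    Object invokeStatic(String methodName) {
        for (Method method : ParameterInjectionSketch.class.getDeclaredMethods()) {
            if (method.getName().equals(methodName)) {
                try {
                    return method.invoke(null, resolveArguments(method));
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        throw new IllegalArgumentException("No such method: " + methodName);
    }

    // Hypothetical test method whose parameters are filled in by type.
    static String sampleTest(String environment, Integer parallelism) {
        return environment + " x" + parallelism;
    }

    public static void main(String[] args) {
        ParameterInjectionSketch resolver = new ParameterInjectionSketch()
            .register(String.class, "mini-cluster")
            .register(Integer.class, 2);
        System.out.println(resolver.invokeStatic("sampleTest"));
    }
}
```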

Test Case Invocation Context Provider

Provides context for individual test case invocations with proper parameter resolution.

/**
 * Provides test case invocation contexts with parameter resolution
 */
public class TestCaseInvocationContextProvider implements TestTemplateInvocationContextProvider {
    
    /**
     * Check if extension supports the test method
     * @param context Extension context
     * @return true if method uses @TestTemplate with supported parameters
     */
    @Override
    public boolean supportsTestTemplate(ExtensionContext context);
    
    /**
     * Provide invocation contexts for test template
     * @param context Extension context
     * @return Stream of invocation contexts for parameter combinations
     */
    @Override
    public Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContexts(ExtensionContext context);
}

Test Method Signatures

Supported Parameter Types

Test methods can declare parameters that will be automatically injected:

// Supported parameter types for @TestTemplate methods:

/**
 * Test environment instance (PER-CLASS lifecycle)
 */
TestEnvironment testEnv

/**
 * External context for sink testing (PER-CASE lifecycle)
 */
DataStreamSinkExternalContext<T> externalContext

/**
 * External context for source testing (PER-CASE lifecycle)  
 */
DataStreamSourceExternalContext<T> externalContext

/**
 * Checkpointing semantic mode (injected per test variation)
 */
CheckpointingMode semantic

/**
 * Cluster controller for failure testing (when supported by environment)
 */
ClusterControllable controller

Example Method Signatures:

// Sink test method
@TestTemplate
public void testBasicSink(
    TestEnvironment testEnv,
    DataStreamSinkExternalContext<String> externalContext,
    CheckpointingMode semantic
) throws Exception {
    // Test implementation
}

// Source test method with failure testing
@TestTemplate  
public void testTaskManagerFailure(
    TestEnvironment testEnv,
    DataStreamSourceExternalContext<String> externalContext,
    ClusterControllable controller,
    CheckpointingMode semantic
) throws Exception {
    // Test implementation
}

// Custom test method
@TestTemplate
public void testCustomScenario(
    TestEnvironment testEnv,
    DataStreamSinkExternalContext<String> sinkContext,
    DataStreamSourceExternalContext<String> sourceContext,
    CheckpointingMode semantic
) throws Exception {
    // Custom test with multiple contexts
}

Lifecycle Management

PER-CLASS Resources

Resources with PER-CLASS lifecycle are shared across all test methods in the class.

@TestEnv
MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();

// Lifecycle:
// 1. Created when test class is instantiated
// 2. Started before first test method (@BeforeAll phase)
// 3. Shared by all test methods in the class
// 4. Stopped after last test method (@AfterAll phase)

Benefits:

  • Performance: Avoids expensive cluster startup/teardown per test
  • Resource Efficiency: Reduces memory and CPU usage
  • Test Speed: Faster test execution

PER-CASE Resources

Resources with PER-CASE lifecycle are created fresh for each test case.

@TestContext
ExternalContextFactory<MySinkExternalContext> contextFactory = 
    testName -> new MySinkExternalContext(testName);

// Lifecycle:
// 1. Factory created when test class is instantiated
// 2. New context instance created before each test case
// 3. Context used during test execution
// 4. Context closed after test case completion

Benefits:

  • Test Isolation: Each test gets fresh external resources
  • Resource Cleanup: Automatic cleanup prevents resource leaks
  • Parallel Safety: Tests can run in parallel safely

Lifecycle Sequence

// Pseudocode: the order in which the framework drives the resources

// Class initialization
TestClass instance = new TestClass();

// Before all tests (@BeforeAll phase)
testEnv.startUp(); // Start test environment

// For each test method
for (TestMethod method : testMethods) {
    // For each parameter combination
    for (ParameterCombination params : combinations) {

        // Before each test case
        ExternalContext context = contextFactory.createExternalContext(testName);

        try {
            // Execute test with the semantic of this combination
            method.invoke(testEnv, context, params.semantic);
        } finally {
            // After each test case, even if the test failed
            context.close(); // Cleanup external resources
        }
    }
}

// After all tests (@AfterAll phase)
testEnv.tearDown(); // Stop test environment
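
The sequence above can be tried out with a small runnable sketch that records lifecycle events in order. It uses plain Java only; `LifecycleSketch`, `CaseContext`, and the event strings are hypothetical and stand in for the test environment and external context.

```java
import java.util.ArrayList;
import java.util.List;

class LifecycleSketch {

    // Hypothetical PER-CASE resource: opened for one test case, closed afterwards.
    static class CaseContext implements AutoCloseable {
        private final String testName;
        private final List<String> events;

        CaseContext(String testName, List<String> events) {
            this.testName = testName;
            this.events = events;
            events.add("open " + testName);
        }

        @Override
        public void close() {
            events.add("close " + testName);
        }
    }

    // Runs every test case between one shared start-up and one shared tear-down.
    static List<String> run(List<String> testNames, String failingTest) {
        List<String> events = new ArrayList<>();
        events.add("env startUp"); // PER-CLASS: happens once
        try {
            for (String name : testNames) {
                // PER-CASE: try-with-resources closes the context even on failure.
                try (CaseContext context = new CaseContext(name, events)) {
                    if (name.equals(failingTest)) {
                        throw new IllegalStateException("simulated failure");
                    }
                    events.add("run " + name);
                } catch (IllegalStateException e) {
                    // The context is already closed when this handler runs.
                    events.add("failed " + name);
                }
            }
        } finally {
            events.add("env tearDown"); // PER-CLASS: happens once
        }
        return events;
    }

    public static void main(String[] args) {
        run(java.util.Arrays.asList("a", "b"), "b").forEach(System.out::println);
    }
}
```

In this sketch a failing case still closes its context before the failure is recorded, which is the same guarantee the `finally` block expresses in the sequence above.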

Test Parameterization

Automatic Parameter Generation

The framework automatically generates test parameter combinations:

// Configuration
@TestContext
ExternalContextFactory<Context1> context1Factory = ...;

@TestContext  
ExternalContextFactory<Context2> context2Factory = ...;

@TestSemantics
List<CheckpointingMode> semantics = Arrays.asList(
    CheckpointingMode.EXACTLY_ONCE,
    CheckpointingMode.AT_LEAST_ONCE
);

// Generated combinations:
// 1. testMethod(testEnv, context1, EXACTLY_ONCE)
// 2. testMethod(testEnv, context1, AT_LEAST_ONCE)  
// 3. testMethod(testEnv, context2, EXACTLY_ONCE)
// 4. testMethod(testEnv, context2, AT_LEAST_ONCE)
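
The four combinations listed above are the Cartesian product of the contexts and the semantics. A minimal sketch of that expansion, assuming contexts are identified by name and using a hypothetical `Mode` enum in place of `CheckpointingMode`:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CombinationSketch {

    // Hypothetical stand-in for Flink's CheckpointingMode.
    enum Mode { EXACTLY_ONCE, AT_LEAST_ONCE }

    // Pairs every context with every mode, contexts in the outer loop.
    static List<String> combinations(List<String> contexts, List<Mode> modes) {
        List<String> result = new ArrayList<>();
        for (String context : contexts) {
            for (Mode mode : modes) {
                result.add("testMethod(testEnv, " + context + ", " + mode + ")");
            }
        }
        return result;
    }

    public static void main(String[] args) {
        combinations(
            Arrays.asList("context1", "context2"),
            Arrays.asList(Mode.EXACTLY_ONCE, Mode.AT_LEAST_ONCE)
        ).forEach(System.out::println);
    }
}
```

The number of generated test cases is the product of the two list sizes, so each additional context or semantic multiplies the run time of every `@TestTemplate` method.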

Display Names

Test cases get descriptive display names based on parameters:

// Generated display names:
// ✓ testBasicSink[Context1, EXACTLY_ONCE]
// ✓ testBasicSink[Context1, AT_LEAST_ONCE]
// ✓ testBasicSink[Context2, EXACTLY_ONCE]  
// ✓ testBasicSink[Context2, AT_LEAST_ONCE]

Custom Parameterization

Override default parameterization for specific test methods:

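// @ParameterizedTest is itself a test template, so @TestTemplate is not repeated here.
// Arguments supplied by @EnumSource must come first in the parameter list;
// parameters resolved by extensions follow them.
@ParameterizedTest
@EnumSource(CheckpointingMode.class)
public void testCustomParameterization(
    CheckpointingMode semantic,
    TestEnvironment testEnv,
    DataStreamSinkExternalContext<String> externalContext
) throws Exception {
    // Custom parameterization using JUnit 5 @ParameterizedTest
}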

Advanced Configuration

Multiple External Systems

Test with multiple external systems simultaneously:

@TestContext
ExternalContextFactory<DatabaseExternalContext> dbContextFactory = 
    testName -> new DatabaseExternalContext(testName);

@TestContext
ExternalContextFactory<MessageQueueExternalContext> mqContextFactory = 
    testName -> new MessageQueueExternalContext(testName);

@TestTemplate
public void testDatabaseToMessageQueue(
    TestEnvironment testEnv,
    DatabaseExternalContext dbContext,
    MessageQueueExternalContext mqContext,
    CheckpointingMode semantic
) throws Exception {
    // Test data flow from database to message queue
}

Conditional Test Execution

Skip tests based on environment or configuration:

import static org.junit.jupiter.api.Assumptions.assumeTrue;

@TestTemplate
public void testContainerizedEnvironment(
    TestEnvironment testEnv,
    DataStreamSinkExternalContext<String> externalContext,
    CheckpointingMode semantic
) throws Exception {
    
    assumeTrue(testEnv instanceof FlinkContainerTestEnvironment, 
        "Test requires containerized environment");
    
    // Test implementation
}

@TestTemplate
public void testExactlyOnceOnly(
    TestEnvironment testEnv,
    DataStreamSinkExternalContext<String> externalContext,
    CheckpointingMode semantic
) throws Exception {
    
    assumeTrue(semantic == CheckpointingMode.EXACTLY_ONCE,
        "Test only valid for exactly-once semantic");
        
    // Test implementation
}

Error Handling Configuration

Configure error handling behavior:

@ExtendWith(ConnectorTestingExtension.class)
@TestMethodOrder(OrderAnnotation.class)
public class MyConnectorTestSuite extends SinkTestSuiteBase<String> {
    
    @TestEnv
    MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();
    
    @TestContext
    ExternalContextFactory<MySinkExternalContext> contextFactory = testName -> {
        try {
            return new MySinkExternalContext(testName);
        } catch (Exception e) {
            // Convert to TestAbortedException to skip test instead of failing
            throw new TestAbortedException("External system not available", e);
        }
    };
    
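    @Order(1)
    @TestTemplate
    public void testPrerequisites(TestEnvironment testEnv) {
        // Runs first and reports whether prerequisites are met.
        // A failed assumption aborts only this test; later tests still run.
        // checkExternalSystemAvailability() is a user-defined helper, not shown here.
        assumeTrue(checkExternalSystemAvailability(), 
            "External system not available");
    }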
}

Integration with IDEs

IntelliJ IDEA

  • Test Discovery: Automatically discovers @TestTemplate methods
  • Parameter Display: Shows parameter combinations in test tree
  • Debug Support: Full debugging support with parameter inspection
  • Test Filtering: Filter tests by parameter values

Eclipse

  • JUnit 5 Support: Built into Eclipse 4.7.1a (Oxygen.1a) and later; older versions need an update for full support
  • Test Execution: Run individual parameter combinations
  • Progress Reporting: Shows progress across parameter combinations

Maven/Gradle

<!-- Maven Surefire configuration -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-surefire-plugin</artifactId>
    <version>3.0.0-M9</version>
    <configuration>
        <includes>
            <include>**/*TestSuite.java</include>
        </includes>
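        <!-- Enable parallel execution through JUnit Platform configuration parameters -->
        <properties>
            <configurationParameters>
                junit.jupiter.execution.parallel.enabled = true
                junit.jupiter.execution.parallel.mode.default = concurrent
                junit.jupiter.execution.parallel.config.strategy = fixed
                junit.jupiter.execution.parallel.config.fixed.parallelism = 4
            </configurationParameters>
        </properties>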
    </configuration>
</plugin>

// Gradle test configuration
test {
    useJUnitPlatform()
    
    // Run test classes in up to 4 parallel JVM forks
    maxParallelForks = 4
    
    // Configure test logging
    testLogging {
        events "passed", "skipped", "failed"
        showStandardStreams = true
    }
}
