CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-connector-test-utils

Testing utilities for Apache Flink connectors providing test frameworks, assertion utilities, and abstractions for comprehensive connector testing

Pending
Overview
Eval results
Files

docs/containers.md

Container Support

Container support integrates Testcontainers so that tests can run against custom Flink clusters in isolated, containerized environments. This enables realistic testing scenarios with proper network isolation and resource constraints.

Capabilities

Flink Containers

Utilities for creating and managing Flink containers using TestContainers.

/**
 * Utility class for creating Flink containers.
 *
 * <p>All members are static factory methods returning a {@code FlinkContainer}.
 * Only the signatures are listed in this reference; method bodies are omitted.
 * The no-argument factories use the default configuration, while the
 * {@code Configuration} overloads accept a caller-supplied Flink configuration.
 */
public class FlinkContainers {

    /**
     * Creates a JobManager container with the default configuration.
     *
     * @return Configured JobManager container
     */
    public static FlinkContainer jobManager();

    /**
     * Creates a TaskManager container with the default configuration.
     *
     * @return Configured TaskManager container
     */
    public static FlinkContainer taskManager();

    /**
     * Creates a complete Flink cluster (JobManager + TaskManagers).
     *
     * <p>Note that the whole cluster is exposed through a single
     * {@code FlinkContainer} handle.
     *
     * @return Configured cluster containers
     */
    public static FlinkContainer cluster();

    /**
     * Creates a JobManager container with a custom configuration.
     *
     * @param configuration Flink configuration to apply to the JobManager
     * @return Configured JobManager container
     */
    public static FlinkContainer jobManager(Configuration configuration);

    /**
     * Creates a TaskManager container with a custom configuration.
     *
     * @param configuration Flink configuration to apply to the TaskManager
     * @return Configured TaskManager container
     */
    public static FlinkContainer taskManager(Configuration configuration);
}

Usage Examples:

// Create simple cluster
FlinkContainer cluster = FlinkContainers.cluster();
cluster.start();

// Create custom JobManager.
// Configuration#set(ConfigOption, value) is the generic, type-safe setter: the compiler
// checks the value against the option's declared type, so one method covers every
// option type instead of the per-type setString/setInteger variants.
Configuration config = new Configuration();
config.set(JobManagerOptions.ADDRESS, "jobmanager");
config.set(TaskManagerOptions.NUM_TASK_SLOTS, 4);

FlinkContainer jobManager = FlinkContainers.jobManager(config);
jobManager.start();

Container Test Environment

TestContainers-based test environment for isolated testing.

/**
 * Test environment using Flink containers for isolated testing.
 *
 * <p>Implements both {@code TestEnvironment} and {@code ClusterControllable}.
 * Only the signatures are listed in this reference; method bodies are omitted.
 */
public class FlinkContainerTestEnvironment implements TestEnvironment, ClusterControllable {

    /**
     * Creates a container environment with default settings.
     */
    public FlinkContainerTestEnvironment();

    /**
     * Creates a container environment with custom settings.
     *
     * @param settings Container configuration settings
     */
    public FlinkContainerTestEnvironment(FlinkContainersSettings settings);

    /**
     * Creates a container environment with custom Flink container settings and
     * custom Testcontainers settings.
     *
     * @param settings Container configuration settings
     * @param testcontainersSettings TestContainers configuration
     */
    public FlinkContainerTestEnvironment(
        FlinkContainersSettings settings,
        TestcontainersSettings testcontainersSettings
    );

    /**
     * Creates the execution environment used to run a test job.
     *
     * @param envOptions settings for the execution environment to create
     * @return the stream execution environment
     */
    @Override
    public StreamExecutionEnvironment createExecutionEnvironment(TestEnvironmentSettings envOptions);

    /**
     * @return the REST endpoint of this environment
     *     (NOTE(review): presumably the JobManager REST endpoint; confirm)
     */
    @Override
    public Endpoint getRestEndpoint();

    /**
     * @return the checkpoint URI as a string
     *     (NOTE(review): the URI scheme/location is not specified here; confirm)
     */
    @Override
    public String getCheckpointUri();

    /**
     * Starts the environment.
     *
     * @throws Exception if startup fails
     */
    @Override
    public void startUp() throws Exception;

    /**
     * Shuts the environment down.
     *
     * @throws Exception if shutdown fails
     */
    @Override
    public void tearDown() throws Exception;

    /**
     * Trigger TaskManager failover by stopping and restarting TaskManager containers.
     *
     * @param jobClient Current job client
     * @param afterFailAction Action to execute after triggering failover
     * @throws Exception if the failover cannot be triggered
     */
    @Override
    public void triggerTaskManagerFailover(JobClient jobClient, Runnable afterFailAction) throws Exception;
}

Usage Examples:

// The three declarations below are alternatives: each one declares a field named
// `testEnv`, so a test class should contain exactly one of them.

// Default container environment
@TestEnv
FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment();

// Custom configuration: three TaskManagers with two slots each, and explicit
// memory settings for the JobManager and the TaskManagers
@TestEnv
FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(
    FlinkContainersSettings.builder()
        .setNumTaskManagers(3)
        .setNumSlotsPerTaskManager(2)
        .setJobManagerMemory("2g")
        .setTaskManagerMemory("1g")
        .build()
);

// With TestContainers settings: default Flink container settings plus a
// freshly created network for the containers
@TestEnv
FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(
    FlinkContainersSettings.builder().build(),
    TestcontainersSettings.builder()
        .setNetwork(Network.newNetwork())
        .build()
);

Image Builder

Utility for building custom Flink images with connector JARs.

/**
 * Builder for custom Flink Docker images with connector JARs.
 *
 * <p>Exposes three static {@code buildImage} overloads that differ only in how much
 * the caller specifies: JARs only, base image + JARs, or base image + JARs +
 * additional files. Only the signatures are listed in this reference.
 */
public class FlinkImageBuilder {

    /**
     * Build custom Flink image with connector JARs.
     *
     * @param jarPaths List of JAR file URLs to include in image
     * @return Docker image name for the built image
     * @throws ImageBuildException if image build fails
     */
    public static DockerImageName buildImage(List<URL> jarPaths) throws ImageBuildException;

    /**
     * Build custom Flink image with connector JARs on top of an explicit base image.
     *
     * @param baseImage Base Flink image to extend
     * @param jarPaths List of JAR file URLs to include in image
     * @return Docker image name for the built image
     * @throws ImageBuildException if image build fails
     */
    public static DockerImageName buildImage(DockerImageName baseImage, List<URL> jarPaths) throws ImageBuildException;

    /**
     * Build custom Flink image with connector JARs and additional files.
     *
     * @param baseImage Base Flink image to extend
     * @param jarPaths List of JAR file URLs to include in image
     * @param additionalFiles Additional files to copy into image
     *     (NOTE(review): the usage example in this document maps a local path to a
     *     path inside the image; confirm key/value direction)
     * @return Docker image name for the built image
     * @throws ImageBuildException if image build fails
     */
    public static DockerImageName buildImage(
        DockerImageName baseImage,
        List<URL> jarPaths,
        Map<String, String> additionalFiles
    ) throws ImageBuildException;
}

/**
 * Exception thrown when Docker image build fails.
 *
 * <p>Extends {@link Exception}, so it is a checked exception: callers of
 * {@code FlinkImageBuilder.buildImage(...)} must catch or declare it.
 */
public class ImageBuildException extends Exception {
    /** @param message description of the build failure */
    public ImageBuildException(String message);
    /**
     * @param message description of the build failure
     * @param cause underlying error that made the build fail
     */
    public ImageBuildException(String message, Throwable cause);
}

Usage Examples:

// Resolve each connector JAR to a URL, then hand the list to the image builder
URL connectorJar = new File("target/my-connector.jar").toURI().toURL();
URL dependencyJar = new File("lib/dependency.jar").toURI().toURL();
List<URL> connectorJars = Arrays.asList(connectorJar, dependencyJar);

try {
    // Bake the JARs into a custom Flink image
    DockerImageName customImage = FlinkImageBuilder.buildImage(connectorJars);

    // Run the JobManager from the freshly built image
    FlinkContainer jobManager = FlinkContainers.jobManager().withDockerImageName(customImage);

} catch (ImageBuildException e) {
    // Without the image the test cannot run; abort the test and keep the cause
    throw new TestAbortedException("Failed to build custom Flink image", e);
}

TestContainers Configurator

Interface for configuring TestContainers behavior.

/**
 * Interface for configuring Flink TestContainers.
 *
 * <p>An implementation is registered through
 * {@code FlinkContainersSettings.Builder#setConfigurator} and receives one callback
 * per kind of object it may customize: the JobManager container, a TaskManager
 * container, and the network.
 */
public interface FlinkTestcontainersConfigurator {

    /**
     * Configure JobManager container.
     *
     * @param jobManager JobManager container to configure
     */
    void configureJobManager(GenericContainer<?> jobManager);

    /**
     * Configure TaskManager container.
     *
     * @param taskManager TaskManager container to configure
     */
    void configureTaskManager(GenericContainer<?> taskManager);

    /**
     * Configure network settings.
     *
     * @param network Network to configure
     */
    void configureNetwork(Network network);
}

Usage Examples:

/**
 * Example configurator that sets JVM heap arguments, SLF4J log forwarding and a
 * startup timeout on the JobManager and TaskManager containers.
 */
public class CustomFlinkConfigurator implements FlinkTestcontainersConfigurator {

    /** JobManager: 2g heap, logs forwarded to the "JobManager" logger, 5 minute startup timeout. */
    @Override
    public void configureJobManager(GenericContainer<?> jobManager) {
        Slf4jLogConsumer logForwarder = new Slf4jLogConsumer(LoggerFactory.getLogger("JobManager"));
        Duration startupLimit = Duration.ofMinutes(5);

        jobManager
            .withEnv("JVM_ARGS", "-Xmx2g -Xms2g")
            .withLogConsumer(logForwarder)
            .withStartupTimeout(startupLimit);
    }

    /** TaskManager: 1g heap, logs forwarded to the "TaskManager" logger, 3 minute startup timeout. */
    @Override
    public void configureTaskManager(GenericContainer<?> taskManager) {
        Slf4jLogConsumer logForwarder = new Slf4jLogConsumer(LoggerFactory.getLogger("TaskManager"));
        Duration startupLimit = Duration.ofMinutes(3);

        taskManager
            .withEnv("JVM_ARGS", "-Xmx1g -Xms1g")
            .withLogConsumer(logForwarder)
            .withStartupTimeout(startupLimit);
    }

    /** Intentionally left empty: this example applies no custom network configuration. */
    @Override
    public void configureNetwork(Network network) {
        // Custom network configuration
    }
}

// Register the custom configurator in the container settings,
// then create the environment from those settings
FlinkContainersSettings configuredSettings = FlinkContainersSettings.builder()
    .setConfigurator(new CustomFlinkConfigurator())
    .build();
FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(configuredSettings);

Configuration

Flink Container Settings

Configuration for Flink container cluster setup.

/**
 * Configuration settings for Flink container environments.
 *
 * <p>Instances are created through {@link #builder()}; the getters at the bottom
 * of the class expose the values chosen on the builder. Only the signatures are
 * listed in this reference.
 */
public class FlinkContainersSettings {

    /** @return a new builder for {@code FlinkContainersSettings} */
    public static Builder builder();

    /**
     * Fluent builder: every setter returns the builder so calls can be chained,
     * and {@link #build()} produces the settings object.
     */
    public static class Builder {
        /**
         * Set number of TaskManager containers to start
         * @param numTaskManagers Number of TaskManager containers (default: 1)
         * @return Builder instance
         */
        public Builder setNumTaskManagers(int numTaskManagers);

        /**
         * Set number of slots per TaskManager container
         * @param numSlotsPerTaskManager Task slots per TaskManager (default: 2)
         * @return Builder instance
         */
        public Builder setNumSlotsPerTaskManager(int numSlotsPerTaskManager);

        /**
         * Set JobManager memory allocation
         * @param jobManagerMemory Memory setting (e.g., "1g", "512m") (default: "1g")
         * @return Builder instance
         */
        public Builder setJobManagerMemory(String jobManagerMemory);

        /**
         * Set TaskManager memory allocation
         * @param taskManagerMemory Memory setting (e.g., "1g", "512m") (default: "1g")
         * @return Builder instance
         */
        public Builder setTaskManagerMemory(String taskManagerMemory);

        /**
         * Set base Docker image for containers
         * @param baseImage Docker image name (default: "flink:1.18")
         * @return Builder instance
         */
        public Builder setBaseImage(DockerImageName baseImage);

        /**
         * Set custom TestContainers configurator
         * @param configurator Custom configurator for container setup
         * @return Builder instance
         */
        public Builder setConfigurator(FlinkTestcontainersConfigurator configurator);

        /**
         * Enable/disable checkpoint recovery testing
         * @param enableCheckpointRecovery Enable checkpoint recovery (default: true)
         * @return Builder instance
         */
        public Builder setEnableCheckpointRecovery(boolean enableCheckpointRecovery);

        /**
         * Build configured settings
         * @return FlinkContainersSettings instance
         */
        public FlinkContainersSettings build();
    }

    /** @return number of TaskManager containers to start */
    public int getNumTaskManagers();
    /** @return number of task slots per TaskManager container */
    public int getNumSlotsPerTaskManager();
    /** @return JobManager memory setting, e.g. "1g" */
    public String getJobManagerMemory();
    /** @return TaskManager memory setting, e.g. "1g" */
    public String getTaskManagerMemory();
    /** @return base Docker image for the containers */
    public DockerImageName getBaseImage();
    /** @return the custom configurator (NOTE(review): presumably empty when none was set; confirm) */
    public Optional<FlinkTestcontainersConfigurator> getConfigurator();
    /** @return whether checkpoint recovery testing is enabled */
    public boolean isCheckpointRecoveryEnabled();
}

TestContainers Settings

General TestContainers configuration settings.

/**
 * General TestContainers configuration settings.
 *
 * <p>Instances are created through {@link #builder()}; the getters at the bottom
 * of the class expose the values chosen on the builder. Only the signatures are
 * listed in this reference.
 */
public class TestcontainersSettings {

    /** @return a new builder for {@code TestcontainersSettings} */
    public static Builder builder();

    /**
     * Fluent builder: every setter returns the builder so calls can be chained,
     * and {@link #build()} produces the settings object.
     */
    public static class Builder {
        /**
         * Set Docker network for container communication
         * @param network Shared network for containers
         * @return Builder instance
         */
        public Builder setNetwork(Network network);

        /**
         * Set log consumers for container output
         * @param logConsumers Map of container name to log consumer
         * @return Builder instance
         */
        public Builder setLogConsumers(Map<String, Consumer<OutputFrame>> logConsumers);

        /**
         * Set startup timeout for containers
         * @param startupTimeout Maximum time to wait for container startup
         * @return Builder instance
         */
        public Builder setStartupTimeout(Duration startupTimeout);

        /**
         * Set container registry configuration
         * @param registryConfig Docker registry configuration
         * @return Builder instance
         */
        public Builder setRegistryConfig(DockerClientConfig registryConfig);

        /**
         * Enable/disable container reuse between tests
         * @param reuseContainers Enable container reuse (default: false)
         * @return Builder instance
         */
        public Builder setReuseContainers(boolean reuseContainers);

        /**
         * Build configured settings
         * @return TestcontainersSettings instance
         */
        public TestcontainersSettings build();
    }

    /** @return the shared network (NOTE(review): presumably empty when none was set; confirm) */
    public Optional<Network> getNetwork();
    /** @return map of container name to log consumer */
    public Map<String, Consumer<OutputFrame>> getLogConsumers();
    /** @return maximum time to wait for container startup */
    public Duration getStartupTimeout();
    /** @return the registry configuration (NOTE(review): presumably empty when none was set; confirm) */
    public Optional<DockerClientConfig> getRegistryConfig();
    /** @return whether containers are reused between tests */
    public boolean isReuseContainers();
}

Advanced Usage Patterns

Multi-Container Test Setup

Set up complex test scenarios with multiple external systems.

/**
 * Sink test suite in which the Flink cluster and both external systems
 * (Kafka and a database) are attached to one shared network.
 */
public class ComplexConnectorTestSuite extends SinkTestSuiteBase<String> {

    /** Single network instance handed to the Flink environment and to every external context. */
    private static final Network SHARED_NETWORK = Network.newNetwork();

    /** Two TaskManagers with two slots each; container logs are forwarded to SLF4J loggers. */
    @TestEnv
    FlinkContainerTestEnvironment testEnv =
        new FlinkContainerTestEnvironment(
            FlinkContainersSettings.builder()
                .setNumTaskManagers(2)
                .setNumSlotsPerTaskManager(2)
                .build(),
            TestcontainersSettings.builder()
                .setNetwork(SHARED_NETWORK)
                .setLogConsumers(
                    Map.of(
                        "jobmanager", new Slf4jLogConsumer(LoggerFactory.getLogger("JobManager")),
                        "taskmanager", new Slf4jLogConsumer(LoggerFactory.getLogger("TaskManager"))))
                .build());

    /** Creates a Kafka context on the shared network for each test. */
    @TestContext
    ExternalContextFactory<KafkaExternalContext> kafkaContextFactory =
        name -> new KafkaExternalContext(name, SHARED_NETWORK);

    /** Creates a database context on the shared network for each test. */
    @TestContext
    ExternalContextFactory<DatabaseExternalContext> dbContextFactory =
        name -> new DatabaseExternalContext(name, SHARED_NETWORK);
}

Custom Image Building

Build custom Flink images for specific testing scenarios.

/**
 * Sink test suite that runs against a custom Flink image containing the connector
 * JARs and extra configuration files.
 */
public class CustomImageTestSuite extends SinkTestSuiteBase<String> {

    /**
     * Custom image, built during static initialization.
     *
     * <p>The {@code testEnv} field initializer below reads this value when the test
     * instance is constructed. Assigning it from a {@code @BeforeAll} method is
     * order-dependent: with a per-class test instance lifecycle the instance (and
     * therefore the field initializer) is created before any {@code @BeforeAll}
     * method runs, so {@code setBaseImage} would receive {@code null}. Static
     * initialization always completes before the first instance is created.
     */
    private static final DockerImageName customFlinkImage = buildCustomImage();

    @TestEnv
    FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(
        FlinkContainersSettings.builder()
            .setBaseImage(customFlinkImage)
            .build()
    );

    /**
     * Builds the Flink image used by this suite.
     *
     * @return name of the built image
     * @throws IllegalStateException if a JAR path cannot be converted to a URL or the
     *     image build fails; the original exception is kept as the cause
     */
    private static DockerImageName buildCustomImage() {
        try {
            List<URL> connectorJars = Arrays.asList(
                new File("target/my-connector.jar").toURI().toURL(),
                new File("lib/kafka-clients.jar").toURI().toURL(),
                new File("lib/commons-lang3.jar").toURI().toURL()
            );

            Map<String, String> additionalFiles = Map.of(
                "conf/log4j.properties", "/opt/flink/conf/log4j.properties",
                "conf/flink-conf.yaml", "/opt/flink/conf/flink-conf.yaml"
            );

            return FlinkImageBuilder.buildImage(
                DockerImageName.parse("flink:1.18-java11"),
                connectorJars,
                additionalFiles
            );
        } catch (MalformedURLException | ImageBuildException e) {
            // Checked exceptions cannot escape a static initializer; wrap and keep the cause.
            throw new IllegalStateException("Failed to build custom Flink image", e);
        }
    }
}

Failure Testing with Containers

Implement failure testing using container lifecycle control.

/**
 * Source test suite that exercises TaskManager failover on a containerized cluster.
 *
 * <p>{@code startTestJob}, {@code validateInitialResults} and
 * {@code validateRecoveryResults} are helper methods that are not shown in this
 * example; supply them in the suite.
 */
public class FailoverTestSuite extends SourceTestSuiteBase<String> {

    // Declared explicitly because the failover callback below logs through LOG.
    private static final Logger LOG = LoggerFactory.getLogger(FailoverTestSuite.class);

    @TestEnv
    FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(
        FlinkContainersSettings.builder()
            .setNumTaskManagers(3) // Multiple TaskManagers for failover testing
            .setEnableCheckpointRecovery(true)
            .build()
    );

    /**
     * Starts a job, triggers a TaskManager failover, and validates the results
     * before and after the failover.
     *
     * @param testEnv environment the job runs in (this parameter shadows the field of the same name)
     * @param externalContext external system context for the source under test
     * @param controller handle used to trigger the TaskManager failover
     * @param semantic checkpointing mode the job is started with
     * @throws Exception if the job, the failover, or a validation step fails
     */
    @TestTemplate
    public void testTaskManagerFailover(
        TestEnvironment testEnv,
        DataStreamSourceExternalContext<String> externalContext,
        ClusterControllable controller,
        CheckpointingMode semantic
    ) throws Exception {

        // Start job and validate initial results
        JobClient jobClient = startTestJob(testEnv, externalContext, semantic);
        validateInitialResults(jobClient);

        // Trigger TaskManager failure
        controller.triggerTaskManagerFailover(jobClient, () -> {
            // Actions after failure triggered
            LOG.info("TaskManager failure triggered, waiting for recovery...");
        });

        // Validate recovery and continued processing
        validateRecoveryResults(jobClient);
    }
}

Resource Monitoring

Monitor container resource usage during tests.

/**
 * Sink test suite that caps container memory/CPU and attaches a resource usage
 * log consumer to the JobManager and TaskManager containers.
 */
public class PerformanceTestSuite extends SinkTestSuiteBase<String> {

    @TestEnv
    FlinkContainerTestEnvironment testEnv =
        new FlinkContainerTestEnvironment(
            FlinkContainersSettings.builder()
                .setJobManagerMemory("2g")
                .setTaskManagerMemory("4g")
                .setConfigurator(new ResourceMonitoringConfigurator())
                .build());

    /** Applies per-role memory and CPU limits and attaches a {@code ResourceUsageLogConsumer}. */
    private static class ResourceMonitoringConfigurator implements FlinkTestcontainersConfigurator {

        /** Bytes in one gibibyte. */
        private static final long GIB = 1024L * 1024 * 1024;

        @Override
        public void configureJobManager(GenericContainer<?> jobManager) {
            // 2GB of memory, CPU quota for 1 CPU
            limitAndMonitor(jobManager, 2L * GIB, 100000L, "JobManager");
        }

        @Override
        public void configureTaskManager(GenericContainer<?> taskManager) {
            // 4GB of memory, CPU quota for 2 CPUs
            limitAndMonitor(taskManager, 4L * GIB, 200000L, "TaskManager");
        }

        @Override
        public void configureNetwork(Network network) {
            // Network configuration
        }

        /** Sets the memory limit and CPU quota on the container and attaches the usage log consumer. */
        private static void limitAndMonitor(
                GenericContainer<?> container, long memoryBytes, long cpuQuota, String consumerName) {
            container
                .withCreateContainerCmdModifier(cmd -> {
                    cmd.getHostConfig()
                        .withMemory(memoryBytes)
                        .withCpuQuota(cpuQuota);
                })
                .withLogConsumer(new ResourceUsageLogConsumer(consumerName));
        }
    }
}

Best Practices

Resource Management

// Use try-with-resources for automatic cleanup
try (Network network = Network.newNetwork()) {
    FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(
        FlinkContainersSettings.builder().build(),
        TestcontainersSettings.builder().setNetwork(network).build()
    );

    // The environment is not managed by the try-with-resources statement, so pair
    // startUp() with tearDown() explicitly. The finally block guarantees the
    // environment is torn down even if the test fails, and before the network
    // it is attached to is closed.
    testEnv.startUp();
    try {
        // Test execution
    } finally {
        testEnv.tearDown();
    }
} // Network automatically closed

Performance Optimization

// Keep startup bounded and let containers be reused across tests where possible
Duration startupLimit = Duration.ofMinutes(2); // Reasonable timeout
TestcontainersSettings settings =
    TestcontainersSettings.builder()
        .setReuseContainers(true) // Enable container reuse
        .setStartupTimeout(startupLimit)
        .build();

// Size the cluster for the test: small memory footprint, single TaskManager
FlinkContainersSettings flinkSettings =
    FlinkContainersSettings.builder()
        .setJobManagerMemory("1g") // Don't over-allocate for simple tests
        .setTaskManagerMemory("1g")
        .setNumTaskManagers(1) // Start with minimal cluster
        .build();

Error Handling

@TestEnv
FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment() {
    /**
     * Starts the environment; aborts (skips) the test only when Docker itself is
     * unavailable. Any other startup failure is rethrown unchanged, so a genuine
     * failure is not silently reported as a skipped test.
     */
    @Override
    public void startUp() throws Exception {
        try {
            super.startUp();
        } catch (Exception e) {
            if (DockerClientFactory.instance().isDockerAvailable()) {
                // Docker is reachable, so this is a real startup failure
                throw e;
            }
            // Handle Docker not available
            throw new TestAbortedException("Docker not available for container tests", e);
        }
    }
};

CI/CD Integration

# GitHub Actions example
jobs:
  container-tests:
    runs-on: ubuntu-latest
    services:
      docker:
        image: docker:20.10
        options: --privileged
    steps:
      - uses: actions/checkout@v3
      - name: Set up JDK 11
        uses: actions/setup-java@v3
        with:
          # setup-java requires an explicit JDK distribution
          distribution: 'temurin'
          java-version: '11'
      - name: Run container tests
        # Quote the pattern so the shell passes the glob to Maven unexpanded
        run: ./mvnw test '-Dtest=**/*ContainerTest*'

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-test-utils

docs

assertions.md

containers.md

external-systems.md

index.md

junit-integration.md

metrics.md

test-environments.md

test-suites.md

tile.json