Testing utilities for Apache Flink connectors providing test frameworks, assertion utilities, and abstractions for comprehensive connector testing
—
The container support provides TestContainers integration for running tests in isolated containerized environments with custom Flink clusters. This enables realistic testing scenarios with proper network isolation and resource constraints.
Utilities for creating and managing Flink containers using TestContainers.
/**
* Utility class for creating Flink containers
*/
public class FlinkContainers {
/**
* Create JobManager container with default configuration
* @return Configured JobManager container
*/
public static FlinkContainer jobManager();
/**
* Create TaskManager container with default configuration
* @return Configured TaskManager container
*/
public static FlinkContainer taskManager();
/**
* Create complete Flink cluster (JobManager + TaskManagers)
* @return Configured cluster containers
*/
public static FlinkContainer cluster();
/**
* Create JobManager with custom configuration
* @param configuration Flink configuration
* @return Configured JobManager container
*/
public static FlinkContainer jobManager(Configuration configuration);
/**
* Create TaskManager with custom configuration
* @param configuration Flink configuration
* @return Configured TaskManager container
*/
public static FlinkContainer taskManager(Configuration configuration);
}
Usage Examples:
// Create simple cluster
FlinkContainer cluster = FlinkContainers.cluster();
cluster.start();
// Create custom JobManager
Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, "jobmanager");
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 4);
FlinkContainer jobManager = FlinkContainers.jobManager(config);
jobManager.start();
TestContainers-based test environment for isolated testing.
/**
* Test environment using Flink containers for isolated testing
*/
public class FlinkContainerTestEnvironment implements TestEnvironment, ClusterControllable {
/**
* Create container environment with default settings
*/
public FlinkContainerTestEnvironment();
/**
* Create container environment with custom settings
* @param settings Container configuration settings
*/
public FlinkContainerTestEnvironment(FlinkContainersSettings settings);
/**
* Create container environment with custom settings and testcontainers config
* @param settings Container configuration settings
* @param testcontainersSettings TestContainers configuration
*/
public FlinkContainerTestEnvironment(
FlinkContainersSettings settings,
TestcontainersSettings testcontainersSettings
);
@Override
public StreamExecutionEnvironment createExecutionEnvironment(TestEnvironmentSettings envOptions);
@Override
public Endpoint getRestEndpoint();
@Override
public String getCheckpointUri();
@Override
public void startUp() throws Exception;
@Override
public void tearDown() throws Exception;
/**
* Trigger TaskManager failover by stopping and restarting TaskManager containers
* @param jobClient Current job client
* @param afterFailAction Action to execute after triggering failover
*/
@Override
public void triggerTaskManagerFailover(JobClient jobClient, Runnable afterFailAction) throws Exception;
}
Usage Examples:
// Default container environment
@TestEnv
FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment();
// Custom configuration
@TestEnv
FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(
FlinkContainersSettings.builder()
.setNumTaskManagers(3)
.setNumSlotsPerTaskManager(2)
.setJobManagerMemory("2g")
.setTaskManagerMemory("1g")
.build()
);
// With TestContainers settings
@TestEnv
FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(
FlinkContainersSettings.builder().build(),
TestcontainersSettings.builder()
.setNetwork(Network.newNetwork())
.build()
);
Utility for building custom Flink images with connector JARs.
/**
* Builder for custom Flink Docker images with connector JARs.
* All entry points are static overloads of {@code buildImage}; each returns the name of the
* built image and reports a failed build through the checked {@link ImageBuildException}.
*/
public class FlinkImageBuilder {
/**
* Build custom Flink image with connector JARs
* (no base image is passed to this overload; which image it extends is not stated here)
* @param jarPaths List of JAR file URLs to include in image
* @return Docker image name for the built image
* @throws ImageBuildException if image build fails
*/
public static DockerImageName buildImage(List<URL> jarPaths) throws ImageBuildException;
/**
* Build custom Flink image with connector JARs and base image
* @param baseImage Base Flink image to extend
* @param jarPaths List of JAR file URLs to include in image
* @return Docker image name for the built image
* @throws ImageBuildException if image build fails
*/
public static DockerImageName buildImage(DockerImageName baseImage, List<URL> jarPaths) throws ImageBuildException;
/**
* Build custom Flink image with connector JARs and additional configuration
* @param baseImage Base Flink image to extend
* @param jarPaths List of JAR file URLs to include in image
* @param additionalFiles Additional files to copy into image; NOTE(review): the custom-image
*        example maps a local path (e.g. "conf/log4j.properties") to a path inside the image
*        (e.g. "/opt/flink/conf/log4j.properties") - presumably key = source, value =
*        destination; confirm against the implementation
* @return Docker image name for the built image
* @throws ImageBuildException if image build fails
*/
public static DockerImageName buildImage(
DockerImageName baseImage,
List<URL> jarPaths,
Map<String, String> additionalFiles
) throws ImageBuildException;
}
/**
* Exception thrown when Docker image build fails
*/
public class ImageBuildException extends Exception {
public ImageBuildException(String message);
public ImageBuildException(String message, Throwable cause);
}
Usage Examples:
// Build image with connector JARs
List<URL> connectorJars = Arrays.asList(
new File("target/my-connector.jar").toURI().toURL(),
new File("lib/dependency.jar").toURI().toURL()
);
try {
DockerImageName customImage = FlinkImageBuilder.buildImage(connectorJars);
// Use custom image in containers
FlinkContainer jobManager = FlinkContainers.jobManager()
.withDockerImageName(customImage);
} catch (ImageBuildException e) {
throw new TestAbortedException("Failed to build custom Flink image", e);
}
Interface for configuring TestContainers behavior.
/**
* Interface for configuring Flink TestContainers
*/
public interface FlinkTestcontainersConfigurator {
/**
* Configure JobManager container
* @param jobManager JobManager container to configure
*/
void configureJobManager(GenericContainer<?> jobManager);
/**
* Configure TaskManager container
* @param taskManager TaskManager container to configure
*/
void configureTaskManager(GenericContainer<?> taskManager);
/**
* Configure network settings
* @param network Network to configure
*/
void configureNetwork(Network network);
}
Usage Examples:
/**
 * Example configurator that sets JVM arguments, an SLF4J log consumer and a startup timeout
 * on the JobManager and TaskManager containers.
 */
public class CustomFlinkConfigurator implements FlinkTestcontainersConfigurator {

    /**
     * Applies a 2g heap, a "JobManager" logger and a five-minute startup timeout.
     *
     * @param jobManager JobManager container to configure
     */
    @Override
    public void configureJobManager(GenericContainer<?> jobManager) {
        // The fluent setters are applied one at a time, in the same order as a chained call.
        jobManager.withEnv("JVM_ARGS", "-Xmx2g -Xms2g");
        jobManager.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("JobManager")));
        jobManager.withStartupTimeout(Duration.ofMinutes(5));
    }

    /**
     * Applies a 1g heap, a "TaskManager" logger and a three-minute startup timeout.
     *
     * @param taskManager TaskManager container to configure
     */
    @Override
    public void configureTaskManager(GenericContainer<?> taskManager) {
        taskManager.withEnv("JVM_ARGS", "-Xmx1g -Xms1g");
        taskManager.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("TaskManager")));
        taskManager.withStartupTimeout(Duration.ofMinutes(3));
    }

    /**
     * Intentionally left empty in this example.
     *
     * @param network Network to configure
     */
    @Override
    public void configureNetwork(Network network) {
        // Custom network configuration
    }
}
// Use custom configurator
FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(
FlinkContainersSettings.builder()
.setConfigurator(new CustomFlinkConfigurator())
.build()
);
Configuration for Flink container cluster setup.
/**
* Configuration settings for Flink container environments
*/
public class FlinkContainersSettings {
public static Builder builder();
public static class Builder {
/**
* Set number of TaskManager containers to start
* @param numTaskManagers Number of TaskManager containers (default: 1)
* @return Builder instance
*/
public Builder setNumTaskManagers(int numTaskManagers);
/**
* Set number of slots per TaskManager container
* @param numSlotsPerTaskManager Task slots per TaskManager (default: 2)
* @return Builder instance
*/
public Builder setNumSlotsPerTaskManager(int numSlotsPerTaskManager);
/**
* Set JobManager memory allocation
* @param jobManagerMemory Memory setting (e.g., "1g", "512m") (default: "1g")
* @return Builder instance
*/
public Builder setJobManagerMemory(String jobManagerMemory);
/**
* Set TaskManager memory allocation
* @param taskManagerMemory Memory setting (e.g., "1g", "512m") (default: "1g")
* @return Builder instance
*/
public Builder setTaskManagerMemory(String taskManagerMemory);
/**
* Set base Docker image for containers
* @param baseImage Docker image name (default: "flink:1.18")
* @return Builder instance
*/
public Builder setBaseImage(DockerImageName baseImage);
/**
* Set custom TestContainers configurator
* @param configurator Custom configurator for container setup
* @return Builder instance
*/
public Builder setConfigurator(FlinkTestcontainersConfigurator configurator);
/**
* Enable/disable checkpoint recovery testing
* @param enableCheckpointRecovery Enable checkpoint recovery (default: true)
* @return Builder instance
*/
public Builder setEnableCheckpointRecovery(boolean enableCheckpointRecovery);
/**
* Build configured settings
* @return FlinkContainersSettings instance
*/
public FlinkContainersSettings build();
}
public int getNumTaskManagers();
public int getNumSlotsPerTaskManager();
public String getJobManagerMemory();
public String getTaskManagerMemory();
public DockerImageName getBaseImage();
public Optional<FlinkTestcontainersConfigurator> getConfigurator();
public boolean isCheckpointRecoveryEnabled();
}
General TestContainers configuration settings.
/**
* General TestContainers configuration settings
*/
public class TestcontainersSettings {
public static Builder builder();
public static class Builder {
/**
* Set Docker network for container communication
* @param network Shared network for containers
* @return Builder instance
*/
public Builder setNetwork(Network network);
/**
* Set log consumers for container output
* @param logConsumers Map of container name to log consumer
* @return Builder instance
*/
public Builder setLogConsumers(Map<String, Consumer<OutputFrame>> logConsumers);
/**
* Set startup timeout for containers
* @param startupTimeout Maximum time to wait for container startup
* @return Builder instance
*/
public Builder setStartupTimeout(Duration startupTimeout);
/**
* Set container registry configuration
* @param registryConfig Docker registry configuration
* @return Builder instance
*/
public Builder setRegistryConfig(DockerClientConfig registryConfig);
/**
* Enable/disable container reuse between tests
* @param reuseContainers Enable container reuse (default: false)
* @return Builder instance
*/
public Builder setReuseContainers(boolean reuseContainers);
/**
* Build configured settings
* @return TestcontainersSettings instance
*/
public TestcontainersSettings build();
}
public Optional<Network> getNetwork();
public Map<String, Consumer<OutputFrame>> getLogConsumers();
public Duration getStartupTimeout();
public Optional<DockerClientConfig> getRegistryConfig();
public boolean isReuseContainers();
}
Set up complex test scenarios with multiple external systems.
public class ComplexConnectorTestSuite extends SinkTestSuiteBase<String> {
// Shared network for all containers
private static final Network testNetwork = Network.newNetwork();
@TestEnv
FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(
FlinkContainersSettings.builder()
.setNumTaskManagers(2)
.setNumSlotsPerTaskManager(2)
.build(),
TestcontainersSettings.builder()
.setNetwork(testNetwork)
.setLogConsumers(Map.of(
"jobmanager", new Slf4jLogConsumer(LoggerFactory.getLogger("JobManager")),
"taskmanager", new Slf4jLogConsumer(LoggerFactory.getLogger("TaskManager"))
))
.build()
);
@TestContext
ExternalContextFactory<KafkaExternalContext> kafkaContextFactory = testName ->
new KafkaExternalContext(testName, testNetwork);
@TestContext
ExternalContextFactory<DatabaseExternalContext> dbContextFactory = testName ->
new DatabaseExternalContext(testName, testNetwork);
}
Build custom Flink images for specific testing scenarios.
public class CustomImageTestSuite extends SinkTestSuiteBase<String> {

    /**
     * Custom Flink image shared by every test in this suite.
     *
     * <p>Built in the static initializer rather than in a {@code @BeforeAll} method: the
     * {@code testEnv} field initializer below reads this value when the test instance is
     * constructed, and with JUnit 5's per-class test-instance lifecycle the instance is
     * constructed before {@code @BeforeAll} methods run, which would hand {@code null} to
     * {@code setBaseImage}. Static initialization is correct under either lifecycle.
     */
    private static final DockerImageName customFlinkImage = buildCustomImage();

    @TestEnv
    FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(
        FlinkContainersSettings.builder()
            .setBaseImage(customFlinkImage)
            .build()
    );

    /**
     * Builds the Flink image containing the connector JARs and the extra configuration files.
     *
     * @return name of the built image
     * @throws IllegalStateException if a JAR path cannot be converted to a URL or the image
     *         build fails; the original exception is kept as the cause
     */
    private static DockerImageName buildCustomImage() {
        try {
            List<URL> connectorJars = Arrays.asList(
                new File("target/my-connector.jar").toURI().toURL(),
                new File("lib/kafka-clients.jar").toURI().toURL(),
                new File("lib/commons-lang3.jar").toURI().toURL()
            );
            Map<String, String> additionalFiles = Map.of(
                "conf/log4j.properties", "/opt/flink/conf/log4j.properties",
                "conf/flink-conf.yaml", "/opt/flink/conf/flink-conf.yaml"
            );
            return FlinkImageBuilder.buildImage(
                DockerImageName.parse("flink:1.18-java11"),
                connectorJars,
                additionalFiles
            );
        } catch (java.net.MalformedURLException | ImageBuildException e) {
            // Fully qualified so the snippet needs no additional import.
            throw new IllegalStateException("Failed to build custom Flink image", e);
        }
    }
}
Implement failure testing using container lifecycle control.
public class FailoverTestSuite extends SourceTestSuiteBase<String> {
@TestEnv
FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(
FlinkContainersSettings.builder()
.setNumTaskManagers(3) // Multiple TaskManagers for failover testing
.setEnableCheckpointRecovery(true)
.build()
);
@TestTemplate
public void testTaskManagerFailover(
TestEnvironment testEnv,
DataStreamSourceExternalContext<String> externalContext,
ClusterControllable controller,
CheckpointingMode semantic
) throws Exception {
// Start job and validate initial results
JobClient jobClient = startTestJob(testEnv, externalContext, semantic);
validateInitialResults(jobClient);
// Trigger TaskManager failure
controller.triggerTaskManagerFailover(jobClient, () -> {
// Actions after failure triggered
LOG.info("TaskManager failure triggered, waiting for recovery...");
});
// Validate recovery and continued processing
validateRecoveryResults(jobClient);
}
}
Monitor container resource usage during tests.
public class PerformanceTestSuite extends SinkTestSuiteBase<String> {
@TestEnv
FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(
FlinkContainersSettings.builder()
.setJobManagerMemory("2g")
.setTaskManagerMemory("4g")
.setConfigurator(new ResourceMonitoringConfigurator())
.build()
);
private static class ResourceMonitoringConfigurator implements FlinkTestcontainersConfigurator {
@Override
public void configureJobManager(GenericContainer<?> jobManager) {
jobManager
.withCreateContainerCmdModifier(cmd -> {
// Set memory limits
cmd.getHostConfig()
.withMemory(2L * 1024 * 1024 * 1024) // 2GB
.withCpuQuota(100000L); // 1 CPU
})
.withLogConsumer(new ResourceUsageLogConsumer("JobManager"));
}
@Override
public void configureTaskManager(GenericContainer<?> taskManager) {
taskManager
.withCreateContainerCmdModifier(cmd -> {
// Set memory limits
cmd.getHostConfig()
.withMemory(4L * 1024 * 1024 * 1024) // 4GB
.withCpuQuota(200000L); // 2 CPUs
})
.withLogConsumer(new ResourceUsageLogConsumer("TaskManager"));
}
@Override
public void configureNetwork(Network network) {
// Network configuration
}
}
}
// Use try-with-resources for automatic cleanup
try (Network network = Network.newNetwork()) {
FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(
FlinkContainersSettings.builder().build(),
TestcontainersSettings.builder().setNetwork(network).build()
);
// Test execution
} // Network automatically closed
// Reuse containers when possible
TestcontainersSettings settings = TestcontainersSettings.builder()
.setReuseContainers(true) // Enable container reuse
.setStartupTimeout(Duration.ofMinutes(2)) // Reasonable timeout
.build();
// Use appropriate resource allocation
FlinkContainersSettings flinkSettings = FlinkContainersSettings.builder()
.setJobManagerMemory("1g") // Don't over-allocate for simple tests
.setTaskManagerMemory("1g")
.setNumTaskManagers(1) // Start with minimal cluster
.build();
@TestEnv
FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment() {
@Override
public void startUp() throws Exception {
try {
super.startUp();
} catch (Exception e) {
// Handle Docker not available
throw new TestAbortedException("Docker not available for container tests", e);
}
}
};
# GitHub Actions example
jobs:
  container-tests:
    runs-on: ubuntu-latest
    services:
      docker:
        image: docker:20.10
        options: --privileged
    steps:
      - uses: actions/checkout@v3
      - name: Set up JDK 11
        uses: actions/setup-java@v3
        with:
          java-version: '11'
      - name: Run container tests
        run: ./mvnw test -Dtest=**/*ContainerTest*
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-test-utils