Testing utilities for Apache Flink connectors, providing test frameworks, assertion utilities, and abstractions for comprehensive connector testing.
—
Test environments manage the Flink cluster lifecycle and provide execution contexts for connector tests. They abstract away the complexity of cluster management while supporting various deployment modes.
Base interface for all test environment implementations.
/**
* Test environment for running Flink jobs
* Manages Flink cluster lifecycle and provides execution context
*/
public interface TestEnvironment extends TestResource {
/**
* Create StreamExecutionEnvironment for job building and execution
* @param envOptions Environment configuration options
* @return Configured StreamExecutionEnvironment bound to this cluster
*/
StreamExecutionEnvironment createExecutionEnvironment(TestEnvironmentSettings envOptions);
/**
* Get REST endpoint for cluster communication
* @return Endpoint with address and port for REST API access
*/
Endpoint getRestEndpoint();
/**
* Get checkpoint/savepoint storage path
* @return URI string for checkpoint and savepoint storage
*/
String getCheckpointUri();
/**
* Endpoint configuration for REST API access
*/
class Endpoint {
public Endpoint(String address, int port);
public String getAddress();
public int getPort();
}
}
/**
* Base interface for test resource lifecycle management
*/
public interface TestResource {
/**
* Start up the test resource (idempotent operation)
* @throws Exception if startup fails
*/
void startUp() throws Exception;
/**
* Tear down the test resource
* Should handle cleanup even if startup never occurred
* @throws Exception if teardown fails
*/
void tearDown() throws Exception;
}
Usage Examples:
// Environment registration
@TestEnv
MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();
// Using environment in test
StreamExecutionEnvironment env = testEnv.createExecutionEnvironment(
TestEnvironmentSettings.builder()
.setConnectorJarPaths(externalContext.getConnectorJarPaths())
.build()
);
In-process test environment using Flink's MiniCluster for fast, lightweight testing.
/**
* Test environment using Flink MiniCluster for in-process testing
*/
public class MiniClusterTestEnvironment implements TestEnvironment {
/**
* Create MiniCluster environment with default configuration
*/
public MiniClusterTestEnvironment();
/**
* Create MiniCluster environment with custom configuration
* @param miniClusterConfig MiniCluster configuration
*/
public MiniClusterTestEnvironment(MiniClusterConfiguration miniClusterConfig);
@Override
public StreamExecutionEnvironment createExecutionEnvironment(TestEnvironmentSettings envOptions);
@Override
public Endpoint getRestEndpoint();
@Override
public String getCheckpointUri();
@Override
public void startUp() throws Exception;
@Override
public void tearDown() throws Exception;
}
Usage Examples:
// Default MiniCluster environment
@TestEnv
MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();
// Custom MiniCluster configuration
@TestEnv
MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment(
new MiniClusterConfiguration(
new Configuration(),
2, // number of task managers
2, // number of slots per task manager
ResourceID.generate(),
Time.minutes(10) // timeout
)
);
Containerized test environment using Docker containers for isolated testing.
/**
* Test environment using Flink containers for isolated testing
*/
public class FlinkContainerTestEnvironment implements TestEnvironment {
/**
* Create container environment with specified settings
* @param settings Container configuration settings
*/
public FlinkContainerTestEnvironment(FlinkContainersSettings settings);
/**
* Create container environment with default settings
*/
public FlinkContainerTestEnvironment();
@Override
public StreamExecutionEnvironment createExecutionEnvironment(TestEnvironmentSettings envOptions);
@Override
public Endpoint getRestEndpoint();
@Override
public String getCheckpointUri();
@Override
public void startUp() throws Exception;
@Override
public void tearDown() throws Exception;
}
Usage Examples:
// Default container environment
@TestEnv
FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment();
// Custom container configuration
@TestEnv
FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(
FlinkContainersSettings.builder()
.setNumTaskManagers(2)
.setNumSlotsPerTaskManager(2)
.setJobManagerMemory("1g")
.setTaskManagerMemory("1g")
.build()
);
Interface for environments that support cluster control operations, such as failover simulation.
/**
* Interface for test environments that support cluster control operations
*/
public interface ClusterControllable {
/**
* Trigger TaskManager failover during test execution
* @param jobClient Current job client
* @param afterFailAction Action to execute after triggering failover
* @throws Exception if failover cannot be triggered
*/
void triggerTaskManagerFailover(JobClient jobClient, Runnable afterFailAction) throws Exception;
}
Usage Examples:
// In test method that supports cluster controllable
@TestTemplate
public void testTaskManagerFailure(
TestEnvironment testEnv,
DataStreamSourceExternalContext<T> externalContext,
ClusterControllable controller, // Injected when environment supports it
CheckpointingMode semantic
) throws Exception {
// Test implementation uses controller to trigger failover
controller.triggerTaskManagerFailover(jobClient, () -> {
// Actions to perform after failover is triggered
});
}
Configuration for test environment behavior and job submission.
/**
* Configuration settings for test environment setup
*/
public class TestEnvironmentSettings {
public static Builder builder();
public static class Builder {
/**
* Set connector JAR paths to attach to jobs
* @param connectorJarPaths List of connector JAR URLs
* @return Builder instance
*/
public Builder setConnectorJarPaths(List<URL> connectorJarPaths);
/**
* Set savepoint path for job restoration
* @param savepointRestorePath Path to savepoint for job restart
* @return Builder instance
*/
public Builder setSavepointRestorePath(String savepointRestorePath);
/**
* Build the settings instance
* @return Configured TestEnvironmentSettings
*/
public TestEnvironmentSettings build();
}
public List<URL> getConnectorJarPaths();
public Optional<String> getSavepointRestorePath();
}
Usage Examples:
// Basic environment settings
TestEnvironmentSettings settings = TestEnvironmentSettings.builder()
.setConnectorJarPaths(externalContext.getConnectorJarPaths())
.build();
// Settings with savepoint restoration
TestEnvironmentSettings restartSettings = TestEnvironmentSettings.builder()
.setConnectorJarPaths(externalContext.getConnectorJarPaths())
.setSavepointRestorePath("/path/to/savepoint")
.build();
StreamExecutionEnvironment env = testEnv.createExecutionEnvironment(settings);
Configuration for containerized test environments.
/**
* Configuration settings for Flink container environments
*/
public class FlinkContainersSettings {
public static Builder builder();
public static class Builder {
/**
* Set number of TaskManager containers
* @param numTaskManagers Number of TaskManager containers to start
* @return Builder instance
*/
public Builder setNumTaskManagers(int numTaskManagers);
/**
* Set number of slots per TaskManager
* @param numSlotsPerTaskManager Slots per TaskManager container
* @return Builder instance
*/
public Builder setNumSlotsPerTaskManager(int numSlotsPerTaskManager);
/**
* Set JobManager memory allocation
* @param jobManagerMemory Memory allocation (e.g., "1g", "512m")
* @return Builder instance
*/
public Builder setJobManagerMemory(String jobManagerMemory);
/**
* Set TaskManager memory allocation
* @param taskManagerMemory Memory allocation (e.g., "1g", "512m")
* @return Builder instance
*/
public Builder setTaskManagerMemory(String taskManagerMemory);
/**
* Build the settings instance
* @return Configured FlinkContainersSettings
*/
public FlinkContainersSettings build();
}
public int getNumTaskManagers();
public int getNumSlotsPerTaskManager();
public String getJobManagerMemory();
public String getTaskManagerMemory();
}
/**
* General TestContainers configuration settings
*/
public class TestcontainersSettings {
public static Builder builder();
public static class Builder {
/**
* Set Docker network for container communication
* @param network TestContainers network instance
* @return Builder instance
*/
public Builder setNetwork(Network network);
/**
* Set log consumers for container output
* @param logConsumers Map of container name to log consumer
* @return Builder instance
*/
public Builder setLogConsumers(Map<String, Consumer<OutputFrame>> logConsumers);
/**
* Build the settings instance
* @return Configured TestcontainersSettings
*/
public TestcontainersSettings build();
}
public Optional<Network> getNetwork();
public Map<String, Consumer<OutputFrame>> getLogConsumers();
}
Test environments follow a PER-CLASS lifecycle:
@TestEnv
MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();
// Started once before first test method
// Shared by all test methods in the class
// Torn down after last test method completes
MiniCluster Advantages:
Container Advantages:
// Lightweight configuration for fast tests
@TestEnv
MiniClusterTestEnvironment lightEnv = new MiniClusterTestEnvironment(
new MiniClusterConfiguration(
new Configuration(),
1, // single task manager
1, // single slot
ResourceID.generate(),
Time.seconds(30) // short timeout
)
);
// Heavy configuration for complex tests
@TestEnv
FlinkContainerTestEnvironment heavyEnv = new FlinkContainerTestEnvironment(
FlinkContainersSettings.builder()
.setNumTaskManagers(4)
.setNumSlotsPerTaskManager(4)
.setJobManagerMemory("2g")
.setTaskManagerMemory("2g")
.build()
);
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-test-utils