Comprehensive test library for the Apache Flink stream processing framework, providing integration tests, test utilities, and end-to-end validation tests.
—
Comprehensive collection of runtime utilities for test execution, process management, and common testing operations. These utilities provide essential infrastructure for running tests in controlled environments and managing Flink job execution.
Utilities for executing JobGraphs on MiniCluster instances with comprehensive control and monitoring capabilities.
/**
* Utility for running JobGraphs on a MiniCluster in tests.
*
* <p>Only method signatures are listed in this reference; the implementations are not shown,
* so the notes below restate the declared contract rather than observed behavior.
*/
public class JobGraphRunningUtil {
/**
* Executes a JobGraph on a MiniCluster and waits for completion.
* No result object is returned; use one of the result-returning variants when the
* JobExecutionResult is needed.
* @param jobGraph JobGraph to execute
* @param miniCluster MiniCluster instance used for execution
* @throws Exception if job execution fails
*/
public static void execute(JobGraph jobGraph, MiniCluster miniCluster) throws Exception;
/**
* Executes a JobGraph with an upper bound on how long the execution may take.
* @param jobGraph JobGraph to execute
* @param miniCluster MiniCluster instance used for execution
* @param timeoutMs timeout in milliseconds
* @return JobExecutionResult containing the execution results
* @throws Exception if execution fails or times out
*/
public static JobExecutionResult executeWithTimeout(
JobGraph jobGraph,
MiniCluster miniCluster,
long timeoutMs) throws Exception;
/**
* Executes a JobGraph and returns its execution result.
* No timeout parameter is declared for this variant.
* @param jobGraph JobGraph to execute
* @param miniCluster MiniCluster instance used for execution
* @return JobExecutionResult with job execution details
* @throws Exception if execution fails
*/
public static JobExecutionResult executeAndGetResult(
JobGraph jobGraph,
MiniCluster miniCluster) throws Exception;
/**
* Executes multiple JobGraphs sequentially on the same MiniCluster.
* @param jobGraphs list of JobGraphs to execute
* @param miniCluster MiniCluster instance used for every job
* @return list of JobExecutionResults (presumably one per submitted JobGraph, in input
* order; confirm against the implementation)
* @throws Exception if any job execution fails
*/
public static List<JobExecutionResult> executeSequentially(
List<JobGraph> jobGraphs,
MiniCluster miniCluster) throws Exception;
/**
* Submits a JobGraph asynchronously and returns a CompletableFuture for its result.
* The signature declares no checked exception.
* NOTE(review): failures are presumably reported through the returned future; confirm.
* @param jobGraph JobGraph to submit
* @param miniCluster MiniCluster instance used for execution
* @return CompletableFuture containing the JobExecutionResult
*/
public static CompletableFuture<JobExecutionResult> submitAsync(
JobGraph jobGraph,
MiniCluster miniCluster);
}Entry points and utilities for managing external processes during testing scenarios.
/**
* Entry point for task executor process testing.
*
* <p>Only method signatures are listed in this reference; the implementations are not shown.
*/
public class TaskExecutorProcessEntryPoint {
/**
* Main entry point for a standalone task executor process.
* NOTE(review): the accepted argument format is not shown here; check the implementation.
* @param args command line arguments for task executor configuration
*/
public static void main(String[] args);
/**
* Starts a task executor with a specific configuration.
* @param config Configuration for task executor setup
* @throws Exception if task executor startup fails
*/
public static void startTaskExecutor(Configuration config) throws Exception;
/**
* Creates a default configuration for task executor testing.
* NOTE(review): the concrete default values are not shown here.
* @return Configuration with testing defaults
*/
public static Configuration createDefaultTestConfiguration();
}Common test functions and utilities for data processing and validation scenarios.
/**
* Tokenizer function for string processing tests.
* Declared as a FlatMapFunction from String to Tuple2 of String and Integer.
*
* <p>Only signatures are listed in this reference; the implementation is not shown.
*/
public class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
/**
* Creates a tokenizer with the default configuration.
* NOTE(review): the default delimiter is not shown here; confirm before relying on it.
*/
public Tokenizer();
/**
* Creates a tokenizer with a custom delimiter.
* @param delimiter delimiter pattern for tokenization
* (presumably a regular expression, given the word "pattern"; confirm)
*/
public Tokenizer(String delimiter);
/**
* Processes one input string and emits results through the collector.
* NOTE(review): presumably emits one (token, count) tuple per token; the Integer
* component's meaning is not shown here.
* @param value input string to tokenize
* @param out collector receiving the emitted tuples
* @throws Exception declared by the signature; conditions are not shown
*/
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception;
}
/**
* Identity mapper for testing data flow without transformation.
* Declared as a MapFunction from Integer to Integer.
*
* <p>Only signatures are listed in this reference; the implementation is not shown.
*/
public class NoOpIntMap implements MapFunction<Integer, Integer> {
/**
* Creates a no-operation integer mapper.
*/
public NoOpIntMap();
/**
* Maps one element; as an identity mapper this is described as applying no transformation.
* @param value input element
* @return the mapped element (presumably the input itself; confirm against the implementation)
* @throws Exception declared by the signature; conditions are not shown
*/
@Override
public Integer map(Integer value) throws Exception;
}
/**
* No-operation sink for testing data flow completion.
* Tracks how many elements were received so tests can compare against an expected count.
*
* <p>Only signatures are listed in this reference; the implementation is not shown.
* NOTE(review): thread-safety and how counts are shared between sink instances are not
* shown here; verify before asserting on a locally held instance after job execution.
*
* @param <T> type of the elements accepted by the sink
*/
public class ReceiveCheckNoOpSink<T> implements SinkFunction<T> {
/**
* Creates a no-op sink with receive tracking.
* @param expectedCount expected number of elements to receive
*/
public ReceiveCheckNoOpSink(int expectedCount);
/**
* Receives one element from the stream.
* NOTE(review): presumably increments the received counter and discards the value; confirm.
* @param value element delivered to the sink
* @param context sink context supplied by the caller
* @throws Exception declared by the signature; conditions are not shown
*/
@Override
public void invoke(T value, Context context) throws Exception;
/**
* Checks whether the expected number of elements was received.
* NOTE(review): whether this means "exactly" or "at least" the expected count is not shown.
* @return boolean indicating if the expected count was reached
*/
public boolean receivedExpectedCount();
/**
* Gets the actual count of received elements.
* @return int representing the actual received count
*/
public int getReceivedCount();
/**
* Resets the counter so the sink can be reused in multiple tests.
*/
public void reset();
}Utilities specifically designed for testing recovery scenarios and restart strategies.
/**
* Utility class for recovery testing operations.
*
* <p>Only method signatures are listed in this reference; the implementations are not shown.
*/
public class RecoveryTestUtils {
/**
* Validates recovery behavior from a job execution result.
* NOTE(review): how the restart count is derived from the result is not shown here.
* @param result JobExecutionResult to analyze
* @param expectedRestartCount expected number of restarts
* @return boolean indicating if the recovery behavior is valid
*/
public static boolean validateRecoveryBehavior(
JobExecutionResult result,
int expectedRestartCount);
/**
* Creates a configuration for failure injection testing.
* @param restartStrategy restart strategy to use, given as a String
* (accepted values are not listed here; confirm against the implementation)
* @param maxFailures maximum number of failures to inject
* @return Configuration with failure injection settings
*/
public static Configuration createFailureInjectionConfig(
String restartStrategy,
int maxFailures);
/**
* Waits for a job to reach a specific state, bounded by a timeout.
* @param jobId JobID to monitor
* @param targetState target JobStatus to wait for
* @param timeoutMs timeout in milliseconds
* @param restGateway RestClusterClient used for job monitoring
* @return boolean indicating if the state was reached
* (presumably false when the timeout elapses first; confirm)
* @throws Exception if monitoring fails
*/
public static boolean waitForJobState(
JobID jobId,
JobStatus targetState,
long timeoutMs,
RestClusterClient<?> restGateway) throws Exception;
}Utilities for setting up and managing test environments and configurations.
/**
* Utility for creating and managing test environments.
*
* <p>Only method signatures are listed in this reference; the implementations are not shown.
*/
public class TestEnvironmentUtil {
/**
* Creates a MiniCluster configuration for testing.
* NOTE(review): how parallelism maps to slots per task manager is not shown here.
* @param parallelism desired parallelism
* @param numTaskManagers number of task managers
* @return Configuration for MiniCluster setup
*/
public static Configuration createTestClusterConfig(
int parallelism,
int numTaskManagers);
/**
* Creates a streaming environment for testing.
* NOTE(review): the checkpoint interval used when checkpointing is enabled is not shown.
* @param parallelism parallelism for the environment
* @param checkpointingEnabled whether to enable checkpointing
* @return StreamExecutionEnvironment configured for testing
*/
public static StreamExecutionEnvironment createTestStreamEnv(
int parallelism,
boolean checkpointingEnabled);
/**
* Creates a batch environment for testing.
* @param parallelism parallelism for the environment
* @return ExecutionEnvironment configured for testing
*/
public static ExecutionEnvironment createTestBatchEnv(int parallelism);
/**
* Sets up test-specific logging configuration.
* NOTE(review): which logging library's Level type is expected is not visible here.
* @param logLevel logging level for tests
* @param logToConsole whether to log to console
*/
public static void setupTestLogging(Level logLevel, boolean logToConsole);
/**
* Cleans up test environment resources.
* NOTE(review): whether null arguments are tolerated is not shown here.
* @param environment execution environment to clean up
* @param miniCluster mini cluster to shut down
* @throws Exception if cleanup fails
*/
public static void cleanupTestEnvironment(
StreamExecutionEnvironment environment,
MiniCluster miniCluster) throws Exception;
}Utilities for collecting and validating metrics during test execution.
/**
* Utility for metrics collection and validation in tests.
*
* <p>Only method signatures are listed in this reference; the implementations are not shown.
*/
public class TestMetricsUtil {
/**
* Collects all metrics from a MiniCluster.
* NOTE(review): the naming scheme of the returned metric keys is not shown here.
* @param miniCluster cluster to collect metrics from
* @return Map of metric names to values
* @throws Exception if metrics collection fails
*/
public static Map<String, Object> collectAllMetrics(MiniCluster miniCluster) throws Exception;
/**
* Waits for a specific metric to reach an expected value, bounded by a timeout.
* NOTE(review): how the Object values are compared is not shown (presumably equals()); confirm.
* @param miniCluster cluster to monitor
* @param metricName name of the metric to monitor
* @param expectedValue expected metric value
* @param timeoutMs timeout in milliseconds
* @return boolean indicating if the metric reached the expected value
* @throws Exception if monitoring fails
*/
public static boolean waitForMetricValue(
MiniCluster miniCluster,
String metricName,
Object expectedValue,
long timeoutMs) throws Exception;
/**
* Validates job metrics against expected values.
* @param jobId JobID to validate metrics for
* @param expectedMetrics map of metric names to their expected values
* @param miniCluster cluster to collect metrics from
* @return boolean indicating if all metrics match expectations
* @throws Exception if validation fails
*/
public static boolean validateJobMetrics(
JobID jobId,
Map<String, Object> expectedMetrics,
MiniCluster miniCluster) throws Exception;
}Usage Examples:
import org.apache.flink.test.runtime.*;
import org.apache.flink.test.util.*;
// Example: Executing jobs with runtime utilities
/**
 * Example: executing jobs with the runtime utilities.
 *
 * <p>Each test manages its MiniCluster with try-with-resources, so the cluster is closed
 * even when {@code start()} itself throws, not only when the job execution fails.
 * The {@code createTestJobGraph} helpers are not part of this example and must be supplied
 * by the surrounding test code.
 */
public class JobExecutionTest {

    /** Upper bound for the single-job run, in milliseconds. */
    private static final long JOB_TIMEOUT_MS = 30_000L;

    /**
     * Runs one job with a timeout and checks that it succeeded and produced accumulator results.
     *
     * @throws Exception if cluster startup, job execution, or cluster shutdown fails
     */
    @Test
    public void testJobExecution() throws Exception {
        // Create test job graph
        JobGraph jobGraph = createTestJobGraph();

        // Set up mini cluster for testing (parallelism 4, 2 task managers)
        Configuration config = TestEnvironmentUtil.createTestClusterConfig(4, 2);

        // The resource is declared before start() so a failed start is still cleaned up.
        try (MiniCluster miniCluster = new MiniCluster(config)) {
            miniCluster.start();

            // Execute job with timeout
            JobExecutionResult result = JobGraphRunningUtil.executeWithTimeout(
                    jobGraph, miniCluster, JOB_TIMEOUT_MS);

            // Validate execution results
            assertTrue(result.isSuccess());
            assertFalse(result.getAllAccumulatorResults().isEmpty());
        }
    }

    /**
     * Runs three jobs one after another on the same cluster and checks that each succeeded.
     *
     * @throws Exception if cluster startup, any job execution, or cluster shutdown fails
     */
    @Test
    public void testSequentialJobExecution() throws Exception {
        List<JobGraph> jobs = Arrays.asList(
                createTestJobGraph("job1"),
                createTestJobGraph("job2"),
                createTestJobGraph("job3"));

        // Parallelism 2, 1 task manager
        Configuration config = TestEnvironmentUtil.createTestClusterConfig(2, 1);

        try (MiniCluster miniCluster = new MiniCluster(config)) {
            miniCluster.start();

            // Execute jobs sequentially
            List<JobExecutionResult> results = JobGraphRunningUtil.executeSequentially(
                    jobs, miniCluster);

            // One result is expected per submitted job; derive the count from the input
            // so the assertion stays correct if the job list changes.
            assertEquals(jobs.size(), results.size());
            results.forEach(result -> assertTrue(result.isSuccess()));
        }
    }
}
// Example: Recovery testing with utilities
/**
 * Example: recovery testing with the recovery utilities.
 *
 * <p>The MiniCluster is managed with try-with-resources, so it is closed even when
 * {@code start()} throws. The {@code createJobWithFailures} helper is not part of this
 * example and must be supplied by the surrounding test code.
 */
public class RecoveryUtilityTest {

    /** Maximum number of failures the injected configuration may produce. */
    private static final int MAX_INJECTED_FAILURES = 3;

    /** Number of restarts the job is expected to go through before finishing. */
    private static final int EXPECTED_RESTARTS = 2;

    /**
     * Runs a failure-injecting job under a fixed-delay restart strategy and validates
     * the observed recovery behavior.
     *
     * @throws Exception if cluster startup, job execution, or cluster shutdown fails
     */
    @Test
    public void testRecoveryBehavior() throws Exception {
        // Create job with failure injection
        JobGraph faultTolerantJob = createJobWithFailures();

        // Configure recovery settings
        Configuration config = RecoveryTestUtils.createFailureInjectionConfig(
                "fixed-delay", MAX_INJECTED_FAILURES);
        config.setString("restart-strategy.fixed-delay.delay", "1s");

        // The resource is declared before start() so a failed start is still cleaned up.
        try (MiniCluster miniCluster = new MiniCluster(config)) {
            miniCluster.start();

            // Execute job and wait for completion
            JobExecutionResult result = JobGraphRunningUtil.executeAndGetResult(
                    faultTolerantJob, miniCluster);

            // Validate recovery behavior
            boolean recoveryValid = RecoveryTestUtils.validateRecoveryBehavior(
                    result, EXPECTED_RESTARTS);
            assertTrue(recoveryValid);
        }
    }
}
// Example: Using test functions
/**
 * Example: wiring the bundled test functions (Tokenizer and ReceiveCheckNoOpSink)
 * into a small streaming pipeline.
 */
public class TestFunctionUsage {

    /**
     * Tokenizes three two-word strings and checks that the counting sink saw the
     * expected six tokens.
     *
     * @throws Exception if the job fails to execute
     */
    @Test
    public void testTokenizerFunction() throws Exception {
        final StreamExecutionEnvironment env =
                TestEnvironmentUtil.createTestStreamEnv(1, false);

        // Explicit output type for the tokenizer stage
        final TypeInformation<Tuple2<String, Integer>> tokenType =
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){});

        // Source -> tokenizer, built as one fluent pipeline
        final DataStream<Tuple2<String, Integer>> tokenStream = env
                .fromElements("hello world", "test data", "apache flink")
                .flatMap(new Tokenizer())
                .returns(tokenType);

        // Three inputs of two words each -> 6 tokens expected
        final ReceiveCheckNoOpSink<Tuple2<String, Integer>> tokenCounter =
                new ReceiveCheckNoOpSink<>(6);
        tokenStream.addSink(tokenCounter);

        // Run the job, then check the sink's tally
        // NOTE(review): this asserts on the locally held sink instance; confirm that
        // ReceiveCheckNoOpSink makes its count visible to this reference after execution.
        env.execute("Tokenizer Test");
        assertTrue(tokenCounter.receivedExpectedCount());
    }
}
// Example: Environment setup and cleanup
/**
 * Example: environment setup and cleanup around each test via the
 * TestEnvironmentUtil helpers.
 */
public class EnvironmentManagementTest {

    private StreamExecutionEnvironment env;
    private MiniCluster miniCluster;

    /**
     * Prepares logging, the streaming environment, and a running mini cluster.
     *
     * @throws Exception if the mini cluster cannot be started
     */
    @Before
    public void setup() throws Exception {
        // Logging first, so later setup steps are logged at the chosen level
        TestEnvironmentUtil.setupTestLogging(Level.INFO, true);

        // Streaming environment: parallelism 2, checkpointing enabled
        env = TestEnvironmentUtil.createTestStreamEnv(2, true);

        // Mini cluster: parallelism 2, 1 task manager
        miniCluster = new MiniCluster(TestEnvironmentUtil.createTestClusterConfig(2, 1));
        miniCluster.start();
    }

    /**
     * Releases the environment and the mini cluster created in setup.
     *
     * @throws Exception if cleanup fails
     */
    @After
    public void cleanup() throws Exception {
        TestEnvironmentUtil.cleanupTestEnvironment(env, miniCluster);
    }

    /**
     * Pushes 1..5 through an identity map and a "greater than 2" filter and checks
     * that the sink saw the three surviving elements.
     *
     * @throws Exception if the job fails to execute
     */
    @Test
    public void testWithManagedEnvironment() throws Exception {
        // Source -> identity map -> filter, built as one fluent chain
        DataStream<Integer> aboveTwo = env
                .fromElements(1, 2, 3, 4, 5)
                .map(new NoOpIntMap())
                .filter(x -> x > 2);

        // 3, 4 and 5 pass the filter
        ReceiveCheckNoOpSink<Integer> countingSink = new ReceiveCheckNoOpSink<>(3);
        aboveTwo.addSink(countingSink);

        env.execute("Managed Environment Test");
        assertTrue(countingSink.receivedExpectedCount());
    }
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-tests