Comprehensive integration test suite for Apache Flink stream processing framework providing test utilities, base classes, and infrastructure for validating fault tolerance, checkpointing, and streaming operations.
Core utilities for executing test jobs with proper exception handling and result validation. These utilities handle the complexities of test execution in Flink environments and provide consistent patterns for test success validation.
Primary utility class for executing streaming jobs with proper exception handling and success validation.
public class TestUtils {
public static JobExecutionResult tryExecute(StreamExecutionEnvironment env, String jobName) throws Exception;
}
The tryExecute method:
- Catches ProgramInvocationException and JobExecutionException to determine test success.
- Inspects the exception chain to check whether a SuccessException is found.
- Returns the JobExecutionResult on successful completion, or null if the job was terminated by a SuccessException.

Custom exception used to indicate successful test completion and controlled program termination.
public class SuccessException extends Exception {
public SuccessException();
}
This exception is typically thrown by user functions (for example, map functions or sinks) once the test's success conditions have been met. Note that it only has a no-argument constructor.
Use SuccessException to terminate streaming jobs when test conditions are met:
/**
 * Example map function that validates each element and terminates the job
 * via {@link SuccessException} once a target number of elements has been seen.
 */
public class ValidatingFunction implements MapFunction<Integer, Integer> {
    // Elements processed so far by this function instance.
    private int processedCount = 0;
    // Number of elements to process before signalling test success.
    private final int targetCount;

    public ValidatingFunction(int targetCount) {
        this.targetCount = targetCount;
    }

    @Override
    public Integer map(Integer value) throws Exception {
        processedCount++;
        // Perform validation logic: negative values indicate corrupt input.
        if (value < 0) {
            throw new RuntimeException("Invalid negative value");
        }
        // Terminate successfully when the target is reached. SuccessException
        // declares only a no-argument constructor, so no message can be passed.
        if (processedCount >= targetCount) {
            throw new SuccessException();
        }
        return value * 2;
    }
}
/**
 * Example sink that collects elements, validates them incrementally, and ends
 * the job with {@link SuccessException} once the expected count is reached.
 * State is per-instance, so this pattern assumes sink parallelism 1.
 */
public class CollectingValidatingSink<T> implements SinkFunction<T> {
    // Collected elements; guarded by its own monitor so the size check and
    // the final validation happen atomically.
    private final List<T> results = new ArrayList<>();
    private final int expectedCount;

    public CollectingValidatingSink(int expectedCount) {
        this.expectedCount = expectedCount;
    }

    @Override
    public void invoke(T value) throws Exception {
        synchronized (results) {
            results.add(value);
            // Validate intermediate results as they arrive.
            validateElement(value);
            // Terminate when collection is complete. SuccessException declares
            // only a no-argument constructor, so no message can be passed.
            if (results.size() >= expectedCount) {
                validateFinalResults();
                throw new SuccessException();
            }
        }
    }

    private void validateElement(T value) throws Exception {
        // Element-level validation logic
    }

    private void validateFinalResults() throws Exception {
        // Final result validation logic
    }
}
@Test
public void testBasicStreamingExecution() throws Exception {
    // Minimal single-task pipeline: double each element and print it.
    final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(1);

    environment
        .fromElements(1, 2, 3, 4, 5)
        .map(value -> value * 2)
        .addSink(new PrintSinkFunction<>());

    // tryExecute() surfaces real failures and unwraps SuccessException.
    final JobExecutionResult result = TestUtils.tryExecute(environment, "Basic Streaming Test");
    assertNotNull("Job should complete successfully", result);
}
@Test
public void testSuccessExceptionPattern() throws Exception {
    final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(1);

    // Unbounded source: ValidatingFunction aborts the job with a
    // SuccessException after it has processed 100 elements.
    environment
        .fromSequence(1, Long.MAX_VALUE)
        .map(new ValidatingFunction(100))
        .addSink(new DiscardingSink<>());

    // tryExecute() maps a SuccessException-terminated job to a null result.
    final JobExecutionResult result = TestUtils.tryExecute(environment, "Success Exception Test");
    assertNull("Job should terminate via SuccessException", result);
}@Test
public void testComprehensiveValidation() throws Exception {
// Exercises a keyed pipeline end-to-end: 1..10 are squared and validated by the sink.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
List<Integer> inputData = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// NOTE(review): with parallelism 2 the sink runs as two instances; ValidatingResultSink
// must share its received-value state across instances or this job never reaches the
// expected count and hangs — verify the sink's state is static/shared.
env.fromCollection(inputData)
.keyBy(x -> x % 2) // Partition by even/odd
.map(x -> x * x) // Square each number
.addSink(new ValidatingResultSink(inputData.size()));
// Execute with validation
TestUtils.tryExecute(env, "Comprehensive Validation Test");
}
/**
 * Sink that checks each value is a perfect square and, once all expected
 * elements have arrived, verifies the full result set before terminating
 * the job with {@link SuccessException}.
 */
class ValidatingResultSink implements SinkFunction<Integer> {
    // Static so state is shared across all parallel sink instances in the same
    // JVM: with parallelism > 1 each instance only sees a subset of the stream,
    // and a per-instance set would never reach expectedCount (the job would
    // hang). Same pattern as ParallelResultSink below.
    private static final Set<Integer> receivedValues = ConcurrentHashMap.newKeySet();
    private final int expectedCount;

    public ValidatingResultSink(int expectedCount) {
        this.expectedCount = expectedCount;
    }

    @Override
    public void invoke(Integer value) throws Exception {
        // Validate the value is a perfect square before recording it.
        int sqrt = (int) Math.sqrt(value);
        if (sqrt * sqrt != value) {
            throw new RuntimeException("Value " + value + " is not a perfect square");
        }
        // Guard the add + size check + final comparison so they happen atomically.
        synchronized (receivedValues) {
            receivedValues.add(value);
            if (receivedValues.size() >= expectedCount) {
                // Validate we received exactly the squares of 1..10.
                Set<Integer> expectedSquares = Set.of(1, 4, 9, 16, 25, 36, 49, 64, 81, 100);
                if (!receivedValues.equals(expectedSquares)) {
                    throw new RuntimeException("Received values don't match expected squares");
                }
                // SuccessException declares only a no-argument constructor.
                throw new SuccessException();
            }
        }
    }
}@Test
public void testExecutionWithTimeout() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Job that should complete within reasonable time.
// NOTE(review): SlowProcessingFunction is not defined in this file — presumably a
// helper that sleeps per element; confirm it exists on the test classpath.
env.fromSequence(1, 1000)
.map(new SlowProcessingFunction(10)) // 10ms per element
.addSink(new TimeoutValidatingSink(5000)); // 5 second timeout
long startTime = System.currentTimeMillis();
try {
TestUtils.tryExecute(env, "Timeout Test");
} catch (Exception e) {
// Even on failure, the whole run (including job submission) must stay under 10s.
long duration = System.currentTimeMillis() - startTime;
assertTrue("Test should complete within timeout", duration < 10000);
// Rethrow so the surrounding test still fails on a genuine error.
throw e;
}
}
/**
 * Sink that fails the job if elements arrive after a wall-clock timeout and
 * terminates successfully via {@link SuccessException} after 100 elements.
 */
class TimeoutValidatingSink implements SinkFunction<Integer> {
    private final long timeoutMs;
    // Captured when the sink is constructed on the client, so the measured
    // window also includes job submission and deployment time.
    private final long startTime;
    // Number of elements seen by this instance (assumes parallelism 1).
    private int elementCount;

    public TimeoutValidatingSink(long timeoutMs) {
        this.timeoutMs = timeoutMs;
        this.startTime = System.currentTimeMillis();
    }

    @Override
    public void invoke(Integer value) throws Exception {
        elementCount++;
        long elapsed = System.currentTimeMillis() - startTime;
        if (elapsed > timeoutMs) {
            throw new RuntimeException("Test exceeded timeout of " + timeoutMs + "ms");
        }
        // Terminate after processing a reasonable number of elements.
        // SuccessException declares only a no-argument constructor.
        if (elementCount >= 100) {
            throw new SuccessException();
        }
    }
}
@Test
public void testParallelExecutionValidation() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);
    // The map stage runs at parallelism 4 and tags each record with its subtask
    // index; the sink then verifies that all four subtasks produced data.
    env.fromSequence(1, 1000)
        .map(new ParallelValidatingFunction())
        // ParallelValidatingFunction emits Strings of the form "<subtask>:<value>",
        // so the key must be derived from the parsed payload — the original
        // `x -> x % 4` does not compile on a String stream.
        .keyBy(record -> Long.parseLong(record.split(":")[1]) % 4)
        .addSink(new ParallelResultSink());
    TestUtils.tryExecute(env, "Parallel Execution Test");
}
/**
 * Tags each value with the index of the producing subtask so a downstream
 * sink can verify that every parallel instance contributed.
 *
 * Must extend RichMapFunction rather than implement MapFunction: only rich
 * functions expose getRuntimeContext(), which the original declaration
 * (implements MapFunction) could not compile against.
 */
class ParallelValidatingFunction extends RichMapFunction<Long, String> {
    @Override
    public String map(Long value) throws Exception {
        // Include subtask information for validation downstream.
        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        return subtaskIndex + ":" + value;
    }
}
/**
 * Sink that records which upstream subtasks produced data and, after all
 * 1000 records arrive, requires contributions from at least four subtasks
 * before terminating the job via {@link SuccessException}.
 */
class ParallelResultSink implements SinkFunction<String> {
    // Static so counts and subtask IDs are shared across all parallel sink
    // instances within the same JVM (the MiniCluster test case).
    private static final AtomicInteger totalReceived = new AtomicInteger(0);
    private static final Set<Integer> activeSubtasks = ConcurrentHashMap.newKeySet();

    @Override
    public void invoke(String value) throws Exception {
        // Records look like "<subtaskIndex>:<value>"; recover the producer's index.
        int subtaskIndex = Integer.parseInt(value.split(":")[0]);
        activeSubtasks.add(subtaskIndex);
        int received = totalReceived.incrementAndGet();
        // Once every element has arrived, validate parallel execution.
        if (received >= 1000) {
            if (activeSubtasks.size() < 4) {
                throw new RuntimeException("Not all subtasks were active: " + activeSubtasks);
            }
            // SuccessException declares only a no-argument constructor.
            throw new SuccessException();
        }
    }
}@Test
public void testErrorRecovery() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Checkpoint every second so a restart can resume from a recent snapshot.
env.enableCheckpointing(1000);
// Up to 3 restarts, 500ms apart; RecoveringFunction fails once, so one restart suffices.
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 500));
// NOTE(review): RecoveryValidatingSink is not defined in this file — confirm
// it exists on the test classpath.
env.fromSequence(1, 100)
.map(new RecoveringFunction())
.addSink(new RecoveryValidatingSink());
// Should succeed after recovery attempts
TestUtils.tryExecute(env, "Error Recovery Test");
}
class RecoveringFunction implements MapFunction<Long, Long> {
// Static so the counter survives a Flink task restart within the same JVM:
// the first execution attempt fails at value 50, the restarted attempt passes.
// NOTE(review): not thread-safe and JVM-local; relies on the job running at
// parallelism 1 on a MiniCluster — verify against the test environment.
private static int attemptCount = 0;
@Override
public Long map(Long value) throws Exception {
// Fail on first attempt for certain values
if (attemptCount == 0 && value == 50) {
attemptCount++;
throw new RuntimeException("Simulated failure at value " + value);
}
return value;
}
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-tests-2-10