CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-tests-2-10

Comprehensive integration test suite for Apache Flink stream processing framework providing test utilities, base classes, and infrastructure for validating fault tolerance, checkpointing, and streaming operations.

Overview
Eval results
Files

execution.mddocs/

Execution Utilities

Core utilities for executing test jobs with proper exception handling and result validation. These utilities handle the complexities of test execution in Flink environments and provide consistent patterns for test success validation.

Core Execution Utilities

TestUtils

Primary utility class for executing streaming jobs with proper exception handling and success validation.

public class TestUtils {
    public static JobExecutionResult tryExecute(StreamExecutionEnvironment env, String jobName) throws Exception;
}

The tryExecute method:

  • Executes the streaming job with the given name
  • Catches ProgramInvocationException and JobExecutionException
  • Searches for nested SuccessException to determine test success
  • Fails the test if no SuccessException is found in the exception chain
  • Returns JobExecutionResult on successful completion or null if terminated by SuccessException

SuccessException

Custom exception used to indicate successful test completion and controlled program termination.

public class SuccessException extends Exception {
    public SuccessException();
}

This exception is typically thrown by:

  • Custom sink functions when expected results are achieved
  • Map/filter functions when target conditions are met
  • Source functions when sufficient data has been processed

Test Execution Patterns

Controlled Termination Pattern

Use SuccessException to terminate streaming jobs when test conditions are met:

public class ValidatingFunction implements MapFunction<Integer, Integer> {
    private int processedCount = 0;
    private final int targetCount;
    
    public ValidatingFunction(int targetCount) {
        this.targetCount = targetCount;
    }
    
    @Override
    public Integer map(Integer value) throws Exception {
        processedCount++;
        
        // Perform validation logic
        if (value < 0) {
            throw new RuntimeException("Invalid negative value");
        }
        
        // Terminate successfully when target reached
        if (processedCount >= targetCount) {
            throw new SuccessException("Processed " + targetCount + " elements successfully");
        }
        
        return value * 2;
    }
}

Result Collection with Success Validation

public class CollectingValidatingSink<T> implements SinkFunction<T> {
    private final List<T> results = new ArrayList<>();
    private final int expectedCount;
    
    public CollectingValidatingSink(int expectedCount) {
        this.expectedCount = expectedCount;
    }
    
    @Override
    public void invoke(T value) throws Exception {
        synchronized (results) {
            results.add(value);
            
            // Validate intermediate results
            validateElement(value);
            
            // Terminate when collection is complete
            if (results.size() >= expectedCount) {
                validateFinalResults();
                throw new SuccessException("Successfully collected " + expectedCount + " elements");
            }
        }
    }
    
    private void validateElement(T value) throws Exception {
        // Element-level validation logic
    }
    
    private void validateFinalResults() throws Exception {
        // Final result validation logic
    }
}

Usage Examples

Basic Streaming Job Execution

@Test
public void testBasicStreamingExecution() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    
    env.fromElements(1, 2, 3, 4, 5)
       .map(x -> x * 2)
       .addSink(new PrintSinkFunction<>());
    
    // Execute with proper exception handling
    JobExecutionResult result = TestUtils.tryExecute(env, "Basic Streaming Test");
    
    assertNotNull("Job should complete successfully", result);
}

Success Exception Pattern

@Test  
public void testSuccessExceptionPattern() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    
    env.fromSequence(1, Long.MAX_VALUE) // Infinite source
       .map(new ValidatingFunction(100)) // Terminates after 100 elements
       .addSink(new DiscardingSink<>());
    
    // Job will terminate via SuccessException
    JobExecutionResult result = TestUtils.tryExecute(env, "Success Exception Test");
    
    // Result will be null when terminated by SuccessException
    assertNull("Job should terminate via SuccessException", result);
}

Comprehensive Result Validation

@Test
public void testComprehensiveValidation() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(2);
    
    List<Integer> inputData = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    
    env.fromCollection(inputData)
       .keyBy(x -> x % 2) // Partition by even/odd
       .map(x -> x * x)   // Square each number
       .addSink(new ValidatingResultSink(inputData.size()));
    
    // Execute with validation
    TestUtils.tryExecute(env, "Comprehensive Validation Test");
}

class ValidatingResultSink implements SinkFunction<Integer> {
    private final Set<Integer> receivedValues = new HashSet<>();
    private final int expectedCount;
    
    public ValidatingResultSink(int expectedCount) {
        this.expectedCount = expectedCount;
    }
    
    @Override
    public void invoke(Integer value) throws Exception {
        synchronized (receivedValues) {
            // Validate value is a perfect square
            int sqrt = (int) Math.sqrt(value);
            if (sqrt * sqrt != value) {
                throw new RuntimeException("Value " + value + " is not a perfect square");
            }
            
            receivedValues.add(value);
            
            if (receivedValues.size() >= expectedCount) {
                // Validate we received all expected squares
                Set<Integer> expectedSquares = Set.of(1, 4, 9, 16, 25, 36, 49, 64, 81, 100);
                if (!receivedValues.equals(expectedSquares)) {
                    throw new RuntimeException("Received values don't match expected squares");
                }
                
                throw new SuccessException("All " + expectedCount + " squares validated successfully");
            }
        }
    }
}

Timeout and Error Handling

@Test
public void testExecutionWithTimeout() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    
    // Job that should complete within reasonable time
    env.fromSequence(1, 1000)
       .map(new SlowProcessingFunction(10)) // 10ms per element
       .addSink(new TimeoutValidatingSink(5000)); // 5 second timeout
    
    long startTime = System.currentTimeMillis();
    
    try {
        TestUtils.tryExecute(env, "Timeout Test");
    } catch (Exception e) {
        long duration = System.currentTimeMillis() - startTime;
        assertTrue("Test should complete within timeout", duration < 10000);
        throw e;
    }
}

class TimeoutValidatingSink implements SinkFunction<Integer> {
    private final long timeoutMs;
    private final long startTime;
    private int elementCount;
    
    public TimeoutValidatingSink(long timeoutMs) {
        this.timeoutMs = timeoutMs;
        this.startTime = System.currentTimeMillis();
    }
    
    @Override
    public void invoke(Integer value) throws Exception {
        elementCount++;
        
        long elapsed = System.currentTimeMillis() - startTime;
        if (elapsed > timeoutMs) {
            throw new RuntimeException("Test exceeded timeout of " + timeoutMs + "ms");
        }
        
        // Terminate after processing reasonable amount
        if (elementCount >= 100) {
            throw new SuccessException("Processed " + elementCount + " elements within timeout");
        }
    }
}

Parallel Execution Validation

@Test
public void testParallelExecutionValidation() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);
    
    env.fromSequence(1, 1000)
       .map(new ParallelValidatingFunction())
       .keyBy(x -> x % 4) // Distribute across 4 partitions
       .addSink(new ParallelResultSink());
    
    TestUtils.tryExecute(env, "Parallel Execution Test");
}

class ParallelValidatingFunction implements MapFunction<Long, String> {
    @Override
    public String map(Long value) throws Exception {
        // Include subtask information for validation
        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        return subtaskIndex + ":" + value;
    }
}

class ParallelResultSink implements SinkFunction<String> {
    private static final AtomicInteger totalReceived = new AtomicInteger(0);
    private static final Set<Integer> activeSubtasks = ConcurrentHashMap.newKeySet();
    
    @Override
    public void invoke(String value) throws Exception {
        // Parse subtask index
        int subtaskIndex = Integer.parseInt(value.split(":")[0]);
        activeSubtasks.add(subtaskIndex);
        
        int received = totalReceived.incrementAndGet();
        
        // Validate parallel execution
        if (received >= 1000) {
            if (activeSubtasks.size() < 4) {
                throw new RuntimeException("Not all subtasks were active: " + activeSubtasks);
            }
            
            throw new SuccessException("Successfully validated parallel execution across " + 
                activeSubtasks.size() + " subtasks");
        }
    }
}

Error Recovery Testing

@Test
public void testErrorRecovery() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(1000);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 500));
    
    env.fromSequence(1, 100)
       .map(new RecoveringFunction())
       .addSink(new RecoveryValidatingSink());
    
    // Should succeed after recovery attempts
    TestUtils.tryExecute(env, "Error Recovery Test");
}

class RecoveringFunction implements MapFunction<Long, Long> {
    private static int attemptCount = 0;
    
    @Override
    public Long map(Long value) throws Exception {
        // Fail on first attempt for certain values
        if (attemptCount == 0 && value == 50) {
            attemptCount++;
            throw new RuntimeException("Simulated failure at value " + value);
        }
        
        return value;
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-tests-2-10

docs

api-completeness.md

checkpointing.md

data-generation.md

execution.md

fault-tolerance.md

index.md

streaming.md

tile.json