Thread-safe utilities for collecting and validating results in streaming Flink applications. These utilities cover result collection, test coordination, fault injection, result validation, and streaming-specific helpers, enabling comprehensive testing of streaming topologies, including failure scenarios.
Thread-safe sink for collecting streaming results into lists for verification.
/**
* Thread-safe sink that collects streaming results into a list for test verification
*/
public class TestListResultSink<T> extends RichSinkFunction<T> {
/**
* Default constructor creating empty result sink
*/
public TestListResultSink();
/**
* Get the collected results as an unordered list
* @return List containing all collected elements
*/
public List<T> getResult();
/**
* Get the collected results sorted using natural ordering
* @return Sorted list containing all collected elements
*/
public List<T> getSortedResult();
/**
* Sink function implementation - adds element to internal collection
* @param value Element to collect
* @throws Exception if collection fails
*/
public void invoke(T value) throws Exception;
}
Usage Example:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
// Setup streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); // Use parallelism 1 for deterministic results
// Create result collector
TestListResultSink<String> resultSink = new TestListResultSink<>();
// Build streaming topology
env.fromElements("hello", "world", "flink", "streaming")
.map(String::toUpperCase)
.addSink(resultSink);
// Execute and collect results
env.execute("Test Job");
// Verify results
List<String> results = resultSink.getResult();
assertEquals(4, results.size());
assertTrue(results.contains("HELLO"));
assertTrue(results.contains("WORLD"));
// Get sorted results for ordered verification
List<String> sortedResults = resultSink.getSortedResult();
assertEquals("FLINK", sortedResults.get(0));
assertEquals("HELLO", sortedResults.get(1));
Singleton catalog for managing multiple test result lists across different test scenarios.
/**
* Singleton catalog for managing multiple test result lists
*/
public class TestListWrapper {
/**
* Get the singleton instance of TestListWrapper
* @return The singleton TestListWrapper instance
*/
public static TestListWrapper getInstance();
/**
* Create a new result list and return its ID
* @return Integer ID for the newly created list
*/
public int createList();
/**
* Retrieve a result list by its ID
* @param listId ID of the list to retrieve
* @return List of collected objects, or null if ID doesn't exist
*/
public List<Object> getList(int listId);
}
Usage Example:
import org.apache.flink.test.streaming.runtime.util.TestListWrapper;
// Get wrapper instance
TestListWrapper wrapper = TestListWrapper.getInstance();
// Create lists for different test scenarios
int sourceResultsId = wrapper.createList();
int processedResultsId = wrapper.createList();
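// The topology below needs a sink that appends every element to the TestListWrapper
// list identified by its ID. No such sink ships with these utilities; the class below
// is a minimal hypothetical sketch (it extends
// org.apache.flink.streaming.api.functions.sink.RichSinkFunction):
class ListCollectorSink extends RichSinkFunction<Object> {
    private final int listId;
    ListCollectorSink(int listId) { this.listId = listId; }
    @Override
    public void invoke(Object value, Context context) {
        // Append each incoming element to the shared list managed by TestListWrapper
        TestListWrapper.getInstance().getList(listId).add(value);
    }
}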
// Use in streaming topology (with the hypothetical ListCollectorSink sketched above)
dataStream1.addSink(new ListCollectorSink(sourceResultsId));
dataStream2.addSink(new ListCollectorSink(processedResultsId));
// After execution, retrieve results
List<Object> sourceResults = wrapper.getList(sourceResultsId);
List<Object> processedResults = wrapper.getList(processedResultsId);
// Verify both result sets
assertEquals(expectedSourceCount, sourceResults.size());
assertEquals(expectedProcessedCount, processedResults.size());
Utility functions and selectors for streaming data routing and transformation.
/**
* Output selector that routes integers based on even/odd criteria
*/
public class EvenOddOutputSelector implements OutputSelector<Integer> {
/**
* Select output names based on whether the integer is even or odd
* @param value Integer value to evaluate
* @return Iterable of output names ("even" or "odd")
*/
public Iterable<String> select(Integer value);
}
/**
* No-operation mapper that returns input unchanged
*/
public class NoOpIntMap implements MapFunction<Integer, Integer> {
/**
* Identity mapping function for integers
* @param value Input integer
* @return Same integer unchanged
* @throws Exception if mapping fails
*/
public Integer map(Integer value) throws Exception;
}
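NoOpIntMap is typically used to add a pass-through operator to a test topology, for example to introduce an extra stage without altering the data. A minimal sketch of such a topology (illustrative only; it assumes the same imports as the earlier examples):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TestListResultSink<Integer> resultSink = new TestListResultSink<>();
// NoOpIntMap forwards each value unchanged, adding an operator between source and sink
env.fromElements(1, 2, 3)
    .map(new NoOpIntMap())
    .addSink(resultSink);
env.execute("NoOpIntMap Pass-Through Test");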
/**
* No-operation sink that discards received elements while tracking how many arrived
*/
public class ReceiveCheckNoOpSink<T> extends RichSinkFunction<T> {
/**
* Constructor with element count tracking
* @param expectedElementCount Expected number of elements to receive
*/
public ReceiveCheckNoOpSink(int expectedElementCount);
/**
* Receive element but perform no operation
* @param value Element to receive
* @param context Sink context
* @throws Exception if receive fails
*/
public void invoke(T value, Context context) throws Exception;
/**
* Check if expected number of elements were received
* @return true if expected count was reached
*/
public boolean isExpectedCountReached();
}
Sources and sinks for testing failure scenarios and fault tolerance in streaming applications.
/**
* Source that introduces artificial failures for testing fault tolerance
*/
public class FailingSource<T> extends RichSourceFunction<T> {
/**
* Constructor for failing source with custom event generator
* @param generator Custom generator for emitting events before failure
* @param failAfterElements Number of elements to emit before inducing failure
*/
public FailingSource(EventEmittingGenerator<T> generator, int failAfterElements);
/**
* Interface for custom event generation in failing source
*/
public static interface EventEmittingGenerator<T> extends Serializable {
/**
* Emit a single event to the source context
* @param ctx Source context for emitting events
* @param eventSequenceNo Sequence number of current event
*/
public void emitEvent(SourceContext<T> ctx, int eventSequenceNo);
}
}
/**
* Sink for validating streaming results with custom validation logic
*/
public class ValidatingSink<T> extends RichSinkFunction<T> {
/**
* Constructor with result checker and count updater
* @param resultChecker Custom checker for validating individual results
* @param countUpdater Updater for tracking element counts
*/
public ValidatingSink(ResultChecker<T> resultChecker, CountUpdater countUpdater);
/**
* Interface for custom result validation logic
*/
public static interface ResultChecker<T> extends Serializable {
/**
* Check if a result meets validation criteria
* @param result Result element to validate
* @return true if result passes validation
*/
public boolean checkResult(T result);
}
/**
* Interface for updating element counts during validation
*/
public static interface CountUpdater extends Serializable {
/**
* Update the element count
* @param count Current element count
*/
public void updateCount(long count);
}
}
Fault Injection Usage Examples:
import org.apache.flink.test.checkpointing.utils.FailingSource;
import org.apache.flink.test.checkpointing.utils.ValidatingSink;
// Create a failing source that emits integers and fails after 1000 elements
FailingSource<Integer> failingSource = new FailingSource<>(
new FailingSource.EventEmittingGenerator<Integer>() {
@Override
public void emitEvent(SourceContext<Integer> ctx, int eventSequenceNo) {
ctx.collect(eventSequenceNo);
}
},
1000 // Fail after 1000 elements
);
// Create a validating sink that checks for positive integers
ValidatingSink<Integer> validatingSink = new ValidatingSink<>(
new ValidatingSink.ResultChecker<Integer>() {
@Override
public boolean checkResult(Integer result) {
return result > 0; // Only accept positive integers
}
},
new ValidatingSink.CountUpdater() {
@Override
public void updateCount(long count) {
// Track count of validated elements
System.out.println("Validated elements: " + count);
}
}
);
// Build fault-tolerant streaming topology
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000); // Enable checkpointing for fault tolerance
env.addSource(failingSource)
.map(x -> Math.abs(x)) // Ensure positive values
.addSink(validatingSink);
// Job will fail after 1000 elements, then restart and validate recovery
env.execute("Fault Tolerance Test");
Utilities for executing streaming tests with specialized exception handling.
/**
* Utility class for streaming test execution
*/
public class TestUtils {
/**
* Execute streaming job with SuccessException handling
* @param see StreamExecutionEnvironment configured for execution
* @param name Job name for identification
* @throws Exception if execution fails for reasons other than SuccessException
*/
public static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception;
}
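Conceptually, tryExecute runs the job and treats a SuccessException anywhere in the failure's cause chain as successful completion, while any other failure is rethrown. A rough sketch of that behavior (an assumption about typical behavior, not the actual implementation):
public static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception {
    try {
        see.execute(name);
    } catch (Exception root) {
        // Walk the cause chain; a SuccessException signals intentional, successful termination
        for (Throwable t = root; t != null; t = t.getCause()) {
            if (t instanceof SuccessException) {
                return;
            }
        }
        throw root;
    }
}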
/**
* Exception thrown to indicate successful test completion
*/
public class SuccessException extends RuntimeException {
/**
* Default constructor for success indication
*/
public SuccessException();
/**
* Constructor with success message
* @param message Success message
*/
public SuccessException(String message);
}
Common patterns for implementing streaming tests:
Basic Result Collection Pattern:
@Test
public void testStreamingTransformation() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Create result collector
TestListResultSink<Integer> resultSink = new TestListResultSink<>();
// Build topology
env.fromElements(1, 2, 3, 4, 5)
.map(x -> x * 2)
.filter(x -> x > 5)
.addSink(resultSink);
// Execute and verify
env.execute("Transformation Test");
List<Integer> results = resultSink.getSortedResult();
assertEquals(Arrays.asList(6, 8, 10), results);
}
Multi-Stream Result Collection:
@Test
public void testMultiStreamProcessing() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TestListWrapper wrapper = TestListWrapper.getInstance();
int evenResultsId = wrapper.createList();
int oddResultsId = wrapper.createList();
DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6);
// Split stream using output selector
SplitStream<Integer> splitStream = numbers.split(new EvenOddOutputSelector());
splitStream.select("even").addSink(new ListCollectorSink(evenResultsId));
splitStream.select("odd").addSink(new ListCollectorSink(oddResultsId));
env.execute("Multi-Stream Test");
// Verify results
List<Object> evenResults = wrapper.getList(evenResultsId);
List<Object> oddResults = wrapper.getList(oddResultsId);
assertEquals(3, evenResults.size()); // 2, 4, 6
assertEquals(3, oddResults.size()); // 1, 3, 5
}
Success Exception Pattern:
@Test
public void testSuccessfulCompletion() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements("test")
.map(value -> {
    // Simulate successful completion once the expected element arrives
    if ("test".equals(value)) {
        throw new SuccessException("Test completed successfully");
    }
    return value;
})
.addSink(new DiscardingSink<>());
// Execute with success exception handling
TestUtils.tryExecute(env, "Success Test");
// Test passes if SuccessException was caught and handled
}
Parallel Result Collection:
@Test
public void testParallelResultCollection() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
TestListResultSink<String> resultSink = new TestListResultSink<>();
env.fromCollection(generateLargeDataset())
.map(data -> processData(data))
.addSink(resultSink);
env.execute("Parallel Test");
// Results from parallel execution - order not guaranteed
List<String> results = resultSink.getResult();
assertEquals(expectedTotalCount, results.size());
// Use sorted results for deterministic verification
List<String> sortedResults = resultSink.getSortedResult();
assertEquals(expectedFirstElement, sortedResults.get(0));
assertEquals(expectedLastElement, sortedResults.get(sortedResults.size() - 1));
}
Custom Sink with Count Verification:
@Test
public void testElementCountVerification() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
int expectedCount = 1000;
ReceiveCheckNoOpSink<Integer> countingSink = new ReceiveCheckNoOpSink<>(expectedCount);
env.fromCollection(generateIntegerSequence(expectedCount))
.addSink(countingSink);
env.execute("Count Verification Test");
assertTrue("Expected count not reached", countingSink.isExpectedCountReached());
}
These streaming utilities provide the foundation for reliable, deterministic testing of streaming Flink applications, enabling comprehensive verification of streaming transformations, windowing operations, and stateful processing.