Comprehensive integration test suite for Apache Flink stream processing framework providing test utilities, base classes, and infrastructure for validating fault tolerance, checkpointing, and streaming operations.
Specialized components for testing streaming operations including output selectors, result collection, stream partitioning utilities, and no-op functions for performance testing.
Output selector for splitting integer streams based on even/odd values, commonly used for stream partitioning tests.
/**
 * Routes each Integer of a split stream to a named side output,
 * "even" or "odd", for stream-partitioning tests (see rule below).
 */
public class EvenOddOutputSelector implements OutputSelector<Integer> {
@Override
// Returns the output name(s) the given value is routed to.
public Iterable<String> select(Integer value);
}The selector returns:
- "even" for even numbers (value % 2 == 0)
- "odd" for odd numbers (value % 2 != 0)

Thread-safe sink function that collects streaming results into a list for test verification.
/**
 * Sink that collects every element it receives into a shared result list
 * (registered with the TestListWrapper singleton) so a test can inspect
 * the output after job execution, even across operator serialization.
 */
public class TestListResultSink<T> extends RichSinkFunction<T> {
// Id of the backing list registered with TestListWrapper.
private int resultListId;
public TestListResultSink();
@Override
public void open(Configuration parameters) throws Exception;
@Override
// Appends the value to the shared result list.
public void invoke(T value) throws Exception;
@Override
public void close() throws Exception;
// Collected elements in arrival order.
public List<T> getResult();
// Collected elements in sorted order (elements are expected to be Comparable,
// per TestListWrapper's list element bound).
public List<T> getSortedResult();
}Singleton utility providing thread-safe access to multiple test result collections. Used internally by TestListResultSink for managing concurrent result collection.
/**
 * Singleton registry of result lists, keyed by an int id returned from
 * createList(). Not instantiable directly (private constructor) — obtain
 * the shared instance via getInstance().
 */
public class TestListWrapper {
// All registered result lists; elements are Comparable so they can be sorted.
private List<List<? extends Comparable>> lists;
private TestListWrapper();
public static TestListWrapper getInstance();
// Registers a new empty list and returns its id.
public int createList();
// Looks up a previously created list by id.
public List<?> getList(int listId);
}Sink that validates element reception and asserts at least one element was received during close(), useful for ensuring data flow.
/**
 * Sink that discards output but records received elements, asserting in
 * close() that at least one element arrived — a cheap data-flow check.
 * NOTE(review): no count accessor is part of this API; per-instance state
 * is local to each parallel subtask after serialization.
 */
public final class ReceiveCheckNoOpSink<T> extends RichSinkFunction<T> {
// Elements seen by this sink instance.
private List<T> received;
public ReceiveCheckNoOpSink();
@Override
public void open(Configuration conf);
@Override
public void invoke(T tuple);
@Override
// Asserts at least one element was received.
public void close();
}Pass-through map function for integers, used for pipeline testing without transformation overhead.
/**
 * Identity map for Integers — adds an operator to the pipeline without
 * transforming data, useful for measuring framework overhead.
 */
public class NoOpIntMap implements MapFunction<Integer, Integer> {
@Override
// Returns the input value unchanged.
public Integer map(Integer value) throws Exception;
}@Test
public void testStreamPartitioning() throws Exception {
    // Single-parallelism environment keeps routing deterministic.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

    // Route each element to the "even" or "odd" side output.
    SplitStream<Integer> partitioned = numbers.split(new EvenOddOutputSelector());

    // Attach one collecting sink per partition.
    TestListResultSink<Integer> evenCollector = new TestListResultSink<>();
    TestListResultSink<Integer> oddCollector = new TestListResultSink<>();
    partitioned.select("even").addSink(evenCollector);
    partitioned.select("odd").addSink(oddCollector);

    TestUtils.tryExecute(env, "Stream Partitioning Test");

    List<Integer> evens = evenCollector.getResult();
    List<Integer> odds = oddCollector.getResult();

    // 1..10 contains exactly five even and five odd numbers.
    assertEquals("Should have 5 even numbers", 5, evens.size());
    assertEquals("Should have 5 odd numbers", 5, odds.size());

    // Every collected element must match its partition's parity.
    assertTrue("All results should be even",
            evens.stream().allMatch(x -> x % 2 == 0));
    assertTrue("All results should be odd",
            odds.stream().allMatch(x -> x % 2 == 1));
}

@Test
/**
 * Verifies that TestListResultSink collects transformed elements: each input
 * is upper-cased and passed through a length filter, then the collected
 * output is checked for size and content.
 */
public void testResultCollection() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    // Create test data
    List<String> testData = Arrays.asList("hello", "world", "flink", "streaming");
    DataStream<String> source = env.fromCollection(testData);
    // Upper-case everything, keep strings longer than 4 characters.
    DataStream<String> transformed = source
            .map(String::toUpperCase)
            .filter(s -> s.length() > 4);
    // Collect results
    TestListResultSink<String> resultSink = new TestListResultSink<>();
    transformed.addSink(resultSink);
    // Execute
    TestUtils.tryExecute(env, "Result Collection Test");
    // Validate results
    List<String> results = resultSink.getResult();
    // BUG FIX: all four inputs pass the length > 4 filter ("hello"=5,
    // "world"=5, "flink"=5, "streaming"=9), so the expected size is 4, not 2 —
    // consistent with the four contains() assertions below.
    assertEquals("Should have filtered results", 4, results.size());
    assertTrue("Should contain HELLO", results.contains("HELLO"));
    assertTrue("Should contain WORLD", results.contains("WORLD"));
    assertTrue("Should contain FLINK", results.contains("FLINK"));
    assertTrue("Should contain STREAMING", results.contains("STREAMING"));
}

@Test
/**
 * Pushes 1,000,000 elements through a chain of no-op operators and checks
 * that every element arrives and throughput clears a coarse floor.
 */
public void testStreamingPerformance() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);
    // BUG FIX: fromSequence emits Long, not Integer — convert for the
    // Integer-based no-op operators below.
    DataStream<Integer> source = env.fromSequence(1, 1000000).map(Long::intValue);
    // Apply no-op transformations to test framework overhead
    DataStream<Integer> processed = source
            .map(new NoOpIntMap())   // No-op transformation
            .keyBy(x -> x % 100)     // Partition by key
            .map(new NoOpIntMap())   // Another no-op
            .filter(x -> true);      // Pass-through filter
    // BUG FIX: ReceiveCheckNoOpSink exposes no count accessor, and the local
    // copy of a sink sees nothing once Flink serializes it to the parallel
    // subtasks. TestListResultSink collects through the shared TestListWrapper
    // singleton, so the count is observable after execution.
    TestListResultSink<Integer> sink = new TestListResultSink<>();
    processed.addSink(sink);
    // Measure execution time
    long startTime = System.currentTimeMillis();
    TestUtils.tryExecute(env, "Performance Test");
    // Clamp to 1 ms to avoid division by zero on very fast runs.
    long duration = Math.max(1, System.currentTimeMillis() - startTime);
    // Verify throughput
    long processedCount = sink.getResult().size();
    assertEquals("Should process all elements", 1000000, processedCount);
    double throughput = processedCount / (duration / 1000.0);
    System.out.println("Throughput: " + throughput + " elements/second");
    // Assert minimum throughput requirement
    assertTrue("Throughput should be reasonable", throughput > 10000);
}

@Test
public void testMultiSinkValidation() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(2);

    DataStream<Integer> source = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

    // Partition the stream into even/odd side outputs.
    SplitStream<Integer> partitioned = source.split(new EvenOddOutputSelector());

    // One sink for the full stream, one per partition.
    TestListResultSink<Integer> allSink = new TestListResultSink<>();
    TestListResultSink<Integer> evenSink = new TestListResultSink<>();
    TestListResultSink<Integer> oddSink = new TestListResultSink<>();
    source.addSink(allSink);
    partitioned.select("even").addSink(evenSink);
    partitioned.select("odd").addSink(oddSink);

    TestUtils.tryExecute(env, "Multi-Sink Test");

    List<Integer> everything = allSink.getResult();
    List<Integer> evens = evenSink.getResult();
    List<Integer> odds = oddSink.getResult();

    // The unsplit sink must see all ten elements, and the two partitions
    // together must account for every one of them.
    assertEquals("All sink should have all elements", 10, everything.size());
    assertEquals("Even + odd should equal total",
            evens.size() + odds.size(), everything.size());

    // Set-compare the union of both partitions against the full stream.
    Set<Integer> union = new HashSet<>(evens);
    union.addAll(odds);
    assertEquals("Combined results should match original",
            new HashSet<>(everything), union);
}

@Test
/**
 * Exercises concurrent result collection through the TestListWrapper
 * singleton: four parallel sink instances write doubled values into one
 * shared list, which is then validated for completeness.
 */
public void testThreadSafeCollection() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4); // Multiple parallel sink instances write concurrently
    // BUG FIX: TestListWrapper is a non-generic singleton with a private
    // constructor — it cannot be created with `new TestListWrapper<>()` and
    // has no add()/no-arg getList(). Register a shared list through the
    // singleton and hand its id to the sink instead.
    final int listId = TestListWrapper.getInstance().createList();
    // BUG FIX: fromSequence emits Long — convert to Integer for the sink.
    DataStream<Integer> source = env.fromSequence(1, 1000).map(Long::intValue);
    // The sink re-resolves the shared list by id inside invoke(), so it keeps
    // working after Flink serializes the function to the parallel subtasks
    // (a captured local list reference would not survive serialization).
    source.addSink(new SinkFunction<Integer>() {
        @Override
        public void invoke(Integer value) {
            @SuppressWarnings("unchecked")
            List<Integer> target = (List<Integer>) TestListWrapper.getInstance().getList(listId);
            synchronized (target) {
                target.add(value * 2); // Transform and collect
            }
        }
    });
    TestUtils.tryExecute(env, "Thread-Safe Collection Test");
    // Verify results
    @SuppressWarnings("unchecked")
    List<Integer> results = (List<Integer>) TestListWrapper.getInstance().getList(listId);
    assertEquals("Should have all elements", 1000, results.size());
    // Verify transformation applied
    assertTrue("All values should be even",
            results.stream().allMatch(x -> x % 2 == 0));
    // Verify range
    assertTrue("Should contain expected values",
            results.containsAll(Arrays.asList(2, 4, 6, 8, 10)));
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-tests-2-10