Specialized components for testing streaming operations including output selectors, result collection, stream partitioning utilities, and no-op functions for performance testing.
Output selector for splitting integer streams based on even/odd values, commonly used for stream partitioning tests.
// OutputSelector that tags each Integer with "even" or "odd" based on
// value % 2, enabling SplitStream.select("even"/"odd") partitioning in tests.
public class EvenOddOutputSelector implements OutputSelector<Integer> {
@Override
// Returns the named output(s) this element is routed to ("even" or "odd").
public Iterable<String> select(Integer value);
}The selector returns:
"even" for even numbers (value % 2 == 0)"odd" for odd numbers (value % 2 != 0)Thread-safe sink function that collects streaming results into a list for test verification.
// Thread-safe sink that records every element it receives into a shared
// result list so tests can fetch the collected output after job execution.
public class TestListResultSink<T> extends RichSinkFunction<T> {
// Id of the backing result list; the int (rather than the list itself)
// travels with the serialized sink to each parallel instance.
// NOTE(review): presumably resolved through TestListWrapper — confirm.
private int resultListId;
public TestListResultSink();
@Override
public void open(Configuration parameters) throws Exception;
@Override
public void invoke(T value) throws Exception;
@Override
public void close() throws Exception;
// All collected elements, in arrival order.
public List<T> getResult();
// Collected elements in natural sort order — useful when parallel
// execution makes arrival order nondeterministic.
public List<T> getSortedResult();
}Singleton utility providing thread-safe access to multiple test result collections. Used internally by TestListResultSink for managing concurrent result collection.
// Process-wide singleton holding multiple independent result lists,
// each addressed by the integer id returned from createList().
public class TestListWrapper {
private List<List<? extends Comparable>> lists;
// Private: callers must obtain the shared instance via getInstance().
private TestListWrapper();
public static TestListWrapper getInstance();
// Allocates a fresh result list and returns its id.
public int createList();
// Looks up the list registered under listId.
public List<?> getList(int listId);
}Sink that validates element reception and asserts at least one element was received during close(), useful for ensuring data flow.
// Sink that discards its output but tracks received elements so close()
// can assert that at least one element actually arrived (data-flow check).
// NOTE(review): exposes no count accessor — only the close()-time assertion.
public final class ReceiveCheckNoOpSink<T> extends RichSinkFunction<T> {
private List<T> received;
public ReceiveCheckNoOpSink();
@Override
public void open(Configuration conf);
@Override
public void invoke(T tuple);
@Override
public void close();
}Pass-through map function for integers, used for pipeline testing without transformation overhead.
// Identity MapFunction for Integers: map() returns its input unchanged,
// letting tests insert pipeline stages without transforming the data.
public class NoOpIntMap implements MapFunction<Integer, Integer> {
@Override
public Integer map(Integer value) throws Exception;
}@Test
public void testStreamPartitioning() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // Source: the integers 1..10 (five even, five odd).
    DataStream<Integer> input = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

    // Partition the stream by parity through the named split outputs.
    SplitStream<Integer> partitioned = input.split(new EvenOddOutputSelector());

    // One collecting sink per partition for post-execution verification.
    TestListResultSink<Integer> evenCollector = new TestListResultSink<>();
    TestListResultSink<Integer> oddCollector = new TestListResultSink<>();
    partitioned.select("even").addSink(evenCollector);
    partitioned.select("odd").addSink(oddCollector);

    TestUtils.tryExecute(env, "Stream Partitioning Test");

    List<Integer> evens = evenCollector.getResult();
    List<Integer> odds = oddCollector.getResult();

    // Each partition must hold exactly half of the ten inputs.
    assertEquals("Should have 5 even numbers", 5, evens.size());
    assertEquals("Should have 5 odd numbers", 5, odds.size());

    // And every element must actually match its partition's parity.
    assertTrue("All results should be even",
            evens.stream().allMatch(v -> v % 2 == 0));
    assertTrue("All results should be odd",
            odds.stream().allMatch(v -> v % 2 == 1));
}

@Test
public void testResultCollection() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // Test data: lengths are 5, 5, 5 and 9 respectively.
    List<String> testData = Arrays.asList("hello", "world", "flink", "streaming");
    DataStream<String> source = env.fromCollection(testData);

    // Transform: uppercase, then keep only strings longer than 4 characters.
    DataStream<String> transformed = source
            .map(String::toUpperCase)
            .filter(s -> s.length() > 4);

    TestListResultSink<String> resultSink = new TestListResultSink<>();
    transformed.addSink(resultSink);

    TestUtils.tryExecute(env, "Result Collection Test");

    List<String> results = resultSink.getResult();

    // BUG FIX: every input has length > 4, so all four elements survive the
    // filter. The original expectation of 2 contradicted the four contains()
    // assertions below, which require all four transformed strings.
    assertEquals("Should have filtered results", 4, results.size());
    assertTrue("Should contain HELLO", results.contains("HELLO"));
    assertTrue("Should contain WORLD", results.contains("WORLD"));
    assertTrue("Should contain FLINK", results.contains("FLINK"));
    assertTrue("Should contain STREAMING", results.contains("STREAMING"));
}

@Test
public void testStreamingPerformance() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);

    // BUG FIX: fromSequence produces a DataStream<Long>; convert to Integer
    // so the Integer-typed NoOpIntMap stages below apply.
    DataStream<Integer> source = env.fromSequence(1, 1000000).map(Long::intValue);

    // Apply no-op transformations to measure pure pipeline overhead.
    DataStream<Integer> processed = source
            .map(new NoOpIntMap())   // no-op transformation
            .keyBy(x -> x % 100)     // partition by key
            .map(new NoOpIntMap())   // another no-op
            .filter(x -> true);      // pass-through filter

    // BUG FIX: ReceiveCheckNoOpSink declares no getReceivedCount() accessor.
    // Use TestListResultSink, whose getResult() lets us count processed
    // elements (at the cost of collecting them — acceptable in a test).
    TestListResultSink<Integer> sink = new TestListResultSink<>();
    processed.addSink(sink);

    long startTime = System.currentTimeMillis();
    TestUtils.tryExecute(env, "Performance Test");
    // Guard against a 0 ms duration, which would make throughput infinite.
    long duration = Math.max(1, System.currentTimeMillis() - startTime);

    long processedCount = sink.getResult().size();
    assertEquals("Should process all elements", 1000000, processedCount);

    double throughput = processedCount / (duration / 1000.0);
    System.out.println("Throughput: " + throughput + " elements/second");

    // Loose lower bound: fails only on pathological slowdowns, not on noise.
    assertTrue("Throughput should be reasonable", throughput > 10000);
}

@Test
public void testMultiSinkValidation() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(2);

    DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

    // Split by parity; the unsplit stream is also observed directly.
    SplitStream<Integer> parted = numbers.split(new EvenOddOutputSelector());

    TestListResultSink<Integer> everySink = new TestListResultSink<>();
    TestListResultSink<Integer> evensSink = new TestListResultSink<>();
    TestListResultSink<Integer> oddsSink = new TestListResultSink<>();

    numbers.addSink(everySink);
    parted.select("even").addSink(evensSink);
    parted.select("odd").addSink(oddsSink);

    TestUtils.tryExecute(env, "Multi-Sink Test");

    List<Integer> everything = everySink.getResult();
    List<Integer> evens = evensSink.getResult();
    List<Integer> odds = oddsSink.getResult();

    // Counts: the unsplit sink sees all 10, and the partitions cover them.
    assertEquals("All sink should have all elements", 10, everything.size());
    assertEquals("Even + odd should equal total",
            evens.size() + odds.size(), everything.size());

    // The union of the two partitions must equal the full element set.
    Set<Integer> union = new HashSet<>(evens);
    union.addAll(odds);
    assertEquals("Combined results should match original",
            new HashSet<>(everything), union);
}

@Test
public void testThreadSafeCollection() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4); // multiple parallel sink instances

    // BUG FIX: TestListWrapper is a non-generic singleton with a private
    // constructor — it cannot be instantiated as `new TestListWrapper<>()`.
    // Obtain it via getInstance() and address the shared list by its id.
    final int listId = TestListWrapper.getInstance().createList();

    // BUG FIX: fromSequence yields Longs; convert to Integer for the sink.
    DataStream<Integer> source = env.fromSequence(1, 1000).map(Long::intValue);

    // Capture only the primitive listId (trivially serializable); each
    // parallel sink instance re-resolves the shared list through the
    // singleton at runtime instead of serializing the wrapper itself.
    source.addSink(new SinkFunction<Integer>() {
        @Override
        public void invoke(Integer value) {
            @SuppressWarnings("unchecked")
            List<Integer> list = (List<Integer>) TestListWrapper.getInstance().getList(listId);
            // getList returns a plain List; synchronize for parallel adds.
            synchronized (list) {
                list.add(value * 2); // transform and collect
            }
        }
    });

    TestUtils.tryExecute(env, "Thread-Safe Collection Test");

    @SuppressWarnings("unchecked")
    List<Integer> results = (List<Integer>) TestListWrapper.getInstance().getList(listId);
    assertEquals("Should have all elements", 1000, results.size());

    // Every collected value was doubled, so all must be even.
    assertTrue("All values should be even",
            results.stream().allMatch(x -> x % 2 == 0));

    // Spot-check the low end of the expected range.
    assertTrue("Should contain expected values",
            results.containsAll(Arrays.asList(2, 4, 6, 8, 10)));
}