or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

api-completeness.md, checkpointing.md, data-generation.md, execution.md, fault-tolerance.md, index.md, streaming.md
tile.json

docs/streaming.md

Streaming Utilities

Specialized components for testing streaming operations including output selectors, result collection, stream partitioning utilities, and no-op functions for performance testing.

Output Selectors

EvenOddOutputSelector

Output selector for splitting integer streams based on even/odd values, commonly used for stream partitioning tests.

/** Routes each Integer to the "even" or "odd" named output based on parity. */
public class EvenOddOutputSelector implements OutputSelector<Integer> {
    @Override
    public Iterable<String> select(Integer value); // ["even"] when value % 2 == 0, otherwise ["odd"]
}

The selector returns:

  • "even" for even numbers (value % 2 == 0)
  • "odd" for odd numbers (value % 2 != 0)

Result Collection and Validation

TestListResultSink

Thread-safe sink function that collects streaming results into a list for test verification.

/**
 * Thread-safe sink that collects every streamed element into a list which
 * tests can read back after the job completes.
 */
public class TestListResultSink<T> extends RichSinkFunction<T> {
    // Id of the backing result list (managed by the TestListWrapper singleton).
    private int resultListId;
    
    public TestListResultSink();
    
    @Override
    public void open(Configuration parameters) throws Exception;
    
    // Appends the received element to the shared result list.
    @Override
    public void invoke(T value) throws Exception;
    
    @Override
    public void close() throws Exception;
    
    public List<T> getResult();       // collected elements in arrival order
    public List<T> getSortedResult(); // collected elements, sorted for order-insensitive comparison
}

TestListWrapper

Singleton utility providing thread-safe access to multiple test result collections. Used internally by TestListResultSink for managing concurrent result collection.

/**
 * Singleton registry of result lists shared across parallel sink instances;
 * provides thread-safe access to the collections (used by TestListResultSink).
 */
public class TestListWrapper {
    // All registered result lists, indexed by the id returned from createList().
    private List<List<? extends Comparable>> lists;
    
    private TestListWrapper(); // singleton: obtain via getInstance()
    
    public static TestListWrapper getInstance();
    public int createList();            // registers a new result list and returns its id
    public List<?> getList(int listId); // returns the list previously created under listId
}

ReceiveCheckNoOpSink

Sink that validates element reception and asserts at least one element was received during close(), useful for ensuring data flow.

/**
 * No-op sink that records received elements and asserts in close() that at
 * least one element arrived, ensuring data actually flowed through the job.
 */
public final class ReceiveCheckNoOpSink<T> extends RichSinkFunction<T> {
    // Elements seen so far; only inspected by the close() assertion.
    private List<T> received;
    
    public ReceiveCheckNoOpSink();
    
    @Override
    public void open(Configuration conf);
    
    @Override
    public void invoke(T tuple);
    
    // Fails the job if no element was ever received.
    @Override
    public void close();
}

Processing Functions

NoOpIntMap

Pass-through map function for integers, used for pipeline testing without transformation overhead.

/** Identity map for Integers: map(value) returns value unchanged. */
public class NoOpIntMap implements MapFunction<Integer, Integer> {
    @Override
    public Integer map(Integer value) throws Exception;
}

Usage Examples

Stream Partitioning with Output Selector

/**
 * Demonstrates parity-based partitioning: ten integers are split into "even"
 * and "odd" side outputs, each collected and verified independently.
 */
@Test
public void testStreamPartitioning() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    
    // Ten integers: five even, five odd.
    DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    
    // Partition by parity using the selector under test.
    SplitStream<Integer> partitioned = numbers.split(new EvenOddOutputSelector());
    
    TestListResultSink<Integer> evenCollector = new TestListResultSink<>();
    TestListResultSink<Integer> oddCollector = new TestListResultSink<>();
    
    // Route each side output straight into its collector.
    partitioned.select("even").addSink(evenCollector);
    partitioned.select("odd").addSink(oddCollector);
    
    TestUtils.tryExecute(env, "Stream Partitioning Test");
    
    List<Integer> evens = evenCollector.getResult();
    List<Integer> odds = oddCollector.getResult();
    
    assertEquals("Should have 5 even numbers", 5, evens.size());
    assertEquals("Should have 5 odd numbers", 5, odds.size());
    
    // Confirm every element landed on the correct side.
    boolean allEven = true;
    for (int n : evens) {
        allEven &= (n % 2 == 0);
    }
    assertTrue("All results should be even", allEven);
    
    boolean allOdd = true;
    for (int n : odds) {
        allOdd &= (n % 2 == 1);
    }
    assertTrue("All results should be odd", allOdd);
}

Result Collection and Validation

/**
 * Collects transformed strings through a TestListResultSink and validates the
 * pipeline output. All four input words have length >= 5, so every one of
 * them survives the length-> 4 filter; the expected result count is 4.
 */
@Test  
public void testResultCollection() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    
    // Create test data
    List<String> testData = Arrays.asList("hello", "world", "flink", "streaming");
    DataStream<String> source = env.fromCollection(testData);
    
    // Uppercase, then keep words longer than 4 characters (all four qualify).
    DataStream<String> transformed = source
        .map(String::toUpperCase)
        .filter(s -> s.length() > 4);
    
    // Collect results
    TestListResultSink<String> resultSink = new TestListResultSink<>();
    transformed.addSink(resultSink);
    
    // Execute
    TestUtils.tryExecute(env, "Result Collection Test");
    
    // Validate results: the filter passes all four 5+-character words, so the
    // count must match the contains-assertions below (was wrongly 2 before).
    List<String> results = resultSink.getResult();
    
    assertEquals("Should have filtered results", 4, results.size());
    assertTrue("Should contain HELLO", results.contains("HELLO"));
    assertTrue("Should contain WORLD", results.contains("WORLD"));
    assertTrue("Should contain FLINK", results.contains("FLINK"));
    assertTrue("Should contain STREAMING", results.contains("STREAMING"));
}

Performance Testing with No-Op Functions

/**
 * Pushes a large sequence through no-op transformations to estimate pipeline
 * overhead. ReceiveCheckNoOpSink exposes no element counter (its documented
 * API is only open/invoke/close); its close() assertion already fails the job
 * if nothing was received, so reception is verified by successful execution.
 */
@Test
public void testStreamingPerformance() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);
    
    // Generate large dataset
    // NOTE(review): Flink's fromSequence yields Long elements — confirm the
    // Integer element type against the real API before relying on this example.
    DataStream<Integer> source = env.fromSequence(1, 1000000);
    
    // Apply no-op transformations to test overhead
    DataStream<Integer> processed = source
        .map(new NoOpIntMap()) // No-op transformation
        .keyBy(x -> x % 100)   // Partition by key
        .map(new NoOpIntMap()) // Another no-op
        .filter(x -> true);    // Pass-through filter
    
    // The sink's close() asserts at least one element arrived.
    processed.addSink(new ReceiveCheckNoOpSink<>());
    
    // Measure execution time
    long startTime = System.currentTimeMillis();
    TestUtils.tryExecute(env, "Performance Test");
    long duration = System.currentTimeMillis() - startTime;
    
    // All 1,000,000 source elements traverse the no-op pipeline; guard against
    // a 0 ms wall-clock reading to avoid division by zero.
    long processedCount = 1000000L;
    double throughput = processedCount / (Math.max(duration, 1) / 1000.0);
    System.out.println("Throughput: " + throughput + " elements/second");
    
    // Assert minimum throughput requirement
    assertTrue("Throughput should be reasonable", throughput > 10000);
}

Multi-Sink Result Validation

/**
 * Verifies that results observed across several sinks are mutually
 * consistent: the even and odd partitions together reproduce the full stream.
 */
@Test
public void testMultiSinkValidation() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(2);
    
    DataStream<Integer> source = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    
    // Partition by parity while also sinking the unsplit stream.
    SplitStream<Integer> split = source.split(new EvenOddOutputSelector());
    
    TestListResultSink<Integer> everything = new TestListResultSink<>();
    TestListResultSink<Integer> evenCollector = new TestListResultSink<>();
    TestListResultSink<Integer> oddCollector = new TestListResultSink<>();
    
    source.addSink(everything);
    split.select("even").addSink(evenCollector);
    split.select("odd").addSink(oddCollector);
    
    TestUtils.tryExecute(env, "Multi-Sink Test");
    
    List<Integer> allResults = everything.getResult();
    List<Integer> evenResults = evenCollector.getResult();
    List<Integer> oddResults = oddCollector.getResult();
    
    // The partitions must account for every element, no more, no less.
    assertEquals("All sink should have all elements", 10, allResults.size());
    assertEquals("Even + odd should equal total", 
        evenResults.size() + oddResults.size(), allResults.size());
    
    // Union of the two partitions must equal the original element set.
    Set<Integer> union = new HashSet<>(evenResults);
    union.addAll(oddResults);
    
    assertEquals("Combined results should match original", 
        new HashSet<>(allResults), union);
}

Thread-Safe Result Collection

/**
 * Collects results from parallel sink instances through the documented
 * TestListWrapper API. The class is a singleton with a private constructor
 * (see its listing above), so it must be obtained via getInstance() and a
 * result list created via createList() — it cannot be instantiated directly,
 * has no type parameter, and exposes getList(int), not add()/getList().
 */
@Test
public void testThreadSafeCollection() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4); // Multiple parallel instances
    
    // Register a shared result list; the id is effectively final so the
    // anonymous sink below can capture and serialize it.
    final int listId = TestListWrapper.getInstance().createList();
    
    DataStream<Integer> source = env.fromSequence(1, 1000);
    
    // Custom sink writing into the shared list via the singleton wrapper.
    source.addSink(new SinkFunction<Integer>() {
        @Override
        public void invoke(Integer value) {
            @SuppressWarnings("unchecked") // getList returns List<?>; this list only ever holds Integers
            List<Integer> list = (List<Integer>) TestListWrapper.getInstance().getList(listId);
            synchronized (list) { // serialize concurrent writers across parallel instances
                list.add(value * 2); // Transform and collect
            }
        }
    });
    
    TestUtils.tryExecute(env, "Thread-Safe Collection Test");
    
    // Verify results
    @SuppressWarnings("unchecked")
    List<Integer> results = (List<Integer>) TestListWrapper.getInstance().getList(listId);
    assertEquals("Should have all elements", 1000, results.size());
    
    // Verify transformation applied
    assertTrue("All values should be even", 
        results.stream().allMatch(x -> x % 2 == 0));
    
    // Verify range
    assertTrue("Should contain expected values",
        results.containsAll(Arrays.asList(2, 4, 6, 8, 10)));
}