or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

class-loading-programs.md
index.md
migration-testing.md
performance-testing.md
state-management-testing.md
streaming-utilities.md
test-base-classes.md
test-data-generation.md
tile.json

docs/streaming-utilities.md

Streaming Test Utilities

Thread-safe utilities for collecting and validating results in streaming Flink applications. These utilities provide mechanisms for result collection, test coordination, fault injection, validation, and streaming-specific helper functions that enable comprehensive testing of streaming topologies including failure scenarios.

Capabilities

Test List Result Sink

Thread-safe sink for collecting streaming results into lists for verification.

/**
 * Thread-safe sink that collects streaming results into a list for test verification.
 *
 * @param <T> type of the elements collected by this sink
 */
public class TestListResultSink<T> extends RichSinkFunction<T> {
    
    /**
     * Default constructor creating an empty result sink.
     */
    public TestListResultSink();
    
    /**
     * Get the collected results as an unordered list.
     * NOTE(review): whether the returned list is a copy or a live view of the
     * internal collection is not shown here - confirm before mutating it.
     * @return List containing all collected elements
     */
    public List<T> getResult();
    
    /**
     * Get the collected results sorted using natural ordering.
     * NOTE(review): natural ordering presumably requires T to be Comparable - confirm.
     * @return Sorted list containing all collected elements
     */
    public List<T> getSortedResult();
    
    /**
     * Sink function implementation - adds the element to the internal collection.
     * @param value Element to collect
     * @throws Exception if collection fails
     */
    public void invoke(T value) throws Exception;
}

Usage Example:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;

// Parallelism 1 keeps the results deterministic
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// Sink that accumulates every element reaching the end of the pipeline
TestListResultSink<String> collector = new TestListResultSink<>();

// Wire source -> upper-casing map -> collecting sink, then run the job
env.fromElements("hello", "world", "flink", "streaming")
   .map(word -> word.toUpperCase())
   .addSink(collector);
env.execute("Test Job");

// Size and membership checks on the raw (unordered) results
List<String> collected = collector.getResult();
assertEquals(4, collected.size());
assertTrue(collected.contains("HELLO"));
assertTrue(collected.contains("WORLD"));

// Positional checks on the sorted results: FLINK, HELLO, STREAMING, WORLD
List<String> ordered = collector.getSortedResult();
assertEquals("FLINK", ordered.get(0));
assertEquals("HELLO", ordered.get(1));

Test List Wrapper

Singleton catalog for managing multiple test result lists across different test scenarios.

/**
 * Singleton catalog for managing multiple test result lists.
 * Lists are created with {@link #createList()} and looked up again with
 * {@link #getList(int)} using the ID that creation returned.
 */
public class TestListWrapper {
    
    /**
     * Get the singleton instance of TestListWrapper.
     * @return The singleton TestListWrapper instance
     */
    public static TestListWrapper getInstance();
    
    /**
     * Create a new result list and return its ID.
     * @return Integer ID for the newly created list; pass it to {@link #getList(int)}
     */
    public int createList();
    
    /**
     * Retrieve a result list by its ID.
     * NOTE(review): the null-on-unknown-ID behavior is stated by this page only;
     * verify against the implementation (it may throw instead).
     * @param listId ID of the list to retrieve
     * @return List of collected objects, or null if ID doesn't exist
     */
    public List<Object> getList(int listId);
}

Usage Example:

import org.apache.flink.test.streaming.runtime.util.TestListWrapper;

// Obtain the shared catalog of result lists
TestListWrapper catalog = TestListWrapper.getInstance();

// Register one list per stream under test; each call hands back the list's ID
int rawListId = catalog.createList();
int processedListId = catalog.createList();

// Attach a sink per stream (pseudocode - ListCollectorSink stands for a custom
// sink implementation that appends to the list with the given ID)
dataStream1.addSink(new ListCollectorSink(rawListId));
dataStream2.addSink(new ListCollectorSink(processedListId));

// Once the job has run, look the lists up again by ID
List<Object> rawElements = catalog.getList(rawListId);
List<Object> processedElements = catalog.getList(processedListId);

// Check the size of each result set
assertEquals(expectedSourceCount, rawElements.size());
assertEquals(expectedProcessedCount, processedElements.size());

Output Selectors and Transformations

Utility functions and selectors for streaming data routing and transformation.

/**
 * Output selector that routes integers based on even/odd criteria.
 * The returned names ("even" / "odd") are the ones later passed to
 * {@code select(...)} on the split stream.
 */
public class EvenOddOutputSelector implements OutputSelector<Integer> {
    
    /**
     * Select output names based on whether the integer is even or odd.
     * @param value Integer value to evaluate
     * @return Iterable of output names ("even" or "odd")
     */
    public Iterable<String> select(Integer value);
}

/**
 * No-operation mapper that returns its input unchanged.
 */
public class NoOpIntMap implements MapFunction<Integer, Integer> {
    
    /**
     * Identity mapping function for integers.
     * @param value Input integer
     * @return Same integer unchanged
     * @throws Exception if mapping fails
     */
    public Integer map(Integer value) throws Exception;
}

/**
 * No-operation sink that receives elements without processing them, while
 * allowing a test to check whether the expected number of elements arrived.
 *
 * @param <T> type of the elements received by this sink
 */
public class ReceiveCheckNoOpSink<T> extends RichSinkFunction<T> {
    
    /**
     * Constructor with element count tracking.
     * @param expectedElementCount Expected number of elements to receive
     */
    public ReceiveCheckNoOpSink(int expectedElementCount);
    
    /**
     * Receive element but perform no operation on it.
     * @param value Element to receive
     * @param context Sink context
     * @throws Exception if receive fails
     */
    public void invoke(T value, Context context) throws Exception;
    
    /**
     * Check if the expected number of elements were received.
     * NOTE(review): whether "reached" means exactly equal or at least the
     * expected count is not specified here - confirm.
     * @return true if expected count was reached
     */
    public boolean isExpectedCountReached();
}

Fault Injection Utilities

Sources and sinks for testing failure scenarios and fault tolerance in streaming applications.

/**
 * Source that introduces artificial failures for testing fault tolerance.
 * Events are produced by a caller-supplied generator until the configured
 * number of elements has been emitted, at which point a failure is induced.
 *
 * @param <T> type of the events emitted by this source
 */
public class FailingSource<T> extends RichSourceFunction<T> {
    
    /**
     * Constructor for failing source with custom event generator.
     * @param generator Custom generator for emitting events before failure
     * @param failAfterElements Number of elements to emit before inducing failure
     */
    public FailingSource(EventEmittingGenerator<T> generator, int failAfterElements);
    
    /**
     * Interface for custom event generation in failing source.
     */
    public static interface EventEmittingGenerator<T> extends Serializable {
        /**
         * Emit a single event to the source context.
         * NOTE(review): the first value of eventSequenceNo (0 or 1) is not
         * specified here - confirm before relying on it.
         * @param ctx Source context for emitting events
         * @param eventSequenceNo Sequence number of current event
         */
        public void emitEvent(SourceContext<T> ctx, int eventSequenceNo);
    }
}

/**
 * Sink for validating streaming results with custom validation logic.
 *
 * @param <T> type of the result elements validated by this sink
 */
public class ValidatingSink<T> extends RichSinkFunction<T> {
    
    /**
     * Constructor with result checker and count updater.
     * @param resultChecker Custom checker for validating individual results
     * @param countUpdater Updater for tracking element counts
     */
    public ValidatingSink(ResultChecker<T> resultChecker, CountUpdater countUpdater);
    
    /**
     * Interface for custom result validation logic.
     */
    public static interface ResultChecker<T> extends Serializable {
        /**
         * Check if a result meets validation criteria.
         * NOTE(review): what the sink does when this returns false is not
         * specified here - confirm.
         * @param result Result element to validate
         * @return true if result passes validation
         */
        public boolean checkResult(T result);
    }
    
    /**
     * Interface for updating element counts during validation.
     */
    public static interface CountUpdater extends Serializable {
        /**
         * Update the element count.
         * @param count Current element count
         */
        public void updateCount(long count);
    }
}

Fault Injection Usage Examples:

import org.apache.flink.test.checkpointing.utils.FailingSource;
import org.apache.flink.test.checkpointing.utils.ValidatingSink;

// Create a failing source that emits each sequence number as an integer and
// fails after 1000 elements
FailingSource<Integer> failingSource = new FailingSource<>(
    new FailingSource.EventEmittingGenerator<Integer>() {
        @Override
        public void emitEvent(SourceContext<Integer> ctx, int eventSequenceNo) {
            ctx.collect(eventSequenceNo);
        }
    },
    1000 // Fail after 1000 elements
);

// Create a validating sink that checks for non-negative integers
ValidatingSink<Integer> validatingSink = new ValidatingSink<>(
    new ValidatingSink.ResultChecker<Integer>() {
        @Override
        public boolean checkResult(Integer result) {
            // Math.abs maps 0 to 0, so zero has to be accepted too; a strict
            // "> 0" check would reject an event whose sequence number is 0
            return result >= 0;
        }
    },
    new ValidatingSink.CountUpdater() {
        @Override
        public void updateCount(long count) {
            // Track count of validated elements
            System.out.println("Validated elements: " + count);
        }
    }
);

// Build fault-tolerant streaming topology
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000); // Enable checkpointing for fault tolerance

env.addSource(failingSource)
   .map(x -> Math.abs(x)) // Normalize to non-negative values (0 stays 0)
   .addSink(validatingSink);

// Job will fail after 1000 elements, then restart and validate recovery
env.execute("Fault Tolerance Test");

Test Execution Utilities

Utilities for executing streaming tests with specialized exception handling.

/**
 * Utility class for streaming test execution.
 */
public class TestUtils {
    
    /**
     * Execute streaming job with SuccessException handling: a SuccessException
     * raised by the job is not propagated as a failure, any other failure is.
     * @param see StreamExecutionEnvironment configured for execution
     * @param name Job name for identification
     * @throws Exception if execution fails for reasons other than SuccessException
     */
    public static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception;
}

/**
 * Exception thrown to indicate successful test completion.
 * Intended to be thrown from inside a job that is run through
 * {@code TestUtils.tryExecute}, which handles it.
 */
public class SuccessException extends RuntimeException {
    
    /**
     * Default constructor for success indication.
     */
    public SuccessException();
    
    /**
     * Constructor with success message.
     * @param message Success message
     */
    public SuccessException(String message);
}

Streaming Test Patterns

Common patterns for implementing streaming tests:

Basic Result Collection Pattern:

@Test
public void testStreamingTransformation() throws Exception {
    // Single-threaded pipeline, as in the other deterministic examples
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // Collects whatever survives the map/filter chain
    TestListResultSink<Integer> doubledSink = new TestListResultSink<>();

    // 1..5 doubled gives 2, 4, 6, 8, 10; only values above 5 pass the filter
    env.fromElements(1, 2, 3, 4, 5)
       .map(n -> n * 2)
       .filter(n -> n > 5)
       .addSink(doubledSink);
    env.execute("Transformation Test");

    // Compare against the sorted results so the assertion is order-independent
    List<Integer> actual = doubledSink.getSortedResult();
    List<Integer> expected = Arrays.asList(6, 8, 10);
    assertEquals(expected, actual);
}

Multi-Stream Result Collection:

@Test
public void testMultiStreamProcessing() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // One catalog-managed list per output branch
    TestListWrapper catalog = TestListWrapper.getInstance();
    int evenListId = catalog.createList();
    int oddListId = catalog.createList();

    // Route each number to the "even" or "odd" output.
    // NOTE(review): split()/SplitStream are deprecated in newer Flink releases in
    // favor of side outputs - confirm against the Flink version in use.
    DataStream<Integer> input = env.fromElements(1, 2, 3, 4, 5, 6);
    SplitStream<Integer> byParity = input.split(new EvenOddOutputSelector());

    // CustomListSink is a placeholder for a user-written sink that appends to
    // the catalog list with the given ID; it is not defined on this page
    byParity.select("even").addSink(new CustomListSink(evenListId));
    byParity.select("odd").addSink(new CustomListSink(oddListId));

    env.execute("Multi-Stream Test");

    // Three elements are expected on each branch
    List<Object> evens = catalog.getList(evenListId);
    List<Object> odds = catalog.getList(oddListId);

    assertEquals(3, evens.size()); // 2, 4, 6
    assertEquals(3, odds.size());  // 1, 3, 5
}

Success Exception Pattern:

@Test
public void testSuccessfulCompletion() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // The mapper signals success on the first element by throwing
    // SuccessException; nothing ever reaches the discarding sink
    env.fromElements("test")
       .map(element -> {
           throw new SuccessException("Test completed successfully");
       })
       .addSink(new DiscardingSink<>());

    // tryExecute handles the SuccessException, so reaching the end of this
    // method without an exception means the test passed
    TestUtils.tryExecute(env, "Success Test");
}

Parallel Result Collection:

@Test
public void testParallelResultCollection() throws Exception {
    // Four parallel subtasks feed the same collecting sink
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);

    TestListResultSink<String> collector = new TestListResultSink<>();

    env.fromCollection(generateLargeDataset())
       .map(record -> processData(record))
       .addSink(collector);
    env.execute("Parallel Test");

    // Arrival order is not guaranteed under parallel execution, so only the
    // total count is checked on the raw results
    List<String> collected = collector.getResult();
    assertEquals(expectedTotalCount, collected.size());

    // The sorted results give a deterministic first and last element
    List<String> ordered = collector.getSortedResult();
    int lastIndex = ordered.size() - 1;
    assertEquals(expectedFirstElement, ordered.get(0));
    assertEquals(expectedLastElement, ordered.get(lastIndex));
}

Custom Sink with Count Verification:

@Test
public void testElementCountVerification() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // The sink is told up front how many elements it should see
    int expectedCount = 1000;
    ReceiveCheckNoOpSink<Integer> checkingSink = new ReceiveCheckNoOpSink<>(expectedCount);

    // Feed exactly that many integers into the sink and run the job
    env.fromCollection(generateIntegerSequence(expectedCount))
       .addSink(checkingSink);
    env.execute("Count Verification Test");

    // The sink itself reports whether the expected count was reached
    boolean countReached = checkingSink.isExpectedCountReached();
    assertTrue("Expected count not reached", countReached);
}

These streaming utilities provide the foundation for reliable, deterministic testing of streaming Flink applications, enabling comprehensive verification of streaming transformations, windowing operations, and stateful processing.