
tessl/maven-org-apache-flink--flink-tests-2-11

Comprehensive testing infrastructure and utilities for Apache Flink stream processing framework



Streaming Test Utilities

Thread-safe utilities for collecting and validating results in streaming Flink applications. They cover result collection, test coordination, fault injection, validation, and streaming-specific helpers, so that streaming topologies can be tested end to end, including failure scenarios.

Capabilities

Test List Result Sink

Thread-safe sink for collecting streaming results into lists for verification.

/**
 * Thread-safe sink that collects streaming results into a list for test verification
 */
public class TestListResultSink<T> extends RichSinkFunction<T> {
    
    /**
     * Default constructor creating empty result sink
     */
    public TestListResultSink();
    
    /**
     * Get the collected results as an unordered list
     * @return List containing all collected elements
     */
    public List<T> getResult();
    
    /**
     * Get the collected results sorted using natural ordering
     * @return Sorted list containing all collected elements
     */
    public List<T> getSortedResult();
    
    /**
     * Sink function implementation - adds element to internal collection
     * @param value Element to collect
     * @throws Exception if collection fails
     */
    public void invoke(T value) throws Exception;
}

Usage Example:

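import java.util.List;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;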

// Setup streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); // Use parallelism 1 for deterministic results

// Create result collector
TestListResultSink<String> resultSink = new TestListResultSink<>();

// Build streaming topology
env.fromElements("hello", "world", "flink", "streaming")
   .map(String::toUpperCase)
   .addSink(resultSink);

// Execute and collect results
env.execute("Test Job");

// Verify results
List<String> results = resultSink.getResult();
assertEquals(4, results.size());
assertTrue(results.contains("HELLO"));
assertTrue(results.contains("WORLD"));

// Get sorted results for ordered verification
List<String> sortedResults = resultSink.getSortedResult();
assertEquals("FLINK", sortedResults.get(0));
assertEquals("HELLO", sortedResults.get(1));
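
Why thread safety and sorting matter can be sketched without Flink. The following is a minimal plain-Java illustration, not the actual TestListResultSink source: the class name `ListCollectorSketch` and the helper `collectInParallel` are hypothetical. Several writer threads append to one list under a lock; arrival order varies between runs, while the sorted view stays stable.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a thread-safe result collector (not Flink code).
final class ListCollectorSketch<T extends Comparable<T>> {
    private final List<T> results = new ArrayList<>();

    // Called concurrently by writers; the lock keeps the ArrayList consistent.
    void add(T value) {
        synchronized (results) {
            results.add(value);
        }
    }

    // Snapshot in arrival order, which can differ between runs.
    List<T> getResult() {
        synchronized (results) {
            return new ArrayList<>(results);
        }
    }

    // Snapshot sorted by natural ordering, which is the same on every run.
    List<T> getSortedResult() {
        List<T> copy = getResult();
        Collections.sort(copy);
        return copy;
    }

    // Starts `writers` threads that each add `perWriter` distinct integers.
    static ListCollectorSketch<Integer> collectInParallel(int writers, int perWriter) {
        ListCollectorSketch<Integer> sink = new ListCollectorSketch<>();
        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < writers; w++) {
            final int offset = w * perWriter;
            Thread t = new Thread(() -> {
                for (int i = 0; i < perWriter; i++) {
                    sink.add(offset + i);
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for writers", e);
            }
        }
        return sink;
    }
}
```

With parallel writers, assertions on `getResult()` should be limited to size and membership; position-based assertions belong on `getSortedResult()`.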

Test List Wrapper

Singleton catalog for managing multiple test result lists across different test scenarios.

/**
 * Singleton catalog for managing multiple test result lists
 */
public class TestListWrapper {
    
    /**
     * Get the singleton instance of TestListWrapper
     * @return The singleton TestListWrapper instance
     */
    public static TestListWrapper getInstance();
    
    /**
     * Create a new result list and return its ID
     * @return Integer ID for the newly created list
     */
    public int createList();
    
    /**
     * Retrieve a result list by its ID
     * @param listId ID of the list to retrieve
     * @return List of collected objects, or null if ID doesn't exist
     */
    public List<Object> getList(int listId);
}

Usage Example:

import java.util.List;

import org.apache.flink.test.streaming.runtime.util.TestListWrapper;

// Get wrapper instance
TestListWrapper wrapper = TestListWrapper.getInstance();

// Create lists for different test scenarios
int sourceResultsId = wrapper.createList();
int processedResultsId = wrapper.createList();

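// Use in the streaming topology. ListCollectorSink is a hypothetical custom sink
// (not part of this package) that appends each element to wrapper.getList(id);
// dataStream1 and dataStream2 stand for streams defined elsewhere in the test.
dataStream1.addSink(new ListCollectorSink(sourceResultsId));
dataStream2.addSink(new ListCollectorSink(processedResultsId));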

// After execution, retrieve results
List<Object> sourceResults = wrapper.getList(sourceResultsId);
List<Object> processedResults = wrapper.getList(processedResultsId);

// Verify both result sets
assertEquals(expectedSourceCount, sourceResults.size());
assertEquals(expectedProcessedCount, processedResults.size());
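
The catalog idea (one shared registry, lists addressed by integer ID) can be sketched in plain Java. This is an illustrative sketch under stated assumptions, not the TestListWrapper implementation: the class name `ListCatalogSketch` is hypothetical, and the choice of synchronized lists is an assumption about how concurrent writers might be handled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of a singleton list catalog (not the actual TestListWrapper source).
final class ListCatalogSketch {
    private static final ListCatalogSketch INSTANCE = new ListCatalogSketch();

    // The index in the outer list serves as the list ID.
    private final List<List<Object>> lists =
            Collections.synchronizedList(new ArrayList<List<Object>>());

    private ListCatalogSketch() {}

    static ListCatalogSketch getInstance() {
        return INSTANCE;
    }

    // Registers a new empty list and returns its ID.
    synchronized int createList() {
        lists.add(Collections.synchronizedList(new ArrayList<Object>()));
        return lists.size() - 1;
    }

    // Returns the list for the given ID, or null if that ID was never issued.
    List<Object> getList(int listId) {
        if (listId < 0 || listId >= lists.size()) {
            return null;
        }
        return lists.get(listId);
    }
}
```

One plausible motivation for this design: a sink only needs to carry a small integer ID, and it looks up the shared list through the static instance at runtime. That only works when the test and the job run in the same JVM.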

Output Selectors and Transformations

Utility functions and selectors for streaming data routing and transformation.

/**
 * Output selector that routes integers based on even/odd criteria
 */
public class EvenOddOutputSelector implements OutputSelector<Integer> {
    
    /**
     * Select output names based on whether the integer is even or odd
     * @param value Integer value to evaluate
     * @return Iterable of output names ("even" or "odd")
     */
    public Iterable<String> select(Integer value);
}

/**
 * No-operation mapper that returns input unchanged
 */
public class NoOpIntMap implements MapFunction<Integer, Integer> {
    
    /**
     * Identity mapping function for integers
     * @param value Input integer
     * @return Same integer unchanged
     * @throws Exception if mapping fails
     */
    public Integer map(Integer value) throws Exception;
}

/**
 * Sink that discards the elements it receives while counting them against an expected total
 */
public class ReceiveCheckNoOpSink<T> extends RichSinkFunction<T> {
    
    /**
     * Constructor with element count tracking
     * @param expectedElementCount Expected number of elements to receive
     */
    public ReceiveCheckNoOpSink(int expectedElementCount);
    
    /**
     * Receive element but perform no operation
     * @param value Element to receive
     * @param context Sink context
     * @throws Exception if receive fails
     */
    public void invoke(T value, Context context) throws Exception;
    
    /**
     * Check if expected number of elements were received
     * @return true if expected count was reached
     */
    public boolean isExpectedCountReached();
}
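
The routing contract of an output selector (each element maps to one or more output names) can be illustrated in plain Java. This is a sketch only: `EvenOddRoutingSketch` and its `route` helper are hypothetical names, and the selection rule is assumed from the description above rather than taken from the EvenOddOutputSelector source.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of even/odd routing (not Flink code).
final class EvenOddRoutingSketch {

    // Mirrors the selector contract: returns the output names for one element.
    static Iterable<String> select(Integer value) {
        return Collections.singletonList(value % 2 == 0 ? "even" : "odd");
    }

    // Routes every value into the bucket(s) named by select().
    static Map<String, List<Integer>> route(List<Integer> values) {
        Map<String, List<Integer>> outputs = new LinkedHashMap<>();
        for (Integer v : values) {
            for (String name : select(v)) {
                outputs.computeIfAbsent(name, k -> new ArrayList<>()).add(v);
            }
        }
        return outputs;
    }
}
```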

Fault Injection Utilities

Sources and sinks for testing failure scenarios and fault tolerance in streaming applications.

/**
 * Source that introduces artificial failures for testing fault tolerance
 */
public class FailingSource<T> extends RichSourceFunction<T> {
    
    /**
     * Constructor for failing source with custom event generator
     * @param generator Custom generator for emitting events before failure
     * @param failAfterElements Number of elements to emit before inducing failure
     */
    public FailingSource(EventEmittingGenerator<T> generator, int failAfterElements);
    
    /**
     * Interface for custom event generation in failing source
     */
    public static interface EventEmittingGenerator<T> extends Serializable {
        /**
         * Emit a single event to the source context
         * @param ctx Source context for emitting events
         * @param eventSequenceNo Sequence number of current event
         */
        public void emitEvent(SourceContext<T> ctx, int eventSequenceNo);
    }
}

/**
 * Sink for validating streaming results with custom validation logic
 */
public class ValidatingSink<T> extends RichSinkFunction<T> {
    
    /**
     * Constructor with result checker and count updater
     * @param resultChecker Custom checker for validating individual results
     * @param countUpdater Updater for tracking element counts
     */
    public ValidatingSink(ResultChecker<T> resultChecker, CountUpdater countUpdater);
    
    /**
     * Interface for custom result validation logic
     */
    public static interface ResultChecker<T> extends Serializable {
        /**
         * Check if a result meets validation criteria
         * @param result Result element to validate
         * @return true if result passes validation
         */
        public boolean checkResult(T result);
    }
    
    /**
     * Interface for updating element counts during validation
     */
    public static interface CountUpdater extends Serializable {
        /**
         * Update the element count
         * @param count Current element count
         */
        public void updateCount(long count);
    }
}
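
How a checker and a count updater might cooperate can be sketched in plain Java. The class `ValidatingSketch` below is hypothetical, and its behavior is an assumption: it fails on the first invalid element and reports the count once at the end, which may differ from what ValidatingSink actually does.

```java
import java.util.List;
import java.util.function.LongConsumer;
import java.util.function.Predicate;

// Hypothetical sketch of checker plus count updater (not the ValidatingSink source).
final class ValidatingSketch {

    // Applies the checker to every element, fails fast on the first invalid one,
    // then hands the number of validated elements to the count updater.
    static <T> void validateAll(List<T> elements, Predicate<T> checker, LongConsumer countUpdater) {
        long count = 0;
        for (T element : elements) {
            if (!checker.test(element)) {
                throw new AssertionError("Validation failed for element: " + element);
            }
            count++;
        }
        countUpdater.accept(count);
    }
}
```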

Fault Injection Usage Examples:

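import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.test.checkpointing.utils.FailingSource;
import org.apache.flink.test.checkpointing.utils.ValidatingSink;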

// Create a failing source that emits integers and fails after 1000 elements
FailingSource<Integer> failingSource = new FailingSource<>(
    new FailingSource.EventEmittingGenerator<Integer>() {
        @Override
        public void emitEvent(SourceContext<Integer> ctx, int eventSequenceNo) {
            ctx.collect(eventSequenceNo);
        }
    },
    1000 // Fail after 1000 elements
);

// Create a validating sink that checks for non-negative integers
ValidatingSink<Integer> validatingSink = new ValidatingSink<>(
    new ValidatingSink.ResultChecker<Integer>() {
        @Override
        public boolean checkResult(Integer result) {
            return result >= 0; // Accept non-negative integers, so a sequence number of 0 also passes
        }
    },
    new ValidatingSink.CountUpdater() {
        @Override
        public void updateCount(long count) {
            // Track count of validated elements
            System.out.println("Validated elements: " + count);
        }
    }
);

// Build fault-tolerant streaming topology
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000); // Checkpoint every 1000 ms so the job can recover after the failure

env.addSource(failingSource)
   .map(x -> Math.abs(x)) // Ensure non-negative values
   .addSink(validatingSink);

// Job will fail after 1000 elements, then restart and validate recovery
env.execute("Fault Tolerance Test");
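
The fail-once-then-recover cycle can be sketched as a small plain-Java simulation. This is an illustration under simplifying assumptions, not Flink's recovery mechanism: `FailOnceRecoverySketch` is a hypothetical name, a "checkpoint" is just a remembered offset, and restoring means truncating the output back to that offset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of one induced failure followed by recovery (not Flink code).
final class FailOnceRecoverySketch {

    // Emits sequence numbers 0..total-1. Fails once when reaching `failAfter`,
    // then restarts from the last checkpoint (taken every `checkpointEvery` elements).
    static List<Integer> run(int total, int failAfter, int checkpointEvery) {
        List<Integer> output = new ArrayList<>();
        int checkpointedOffset = 0; // next sequence number to emit as of the last checkpoint
        boolean hasFailed = false;
        while (true) {
            // Restore: drop everything produced after the last checkpoint.
            output.subList(checkpointedOffset, output.size()).clear();
            try {
                for (int seq = checkpointedOffset; seq < total; seq++) {
                    if (!hasFailed && seq == failAfter) {
                        hasFailed = true;
                        throw new IllegalStateException("Artificial failure at element " + seq);
                    }
                    output.add(seq);
                    if ((seq + 1) % checkpointEvery == 0) {
                        checkpointedOffset = seq + 1;
                    }
                }
                return output;
            } catch (IllegalStateException e) {
                // Loop again, as a restart strategy would schedule a new attempt.
            }
        }
    }
}
```

Calling `FailOnceRecoverySketch.run(10, 5, 3)` emits 0 through 4, fails, rolls back to the checkpoint at offset 3, and then replays 3 through 9, so the final output contains each number exactly once.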

Test Execution Utilities

Utilities for executing streaming tests with specialized exception handling.

/**
 * Utility class for streaming test execution
 */
public class TestUtils {
    
    /**
     * Execute streaming job with SuccessException handling
     * @param see StreamExecutionEnvironment configured for execution
     * @param name Job name for identification
     * @throws Exception if execution fails for reasons other than SuccessException
     */
    public static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception;
}

/**
 * Exception thrown to indicate successful test completion
 */
public class SuccessException extends RuntimeException {
    
    /**
     * Default constructor for success indication
     */
    public SuccessException();
    
    /**
     * Constructor with success message
     * @param message Success message
     */
    public SuccessException(String message);
}
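
The idea behind success-exception handling is that a job failure counts as a pass when a success marker sits somewhere in the exception's cause chain. The sketch below shows that technique in plain Java; `SuccessUnwrapSketch`, `SuccessMarker`, and `Job` are hypothetical stand-ins, and the real TestUtils.tryExecute may be implemented differently.

```java
// Hypothetical sketch of success-marker unwrapping (not the TestUtils source).
final class SuccessUnwrapSketch {

    // Stand-in for the SuccessException marker described above.
    static final class SuccessMarker extends RuntimeException {
        SuccessMarker(String message) {
            super(message);
        }
    }

    // Stand-in for a job execution: anything that may throw.
    interface Job {
        void run() throws Exception;
    }

    // Walks the cause chain; the depth bound guards against cyclic chains.
    static boolean containsSuccess(Throwable error) {
        int depth = 0;
        for (Throwable t = error; t != null && depth < 20; t = t.getCause()) {
            if (t instanceof SuccessMarker) {
                return true;
            }
            depth++;
        }
        return false;
    }

    // Runs the job and swallows the failure only if it wraps a SuccessMarker.
    static void tryExecute(Job job) throws Exception {
        try {
            job.run();
        } catch (Exception e) {
            if (!containsSuccess(e)) {
                throw e;
            }
        }
    }
}
```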

Streaming Test Patterns

Common patterns for implementing streaming tests:

Basic Result Collection Pattern:

@Test
public void testStreamingTransformation() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    
    // Create result collector
    TestListResultSink<Integer> resultSink = new TestListResultSink<>();
    
    // Build topology
    env.fromElements(1, 2, 3, 4, 5)
       .map(x -> x * 2)
       .filter(x -> x > 5)
       .addSink(resultSink);
    
    // Execute and verify
    env.execute("Transformation Test");
    
    List<Integer> results = resultSink.getSortedResult();
    assertEquals(Arrays.asList(6, 8, 10), results);
}

Multi-Stream Result Collection:

@Test
public void testMultiStreamProcessing() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    TestListWrapper wrapper = TestListWrapper.getInstance();
    int evenResultsId = wrapper.createList();
    int oddResultsId = wrapper.createList();
    
    DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6);
    
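    // Split stream using output selector (split/select is a legacy API that
    // later Flink releases replace with side outputs)
    SplitStream<Integer> splitStream = numbers.split(new EvenOddOutputSelector());
    
    // ListCollectorSink is a hypothetical custom sink that appends to the wrapper list with the given ID
    splitStream.select("even").addSink(new ListCollectorSink(evenResultsId));
    splitStream.select("odd").addSink(new ListCollectorSink(oddResultsId));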
    
    env.execute("Multi-Stream Test");
    
    // Verify results
    List<Object> evenResults = wrapper.getList(evenResultsId);
    List<Object> oddResults = wrapper.getList(oddResultsId);
    
    assertEquals(3, evenResults.size()); // 2, 4, 6
    assertEquals(3, oddResults.size());  // 1, 3, 5
}

Success Exception Pattern:

@Test
public void testSuccessfulCompletion() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    env.fromElements("test")
       .map(value -> {
           // Simulate successful completion
           throw new SuccessException("Test completed successfully");
       })
       .addSink(new DiscardingSink<>());
    
    // Execute with success exception handling
    TestUtils.tryExecute(env, "Success Test");
    
    // Test passes if SuccessException was caught and handled
}

Parallel Result Collection:

@Test
public void testParallelResultCollection() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);
    
    TestListResultSink<String> resultSink = new TestListResultSink<>();
    
    // generateLargeDataset() and processData() are placeholders for test-specific helpers
    env.fromCollection(generateLargeDataset())
       .map(data -> processData(data))
       .addSink(resultSink);
    
    env.execute("Parallel Test");
    
    // Results from parallel execution: order is not guaranteed (the expected* values are test-specific placeholders)
    List<String> results = resultSink.getResult();
    assertEquals(expectedTotalCount, results.size());
    
    // Use sorted results for deterministic verification
    List<String> sortedResults = resultSink.getSortedResult();
    assertEquals(expectedFirstElement, sortedResults.get(0));
    assertEquals(expectedLastElement, sortedResults.get(sortedResults.size() - 1));
}

Custom Sink with Count Verification:

@Test
public void testElementCountVerification() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    int expectedCount = 1000;
    ReceiveCheckNoOpSink<Integer> countingSink = new ReceiveCheckNoOpSink<>(expectedCount);
    
    // generateIntegerSequence() is a placeholder for a helper returning expectedCount integers
    env.fromCollection(generateIntegerSequence(expectedCount))
       .addSink(countingSink);
    
    env.execute("Count Verification Test");
    
    assertTrue("Expected count not reached", countingSink.isExpectedCountReached());
}
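
The count check itself is simple to sketch in plain Java. `CountCheckSketch` below is a hypothetical class, not the ReceiveCheckNoOpSink source, and it assumes that "reached" means at least the expected number of elements arrived.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a counting no-op receiver (not Flink code).
final class CountCheckSketch {
    private final int expected;
    private final AtomicInteger received = new AtomicInteger();

    CountCheckSketch(int expected) {
        this.expected = expected;
    }

    // Discards the element and only counts it; safe to call from several threads.
    void receive(Object ignored) {
        received.incrementAndGet();
    }

    // Assumption: the target is met once at least `expected` elements have arrived.
    boolean isExpectedCountReached() {
        return received.get() >= expected;
    }
}
```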

Together, these utilities support deterministic tests of streaming Flink applications: collecting results, routing and counting elements, injecting failures, and validating output after recovery.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-tests-2-11
