or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

data-collection.md, index.md, metrics-testing.md, security-testing.md, test-data-providers.md, test-environments.md
tile.json

docs/data-collection.md

Data Collection and Sources

Tools for creating controlled test data sources and collecting streaming results for validation in tests, including finite test sources with checkpoint synchronization and result collection utilities.

Capabilities

Finite Test Source

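FiniteTestSource provides a controlled test source for streaming applications that need deterministic behavior. It emits its elements in cycles and waits for completed checkpoints between cycles. Because its progress depends on checkpoint completion, enable checkpointing on the execution environment (for example `env.enableCheckpointing(100)`) whenever you use it. Also expect each element to appear twice in the output, since the elements are emitted once per cycle.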

/**
 * Test source that emits elements in cycles with checkpoint synchronization.
 *
 * <p>It implements {@link CheckpointListener}, so it is notified about completed and aborted
 * checkpoints; these notifications are what its emission cycles are synchronized with.
 *
 * <p>NOTE(review): in Flink's implementation each emission round waits for completed
 * checkpoints, so a job using this source only makes progress when checkpointing is enabled,
 * and the elements end up being emitted twice in total — confirm against the Flink version in use.
 *
 * <p>NOTE(review): because of type erasure, Flink presumably cannot infer {@code T} from a plain
 * {@code new FiniteTestSource<>(...)}; pass explicit type information via
 * {@code env.addSource(source, typeInfo)} or use a subclass that binds {@code T} — verify.
 *
 * <p>NOTE(review): the source is serialized when the job is submitted, so any state captured by
 * an exit condition or by a subclass is copied, not shared with the test thread — confirm.
 *
 * @param <T> Type of elements to emit
 */
public class FiniteTestSource<T> implements SourceFunction<T>, CheckpointListener {
    /**
     * Creates a source that emits the given elements.
     * @param elements Elements to emit
     */
    public FiniteTestSource(T... elements);

    /**
     * Creates a source that emits the elements of the given iterable.
     * @param elements Elements to emit
     */
    public FiniteTestSource(Iterable<T> elements);

    /**
     * Creates a source with an exit condition and a timeout.
     * @param exitCondition Supplier that returns true once the source is allowed to finish.
     *     NOTE(review): in Flink this appears to be polled only after the emission rounds, gating
     *     when run() returns rather than interrupting emission; it must also be serializable
     *     (e.g. cast to {@code BooleanSupplier & Serializable}) — confirm.
     * @param timeoutMs Timeout in milliseconds.
     *     NOTE(review): presumably bounds how long the source waits for exitCondition — confirm
     *     whether expiry ends the source normally or fails it.
     * @param elements Elements to emit
     */
    public FiniteTestSource(BooleanSupplier exitCondition, long timeoutMs, Iterable<T> elements);

    /**
     * Creates a source with an exit condition and no explicit timeout.
     * @param exitCondition Supplier that returns true once the source is allowed to finish
     *     (same caveats as the three-argument constructor)
     * @param elements Elements to emit
     */
    public FiniteTestSource(BooleanSupplier exitCondition, Iterable<T> elements);

    /**
     * Main source loop: emits the configured elements through {@code ctx}.
     * @param ctx Source context for emitting elements
     */
    public void run(SourceContext<T> ctx) throws Exception;

    /**
     * Cancels the source.
     * NOTE(review): Flink calls this when the job/task is cancelled, not when run() returns
     * normally, so overriding it does not observe an ordinary job completion — confirm.
     */
    public void cancel();

    /**
     * Checkpoint completion notification; presumably what advances the emission cycles.
     * @param checkpointId ID of completed checkpoint
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception;

    /**
     * Checkpoint abortion notification.
     * @param checkpointId ID of aborted checkpoint
     */
    public void notifyCheckpointAborted(long checkpointId) throws Exception;
}

Usage Examples:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

/**
 * Flag read by stream3's exit condition. It is static because Flink serializes the source and
 * runs a deserialized copy. A captured local AtomicBoolean would be copied along with it, and
 * flipping the test's instance would never reach the running source. This relies on local
 * execution, where the job runs in the test's JVM.
 */
private static final AtomicBoolean SHOULD_STOP = new AtomicBoolean(false);

@Test
public void testFiniteSource() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // FiniteTestSource waits for completed checkpoints between its emission cycles,
    // so the job cannot make progress unless checkpointing is enabled.
    env.enableCheckpointing(100);
    // Static state survives across tests; start from a known value.
    SHOULD_STOP.set(false);

    // Simple finite source with vararg elements. The element type is passed explicitly because
    // it cannot be recovered from a generic FiniteTestSource instance (type erasure).
    DataStream<String> stream1 = env.addSource(
        new FiniteTestSource<>("hello", "world", "flink"),
        Types.STRING
    );

    // Source with collection of elements
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
    DataStream<Integer> stream2 = env.addSource(
        new FiniteTestSource<>(numbers),
        Types.INT
    );

    // Source with exit condition: it may finish once the condition returns true, waiting at
    // most the 10 second timeout. The lambda reads the static flag instead of capturing it,
    // and the intersection cast makes it serializable so Flink can ship the source.
    BooleanSupplier exitCondition = (BooleanSupplier & Serializable) () -> SHOULD_STOP.get();
    DataStream<String> stream3 = env.addSource(
        new FiniteTestSource<>(
            exitCondition,
            10000L,
            Arrays.asList("data1", "data2", "data3")
        ),
        Types.STRING
    );

    // Process streams
    stream1.print("Stream1");
    stream2.map(x -> x * 2).print("Stream2");
    stream3.print("Stream3");

    // Release the exit condition from another thread after 5 seconds. The timer is a daemon
    // and is cancelled in finally, so its thread never outlives the test.
    Timer timer = new Timer(true);
    try {
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                SHOULD_STOP.set(true);
            }
        }, 5000);

        env.execute("Finite Source Test");
    } finally {
        timer.cancel();
    }
}

Stream Collector

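StreamCollector is a JUnit 4 rule that collects every element emitted by a DataStream so a test can validate the complete output. Register it with `@Rule`, call `collect(...)` before `env.execute(...)`, and read the returned future once the job has finished.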

/**
 * JUnit rule for collecting all elements from a DataStream for testing.
 *
 * <p>It extends JUnit 4's {@code ExternalResource}, so register it as a {@code @Rule} field.
 * Call {@link #collect} while building the job, i.e. before {@code env.execute(...)}, and read
 * the returned future after execution.
 *
 * <p>NOTE(review): the future can only complete once the stream ends, so the collected stream
 * is presumably required to be bounded and to fit in memory — confirm against the Flink version.
 */
public class StreamCollector extends ExternalResource {
    /**
     * Registers {@code stream} for collection of all of its elements.
     * @param stream DataStream to collect from
     * @return CompletableFuture that completes with the collected elements; prefer waiting on it
     *     with a timeout so a job that never finishes fails the test instead of hanging it
     */
    public <IN> CompletableFuture<Collection<IN>> collect(DataStream<IN> stream);
}

Usage Example:

import org.apache.flink.streaming.util.StreamCollector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.Rule;

public class StreamCollectionTest {
    
    @Rule
    public StreamCollector streamCollector = new StreamCollector();
    
    @Test
    public void testStreamCollection() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Create test stream
        DataStream<String> input = env.fromElements("apple", "banana", "cherry");
        DataStream<String> processed = input.map(String::toUpperCase);
        
        // Collect results
        CompletableFuture<Collection<String>> resultFuture = streamCollector.collect(processed);
        
        // Execute the job
        env.execute("Stream Collection Test");
        
        // Get results and validate
        Collection<String> results = resultFuture.get(10, TimeUnit.SECONDS);
        assertEquals(3, results.size());
        assertTrue(results.contains("APPLE"));
        assertTrue(results.contains("BANANA"));
        assertTrue(results.contains("CHERRY"));
    }
    
    @Test
    public void testStreamWithFiltering() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Create test stream with filtering
        DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        DataStream<Integer> evenNumbers = numbers.filter(x -> x % 2 == 0);
        
        // Collect filtered results
        CompletableFuture<Collection<Integer>> resultFuture = streamCollector.collect(evenNumbers);
        
        env.execute("Filtering Test");
        
        // Validate filtered results
        Collection<Integer> results = resultFuture.get();
        assertEquals(5, results.size());
        assertTrue(results.contains(2));
        assertTrue(results.contains(4));
        assertTrue(results.contains(6));
        assertTrue(results.contains(8));
        assertTrue(results.contains(10));
    }
}

Advanced Data Collection Patterns

Testing with Multiple Sources

import org.apache.flink.api.common.typeinfo.Types;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

@Test
public void testMultipleSources() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // FiniteTestSource only advances after checkpoints complete.
    env.enableCheckpointing(100);

    // Create multiple finite sources. The element type is passed explicitly because it cannot
    // be recovered from a generic FiniteTestSource instance (type erasure).
    DataStream<String> source1 = env.addSource(
        new FiniteTestSource<>("a1", "a2", "a3"),
        Types.STRING
    );

    DataStream<String> source2 = env.addSource(
        new FiniteTestSource<>("b1", "b2", "b3"),
        Types.STRING
    );

    // Union the sources
    DataStream<String> combined = source1.union(source2);
    DataStream<String> processed = combined.map(s -> "processed-" + s);

    // Collect results
    CompletableFuture<Collection<String>> resultFuture = streamCollector.collect(processed);

    env.execute("Multiple Sources Test");

    // Each FiniteTestSource emits its elements once per checkpoint cycle, i.e. twice:
    // 2 sources x 3 elements x 2 cycles = 12 records.
    Collection<String> results = resultFuture.get(10, TimeUnit.SECONDS);
    assertEquals(12, results.size());
    assertEquals(2, Collections.frequency(results, "processed-a1"));
    assertEquals(2, Collections.frequency(results, "processed-b1"));
}

Testing with Windowing

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

@Test
public void testWindowedStream() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // FiniteTestSource only advances after checkpoints complete.
    env.enableCheckpointing(100);

    // Explicit type information: T cannot be recovered from a generic FiniteTestSource instance.
    TypeInformation<Tuple2<String, Integer>> tupleType = Types.TUPLE(Types.STRING, Types.INT);

    DataStream<Tuple2<String, Integer>> input = env.addSource(
        new FiniteTestSource<>(Arrays.asList(
            Tuple2.of("key1", 1),
            Tuple2.of("key1", 2),
            Tuple2.of("key2", 3),
            Tuple2.of("key1", 4),
            Tuple2.of("key2", 5)
        )),
        tupleType
    );

    // Use f1 as the event-time timestamp (milliseconds). Event-time windows fire on the final
    // watermark that Flink emits at end of input. Processing-time windows that are still pending
    // when a bounded job finishes are not fired, which made a processing-time version of this
    // test flaky.
    DataStream<Tuple2<String, Integer>> timestamped = input.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
            .withTimestampAssigner((event, previousTimestamp) -> event.f1.longValue())
    );

    // Apply windowing and aggregation
    DataStream<Tuple2<String, Integer>> windowed = timestamped
        .keyBy(t -> t.f0)
        .window(TumblingEventTimeWindows.of(Time.seconds(1)))
        .sum(1);

    // Collect windowed results
    CompletableFuture<Collection<Tuple2<String, Integer>>> resultFuture =
        streamCollector.collect(windowed);

    env.execute("Windowed Stream Test");

    // All timestamps fall into the first 1-second window, and the source emits its elements
    // twice, so each key yields one sum: key1 = 2 * (1 + 2 + 4) = 14, key2 = 2 * (3 + 5) = 16.
    Collection<Tuple2<String, Integer>> results = resultFuture.get(10, TimeUnit.SECONDS);
    assertEquals(2, results.size());
    assertTrue("Expected aggregated key1 values", results.contains(Tuple2.of("key1", 14)));
    assertTrue("Expected aggregated key2 values", results.contains(Tuple2.of("key2", 16)));
}

Testing with Checkpointing

import java.util.concurrent.TimeUnit;

/**
 * Number of checkpoint-complete notifications seen by {@link CountingFiniteTestSource}. It is
 * static because Flink runs a deserialized copy of the source. A counter captured by the source
 * would be copied, and the test would only ever observe its own, never-incremented instance.
 */
private static final AtomicInteger CHECKPOINT_COUNT = new AtomicInteger(0);

/**
 * FiniteTestSource that counts checkpoint-complete notifications. A static nested class is used
 * instead of an anonymous one for two reasons: it does not capture the enclosing,
 * non-serializable test instance, and its generic superclass binds the String output type for
 * Flink's type extraction.
 */
private static class CountingFiniteTestSource extends FiniteTestSource<String> {
    CountingFiniteTestSource(List<String> elements) {
        super(elements);
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        CHECKPOINT_COUNT.incrementAndGet();
    }
}

@Test
public void testSourceWithCheckpointing() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Enable checkpointing; FiniteTestSource only advances after checkpoints complete.
    env.enableCheckpointing(100);
    // Static state survives across tests; start from zero.
    CHECKPOINT_COUNT.set(0);

    // Create source that will emit elements and handle checkpoints
    List<String> elements = Arrays.asList("checkpoint1", "checkpoint2", "checkpoint3");

    DataStream<String> stream = env.addSource(new CountingFiniteTestSource(elements));
    DataStream<String> processed = stream.map(s -> "checkpoint-processed-" + s);

    // Collect results
    CompletableFuture<Collection<String>> resultFuture = streamCollector.collect(processed);

    env.execute("Checkpointing Test");

    // The source emits its elements once per checkpoint cycle, i.e. twice: 3 x 2 = 6 records.
    Collection<String> results = resultFuture.get(10, TimeUnit.SECONDS);
    assertEquals(6, results.size());
    assertTrue("Checkpoints should have been triggered", CHECKPOINT_COUNT.get() > 0);
}

Testing Source Cancellation

import org.apache.flink.api.common.typeinfo.Types;

import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/**
 * Flag consulted by the exit condition in testSourceCancellation. It is static because Flink runs
 * a deserialized copy of the source; a captured local flag would be copied and the running
 * source would never see the test flip it.
 */
private static final AtomicBoolean EXIT_ALLOWED = new AtomicBoolean(false);

@Test
public void testSourceCancellation() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // FiniteTestSource only advances after checkpoints complete.
    env.enableCheckpointing(100);
    // Static state survives across tests; start from a known value.
    EXIT_ALLOWED.set(false);

    List<String> elements = Arrays.asList("cancel1", "cancel2", "cancel3");

    // Flink calls SourceFunction#cancel() only when the job itself is cancelled, so a job that
    // finishes normally never reaches an overridden cancel(). Controlled shutdown is therefore
    // driven through the exit condition: the source may finish once EXIT_ALLOWED is set,
    // bounded by the 5 second timeout. The lambda reads the static flag instead of capturing it,
    // and it is cast to Serializable so the source can be shipped to its task.
    BooleanSupplier exitCondition = (BooleanSupplier & Serializable) () -> EXIT_ALLOWED.get();
    DataStream<String> stream = env.addSource(
        new FiniteTestSource<>(
            exitCondition,
            5000L, // 5 second timeout
            elements
        ),
        Types.STRING
    );

    // Set up result collection
    CompletableFuture<Collection<String>> resultFuture = streamCollector.collect(stream);

    // Allow the source to exit after a short delay. The timer fires once, is a daemon, and is
    // cancelled in finally, so its thread does not outlive the test.
    Timer timer = new Timer(true);
    try {
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                EXIT_ALLOWED.set(true);
            }
        }, 1000);

        env.execute("Cancellation Test");
    } finally {
        timer.cancel();
    }

    // The source shut down cleanly and every emitted element (each emitted twice) was delivered.
    Collection<String> results = resultFuture.get(10, TimeUnit.SECONDS);
    assertEquals(6, results.size());
    assertTrue("All source elements should have been delivered", results.containsAll(elements));
}