Tools for creating controlled test data sources and collecting streaming results for validation: a finite test source with checkpoint synchronization, and a JUnit rule that gathers a stream's output so it can be asserted on after the job finishes.
FiniteTestSource is a controlled test source that emits its elements in cycles synchronized with checkpoints, which makes tests of streaming applications deterministic.
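Because each emission cycle waits for checkpoint completions before the source moves on, jobs that use it typically need checkpointing enabled on the environment, and the element type usually has to be passed explicitly since it cannot be extracted from the generic source at runtime. A minimal sketch of that wiring (the 100 ms checkpoint interval and the job name are arbitrary choices):
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.FiniteTestSource;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint completions are what let the source move from one emission cycle to the next
env.enableCheckpointing(100);
// Pass the element type explicitly; it is erased from the generic source at runtime
env.addSource(new FiniteTestSource<>("a", "b", "c"), Types.STRING).print();
env.execute("finite-source-sketch");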
/**
* Test source that emits elements in cycles with checkpoint synchronization
* @param <T> Type of elements to emit
*/
public class FiniteTestSource<T> implements SourceFunction<T>, CheckpointListener {
/**
* Create source with vararg elements
* @param elements Elements to emit
*/
public FiniteTestSource(T... elements);
/**
* Create source with iterable elements
* @param elements Elements to emit
*/
public FiniteTestSource(Iterable<T> elements);
/**
* Create source with exit condition and timeout
* @param exitCondition Supplier that returns true when source should stop
* @param timeoutMs Timeout in milliseconds
* @param elements Elements to emit
*/
public FiniteTestSource(BooleanSupplier exitCondition, long timeoutMs, Iterable<T> elements);
/**
* Create source with exit condition
* @param exitCondition Supplier that returns true when source should stop
* @param elements Elements to emit
*/
public FiniteTestSource(BooleanSupplier exitCondition, Iterable<T> elements);
/**
* Main source execution method
* @param ctx Source context for emitting elements
*/
public void run(SourceContext<T> ctx) throws Exception;
/**
* Cancels the source
*/
public void cancel();
/**
* Checkpoint completion notification
* @param checkpointId ID of completed checkpoint
*/
public void notifyCheckpointComplete(long checkpointId) throws Exception;
/**
* Checkpoint abortion notification
* @param checkpointId ID of aborted checkpoint
*/
public void notifyCheckpointAborted(long checkpointId) throws Exception;
}

Usage Examples:
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.typeinfo.Types;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
@Test
public void testFiniteSource() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// FiniteTestSource advances through its emission cycles on checkpoint completion, so enable checkpointing
env.enableCheckpointing(100);
// Simple finite source with vararg elements
DataStream<String> stream1 = env.addSource(
new FiniteTestSource<>("hello", "world", "flink")
);
// Source with collection of elements
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
DataStream<Integer> stream2 = env.addSource(
new FiniteTestSource<>(numbers),
Types.INT
);
// Source with exit condition - stops after 10 seconds or when condition is met
AtomicBoolean shouldStop = new AtomicBoolean(false);
DataStream<String> stream3 = env.addSource(
new FiniteTestSource<>(
() -> shouldStop.get(),
10000L,
Arrays.asList("data1", "data2", "data3")
),
Types.STRING
);
// Process streams
stream1.print("Stream1");
stream2.map(x -> x * 2).print("Stream2");
stream3.print("Stream3");
// In another thread, signal to stop after some processing
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
shouldStop.set(true);
}
}, 5000);
env.execute("Finite Source Test");
}

StreamCollector is a JUnit rule that collects all elements of a DataStream so they can be validated after the job finishes. Because the elements come from all parallel subtasks, their order is not guaranteed; an order-insensitive assertion helper is sketched after the first example below.
/**
* JUnit rule for collecting all elements from a DataStream for testing
*/
public class StreamCollector extends ExternalResource {
/**
* Collects all elements from the stream
* @param stream DataStream to collect from
* @return CompletableFuture that completes with collected elements
*/
public <IN> CompletableFuture<Collection<IN>> collect(DataStream<IN> stream);
}

Usage Example:
import org.apache.flink.streaming.util.StreamCollector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.Rule;
import org.junit.Test;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class StreamCollectionTest {
@Rule
public StreamCollector streamCollector = new StreamCollector();
@Test
public void testStreamCollection() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create test stream
DataStream<String> input = env.fromElements("apple", "banana", "cherry");
DataStream<String> processed = input.map(String::toUpperCase);
// Collect results
CompletableFuture<Collection<String>> resultFuture = streamCollector.collect(processed);
// Execute the job
env.execute("Stream Collection Test");
// Get results and validate
Collection<String> results = resultFuture.get(10, TimeUnit.SECONDS);
assertEquals(3, results.size());
assertTrue(results.contains("APPLE"));
assertTrue(results.contains("BANANA"));
assertTrue(results.contains("CHERRY"));
}
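// The collected Collection gathers output from all parallel subtasks, so its iteration
// order is not guaranteed. This order-insensitive helper is only an illustrative sketch;
// assertCollectedExactly is not part of the Flink API.
// Example: assertCollectedExactly(results, Arrays.asList("APPLE", "BANANA", "CHERRY"))
private static <T> void assertCollectedExactly(Collection<T> results, Collection<T> expected) {
assertEquals(expected.size(), results.size());
assertTrue(results.containsAll(expected));
}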
@Test
public void testStreamWithFiltering() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create test stream with filtering
DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
DataStream<Integer> evenNumbers = numbers.filter(x -> x % 2 == 0);
// Collect filtered results
CompletableFuture<Collection<Integer>> resultFuture = streamCollector.collect(evenNumbers);
env.execute("Filtering Test");
// Validate filtered results
Collection<Integer> results = resultFuture.get();
assertEquals(5, results.size());
assertTrue(results.contains(2));
assertTrue(results.contains(4));
assertTrue(results.contains(6));
assertTrue(results.contains(8));
assertTrue(results.contains(10));
}
}

The remaining examples combine FiniteTestSource with StreamCollector. They are written as additional test methods for the same test class, reusing the @Rule StreamCollector field declared above.
@Test
public void testMultipleSources() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpointing drives FiniteTestSource's emission cycles
env.enableCheckpointing(100);
// Create multiple finite sources
DataStream<String> source1 = env.addSource(
new FiniteTestSource<>("a1", "a2", "a3")
);
DataStream<String> source2 = env.addSource(
new FiniteTestSource<>("b1", "b2", "b3")
);
// Union the sources
DataStream<String> combined = source1.union(source2);
DataStream<String> processed = combined.map(s -> "processed-" + s);
// Collect results
CompletableFuture<Collection<String>> resultFuture = streamCollector.collect(processed);
env.execute("Multiple Sources Test");
// Validate combined results
Collection<String> results = resultFuture.get();
// Each element may be emitted in more than one cycle, so assert on content rather than an exact count
assertTrue(results.size() >= 6);
assertTrue(results.contains("processed-a1"));
assertTrue(results.contains("processed-b1"));
}

Windowed aggregations can be validated the same way. Additional imports for this example:
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
@Test
public void testWindowedStream() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpointing drives FiniteTestSource's emission cycles
env.enableCheckpointing(100);
// Create source with timestamped data
DataStream<Tuple2<String, Integer>> input = env.addSource(
new FiniteTestSource<>(
Tuple2.of("key1", 1),
Tuple2.of("key1", 2),
Tuple2.of("key2", 3),
Tuple2.of("key1", 4),
Tuple2.of("key2", 5)
),
Types.TUPLE(Types.STRING, Types.INT)
);
// Apply windowing and aggregation
DataStream<Tuple2<String, Integer>> windowed = input
.keyBy(t -> t.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.sum(1);
// Collect windowed results
CompletableFuture<Collection<Tuple2<String, Integer>>> resultFuture =
streamCollector.collect(windowed);
env.execute("Windowed Stream Test");
// Validate aggregated results
Collection<Tuple2<String, Integer>> results = resultFuture.get();
assertFalse(results.isEmpty());
// Check that aggregation occurred
boolean foundKey1Sum = results.stream()
.anyMatch(t -> "key1".equals(t.f0) && t.f1 > 1);
assertTrue("Expected aggregated key1 values", foundKey1Sum);
}

The next test subclasses FiniteTestSource to observe checkpoint notifications. User functions are serialized when the job is submitted, so the counter the test inspects is kept in a static field that both the test thread and the task-side source instance can see.
// Shared between the test thread and the deserialized source instance running in the job
private static final AtomicInteger checkpointCount = new AtomicInteger(0);
@Test
public void testSourceWithCheckpointing() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing
env.enableCheckpointing(100);
// Create source that will emit elements and handle checkpoints
List<String> elements = Arrays.asList("checkpoint1", "checkpoint2", "checkpoint3");
checkpointCount.set(0); // reset the shared counter declared above
FiniteTestSource<String> source = new FiniteTestSource<String>(elements) {
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
super.notifyCheckpointComplete(checkpointId);
checkpointCount.incrementAndGet();
}
};
DataStream<String> stream = env.addSource(source, Types.STRING);
DataStream<String> processed = stream.map(s -> "checkpoint-processed-" + s);
// Collect results
CompletableFuture<Collection<String>> resultFuture = streamCollector.collect(processed);
env.execute("Checkpointing Test");
// Validate results and checkpointing
Collection<String> results = resultFuture.get();
// Elements may be emitted in more than one cycle, so check content rather than an exact count
assertTrue(results.contains("checkpoint-processed-checkpoint1"));
assertTrue("Checkpoints should have been triggered", checkpointCount.get() > 0);
}

The cancellation path can be exercised with the exit-condition constructor. As above, the flag is kept in a static field so the source instance running in the job and the test thread share it.
// Shared between the test thread and the deserialized source instance running in the job
private static final AtomicBoolean cancelled = new AtomicBoolean(false);
@Test
public void testSourceCancellation() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create source with exit condition for controlled cancellation
cancelled.set(false); // reset the shared flag declared above
List<String> elements = Arrays.asList("cancel1", "cancel2", "cancel3");
FiniteTestSource<String> source = new FiniteTestSource<String>(
() -> cancelled.get(),
5000L, // 5 second timeout
elements
) {
@Override
public void cancel() {
super.cancel();
cancelled.set(true);
}
};
DataStream<String> stream = env.addSource(source, Types.STRING);
// Set up result collection
CompletableFuture<Collection<String>> resultFuture = streamCollector.collect(stream);
// Cancel after short delay
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
cancelled.set(true);
}
}, 1000);
env.execute("Cancellation Test");
// Validate that source was properly cancelled
Collection<String> results = resultFuture.get();
assertTrue("Source should have been cancelled", cancelled.get());
}