Test utilities library for Apache Flink applications providing streaming environments, test data sources, result collection mechanisms, and metrics testing utilities.
Tools for creating controlled test data sources and for collecting streaming results to validate in tests: a finite test source that synchronizes its emission with checkpoints, and a utility that collects every element of a stream.
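FiniteTestSource is a controlled test source that emits its elements in cycles synchronized with checkpoints, which makes it well suited to testing streaming applications deterministically. Because it waits for completed checkpoints between cycles, enable checkpointing on the job (for example, `env.enableCheckpointing(100)`); otherwise the source never finishes. Since the elements are emitted in more than one cycle, validate results by their distinct values rather than by raw counts.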
/**
* Test source that emits a fixed set of elements in cycles, synchronizing with checkpoints.
*
* <p>As a {@code SourceFunction} it pushes elements through the {@code SourceContext} passed to
* {@link #run}; as a {@code CheckpointListener} it receives checkpoint-complete and
* checkpoint-aborted notifications, presumably used to coordinate its emission cycles.
*
* <p>NOTE(review): because emission is tied to checkpoint notifications, jobs using this source
* presumably need checkpointing enabled (e.g. {@code env.enableCheckpointing(100)}) in order to
* finish — confirm against the implementation. Since elements are emitted "in cycles", tests
* should not assume each element arrives exactly once.
*
* @param <T> Type of elements to emit
*/
public class FiniteTestSource<T> implements SourceFunction<T>, CheckpointListener {
/**
* Creates a source that emits the given elements.
* No exit condition or timeout is passed; NOTE(review): presumably defaults apply — confirm.
* @param elements Elements to emit (varargs)
*/
public FiniteTestSource(T... elements);
/**
* Creates a source that emits the elements of the given iterable.
* No exit condition or timeout is passed; NOTE(review): presumably defaults apply — confirm.
* @param elements Elements to emit
*/
public FiniteTestSource(Iterable<T> elements);
/**
* Creates a source with an explicit exit condition and timeout.
* @param exitCondition Supplier that returns true when the source should stop; the source
*     object is a {@code SourceFunction}, so this supplier presumably has to be serializable
*     together with it — confirm
* @param timeoutMs Timeout in milliseconds; presumably bounds how long the source waits for
*     {@code exitCondition} before stopping anyway — confirm
* @param elements Elements to emit
*/
public FiniteTestSource(BooleanSupplier exitCondition, long timeoutMs, Iterable<T> elements);
/**
* Creates a source with an explicit exit condition and no explicit timeout.
* NOTE(review): presumably a default timeout applies — confirm.
* @param exitCondition Supplier that returns true when the source should stop
* @param elements Elements to emit
*/
public FiniteTestSource(BooleanSupplier exitCondition, Iterable<T> elements);
/**
* Main source execution method: emits the elements through {@code ctx}.
* @param ctx Source context for emitting elements
* @throws Exception declared by the signature; any failure while emitting propagates
*/
public void run(SourceContext<T> ctx) throws Exception;
/**
* Cancels the source, asking {@link #run} to stop.
*/
public void cancel();
/**
* Checkpoint completion notification (from {@code CheckpointListener}).
* @param checkpointId ID of completed checkpoint
*/
public void notifyCheckpointComplete(long checkpointId) throws Exception;
/**
* Checkpoint abortion notification (from {@code CheckpointListener}).
* @param checkpointId ID of aborted checkpoint
*/
public void notifyCheckpointAborted(long checkpointId) throws Exception;
}Usage Examples:
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.Test;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

/**
 * Stop flag read by the exit-condition source in {@link #testFiniteSource()}.
 *
 * <p>It is a static field rather than a captured local because Flink serializes the source
 * before running it: a captured {@code AtomicBoolean} would be copied into the job and would
 * never observe the timer's update. Sharing through a static field only works when the job runs
 * in the same JVM as the test (e.g. the local MiniCluster used in tests).
 */
private static final AtomicBoolean SHOULD_STOP = new AtomicBoolean(false);

/**
 * Demonstrates the three ways to construct a {@link FiniteTestSource}.
 *
 * <p>Checkpointing is enabled because FiniteTestSource only moves past each emission cycle
 * after checkpoints complete; without it the job never finishes.
 */
@Test
public void testFiniteSource() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(100);
    // Reset shared state in case this test runs more than once in the same JVM.
    SHOULD_STOP.set(false);

    // Simple finite source with vararg elements
    DataStream<String> stream1 = env.addSource(
            new FiniteTestSource<>("hello", "world", "flink"));

    // Source with collection of elements
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
    DataStream<Integer> stream2 = env.addSource(new FiniteTestSource<>(numbers));

    // Source with exit condition - stops after 10 seconds or when the condition is met.
    // The intersection cast makes the lambda serializable so it can travel with the source.
    DataStream<String> stream3 = env.addSource(
            new FiniteTestSource<>(
                    (BooleanSupplier & Serializable) () -> SHOULD_STOP.get(),
                    10000L,
                    Arrays.asList("data1", "data2", "data3")));

    // Process streams
    stream1.print("Stream1");
    stream2.map(x -> x * 2).print("Stream2");
    stream3.print("Stream3");

    // Signal the exit condition after 5 seconds. The timer thread is a daemon and is
    // cancelled once the job returns, so it can neither keep the JVM alive nor leak.
    Timer timer = new Timer("finite-source-stop", true);
    try {
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                SHOULD_STOP.set(true);
            }
        }, 5000);
        env.execute("Finite Source Test");
    } finally {
        timer.cancel();
    }
}
StreamCollector provides JUnit integration for collecting all elements from a DataStream for testing and validation.
/**
* JUnit rule for collecting all elements from a DataStream for testing.
*
* <p>It extends JUnit's {@code ExternalResource}, so it is meant to be declared as a
* {@code @Rule} field of the test class (as in the usage example below).
*/
public class StreamCollector extends ExternalResource {
/**
* Collects all elements emitted by the given stream.
*
* <p>In the usage examples this is called while building the job, before
* {@code env.execute()}, and the returned future is read after the job has run.
* NOTE(review): presumably the future completes only once the stream has ended — confirm.
* Prefer {@code get(timeout, unit)} so a stuck job fails the test instead of hanging it.
*
* @param stream DataStream to collect from
* @param <IN> element type of the stream
* @return CompletableFuture that completes with the collected elements
*/
public <IN> CompletableFuture<Collection<IN>> collect(DataStream<IN> stream);
}Usage Example:
import org.apache.flink.streaming.util.StreamCollector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.Rule;
import org.junit.Test;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;

/**
 * Examples of collecting a DataStream's output with the {@link StreamCollector} JUnit rule.
 */
public class StreamCollectionTest {
    /** Collector rule shared by the test methods of this class. */
    @Rule
    public StreamCollector streamCollector = new StreamCollector();

    /** Upper-cases three words and checks that exactly those three results arrive. */
    @Test
    public void testStreamCollection() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Create test stream
        DataStream<String> input = env.fromElements("apple", "banana", "cherry");
        DataStream<String> processed = input.map(String::toUpperCase);

        // Attach the collector before executing the job.
        CompletableFuture<Collection<String>> resultFuture = streamCollector.collect(processed);
        env.execute("Stream Collection Test");

        // Bounded wait: a job that never completes the future fails instead of hanging.
        Collection<String> results = resultFuture.get(10, TimeUnit.SECONDS);
        // The size check catches duplicates; the set comparison catches missing or unexpected
        // values without depending on arrival order.
        assertEquals(3, results.size());
        assertEquals(
                new HashSet<>(Arrays.asList("APPLE", "BANANA", "CHERRY")),
                new HashSet<>(results));
    }

    /** Keeps only even numbers from 1..10 and checks the five survivors. */
    @Test
    public void testStreamWithFiltering() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Create test stream with filtering
        DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        DataStream<Integer> evenNumbers = numbers.filter(x -> x % 2 == 0);

        // Collect filtered results
        CompletableFuture<Collection<Integer>> resultFuture = streamCollector.collect(evenNumbers);
        env.execute("Filtering Test");

        // Same bounded wait as above, so this test cannot hang forever either.
        Collection<Integer> results = resultFuture.get(10, TimeUnit.SECONDS);
        assertEquals(5, results.size());
        assertEquals(new HashSet<>(Arrays.asList(2, 4, 6, 8, 10)), new HashSet<>(results));
    }
}
@Test
public void testMultipleSources() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // FiniteTestSource only moves past each emission cycle after checkpoints complete,
    // so the job cannot finish without checkpointing.
    env.enableCheckpointing(100);

    // Create multiple finite sources
    DataStream<String> source1 = env.addSource(
            new FiniteTestSource<>("a1", "a2", "a3"));
    DataStream<String> source2 = env.addSource(
            new FiniteTestSource<>("b1", "b2", "b3"));

    // Union the sources
    DataStream<String> combined = source1.union(source2);
    DataStream<String> processed = combined.map(s -> "processed-" + s);

    // Collect results
    CompletableFuture<Collection<String>> resultFuture = streamCollector.collect(processed);
    env.execute("Multiple Sources Test");

    // Bounded wait so a stuck job fails the test instead of hanging it.
    Collection<String> results = resultFuture.get(10, TimeUnit.SECONDS);
    // Each FiniteTestSource re-emits its elements in every cycle, so the raw count is a
    // multiple of 6; compare distinct values to check that both sources contributed everything.
    Set<String> expected = new HashSet<>(Arrays.asList(
            "processed-a1", "processed-a2", "processed-a3",
            "processed-b1", "processed-b2", "processed-b3"));
    assertEquals(expected, new HashSet<>(results));
}
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
@Test
public void testWindowedStream() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // FiniteTestSource only moves past each emission cycle after checkpoints complete,
    // so the job cannot finish without checkpointing.
    env.enableCheckpointing(100);

    // Create source with keyed records; windows below are driven by processing time,
    // not by timestamps carried in the records.
    DataStream<Tuple2<String, Integer>> input = env.addSource(
            new FiniteTestSource<>(
                    Tuple2.of("key1", 1),
                    Tuple2.of("key1", 2),
                    Tuple2.of("key2", 3),
                    Tuple2.of("key1", 4),
                    Tuple2.of("key2", 5)));

    // Apply windowing and aggregation
    DataStream<Tuple2<String, Integer>> windowed = input
            .keyBy(t -> t.f0)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
            .sum(1);

    // Collect windowed results
    CompletableFuture<Collection<Tuple2<String, Integer>>> resultFuture =
            streamCollector.collect(windowed);

    env.execute("Windowed Stream Test");

    // Bounded wait so a stuck job fails the test instead of hanging it.
    Collection<Tuple2<String, Integer>> results = resultFuture.get(10, TimeUnit.SECONDS);
    // NOTE(review): processing-time windows still open when the source finishes may not fire,
    // depending on the Flink version; confirm this assertion is stable in your setup.
    assertFalse(results.isEmpty());

    // 4 is the largest single key1 input, so a key1 result above 4 can only come from summing
    // at least two key1 records in one window. A threshold of 1 would also be met by the lone
    // record ("key1", 2) and would prove nothing about aggregation.
    boolean foundKey1Sum = results.stream()
            .anyMatch(t -> "key1".equals(t.f0) && t.f1 > 4);
    assertTrue("Expected aggregated key1 values", foundKey1Sum);
}
@Test
public void testSourceWithCheckpointing() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Enable checkpointing; FiniteTestSource relies on checkpoint completions to progress.
    env.enableCheckpointing(100);
    // Reset shared state in case this test runs more than once in the same JVM.
    CHECKPOINT_COUNT.set(0);

    // Create source that will emit elements and handle checkpoints
    List<String> elements = Arrays.asList("checkpoint1", "checkpoint2", "checkpoint3");
    // Count completed checkpoints through the static counter: the source is serialized
    // before it runs, so a captured local counter would be a copy the test never sees.
    FiniteTestSource<String> source = new FiniteTestSource<String>(elements) {
        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            super.notifyCheckpointComplete(checkpointId);
            CHECKPOINT_COUNT.incrementAndGet();
        }
    };

    DataStream<String> stream = env.addSource(source);
    DataStream<String> processed = stream.map(s -> "checkpoint-processed-" + s);

    // Collect results
    CompletableFuture<Collection<String>> resultFuture = streamCollector.collect(processed);
    env.execute("Checkpointing Test");

    // Bounded wait so a stuck job fails the test instead of hanging it.
    Collection<String> results = resultFuture.get(10, TimeUnit.SECONDS);
    // FiniteTestSource re-emits its elements in every cycle, so compare distinct values
    // instead of asserting exactly one result per input element.
    Set<String> expected = new HashSet<>();
    for (String element : elements) {
        expected.add("checkpoint-processed-" + element);
    }
    assertEquals(expected, new HashSet<>(results));
    assertTrue("Checkpoints should have been triggered", CHECKPOINT_COUNT.get() > 0);
}

/**
 * Completed-checkpoint counter for testSourceWithCheckpointing. It is static so that the
 * deserialized source running in the local (same-JVM) cluster and the test thread share one
 * instance.
 */
private static final AtomicInteger CHECKPOINT_COUNT = new AtomicInteger();
@Test
public void testSourceCancellation() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // FiniteTestSource waits for completed checkpoints between emission cycles, so without
    // checkpointing it would never reach its exit-condition check.
    env.enableCheckpointing(100);
    // Reset shared state in case this test runs more than once in the same JVM.
    EXIT_SIGNAL.set(false);

    // Create source with exit condition for controlled shutdown. The condition reads a static
    // flag because the source is serialized before it runs (a captured local would be a copy),
    // and the intersection cast makes the lambda serializable along with the source.
    List<String> elements = Arrays.asList("cancel1", "cancel2", "cancel3");
    FiniteTestSource<String> source = new FiniteTestSource<String>(
            (BooleanSupplier & Serializable) () -> EXIT_SIGNAL.get(),
            5000L, // 5 second timeout
            elements
    ) {
        @Override
        public void cancel() {
            super.cancel();
            EXIT_SIGNAL.set(true);
        }
    };

    DataStream<String> stream = env.addSource(source);

    // Set up result collection
    CompletableFuture<Collection<String>> resultFuture = streamCollector.collect(stream);

    // Raise the exit signal once after a short delay. The daemon timer is cancelled when the
    // job returns, so no background thread outlives the test.
    Timer timer = new Timer("exit-signal", true);
    try {
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                EXIT_SIGNAL.set(true);
            }
        }, 1000);
        env.execute("Cancellation Test");
    } finally {
        timer.cancel();
    }

    // Validate that the source stopped after delivering its elements.
    Collection<String> results = resultFuture.get(10, TimeUnit.SECONDS);
    assertEquals(new HashSet<>(elements), new HashSet<>(results));
    assertTrue("Exit signal should have been raised before the job finished", EXIT_SIGNAL.get());
}

/**
 * Exit flag for testSourceCancellation. It is static so that the deserialized source running in
 * the local (same-JVM) cluster sees updates made by the test's timer.
 */
private static final AtomicBoolean EXIT_SIGNAL = new AtomicBoolean(false);
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-test-utils-2-12