Comprehensive testing utilities for Apache Flink stream and batch processing applications
—
This package provides finite test sources that emit data under checkpoint coordination, thread-safe result sinks for collecting job output, and sample datasets for common algorithms such as PageRank, K-means, and Connected Components. Together they give tests deterministic inputs and verifiable outputs.
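Source function that emits a finite set of elements with coordinated checkpointing and configurable exit conditions, designed for deterministic testing scenarios. The source emits its element list twice and waits for checkpoints to complete after each round. Jobs that use it must therefore enable checkpointing (for example `env.enableCheckpointing(100)`), or they will never finish.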
/**
 * Source function that emits a finite set of elements with coordinated checkpointing and
 * configurable exit conditions, designed for deterministic testing scenarios.
 *
 * <p>The element list is emitted twice. After each round the source waits for checkpoints
 * to complete, so any job using it must have checkpointing enabled or it will never finish.
 */
public class FiniteTestSource<T> implements SourceFunction<T> {
    /** Creates a source that emits the given elements. */
    @SafeVarargs
    public FiniteTestSource(T... elements);

    /** Creates a source that emits the elements of the given iterable. */
    public FiniteTestSource(Iterable<T> elements);

    /**
     * Creates a source that emits {@code elements} and finishes only once {@code couldExit}
     * returns {@code true}.
     */
    public FiniteTestSource(BooleanSupplier couldExit, Iterable<T> elements);

    /**
     * Like {@link #FiniteTestSource(BooleanSupplier, Iterable)}, with {@code waitTimeOut}
     * (milliseconds) bounding how long the source waits for {@code couldExit}.
     */
    public FiniteTestSource(BooleanSupplier couldExit, long waitTimeOut, Iterable<T> elements);
}

import java.util.Arrays;
import java.util.List;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.FiniteTestSource;

// Create source with varargs
FiniteTestSource<Integer> numberSource = new FiniteTestSource<>(1, 2, 3, 4, 5);

// Create source with collection
List<String> words = Arrays.asList("hello", "world", "flink");
FiniteTestSource<String> wordSource = new FiniteTestSource<>(words);

// Use in streaming job. Checkpointing is required: the source only finishes after
// checkpoints have completed.
StreamExecutionEnvironment env = getTestEnvironment();
env.enableCheckpointing(100);
env.addSource(numberSource)
    .map(x -> x * 2)
    .print();

import java.util.function.BooleanSupplier;

// Create source with custom exit condition: the source may finish 5 s after startTime
long startTime = System.currentTimeMillis();
BooleanSupplier exitCondition = () -> System.currentTimeMillis() > startTime + 5000;
List<String> data = Arrays.asList("a", "b", "c", "d", "e");
FiniteTestSource<String> conditionalSource =
    new FiniteTestSource<>(exitCondition, data);

// Create source with timeout: wait at most 10 s for the exit condition
FiniteTestSource<String> timedSource =
    new FiniteTestSource<>(exitCondition, 10000L, data);
Utilities for collecting and managing test results from streaming jobs with thread-safe collection mechanisms.
/** Sink that collects every received record into a shared, thread-safe list. */
public class TestListResultSink<T> extends RichSinkFunction<T> {
    /** Returns the records collected so far, in the order they arrived. */
    public List<T> getResult();

    /** Returns the records collected so far, sorted by their natural ordering. */
    public List<T> getSortedResult();
}

/** Wrapper for managing test result collections. */
public class TestListWrapper<T> {
    // Wrapper for managing test result collections
}

import org.apache.flink.test.streaming.runtime.util.TestListResultSink;

// Create result sink
TestListResultSink<String> resultSink = new TestListResultSink<>();

// Use in streaming job
env.fromElements("a", "b", "c")
    .map(String::toUpperCase)
    .addSink(resultSink);
env.execute("Test Job");

// Retrieve results. With parallelism > 1 the arrival order is not deterministic,
// so compare against the sorted result rather than the raw one.
List<String> results = resultSink.getSortedResult();
assertEquals(Arrays.asList("A", "B", "C"), results);
Predefined datasets for testing common graph and machine learning algorithms, providing both input data and expected results for validation.
Sample text data and expected word count results for testing text processing algorithms.
/** Sample text data and expected word count results for testing text processing algorithms. */
public class WordCountData {
/** Input text whose words are counted. */
public static final String TEXT;
/** Expected word count output for {@link #TEXT}. */
public static final String EXPECTED_RESULT;
}
import org.apache.flink.test.testdata.WordCountData;
// Use predefined text data
// NOTE(review): TEXT is already split into words here, so WordCountMapper presumably only
// maps each word to a (word, count) tuple — confirm against its implementation.
env.fromElements(WordCountData.TEXT.split("\\s+"))
.flatMap(new WordCountMapper())
// Tuple fields are addressed by position: group by field 0, sum field 1.
// NOTE(review): position-based keyBy(int) is deprecated in newer Flink releases; prefer a KeySelector.
.keyBy(0)
.sum(1)
.print();
// Verify against expected results
// NOTE(review): print() writes to stdout, so collectJobOutput() must capture that output (not shown).
// In Flink's TestBaseUtils the second argument of compareResultsByLinesInMemory is a result
// *path*, not the result text — confirm what collectJobOutput() returns.
String actualResult = collectJobOutput();
TestBaseUtils.compareResultsByLinesInMemory(
WordCountData.EXPECTED_RESULT, actualResult);
Graph data for testing connected components algorithms with vertices, edges, and expected component assignments.
/** Graph data for testing connected components algorithms with vertices, edges, and expected component assignments. */
public class ConnectedComponentsData {
/** Vertex input for the graph. */
public static final String VERTEX_DATA;
/** Edge input for the graph. */
public static final String EDGE_DATA;
/** Expected component assignment for each vertex. */
public static final String EXPECTED_RESULT;
}
import org.apache.flink.test.testdata.ConnectedComponentsData;
// Create graph from test data
// parseVertices/parseEdges are caller-supplied helpers (not shown) that turn the text data
// into collections for env.fromCollection.
DataSet<Vertex> vertices = env.fromCollection(parseVertices(ConnectedComponentsData.VERTEX_DATA));
DataSet<Edge> edges = env.fromCollection(parseEdges(ConnectedComponentsData.EDGE_DATA));
// Run connected components algorithm
// NOTE(review): treat this as a sketch. DataSet#runOperation expects a CustomUnaryOperation,
// while Gelly's ConnectedComponents is normally applied to a Graph via graph.run(...).
// Confirm the real API before copying.
DataSet<Vertex> result = vertices.runOperation(new ConnectedComponents<>(maxIterations))
.withBroadcastSet(edges, "edges");
// Verify results
// compareWithExpected is a caller-supplied helper (not shown).
compareWithExpected(result, ConnectedComponentsData.EXPECTED_RESULT);
Sample points and expected cluster assignments for testing K-means clustering implementations.
/** Sample points and expected cluster assignments for testing K-means clustering implementations. */
public class KMeansData {
/** Points to be clustered. */
public static final String DATAPOINTS;
/** Initial centroids for the clustering run. */
public static final String INITIAL_CENTROIDS;
/** Expected cluster assignments to compare the job result against. */
public static final String EXPECTED_RESULT;
}
Web graph data with vertices, edges, and expected PageRank scores for testing PageRank algorithm implementations.
/** Web graph data with vertices, edges, and expected PageRank scores for testing PageRank algorithm implementations. */
public class PageRankData {
/** Vertices of the web graph. */
public static final String VERTICES;
/** Edges (links) of the web graph. */
public static final String EDGES;
/** Expected PageRank scores to compare the job result against. */
public static final String EXPECTED_RESULT;
}
Graph data for testing triangle enumeration algorithms in social network analysis.
/** Graph data for testing triangle enumeration algorithms in social network analysis. */
public class EnumTriangleData {
/** Edges of the input graph. */
public static final String EDGES;
/** Expected triangles to compare the job result against. */
public static final String EXPECTED_RESULT;
}
Graph data for testing transitive closure algorithms with expected reachability results.
/** Graph data for testing transitive closure algorithms with expected reachability results. */
public class TransitiveClosureData {
/** Edges of the input graph. */
public static final String EDGES;
/** Expected reachability results to compare the job result against. */
public static final String EXPECTED_RESULT;
}
Sample web server log data and expected analysis results for testing log processing applications.
/** Sample web server log data and expected analysis results for testing log processing applications. */
public class WebLogAnalysisData {
/** Web server log records used as job input. */
public static final String LOG_DATA;
/** Expected analysis output to compare the job result against. */
public static final String EXPECTED_RESULT;
}
Using finite sources for predictable test execution with guaranteed completion.
@Test
void testStreamingTransformation() throws Exception {
    // FiniteTestSource only finishes after checkpoints have completed, so checkpointing
    // must be enabled or the job never terminates.
    env.enableCheckpointing(100);

    // Create deterministic source
    List<Integer> inputData = Arrays.asList(1, 2, 3, 4, 5);
    FiniteTestSource<Integer> source = new FiniteTestSource<>(inputData);

    // Collect results
    TestListResultSink<Integer> sink = new TestListResultSink<>();

    // Build and execute job
    env.addSource(source)
        .map(x -> x * x)
        .addSink(sink);
    env.execute("Square Numbers");

    // FiniteTestSource emits its element list twice, and a parallel map may reorder
    // records, so compare the sorted output against every square appearing twice.
    List<Integer> expected = Arrays.asList(1, 1, 4, 4, 9, 9, 16, 16, 25, 25);
    assertEquals(expected, sink.getSortedResult());
}
Using predefined datasets to validate algorithm implementations.
@Test
void testPageRankAlgorithm() throws Exception {
    // Use predefined PageRank test data (parseVertices/parseEdges are caller-supplied helpers)
    DataSet<Vertex> vertices = parseVertices(PageRankData.VERTICES);
    DataSet<Edge> edges = parseEdges(PageRankData.EDGES);

    // Run PageRank algorithm
    DataSet<Vertex> result = runPageRank(vertices, edges, 10);

    // collect() on a DataSet<Vertex> yields List<Vertex>; compareResultAsText accepts any
    // element type and compares its textual form with the expected result.
    List<Vertex> actualResults = result.collect();
    TestBaseUtils.compareResultAsText(actualResults, PageRankData.EXPECTED_RESULT);
}
Using sources with timeout conditions for testing time-sensitive scenarios.
@Test
void testWithTimeout() throws Exception {
    // The source may finish once this delay has elapsed...
    long exitDelayMs = 5000L;
    // ...and waits at most this long for the exit condition. Keep it larger than the
    // delay so the source does not race its own timeout.
    long waitTimeOutMs = 10_000L;

    long startTime = System.currentTimeMillis();
    BooleanSupplier exitCondition = () ->
        System.currentTimeMillis() > startTime + exitDelayMs;

    List<String> data = generateLargeDataset();
    FiniteTestSource<String> source =
        new FiniteTestSource<>(exitCondition, waitTimeOutMs, data);

    // FiniteTestSource waits for completed checkpoints, so checkpointing must be enabled.
    env.enableCheckpointing(100);

    env.addSource(source)
        .map(String::toUpperCase)
        .print();
    env.execute("Timeout Test");

    // The job cannot finish before the source's exit condition became true.
    assertTrue(System.currentTimeMillis() >= startTime + exitDelayMs);
}
Using finite sources with checkpoint coordination for testing fault tolerance.
@Test
void testCheckpointingWithFiniteSource() throws Exception {
    // Checkpoint every second; the finite source coordinates its emission with these checkpoints.
    env.enableCheckpointing(1000);

    FiniteTestSource<Long> checkpointAwareSource =
        new FiniteTestSource<>(1L, 2L, 3L, 4L, 5L);

    env.addSource(checkpointAwareSource)
        .keyBy(value -> value % 2) // two key groups: even and odd values
        .map(new StatefulMapper())
        .print();

    env.execute("Checkpoint Test");

    // Checkpoint verification is intentionally left open: inspect whatever your checkpoint
    // storage or StatefulMapper records, depending on your verification strategy.
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-test-utils