The test data package provides pre-built datasets and validation utilities for common Flink algorithms and testing scenarios. These datasets are commonly used in Flink examples and benchmarks.
Test data for PageRank algorithm implementations.
/**
 * Test data for PageRank algorithm implementations. Each String constant holds one record per
 * line; the usage example splits the inputs with {@code split("\n")}.
 */
public class PageRankData {
/** Number of vertices in {@link #VERTICES} (5 vertices in total). */
public static final int NUM_VERTICES = 5;
/** Vertex records, one {@code vertexId} per line. */
public static final String VERTICES;
/** Edge records, one {@code sourceVertexId targetVertexId} pair per line. */
public static final String EDGES;
/** Expected ranks after 3 iterations, one {@code vertexId pageRankValue} per line. */
public static final String RANKS_AFTER_3_ITERATIONS;
/**
 * Expected ranks ({@code vertexId pageRankValue}) for a run stopped by convergence with
 * epsilon 0.0001, as the field name indicates.
 */
public static final String RANKS_AFTER_EPSILON_0_0001_CONVERGENCE;
}Usage Example:
@Test
public void testPageRank() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Use provided test data: one DataSet element per line of each bundled String
    String[] vertexLines = PageRankData.VERTICES.split("\n");
    String[] edgeLines = PageRankData.EDGES.split("\n");
    DataSet<String> vertices = env.fromElements(vertexLines);
    DataSet<String> edges = env.fromElements(edgeLines);

    // Run PageRank algorithm
    // ... PageRank implementation

    // The result written to resultPath must match the expected 3-iteration ranks line by line
    String expectedRanks = PageRankData.RANKS_AFTER_3_ITERATIONS;
    TestBaseUtils.compareResultsByLinesInMemory(expectedRanks, resultPath);
}
Data Format:
- `VERTICES`: `vertexId` (5 vertices total)
- `EDGES`: `sourceVertexId targetVertexId`
- `RANKS_*`: `vertexId pageRankValue`

Test data for WordCount implementations using German text from Goethe's Faust tragedy.
/**
 * Test data for WordCount implementations, based on German text from Goethe's Faust.
 */
public class WordCountData {
/** Input text; the usage example splits it on {@code "\n"} into one element per line. */
public static final String TEXT;
/** Expected word counts; presumably in the plain {@code word count} line format (confirm). */
public static final String COUNTS;
/**
 * Expected counts for a streaming WordCount, as tuples.
 * NOTE(review): how this differs from COUNTS_AS_TUPLES is not visible here; confirm.
 */
public static final String STREAMING_COUNTS_AS_TUPLES;
/** Expected {@code (word,count)} tuples, compared via {@code TestBaseUtils.compareResultAsTuples}. */
public static final String COUNTS_AS_TUPLES;
}Usage Example:
@Test
public void testWordCount() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Use provided German text, one DataSet element per line
    String[] lines = WordCountData.TEXT.split("\n");
    DataSet<String> text = env.fromElements(lines);

    // Tokenize, group by the word (field 0) and sum the counts (field 1)
    DataSet<Tuple2<String, Integer>> wordCounts =
            text.flatMap(new Tokenizer()).groupBy(0).sum(1);

    // Compare with expected counts
    List<Tuple2<String, Integer>> result = wordCounts.collect();
    TestBaseUtils.compareResultAsTuples(result, WordCountData.COUNTS_AS_TUPLES);
}
Data Content:
- `word count` format
- `(word,count)` tuples

Test data for K-Means clustering algorithm with both 2D and 3D datasets.
/**
 * Test data for K-Means clustering with both 3D and 2D datasets, plus a tolerance-based checker
 * for comparing floating-point cluster centers.
 */
public class KMeansData {
// 3D clustering data
/** 3D data points, one {@code pointId x y z} per line (100 data points). */
public static final String DATAPOINTS;
/** Initial 3D centers, one {@code centerId x y z} per line. */
public static final String INITIAL_CENTERS;
/** Expected 3D centers after one step. */
public static final String CENTERS_AFTER_ONE_STEP;
/**
 * Expected 3D centers after one step, single-digit variant.
 * NOTE(review): "SINGLE_DIGIT"/"DOUBLE_DIGIT" presumably refer to the number of decimal digits
 * in the printed coordinates; confirm against the data.
 */
public static final String CENTERS_AFTER_ONE_STEP_SINGLE_DIGIT;
/** Expected 3D centers after 20 iterations, single-digit variant. */
public static final String CENTERS_AFTER_20_ITERATIONS_SINGLE_DIGIT;
/** Expected 3D centers after 20 iterations, double-digit variant (used by the 3D example). */
public static final String CENTERS_AFTER_20_ITERATIONS_DOUBLE_DIGIT;
// 2D clustering data
/** 2D data points, one {@code pointId x y} per line. */
public static final String DATAPOINTS_2D;
/** Initial 2D centers, one {@code centerId x y} per line. */
public static final String INITIAL_CENTERS_2D;
/** Expected 2D centers after a single iteration, double-digit variant. */
public static final String CENTERS_2D_AFTER_SINGLE_ITERATION_DOUBLE_DIGIT;
/** Expected 2D centers after 20 iterations, double-digit variant (used by the 2D example). */
public static final String CENTERS_2D_AFTER_20_ITERATIONS_DOUBLE_DIGIT;
// Validation utility
/**
 * Checks result lines against expected lines, tolerating numeric differences up to a delta.
 * Intended for floating-point results such as cluster centers.
 * NOTE(review): how lines are matched (and whether order matters) is not visible here.
 *
 * @param expectedResults expected lines as one multi-line String (e.g. a CENTERS_* constant)
 * @param resultLines     actual result lines, e.g. from {@code DataSet.collect()}
 * @param maxDelta        maximum tolerated difference for numeric values
 */
public static void checkResultsWithDelta(String expectedResults, List<String> resultLines, double maxDelta);
}Usage Example:
@Test
public void testKMeans3D() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // 3D inputs, one DataSet element per line
    String[] pointLines = KMeansData.DATAPOINTS.split("\n");
    String[] centerLines = KMeansData.INITIAL_CENTERS.split("\n");
    DataSet<String> points = env.fromElements(pointLines);
    DataSet<String> centers = env.fromElements(centerLines);

    // Run K-Means algorithm for 20 iterations
    // ... K-Means implementation
    List<String> finalCenters = resultCenters.collect();

    // Floating-point centers are compared with a delta tolerance instead of exactly
    double maxDelta = 0.01;
    String expectedCenters = KMeansData.CENTERS_AFTER_20_ITERATIONS_DOUBLE_DIGIT;
    KMeansData.checkResultsWithDelta(expectedCenters, finalCenters, maxDelta);
}
@Test
public void testKMeans2D() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // 2D inputs, one DataSet element per line
    String[] pointLines = KMeansData.DATAPOINTS_2D.split("\n");
    String[] centerLines = KMeansData.INITIAL_CENTERS_2D.split("\n");
    DataSet<String> points = env.fromElements(pointLines);
    DataSet<String> centers = env.fromElements(centerLines);

    // ... K-Means implementation
    List<String> result = resultCenters.collect();

    // Compare against the 20-iteration 2D centers with a delta tolerance
    double maxDelta = 0.01;
    String expectedCenters = KMeansData.CENTERS_2D_AFTER_20_ITERATIONS_DOUBLE_DIGIT;
    KMeansData.checkResultsWithDelta(expectedCenters, result, maxDelta);
}
Data Format:
- 3D points: `pointId x y z` (100 data points)
- 2D points: `pointId x y`
- Centers: `centerId x y z` (3D) or `centerId x y` (2D)

Test data and validation utilities for Connected Components algorithm.
/** Test data generators and result validators for Connected Components algorithms. */
public class ConnectedComponentsData {
// Generate test vertices
/**
 * Builds a vertex list for {@code num} vertices as a multi-line String (callers split it on "\n").
 * NOTE(review): presumably one consecutive vertex id per line; confirm the exact ids.
 */
public static String getEnumeratingVertices(int num);
// Generate random edges with odd/even pattern
/**
 * Builds {@code numEdges} random edges over {@code numVertices} vertices as a multi-line String,
 * following an odd/even pattern. A fixed {@code seed} makes the generated edges reproducible.
 * NOTE(review): presumably every edge joins two vertices of equal parity; confirm.
 */
public static String getRandomOddEvenEdges(int numEdges, int numVertices, long seed);
// Validate connected components results
/**
 * Validates result lines read from {@code result} against the odd/even component structure.
 *
 * @throws IOException if reading from {@code result} fails
 */
public static void checkOddEvenResult(BufferedReader result) throws IOException;
/**
 * Validates collected result tuples against the odd/even component structure.
 * NOTE(review): tuples are presumably (vertexId, componentId); confirm.
 */
public static void checkOddEvenResult(List<Tuple2<Long, Long>> lines) throws IOException;
}Usage Example:
@Test
public void testConnectedComponents() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Generate test data: 100 vertices and 150 edges; the fixed seed keeps the edges reproducible
    int numVertices = 100;
    int numEdges = 150;
    long seed = 12345L;
    String vertices = ConnectedComponentsData.getEnumeratingVertices(numVertices);
    String edges = ConnectedComponentsData.getRandomOddEvenEdges(numEdges, numVertices, seed);

    DataSet<String> vertexData = env.fromElements(vertices.split("\n"));
    DataSet<String> edgeData = env.fromElements(edges.split("\n"));

    // Run Connected Components algorithm
    // ... implementation

    // Validate odd/even component structure
    List<Tuple2<Long, Long>> components = result.collect();
    ConnectedComponentsData.checkOddEvenResult(components);
}
Test data for Transitive Closure algorithm with validation utilities.
/** Validation utilities for Transitive Closure algorithm results. */
public class TransitiveClosureData {
// Validate transitive closure results
/**
 * Validates transitive-closure result lines read from {@code result} against an odd/even pattern.
 * NOTE(review): presumably each line is a space-separated "from to" pair whose vertices must share
 * parity; confirm. The usage example closes the reader itself, so the caller appears to own it.
 *
 * @throws IOException if reading from {@code result} fails
 */
public static void checkOddEvenResult(BufferedReader result) throws IOException;
}Usage Example:
@Test
public void testTransitiveClosure() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Create test edges with an odd/even pattern: every edge links two vertices of the same
    // parity, so odd vertices only reach odd vertices and even only reach even. A chain such
    // as 1->2->3->4 mixes parities and would violate an odd/even check.
    DataSet<Tuple2<Long, Long>> edges = env.fromElements(
            new Tuple2<>(1L, 3L),
            new Tuple2<>(3L, 5L),
            new Tuple2<>(2L, 4L),
            new Tuple2<>(4L, 6L)
    );

    // Run Transitive Closure algorithm
    // ... implementation

    // Write one space-separated "from to" pair per line. writeAsText would print "(from,to)".
    // A sink parallelism of 1 yields a single file (not a directory) that FileReader can open.
    // NOTE(review): assumes checkOddEvenResult expects space-separated pairs; confirm.
    result.writeAsCsv(outputPath, "\n", " ").setParallelism(1);
    env.execute();

    // Validate closure properties; try-with-resources closes the reader even if validation fails
    try (BufferedReader reader = new BufferedReader(new FileReader(outputPath))) {
        TransitiveClosureData.checkOddEvenResult(reader);
    }
}
Test data for triangle enumeration in graphs.
/** Test data for triangle enumeration in graphs. */
public class EnumTriangleData {
/** Input graph: one undirected edge per line, formatted {@code vertexId1 vertexId2}. */
public static final String EDGES;
/** Expected triangles for the by-id variant (used by the usage example). */
public static final String TRIANGLES_BY_ID;
/** Expected triangles for the by-degree variant. NOTE(review): line format not shown here. */
public static final String TRIANGLES_BY_DEGREE;
}Usage Example:
@Test
public void testTriangleEnumeration() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Use provided edge data, one DataSet element per line
    DataSet<String> edges = env.fromElements(EnumTriangleData.EDGES.split("\n"));

    // Run triangle enumeration algorithm
    // ... implementation
    List<String> triangles = result.collect();

    // Compare with expected triangles (sorted by ID). The triangles were collected into memory,
    // so use compareResultAsText(List, expected). compareResultsByLinesInMemory takes
    // (expected, resultPath) and reads result files, so it cannot accept a List.
    TestBaseUtils.compareResultAsText(triangles, EnumTriangleData.TRIANGLES_BY_ID);
}
Data Format:
Each line is `vertexId1 vertexId2`, representing one undirected edge.

Test data for web log analysis and web graph algorithms.
/** Test data for web log analysis and web graph algorithms. */
public class WebLogAnalysisData {
/** Web documents, one {@code url|content} record per line. */
public static final String DOCS;
/** Page ranks, one {@code url rank} record per line. */
public static final String RANKS;
/** Visit statistics, one {@code url visitCount} record per line. */
public static final String VISITS;
/**
 * Expected output of the web log analysis.
 * NOTE(review): the name is presumably a misspelling of "EXPECTED_RESULT"; it is kept as-is
 * because callers reference this exact identifier.
 */
public static final String EXCEPTED_RESULT;
}Usage Example:
@Test
public void testWebLogAnalysis() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Load web data, one DataSet element per line
    DataSet<String> documents = env.fromElements(WebLogAnalysisData.DOCS.split("\n"));
    DataSet<String> pageRanks = env.fromElements(WebLogAnalysisData.RANKS.split("\n"));
    DataSet<String> visits = env.fromElements(WebLogAnalysisData.VISITS.split("\n"));

    // Run web log analysis
    // ... implementation combining docs, ranks, and visits
    List<String> analysis = result.collect();

    // Validate analysis results. The results were collected into memory, so use
    // compareResultAsText(List, expected). compareResultsByLinesInMemory takes
    // (expected, resultPath) and reads result files.
    // (EXCEPTED_RESULT is the field's actual name, presumably a misspelling of "expected".)
    TestBaseUtils.compareResultAsText(analysis, WebLogAnalysisData.EXCEPTED_RESULT);
}
Data Format:
- `url|content` - Web documents with URL and content
- `url rank` - Page rank values for URLs
- `url visitCount` - Visit statistics for URLs

// Split multi-line data into DataSet
DataSet<String> dataSet = env.fromElements(TestData.SAMPLE_DATA.split("\n"));
// Parse structured data: each "key,value" line becomes a (String, Integer) tuple
DataSet<Tuple2<String, Integer>> tuples = dataSet
        .map(line -> {
            String[] parts = line.split(",");
            return new Tuple2<>(parts[0], Integer.parseInt(parts[1]));
        })
        // Java lambdas erase Tuple2's generic parameters, so Flink cannot infer the result type
        // on its own. Declare it explicitly (org.apache.flink.api.common.typeinfo.Types).
        .returns(Types.TUPLE(Types.STRING, Types.INT));
// For floating-point comparisons
KMeansData.checkResultsWithDelta(expectedResults, actualResults, 0.001);
// For key-value pairs with tolerance
TestBaseUtils.compareKeyValuePairsWithDelta(expected, resultPath, ",", 0.01);// Implement custom validation for specific algorithms
List<String> results = algorithm.collect();
// Custom validation logic: every line must look like "<digits>,<digits>.<digits>"
results.forEach(line ->
        assertTrue("Result format validation", line.matches("\\d+,\\d+\\.\\d+")));
// Use ConnectedComponentsData for reproducible random data
int numEdges = 1000;
int numVertices = 500;
long seed = 42L; // fixed seed -> the same edges on every run
String randomEdges = ConnectedComponentsData.getRandomOddEvenEdges(numEdges, numVertices, seed);
String[] edgeLines = randomEdges.split("\n");
DataSet<String> edges = env.fromElements(edgeLines);
These test data classes integrate seamlessly with all test base classes:
/**
 * Example of using the test data classes from a {@code JavaProgramTestBase} subclass.
 * {@code processInput} and {@code expectedOutput} are placeholders for the algorithm under test
 * and its expected text output.
 */
public class AlgorithmTest extends JavaProgramTestBase {
    @Override
    protected void testProgram() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Use any test data class. Split on "\n" (one element per line) like the other examples.
        // Splitting on "\\s+" would turn the text into individual words instead of lines.
        DataSet<String> input = env.fromElements(WordCountData.TEXT.split("\n"));
        // Run algorithm and validate
        List<String> result = processInput(input).collect();
        TestBaseUtils.compareResultAsText(result, expectedOutput);
    }
}