Pre-built test datasets for common algorithms and use cases: consistent test data for PageRank, KMeans, WordCount, and other standard Flink examples, together with utilities for validating results.
WordCountData provides test data for word counting algorithms with expected results.
/**
 * Test data for WordCount programs
 */
public class WordCountData {

    /** Goethe Faust text for word counting. */
    public static final String TEXT;

    /** Expected word counts. */
    public static final String COUNTS;

    /** Expected streaming word count tuples. */
    public static final String STREAMING_COUNTS_AS_TUPLES;

    /** Expected word count tuples. */
    public static final String COUNTS_AS_TUPLES;
}
Usage Example:
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.junit.Test;
import java.util.List;

@Test
public void testWordCount() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // Use test data
    DataSet<String> text = env.fromElements(WordCountData.TEXT.split("\\n"));
    // Implement word count
    DataSet<Tuple2<String, Integer>> counts = text
        .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
            for (String word : line.toLowerCase().split("\\W+")) {
                if (word.length() > 0) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        })
        // Lambdas with generic outputs lose their type information to
        // erasure, so the result type must be declared explicitly
        .returns(Types.TUPLE(Types.STRING, Types.INT))
        .groupBy(0)
        .sum(1);
    List<Tuple2<String, Integer>> result = counts.collect();
    // Validate against the tuple-formatted expected results
    TestBaseUtils.compareResultAsTuples(result, WordCountData.COUNTS_AS_TUPLES);
}
KMeansData provides test datasets for K-means clustering algorithms with multiple dimensional variants.
/**
 * Test data for KMeans programs
 */
public class KMeansData {

    // 3D data constants

    /** 3D data points for clustering. */
    public static final String DATAPOINTS;

    /** Initial cluster centers for 3D data. */
    public static final String INITIAL_CENTERS;

    /** Centers after one iteration for 3D data. */
    public static final String CENTERS_AFTER_ONE_STEP;

    // Additional iteration results available...

    // 2D data constants

    /** 2D data points for clustering. */
    public static final String DATAPOINTS_2D;

    /** Initial 2D cluster centers. */
    public static final String INITIAL_CENTERS_2D;

    // Additional 2D iteration results available...

    /**
     * Validates K-means results with delta tolerance.
     *
     * @param expectedResult Expected result string
     * @param result Actual result list
     * @param maxDelta Maximum allowed delta for floating point comparison
     */
    public static void checkResultsWithDelta(
            String expectedResult,
            List<String> result,
            double maxDelta) throws Exception;
}
Usage Example:
import org.apache.flink.test.testdata.KMeansData;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.junit.Test;
import java.util.List;

@Test
public void testKMeans() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // Load test data points
    DataSet<String> dataPoints = env.fromElements(KMeansData.DATAPOINTS.split("\\n"));
    DataSet<String> initialCenters = env.fromElements(KMeansData.INITIAL_CENTERS.split("\\n"));
    // Parse data points into Point objects
    DataSet<Point> points = dataPoints.map(line -> {
        String[] coords = line.split(" ");
        return new Point(
            Double.parseDouble(coords[0]),
            Double.parseDouble(coords[1]),
            Double.parseDouble(coords[2]));
    });
    // Parse centers into Centroid objects
    DataSet<Centroid> centroids = initialCenters.map(line -> {
        String[] coords = line.split(" ");
        return new Centroid(
            Integer.parseInt(coords[0]),
            Double.parseDouble(coords[1]),
            Double.parseDouble(coords[2]),
            Double.parseDouble(coords[3]));
    });
    // Run one iteration of K-means: assign each point to its nearest
    // center, then aggregate new centers per cluster
    DataSet<Centroid> newCentroids = points
        .map(new SelectNearestCenter()).withBroadcastSet(centroids, "centroids")
        .groupBy(0)
        .reduceGroup(new CentroidAccumulator());
    List<String> result = newCentroids.map(c -> c.toString()).collect();
    // Validate with expected results and delta tolerance
    KMeansData.checkResultsWithDelta(
        KMeansData.CENTERS_AFTER_ONE_STEP,
        result,
        0.01);
}
PageRankData provides test data for PageRank algorithms with graph structures and expected rankings.
/**
 * Test data for PageRank programs
 */
public class PageRankData {

    /** Number of vertices in the test graph. */
    public static final int NUM_VERTICES = 5;

    /** Vertex data for PageRank. */
    public static final String VERTICES;

    /** Edge data for the PageRank graph. */
    public static final String EDGES;

    /** Expected ranks after 3 iterations. */
    public static final String RANKS_AFTER_3_ITERATIONS;

    /** Expected ranks after convergence with epsilon 0.0001. */
    public static final String RANKS_AFTER_EPSILON_0_0001_CONVERGENCE;
}
Usage Example:
import org.apache.flink.test.testdata.PageRankData;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.junit.Test;
import java.util.List;

@Test
public void testPageRank() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // Load vertices and edges
    DataSet<String> verticesText = env.fromElements(PageRankData.VERTICES.split("\\n"));
    DataSet<String> edgesText = env.fromElements(PageRankData.EDGES.split("\\n"));
    // Parse into vertex and edge tuples; generic lambda outputs need
    // explicit type declarations due to erasure
    DataSet<Tuple2<Long, Double>> vertices = verticesText.map(line -> {
        String[] parts = line.split("\\s+");
        return new Tuple2<>(Long.parseLong(parts[0]), Double.parseDouble(parts[1]));
    }).returns(Types.TUPLE(Types.LONG, Types.DOUBLE));
    DataSet<Tuple2<Long, Long>> edges = edgesText.map(line -> {
        String[] parts = line.split("\\s+");
        return new Tuple2<>(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
    }).returns(Types.TUPLE(Types.LONG, Types.LONG));
    // Run PageRank for 3 iterations (helper not shown)
    DataSet<Tuple2<Long, Double>> ranks = runPageRank(vertices, edges, 3);
    List<String> result = ranks.map(r -> r.f0 + " " + r.f1).collect();
    // Validate against expected results
    String expected = PageRankData.RANKS_AFTER_3_ITERATIONS;
    // Compare result with expected (see the TestBaseUtils examples below)
}
ConnectedComponentsData provides test data for connected components algorithms.
/**
 * Test data for Connected Components programs
 */
public class ConnectedComponentsData {

    /**
     * Generates enumerated vertices.
     *
     * @param num Number of vertices to generate
     * @return String with vertex data
     */
    public static String getEnumeratingVertices(int num);

    /**
     * Generates odd/even connected edges.
     *
     * @param numVertices Number of vertices
     * @param numEdges Number of edges to generate
     * @param seed Random seed for reproducible results
     * @return String with edge data
     */
    public static String getRandomOddEvenEdges(int numVertices, int numEdges, long seed);

    /**
     * Validates odd/even connected component results.
     *
     * @param result BufferedReader with results
     */
    public static void checkOddEvenResult(BufferedReader result) throws Exception;

    /**
     * Validates tuple-based connected component results.
     *
     * @param result List of component tuples
     */
    public static void checkOddEvenResult(List<Tuple2<Long, Long>> result) throws Exception;
}
Usage Example:
import org.apache.flink.test.testdata.ConnectedComponentsData;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.junit.Test;
import java.util.List;

@Test
public void testConnectedComponents() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // Generate test data: 10 vertices, 15 edges, fixed seed for reproducibility
    String verticesData = ConnectedComponentsData.getEnumeratingVertices(10);
    String edgesData = ConnectedComponentsData.getRandomOddEvenEdges(10, 15, 12345L);
    DataSet<String> verticesText = env.fromElements(verticesData.split("\\n"));
    DataSet<String> edgesText = env.fromElements(edgesData.split("\\n"));
    // Parse vertices; each vertex starts in its own component (id, id)
    DataSet<Tuple2<Long, Long>> vertices = verticesText.map(line -> {
        String[] parts = line.split("\\s+");
        return new Tuple2<>(Long.parseLong(parts[0]), Long.parseLong(parts[0]));
    }).returns(Types.TUPLE(Types.LONG, Types.LONG));
    DataSet<Tuple2<Long, Long>> edges = edgesText.map(line -> {
        String[] parts = line.split("\\s+");
        return new Tuple2<>(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
    }).returns(Types.TUPLE(Types.LONG, Types.LONG));
    // Run connected components algorithm (helper not shown)
    DataSet<Tuple2<Long, Long>> components = runConnectedComponents(vertices, edges);
    List<Tuple2<Long, Long>> result = components.collect();
    // Validate results with the built-in odd/even checker
    ConnectedComponentsData.checkOddEvenResult(result);
}
TransitiveClosureData provides test data for transitive closure algorithms.
/**
 * Test data for Transitive Closure programs
 */
public class TransitiveClosureData {

    /**
     * Validates odd/even transitive closure results.
     *
     * @param result BufferedReader with results
     */
    public static void checkOddEvenResult(BufferedReader result) throws Exception;
}
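Usage Example (a minimal sketch: TransitiveClosureData ships only the validator, so the input here reuses the ConnectedComponentsData generators, and runTransitiveClosure is a hypothetical helper):
import org.apache.flink.test.testdata.ConnectedComponentsData;
import org.apache.flink.test.testdata.TransitiveClosureData;
import java.io.BufferedReader;
import java.io.StringReader;

@Test
public void testTransitiveClosure() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // Reuse the odd/even edge generator so the odd/even validator applies
    String edgesData = ConnectedComponentsData.getRandomOddEvenEdges(10, 15, 12345L);
    DataSet<String> edgesText = env.fromElements(edgesData.split("\\n"));
    // Run transitive closure (hypothetical helper) producing one result per line
    List<String> result = runTransitiveClosure(edgesText);
    // The validator consumes results through a BufferedReader
    BufferedReader reader = new BufferedReader(new StringReader(String.join("\n", result)));
    TransitiveClosureData.checkOddEvenResult(reader);
}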
WebLogAnalysisData provides test data for web log analysis programs.
/**
 * Test data for Web Log Analysis programs
 */
public class WebLogAnalysisData {

    /** Document data for analysis. */
    public static final String DOCS;

    /** Ranking data. */
    public static final String RANKS;

    /** Visit log data. */
    public static final String VISITS;

    /** Expected analysis results. */
    public static final String EXCEPTED_RESULT;
}
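Usage Example (a sketch; runWebLogAnalysis is a hypothetical helper standing in for the actual analysis program):
import org.apache.flink.test.testdata.WebLogAnalysisData;
import org.apache.flink.test.util.TestBaseUtils;

@Test
public void testWebLogAnalysis() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // Load the three input relations
    DataSet<String> docs = env.fromElements(WebLogAnalysisData.DOCS.split("\\n"));
    DataSet<String> ranks = env.fromElements(WebLogAnalysisData.RANKS.split("\\n"));
    DataSet<String> visits = env.fromElements(WebLogAnalysisData.VISITS.split("\\n"));
    // Run the analysis (hypothetical helper) and compare with the expected output
    List<String> result = runWebLogAnalysis(docs, ranks, visits);
    TestBaseUtils.compareResultAsText(result, WebLogAnalysisData.EXCEPTED_RESULT);
}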
EnumTriangleData provides test data for triangle enumeration algorithms.
/**
 * Test data for Enum Triangle programs
 */
public class EnumTriangleData {

    /** Graph edges for triangle enumeration. */
    public static final String EDGES;

    /** Expected triangles by ID. */
    public static final String TRIANGLES_BY_ID;

    /** Expected triangles by degree. */
    public static final String TRIANGLES_BY_DEGREE;
}
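Usage Example (a sketch; runTriangleEnumeration is a hypothetical helper that emits one triangle per line):
import org.apache.flink.test.testdata.EnumTriangleData;
import org.apache.flink.test.util.TestBaseUtils;

@Test
public void testEnumTriangles() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // Load the edge list for triangle enumeration
    DataSet<String> edgesText = env.fromElements(EnumTriangleData.EDGES.split("\\n"));
    // Enumerate triangles (hypothetical helper) and compare line by line
    List<String> result = runTriangleEnumeration(edgesText);
    TestBaseUtils.compareResultAsText(result, EnumTriangleData.TRIANGLES_BY_ID);
}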
The data providers can also be combined and validated with the TestBaseUtils helpers:
import org.apache.flink.test.util.TestBaseUtils;

@Test
public void testCustomDataValidation() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // Use multiple data providers side by side
    DataSet<String> wordCountInput = env.fromElements(WordCountData.TEXT.split("\\n"));
    // (kmeansInput is loaded only to illustrate mixing providers)
    DataSet<String> kmeansInput = env.fromElements(KMeansData.DATAPOINTS.split("\\n"));
    // Count words; the tokenizer is a class, so its type information is
    // preserved and no explicit returns() call is needed
    DataSet<String> wordCounts = wordCountInput
        .flatMap(new WordCountTokenizer())
        .groupBy(0)
        .sum(1)
        .map(tuple -> tuple.f0 + " " + tuple.f1);
    // Collect and validate using TestBaseUtils
    List<String> results = wordCounts.collect();
    TestBaseUtils.compareResultAsText(results, WordCountData.COUNTS);
}
@Test
public void testMultipleDatasets() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // Combine different data sources
    DataSet<String> combinedData = env.fromElements(
            WordCountData.TEXT.split("\\n"))
        .union(env.fromElements("Additional test data", "More test content"));
    // Split lines into words, then map each word to its length
    DataSet<Integer> wordLengths = combinedData
        .flatMap((String line, Collector<String> out) -> {
            for (String word : line.split("\\s+")) {
                out.collect(word);
            }
        })
        // Explicit result type: the Collector's generic parameter is erased
        .returns(Types.STRING)
        .map(word -> word.length());
    List<Integer> results = wordLengths.collect();
    assertFalse("Should have results", results.isEmpty());
}
@Test
public void testWithExpectedResults() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // Use PageRank data
    DataSet<String> vertices = env.fromElements(PageRankData.VERTICES.split("\\n"));
    DataSet<String> edges = env.fromElements(PageRankData.EDGES.split("\\n"));
    // Run algorithm and collect results (helper not shown)
    List<String> actualResults = runPageRankAlgorithm(vertices, edges, 3);
    // Parse expected results for comparison
    String[] expectedLines = PageRankData.RANKS_AFTER_3_ITERATIONS.split("\\n");
    List<String> expectedResults = Arrays.asList(expectedLines);
    // Use TestBaseUtils for comparison with tolerance
    TestBaseUtils.compareResultCollections(
        expectedResults,
        actualResults,
        new PageRankComparator(0.01)); // custom comparator with delta tolerance
}
@Test
public void testPerformanceWithLargeDataset() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);
    // Generate a larger dataset by replicating the KMeans data points
    List<String> largeDataset = new ArrayList<>();
    String[] basePoints = KMeansData.DATAPOINTS.split("\\n");
    for (int i = 0; i < 1000; i++) {
        for (String point : basePoints) {
            largeDataset.add(point);
        }
    }
    DataSet<String> largePointSet = env.fromCollection(largeDataset);
    long startTime = System.currentTimeMillis();
    // Process the large dataset
    List<String> results = largePointSet
        .map(new PointProcessor())
        .collect();
    long endTime = System.currentTimeMillis();
    long processingTime = endTime - startTime;
    // Validate results and a coarse performance bound
    assertFalse("Should have processed data", results.isEmpty());
    assertTrue("Processing should complete within reasonable time",
        processingTime < 30000); // 30 seconds
}