0
# Test Data Providers
1
2
Pre-built test datasets for common algorithms and use cases. These classes provide consistent test data for PageRank, KMeans, WordCount, and other standard Flink examples, along with utility methods for validating results.
3
4
## Capabilities
5
6
### Word Count Data
7
8
`WordCountData` provides test data for word counting algorithms with expected results.
9
10
```java { .api }
11
/**
12
* Test data for WordCount programs
13
*/
14
public class WordCountData {
15
/**
16
* Goethe Faust text for word counting
17
*/
18
public static final String TEXT;
19
20
/**
21
* Expected word counts
22
*/
23
public static final String COUNTS;
24
25
/**
26
* Expected streaming word count tuples
27
*/
28
public static final String STREAMING_COUNTS_AS_TUPLES;
29
30
/**
31
* Expected word count tuples
32
*/
33
public static final String COUNTS_AS_TUPLES;
34
}
35
```
36
37
**Usage Example:**
38
39
```java
40
import org.apache.flink.test.testdata.WordCountData;
41
import org.apache.flink.api.java.DataSet;
42
import org.apache.flink.api.java.ExecutionEnvironment;
43
44
@Test
45
public void testWordCount() throws Exception {
46
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
47
48
// Use test data
49
DataSet<String> text = env.fromElements(WordCountData.TEXT.split("\\n"));
50
51
// Implement word count
52
DataSet<Tuple2<String, Integer>> counts = text
53
.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
54
for (String word : line.toLowerCase().split("\\W+")) {
55
if (word.length() > 0) {
56
out.collect(new Tuple2<>(word, 1));
57
}
58
}
59
})
60
.groupBy(0)
61
.sum(1);
62
63
List<Tuple2<String, Integer>> result = counts.collect();
64
65
// Validate against expected results
66
String expectedCounts = WordCountData.COUNTS;
67
// Compare result with expectedCounts...
68
}
69
```
70
71
### K-Means Data
72
73
`KMeansData` provides test datasets for K-means clustering algorithms in both 2D and 3D variants, including data points, initial centers, and expected centers after iterations.
74
75
```java { .api }
76
/**
77
* Test data for KMeans programs
78
*/
79
public class KMeansData {
80
// 3D Data Constants
81
/**
82
* 3D data points for clustering
83
*/
84
public static final String DATAPOINTS;
85
86
/**
87
* Initial cluster centers for 3D data
88
*/
89
public static final String INITIAL_CENTERS;
90
91
/**
92
* Centers after one iteration for 3D data
93
*/
94
public static final String CENTERS_AFTER_ONE_STEP;
95
96
// Additional iteration results available...
97
98
// 2D Data Constants
99
/**
100
* 2D data points for clustering
101
*/
102
public static final String DATAPOINTS_2D;
103
104
/**
105
* Initial 2D cluster centers
106
*/
107
public static final String INITIAL_CENTERS_2D;
108
109
// Additional 2D iteration results available...
110
111
/**
112
* Validates K-means results with delta tolerance
113
* @param expectedResult Expected result string
114
* @param result Actual result list
115
* @param maxDelta Maximum allowed delta for floating point comparison
116
*/
117
public static void checkResultsWithDelta(
118
String expectedResult,
119
List<String> result,
120
double maxDelta
121
) throws Exception;
122
}
123
```
124
125
**Usage Example:**
126
127
```java
128
import org.apache.flink.test.testdata.KMeansData;
129
import org.apache.flink.api.java.DataSet;
130
import org.apache.flink.api.java.ExecutionEnvironment;
131
132
@Test
133
public void testKMeans() throws Exception {
134
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
135
136
// Load test data points
137
DataSet<String> dataPoints = env.fromElements(KMeansData.DATAPOINTS.split("\\n"));
138
DataSet<String> initialCenters = env.fromElements(KMeansData.INITIAL_CENTERS.split("\\n"));
139
140
// Parse data points into Point objects
141
DataSet<Point> points = dataPoints.map(line -> {
142
String[] coords = line.split(" ");
143
return new Point(
144
Double.parseDouble(coords[0]),
145
Double.parseDouble(coords[1]),
146
Double.parseDouble(coords[2])
147
);
148
});
149
150
DataSet<Centroid> centroids = initialCenters.map(line -> {
151
String[] coords = line.split(" ");
152
return new Centroid(
153
Integer.parseInt(coords[0]),
154
Double.parseDouble(coords[1]),
155
Double.parseDouble(coords[2]),
156
Double.parseDouble(coords[3])
157
);
158
});
159
160
// Run one iteration of K-means
161
DataSet<Centroid> newCentroids = points
162
.map(new SelectNearestCenter()).withBroadcastSet(centroids, "centroids")
163
.groupBy(0)
164
.reduceGroup(new CentroidAccumulator());
165
166
List<String> result = newCentroids.map(c -> c.toString()).collect();
167
168
// Validate with expected results and delta tolerance
169
KMeansData.checkResultsWithDelta(
170
KMeansData.CENTERS_AFTER_ONE_STEP,
171
result,
172
0.01
173
);
174
}
175
```
176
177
### PageRank Data
178
179
`PageRankData` provides test data for PageRank algorithms with graph structures and expected rankings.
180
181
```java { .api }
182
/**
183
* Test data for PageRank programs
184
*/
185
public class PageRankData {
186
/**
187
* Number of vertices in test graph
188
*/
189
public static final int NUM_VERTICES = 5;
190
191
/**
192
* Vertex data for PageRank
193
*/
194
public static final String VERTICES;
195
196
/**
197
* Edge data for PageRank graph
198
*/
199
public static final String EDGES;
200
201
/**
202
* Expected ranks after 3 iterations
203
*/
204
public static final String RANKS_AFTER_3_ITERATIONS;
205
206
/**
207
* Expected ranks after convergence with epsilon 0.0001
208
*/
209
public static final String RANKS_AFTER_EPSILON_0_0001_CONVERGENCE;
210
}
211
```
212
213
**Usage Example:**
214
215
```java
216
import org.apache.flink.test.testdata.PageRankData;
217
import org.apache.flink.api.java.DataSet;
218
import org.apache.flink.api.java.ExecutionEnvironment;
219
220
@Test
221
public void testPageRank() throws Exception {
222
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
223
224
// Load vertices and edges
225
DataSet<String> verticesText = env.fromElements(PageRankData.VERTICES.split("\\n"));
226
DataSet<String> edgesText = env.fromElements(PageRankData.EDGES.split("\\n"));
227
228
// Parse into Vertex and Edge objects
229
DataSet<Tuple2<Long, Double>> vertices = verticesText.map(line -> {
230
String[] parts = line.split("\\s+");
231
return new Tuple2<>(Long.parseLong(parts[0]), Double.parseDouble(parts[1]));
232
});
233
234
DataSet<Tuple2<Long, Long>> edges = edgesText.map(line -> {
235
String[] parts = line.split("\\s+");
236
return new Tuple2<>(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
237
});
238
239
// Run PageRank for 3 iterations
240
DataSet<Tuple2<Long, Double>> ranks = runPageRank(vertices, edges, 3);
241
242
List<String> result = ranks.map(r -> r.f0 + " " + r.f1).collect();
243
244
// Validate against expected results
245
String expected = PageRankData.RANKS_AFTER_3_ITERATIONS;
246
// Compare results...
247
}
248
```
249
250
### Connected Components Data
251
252
`ConnectedComponentsData` provides test data for connected components algorithms.
253
254
```java { .api }
255
/**
256
* Test data for Connected Components programs
257
*/
258
public class ConnectedComponentsData {
259
/**
260
* Generates enumerated vertices
261
* @param num Number of vertices to generate
262
* @return String with vertex data
263
*/
264
public static String getEnumeratingVertices(int num);
265
266
/**
267
* Generates odd/even connected edges
268
* @param numVertices Number of vertices
269
* @param numEdges Number of edges to generate
270
* @param seed Random seed for reproducible results
271
* @return String with edge data
272
*/
273
public static String getRandomOddEvenEdges(int numVertices, int numEdges, long seed);
274
275
/**
276
* Validates odd/even connected component results
277
* @param result BufferedReader with results
278
*/
279
public static void checkOddEvenResult(BufferedReader result) throws Exception;
280
281
/**
282
* Validates tuple-based connected component results
283
* @param result List of component tuples
284
*/
285
public static void checkOddEvenResult(List<Tuple2<Long, Long>> result) throws Exception;
286
}
287
```
288
289
**Usage Example:**
290
291
```java
292
import org.apache.flink.test.testdata.ConnectedComponentsData;
293
294
@Test
295
public void testConnectedComponents() throws Exception {
296
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
297
298
// Generate test data
299
String verticesData = ConnectedComponentsData.getEnumeratingVertices(10);
300
String edgesData = ConnectedComponentsData.getRandomOddEvenEdges(10, 15, 12345L);
301
302
DataSet<String> verticesText = env.fromElements(verticesData.split("\\n"));
303
DataSet<String> edgesText = env.fromElements(edgesData.split("\\n"));
304
305
// Parse vertices and edges
306
DataSet<Tuple2<Long, Long>> vertices = verticesText.map(line -> {
307
String[] parts = line.split("\\s+");
308
return new Tuple2<>(Long.parseLong(parts[0]), Long.parseLong(parts[0]));
309
});
310
311
DataSet<Tuple2<Long, Long>> edges = edgesText.map(line -> {
312
String[] parts = line.split("\\s+");
313
return new Tuple2<>(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
314
});
315
316
// Run connected components algorithm
317
DataSet<Tuple2<Long, Long>> components = runConnectedComponents(vertices, edges);
318
319
List<Tuple2<Long, Long>> result = components.collect();
320
321
// Validate results
322
ConnectedComponentsData.checkOddEvenResult(result);
323
}
324
```
325
326
### Transitive Closure Data
327
328
`TransitiveClosureData` provides test data for transitive closure algorithms.
329
330
```java { .api }
331
/**
332
* Test data for Transitive Closure programs
333
*/
334
public class TransitiveClosureData {
335
/**
336
* Validates odd/even transitive closure results
337
* @param result BufferedReader with results
338
*/
339
public static void checkOddEvenResult(BufferedReader result) throws Exception;
340
}
341
```
342
343
### Web Log Analysis Data
344
345
`WebLogAnalysisData` provides test data for web log analysis programs.
346
347
```java { .api }
348
/**
349
* Test data for Web Log Analysis programs
350
*/
351
public class WebLogAnalysisData {
352
/**
353
* Document data for analysis
354
*/
355
public static final String DOCS;
356
357
/**
358
* Ranking data
359
*/
360
public static final String RANKS;
361
362
/**
363
* Visit log data
364
*/
365
public static final String VISITS;
366
367
/**
368
 * Expected analysis results. (Note: the constant name uses "EXCEPTED" rather
 * than "EXPECTED" — this matches the historical spelling in Flink's actual
 * WebLogAnalysisData class and must not be "corrected" in client code.)
369
*/
370
public static final String EXCEPTED_RESULT;
371
}
372
```
373
374
### Enum Triangle Data
375
376
`EnumTriangleData` provides test data for triangle enumeration algorithms.
377
378
```java { .api }
379
/**
380
* Test data for Enum Triangle programs
381
*/
382
public class EnumTriangleData {
383
/**
384
* Graph edges for triangle enumeration
385
*/
386
public static final String EDGES;
387
388
/**
389
* Expected triangles by ID
390
*/
391
public static final String TRIANGLES_BY_ID;
392
393
/**
394
* Expected triangles by degree
395
*/
396
public static final String TRIANGLES_BY_DEGREE;
397
}
398
```
399
400
## Advanced Data Provider Usage
401
402
### Custom Data Validation
403
404
```java
405
import org.apache.flink.test.util.TestBaseUtils;
406
407
@Test
408
public void testCustomDataValidation() throws Exception {
409
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
410
411
// Use multiple data providers
412
DataSet<String> wordCountInput = env.fromElements(WordCountData.TEXT.split("\\n"));
413
DataSet<String> kmeansInput = env.fromElements(KMeansData.DATAPOINTS.split("\\n"));
414
415
// Process data
416
DataSet<String> wordCounts = wordCountInput
417
.flatMap(new WordCountTokenizer())
418
.groupBy(0)
419
.sum(1)
420
.map(tuple -> tuple.f0 + " " + tuple.f1);
421
422
// Collect and validate using TestBaseUtils
423
List<String> results = wordCounts.collect();
424
425
// Use utility methods for validation
426
TestBaseUtils.compareResultAsText(results, WordCountData.COUNTS);
427
}
428
```
429
430
### Combining Multiple Test Datasets
431
432
```java
433
@Test
434
public void testMultipleDatasets() throws Exception {
435
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
436
437
// Combine different data sources
438
DataSet<String> combinedData = env.fromElements(
439
WordCountData.TEXT.split("\\n")
440
).union(
441
env.fromElements("Additional test data", "More test content")
442
);
443
444
// Process combined data
445
DataSet<Integer> wordLengths = combinedData
446
.flatMap((String line, Collector<String> out) -> {
447
for (String word : line.split("\\s+")) {
448
out.collect(word);
449
}
450
})
451
.map(word -> word.length());
452
453
List<Integer> results = wordLengths.collect();
454
assertFalse("Should have results", results.isEmpty());
455
}
456
```
457
458
### Testing with Expected Results Validation
459
460
```java
461
@Test
462
public void testWithExpectedResults() throws Exception {
463
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
464
465
// Use PageRank data
466
DataSet<String> vertices = env.fromElements(PageRankData.VERTICES.split("\\n"));
467
DataSet<String> edges = env.fromElements(PageRankData.EDGES.split("\\n"));
468
469
// Run algorithm and collect results
470
List<String> actualResults = runPageRankAlgorithm(vertices, edges, 3);
471
472
// Parse expected results for comparison
473
String[] expectedLines = PageRankData.RANKS_AFTER_3_ITERATIONS.split("\\n");
474
List<String> expectedResults = Arrays.asList(expectedLines);
475
476
// Use TestBaseUtils for comparison with tolerance
477
TestBaseUtils.compareResultCollections(
478
expectedResults,
479
actualResults,
480
new PageRankComparator(0.01) // Custom comparator with delta tolerance
481
);
482
}
483
```
484
485
### Performance Testing with Large Datasets
486
487
```java
488
@Test
489
public void testPerformanceWithLargeDataset() throws Exception {
490
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
491
env.setParallelism(4);
492
493
// Generate larger dataset based on KMeans data
494
List<String> largeDataset = new ArrayList<>();
495
String[] basePoints = KMeansData.DATAPOINTS.split("\\n");
496
497
// Replicate data points for performance testing
498
for (int i = 0; i < 1000; i++) {
499
for (String point : basePoints) {
500
largeDataset.add(point);
501
}
502
}
503
504
DataSet<String> largePointSet = env.fromCollection(largeDataset);
505
506
long startTime = System.currentTimeMillis();
507
508
// Process large dataset
509
List<String> results = largePointSet
510
.map(new PointProcessor())
511
.collect();
512
513
long endTime = System.currentTimeMillis();
514
long processingTime = endTime - startTime;
515
516
// Validate performance and results
517
assertFalse("Should have processed data", results.isEmpty());
518
assertTrue("Processing should complete within reasonable time",
519
processingTime < 30000); // 30 seconds
520
}
521
```