# Test Data Sources

Finite test sources that emit a fixed set of elements in coordination with checkpoints, plus sample datasets for common algorithms such as PageRank, K-means, and Connected Components. Together they give tests deterministic, repeatable input.

## Capabilities

### Finite Test Source

Source function that emits a finite set of elements, coordinates emission with checkpoints, and accepts an optional exit condition. As described in the class Javadoc, the source emits its elements, waits for checkpoints to complete, emits the same elements a second time, and waits for checkpoints again before exiting. Checkpointing must therefore be enabled on any job that uses it.

```java { .api }
public class FiniteTestSource<T> implements SourceFunction<T> {
    public FiniteTestSource(T... elements);
    public FiniteTestSource(Iterable<T> elements);
    public FiniteTestSource(BooleanSupplier couldExit, Iterable<T> elements);
    public FiniteTestSource(BooleanSupplier couldExit, long waitTimeOut, Iterable<T> elements);
}
```
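
To make the roles of `couldExit` and `waitTimeOut` more concrete, the following Flink-free sketch models the wait that follows element emission. It is an illustration under assumptions, not the library's code: the class name `ExitWaitSketch`, the 10 ms polling interval, and the treatment of a `null` condition as "exit immediately" are choices made for this example.

```java
import java.util.function.BooleanSupplier;

/** Simplified, Flink-free model of waiting for an exit condition with a timeout. */
final class ExitWaitSketch {

    /**
     * Polls couldExit until it returns true or waitTimeOutMs has elapsed.
     * Returns true if the condition was met, false on timeout or interruption.
     */
    static boolean awaitExit(BooleanSupplier couldExit, long waitTimeOutMs) {
        if (couldExit == null) {
            return true; // assumption: no condition means nothing to wait for
        }
        final long begin = System.nanoTime();
        while (!couldExit.getAsBoolean()) {
            long elapsedMs = (System.nanoTime() - begin) / 1_000_000L;
            if (elapsedMs >= waitTimeOutMs) {
                return false; // gave up: the timeout caps the wait
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
        return true;
    }
}
```

In this model the timeout is only a safety net: a test whose exit condition never becomes true still terminates, at the cost of running for the full `waitTimeOutMs`.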
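
#### Basic Usage

```java
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.FiniteTestSource;

// Create source with varargs
FiniteTestSource<Integer> numberSource = new FiniteTestSource<>(1, 2, 3, 4, 5);

// Create source with collection
List<String> words = Arrays.asList("hello", "world", "flink");
FiniteTestSource<String> wordSource = new FiniteTestSource<>(words);

// Use in streaming job; getTestEnvironment() stands for however the test obtains its environment
StreamExecutionEnvironment env = getTestEnvironment();
env.enableCheckpointing(100); // the source waits for completed checkpoints

// Pass the element type explicitly: it cannot be inferred from the generic source class
env.addSource(numberSource, Types.INT)
    .map(x -> x * 2)
    .print();

env.execute("Finite Source Example");
```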
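
#### Advanced Usage with Exit Conditions

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.function.BooleanSupplier;

import org.apache.flink.streaming.util.FiniteTestSource;

// Create source with custom exit condition. The source is serialized when the
// job is submitted, so the condition is declared as a serializable lambda.
final long startTime = System.currentTimeMillis();
BooleanSupplier exitCondition =
    (BooleanSupplier & Serializable) () -> System.currentTimeMillis() > startTime + 5000;
List<String> data = Arrays.asList("a", "b", "c", "d", "e");

FiniteTestSource<String> conditionalSource =
    new FiniteTestSource<>(exitCondition, data);

// Create source with timeout: waitTimeOut caps how long the source waits
// for the exit condition to become true
FiniteTestSource<String> timedSource =
    new FiniteTestSource<>(exitCondition, 10000L, data);
```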
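
Wall-clock conditions like the one above can make tests slow or flaky. A common alternative is to let the source exit once downstream code has observed the expected number of records. The sketch below shows that pattern in plain Java; the class `CountingExitCondition` and its methods are hypothetical names for this example, and the static counter assumes that source and sink run in the same JVM, as they do in a local test cluster.

```java
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

/** Hypothetical helper: an exit condition driven by a shared record counter. */
final class CountingExitCondition {

    // Static so that a deserialized copy of the supplier still sees the same counter.
    static final AtomicInteger SEEN = new AtomicInteger();

    /** Call once per record observed downstream, for example from a test sink. */
    static void recordSeen() {
        SEEN.incrementAndGet();
    }

    /** Returns a serializable condition that holds once enough records were seen. */
    static BooleanSupplier atLeast(int expected) {
        return (BooleanSupplier & Serializable) () -> SEEN.get() >= expected;
    }
}
```

A test would reset `SEEN` before each run, call `recordSeen()` from its sink, and pass `CountingExitCondition.atLeast(n)` as the `couldExit` argument.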

### Test Result Collection

Utilities for collecting and managing test results from streaming jobs with thread-safe collection mechanisms.

```java { .api }
public class TestListResultSink<T> extends RichSinkFunction<T> {
    // Sink that collects results in a thread-safe list
}

public class TestListWrapper<T> {
    // Wrapper for managing test result collections
}
```
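
The idea behind a thread-safe result list can be shown without Flink. The following is a simplified model, not the actual sink: `ListCollectorSketch`, `snapshot`, and `sortedSnapshot` are names invented for this example. It shows why reads should copy the list under the same lock that writers use.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Simplified model of a result collector that is safe to fill from several threads. */
final class ListCollectorSketch<T extends Comparable<T>> {

    private final List<T> results = Collections.synchronizedList(new ArrayList<>());

    /** Called by writers; synchronizedList makes a single add atomic. */
    void add(T value) {
        results.add(value);
    }

    /** Returns a copy in arrival order; copying must hold the list's lock. */
    List<T> snapshot() {
        synchronized (results) {
            return new ArrayList<>(results);
        }
    }

    /** Returns a sorted copy, useful when arrival order is not deterministic. */
    List<T> sortedSnapshot() {
        List<T> copy = snapshot();
        Collections.sort(copy);
        return copy;
    }
}
```

Comparing against the sorted copy keeps assertions stable when several parallel writers interleave their records.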
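
#### Usage Example

```java
import java.util.Arrays;
import java.util.List;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;

import static org.junit.jupiter.api.Assertions.assertEquals;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); // keep records in input order for the assertion below

// Create result sink
TestListResultSink<String> resultSink = new TestListResultSink<>();

// Use in streaming job
env.fromElements("a", "b", "c")
    .map(String::toUpperCase)
    .addSink(resultSink);

env.execute("Test Job");

// Retrieve results
List<String> results = resultSink.getResult();
assertEquals(Arrays.asList("A", "B", "C"), results);
```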

## Sample Algorithm Data

Predefined datasets for testing common graph and machine learning algorithms, providing both input data and expected results for validation.

### Word Count Data

Sample text data and expected word count results for testing text processing algorithms.

```java { .api }
public class WordCountData {
    public static final String TEXT;
    public static final String EXPECTED_RESULT;
}
```
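
#### Usage Example

```java
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.TestBaseUtils;

// Placeholder: a result location owned by the test, for example a temporary directory
String resultPath = getResultPath();

// Use predefined text data; WordCountMapper is a test-local FlatMapFunction
// that emits Tuple2<String, Integer> pairs.
// In streaming mode sum(1) emits a running count per record, so run the job in
// batch mode (or keep only the last count per word) before comparing final counts.
env.fromElements(WordCountData.TEXT.split("\\s+"))
    .flatMap(new WordCountMapper())
    .keyBy(value -> value.f0)
    .sum(1)
    .writeAsText(resultPath);

env.execute("Word Count");

// Verify against expected results; the second argument is the path of the job output
TestBaseUtils.compareResultsByLinesInMemory(
    WordCountData.EXPECTED_RESULT, resultPath);
```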
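
Line-based result checks are usually order-insensitive, because parallel jobs do not write their output in a fixed order. The sketch below illustrates that idea in plain Java. It is not the `TestBaseUtils` implementation; `LineComparison` and its methods are hypothetical names for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper: compares two multi-line results while ignoring line order. */
final class LineComparison {

    /** Splits text into trimmed, non-empty lines and sorts them. */
    static List<String> sortedLines(String text) {
        return Arrays.stream(text.split("\n"))
                .map(String::trim)
                .filter(line -> !line.isEmpty())
                .sorted()
                .collect(Collectors.toList());
    }

    /** True if both texts contain the same lines, duplicates included, in any order. */
    static boolean sameLines(String expected, String actual) {
        return sortedLines(expected).equals(sortedLines(actual));
    }
}
```

Sorting both sides before comparing keeps duplicates significant, which matters for word counts where the same line should not appear twice.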

### Connected Components Data

Graph data for testing connected components algorithms with vertices, edges, and expected component assignments.

```java { .api }
public class ConnectedComponentsData {
    public static final String VERTEX_DATA;
    public static final String EDGE_DATA;
    public static final String EXPECTED_RESULT;
}
```

#### Usage Example

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.test.testdata.ConnectedComponentsData;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
int maxIterations = 10;

// Create graph from test data; Vertex, Edge, parseVertices, and parseEdges
// are test-local types and helpers
DataSet<Vertex> vertices = env.fromCollection(parseVertices(ConnectedComponentsData.VERTEX_DATA));
DataSet<Edge> edges = env.fromCollection(parseEdges(ConnectedComponentsData.EDGE_DATA));

// Run connected components algorithm; runConnectedComponents wraps the
// implementation under test
DataSet<Vertex> result = runConnectedComponents(vertices, edges, maxIterations);

// Verify results
compareWithExpected(result, ConnectedComponentsData.EXPECTED_RESULT);
```
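
When validating a distributed implementation, a small single-machine reference can help cross-check results. The sketch below computes components with a union-find structure. It assumes an edge format of one whitespace-separated `source target` pair per line, which may differ from the format of the bundled data, and it labels each component with its smallest vertex id; `ComponentsReference` is a name chosen for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Single-machine reference: connected components via union-find. */
final class ComponentsReference {

    /** Maps every vertex that appears in an edge to the smallest id in its component. */
    static Map<Long, Long> components(String edgeLines) {
        Map<Long, Long> parent = new HashMap<>();
        for (String line : edgeLines.split("\n")) {
            String[] parts = line.trim().split("\\s+");
            if (parts.length < 2) {
                continue; // skip blank or malformed lines
            }
            long a = find(parent, Long.parseLong(parts[0]));
            long b = find(parent, Long.parseLong(parts[1]));
            // Union: the smaller root wins, so each root is its component's minimum id.
            if (a < b) {
                parent.put(b, a);
            } else if (b < a) {
                parent.put(a, b);
            }
        }
        Map<Long, Long> result = new TreeMap<>();
        for (Long vertex : new ArrayList<>(parent.keySet())) {
            result.put(vertex, find(parent, vertex));
        }
        return result;
    }

    /** Follows parent links to the root, registering unseen vertices as their own root. */
    private static long find(Map<Long, Long> parent, long vertex) {
        parent.putIfAbsent(vertex, vertex);
        long root = vertex;
        while (parent.get(root) != root) {
            root = parent.get(root);
        }
        return root;
    }
}
```

The sketch omits path compression and union by rank, which is acceptable for test-sized graphs but would be worth adding for larger inputs.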

### K-Means Clustering Data

Sample points and expected cluster assignments for testing K-means clustering implementations.

```java { .api }
public class KMeansData {
    public static final String DATAPOINTS;
    public static final String INITIAL_CENTROIDS;
    public static final String EXPECTED_RESULT;
}
```
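
Expected K-means results are easiest to reason about one iteration at a time. The following sketch performs a single assignment-and-update step (Lloyd's algorithm) on in-memory arrays. It does not parse the strings above, whose format is not specified here; `KMeansStepSketch` is a hypothetical name, and leaving an empty cluster's centroid unchanged is a choice made for this example.

```java
/** Single-machine reference for one K-means iteration on dense points. */
final class KMeansStepSketch {

    /** Returns the index of the centroid with the smallest squared Euclidean distance. */
    static int nearest(double[] point, double[][] centroids) {
        int best = -1;
        double bestDistance = Double.POSITIVE_INFINITY;
        for (int c = 0; c < centroids.length; c++) {
            double distance = 0.0;
            for (int d = 0; d < point.length; d++) {
                double diff = point[d] - centroids[c][d];
                distance += diff * diff;
            }
            if (distance < bestDistance) {
                bestDistance = distance;
                best = c;
            }
        }
        return best;
    }

    /** Assigns each point to its nearest centroid, then averages each cluster. */
    static double[][] step(double[][] points, double[][] centroids) {
        int k = centroids.length;
        int dim = centroids[0].length;
        double[][] sums = new double[k][dim];
        int[] counts = new int[k];
        for (double[] point : points) {
            int c = nearest(point, centroids);
            counts[c]++;
            for (int d = 0; d < dim; d++) {
                sums[c][d] += point[d];
            }
        }
        double[][] next = new double[k][dim];
        for (int c = 0; c < k; c++) {
            for (int d = 0; d < dim; d++) {
                // An empty cluster keeps its previous centroid.
                next[c][d] = counts[c] == 0 ? centroids[c][d] : sums[c][d] / counts[c];
            }
        }
        return next;
    }
}
```

Running `step` a fixed number of times gives a deterministic baseline, provided the implementation under test uses the same distance measure and tie-breaking.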

### PageRank Data

Web graph data with vertices, edges, and expected PageRank scores for testing PageRank algorithm implementations.

```java { .api }
public class PageRankData {
    public static final String VERTICES;
    public static final String EDGES;
    public static final String EXPECTED_RESULT;
}
```
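
Expected PageRank scores depend on the damping factor, the iteration count, and the handling of vertices without outgoing edges, so it helps to pin these down in a reference. The sketch below is one common formulation (power iteration with dangling mass spread evenly), not necessarily the one used to produce the bundled expected results. `PageRankReference` and the zero-based integer vertex ids are assumptions for this example.

```java
import java.util.Arrays;

/** Single-machine reference: PageRank by power iteration. */
final class PageRankReference {

    /**
     * @param n          number of vertices, with ids 0..n-1
     * @param edges      directed edges as {source, target} pairs
     * @param damping    damping factor, commonly 0.85
     * @param iterations number of power-iteration rounds
     */
    static double[] pageRank(int n, int[][] edges, double damping, int iterations) {
        int[] outDegree = new int[n];
        for (int[] edge : edges) {
            outDegree[edge[0]]++;
        }
        double[] rank = new double[n];
        Arrays.fill(rank, 1.0 / n);
        for (int round = 0; round < iterations; round++) {
            // Rank held by vertices without outgoing edges is spread over all vertices.
            double dangling = 0.0;
            for (int v = 0; v < n; v++) {
                if (outDegree[v] == 0) {
                    dangling += rank[v];
                }
            }
            double[] next = new double[n];
            Arrays.fill(next, (1.0 - damping) / n + damping * dangling / n);
            for (int[] edge : edges) {
                next[edge[1]] += damping * rank[edge[0]] / outDegree[edge[0]];
            }
            rank = next;
        }
        return rank;
    }
}
```

Because the scores are floating-point values, comparisons against expected results should use a tolerance rather than exact equality.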

### Triangle Enumeration Data

Graph data for testing triangle enumeration algorithms in social network analysis.

```java { .api }
public class EnumTriangleData {
    public static final String EDGES;
    public static final String EXPECTED_RESULT;
}
```
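
A brute-force reference is often enough to validate triangle enumeration on test-sized graphs. The sketch below treats edges as undirected and reports each triangle once, with vertex ids in ascending order. `TriangleReference` and the integer edge representation are assumptions for this example and do not reflect the format of the bundled data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Single-machine reference: enumerate triangles in an undirected graph. */
final class TriangleReference {

    /** Returns every triangle as {a, b, c} with a < b < c. */
    static List<int[]> triangles(int[][] edges) {
        // For each vertex, keep only its neighbors with a larger id.
        Map<Integer, TreeSet<Integer>> higher = new TreeMap<>();
        for (int[] edge : edges) {
            if (edge[0] == edge[1]) {
                continue; // ignore self-loops
            }
            int low = Math.min(edge[0], edge[1]);
            int high = Math.max(edge[0], edge[1]);
            higher.computeIfAbsent(low, key -> new TreeSet<>()).add(high);
        }
        List<int[]> result = new ArrayList<>();
        for (Map.Entry<Integer, TreeSet<Integer>> entry : higher.entrySet()) {
            int a = entry.getKey();
            for (int b : entry.getValue()) {
                TreeSet<Integer> neighborsOfB = higher.get(b);
                if (neighborsOfB == null) {
                    continue;
                }
                // c must be a larger neighbor of both a and b.
                for (int c : entry.getValue().tailSet(b, false)) {
                    if (neighborsOfB.contains(c)) {
                        result.add(new int[] {a, b, c});
                    }
                }
            }
        }
        return result;
    }
}
```

Ordering the vertices within each triangle removes duplicates that would otherwise arise from the three possible starting points.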

### Transitive Closure Data

Graph data for testing transitive closure algorithms with expected reachability results.

```java { .api }
public class TransitiveClosureData {
    public static final String EDGES;
    public static final String EXPECTED_RESULT;
}
```
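
Reachability results can be cross-checked against a small reference as well. The sketch below uses Warshall's algorithm on a boolean adjacency matrix. It assumes zero-based integer vertex ids and directed edges; `TransitiveClosureReference` is a name chosen for this example.

```java
/** Single-machine reference: transitive closure via Warshall's algorithm. */
final class TransitiveClosureReference {

    /** Returns reach[i][j] == true if j is reachable from i over one or more edges. */
    static boolean[][] closure(int n, int[][] edges) {
        boolean[][] reach = new boolean[n][n];
        for (int[] edge : edges) {
            reach[edge[0]][edge[1]] = true;
        }
        // Allow paths through intermediate vertex k, one k at a time.
        for (int k = 0; k < n; k++) {
            for (int i = 0; i < n; i++) {
                if (!reach[i][k]) {
                    continue;
                }
                for (int j = 0; j < n; j++) {
                    if (reach[k][j]) {
                        reach[i][j] = true;
                    }
                }
            }
        }
        return reach;
    }
}
```

The cubic running time is fine for test fixtures; a vertex is reported as reaching itself only if it lies on a cycle.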

### Web Log Analysis Data

Sample web server log data and expected analysis results for testing log processing applications.

```java { .api }
public class WebLogAnalysisData {
    public static final String LOG_DATA;
    public static final String EXPECTED_RESULT;
}
```

## Usage Patterns

### Deterministic Testing

Using finite sources for predictable test execution with guaranteed completion.

```java
@Test
void testStreamingTransformation() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);        // preserve element order for the assertion
    env.enableCheckpointing(100); // FiniteTestSource waits for completed checkpoints

    // Create deterministic source
    List<Integer> inputData = Arrays.asList(1, 2, 3, 4, 5);
    FiniteTestSource<Integer> source = new FiniteTestSource<>(inputData);

    // Collect results
    TestListResultSink<Integer> sink = new TestListResultSink<>();

    // Build and execute job; the element type is passed explicitly because
    // it cannot be inferred from the generic source class
    env.addSource(source, Types.INT)
        .map(x -> x * x)
        .addSink(sink);

    env.execute("Square Numbers");

    // Verify results: the source emits its elements twice
    List<Integer> expected = Arrays.asList(1, 4, 9, 16, 25, 1, 4, 9, 16, 25);
    assertEquals(expected, sink.getResult());
}
```

### Algorithm Validation

Using predefined datasets to validate algorithm implementations.

```java
@Test
void testPageRankAlgorithm() throws Exception {
    // Use predefined PageRank test data; parseVertices, parseEdges, and
    // runPageRank are test-local helpers
    DataSet<Vertex> vertices = parseVertices(PageRankData.VERTICES);
    DataSet<Edge> edges = parseEdges(PageRankData.EDGES);

    // Run PageRank algorithm
    DataSet<Vertex> result = runPageRank(vertices, edges, 10);

    // Verify against expected results; collect() returns the elements of the
    // DataSet, which compareResultAsText converts to text
    List<Vertex> actualResults = result.collect();
    TestBaseUtils.compareResultAsText(actualResults, PageRankData.EXPECTED_RESULT);
}
```

### Timeout-Based Testing

Using sources with timeout conditions for testing time-sensitive scenarios.

```java
@Test
void testWithTimeout() throws Exception {
    env.enableCheckpointing(100); // the source waits for completed checkpoints

    long timeoutMs = 5000L;
    final long startTime = System.currentTimeMillis();
    // The source is serialized with the job, so the condition must be serializable
    BooleanSupplier timeoutCondition = (BooleanSupplier & Serializable) () ->
        System.currentTimeMillis() > startTime + timeoutMs;

    List<String> data = generateLargeDataset();
    FiniteTestSource<String> source =
        new FiniteTestSource<>(timeoutCondition, timeoutMs, data);

    env.addSource(source, Types.STRING)
        .map(String::toUpperCase)
        .print();

    JobExecutionResult result = env.execute("Timeout Test");

    // The source keeps running until the exit condition holds or the wait
    // timeout elapses, so the bound leaves headroom (illustrative factor of 3)
    // for emission and checkpoint waits instead of using timeoutMs itself
    assertTrue(result.getNetRuntime() < 3 * timeoutMs);
}
```

### Checkpoint Integration

Using finite sources with checkpoint coordination for testing fault tolerance.

```java
@Test
void testCheckpointingWithFiniteSource() throws Exception {
    // Enable checkpointing
    env.enableCheckpointing(1000);

    // Create source that coordinates with checkpoints
    FiniteTestSource<Long> source = new FiniteTestSource<>(1L, 2L, 3L, 4L, 5L);

    // StatefulMapper is a test-local function that keeps keyed state
    env.addSource(source, Types.LONG)
        .keyBy(x -> x % 2)
        .map(new StatefulMapper())
        .print();

    env.execute("Checkpoint Test");

    // Verify checkpoints were created
    // Implementation depends on your checkpoint verification strategy
}
```