# Data Collection and Sources

Tools for creating controlled test data sources and collecting streaming results for validation in tests, including finite test sources with checkpoint synchronization and result collection utilities.

## Capabilities

### Finite Test Source

`FiniteTestSource` provides a controlled test source that emits elements in cycles with checkpoint synchronization, ideal for testing streaming applications with deterministic behavior.

```java { .api }
/**
 * Test source that emits elements in cycles with checkpoint synchronization.
 *
 * @param <T> Type of elements to emit
 */
public class FiniteTestSource<T> implements SourceFunction<T>, CheckpointListener {
    /**
     * Create source with vararg elements.
     *
     * @param elements Elements to emit
     */
    public FiniteTestSource(T... elements);

    /**
     * Create source with iterable elements.
     *
     * @param elements Elements to emit
     */
    public FiniteTestSource(Iterable<T> elements);

    /**
     * Create source with exit condition and timeout.
     *
     * @param exitCondition Supplier that returns true when the source should stop
     * @param timeoutMs Timeout in milliseconds
     * @param elements Elements to emit
     */
    public FiniteTestSource(BooleanSupplier exitCondition, long timeoutMs, Iterable<T> elements);

    /**
     * Create source with exit condition.
     *
     * @param exitCondition Supplier that returns true when the source should stop
     * @param elements Elements to emit
     */
    public FiniteTestSource(BooleanSupplier exitCondition, Iterable<T> elements);

    /**
     * Main source execution method.
     *
     * @param ctx Source context for emitting elements
     */
    public void run(SourceContext<T> ctx) throws Exception;

    /**
     * Cancels the source.
     */
    public void cancel();

    /**
     * Checkpoint completion notification.
     *
     * @param checkpointId ID of the completed checkpoint
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception;

    /**
     * Checkpoint abortion notification.
     *
     * @param checkpointId ID of the aborted checkpoint
     */
    public void notifyCheckpointAborted(long checkpointId) throws Exception;
}
```

**Usage Examples:**

```java
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

@Test
public void testFiniteSource() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Simple finite source with vararg elements
    DataStream<String> stream1 = env.addSource(
        new FiniteTestSource<>("hello", "world", "flink")
    );

    // Source with collection of elements
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
    DataStream<Integer> stream2 = env.addSource(
        new FiniteTestSource<>(numbers)
    );

    // Source with exit condition - stops after 10 seconds or when the condition is met
    AtomicBoolean shouldStop = new AtomicBoolean(false);
    DataStream<String> stream3 = env.addSource(
        new FiniteTestSource<>(
            () -> shouldStop.get(),
            10000L,
            Arrays.asList("data1", "data2", "data3")
        )
    );

    // Process streams
    stream1.print("Stream1");
    stream2.map(x -> x * 2).print("Stream2");
    stream3.print("Stream3");

    // In another thread, signal to stop after some processing
    Timer timer = new Timer();
    timer.schedule(new TimerTask() {
        @Override
        public void run() {
            shouldStop.set(true);
        }
    }, 5000);

    env.execute("Finite Source Test");
}
```

### Stream Collector

`StreamCollector` provides JUnit integration for collecting all elements from a DataStream for testing and validation.

```java { .api }
/**
 * JUnit rule for collecting all elements from a DataStream for testing.
 */
public class StreamCollector extends ExternalResource {
    /**
     * Collects all elements from the stream.
     *
     * @param stream DataStream to collect from
     * @return CompletableFuture that completes with the collected elements
     */
    public <IN> CompletableFuture<Collection<IN>> collect(DataStream<IN> stream);
}
```

**Usage Example:**

```java
import org.apache.flink.streaming.util.StreamCollector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.Rule;

public class StreamCollectionTest {

    @Rule
    public StreamCollector streamCollector = new StreamCollector();

    @Test
    public void testStreamCollection() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create test stream
        DataStream<String> input = env.fromElements("apple", "banana", "cherry");
        DataStream<String> processed = input.map(String::toUpperCase);

        // Collect results
        CompletableFuture<Collection<String>> resultFuture = streamCollector.collect(processed);

        // Execute the job
        env.execute("Stream Collection Test");

        // Get results and validate
        Collection<String> results = resultFuture.get(10, TimeUnit.SECONDS);
        assertEquals(3, results.size());
        assertTrue(results.contains("APPLE"));
        assertTrue(results.contains("BANANA"));
        assertTrue(results.contains("CHERRY"));
    }

    @Test
    public void testStreamWithFiltering() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create test stream with filtering
        DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        DataStream<Integer> evenNumbers = numbers.filter(x -> x % 2 == 0);

        // Collect filtered results
        CompletableFuture<Collection<Integer>> resultFuture = streamCollector.collect(evenNumbers);

        env.execute("Filtering Test");

        // Validate filtered results
        Collection<Integer> results = resultFuture.get();
        assertEquals(5, results.size());
        assertTrue(results.contains(2));
        assertTrue(results.contains(4));
        assertTrue(results.contains(6));
        assertTrue(results.contains(8));
        assertTrue(results.contains(10));
    }
}
```

## Advanced Data Collection Patterns

### Testing with Multiple Sources

```java
@Test
public void testMultipleSources() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Create multiple finite sources
    DataStream<String> source1 = env.addSource(
        new FiniteTestSource<>("a1", "a2", "a3")
    );

    DataStream<String> source2 = env.addSource(
        new FiniteTestSource<>("b1", "b2", "b3")
    );

    // Union the sources
    DataStream<String> combined = source1.union(source2);
    DataStream<String> processed = combined.map(s -> "processed-" + s);

    // Collect results
    CompletableFuture<Collection<String>> resultFuture = streamCollector.collect(processed);

    env.execute("Multiple Sources Test");

    // Validate combined results
    Collection<String> results = resultFuture.get();
    assertEquals(6, results.size());
    assertTrue(results.contains("processed-a1"));
    assertTrue(results.contains("processed-b1"));
}
```

### Testing with Windowing

```java
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;

@Test
public void testWindowedStream() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Create source with timestamped data
    DataStream<Tuple2<String, Integer>> input = env.addSource(
        new FiniteTestSource<>(
            Tuple2.of("key1", 1),
            Tuple2.of("key1", 2),
            Tuple2.of("key2", 3),
            Tuple2.of("key1", 4),
            Tuple2.of("key2", 5)
        )
    );

    // Apply windowing and aggregation
    DataStream<Tuple2<String, Integer>> windowed = input
        .keyBy(t -> t.f0)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
        .sum(1);

    // Collect windowed results
    CompletableFuture<Collection<Tuple2<String, Integer>>> resultFuture =
        streamCollector.collect(windowed);

    env.execute("Windowed Stream Test");

    // Validate aggregated results
    Collection<Tuple2<String, Integer>> results = resultFuture.get();
    assertFalse(results.isEmpty());

    // Check that aggregation occurred
    boolean foundKey1Sum = results.stream()
        .anyMatch(t -> "key1".equals(t.f0) && t.f1 > 1);
    assertTrue("Expected aggregated key1 values", foundKey1Sum);
}
```

### Testing with Checkpointing

```java
@Test
public void testSourceWithCheckpointing() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Enable checkpointing
    env.enableCheckpointing(100);

    // Create source that will emit elements and handle checkpoints
    List<String> elements = Arrays.asList("checkpoint1", "checkpoint2", "checkpoint3");
    AtomicInteger checkpointCount = new AtomicInteger(0);

    FiniteTestSource<String> source = new FiniteTestSource<String>(elements) {
        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            super.notifyCheckpointComplete(checkpointId);
            checkpointCount.incrementAndGet();
        }
    };

    DataStream<String> stream = env.addSource(source);
    DataStream<String> processed = stream.map(s -> "checkpoint-processed-" + s);

    // Collect results
    CompletableFuture<Collection<String>> resultFuture = streamCollector.collect(processed);

    env.execute("Checkpointing Test");

    // Validate results and checkpointing
    Collection<String> results = resultFuture.get();
    assertEquals(3, results.size());
    assertTrue("Checkpoints should have been triggered", checkpointCount.get() > 0);
}
```

### Testing Source Cancellation

```java
@Test
public void testSourceCancellation() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Create source with exit condition for controlled cancellation
    AtomicBoolean cancelled = new AtomicBoolean(false);
    List<String> elements = Arrays.asList("cancel1", "cancel2", "cancel3");

    FiniteTestSource<String> source = new FiniteTestSource<String>(
        () -> cancelled.get(),
        5000L, // 5 second timeout
        elements
    ) {
        @Override
        public void cancel() {
            super.cancel();
            cancelled.set(true);
        }
    };

    DataStream<String> stream = env.addSource(source);

    // Set up result collection
    CompletableFuture<Collection<String>> resultFuture = streamCollector.collect(stream);

    // Cancel after short delay
    Timer timer = new Timer();
    timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            cancelled.set(true);
        }
    }, 1000, 100);

    env.execute("Cancellation Test");

    // Validate that source was properly cancelled
    Collection<String> results = resultFuture.get();
    assertTrue("Source should have been cancelled", cancelled.get());
}
```