0
# Streaming Test Utilities
1
2
Thread-safe utilities for collecting and validating results in streaming Flink applications. These utilities provide mechanisms for result collection, test coordination, fault injection, validation, and streaming-specific helper functions that enable comprehensive testing of streaming topologies including failure scenarios.
3
4
## Capabilities
5
6
### Test List Result Sink
7
8
Thread-safe sink for collecting streaming results into lists for verification.
9
10
```java { .api }
11
/**
12
* Thread-safe sink that collects streaming results into a list for test verification
13
*/
14
public class TestListResultSink<T> extends RichSinkFunction<T> {
15
16
/**
17
* Default constructor creating empty result sink
18
*/
19
public TestListResultSink();
20
21
/**
22
* Get the collected results as an unordered list
23
* @return List containing all collected elements
24
*/
25
public List<T> getResult();
26
27
/**
28
* Get the collected results sorted using natural ordering
29
* @return Sorted list containing all collected elements
30
*/
31
public List<T> getSortedResult();
32
33
/**
34
* Sink function implementation - adds element to internal collection
35
* @param value Element to collect
36
* @throws Exception if collection fails
37
*/
38
public void invoke(T value) throws Exception;
39
}
40
```
41
42
**Usage Example:**
43
44
```java
45
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
46
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
47
48
// Setup streaming environment
49
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
50
env.setParallelism(1); // Use parallelism 1 for deterministic results
51
52
// Create result collector
53
TestListResultSink<String> resultSink = new TestListResultSink<>();
54
55
// Build streaming topology
56
env.fromElements("hello", "world", "flink", "streaming")
57
.map(String::toUpperCase)
58
.addSink(resultSink);
59
60
// Execute and collect results
61
env.execute("Test Job");
62
63
// Verify results
64
List<String> results = resultSink.getResult();
65
assertEquals(4, results.size());
66
assertTrue(results.contains("HELLO"));
67
assertTrue(results.contains("WORLD"));
68
69
// Get sorted results for ordered verification
70
List<String> sortedResults = resultSink.getSortedResult();
71
assertEquals("FLINK", sortedResults.get(0));
72
assertEquals("HELLO", sortedResults.get(1));
73
```
74
75
### Test List Wrapper
76
77
Singleton catalog for managing multiple test result lists across different test scenarios.
78
79
```java { .api }
80
/**
81
* Singleton catalog for managing multiple test result lists
82
*/
83
public class TestListWrapper {
84
85
/**
86
* Get the singleton instance of TestListWrapper
87
* @return The singleton TestListWrapper instance
88
*/
89
public static TestListWrapper getInstance();
90
91
/**
92
* Create a new result list and return its ID
93
* @return Integer ID for the newly created list
94
*/
95
public int createList();
96
97
/**
98
* Retrieve a result list by its ID
99
* @param listId ID of the list to retrieve
100
* @return List of collected objects, or null if ID doesn't exist
101
*/
102
public List<Object> getList(int listId);
103
}
104
```
105
106
**Usage Example:**
107
108
```java
109
import org.apache.flink.test.streaming.runtime.util.TestListWrapper;
110
111
// Get wrapper instance
112
TestListWrapper wrapper = TestListWrapper.getInstance();
113
114
// Create lists for different test scenarios
115
int sourceResultsId = wrapper.createList();
116
int processedResultsId = wrapper.createList();
117
118
// Use in streaming topology (pseudocode - would need custom sink implementation)
119
dataStream1.addSink(new ListCollectorSink(sourceResultsId));
120
dataStream2.addSink(new ListCollectorSink(processedResultsId));
121
122
// After execution, retrieve results
123
List<Object> sourceResults = wrapper.getList(sourceResultsId);
124
List<Object> processedResults = wrapper.getList(processedResultsId);
125
126
// Verify both result sets
127
assertEquals(expectedSourceCount, sourceResults.size());
128
assertEquals(expectedProcessedCount, processedResults.size());
129
```
130
131
### Output Selectors and Transformations
132
133
Utility functions and selectors for streaming data routing and transformation.
134
135
```java { .api }
136
/**
137
* Output selector that routes integers based on even/odd criteria
138
*/
139
public class EvenOddOutputSelector implements OutputSelector<Integer> {
140
141
/**
142
* Select output names based on whether the integer is even or odd
143
* @param value Integer value to evaluate
144
* @return Iterable of output names ("even" or "odd")
145
*/
146
public Iterable<String> select(Integer value);
147
}
148
149
/**
150
* No-operation mapper that returns input unchanged
151
*/
152
public class NoOpIntMap implements MapFunction<Integer, Integer> {
153
154
/**
155
* Identity mapping function for integers
156
* @param value Input integer
157
* @return Same integer unchanged
158
* @throws Exception if mapping fails
159
*/
160
public Integer map(Integer value) throws Exception;
161
}
162
163
/**
164
* No-operation sink that receives elements but does nothing with them
165
*/
166
public class ReceiveCheckNoOpSink<T> extends RichSinkFunction<T> {
167
168
/**
169
* Constructor with element count tracking
170
* @param expectedElementCount Expected number of elements to receive
171
*/
172
public ReceiveCheckNoOpSink(int expectedElementCount);
173
174
/**
175
* Receive element but perform no operation
176
* @param value Element to receive
177
* @param context Sink context
178
* @throws Exception if receive fails
179
*/
180
public void invoke(T value, Context context) throws Exception;
181
182
/**
183
* Check if expected number of elements were received
184
* @return true if expected count was reached
185
*/
186
public boolean isExpectedCountReached();
187
}
188
```
189
190
### Fault Injection Utilities
191
192
Sources and sinks for testing failure scenarios and fault tolerance in streaming applications.
193
194
```java { .api }
195
/**
196
* Source that introduces artificial failures for testing fault tolerance
197
*/
198
public class FailingSource<T> extends RichSourceFunction<T> {
199
200
/**
201
* Constructor for failing source with custom event generator
202
* @param generator Custom generator for emitting events before failure
203
* @param failAfterElements Number of elements to emit before inducing failure
204
*/
205
public FailingSource(EventEmittingGenerator<T> generator, int failAfterElements);
206
207
/**
208
* Interface for custom event generation in failing source
209
*/
210
public static interface EventEmittingGenerator<T> extends Serializable {
211
/**
212
* Emit a single event to the source context
213
* @param ctx Source context for emitting events
214
* @param eventSequenceNo Sequence number of current event
215
*/
216
public void emitEvent(SourceContext<T> ctx, int eventSequenceNo);
217
}
218
}
219
220
/**
221
* Sink for validating streaming results with custom validation logic
222
*/
223
public class ValidatingSink<T> extends RichSinkFunction<T> {
224
225
/**
226
* Constructor with result checker and count updater
227
* @param resultChecker Custom checker for validating individual results
228
* @param countUpdater Updater for tracking element counts
229
*/
230
public ValidatingSink(ResultChecker<T> resultChecker, CountUpdater countUpdater);
231
232
/**
233
* Interface for custom result validation logic
234
*/
235
public static interface ResultChecker<T> extends Serializable {
236
/**
237
* Check if a result meets validation criteria
238
* @param result Result element to validate
239
* @return true if result passes validation
240
*/
241
public boolean checkResult(T result);
242
}
243
244
/**
245
* Interface for updating element counts during validation
246
*/
247
public static interface CountUpdater extends Serializable {
248
/**
249
* Update the element count
250
* @param count Current element count
251
*/
252
public void updateCount(long count);
253
}
254
}
255
```
256
257
**Fault Injection Usage Examples:**
258
259
```java
260
import org.apache.flink.test.checkpointing.utils.FailingSource;
261
import org.apache.flink.test.checkpointing.utils.ValidatingSink;
262
263
// Create a failing source that emits integers and fails after 1000 elements
264
FailingSource<Integer> failingSource = new FailingSource<>(
265
new FailingSource.EventEmittingGenerator<Integer>() {
266
@Override
267
public void emitEvent(SourceContext<Integer> ctx, int eventSequenceNo) {
268
ctx.collect(eventSequenceNo);
269
}
270
},
271
1000 // Fail after 1000 elements
272
);
273
274
// Create a validating sink that checks for positive integers
275
ValidatingSink<Integer> validatingSink = new ValidatingSink<>(
276
new ValidatingSink.ResultChecker<Integer>() {
277
@Override
278
public boolean checkResult(Integer result) {
279
return result > 0; // Only accept positive integers
280
}
281
},
282
new ValidatingSink.CountUpdater() {
283
@Override
284
public void updateCount(long count) {
285
// Track count of validated elements
286
System.out.println("Validated elements: " + count);
287
}
288
}
289
);
290
291
// Build fault-tolerant streaming topology
292
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
293
env.enableCheckpointing(1000); // Enable checkpointing for fault tolerance
294
295
env.addSource(failingSource)
296
.map(x -> Math.abs(x)) // Ensure positive values
297
.addSink(validatingSink);
298
299
// Job will fail after 1000 elements, then restart and validate recovery
300
env.execute("Fault Tolerance Test");
301
```
302
303
### Test Execution Utilities
304
305
Utilities for executing streaming tests with specialized exception handling.
306
307
```java { .api }
308
/**
309
* Utility class for streaming test execution
310
*/
311
public class TestUtils {
312
313
/**
314
* Execute streaming job with SuccessException handling
315
* @param see StreamExecutionEnvironment configured for execution
316
* @param name Job name for identification
317
* @throws Exception if execution fails for reasons other than SuccessException
318
*/
319
public static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception;
320
}
321
322
/**
323
* Exception thrown to indicate successful test completion
324
*/
325
public class SuccessException extends RuntimeException {
326
327
/**
328
* Default constructor for success indication
329
*/
330
public SuccessException();
331
332
/**
333
* Constructor with success message
334
* @param message Success message
335
*/
336
public SuccessException(String message);
337
}
338
```
339
340
### Streaming Test Patterns
341
342
Common patterns for implementing streaming tests:
343
344
**Basic Result Collection Pattern:**
345
346
```java
347
@Test
348
public void testStreamingTransformation() throws Exception {
349
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
350
env.setParallelism(1);
351
352
// Create result collector
353
TestListResultSink<Integer> resultSink = new TestListResultSink<>();
354
355
// Build topology
356
env.fromElements(1, 2, 3, 4, 5)
357
.map(x -> x * 2)
358
.filter(x -> x > 5)
359
.addSink(resultSink);
360
361
// Execute and verify
362
env.execute("Transformation Test");
363
364
List<Integer> results = resultSink.getSortedResult();
365
assertEquals(Arrays.asList(6, 8, 10), results);
366
}
367
```
368
369
**Multi-Stream Result Collection:**
370
371
```java
372
@Test
373
public void testMultiStreamProcessing() throws Exception {
374
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
375
376
TestListWrapper wrapper = TestListWrapper.getInstance();
377
int evenResultsId = wrapper.createList();
378
int oddResultsId = wrapper.createList();
379
380
DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6);
381
382
// Split stream using output selector
383
SplitStream<Integer> splitStream = numbers.split(new EvenOddOutputSelector());
384
385
splitStream.select("even").addSink(new CustomListSink(evenResultsId));
386
splitStream.select("odd").addSink(new CustomListSink(oddResultsId));
387
388
env.execute("Multi-Stream Test");
389
390
// Verify results
391
List<Object> evenResults = wrapper.getList(evenResultsId);
392
List<Object> oddResults = wrapper.getList(oddResultsId);
393
394
assertEquals(3, evenResults.size()); // 2, 4, 6
395
assertEquals(3, oddResults.size()); // 1, 3, 5
396
}
397
```
398
399
**Success Exception Pattern:**
400
401
```java
402
@Test
403
public void testSuccessfulCompletion() throws Exception {
404
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
405
406
env.fromElements("test")
407
.map(value -> {
408
// Simulate successful completion
409
throw new SuccessException("Test completed successfully");
410
})
411
.addSink(new DiscardingSink<>());
412
413
// Execute with success exception handling
414
TestUtils.tryExecute(env, "Success Test");
415
416
// Test passes if SuccessException was caught and handled
417
}
418
```
419
420
**Parallel Result Collection:**
421
422
```java
423
@Test
424
public void testParallelResultCollection() throws Exception {
425
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
426
env.setParallelism(4);
427
428
TestListResultSink<String> resultSink = new TestListResultSink<>();
429
430
env.fromCollection(generateLargeDataset())
431
.map(data -> processData(data))
432
.addSink(resultSink);
433
434
env.execute("Parallel Test");
435
436
// Results from parallel execution - order not guaranteed
437
List<String> results = resultSink.getResult();
438
assertEquals(expectedTotalCount, results.size());
439
440
// Use sorted results for deterministic verification
441
List<String> sortedResults = resultSink.getSortedResult();
442
assertEquals(expectedFirstElement, sortedResults.get(0));
443
assertEquals(expectedLastElement, sortedResults.get(sortedResults.size() - 1));
444
}
445
```
446
447
**Custom Sink with Count Verification:**
448
449
```java
450
@Test
451
public void testElementCountVerification() throws Exception {
452
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
453
454
int expectedCount = 1000;
455
ReceiveCheckNoOpSink<Integer> countingSink = new ReceiveCheckNoOpSink<>(expectedCount);
456
457
env.fromCollection(generateIntegerSequence(expectedCount))
458
.addSink(countingSink);
459
460
env.execute("Count Verification Test");
461
462
assertTrue("Expected count not reached", countingSink.isExpectedCountReached());
463
}
464
```
465
466
These streaming utilities provide the foundation for reliable, deterministic testing of streaming Flink applications, enabling comprehensive verification of streaming transformations, windowing operations, and stateful processing.