0
# Test Utilities
1
2
Core utility classes and helper functions for common testing operations including execution handling, result collection, test coordination mechanisms, and data structures for testing scenarios.
3
4
## Capabilities
5
6
### TestUtils
7
8
General test utilities for Flink job execution with specialized exception handling.
9
10
```java { .api }
11
/**
12
* Test utilities for Flink job execution
13
*/
14
public class TestUtils {
15
16
/**
17
* Execute StreamExecutionEnvironment handling SuccessException for controlled test termination
18
* Searches through exception causes to find nested SuccessExceptions and treats them as success
19
* @param see StreamExecutionEnvironment to execute
20
* @param name Name for the execution job
21
* @return JobExecutionResult if job completes normally, null if terminated via SuccessException
22
* @throws Exception if job fails with non-SuccessException
23
*/
24
public static JobExecutionResult tryExecute(StreamExecutionEnvironment see, String name) throws Exception;
25
}
26
```
27
28
**Usage Example:**
29
30
```java
31
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
32
env.fromElements(1, 2, 3)
33
.map(x -> {
34
if (x == 3) throw new SuccessException(); // Signal test completion
35
return x * 2;
36
})
37
.print();
38
39
// This will catch SuccessException and return null, indicating successful test completion
40
JobExecutionResult result = TestUtils.tryExecute(env, "Test Job");
41
```
42
43
### SuccessException
44
45
Exception class used to signal successful test completion in controlled termination scenarios.
46
47
```java { .api }
48
/**
49
* Exception thrown to terminate a program and indicate success
50
* Used in conjunction with TestUtils.tryExecute() for controlled test termination
51
*/
52
public class SuccessException extends RuntimeException {
53
public SuccessException();
54
public SuccessException(String message);
55
}
56
```
57
58
### TestListResultSink
59
60
Thread-safe sink for collecting test results into an on-heap list with result retrieval methods.
61
62
```java { .api }
63
/**
64
* Thread-safe sink for collecting elements into an on-heap list
65
* Uses TestListWrapper for managing result storage across test executions
66
*/
67
public class TestListResultSink<T> extends RichSinkFunction<T> {
68
69
/**
70
* Get collected results as unordered list
71
* @return List of collected elements in collection order
72
*/
73
public List<T> getResult();
74
75
/**
76
* Get collected results sorted using natural ordering
77
* @return Sorted list of collected elements
78
*/
79
public List<T> getSortedResult();
80
81
/**
82
* Collect element into result list (thread-safe)
83
* @param value Element to collect
84
* @param context Sink context
85
*/
86
@Override
87
public void invoke(T value, Context context) throws Exception;
88
}
89
```
90
91
**Usage Example:**
92
93
```java
94
public class MyTest extends StreamFaultToleranceTestBase {
95
96
private TestListResultSink<Integer> resultSink = new TestListResultSink<>();
97
98
@Override
99
public void testProgram(StreamExecutionEnvironment env) {
100
env.fromElements(3, 1, 4, 1, 5)
101
.map(x -> x * 2)
102
.addSink(resultSink);
103
}
104
105
@Override
106
public void postSubmit() throws Exception {
107
List<Integer> results = resultSink.getResult();
108
assertEquals(Arrays.asList(6, 2, 8, 2, 10), results);
109
110
List<Integer> sortedResults = resultSink.getSortedResult();
111
assertEquals(Arrays.asList(2, 2, 6, 8, 10), sortedResults);
112
}
113
}
114
```
115
116
### TestListWrapper
117
118
Singleton catalog for managing test result lists with thread-safe operations for concurrent test access.
119
120
```java { .api }
121
/**
122
* Singleton catalog for lists stored by TestListResultSink
123
* Provides thread-safe operations for managing test result collections
124
*/
125
public class TestListWrapper {
126
127
/**
128
* Create new result list and return unique identifier
129
* @return Unique list ID for retrieval
130
*/
131
public static int createList();
132
133
/**
134
* Retrieve result list by unique identifier
135
* @param listId Unique identifier returned by createList()
136
* @return List of collected results
137
*/
138
public static <T> List<T> getList(int listId);
139
140
/**
141
* Clear all stored lists (for test cleanup)
142
*/
143
public static void clearAllLists();
144
}
145
```
146
147
### Tokenizer
148
149
FlatMap function for splitting strings into word count tuples, commonly used in testing scenarios.
150
151
```java { .api }
152
/**
153
* FlatMap function for splitting strings into word count tuples
154
* Commonly used in testing word count and text processing scenarios
155
*/
156
public class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
157
158
/**
159
* Split input line into words and emit (word, 1) tuples
160
* @param value Input string to tokenize
161
* @param out Collector for emitting word count tuples
162
*/
163
@Override
164
public void flatMap(String value, Collector<Tuple2<String, Integer>> out);
165
}
166
```
167
168
**Usage Example:**
169
170
```java
171
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
172
env.fromElements("hello world", "hello flink", "world of streaming")
173
.flatMap(new Tokenizer())
174
.keyBy(tuple -> tuple.f0)
175
.sum(1)
176
.print();
177
// Output: (hello,1), (world,1), (hello,2), (flink,1), (world,2), (of,1), (streaming,1)
178
```
179
180
## Configuration Classes
181
182
### GeneratorConfiguration
183
184
Configuration class for event generators in windowing tests with lateness and timing controls.
185
186
```java { .api }
187
/**
188
* Configuration for event generators in windowing tests
189
* Controls event timing, lateness behavior, and session gaps
190
*/
191
public class GeneratorConfiguration {
192
193
/** Allowed lateness in milliseconds */
194
public final long allowedLateness;
195
196
/** Number of late events within lateness per session */
197
public final int lateEventsWithinLateness;
198
199
/** Number of late events after lateness per session */
200
public final int lateEventsAfterLateness;
201
202
/** Maximum additional gap between event times */
203
public final long maxAdditionalSessionGap;
204
205
/**
206
* Create generator configuration
207
* @param allowedLateness Allowed lateness in milliseconds
208
* @param lateEventsWithinLateness Late events within lateness per session
209
* @param lateEventsAfterLateness Late events after lateness per session
210
* @param maxAdditionalSessionGap Maximum additional session gap
211
*/
212
public GeneratorConfiguration(long allowedLateness, int lateEventsWithinLateness,
213
int lateEventsAfterLateness, long maxAdditionalSessionGap);
214
}
215
```
216
217
### SessionGeneratorConfiguration
218
219
Specialized configuration for session window testing scenarios.
220
221
```java { .api }
222
/**
223
* Configuration for session window event generation
224
* Extends GeneratorConfiguration with session-specific parameters
225
*/
226
public class SessionGeneratorConfiguration extends GeneratorConfiguration {
227
// Session-specific configuration parameters
228
}
229
```
230
231
### SessionConfiguration
232
233
Configuration class for individual session parameters in windowing tests.
234
235
```java { .api }
236
/**
237
* Configuration for individual session parameters in windowing tests
238
* Defines session boundaries, event counts, and timing characteristics
239
*/
240
public class SessionConfiguration {
241
// Session boundary and timing configuration
242
}
243
```
244
245
## Utility Constants
246
247
### Common Test Constants
248
249
```java { .api }
250
/**
251
* Standard parallelism settings used across test base classes
252
*/
253
public static final int STANDARD_PARALLELISM = 4;
254
public static final int HIGH_PARALLELISM = 12;
255
public static final int NUM_TASK_MANAGERS = 3;
256
public static final int NUM_TASK_SLOTS = 4;
257
258
/**
259
* Timeout and deadline configurations
260
*/
261
public static final int DEFAULT_TIMEOUT_MINUTES = 5;
262
public static final int DEFAULT_DEADLINE_MINUTES = 2;
263
264
/**
265
* File coordination patterns for process-based testing
266
*/
267
public static final String READY_MARKER_FILE_PREFIX = "ready-";
268
public static final String PROCEED_MARKER_FILE = "proceed";
269
public static final String FINISH_MARKER_FILE_PREFIX = "finish-";
270
```
271
272
## Integration Patterns
273
274
### Result Collection Pattern
275
276
The typical pattern for collecting and verifying test results:
277
278
```java
279
// 1. Create result sink
280
TestListResultSink<MyType> resultSink = new TestListResultSink<>();
281
282
// 2. Add to topology
283
env.source()
284
.transform()
285
.addSink(resultSink);
286
287
// 3. Verify results in postSubmit()
288
List<MyType> results = resultSink.getResult();
289
assertThat(results, hasSize(expectedSize));
290
assertThat(results, containsInAnyOrder(expectedElements));
291
```
292
293
### Exception-based Test Control
294
295
Using SuccessException for controlled test termination:
296
297
```java
298
env.addSource(new SourceFunction<Integer>() {
299
@Override
300
public void run(SourceContext<Integer> ctx) throws Exception {
301
for (int i = 0; i < 100; i++) {
302
ctx.collect(i);
303
if (i == 50) throw new SuccessException("Reached target");
304
}
305
}
306
307
@Override
308
public void cancel() {}
309
});
310
311
TestUtils.tryExecute(env, "Controlled Test");
312
```
313
314
### Thread-Safe Result Access
315
316
TestListWrapper enables safe result access across multiple test threads:
317
318
```java
319
// Thread 1: Collect results
320
int listId = TestListWrapper.createList();
321
// Add elements to list via TestListResultSink
322
323
// Thread 2: Access results
324
List<String> results = TestListWrapper.getList(listId);
325
// Process results safely
326
```