0
# Execution Utilities
1
2
Core utilities for executing test jobs with proper exception handling and result validation. These utilities handle the complexities of test execution in Flink environments and provide consistent patterns for test success validation.
3
4
## Core Execution Utilities
5
6
### TestUtils
7
8
Primary utility class for executing streaming jobs with proper exception handling and success validation.
9
10
```java { .api }
11
public class TestUtils {
12
public static JobExecutionResult tryExecute(StreamExecutionEnvironment env, String jobName) throws Exception;
13
}
14
```
15
16
The `tryExecute` method:
17
- Executes the streaming job with the given name
18
- Catches `ProgramInvocationException` and `JobExecutionException`
19
- Searches for nested `SuccessException` to determine test success
20
- Fails the test if no `SuccessException` is found in the exception chain
21
- Returns `JobExecutionResult` on successful completion or `null` if terminated by `SuccessException`
22
23
### SuccessException
24
25
Custom exception used to indicate successful test completion and controlled program termination.
26
27
```java { .api }
28
public class SuccessException extends Exception {
29
public SuccessException();
30
}
31
```
32
33
This exception is typically thrown by:
34
- Custom sink functions when expected results are achieved
35
- Map/filter functions when target conditions are met
36
- Source functions when sufficient data has been processed
37
38
## Test Execution Patterns
39
40
### Controlled Termination Pattern
41
42
Use `SuccessException` to terminate streaming jobs when test conditions are met:
43
44
```java
45
public class ValidatingFunction implements MapFunction<Integer, Integer> {
46
private int processedCount = 0;
47
private final int targetCount;
48
49
public ValidatingFunction(int targetCount) {
50
this.targetCount = targetCount;
51
}
52
53
@Override
54
public Integer map(Integer value) throws Exception {
55
processedCount++;
56
57
// Perform validation logic
58
if (value < 0) {
59
throw new RuntimeException("Invalid negative value");
60
}
61
62
// Terminate successfully when target reached
63
if (processedCount >= targetCount) {
64
throw new SuccessException("Processed " + targetCount + " elements successfully");
65
}
66
67
return value * 2;
68
}
69
}
70
```
71
72
### Result Collection with Success Validation
73
74
```java
75
public class CollectingValidatingSink<T> implements SinkFunction<T> {
76
private final List<T> results = new ArrayList<>();
77
private final int expectedCount;
78
79
public CollectingValidatingSink(int expectedCount) {
80
this.expectedCount = expectedCount;
81
}
82
83
@Override
84
public void invoke(T value) throws Exception {
85
synchronized (results) {
86
results.add(value);
87
88
// Validate intermediate results
89
validateElement(value);
90
91
// Terminate when collection is complete
92
if (results.size() >= expectedCount) {
93
validateFinalResults();
94
throw new SuccessException("Successfully collected " + expectedCount + " elements");
95
}
96
}
97
}
98
99
private void validateElement(T value) throws Exception {
100
// Element-level validation logic
101
}
102
103
private void validateFinalResults() throws Exception {
104
// Final result validation logic
105
}
106
}
107
```
108
109
## Usage Examples
110
111
### Basic Streaming Job Execution
112
113
```java
114
@Test
115
public void testBasicStreamingExecution() throws Exception {
116
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
117
env.setParallelism(1);
118
119
env.fromElements(1, 2, 3, 4, 5)
120
.map(x -> x * 2)
121
.addSink(new PrintSinkFunction<>());
122
123
// Execute with proper exception handling
124
JobExecutionResult result = TestUtils.tryExecute(env, "Basic Streaming Test");
125
126
assertNotNull("Job should complete successfully", result);
127
}
128
```
129
130
### Success Exception Pattern
131
132
```java
133
@Test
134
public void testSuccessExceptionPattern() throws Exception {
135
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
136
env.setParallelism(1);
137
138
env.fromSequence(1, Long.MAX_VALUE) // Infinite source
139
.map(new ValidatingFunction(100)) // Terminates after 100 elements
140
.addSink(new DiscardingSink<>());
141
142
// Job will terminate via SuccessException
143
JobExecutionResult result = TestUtils.tryExecute(env, "Success Exception Test");
144
145
// Result will be null when terminated by SuccessException
146
assertNull("Job should terminate via SuccessException", result);
147
}
148
```
149
150
### Comprehensive Result Validation
151
152
```java
153
@Test
154
public void testComprehensiveValidation() throws Exception {
155
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
156
env.setParallelism(2);
157
158
List<Integer> inputData = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
159
160
env.fromCollection(inputData)
161
.keyBy(x -> x % 2) // Partition by even/odd
162
.map(x -> x * x) // Square each number
163
.addSink(new ValidatingResultSink(inputData.size()));
164
165
// Execute with validation
166
TestUtils.tryExecute(env, "Comprehensive Validation Test");
167
}
168
169
class ValidatingResultSink implements SinkFunction<Integer> {
170
private final Set<Integer> receivedValues = new HashSet<>();
171
private final int expectedCount;
172
173
public ValidatingResultSink(int expectedCount) {
174
this.expectedCount = expectedCount;
175
}
176
177
@Override
178
public void invoke(Integer value) throws Exception {
179
synchronized (receivedValues) {
180
// Validate value is a perfect square
181
int sqrt = (int) Math.sqrt(value);
182
if (sqrt * sqrt != value) {
183
throw new RuntimeException("Value " + value + " is not a perfect square");
184
}
185
186
receivedValues.add(value);
187
188
if (receivedValues.size() >= expectedCount) {
189
// Validate we received all expected squares
190
Set<Integer> expectedSquares = Set.of(1, 4, 9, 16, 25, 36, 49, 64, 81, 100);
191
if (!receivedValues.equals(expectedSquares)) {
192
throw new RuntimeException("Received values don't match expected squares");
193
}
194
195
throw new SuccessException("All " + expectedCount + " squares validated successfully");
196
}
197
}
198
}
199
}
200
```
201
202
### Timeout and Error Handling
203
204
```java
205
@Test
206
public void testExecutionWithTimeout() throws Exception {
207
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
208
env.setParallelism(1);
209
210
// Job that should complete within reasonable time
211
env.fromSequence(1, 1000)
212
.map(new SlowProcessingFunction(10)) // 10ms per element
213
.addSink(new TimeoutValidatingSink(5000)); // 5 second timeout
214
215
long startTime = System.currentTimeMillis();
216
217
try {
218
TestUtils.tryExecute(env, "Timeout Test");
219
} catch (Exception e) {
220
long duration = System.currentTimeMillis() - startTime;
221
assertTrue("Test should complete within timeout", duration < 10000);
222
throw e;
223
}
224
}
225
226
class TimeoutValidatingSink implements SinkFunction<Integer> {
227
private final long timeoutMs;
228
private final long startTime;
229
private int elementCount;
230
231
public TimeoutValidatingSink(long timeoutMs) {
232
this.timeoutMs = timeoutMs;
233
this.startTime = System.currentTimeMillis();
234
}
235
236
@Override
237
public void invoke(Integer value) throws Exception {
238
elementCount++;
239
240
long elapsed = System.currentTimeMillis() - startTime;
241
if (elapsed > timeoutMs) {
242
throw new RuntimeException("Test exceeded timeout of " + timeoutMs + "ms");
243
}
244
245
// Terminate after processing reasonable amount
246
if (elementCount >= 100) {
247
throw new SuccessException("Processed " + elementCount + " elements within timeout");
248
}
249
}
250
}
251
```
252
253
### Parallel Execution Validation
254
255
```java
256
@Test
257
public void testParallelExecutionValidation() throws Exception {
258
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
259
env.setParallelism(4);
260
261
env.fromSequence(1, 1000)
262
.map(new ParallelValidatingFunction())
263
.keyBy(x -> x % 4) // Distribute across 4 partitions
264
.addSink(new ParallelResultSink());
265
266
TestUtils.tryExecute(env, "Parallel Execution Test");
267
}
268
269
class ParallelValidatingFunction implements MapFunction<Long, String> {
270
@Override
271
public String map(Long value) throws Exception {
272
// Include subtask information for validation
273
int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
274
return subtaskIndex + ":" + value;
275
}
276
}
277
278
class ParallelResultSink implements SinkFunction<String> {
279
private static final AtomicInteger totalReceived = new AtomicInteger(0);
280
private static final Set<Integer> activeSubtasks = ConcurrentHashMap.newKeySet();
281
282
@Override
283
public void invoke(String value) throws Exception {
284
// Parse subtask index
285
int subtaskIndex = Integer.parseInt(value.split(":")[0]);
286
activeSubtasks.add(subtaskIndex);
287
288
int received = totalReceived.incrementAndGet();
289
290
// Validate parallel execution
291
if (received >= 1000) {
292
if (activeSubtasks.size() < 4) {
293
throw new RuntimeException("Not all subtasks were active: " + activeSubtasks);
294
}
295
296
throw new SuccessException("Successfully validated parallel execution across " +
297
activeSubtasks.size() + " subtasks");
298
}
299
}
300
}
301
```
302
303
### Error Recovery Testing
304
305
```java
306
@Test
307
public void testErrorRecovery() throws Exception {
308
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
309
env.setParallelism(1);
310
env.enableCheckpointing(1000);
311
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 500));
312
313
env.fromSequence(1, 100)
314
.map(new RecoveringFunction())
315
.addSink(new RecoveryValidatingSink());
316
317
// Should succeed after recovery attempts
318
TestUtils.tryExecute(env, "Error Recovery Test");
319
}
320
321
class RecoveringFunction implements MapFunction<Long, Long> {
322
private static int attemptCount = 0;
323
324
@Override
325
public Long map(Long value) throws Exception {
326
// Fail on first attempt for certain values
327
if (attemptCount == 0 && value == 50) {
328
attemptCount++;
329
throw new RuntimeException("Simulated failure at value " + value);
330
}
331
332
return value;
333
}
334
}
335
```