# Recovery and Fault Tolerance

Testing infrastructure for job recovery scenarios, restart strategies, failure simulation, and fault tolerance validation. It includes support for TaskManager process failures, coordinated failure injection, and patterns for validating recovery.

## Capabilities

### SimpleRecoveryITCaseBase

Base class for testing job recovery scenarios with failure simulation and restart strategy validation.

```java { .api }
/**
 * Base class for testing job recovery scenarios.
 * Provides infrastructure for testing failed runs followed by successful runs,
 * and tests restart strategies and multiple-failure scenarios.
 */
public abstract class SimpleRecoveryITCaseBase {

    /**
     * Create an execution plan that will fail during execution.
     * Implementations should define a plan that fails at a predictable point.
     * @return plan that will fail during execution
     */
    protected abstract Plan getFailingPlan();

    /**
     * Create an execution plan that should succeed after recovery.
     * Implementations should define a plan that completes successfully.
     * @return plan that will succeed
     */
    protected abstract Plan getSuccessfulPlan();

    /**
     * Execute the complete recovery test cycle:
     * 1. Run the failing plan and verify that it fails.
     * 2. Run the successful plan and verify that it completes.
     */
    @Test
    public void testRecovery() throws Exception;

    /**
     * Test the restart strategy with multiple failures.
     * Verifies that jobs can recover from multiple consecutive failures.
     */
    @Test
    public void testMultipleFailures() throws Exception;
}
```

**Usage Example:**

```java
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

public class MyRecoveryTest extends SimpleRecoveryITCaseBase {

    @Override
    protected Plan getFailingPlan() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3, 4, 5)
                .map(new FailingMapper()) // Fails on element 3
                .output(new DiscardingOutputFormat<>());
        return env.createProgramPlan("Failing Plan");
    }

    @Override
    protected Plan getSuccessfulPlan() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3, 4, 5)
                .map(new SuccessfulMapper()) // Completes successfully
                .output(new DiscardingOutputFormat<>());
        return env.createProgramPlan("Successful Plan");
    }

    private static class FailingMapper implements MapFunction<Integer, Integer> {
        @Override
        public Integer map(Integer value) throws Exception {
            if (value == 3) {
                throw new RuntimeException("Simulated failure");
            }
            return value * 2;
        }
    }

    private static class SuccessfulMapper implements MapFunction<Integer, Integer> {
        @Override
        public Integer map(Integer value) throws Exception {
            return value * 2;
        }
    }
}
```

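Step 1 of `testRecovery()` checks that the failing plan fails, and ideally that it fails for the expected reason. Because the user exception (here `"Simulated failure"`) typically reaches the test wrapped in one or more framework exceptions, that check has to walk the cause chain. A minimal, framework-free sketch; `CauseChain` and `hasCauseWithMessage` are hypothetical names, not part of the base class:

```java
import java.util.Objects;

/** Hypothetical helper: searches an exception's cause chain for an expected failure. */
final class CauseChain {

    private CauseChain() {}

    /**
     * Returns true if {@code error} or any of its causes carries exactly the given message.
     * The depth limit guards against cyclic cause chains.
     */
    static boolean hasCauseWithMessage(Throwable error, String expectedMessage) {
        Throwable current = error;
        for (int depth = 0; current != null && depth < 100; depth++) {
            if (Objects.equals(current.getMessage(), expectedMessage)) {
                return true;
            }
            current = current.getCause();
        }
        return false;
    }
}
```

A test that catches the exception thrown while running the failing plan could then assert `CauseChain.hasCauseWithMessage(e, "Simulated failure")`, which fails loudly if the job broke for an unrelated reason.
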
### SimpleRecoveryFixedDelayRestartStrategyITBase

Specialized base class for testing fixed delay restart strategy scenarios.

```java { .api }
/**
 * Base class for testing the fixed delay restart strategy.
 * Configures the cluster with a fixed delay restart strategy and tests recovery behavior.
 */
public abstract class SimpleRecoveryFixedDelayRestartStrategyITBase extends SimpleRecoveryITCaseBase {

    /**
     * Get the restart strategy configuration for fixed delay testing.
     * @return restart strategy configuration with a fixed delay
     */
    protected abstract RestartStrategies.RestartStrategyConfiguration getRestartStrategy();

    /**
     * Get the expected number of restart attempts.
     * @return number of expected restart attempts
     */
    protected abstract int getExpectedRestartAttempts();
}
```

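The fixed delay strategy restarts a failed job up to a configured number of attempts, waits the same delay before each restart, and fails the job once the attempts are used up. The plain-Java model below makes that counting explicit; `FixedDelayRetrier` and its injectable `sleeper` are hypothetical, a sketch of the semantics rather than Flink's implementation:

```java
import java.util.concurrent.Callable;
import java.util.function.LongConsumer;

/** Hypothetical model of fixed-delay restarts: at most maxRestarts retries, same delay each time. */
final class FixedDelayRetrier {

    /** Outcome of a run: whether it eventually succeeded and how many restarts it used. */
    static final class Result {
        final boolean succeeded;
        final int restarts;

        Result(boolean succeeded, int restarts) {
            this.succeeded = succeeded;
            this.restarts = restarts;
        }
    }

    private final int maxRestarts;
    private final long delayMillis;
    private final LongConsumer sleeper; // injected so tests need not really sleep

    FixedDelayRetrier(int maxRestarts, long delayMillis, LongConsumer sleeper) {
        this.maxRestarts = maxRestarts;
        this.delayMillis = delayMillis;
        this.sleeper = sleeper;
    }

    Result run(Callable<?> job) {
        int restarts = 0;
        while (true) {
            try {
                job.call();
                return new Result(true, restarts);
            } catch (Exception e) {
                if (restarts == maxRestarts) {
                    return new Result(false, restarts); // attempts exhausted: the job fails
                }
                restarts++;
                sleeper.accept(delayMillis); // the same delay before every restart
            }
        }
    }
}
```

With `new FixedDelayRetrier(3, 1000, sleeper)`, mirroring `RestartStrategies.fixedDelayRestart(3, 1000)`, an always-failing job ends with `restarts == 3`, the value `getExpectedRestartAttempts()` is meant to report.
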
### SimpleRecoveryFailureRateStrategyITBase

Specialized base class for testing failure rate restart strategy scenarios.

```java { .api }
/**
 * Base class for testing the failure rate restart strategy.
 * Configures the cluster with a failure rate restart strategy and tests recovery behavior.
 */
public abstract class SimpleRecoveryFailureRateStrategyITBase extends SimpleRecoveryITCaseBase {

    /**
     * Get the restart strategy configuration for failure rate testing.
     * @return restart strategy configuration with failure rate limits
     */
    protected abstract RestartStrategies.RestartStrategyConfiguration getRestartStrategy();

    /**
     * Get the failure rate configuration.
     * @return failure rate configuration (maximum failures per time interval)
     */
    protected abstract RestartStrategies.FailureRateRestartStrategyConfiguration getFailureRateConfig();
}
```

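The failure rate strategy bounds how many failures may occur within a time interval, rather than how many restarts happen in total. A sliding-window sketch of that rule, assuming "allowed" means at most `maxFailures` failures inside the trailing interval; `FailureRateWindow` is hypothetical, and Flink's own bookkeeping may differ in details such as the window boundary:

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sliding-window model of a failure-rate restart rule. */
final class FailureRateWindow {

    private final int maxFailures;
    private final long intervalMillis;
    private final Deque<Long> failureTimes = new ArrayDeque<>();

    FailureRateWindow(int maxFailures, long intervalMillis) {
        this.maxFailures = maxFailures;
        this.intervalMillis = intervalMillis;
    }

    /**
     * Records a failure at {@code nowMillis} and returns true if a restart is still
     * allowed, i.e. at most maxFailures failures fall in (now - interval, now].
     */
    boolean recordFailureAndCheck(long nowMillis) {
        failureTimes.addLast(nowMillis);
        // Drop failures that have slid out of the window
        while (!failureTimes.isEmpty() && failureTimes.peekFirst() <= nowMillis - intervalMillis) {
            failureTimes.removeFirst();
        }
        return failureTimes.size() <= maxFailures;
    }
}
```

For `failureRateRestart(3, Time.minutes(1), Time.seconds(5))`, the matching model is `new FailureRateWindow(3, 60_000)`; the 5-second delay would be applied separately before each allowed restart.
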
### AbstractTaskManagerProcessFailureRecoveryTest

Base class for testing TaskManager process failures and recovery by spawning real TaskManager JVM processes.

```java { .api }
/**
 * Base class for testing TaskManager process failures and recovery.
 * Spawns real JVM processes for TaskManagers and coordinates failure scenarios.
 */
public abstract class AbstractTaskManagerProcessFailureRecoveryTest {

    // File-based coordination constants
    protected static final String READY_MARKER_FILE_PREFIX = "ready-";
    protected static final String PROCEED_MARKER_FILE = "proceed";
    protected static final String FINISH_MARKER_FILE_PREFIX = "finish-";

    /**
     * Create the job that will experience a TaskManager failure.
     * @return JobGraph for the job that will experience the failure
     */
    protected abstract JobGraph createJobGraph();

    /**
     * Trigger a TaskManager process failure at the appropriate time.
     * Implementations should coordinate with job execution to trigger the failure.
     */
    protected abstract void triggerTaskManagerFailure() throws Exception;

    /**
     * Verify job recovery after the TaskManager failure.
     * Implementations should validate that the job recovered successfully.
     */
    protected abstract void verifyRecovery() throws Exception;

    /**
     * Execute the complete TaskManager failure and recovery test:
     * 1. Start job execution.
     * 2. Trigger the TaskManager failure.
     * 3. Verify job recovery.
     */
    @Test
    public void testTaskManagerFailureRecovery() throws Exception;

    /**
     * Create a ready marker file for coordination.
     * @param taskManagerId ID of the TaskManager that is ready
     */
    protected void createReadyMarker(String taskManagerId) throws IOException;

    /**
     * Wait for the proceed marker file.
     * TaskManager processes wait for this signal before continuing.
     */
    protected void waitForProceedMarker() throws InterruptedException;

    /**
     * Create a finish marker file.
     * @param taskManagerId ID of the TaskManager that finished
     */
    protected void createFinishMarker(String taskManagerId) throws IOException;
}
```

**Usage Example:**

```java
import static org.junit.Assert.assertEquals;

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TaskManagerFailureTest extends AbstractTaskManagerProcessFailureRecoveryTest {

    @Override
    protected JobGraph createJobGraph() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        env.enableCheckpointing(1000);

        env.addSource(new ContinuousSource())
                .keyBy(x -> x % 4)
                .map(new StatefulProcessingFunction())
                .addSink(new CheckpointedSink<>());

        return env.getStreamGraph().getJobGraph();
    }

    @Override
    protected void triggerTaskManagerFailure() throws Exception {
        // Wait for the job to be running (a fixed sleep is simple but can be flaky)
        Thread.sleep(5000);

        // Kill one TaskManager process. info().command() only holds the executable
        // path (e.g. .../bin/java), so match on the full command line instead.
        // Real tests should keep a handle to the spawned Process rather than
        // scanning every process on the machine.
        ProcessHandle.allProcesses()
                .filter(p -> p.info().commandLine().orElse("").contains("TaskManager"))
                .findFirst()
                .ifPresent(ProcessHandle::destroyForcibly);
    }

    @Override
    protected void verifyRecovery() throws Exception {
        // Wait for the job to recover
        Thread.sleep(10000);

        // `miniCluster` is assumed to be a MiniCluster exposed by the test harness.
        // The expected count assumes two TaskManagers, with the killed one replaced.
        ClusterOverview overview = miniCluster.requestClusterOverview().get();
        assertEquals("All TaskManagers should be registered",
                2, overview.getNumTaskManagersConnected());
    }
}
```

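The example above waits with fixed `Thread.sleep` calls, which either waste time or fail on a slow machine. A common alternative is to poll a condition until a deadline. `Deadlines.waitUntil` below is a hypothetical JDK-only helper, not part of the base class:

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical polling helper used instead of fixed sleeps in recovery tests. */
final class Deadlines {

    private Deadlines() {}

    /**
     * Polls {@code condition} every {@code pollInterval} until it returns true or
     * {@code timeout} elapses. Returns whether the condition was met in time.
     */
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            // Overflow-safe comparison of nanoTime values
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(pollInterval.toMillis());
        }
    }
}
```

In `verifyRecovery()`, this would replace `Thread.sleep(10000)` with polling whatever recovery signal the test can observe, and the test fails explicitly when the deadline passes instead of asserting on a possibly half-recovered cluster.
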
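## Failure Simulation Patterns

### Coordinated Failure Pattern

Using file-based coordination for precise failure timing. The source runs inside a TaskManager JVM, so it cannot call instance methods of the test class (an anonymous class would also drag the non-serializable test instance into the job). Instead, it reads and writes the marker files itself in a directory shared with the test process:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CoordinatedFailureTest extends AbstractTaskManagerProcessFailureRecoveryTest {

    @Override
    protected JobGraph createJobGraph() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // `coordinationDir` is a hypothetical File shared by the test and the TaskManager JVMs
        env.addSource(new CoordinatedSource(coordinationDir.getAbsolutePath()))
                .addSink(new DiscardingSink<>());

        return env.getStreamGraph().getJobGraph();
    }

    // Static nested class, so it does not capture the test instance
    private static class CoordinatedSource implements SourceFunction<Integer> {
        private final String dir;
        private volatile boolean running = true;

        CoordinatedSource(String dir) {
            this.dir = dir;
        }

        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
            // Signal readiness (Files.write creates or overwrites, so a restarted source is fine)
            Files.write(Paths.get(dir, READY_MARKER_FILE_PREFIX + "source-tm"), new byte[0]);

            // Wait for proceed signal
            Path proceed = Paths.get(dir, PROCEED_MARKER_FILE);
            while (running && !Files.exists(proceed)) {
                Thread.sleep(50);
            }

            // Generate data
            for (int i = 0; i < 1000 && running; i++) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(i);
                }
                Thread.sleep(10);
            }

            // Signal completion
            Files.write(Paths.get(dir, FINISH_MARKER_FILE_PREFIX + "source-tm"), new byte[0]);
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    // triggerTaskManagerFailure() and verifyRecovery() omitted for brevity
}
```
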
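The ready/proceed/finish handshake needs only a few file operations. A JDK-only sketch of them, assuming all markers live in one directory visible to every process; the constant values mirror the base class, but the `MarkerFiles` class and its 20 ms polling interval are assumptions, not the base class's implementation:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

/** Hypothetical file-based handshake between a test and TaskManager processes. */
final class MarkerFiles {

    static final String READY_MARKER_FILE_PREFIX = "ready-";
    static final String PROCEED_MARKER_FILE = "proceed";
    static final String FINISH_MARKER_FILE_PREFIX = "finish-";

    private final Path dir;

    MarkerFiles(Path dir) {
        this.dir = dir;
    }

    /** Creates (or overwrites) an empty marker file; overwriting keeps restarts safe. */
    void touch(String name) {
        try {
            Files.write(dir.resolve(name), new byte[0]);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Counts markers whose file name starts with the given prefix. */
    long count(String prefix) {
        try (Stream<Path> files = Files.list(dir)) {
            return files.filter(p -> p.getFileName().toString().startsWith(prefix)).count();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Waits until at least {@code expected} markers with the prefix exist, or the timeout passes. */
    boolean awaitCount(String prefix, long expected, long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (count(prefix) < expected) {
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            Thread.sleep(20);
        }
        return true;
    }
}
```

The test side awaits one ready marker per TaskManager, touches the proceed marker, triggers the failure, and finally awaits the finish markers, each step with a timeout so a stuck process fails the test instead of hanging it.
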
### Exception-based Failure Pattern

Using exceptions for controlled failure simulation:

```java
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

public class ExceptionFailureTest extends SimpleRecoveryITCaseBase {

    private static class ControlledFailureMapper implements MapFunction<Integer, Integer> {
        // Static, because every restart deserializes a fresh mapper instance and an
        // instance field would reset to 0, failing forever. This only works when all
        // tasks share one JVM (e.g. a local MiniCluster); reset it between tests.
        private static final AtomicInteger FAILURES = new AtomicInteger();

        @Override
        public Integer map(Integer value) throws Exception {
            int failure = FAILURES.incrementAndGet();
            if (failure <= 3) {
                throw new RuntimeException("Controlled failure #" + failure);
            }
            return value * 2;
        }
    }

    @Override
    protected Plan getFailingPlan() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 5 restart attempts, 1000 ms apart: enough to outlast the 3 injected failures
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 1000));

        env.fromElements(1, 2, 3, 4, 5)
                .map(new ControlledFailureMapper())
                .output(new DiscardingOutputFormat<>());

        return env.createProgramPlan();
    }

    // getSuccessfulPlan() omitted for brevity
}
```

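A failure counter kept in an instance field of the mapper does not survive restarts, because each restart works on a freshly deserialized copy of the function. That can be checked without a cluster by constructing a new instance per attempt. In the sketch below (hypothetical `RestartModel`, one JVM, no Flink), the per-instance counter fails on every attempt, while a counter shared across instances lets the job finish after three failures, within five restarts:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical model: every attempt gets a fresh function instance, as after a real restart. */
final class RestartModel {

    /** Runs all elements through a fresh function per attempt; returns the attempt that succeeded, or -1. */
    static int runWithRestarts(Supplier<Function<Integer, Integer>> freshInstance, int maxRestarts, int[] elements) {
        for (int attempt = 1; attempt <= maxRestarts + 1; attempt++) {
            Function<Integer, Integer> fn = freshInstance.get();
            try {
                for (int element : elements) {
                    fn.apply(element);
                }
                return attempt;
            } catch (RuntimeException e) {
                // Failed attempt: continue with the next restart
            }
        }
        return -1;
    }

    /** Counter lives in the instance, so it resets to 0 on every restart. */
    static final class InstanceCounterMapper implements Function<Integer, Integer> {
        private int failureCount = 0;

        @Override
        public Integer apply(Integer value) {
            if (failureCount < 3) {
                failureCount++;
                throw new RuntimeException("Controlled failure #" + failureCount);
            }
            return value * 2;
        }
    }

    /** Counter is shared across instances, so earlier failures are remembered. */
    static final class SharedCounterMapper implements Function<Integer, Integer> {
        private final AtomicInteger failures;

        SharedCounterMapper(AtomicInteger failures) {
            this.failures = failures;
        }

        @Override
        public Integer apply(Integer value) {
            int failure = failures.incrementAndGet();
            if (failure <= 3) {
                throw new RuntimeException("Controlled failure #" + failure);
            }
            return value * 2;
        }
    }
}
```
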
## Recovery Validation Patterns

### State Consistency Validation

Ensuring state remains consistent across failures:

```java
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.Map;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;

public class StateConsistencyTest extends StreamFaultToleranceTestBase {

    @Override
    public void testProgram(StreamExecutionEnvironment env) {
        env.addSource(new CheckpointedCountingSource())
                .keyBy(x -> x % 4)
                .map(new StatefulCounter()) // Maintains count state
                .addSink(new ValidatingCountSink());
    }

    @Override
    public void postSubmit() throws Exception {
        // Sanity-check the final per-key counts; a stricter test compares
        // them against exact expected values
        Map<Integer, Long> finalCounts = ValidatingCountSink.getFinalCounts();
        assertFalse("Should have observed counts", finalCounts.isEmpty());

        for (Map.Entry<Integer, Long> entry : finalCounts.entrySet()) {
            assertTrue("Count should be positive", entry.getValue() > 0);
            assertTrue("Count should be reasonable", entry.getValue() < 10000);
        }
    }
}
```

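The range checks above only catch gross errors. When the source emits a known sequence, an exactly-once test can compare every key's count with its exact expected value, which detects both lost records (count too low) and replayed ones (count too high). A JDK-only sketch, assuming the source emits `0..n-1` once each and records are keyed by `x % numKeys` as in the example; `ExactCounts` is a hypothetical helper:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical exactly-once check for a source emitting 0..n-1, keyed by x % numKeys. */
final class ExactCounts {

    private ExactCounts() {}

    /** Expected number of records per key when 0..n-1 is keyed by x % numKeys. */
    static Map<Integer, Long> expected(int n, int numKeys) {
        Map<Integer, Long> counts = new HashMap<>();
        for (int x = 0; x < n; x++) {
            counts.merge(x % numKeys, 1L, Long::sum);
        }
        return counts;
    }

    /** Describes every key whose observed count differs from the expected count. */
    static List<String> mismatches(Map<Integer, Long> observed, Map<Integer, Long> expected) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<Integer, Long> e : expected.entrySet()) {
            long actual = observed.getOrDefault(e.getKey(), 0L);
            if (actual != e.getValue()) {
                String kind = actual < e.getValue() ? "lost records" : "duplicates";
                problems.add("key " + e.getKey() + ": expected " + e.getValue()
                        + ", got " + actual + " (" + kind + ")");
            }
        }
        for (Integer key : observed.keySet()) {
            if (!expected.containsKey(key)) {
                problems.add("unexpected key " + key);
            }
        }
        return problems;
    }
}
```

Assuming the sink reports per-key record counts, `postSubmit()` could then assert that `ExactCounts.mismatches(finalCounts, ExactCounts.expected(n, 4))` is empty, with `n` being the number of records the source is known to emit.
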
### Checkpoint Validation

Verifying checkpoint behavior during failures:

```java
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.List;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;

public class CheckpointValidationTest extends StreamFaultToleranceTestBase {

    @Override
    public void testProgram(StreamExecutionEnvironment env) {
        env.enableCheckpointing(500); // Checkpoint every 500 ms
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        env.addSource(new FailureInjectingSource())
                .map(new CheckpointValidatingFunction())
                .addSink(new CheckpointTrackingSink<>());
    }

    @Override
    public void postSubmit() throws Exception {
        List<Long> checkpointIds = CheckpointTrackingSink.getObservedCheckpoints();

        // Verify checkpoints were created
        assertFalse("Should have observed checkpoints", checkpointIds.isEmpty());

        // Verify checkpoint IDs strictly increase (assumes a single sink instance
        // records them; parallel instances would report each ID more than once)
        for (int i = 1; i < checkpointIds.size(); i++) {
            assertTrue("Checkpoint IDs should increase",
                    checkpointIds.get(i) > checkpointIds.get(i - 1));
        }
    }
}
```

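The strict-increase check in the example above holds only if a single sink instance records the IDs. With a parallel sink, every subtask is notified of the same checkpoint, so a shared list contains each ID several times and the notifications may interleave. A more robust variant records the IDs per subtask and checks each sequence separately. A JDK-only sketch; `CheckpointIds` is hypothetical, and the per-subtask map assumes the sink stores its subtask index alongside each ID:

```java
import java.util.List;
import java.util.Map;

/** Hypothetical check over checkpoint IDs recorded per sink subtask. */
final class CheckpointIds {

    private CheckpointIds() {}

    /** True if the IDs are strictly increasing. */
    static boolean strictlyIncreasing(List<Long> ids) {
        for (int i = 1; i < ids.size(); i++) {
            if (ids.get(i) <= ids.get(i - 1)) {
                return false;
            }
        }
        return true;
    }

    /** True if every subtask saw at least one checkpoint and its IDs strictly increase. */
    static boolean validPerSubtask(Map<Integer, List<Long>> idsBySubtask) {
        if (idsBySubtask.isEmpty()) {
            return false;
        }
        for (List<Long> ids : idsBySubtask.values()) {
            if (ids.isEmpty() || !strictlyIncreasing(ids)) {
                return false;
            }
        }
        return true;
    }
}
```
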
## Restart Strategy Testing

### Fixed Delay Restart Testing

```java
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

public class FixedDelayRestartTest extends SimpleRecoveryFixedDelayRestartStrategyITBase {

    @Override
    protected RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
        return RestartStrategies.fixedDelayRestart(3, 1000); // 3 attempts, 1000 ms delay
    }

    @Override
    protected int getExpectedRestartAttempts() {
        return 3;
    }

    @Override
    protected Plan getFailingPlan() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(getRestartStrategy());

        env.fromElements(1, 2, 3)
                .map(new AlwaysFailingMapper()) // Always fails
                .output(new DiscardingOutputFormat<>());

        return env.createProgramPlan();
    }

    // getSuccessfulPlan() omitted for brevity
}
```

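### Failure Rate Restart Testing

```java
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

public class FailureRateRestartTest extends SimpleRecoveryFailureRateStrategyITBase {

    @Override
    protected RestartStrategies.FailureRateRestartStrategyConfiguration getFailureRateConfig() {
        return RestartStrategies.failureRateRestart(
                3,               // max failures
                Time.minutes(1), // within 1 minute
                Time.seconds(5)  // delay between restarts
        );
    }

    @Override
    protected RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
        // Reuse the failure rate configuration so both methods stay consistent
        return getFailureRateConfig();
    }

    @Override
    protected Plan getFailingPlan() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(getRestartStrategy());

        env.fromElements(1, 2, 3, 4, 5)
                .map(new RateLimitedFailingMapper()) // Fails at a controlled rate
                .output(new DiscardingOutputFormat<>());

        return env.createProgramPlan();
    }

    // getSuccessfulPlan() omitted for brevity
}
```
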
## Error Handling Patterns

### Graceful Degradation

Testing graceful handling of partial failures:

```java
env.addSource(new RobustSource())
        .map(new FaultTolerantMapper()) // Handles individual record failures
        .filter(Objects::nonNull)       // Filter out failed records
        .addSink(new ResilientSink<>());
```

### Failure Isolation

Keeping failures in a best-effort branch from affecting the critical path. Splitting the stream is not enough on its own: both branches read from the same source, so under Flink's region failover they end up in the same failover region, and an exception that escapes either branch also restarts the tasks of the other. The best-effort branch therefore has to catch its own exceptions:

```java
DataStream<Integer> mainStream = env.addSource(new ReliableSource());

// Separate processing branches with different failure characteristics
DataStream<Integer> criticalPath = mainStream
        .filter(x -> x % 2 == 0)
        .map(new CriticalProcessor()); // Must not fail

DataStream<Integer> bestEffortPath = mainStream
        .filter(x -> x % 2 == 1)
        .map(new BestEffortProcessor()); // Must catch its own exceptions to tolerate failures

criticalPath.addSink(new GuaranteedSink<>());
bestEffortPath.addSink(new BestEffortSink<>());
```

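Branching alone does not isolate failures for a structural reason: in a streaming job whose data exchanges are all pipelined, the tasks that restart together roughly correspond to the connected components of the job graph. The sketch below computes those components with union-find for the topology in the example above. It is a simplified model (it ignores parallelism and blocking exchanges), not Flink's region-building code, and `FailoverRegions` and the operator labels are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model: with only pipelined edges, connected operators restart together. */
final class FailoverRegions {

    private final Map<String, String> parent = new HashMap<>();

    private String find(String op) {
        parent.putIfAbsent(op, op);
        String root = op;
        while (!parent.get(root).equals(root)) {
            root = parent.get(root);
        }
        parent.put(op, root); // shorten the path for later lookups
        return root;
    }

    /** Records a pipelined edge; its two endpoints end up in the same region. */
    void connect(String from, String to) {
        parent.put(find(from), find(to));
    }

    boolean sameRegion(String a, String b) {
        return find(a).equals(find(b));
    }
}
```

In this model, two pipelines fed by independent sources stay in separate components, while the shared `ReliableSource` joins the critical and best-effort branches into one.
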