0
# Fault Tolerance
1
2
Base classes and utilities for testing job cancellation, task failure recovery, and fault tolerance mechanisms. They provide controlled failure injection and recovery validation for resilience testing.
3
4
## Core Base Classes
5
6
### CancelingTestBase
7
8
Abstract base class providing infrastructure for testing job cancellation scenarios with cluster management and execution control.
9
10
```java { .api }
11
public abstract class CancelingTestBase {
12
protected LocalFlinkMiniCluster cluster;
13
protected Configuration config;
14
15
@Before
16
public void setup() throws Exception;
17
18
@After
19
public void cleanup() throws Exception;
20
21
// Cluster lifecycle management
22
protected void startCluster() throws Exception;
23
protected void stopCluster() throws Exception;
24
25
// Job execution and cancellation
26
protected void runAndCancelJob(JobGraph jobGraph, int msecsTillCanceling) throws Exception;
27
protected void runAndCancelJob(JobGraph jobGraph, int msecsTillCanceling, boolean waitForCancel) throws Exception;
28
29
// Abstract methods for test implementation
30
protected abstract void testProgram(ExecutionEnvironment env);
31
protected abstract JobGraph getJobGraph() throws Exception;
32
}
33
```
34
35
### SimpleRecoveryITCaseBase
36
37
Abstract base class for testing task failure recovery scenarios with multi-attempt execution and failure injection.
38
39
```java { .api }
40
public abstract class SimpleRecoveryITCaseBase {
41
protected LocalFlinkMiniCluster cluster;
42
protected Configuration config;
43
protected int parallelism;
44
45
@Before
46
public void setup() throws Exception;
47
48
@After
49
public void cleanup() throws Exception;
50
51
// Recovery testing workflow
52
protected void execute() throws Exception;
53
protected void preSubmit() throws Exception;
54
protected void postSubmit() throws Exception;
55
56
// Abstract test program definition
57
protected abstract void testProgram(ExecutionEnvironment env);
58
}
59
```
60
61
## Recovery Execution Environment
62
63
### RecoveryITCaseBase
64
65
Extension of `SimpleRecoveryITCaseBase` that adds configurable cluster sizing (number of task managers and slots per task manager) and failure-injection helpers for recovery testing.
66
67
```java { .api }
68
public abstract class RecoveryITCaseBase extends SimpleRecoveryITCaseBase {
69
protected int numTaskManagers;
70
protected int slotsPerTaskManager;
71
72
public RecoveryITCaseBase();
73
public RecoveryITCaseBase(Configuration config, int parallelism);
74
75
// Extended setup with custom configuration
76
protected void setupCluster(Configuration config, int numTaskManagers, int slotsPerTaskManager) throws Exception;
77
78
// Failure injection utilities
79
protected void injectTaskFailure(JobID jobId, int taskIndex) throws Exception;
80
protected void waitForRecovery(JobID jobId) throws Exception;
81
}
82
```
83
84
## Cancellation Testing Utilities
85
86
### CancelableInfiniteInputFormat
87
88
Input format that generates an infinite stream of `Integer` records, for use in cancellation tests.
89
90
```java { .api }
91
public class CancelableInfiniteInputFormat extends GenericInputFormat<Integer> {
92
private volatile boolean canceled;
93
94
public CancelableInfiniteInputFormat();
95
96
@Override
97
public boolean reachedEnd();
98
99
@Override
100
public Integer nextRecord(Integer reuse);
101
102
@Override
103
public void cancel();
104
}
105
```
106
107
### SlowlyDeserializingInputFormat
108
109
Input format with controllable deserialization delays for timeout and cancellation testing.
110
111
```java { .api }
112
public class SlowlyDeserializingInputFormat extends GenericInputFormat<Integer> {
113
private long deserializationDelay;
114
private int elementsToReturn;
115
116
public SlowlyDeserializingInputFormat(long deserializationDelay, int elementsToReturn);
117
118
@Override
119
public boolean reachedEnd();
120
121
@Override
122
public Integer nextRecord(Integer reuse);
123
}
124
```
125
126
## Recovery Testing Functions
127
128
### FailingMapper
129
130
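`MapFunction` that intentionally fails after processing a specified number of elements. The processed-element counter is static (shared across all instances), so call `reset()` before each test.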
131
132
```java { .api }
133
public class FailingMapper<T> implements MapFunction<T, T> {
134
private int failAfterElements;
135
private static volatile int processedElements;
136
137
public FailingMapper(int failAfterElements);
138
139
@Override
140
public T map(T value) throws Exception;
141
142
public static void reset();
143
}
144
```
145
146
### RecoveringFunction
147
148
Base class for functions that track failure and recovery across restart attempts.
149
150
```java { .api }
151
public abstract class RecoveringFunction {
152
protected static volatile int attemptNumber;
153
protected static volatile boolean hasFailed;
154
155
protected void trackAttempt();
156
protected boolean shouldFail();
157
protected void simulateFailure() throws Exception;
158
159
public static void reset();
160
public static int getAttemptNumber();
161
}
162
```
163
164
## Usage Examples
165
166
### Job Cancellation Testing
167
168
```java
169
public class MyCancellationTest extends CancelingTestBase {
170
171
@Test
172
public void testJobCancellation() throws Exception {
173
JobGraph jobGraph = getJobGraph();
174
175
// Run job and cancel after 5 seconds
176
runAndCancelJob(jobGraph, 5000);
177
178
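// Verify clean cancellation (jobWasCancelled must be tracked by the test itself; it is not a CancelingTestBase member listed above)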
179
assertTrue("Job should be cancelled", jobWasCancelled);
180
}
181
182
@Override
183
protected void testProgram(ExecutionEnvironment env) {
184
// Create a long-running job that can be cancelled
185
env.createInput(new CancelableInfiniteInputFormat())
186
.map(x -> x * 2)
187
.map(new SlowProcessingMapper()) // Adds processing delay
188
.writeAsText("/tmp/cancellation-test-output");
189
}
190
191
@Override
192
protected JobGraph getJobGraph() throws Exception {
193
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
194
env.setParallelism(4);
195
testProgram(env);
196
return env.createProgramPlan().getJobGraph();
197
}
198
}
199
```
200
201
### Recovery Testing with Failure Injection
202
203
```java
204
public class MyRecoveryTest extends SimpleRecoveryITCaseBase {
205
206
@Test
207
public void testTaskFailureRecovery() throws Exception {
208
FailingMapper.reset(); // Reset failure counter
209
execute(); // Run test with recovery
210
}
211
212
@Override
213
protected void testProgram(ExecutionEnvironment env) {
214
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000));
215
216
env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
217
.map(new FailingMapper<>(5)) // Fails after 5 elements
218
.map(new RecoveringMapper()) // Handles recovery
219
.writeAsText("/tmp/recovery-test-output");
220
}
221
222
@Override
223
protected void postSubmit() throws Exception {
224
// Verify that recovery occurred
225
assertTrue("Should have failed at least once", FailingMapper.hasFailed());
226
assertTrue("Should have recovered", RecoveringMapper.hasRecovered());
227
228
// Verify output correctness
229
verifyOutputFile("/tmp/recovery-test-output");
230
}
231
}
232
```
233
234
### Custom Recovery Function Implementation
235
236
```java
237
public class StatefulRecoveringMapper extends RecoveringFunction implements MapFunction<Integer, Integer> {
238
private ValueState<Integer> counterState;
239
240
@Override
241
public void open(Configuration parameters) throws Exception {
242
ValueStateDescriptor<Integer> descriptor =
243
new ValueStateDescriptor<>("counter", Integer.class);
244
counterState = getRuntimeContext().getState(descriptor);
245
}
246
247
@Override
248
public Integer map(Integer value) throws Exception {
249
trackAttempt();
250
251
Integer count = counterState.value();
252
if (count == null) count = 0;
253
254
count++;
255
counterState.update(count);
256
257
// Fail on first attempt after processing 3 elements
258
if (getAttemptNumber() == 1 && count == 3) {
259
simulateFailure();
260
}
261
262
return value * count;
263
}
264
}
265
```
266
267
### Advanced Cancellation with Timeout
268
269
```java
270
public class TimeoutCancellationTest extends CancelingTestBase {
271
272
@Test
273
public void testCancellationWithTimeout() throws Exception {
274
JobGraph jobGraph = createSlowJobGraph();
275
276
long startTime = System.currentTimeMillis();
277
278
// Cancel job after 3 seconds, wait for cancellation
279
runAndCancelJob(jobGraph, 3000, true);
280
281
long duration = System.currentTimeMillis() - startTime;
282
283
// Verify cancellation happened within reasonable time
284
assertTrue("Cancellation took too long", duration < 10000);
285
}
286
287
private JobGraph createSlowJobGraph() throws Exception {
288
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
289
290
env.createInput(new SlowlyDeserializingInputFormat(1000, 100)) // 1 sec per element
291
.map(x -> {
292
Thread.sleep(500); // Additional processing delay
293
return x;
294
})
295
.writeAsText("/tmp/slow-job-output");
296
297
return env.createProgramPlan().getJobGraph();
298
}
299
}
300
```
301
302
### Multi-Stage Recovery Testing
303
304
```java
305
public class ComplexRecoveryTest extends RecoveryITCaseBase {
306
307
public ComplexRecoveryTest() {
308
super(new Configuration(), 4); // 4 parallel instances
309
}
310
311
@Test
312
public void testMultiStageRecovery() throws Exception {
313
// Configure multiple restart attempts
314
config.setString(ConfigConstants.RESTART_STRATEGY, "fixed-delay");
315
config.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 5);
316
config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "1s");
317
318
execute();
319
}
320
321
@Override
322
protected void testProgram(ExecutionEnvironment env) {
323
env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
324
.map(new FailingMapper<>(3)) // Fail after 3 elements
325
.filter(new RecoveringFilter()) // Filter with recovery logic
326
.map(new ValidatingMapper()) // Validate state consistency
327
.collect(); // Force execution
328
}
329
330
@Override
331
protected void postSubmit() throws Exception {
332
// Verify multiple recovery attempts occurred
333
assertTrue("Should have multiple attempts", RecoveringFunction.getAttemptNumber() > 1);
334
335
// Verify final state consistency
336
validateFinalResults();
337
}
338
}