0
# Fault Tolerance and Recovery Testing
1
2
This framework provides multiple failure-injection mechanisms and recovery-testing utilities for validating Flink's fault tolerance capabilities. It enables testing of job recovery scenarios, restart strategies, and failure-handling behavior.
3
4
## Capabilities
5
6
### Simple Recovery Test Base
7
8
Abstract base class for testing job recovery scenarios with configurable failure injection and restart validation.
9
10
```java { .api }
11
/**
12
* Base class for testing job recovery scenarios
13
*/
14
public abstract class SimpleRecoveryITCaseBase {
15
16
/**
17
* Run job with controlled cancellation for recovery testing
18
* @param jobGraph JobGraph to execute and cancel
19
* @throws Exception if job execution or cancellation fails
20
*/
21
protected void runAndCancelJob(JobGraph jobGraph) throws Exception;
22
23
/**
24
* Failing mapper function that fails after processing a specified number of elements
25
*/
26
public static class FailingMapper1 implements MapFunction<Integer, Integer> {
27
/**
28
* Constructor for failing mapper
29
* @param failAfterElements number of elements to process before failing
30
*/
31
public FailingMapper1(int failAfterElements);
32
33
@Override
34
public Integer map(Integer value) throws Exception;
35
}
36
37
/**
38
* Alternative failing mapper with different failure patterns
39
*/
40
public static class FailingMapper2 implements MapFunction<Integer, Integer> {
41
/**
42
* Constructor for alternative failing mapper
43
* @param failAfterElements number of elements before failure
44
*/
45
public FailingMapper2(int failAfterElements);
46
47
@Override
48
public Integer map(Integer value) throws Exception;
49
}
50
51
/**
52
* Third variant of failing mapper for complex failure scenarios
53
*/
54
public static class FailingMapper3 implements MapFunction<Integer, Integer> {
55
/**
56
* Constructor for third failing mapper variant
57
* @param failAfterElements number of elements before failure
58
*/
59
public FailingMapper3(int failAfterElements);
60
61
@Override
62
public Integer map(Integer value) throws Exception;
63
}
64
}
65
```
66
67
### Stream Fault Tolerance Test Base
68
69
Comprehensive parameterized base class for testing stream processing fault tolerance with checkpointing, failure injection, and recovery validation.
70
71
```java { .api }
72
/**
73
* Parameterized base class for comprehensive fault tolerance testing in streaming scenarios
74
*/
75
@RunWith(Parameterized.class)
76
public abstract class StreamFaultToleranceTestBase {
77
78
/** Default parallelism for fault tolerance tests */
79
public static final int PARALLELISM = 12;
80
/** Number of task managers for test cluster */
81
public static final int NUM_TASK_MANAGERS = 3;
82
/** Number of task slots per task manager */
83
public static final int NUM_TASK_SLOTS = 4;
84
85
/**
86
* Enumeration of available failover strategies for testing
87
*/
88
public enum FailoverStrategy {
89
RestartAllFailoverStrategy,
90
RestartPipelinedRegionFailoverStrategy
91
}
92
93
/**
94
* POJO for counting prefixed values in fault tolerance tests
95
*/
96
public static class PrefixCount {
97
/** Prefix string */
98
public String prefix;
99
/** Integer value */
100
public Integer value;
101
/** Count of occurrences */
102
public Long count;
103
104
/**
105
* Default constructor for PrefixCount
106
*/
107
public PrefixCount();
108
109
/**
110
* Constructor with field initialization
111
* @param prefix prefix string
112
* @param value integer value
113
* @param count occurrence count
114
*/
115
public PrefixCount(String prefix, Integer value, Long count);
116
}
117
118
/**
119
* Current failover strategy being tested
120
*/
121
protected final FailoverStrategy failoverStrategy;
122
123
/**
124
* Constructor for parameterized test
125
* @param failoverStrategy failover strategy to test
126
*/
127
public StreamFaultToleranceTestBase(FailoverStrategy failoverStrategy);
128
129
/**
130
* Abstract method to define the test program topology
131
* @param env StreamExecutionEnvironment for building the job
132
* @return DataStream representing the final result
133
* @throws Exception if program construction fails
134
*/
135
public abstract DataStream<PrefixCount> testProgram(StreamExecutionEnvironment env) throws Exception;
136
137
/**
138
* Abstract method for post-submission actions and validation
139
* @throws Exception if post-submission actions fail
140
*/
141
public abstract void postSubmit() throws Exception;
142
143
/**
144
* Run the complete checkpointed program with fault injection
145
* @throws Exception if test execution fails
146
*/
147
public void runCheckpointedProgram() throws Exception;
148
149
/**
150
* Get parameters for parameterized testing of different failover strategies
151
* @return Collection of failover strategy parameters
152
*/
153
@Parameterized.Parameters(name = "Failover strategy: {0}")
154
public static Collection<FailoverStrategy[]> parameters();
155
156
/**
157
* Create test environment configuration with fault tolerance settings
158
* @return Configuration for test environment
159
*/
160
protected Configuration createTestConfiguration();
161
162
/**
163
* Trigger failure in the running job for fault tolerance testing
164
* @param jobId JobID of the running job
165
* @throws Exception if failure triggering fails
166
*/
167
protected void triggerFailure(JobID jobId) throws Exception;
168
169
/**
170
* Wait for job to reach running state
171
* @param jobId JobID to monitor
172
* @param timeout timeout in milliseconds
173
* @throws Exception if job doesn't reach running state within timeout
174
*/
175
protected void waitForJobRunning(JobID jobId, long timeout) throws Exception;
176
177
/**
178
* Validate checkpointing behavior
179
* @param jobId JobID to validate
180
* @return boolean indicating if checkpointing is working correctly
181
* @throws Exception if validation fails
182
*/
183
protected boolean validateCheckpointing(JobID jobId) throws Exception;
184
}
185
```
186
187
### Restart Strategy Test Bases
188
189
Abstract base classes for testing different restart strategies with Flink's fault tolerance mechanisms.
190
191
```java { .api }
192
/**
193
* Base class for testing fixed delay restart strategy
194
*/
195
public abstract class SimpleRecoveryFixedDelayRestartStrategyITBase
196
extends SimpleRecoveryITCaseBase {
197
198
/**
199
* Test recovery with fixed delay between restart attempts
200
* @param delayMs delay in milliseconds between restarts
201
* @param maxAttempts maximum number of restart attempts
202
* @throws Exception if test execution fails
203
*/
204
protected void testFixedDelayRestart(long delayMs, int maxAttempts) throws Exception;
205
}
206
207
/**
208
* Base class for testing exponential delay restart strategy
209
*/
210
public abstract class SimpleRecoveryExponentialDelayRestartStrategyITBase
211
extends SimpleRecoveryITCaseBase {
212
213
/**
214
* Test recovery with exponential backoff delay
215
* @param initialDelayMs initial delay in milliseconds
216
* @param maxDelayMs maximum delay in milliseconds
217
* @param backoffMultiplier multiplier for exponential backoff
218
* @throws Exception if test execution fails
219
*/
220
protected void testExponentialDelayRestart(
221
long initialDelayMs,
222
long maxDelayMs,
223
double backoffMultiplier) throws Exception;
224
}
225
226
/**
227
* Base class for testing failure rate restart strategy
228
*/
229
public abstract class SimpleRecoveryFailureRateStrategyITBase
230
extends SimpleRecoveryITCaseBase {
231
232
/**
233
* Test recovery based on failure rate thresholds
234
* @param maxFailuresPerInterval maximum failures allowed per time interval
235
* @param failureRateIntervalMs time interval for failure rate calculation
236
* @param delayMs delay between restart attempts
237
* @throws Exception if test execution fails
238
*/
239
protected void testFailureRateRestart(
240
int maxFailuresPerInterval,
241
long failureRateIntervalMs,
242
long delayMs) throws Exception;
243
}
244
```
245
246
### Recovery Test Utilities
247
248
A utility class providing common recovery-testing functionality and helper methods.
249
250
```java { .api }
251
/**
252
* Utility class for recovery testing scenarios
253
*/
254
public class RecoveryTestUtils {
255
256
/**
257
* Create job graph with configurable failure injection
258
* @param sourceParallelism parallelism for source operator
259
* @param mapParallelism parallelism for map operator
260
* @param failAfterElements elements to process before failure
261
* @return JobGraph configured for recovery testing
262
*/
263
public static JobGraph createJobWithFailure(
264
int sourceParallelism,
265
int mapParallelism,
266
int failAfterElements);
267
268
/**
269
* Validate job recovery metrics and behavior
270
* @param jobExecutionResult result from job execution
271
* @param expectedRestarts expected number of restarts
272
* @return boolean indicating if recovery behavior is correct
273
*/
274
public static boolean validateRecoveryBehavior(
275
JobExecutionResult jobExecutionResult,
276
int expectedRestarts);
277
}
278
```
279
280
**Usage Examples:**
281
282
```java
283
import org.apache.flink.test.recovery.SimpleRecoveryITCaseBase;
284
import org.apache.flink.test.recovery.SimpleRecoveryFixedDelayRestartStrategyITBase;
285
import org.apache.flink.test.recovery.utils.RecoveryTestUtils;
286
287
// Basic recovery test
288
/**
 * Example test extending SimpleRecoveryITCaseBase. It shows two scenarios: a
 * hand-built two-vertex job passed to runAndCancelJob, and a job created by
 * RecoveryTestUtils that is run with a fixed-delay restart configuration.
 */
public class JobRecoveryTest extends SimpleRecoveryITCaseBase {
289
290
/**
 * Builds a JobGraph with a "source" vertex (parallelism 1) connected to a
 * "mapper" vertex (parallelism 2) and hands it to runAndCancelJob.
 *
 * @throws Exception propagated from job construction or runAndCancelJob
 */
@Test
291
public void testSimpleJobRecovery() throws Exception {
292
// Create job with failing mapper
293
JobGraph job = new JobGraph();
294
295
// Add source
296
JobVertex source = new JobVertex("source");
297
source.setInvokableClass(NumberSequenceSource.class);
298
source.setParallelism(1);
299
job.addVertex(source);
300
301
// Add failing mapper
// NOTE(review): FailingMapper1 is listed in this document as a MapFunction whose
// failure threshold is a constructor argument; confirm it can be registered as an
// invokable class and that it actually reads the "fail-after" key set below.
302
JobVertex mapper = new JobVertex("mapper");
303
mapper.setInvokableClass(FailingMapper1.class);
304
mapper.getConfiguration().setInteger("fail-after", 50);
305
mapper.setParallelism(2);
306
job.addVertex(mapper);
307
308
// Connect vertices
// NOTE(review): confirm that REBALANCE is a valid DistributionPattern constant and
// that this two-argument connectNewDataSetAsInput overload exists in the targeted
// Flink version.
309
mapper.connectNewDataSetAsInput(source, DistributionPattern.REBALANCE);
310
311
// Test recovery
312
runAndCancelJob(job);
313
}
314
315
/**
 * Creates a failing job through RecoveryTestUtils, sets fixed-delay restart keys
 * on the job configuration, runs the job, and asserts on the recovery validation.
 *
 * @throws Exception propagated from job creation or execution
 */
@Test
316
public void testMultipleFailureRecovery() throws Exception {
317
// Arguments are sourceParallelism = 1, mapParallelism = 2, failAfterElements = 30
JobGraph job = RecoveryTestUtils.createJobWithFailure(1, 2, 30);
318
319
// Configure restart strategy
320
job.getJobConfiguration().setString(
321
"restart-strategy", "fixed-delay");
322
job.getJobConfiguration().setString(
323
"restart-strategy.fixed-delay.attempts", "3");
324
job.getJobConfiguration().setString(
325
"restart-strategy.fixed-delay.delay", "1s");
326
327
// NOTE(review): runJobWithExpectedFailures is not among the members listed for
// SimpleRecoveryITCaseBase in this document; confirm where it is declared.
JobExecutionResult result = runJobWithExpectedFailures(job);
328
329
// Validate recovery behavior; the second argument is the expected number of restarts
330
assertTrue(RecoveryTestUtils.validateRecoveryBehavior(result, 2));
331
}
332
}
333
334
// Fixed delay restart strategy test
335
/**
 * Example test extending SimpleRecoveryFixedDelayRestartStrategyITBase. Each test
 * method calls testFixedDelayRestart(delayMs, maxAttempts) with different values.
 */
public class FixedDelayRecoveryTest extends SimpleRecoveryFixedDelayRestartStrategyITBase {
336
337
/**
 * Calls testFixedDelayRestart with a 2000 ms delay and at most 3 attempts.
 *
 * @throws Exception propagated from testFixedDelayRestart
 */
@Test
338
public void testFixedDelayStrategy() throws Exception {
339
// Test with 2 second delay, maximum 3 attempts
340
testFixedDelayRestart(2000L, 3);
341
}
342
343
/**
 * Calls testFixedDelayRestart with a 500 ms delay and at most 5 attempts.
 *
 * @throws Exception propagated from testFixedDelayRestart
 */
@Test
344
public void testFixedDelayWithQuickRecovery() throws Exception {
345
// Test with 500ms delay (maximum 5 attempts) for quick recovery scenarios
346
testFixedDelayRestart(500L, 5);
347
}
348
}
349
350
// Comprehensive recovery testing
351
/**
 * Example test extending SimpleRecoveryITCaseBase that wires three failing mappers
 * in sequence between a source and a sink and passes the job to runAndCancelJob.
 */
public class ComprehensiveRecoveryTest extends SimpleRecoveryITCaseBase {
352
353
/**
 * Builds the chain source -> mapper1 -> mapper2 -> mapper3 -> sink, where the
 * mappers are constructed with failAfterElements of 20, 40 and 60, then hands
 * the job to runAndCancelJob.
 *
 * @throws Exception propagated from job construction or runAndCancelJob
 */
@Test
354
public void testCascadingFailures() throws Exception {
355
JobGraph job = new JobGraph();
356
357
// Chain multiple failing mappers
// NOTE(review): createSourceVertex, createMapperVertex and createSinkVertex are not
// among the members listed for SimpleRecoveryITCaseBase in this document; confirm
// where they are declared or define them in this example.
358
JobVertex source = createSourceVertex();
359
JobVertex mapper1 = createMapperVertex(new FailingMapper1(20));
360
JobVertex mapper2 = createMapperVertex(new FailingMapper2(40));
361
JobVertex mapper3 = createMapperVertex(new FailingMapper3(60));
362
JobVertex sink = createSinkVertex();
363
364
// Connect in chain: each vertex takes the previous one as its input
// NOTE(review): confirm that FORWARD is a valid DistributionPattern constant in the
// targeted Flink version.
365
mapper1.connectNewDataSetAsInput(source, DistributionPattern.FORWARD);
366
mapper2.connectNewDataSetAsInput(mapper1, DistributionPattern.FORWARD);
367
mapper3.connectNewDataSetAsInput(mapper2, DistributionPattern.FORWARD);
368
sink.connectNewDataSetAsInput(mapper3, DistributionPattern.FORWARD);
369
370
// Register every vertex with the job graph
job.addVertex(source);
371
job.addVertex(mapper1);
372
job.addVertex(mapper2);
373
job.addVertex(mapper3);
374
job.addVertex(sink);
375
376
// Test complex recovery scenario
377
runAndCancelJob(job);
378
}
379
}
380
```