# Checkpointing and Fault Tolerance

Apache Flink provides fault tolerance through checkpointing, which creates consistent snapshots of application state. This enables exactly-once processing guarantees and recovery from failures.

## Capabilities

### Checkpoint Configuration

Configure checkpointing behavior and fault tolerance settings.

10
```java { .api }
11
/**
12
* Enable checkpointing with specified interval
13
* @param interval - checkpoint interval in milliseconds
14
*/
15
StreamExecutionEnvironment enableCheckpointing(long interval);
16
17
/**
18
* Enable checkpointing with interval and mode
19
* @param interval - checkpoint interval in milliseconds
20
* @param mode - checkpointing mode (EXACTLY_ONCE or AT_LEAST_ONCE)
21
*/
22
StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode);
23
24
/**
25
* Get checkpoint configuration for advanced settings
26
*/
27
CheckpointConfig getCheckpointConfig();
28
```
29
30
**Usage Examples:**
31
32
```java
33
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
34
35
// Enable checkpointing every 5 seconds
36
env.enableCheckpointing(5000);
37
38
// Enable with specific mode
39
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
40
41
// Advanced configuration
42
CheckpointConfig config = env.getCheckpointConfig();
43
config.setMinPauseBetweenCheckpoints(500);
44
config.setCheckpointTimeout(60000);
45
config.setMaxConcurrentCheckpoints(1);
46
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
47
```
48
49
### Checkpoint Configuration Options
50
51
Fine-tune checkpoint behavior with various configuration options.
52
53
```java { .api }
54
/**
55
* Set checkpointing mode
56
* @param checkpointingMode - EXACTLY_ONCE or AT_LEAST_ONCE
57
*/
58
void setCheckpointingMode(CheckpointingMode checkpointingMode);
59
60
/**
61
* Set minimum pause between checkpoints
62
* @param minPauseBetweenCheckpoints - minimum pause in milliseconds
63
*/
64
void setMinPauseBetweenCheckpoints(long minPauseBetweenCheckpoints);
65
66
/**
67
* Set checkpoint timeout
68
* @param checkpointTimeout - timeout in milliseconds
69
*/
70
void setCheckpointTimeout(long checkpointTimeout);
71
72
/**
73
* Set maximum concurrent checkpoints
74
* @param maxConcurrentCheckpoints - maximum number of concurrent checkpoints
75
*/
76
void setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints);
77
78
/**
79
* Enable externalized checkpoints
80
* @param cleanupMode - cleanup mode for externalized checkpoints
81
*/
82
void enableExternalizedCheckpoints(ExternalizedCheckpointCleanup cleanupMode);
83
84
/**
85
* Set whether to fail on checkpointing errors
86
* @param failOnCheckpointingErrors - true to fail job on checkpoint errors
87
*/
88
void setFailOnCheckpointingErrors(boolean failOnCheckpointingErrors);
89
90
/**
91
* Set tolerable checkpoint failure number
92
* @param tolerableCheckpointFailureNumber - number of tolerable failures
93
*/
94
void setTolerableCheckpointFailureNumber(int tolerableCheckpointFailureNumber);
95
```
96
97
**Usage Examples:**
98
99
```java
100
CheckpointConfig config = env.getCheckpointConfig();
101
102
// Set checkpointing mode
103
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
104
105
// Minimum pause between checkpoints (prevents too frequent checkpoints)
106
config.setMinPauseBetweenCheckpoints(500);
107
108
// Checkpoint timeout (checkpoint fails if not completed in time)
109
config.setCheckpointTimeout(60000);
110
111
// Maximum concurrent checkpoints (usually 1 for exactly-once)
112
config.setMaxConcurrentCheckpoints(1);
113
114
// Externalized checkpoints (persist checkpoints for recovery)
115
config.enableExternalizedCheckpoints(
116
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
117
);
118
119
// Continue job execution even if checkpoint fails
120
config.setFailOnCheckpointingErrors(false);
121
122
// Allow up to 3 consecutive checkpoint failures
123
config.setTolerableCheckpointFailureNumber(3);
124
```
125
126
### Stateful Functions
127
128
Implement stateful functions that participate in checkpointing.
129
130
```java { .api }
131
/**
132
* Interface for functions that need to checkpoint state
133
*/
134
interface CheckpointedFunction {
135
/**
136
* Take a snapshot of the function's state
137
* @param context - snapshot context
138
*/
139
void snapshotState(FunctionSnapshotContext context) throws Exception;
140
141
/**
142
* Initialize or restore function state
143
* @param context - initialization context
144
*/
145
void initializeState(FunctionInitializationContext context) throws Exception;
146
}
147
148
/**
149
* Listener for checkpoint events
150
*/
151
interface CheckpointListener {
152
/**
153
* Notified when checkpoint is completed
154
* @param checkpointId - ID of completed checkpoint
155
*/
156
void notifyCheckpointComplete(long checkpointId) throws Exception;
157
158
/**
159
* Notified when checkpoint is aborted
160
* @param checkpointId - ID of aborted checkpoint
161
*/
162
default void notifyCheckpointAborted(long checkpointId) throws Exception {}
163
}
164
```
165
166
**Usage Examples:**
167
168
```java
169
public class StatefulMapFunction extends RichMapFunction<String, String>
170
implements CheckpointedFunction {
171
172
private ValueState<Integer> countState;
173
private ListState<String> bufferedElements;
174
175
// Transient state not included in checkpoints
176
private transient List<String> localBuffer;
177
178
@Override
179
public void open(Configuration parameters) throws Exception {
180
super.open(parameters);
181
182
// Initialize state descriptors
183
ValueStateDescriptor<Integer> countDescriptor =
184
new ValueStateDescriptor<>("count", Integer.class);
185
countState = getRuntimeContext().getState(countDescriptor);
186
187
localBuffer = new ArrayList<>();
188
}
189
190
@Override
191
public String map(String value) throws Exception {
192
// Use state in processing
193
Integer count = countState.value();
194
if (count == null) count = 0;
195
196
countState.update(count + 1);
197
localBuffer.add(value);
198
199
return value + "_" + count;
200
}
201
202
@Override
203
public void snapshotState(FunctionSnapshotContext context) throws Exception {
204
// Clear previous checkpoint data
205
bufferedElements.clear();
206
207
// Add current local buffer to checkpointed state
208
for (String element : localBuffer) {
209
bufferedElements.add(element);
210
}
211
}
212
213
@Override
214
public void initializeState(FunctionInitializationContext context) throws Exception {
215
// Initialize checkpointed state
216
ListStateDescriptor<String> bufferDescriptor =
217
new ListStateDescriptor<>("bufferedElements", String.class);
218
bufferedElements = context.getOperatorState().getListState(bufferDescriptor);
219
220
// Restore local buffer from checkpointed state
221
localBuffer = new ArrayList<>();
222
if (context.isRestored()) {
223
for (String element : bufferedElements.get()) {
224
localBuffer.add(element);
225
}
226
}
227
}
228
}
229
230
// Usage with checkpoint listener
231
public class CheckpointAwareFunction extends RichMapFunction<String, String>
232
implements CheckpointedFunction, CheckpointListener {
233
234
private ValueState<Long> lastCheckpointId;
235
236
@Override
237
public void initializeState(FunctionInitializationContext context) throws Exception {
238
ValueStateDescriptor<Long> descriptor =
239
new ValueStateDescriptor<>("lastCheckpointId", Long.class);
240
lastCheckpointId = context.getKeyedState().getState(descriptor);
241
}
242
243
@Override
244
public void snapshotState(FunctionSnapshotContext context) throws Exception {
245
lastCheckpointId.update(context.getCheckpointId());
246
}
247
248
@Override
249
public void notifyCheckpointComplete(long checkpointId) throws Exception {
250
// Perform cleanup or external system commits
251
System.out.println("Checkpoint " + checkpointId + " completed successfully");
252
}
253
254
@Override
255
public String map(String value) throws Exception {
256
return value.toUpperCase();
257
}
258
}
259
```
260
261
### State Backends
262
263
Configure different state backends for storing checkpointed state.
264
265
```java { .api }
266
// Configure state backend (typically done via configuration)
267
// MemoryStateBackend - for development/testing
268
// FsStateBackend - for production with file system storage
269
// RocksDBStateBackend - for large state
270
271
// Configuration in flink-conf.yaml:
272
// state.backend: filesystem
273
// state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
274
```
275
276
**Usage Examples:**
277
278
```java
279
// Configure state backend programmatically (not recommended for production)
280
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
281
282
// Memory state backend (for testing only)
283
env.setStateBackend(new MemoryStateBackend(10 * 1024 * 1024)); // 10MB
284
285
// File system state backend
286
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
287
288
// RocksDB state backend (for large state)
289
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));
290
```
291
292
### Restart Strategies
293
294
Configure how jobs should restart after failures.
295
296
```java { .api }
297
// Configure restart strategy
298
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
299
3, // number of restart attempts
300
Time.of(10, TimeUnit.SECONDS) // delay between attempts
301
));
302
303
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
304
Time.milliseconds(1), // initial delay
305
Time.milliseconds(1000), // max delay
306
1.5, // backoff multiplier
307
Time.minutes(5), // reset time
308
0.1 // jitter
309
));
310
311
env.setRestartStrategy(RestartStrategies.failureRateRestart(
312
3, // max failures per interval
313
Time.of(5, TimeUnit.MINUTES), // failure rate interval
314
Time.of(10, TimeUnit.SECONDS) // delay between attempts
315
));
316
```
317
318
## Types
319
320
### Checkpoint Configuration Types
321
322
```java { .api }
323
// Checkpointing mode
324
enum CheckpointingMode {
325
EXACTLY_ONCE, // Exactly-once processing guarantees
326
AT_LEAST_ONCE // At-least-once processing guarantees
327
}
328
329
// Externalized checkpoint cleanup
330
enum ExternalizedCheckpointCleanup {
331
RETAIN_ON_CANCELLATION, // Keep checkpoints when job is cancelled
332
DELETE_ON_CANCELLATION // Delete checkpoints when job is cancelled
333
}
334
335
// Checkpoint configuration
336
class CheckpointConfig {
337
void setCheckpointingMode(CheckpointingMode mode);
338
void setMinPauseBetweenCheckpoints(long minPause);
339
void setCheckpointTimeout(long timeout);
340
void setMaxConcurrentCheckpoints(int maxConcurrent);
341
void enableExternalizedCheckpoints(ExternalizedCheckpointCleanup cleanup);
342
void setFailOnCheckpointingErrors(boolean failOnErrors);
343
void setTolerableCheckpointFailureNumber(int tolerableFailures);
344
}
345
```
346
347
### Stateful Function Interfaces
348
349
```java { .api }
350
// Checkpointed function interface
351
interface CheckpointedFunction {
352
void snapshotState(FunctionSnapshotContext context) throws Exception;
353
void initializeState(FunctionInitializationContext context) throws Exception;
354
}
355
356
// Checkpoint listener interface
357
interface CheckpointListener {
358
void notifyCheckpointComplete(long checkpointId) throws Exception;
359
default void notifyCheckpointAborted(long checkpointId) throws Exception {}
360
}
361
362
// Function snapshot context
363
interface FunctionSnapshotContext {
364
long getCheckpointId();
365
long getCheckpointTimestamp();
366
}
367
368
// Function initialization context
369
interface FunctionInitializationContext {
370
boolean isRestored();
371
OperatorStateStore getOperatorState();
372
KeyedStateStore getKeyedState();
373
}
374
```
375
376
### State Store Interfaces
377
378
```java { .api }
379
// Operator state store for non-keyed state
380
interface OperatorStateStore {
381
<S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
382
<S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
383
<S> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception;
384
}
385
386
// Keyed state store for keyed state
387
interface KeyedStateStore {
388
<T> ValueState<T> getState(ValueStateDescriptor<T> stateDescriptor) throws Exception;
389
<T> ListState<T> getListState(ListStateDescriptor<T> stateDescriptor) throws Exception;
390
<UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateDescriptor) throws Exception;
391
<T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateDescriptor) throws Exception;
392
<IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateDescriptor) throws Exception;
393
}
394
```
395
396
### Restart Strategy Types
397
398
```java { .api }
399
// Restart strategies
400
class RestartStrategies {
401
static RestartStrategyConfiguration noRestart();
402
static RestartStrategyConfiguration fallBackRestart();
403
static RestartStrategyConfiguration fixedDelayRestart(int restartAttempts, Time delayBetweenAttempts);
404
static RestartStrategyConfiguration exponentialDelayRestart(Time initialBackoff, Time maxBackoff, double backoffMultiplier, Time resetBackoffThreshold, double jitter);
405
static RestartStrategyConfiguration failureRateRestart(int failureRate, Time failureInterval, Time delayInterval);
406
}
407
408
// Time utility for restart strategies
409
class Time {
410
static Time of(long size, TimeUnit unit);
411
static Time milliseconds(long milliseconds);
412
static Time seconds(long seconds);
413
static Time minutes(long minutes);
414
static Time hours(long hours);
415
}
416
```