0
# State Management and Checkpointing
1
2
Flink's runtime provides state management with pluggable state backends and distributed checkpointing for fault tolerance. The checkpointing mechanism, combined with the configured state backend, is what gives Flink its exactly-once processing guarantees.
3
4
## Capabilities
5
6
### CheckpointCoordinator
7
8
Central coordinator for checkpointing in Flink jobs, managing checkpoint triggering, completion, and recovery across all tasks.
9
10
```java { .api }
11
/**
12
* The checkpoint coordinator coordinates the distributed snapshots of operators and state.
13
* It triggers the checkpoint by sending messages to the relevant tasks and collects the
14
* checkpoint acknowledgements. It also maintains and cleans up the checkpoint meta data.
15
*/
16
public class CheckpointCoordinator {
17
/** Trigger a checkpoint for the job; isPeriodic indicates whether it is a periodic (scheduled) checkpoint */
18
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(boolean isPeriodic);
19
20
/** Trigger a checkpoint of the specified type */
21
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType);
22
23
/** Trigger a savepoint */
24
public CompletableFuture<CompletedCheckpoint> triggerSavepoint(
25
@Nullable String targetLocation,
26
SavepointFormatType formatType
27
);
28
29
30
/** Shutdown the checkpoint coordinator */
31
public void shutdown() throws Exception;
32
33
/** Get the checkpoint store */
34
public CompletedCheckpointStore getCheckpointStore();
35
36
/** Start the checkpoint scheduler */
37
public void startCheckpointScheduler();
38
39
/** Check if periodic checkpointing has been started */
40
public boolean isPeriodicCheckpointingStarted();
41
42
/** Restore from the latest checkpoint */
43
public boolean restoreLatestCheckpointedStateToAll(
44
Set<ExecutionJobVertex> tasks,
45
boolean errorIfNoCheckpoint
46
);
47
48
/** Acknowledge checkpoint from task */
49
public void receiveAcknowledgeMessage(
50
AcknowledgeCheckpoint message,
51
String taskManagerLocationInfo
52
) throws CheckpointException;
53
54
/** Handle checkpoint decline from task */
55
public void receiveDeclineMessage(
56
DeclineCheckpoint message,
57
String taskManagerLocationInfo
58
);
59
60
/** Get number of retained checkpoints */
61
public int getNumberOfRetainedSuccessfulCheckpoints();
62
63
/** Get number of pending checkpoints */
64
public int getNumberOfPendingCheckpoints();
65
66
/** Get checkpoint timeout */
67
public long getCheckpointTimeout();
68
69
/** Check if periodic checkpointing is enabled */
70
public boolean isPeriodicCheckpointingConfigured();
71
}
72
```
73
74
**Usage Examples:**
75
76
```java
77
// Create checkpoint coordinator configuration
78
CheckpointCoordinatorConfiguration checkpointConfig =
79
new CheckpointCoordinatorConfiguration.Builder()
80
.setCheckpointInterval(5000L) // 5 seconds
81
.setCheckpointTimeout(60000L) // 1 minute
82
.setMaxConcurrentCheckpoints(1)
83
.setMinPauseBetweenCheckpoints(1000L)
84
.setPreferCheckpointForRecovery(true)
85
.setTolerableCheckpointFailureNumber(3)
86
.build();
87
88
// Enable checkpointing on execution graph
89
executionGraph.enableCheckpointing(
90
checkpointConfig,
91
masterTriggerRestoreHooks,
92
checkpointIdCounter,
93
completedCheckpointStore,
94
stateBackend,
95
checkpointStorage,
96
statsTracker,
97
checkpointsCleaner
98
);
99
100
// Trigger manual checkpoint
101
CheckpointCoordinator coordinator = executionGraph.getCheckpointCoordinator();
102
CompletableFuture<CompletedCheckpoint> checkpointFuture =
103
coordinator.triggerCheckpoint(CheckpointType.CHECKPOINT);
110
111
checkpointFuture.thenAccept(checkpoint -> {
112
System.out.println("Checkpoint " + checkpoint.getCheckpointId() + " completed");
113
});
114
```
115
116
### KeyedStateBackend
117
118
Backend for managing keyed state (state associated with keys) with support for different storage engines.
119
120
```java { .api }
121
/**
122
* A keyed state backend provides methods for managing keyed state.
123
*/
124
public interface KeyedStateBackend<K> extends KeyedState, Disposable {
125
/** Create or retrieve a keyed state */
126
<T> InternalKvState<K, ?, T> createState(
127
StateDescriptor<?, T> stateDescriptor,
128
TypeSerializer<T> namespaceSerializer
129
);
130
131
/** Get partitioned state for a specific namespace */
132
<N, S extends State, T> S getPartitionedState(
133
N namespace,
134
TypeSerializer<N> namespaceSerializer,
135
StateDescriptor<S, T> stateDescriptor
136
);
137
138
/** Take a snapshot of the keyed state */
139
RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
140
long checkpointId,
141
long timestamp,
142
CheckpointStreamFactory streamFactory,
143
CheckpointOptions checkpointOptions
144
);
145
146
/** Restore from a state handle */
147
void restore(StateHandles<KeyedStateHandle> restoredState);
148
149
/** Get current key */
150
K getCurrentKey();
151
152
/** Set current key context */
153
void setCurrentKey(K newKey);
154
155
/** Get key serializer */
156
TypeSerializer<K> getKeySerializer();
157
158
/** Get key groups for this backend */
159
KeyGroupRange getKeyGroupRange();
160
161
/** Get number of key groups */
162
int getNumberOfKeyGroups();
163
164
/** Close the backend and release resources */
165
void close();
166
167
/** Dispose the backend */
168
void dispose();
169
170
/** Apply a function to the state of every key in the given namespace */
171
<N> void applyToAllKeys(
172
N namespace,
173
TypeSerializer<N> namespaceSerializer,
174
StateDescriptor<?, ?> stateDescriptor,
175
KeyedStateFunction<K, ?> function
176
);
177
178
/** Get approximate memory usage */
179
long getApproximateMemoryUsage();
180
}
181
```
182
183
### OperatorStateBackend
184
185
Backend for managing operator state (state not associated with keys) including list state and broadcast state.
186
187
```java { .api }
188
/**
189
* Interface for operator state backends. Operator state is state that is associated with
190
* parallel instances of an operator (tasks), as opposed to keyed state.
191
*/
192
public interface OperatorStateBackend extends Disposable {
193
/** Get list state for operator state */
194
<S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor);
195
196
/** Get union list state (combines state from all parallel instances) */
197
<S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor);
198
199
/** Get broadcast state for coordinating across parallel instances */
200
<K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor);
201
202
/** Take a snapshot of the operator state */
203
RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
204
long checkpointId,
205
long timestamp,
206
CheckpointStreamFactory factory,
207
CheckpointOptions checkpointOptions
208
);
209
210
/** Restore from state handles */
211
void restore(StateHandles<OperatorStateHandle> stateHandles);
212
213
/** Close the backend */
214
void close();
215
216
/** Dispose the backend */
217
void dispose();
218
}
219
```
220
221
**Usage Examples:**
222
223
```java
224
// Using KeyedStateBackend in a task
225
public class MyKeyedTask extends AbstractInvokable {
226
private KeyedStateBackend<String> keyedStateBackend;
227
private ValueState<Integer> countState;
228
229
@Override
230
public void invoke() throws Exception {
231
// Get keyed state backend from runtime context
232
keyedStateBackend = getRuntimeContext().getKeyedStateBackend();
233
234
// Create state descriptor
235
ValueStateDescriptor<Integer> descriptor =
236
new ValueStateDescriptor<>("count", Integer.class);
237
238
// Get state
239
countState = keyedStateBackend.getPartitionedState(
240
VoidNamespace.INSTANCE,
241
VoidNamespaceSerializer.INSTANCE,
242
descriptor
243
);
244
245
// Use state
246
keyedStateBackend.setCurrentKey("user123");
247
Integer currentCount = countState.value();
248
countState.update(currentCount == null ? 1 : currentCount + 1);
249
}
250
}
251
252
// Using OperatorStateBackend
253
public class MyOperatorTask extends AbstractInvokable {
254
private OperatorStateBackend operatorStateBackend;
255
private ListState<String> bufferState;
256
257
@Override
258
public void invoke() throws Exception {
259
operatorStateBackend = getRuntimeContext().getOperatorStateBackend();
260
261
// Create list state for buffering
262
ListStateDescriptor<String> descriptor =
263
new ListStateDescriptor<>("buffer", String.class);
264
bufferState = operatorStateBackend.getListState(descriptor);
265
266
// Use state
267
bufferState.add("new_item");
268
Iterable<String> bufferedItems = bufferState.get();
269
}
270
}
271
```
272
273
### CompletedCheckpoint
274
275
Represents a successfully completed checkpoint with metadata and state handles.
276
277
```java { .api }
278
/**
279
* A completed checkpoint represents a snapshot of the state of all operators
280
* that has been acknowledged by all tasks.
281
*/
282
public class CompletedCheckpoint implements Serializable {
283
/** Get the checkpoint ID */
284
public long getCheckpointId();
285
286
/** Get checkpoint timestamp */
287
public long getTimestamp();
288
289
/** Get checkpoint duration in milliseconds */
290
public long getDuration();
291
292
/** Get total checkpoint size in bytes */
293
public long getSize();
294
295
/** Get external pointer (path) for this checkpoint */
296
public String getExternalPointer();
297
298
/** Get operator states */
299
public Map<OperatorID, OperatorState> getOperatorStates();
300
301
/** Get master hook states */
302
public Collection<MasterState> getMasterHookStates();
303
304
/** Get checkpoint properties */
305
public CheckpointProperties getProperties();
306
307
/** Check if checkpoint is discarded */
308
public boolean isDiscarded();
309
310
/** Discard the checkpoint and clean up resources */
311
public CompletableFuture<Void> discardOnSubsume();
312
313
/** Discard the checkpoint on cancellation */
314
public CompletableFuture<Void> discardOnCancellation();
315
316
/** Discard the checkpoint on shutdown */
317
public CompletableFuture<Void> discardOnShutdown(JobStatus jobStatus);
318
319
/** Get state size statistics */
320
public CheckpointStatsSummarySnapshot getStatsSummary();
321
}
322
```
323
324
### StateBackend Configuration
325
326
Configuration and factory classes for different state backends.
327
328
```java { .api }
329
/**
330
* Base class for configurable state backends
331
*/
332
public abstract class ConfigurableStateBackend implements StateBackend, Configurable {
333
/** Configure the state backend from configuration */
334
public abstract StateBackend configure(ReadableConfig config, ClassLoader classLoader);
335
336
/** Get default savepoint directory */
337
public abstract String getDefaultSavepointDirectory();
338
339
/** Check if state backend supports async snapshots */
340
public abstract boolean supportsAsynchronousSnapshots();
341
}
342
343
/**
344
* State backend loader utility
345
*/
346
public class StateBackendLoader {
347
/** Load state backend from configuration */
348
public static StateBackend loadStateBackendFromConfig(
349
ReadableConfig config,
350
ClassLoader classLoader,
351
String defaultStateBackend
352
);
353
354
/** Resolve the state backend from the application-provided backend, the configuration, or the default */
355
public static StateBackend fromApplicationOrConfigOrDefault(
356
StateBackend fromApplication,
357
Configuration config,
358
ClassLoader classLoader,
359
String defaultStateBackend
360
);
361
}
362
```
363
364
### Checkpoint Storage
365
366
Configuration for where checkpoints are stored (filesystem, S3, etc.).
367
368
```java { .api }
369
/**
370
* Checkpoint storage defines how checkpoint data and metadata are persisted
371
*/
372
public interface CheckpointStorage {
373
/** Check if storage supports highly available storage */
374
boolean supportsHighlyAvailableStorage();
375
376
/** Check if storage has default savepoint location */
377
boolean hasDefaultSavepointLocation();
378
379
/** Get default savepoint directory */
380
String getDefaultSavepointDirectory();
381
382
/** Resolve the checkpoint storage access for the given job */
383
CheckpointStorageAccess resolveCheckpointStorageAccess(
384
JobID jobId,
385
CheckpointStorageAccessCoordinatorView coordinatorView
386
);
387
388
/** Create checkpoint storage access for workers */
389
CheckpointStorageWorkerView createCheckpointStorageWorkerView(
390
Configuration configuration,
391
ResourceID resourceId
392
);
393
}
394
395
/**
396
* Configurable checkpoint storage
397
*/
398
public abstract class ConfigurableCheckpointStorage
399
implements CheckpointStorage, Configurable {
400
401
/** Configure checkpoint storage from configuration */
402
public abstract CheckpointStorage configure(
403
ReadableConfig config,
404
ClassLoader classLoader
405
);
406
}
407
```
408
409
**Usage Examples:**
410
411
```java
412
// Configure a state backend. The three options below are alternatives: each one sets the same STATE_BACKEND key, so keep only one.
413
Configuration config = new Configuration();
414
415
// Memory state backend
416
config.setString(StateBackendOptions.STATE_BACKEND, "hashmap");
417
418
// Filesystem state backend
419
config.setString(StateBackendOptions.STATE_BACKEND, "filesystem");
420
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "hdfs://cluster/checkpoints");
421
422
// RocksDB state backend
423
config.setString(StateBackendOptions.STATE_BACKEND, "rocksdb");
424
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "s3://bucket/checkpoints");
425
config.setBoolean(RocksDBOptions.USE_MANAGED_MEMORY, true);
426
427
// Load state backend
428
StateBackend stateBackend = StateBackendLoader.loadStateBackendFromConfig(
429
config,
430
classLoader,
431
null
432
);
433
```
434
435
## Types
436
437
```java { .api }
438
// Checkpoint types and properties
439
public enum CheckpointType implements SnapshotType {
440
CHECKPOINT("Checkpoint"),
441
SAVEPOINT("Savepoint");
442
443
public String getName();
444
public boolean isSavepoint();
445
}
446
447
public class CheckpointProperties implements Serializable {
448
public static CheckpointProperties forCheckpoint(CheckpointRetentionPolicy retentionPolicy);
449
public static CheckpointProperties forSavepoint(boolean forced);
450
451
public CheckpointType getCheckpointType();
452
public CheckpointRetentionPolicy getRetentionPolicy();
453
public boolean isForced();
454
}
455
456
public enum CheckpointRetentionPolicy {
457
NEVER_RETAIN_AFTER_TERMINATION,
458
RETAIN_ON_CANCELLATION,
459
RETAIN_ON_FAILURE
460
}
461
462
// State handles
463
public interface StateHandle extends Serializable {
464
void discardState();
465
long getStateSize();
466
}
467
468
public interface KeyedStateHandle extends StateHandle {
469
KeyGroupRange getKeyGroupRange();
470
KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange);
471
}
472
473
public interface OperatorStateHandle extends StateHandle {
474
Map<String, OperatorStateHandle.StateMetaInfo> getStateNameToPartitionOffsets();
475
FSDataInputStream openInputStream();
476
}
477
478
// Checkpoint options and metadata
479
public class CheckpointOptions implements Serializable {
480
public CheckpointOptions(
481
CheckpointType checkpointType,
482
CheckpointStorageLocationReference targetLocation
483
);
484
485
public CheckpointType getCheckpointType();
486
public CheckpointStorageLocationReference getTargetLocation();
487
public boolean isExactlyOnceMode();
488
}
489
490
public class CheckpointMetaData implements Serializable {
491
public CheckpointMetaData(long checkpointId, long timestamp);
492
493
public long getCheckpointId();
494
public long getTimestamp();
495
}
496
497
// Snapshot results
498
public class SnapshotResult<T extends StateObject> implements Serializable {
499
public static <T extends StateObject> SnapshotResult<T> empty();
500
public static <T extends StateObject> SnapshotResult<T> of(T jobManagerState);
501
public static <T extends StateObject> SnapshotResult<T> withLocalState(
502
T jobManagerState,
503
T taskLocalState
504
);
505
506
public T getJobManagerOwnedSnapshot();
507
public T getTaskLocalSnapshot();
508
public long getStateSize();
509
}
510
```