# State Management and Checkpointing

The state management system provides pluggable state backends and checkpointing mechanisms for fault tolerance and exactly-once processing guarantees. This system enables stateful stream processing applications to maintain state consistently across failures and restarts.

## State Backend System

### StateBackend

The primary interface for state storage and checkpointing backends. State backends determine where and how state is stored during execution and checkpointing.

```java { .api }
public interface StateBackend extends Serializable {
    <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
        Environment env,
        JobID jobID,
        String operatorIdentifier,
        TypeSerializer<K> keySerializer,
        int numberOfKeyGroups,
        KeyGroupRange keyGroupRange,
        TaskKvStateRegistry kvStateRegistry
    ) throws Exception;

    OperatorStateBackend createOperatorStateBackend(
        Environment env,
        String operatorIdentifier
    ) throws Exception;

    CompletableFuture<CheckpointStorageLocation> resolveCheckpoint(String checkpointPointer) throws IOException;
}
```
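
The `numberOfKeyGroups` and `keyGroupRange` parameters reflect how keyed state is sharded: each key maps to one key group, and each parallel instance owns a contiguous range of key groups. The following is a minimal sketch of that idea, not the runtime's implementation. The class `KeyGroupSketch` and its method names are hypothetical, and using the plain `hashCode()` is a simplifying assumption (a real backend would typically apply an additional hash function first).

```java
/** Simplified model of key-group assignment; names and hashing are illustrative assumptions. */
final class KeyGroupSketch {

    /** Maps a key to a key group in [0, numberOfKeyGroups). Assumption: plain hashCode, no extra mixing. */
    static int keyGroupFor(Object key, int numberOfKeyGroups) {
        return Math.floorMod(key.hashCode(), numberOfKeyGroups);
    }

    /** First key group owned by the parallel instance with the given index. */
    static int rangeStart(int operatorIndex, int parallelism, int numberOfKeyGroups) {
        return (operatorIndex * numberOfKeyGroups + parallelism - 1) / parallelism;
    }

    /** Last key group (inclusive) owned by the parallel instance with the given index. */
    static int rangeEnd(int operatorIndex, int parallelism, int numberOfKeyGroups) {
        return ((operatorIndex + 1) * numberOfKeyGroups - 1) / parallelism;
    }

    /** Index of the parallel instance that owns the given key group. */
    static int ownerOf(int keyGroup, int parallelism, int numberOfKeyGroups) {
        return keyGroup * parallelism / numberOfKeyGroups;
    }
}
```

With 128 key groups and a parallelism of 4, this arithmetic gives instance 0 the range 0 to 31 and instance 3 the range 96 to 127. Because keys are bound to key groups rather than to instances, changing the parallelism only moves whole key groups between instances.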

### StateBackendFactory

Factory interface for creating state backend instances from configuration.

```java { .api }
public interface StateBackendFactory<T extends StateBackend> {
    T createFromConfig(Configuration config) throws IllegalConfigurationException;
    String getIdentifier();
}
```

## Function State Contexts

### FunctionInitializationContext

Context provided to user functions during initialization to set up managed and keyed state.

```java { .api }
public interface FunctionInitializationContext {
    boolean isRestored();

    OperatorStateStore getOperatorStateStore();
    KeyedStateStore getKeyedStateStore();

    ManagedInitializationContext getManagedInitializationContext();
}
```

### FunctionSnapshotContext

Context provided to user functions during state snapshotting operations.

```java { .api }
public interface FunctionSnapshotContext {
    long getCheckpointId();
    long getCheckpointTimestamp();
}
```

### StateSnapshotContext

General context interface for state snapshotting operations.

```java { .api }
public interface StateSnapshotContext {
    long getCheckpointId();
    long getCheckpointTimestamp();
    CheckpointStreamFactory getCheckpointStreamFactory();
}
```

## State Stores

### OperatorStateStore

Store for operator state that is partitioned per parallel operator instance.

```java { .api }
public interface OperatorStateStore {
    <T> ListState<T> getListState(ListStateDescriptor<T> stateDescriptor) throws Exception;
    <T> ListState<T> getUnionListState(ListStateDescriptor<T> stateDescriptor) throws Exception;

    <T> BroadcastState<String, T> getBroadcastState(MapStateDescriptor<String, T> stateDescriptor) throws Exception;

    Set<String> getRegisteredStateNames();
    Set<String> getRegisteredBroadcastStateNames();
}
```
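
The difference between `getListState` and `getUnionListState` shows up when state is restored, possibly with a different parallelism: list state is split so that each element goes to exactly one parallel instance, while union list state hands every instance the complete list. The sketch below simulates both schemes with plain Java collections. The class and method names are hypothetical, and the round-robin split is only an approximation of how a runtime might distribute elements.

```java
import java.util.ArrayList;
import java.util.List;

/** Simulates how checkpointed operator list state might be redistributed on restore. */
final class OperatorStateRedistributionSketch {

    /** Split scheme (as with getListState): each element ends up on exactly one subtask. */
    static <T> List<List<T>> splitRedistribute(List<List<T>> checkpointed, int newParallelism) {
        List<List<T>> restored = emptySubtasks(newParallelism);
        int next = 0;
        for (List<T> subtaskState : checkpointed) {
            for (T element : subtaskState) {
                // Assumption: simple round-robin over the new subtasks
                restored.get(next % newParallelism).add(element);
                next++;
            }
        }
        return restored;
    }

    /** Union scheme (as with getUnionListState): every subtask receives all elements. */
    static <T> List<List<T>> unionRedistribute(List<List<T>> checkpointed, int newParallelism) {
        List<T> all = new ArrayList<>();
        for (List<T> subtaskState : checkpointed) {
            all.addAll(subtaskState);
        }
        List<List<T>> restored = emptySubtasks(newParallelism);
        for (List<T> subtask : restored) {
            subtask.addAll(all);
        }
        return restored;
    }

    private static <T> List<List<T>> emptySubtasks(int parallelism) {
        List<List<T>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new ArrayList<>());
        }
        return subtasks;
    }
}
```

Union state suits cases where each instance must see everything on restore and then pick its own share, at the cost of every instance receiving the whole list.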

### KeyedStateStore

Store for keyed state that is partitioned and scoped by key.

```java { .api }
public interface KeyedStateStore {
    <T> ValueState<T> getState(ValueStateDescriptor<T> stateDescriptor) throws Exception;
    <T> ListState<T> getListState(ListStateDescriptor<T> stateDescriptor) throws Exception;
    <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateDescriptor) throws Exception;
    <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateDescriptor) throws Exception;
    <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateDescriptor) throws Exception;
    <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateDescriptor) throws Exception;
}
```
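
"Scoped by key" means that a single state handle such as a `ValueState` transparently reads and writes a different value depending on the key of the record being processed. A minimal sketch of that behavior follows; `KeyScopedValueSketch` and `setCurrentKey` are hypothetical names used for illustration, standing in for what the runtime does before invoking user code.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a key-scoped value: one logical handle, one stored value per key. */
final class KeyScopedValueSketch<K, V> {

    private final Map<K, V> valuesByKey = new HashMap<>();
    private K currentKey;

    /** Assumption: the runtime sets the key of the current record before user code runs. */
    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    /** Returns the value for the current key, or null if none was stored yet. */
    V value() {
        return valuesByKey.get(currentKey);
    }

    /** Stores a value for the current key only; other keys are unaffected. */
    void update(V newValue) {
        valuesByKey.put(currentKey, newValue);
    }
}
```

This is why user code that reads a `ValueState` must handle `null`: the first record for a given key sees no stored value.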

## Stream Providers

### StatePartitionStreamProvider

Provides access to state partition streams during restore operations.

```java { .api }
public interface StatePartitionStreamProvider {
    FSDataInputStream getStream() throws IOException;
}
```

## Checkpoint Coordination

### CheckpointCoordinator

Coordinates the distributed checkpointing process across all operators in a job.

```java { .api }
public class CheckpointCoordinator {
    public CheckpointCoordinator(
        JobID job,
        CheckpointConfig chkConfig,
        ExecutionVertex[] tasksToTrigger,
        ExecutionVertex[] tasksToWaitFor,
        ExecutionVertex[] tasksToCommitTo,
        ClassLoader userClassLoader,
        CheckpointIDCounter checkpointIdCounter,
        CompletedCheckpointStore completedCheckpointStore,
        StateBackend checkpointStateBackend,
        Executor executor,
        CheckpointFailureManager failureManager
    );

    public void startCheckpointScheduler();
    public void stopCheckpointScheduler();

    public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
        CheckpointProperties props,
        String externalSavepointLocation,
        boolean isPeriodic
    );

    public void receiveAcknowledgeMessage(AcknowledgeCheckpoint message);
    public void receiveDeclineMessage(DeclineCheckpoint message);

    public void restoreLatestCheckpointedState(
        Map<JobVertexID, ExecutionJobVertex> tasks,
        boolean errorIfNoCheckpoint,
        boolean allOrNothingState
    ) throws Exception;
}
```
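
The split between `tasksToWaitFor`, `receiveAcknowledgeMessage`, and `receiveDeclineMessage` suggests the basic bookkeeping: a checkpoint stays pending until every awaited task has acknowledged it, and a decline aborts it. The sketch below models only that bookkeeping with string task IDs. `PendingCheckpointSketch` and its `Status` enum are hypothetical, and timeouts, state handles, and commit notifications are deliberately left out.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of acknowledgement tracking for a single in-flight checkpoint. */
final class PendingCheckpointSketch {

    enum Status { PENDING, COMPLETED, ABORTED }

    private final long checkpointId;
    private final Set<String> notYetAcknowledged;
    private Status status = Status.PENDING;

    PendingCheckpointSketch(long checkpointId, Set<String> tasksToWaitFor) {
        this.checkpointId = checkpointId;
        this.notYetAcknowledged = new HashSet<>(tasksToWaitFor);
    }

    /** Records an acknowledgement; completes the checkpoint once no task is outstanding. */
    Status acknowledge(String taskId) {
        if (status == Status.PENDING
                && notYetAcknowledged.remove(taskId)
                && notYetAcknowledged.isEmpty()) {
            status = Status.COMPLETED;
        }
        return status;
    }

    /** A decline from any task aborts a pending checkpoint; finished checkpoints are unaffected. */
    Status decline(String taskId) {
        if (status == Status.PENDING) {
            status = Status.ABORTED;
        }
        return status;
    }

    long getCheckpointId() {
        return checkpointId;
    }
}
```

Duplicate acknowledgements are ignored here, and messages that arrive after the checkpoint has completed or aborted do not change its status.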

### CheckpointMetaData

Metadata associated with checkpoints.

```java { .api }
public class CheckpointMetaData implements Serializable {
    public CheckpointMetaData(long checkpointId, long timestamp);

    public long getCheckpointId();
    public long getTimestamp();
}
```

### CheckpointOptions

Configuration options for checkpoint behavior.

```java { .api }
public class CheckpointOptions implements Serializable {
    public CheckpointOptions(CheckpointType checkpointType, CheckpointStorageLocationReference targetLocation);

    public CheckpointType getCheckpointType();
    public CheckpointStorageLocationReference getTargetLocation();

    public static CheckpointOptions forCheckpointWithDefaultLocation();
    public static CheckpointOptions forSavepoint(CheckpointStorageLocationReference location);
}
```

### CheckpointType

Enumeration of checkpoint types.

```java { .api }
public enum CheckpointType {
    CHECKPOINT(false),
    SAVEPOINT(true);

    private final boolean isSavepoint;

    public boolean isSavepoint();
}
```

## Exception Handling

### CheckpointException

Exception for checkpoint-related failures.

```java { .api }
public class CheckpointException extends FlinkException {
    public CheckpointException(String message);
    public CheckpointException(String message, Throwable cause);
    public CheckpointException(CheckpointFailureReason reason);
    public CheckpointException(CheckpointFailureReason reason, Throwable cause);

    public CheckpointFailureReason getCheckpointFailureReason();
}
```

### CheckpointFailureReason

Enumeration of checkpoint failure reasons.

```java { .api }
public enum CheckpointFailureReason {
    CHECKPOINT_COORDINATOR_SHUTDOWN,
    CHECKPOINT_COORDINATOR_SUSPEND,
    CHECKPOINT_DECLINED_TASK_NOT_READY,
    CHECKPOINT_DECLINED_SUBSUMED,
    CHECKPOINT_DECLINED_TIME_OUT,
    CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED,
    CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER,
    CHECKPOINT_DECLINED_INPUT_END_OF_STREAM,
    CHECKPOINT_ASYNC_EXCEPTION,
    CHECKPOINT_EXPIRED,
    TASK_CHECKPOINT_FAILURE,
    TASK_FAILURE_DURING_CHECKPOINT,
    UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE,
    TRIGGER_CHECKPOINT_FAILURE,
    FINALIZE_CHECKPOINT_FAILURE,
    IO_EXCEPTION;
}
```
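
One way such failure reasons can be used is to decide whether a failed checkpoint should count against a job-level tolerance limit. The sketch below shows a hypothetical policy, not the behavior of `CheckpointFailureManager`: the class `FailureToleranceSketch`, its local `Reason` enum (a small stand-in for a few of the constants listed above), and the choice of which reasons to ignore are all assumptions made for illustration.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical policy: count consecutive checkpoint failures, skipping ignorable reasons. */
final class FailureToleranceSketch {

    /** Local stand-in for a few of the failure reasons listed above. */
    enum Reason { CHECKPOINT_DECLINED_SUBSUMED, CHECKPOINT_EXPIRED, CHECKPOINT_ASYNC_EXCEPTION, IO_EXCEPTION }

    private final Set<Reason> ignored = EnumSet.noneOf(Reason.class);
    private final int tolerableFailures;
    private int consecutiveFailures;

    FailureToleranceSketch(Set<Reason> ignoredReasons, int tolerableFailures) {
        this.ignored.addAll(ignoredReasons);
        this.tolerableFailures = tolerableFailures;
    }

    /** Records a failed checkpoint; returns true once the tolerance limit is exceeded. */
    boolean onFailure(Reason reason) {
        if (!ignored.contains(reason)) {
            consecutiveFailures++;
        }
        return consecutiveFailures > tolerableFailures;
    }

    /** A completed checkpoint resets the consecutive-failure counter. */
    void onSuccess() {
        consecutiveFailures = 0;
    }
}
```

Keeping the set of ignored reasons configurable makes the policy explicit rather than hard-coding which failures are considered harmless.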

## Usage Examples

### Implementing Stateful Functions

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.runtime.state.*;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

// Keyed state is only available when this function runs on a keyed stream (after keyBy)
public class StatefulMapFunction extends RichMapFunction<String, String>
        implements CheckpointedFunction {

    private ValueState<Integer> countState;
    private ListState<String> bufferState;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Initialize keyed state
        ValueStateDescriptor<Integer> countDescriptor =
            new ValueStateDescriptor<>("count", Integer.class);
        countState = context.getKeyedStateStore().getState(countDescriptor);

        // Initialize operator state
        ListStateDescriptor<String> bufferDescriptor =
            new ListStateDescriptor<>("buffer", String.class);
        bufferState = context.getOperatorStateStore().getListState(bufferDescriptor);

        // isRestored() only reports whether state was restored from a snapshot;
        // the handles obtained above already expose the restored values
        if (context.isRestored()) {
            System.out.println("Restoring state from checkpoint");
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Managed state is snapshotted automatically; nothing needs to be copied here
        System.out.println("Taking snapshot for checkpoint: " + context.getCheckpointId());
    }

    @Override
    public String map(String value) throws Exception {
        // Use keyed state; value() returns null the first time a key is seen
        Integer currentCount = countState.value();
        if (currentCount == null) {
            currentCount = 0;
        }
        countState.update(currentCount + 1);

        // Use operator state
        bufferState.add(value);

        return value + " (processed " + (currentCount + 1) + " times)";
    }
}
```

### Configuring State Backends

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;

// Configure memory state backend (for testing/development)
StateBackend memoryBackend = new MemoryStateBackend();

// Configure filesystem state backend (for production)
StateBackend fsBackend = new FsStateBackend("file:///checkpoints");

// Configure state backend in job configuration
Configuration jobConfig = new Configuration();
jobConfig.setString("state.backend", "filesystem");
jobConfig.setString("state.checkpoints.dir", "file:///checkpoints");
jobConfig.setString("state.savepoints.dir", "file:///savepoints");

// Set advanced checkpointing options (durations are in milliseconds)
jobConfig.setLong("execution.checkpointing.interval", 30000L);
jobConfig.setString("execution.checkpointing.mode", "EXACTLY_ONCE");
jobConfig.setLong("execution.checkpointing.timeout", 600000L);
jobConfig.setInteger("execution.checkpointing.max-concurrent-checkpoints", 1);
jobConfig.setLong("execution.checkpointing.min-pause", 5000L);
```
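
The `execution.checkpointing.interval` and `execution.checkpointing.min-pause` settings interact: the interval spaces out trigger times, while the minimum pause guarantees breathing room after the previous checkpoint finishes. A simplified model, assuming at most one checkpoint in flight, is that the next checkpoint starts no earlier than the later of the two bounds. The class `CheckpointScheduleSketch` and its method are hypothetical and only illustrate the arithmetic.

```java
/** Simplified timing model; assumes at most one concurrent checkpoint. */
final class CheckpointScheduleSketch {

    /**
     * Earliest time (in milliseconds) at which the next checkpoint may start,
     * given when the previous one was triggered and when it completed.
     */
    static long earliestNextTrigger(long lastTriggerMillis, long lastCompletionMillis,
                                    long intervalMillis, long minPauseMillis) {
        long byInterval = lastTriggerMillis + intervalMillis;
        long byMinPause = lastCompletionMillis + minPauseMillis;
        return Math.max(byInterval, byMinPause);
    }
}
```

With the values above (interval 30000 ms, minimum pause 5000 ms), a checkpoint triggered at time 0 that completes at 28000 ms pushes the next trigger to 33000 ms, whereas one that completes at 10000 ms leaves the next trigger at 30000 ms.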

### Checkpoint Coordination Setup

```java
import org.apache.flink.runtime.checkpoint.*;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;

// Set up checkpoint coordinator
CheckpointConfig checkpointConfig = new CheckpointConfig();
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
checkpointConfig.setCheckpointInterval(30000); // 30 seconds
checkpointConfig.setCheckpointTimeout(600000); // 10 minutes
checkpointConfig.setMaxConcurrentCheckpoints(1);
checkpointConfig.setMinPauseBetweenCheckpoints(5000); // 5 seconds

// Configure checkpoint retention
checkpointConfig.enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// Set up checkpoint storage
checkpointConfig.setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoints"));

// jobGraph, the task arrays, and the remaining arguments are assumed to be
// provided by the surrounding runtime setup
CheckpointCoordinator coordinator = new CheckpointCoordinator(
    jobGraph.getJobID(),
    checkpointConfig,
    tasksToTrigger,
    tasksToWaitFor,
    tasksToCommitTo,
    userClassLoader,
    checkpointIdCounter,
    completedCheckpointStore,
    stateBackend,
    executor,
    failureManager
);

// Start periodic checkpointing
coordinator.startCheckpointScheduler();
```

### Manual Checkpoint Triggering

```java
import java.util.concurrent.CompletableFuture;
import org.apache.flink.runtime.checkpoint.*;

// Trigger a checkpoint manually; `coordinator` is an already started CheckpointCoordinator
CheckpointProperties properties = CheckpointProperties.forCheckpoint(
    CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);

CompletableFuture<CompletedCheckpoint> checkpointFuture = coordinator.triggerCheckpoint(
    properties,
    null,  // no external savepoint location
    false  // not periodic
);

// The callback runs once the checkpoint has either completed or failed
checkpointFuture.whenComplete((checkpoint, throwable) -> {
    if (throwable != null) {
        System.err.println("Checkpoint failed: " + throwable.getMessage());
    } else {
        System.out.println("Checkpoint completed: " + checkpoint.getCheckpointID());
    }
});
```

## Common Patterns

### State Migration

```java
// Handle state schema evolution.
// MyState and MyStateTypeSerializer are user-defined; `state` is a ValueState<MyState> field.
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    // Register the state with a custom serializer. On restore, the serializer is
    // checked for compatibility with the schema of the previously written state.
    ValueStateDescriptor<MyState> descriptor = new ValueStateDescriptor<>(
        "myState",
        new MyStateTypeSerializer()
    );

    // The serializer is already set by the constructor, so no further
    // serializer initialization is required before obtaining the state
    state = context.getKeyedStateStore().getState(descriptor);
}
```

### Broadcast State

```java
// Set up broadcast state for configuration.
// `context` is a FunctionInitializationContext; `newConfiguration` is supplied by the caller.
MapStateDescriptor<String, Configuration> configDescriptor =
    new MapStateDescriptor<>("config", String.class, Configuration.class);

BroadcastState<String, Configuration> broadcastState =
    context.getOperatorStateStore().getBroadcastState(configDescriptor);

// Update broadcast state
broadcastState.put("global-config", newConfiguration);

// Read from broadcast state in processing function
Configuration config = broadcastState.get("global-config");
```