0
# Checkpointing and State
1
2
Flink's checkpointing mechanism provides fault tolerance by creating consistent snapshots of streaming application state. This enables exactly-once processing semantics and recovery from failures.
3
4
## Core Checkpointing Interfaces
5
6
### Checkpointed<T>
7
8
Interface for functions that need to participate in checkpointing.
9
10
```java { .api }
11
public interface Checkpointed<T> extends Serializable {
12
T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
13
void restoreState(T state) throws Exception;
14
}
15
```
16
17
**Usage Example**:
18
19
```java
20
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
21
22
public class StatefulMapFunction implements MapFunction<String, String>, Checkpointed<Integer> {
23
private int counter = 0;
24
25
@Override
26
public String map(String value) throws Exception {
27
counter++;
28
return value + "_" + counter;
29
}
30
31
@Override
32
public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
33
return counter;
34
}
35
36
@Override
37
public void restoreState(Integer state) throws Exception {
38
counter = state;
39
}
40
}
41
```
42
43
### CheckpointedAsynchronously<T>
44
45
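Marker interface for functions whose state snapshot may be persisted asynchronously, so that stream processing is not blocked while the snapshot is being written. The object returned by `snapshotState` must not be modified after it has been returned (return a copy of any mutable state).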
46
47
```java { .api }
48
public interface CheckpointedAsynchronously<T> extends Checkpointed<T> {
49
// Inherits snapshotState and restoreState methods
50
// Indicates that snapshotState can be called asynchronously
51
}
52
```
53
54
**Usage Example**:
55
56
```java
57
import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
58
59
public class AsyncCheckpointedSink implements SinkFunction<String>, CheckpointedAsynchronously<Map<String, Integer>> {
60
private Map<String, Integer> state = new HashMap<>();
61
62
@Override
63
public void invoke(String value) throws Exception {
64
state.put(value, state.getOrDefault(value, 0) + 1);
65
}
66
67
@Override
68
public Map<String, Integer> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
69
// Can be called asynchronously - return a copy of the state
70
return new HashMap<>(state);
71
}
72
73
@Override
74
public void restoreState(Map<String, Integer> restoredState) throws Exception {
75
state = restoredState;
76
}
77
}
78
```
79
80
## Checkpoint Committing
81
82
### CheckpointCommitter
83
84
Interface for committing checkpoints to external systems for additional durability guarantees.
85
86
```java { .api }
87
public interface CheckpointCommitter extends Serializable {
88
void commitCheckpoint(long checkpointId) throws Exception;
89
boolean isCheckpointCommitted(long checkpointId) throws Exception;
90
}
91
```
92
93
**Usage Example**:
94
95
```java
96
import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;
97
98
public class DatabaseCheckpointCommitter implements CheckpointCommitter {
99
private final String connectionUrl;
100
101
public DatabaseCheckpointCommitter(String connectionUrl) {
102
this.connectionUrl = connectionUrl;
103
}
104
105
@Override
106
public void commitCheckpoint(long checkpointId) throws Exception {
107
// Commit checkpoint information to external database
108
// This ensures the checkpoint is durably stored
109
executeUpdate("INSERT INTO checkpoints (id, timestamp) VALUES (?, ?)",
110
checkpointId, System.currentTimeMillis());
111
}
112
113
@Override
114
public boolean isCheckpointCommitted(long checkpointId) throws Exception {
115
// Check if checkpoint was successfully committed
116
return checkExists("SELECT 1 FROM checkpoints WHERE id = ?", checkpointId);
117
}
118
119
private void executeUpdate(String sql, Object... params) throws Exception {
120
// Database update implementation
121
}
122
123
private boolean checkExists(String sql, Object... params) throws Exception {
124
// Database query implementation
125
return false;
126
}
127
}
128
```
129
130
## State Handle Providers
131
132
### StateHandleProvider<T>
133
134
Interface for providing state handle storage and retrieval mechanisms.
135
136
```java { .api }
137
public interface StateHandleProvider<T extends StateHandle> extends Serializable {
138
T createStateHandle(String state) throws Exception;
139
String getStateFromHandle(T stateHandle) throws Exception;
140
}
141
```
142
143
## Checkpoint Configuration
144
145
Checkpointing is configured at the execution environment level:
146
147
```java
148
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
149
150
// Enable checkpointing with 5-second interval
151
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
152
env.enableCheckpointing(5000); // Checkpoint every 5 seconds
153
154
// Configure checkpointing behavior
155
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
156
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
157
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
158
env.getCheckpointConfig().setCheckpointTimeout(60000);
159
```
160
161
## Integration with User Functions
162
163
### Rich Functions with State
164
165
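Rich functions can access keyed state through the `RuntimeContext`. Note that `getRuntimeContext().getState(...)` only works when the function is applied to a keyed stream (see "Keyed State" below):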
166
167
```java
168
import org.apache.flink.api.common.functions.RichMapFunction;
169
import org.apache.flink.api.common.state.ValueState;
170
import org.apache.flink.api.common.state.ValueStateDescriptor;
171
import org.apache.flink.configuration.Configuration;
172
173
public class StatefulRichMapFunction extends RichMapFunction<String, String> {
174
private ValueState<Integer> counterState;
175
176
@Override
177
public void open(Configuration parameters) throws Exception {
178
ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>(
179
"counter", // state name
180
Integer.class, // type class
181
0 // default value
182
);
183
counterState = getRuntimeContext().getState(descriptor);
184
}
185
186
@Override
187
public String map(String value) throws Exception {
188
Integer currentCount = counterState.value();
189
currentCount++;
190
counterState.update(currentCount);
191
return value + "_" + currentCount;
192
}
193
}
194
```
195
196
### Keyed State
197
198
For keyed streams, state is automatically partitioned by key:
199
200
```java
201
import org.apache.flink.streaming.api.datastream.DataStream;
202
203
DataStream<String> input = env.socketTextStream("localhost", 9999);
204
205
DataStream<String> result = input
206
.groupBy(value -> value.split(" ")[0]) // Group by first word
207
.map(new StatefulRichMapFunction()); // State is keyed automatically
208
```
209
210
## Fault Tolerance Guarantees
211
212
### Exactly-Once Processing
213
214
Flink provides exactly-once processing guarantees through:
215
216
1. **Consistent Checkpointing**: All operators checkpoint their state consistently
217
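2. **Checkpoint Barriers**: Special marker records injected into the data streams that flow with the regular records; each barrier separates the records that belong to one checkpoint from those of the next, and operators use them to align snapshots across parallel input streams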
218
3. **State Recovery**: On failure, all operators restore state from the last successful checkpoint
219
220
```java
221
// Example of exactly-once sink with checkpointing
222
public class ExactlyOnceSink implements SinkFunction<String>, CheckpointedAsynchronously<List<String>> {
223
private List<String> pendingRecords = new ArrayList<>();
224
225
@Override
226
public void invoke(String value) throws Exception {
227
// Buffer records until checkpoint
228
pendingRecords.add(value);
229
}
230
231
@Override
232
public List<String> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
233
// Flush all pending records to external system
234
flushToExternalSystem(pendingRecords);
235
236
// Return state for recovery
237
List<String> snapshot = new ArrayList<>(pendingRecords);
238
pendingRecords.clear();
239
return snapshot;
240
}
241
242
@Override
243
public void restoreState(List<String> state) throws Exception {
244
pendingRecords = state;
245
}
246
247
private void flushToExternalSystem(List<String> records) throws Exception {
248
// Implementation to write records to external system
249
}
250
}
251
```
252
253
## Types
254
255
```java { .api }
256
// State handle interface
257
public interface StateHandle extends Serializable {
258
void discardState() throws Exception;
259
long getStateSize() throws Exception;
260
}
261
262
// Checkpoint configuration
263
public class CheckpointConfig {
264
public void setCheckpointingMode(CheckpointingMode mode);
265
public void setMinPauseBetweenCheckpoints(long minPause);
266
public void setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints);
267
public void setCheckpointTimeout(long checkpointTimeout);
268
public void enableExternalizedCheckpoints(ExternalizedCheckpointCleanup cleanup);
269
}
270
271
// Checkpointing modes
272
public enum CheckpointingMode {
273
EXACTLY_ONCE, // Exactly-once processing semantics
274
AT_LEAST_ONCE // At-least-once processing semantics
275
}
276
277
// External checkpoint cleanup modes
278
public enum ExternalizedCheckpointCleanup {
279
RETAIN_ON_CANCELLATION, // Keep checkpoints when job is cancelled
280
DELETE_ON_CANCELLATION // Delete checkpoints when job is cancelled
281
}
282
```