# Recovery and State Management

The recovery system provides read-only access to persisted changelog data during checkpoint restoration and manages the lifecycle of changelog state handles.

## Capabilities

### FsStateChangelogStorageForRecovery

Read-only storage implementation used during checkpoint restoration.

```java { .api }
/**
 * Filesystem-based implementation of StateChangelogStorageView for recovery operations.
 * Provides read-only access to persisted changelog data during checkpoint restoration.
 */
public class FsStateChangelogStorageForRecovery
        implements StateChangelogStorageView<ChangelogStateHandleStreamImpl> {

    /**
     * Creates recovery storage with changelog stream handle reader
     * @param changelogStreamHandleReader Reader for accessing persisted changelog streams
     */
    public FsStateChangelogStorageForRecovery(
        ChangelogStreamHandleReader changelogStreamHandleReader
    );

    /**
     * Creates a reader for accessing changelog handles during recovery
     * @return StateChangelogHandleReader for reading persisted changelog data
     */
    public StateChangelogHandleReader<ChangelogStateHandleStreamImpl> createReader();

    /**
     * Closes the recovery storage and releases resources
     * @throws Exception if cleanup fails
     */
    public void close() throws Exception;
}
```

### Changelog Stream Reading

Components for reading persisted changelog data from distributed file systems.

```java { .api }
/**
 * Reader for changelog stream handles with direct file system access
 */
public class ChangelogStreamHandleReader implements AutoCloseable {

    /** Direct reader without caching */
    public static final ChangelogStreamHandleReader DIRECT_READER;

    /**
     * Reads state changes from a changelog handle
     * @param handle Changelog handle containing stream references
     * @return CloseableIterator for iterating over state changes
     * @throws IOException if reading fails
     */
    public CloseableIterator<StateChange> read(ChangelogStateHandleStreamImpl handle)
        throws IOException;

    /**
     * Closes the reader and releases resources
     * @throws IOException if cleanup fails
     */
    public void close() throws IOException;
}

/**
 * Reader with local caching support for improved performance
 */
public class ChangelogStreamHandleReaderWithCache extends ChangelogStreamHandleReader {

    /**
     * Creates cached reader with configuration
     * @param configuration Flink configuration containing cache settings
     */
    public ChangelogStreamHandleReaderWithCache(Configuration configuration);
}
```

### State Change Iteration

Iterator implementation for traversing state changes during recovery.

```java { .api }
/**
 * Iterator implementation for reading state changes from changelog streams
 */
public class StateChangeIteratorImpl implements CloseableIterator<StateChange> {

    /**
     * Creates iterator with changelog stream reader
     * @param changelogStreamHandleReader Reader for accessing changelog streams
     */
    public StateChangeIteratorImpl(ChangelogStreamHandleReader changelogStreamHandleReader);

    /**
     * Checks if more state changes are available
     * @return true if more changes exist
     */
    public boolean hasNext();

    /**
     * Returns the next state change
     * @return Next StateChange in the iteration
     * @throws NoSuchElementException if no more changes exist
     */
    public StateChange next();

    /**
     * Closes the iterator and releases resources
     * @throws IOException if cleanup fails
     */
    public void close() throws IOException;
}
```

**Usage Examples:**

```java
import org.apache.flink.changelog.fs.*;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
import org.apache.flink.util.CloseableIterator;

// Placeholders supplied by the surrounding application: config, getCheckpointHandle(),
// applyStateChange(...) and applyMetadataChange(...)

// Create recovery storage (typically done by factory)
ChangelogStreamHandleReader reader = new ChangelogStreamHandleReaderWithCache(config);
FsStateChangelogStorageForRecovery recoveryStorage =
    new FsStateChangelogStorageForRecovery(reader);

// Create handle reader for recovery
StateChangelogHandleReader<ChangelogStateHandleStreamImpl> handleReader =
    recoveryStorage.createReader();

// Read state changes from checkpoint handle
ChangelogStateHandleStreamImpl checkpointHandle = getCheckpointHandle();
try (CloseableIterator<StateChange> iterator = reader.read(checkpointHandle)) {
    while (iterator.hasNext()) {
        StateChange change = iterator.next();

        // Process state change during recovery
        if (change.getKeyGroup() != StateChange.META_KEY_GROUP) {
            // Regular state change for specific key group
            int keyGroup = change.getKeyGroup();
            byte[] changeData = change.getChange();
            applyStateChange(keyGroup, changeData);
        } else {
            // Metadata change
            byte[] metadata = change.getChange();
            applyMetadataChange(metadata);
        }
    }
}

// Cleanup
recoveryStorage.close();
```
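
The metadata versus key-group dispatch in the loop above can be tried out without Flink on the classpath. The following is a minimal sketch, not Flink code: `Change`, `ReplayResult`, and `ChangelogReplay` are hypothetical stand-ins, and the value `-1` for the metadata marker is an assumption made for illustration (the example above refers to it only as `StateChange.META_KEY_GROUP`).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for StateChange: a key group plus an opaque payload. */
final class Change {
    /** Assumed marker for metadata changes, standing in for StateChange.META_KEY_GROUP. */
    static final int META_KEY_GROUP = -1;

    final int keyGroup;
    final byte[] payload;

    Change(int keyGroup, byte[] payload) {
        this.keyGroup = keyGroup;
        this.payload = payload;
    }
}

/** Changes collected during replay, split into metadata and per-key-group data. */
final class ReplayResult {
    final List<byte[]> metadata = new ArrayList<>();
    final Map<Integer, List<byte[]>> byKeyGroup = new TreeMap<>();
}

final class ChangelogReplay {
    /** Drains the iterator, preserving the order of changes within each key group. */
    static ReplayResult replay(Iterator<Change> changes) {
        ReplayResult result = new ReplayResult();
        while (changes.hasNext()) {
            Change change = changes.next();
            if (change.keyGroup == Change.META_KEY_GROUP) {
                // Metadata change: not tied to any key group
                result.metadata.add(change.payload);
            } else {
                // Regular change: append to the list for its key group
                result.byKeyGroup
                    .computeIfAbsent(change.keyGroup, k -> new ArrayList<>())
                    .add(change.payload);
            }
        }
        return result;
    }
}
```

Collecting changes per key group keeps their relative order intact, which matters when later changes overwrite earlier ones for the same key.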

### TaskChangelogRegistry

Registry that manages the lifecycle of changelog state handles and coordinates their ownership between the task manager (TM) and the job manager (JM).

```java { .api }
/**
 * Registry for tracking changelog state objects and managing their lifecycle.
 * Coordinates between task manager and job manager ownership of state handles.
 */
public interface TaskChangelogRegistry {

    /**
     * Starts tracking a state handle with reference counting
     * @param handle StreamStateHandle to track
     * @param refCount Initial reference count (number of changelog segments)
     */
    void startTracking(StreamStateHandle handle, long refCount);

    /**
     * Stops tracking a state handle (JM becomes owner)
     * @param handle StreamStateHandle to stop tracking
     */
    void stopTracking(StreamStateHandle handle);

    /**
     * Releases a reference to a state handle (decrements ref count)
     * @param handle StreamStateHandle to release
     */
    void release(StreamStateHandle handle);

    /**
     * Creates default registry with specified number of discard threads
     * @param numDiscardThreads Number of threads for async discard operations
     * @return TaskChangelogRegistry instance
     */
    static TaskChangelogRegistry defaultChangelogRegistry(int numDiscardThreads);
}
```

### TaskChangelogRegistryImpl

Default implementation of the changelog registry with reference counting and async cleanup.

```java { .api }
/**
 * Default implementation of TaskChangelogRegistry with reference counting
 */
public class TaskChangelogRegistryImpl implements TaskChangelogRegistry {

    /**
     * Creates registry with custom executor for discard operations
     * @param discardExecutor Executor for running discard operations
     */
    public TaskChangelogRegistryImpl(Executor discardExecutor);

    /**
     * Closes the registry and shuts down discard operations
     * @throws Exception if cleanup fails
     */
    public void close() throws Exception;
}
```

**Usage Examples:**

```java
// uploadResult and confirmedHandle are placeholders supplied by the surrounding code

// Create changelog registry (typically done by storage)
TaskChangelogRegistry registry = TaskChangelogRegistry.defaultChangelogRegistry(2);

// Start tracking uploaded state handle
StreamStateHandle uploadedHandle = uploadResult.getStreamStateHandle();
long refCount = 3; // Number of state change sets in this handle
registry.startTracking(uploadedHandle, refCount);

// Release references as state changes become unused
registry.release(uploadedHandle); // refCount becomes 2
registry.release(uploadedHandle); // refCount becomes 1
registry.release(uploadedHandle); // refCount becomes 0, handle is discarded

// Stop tracking a different handle once the JM becomes its owner
// (e.g., after checkpoint completion)
registry.stopTracking(confirmedHandle);
```
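
The ownership rules illustrated above can be summarized in a small model. The sketch below is not Flink's implementation: `RefCountingTracker`, its `String` handles, and the `discard` callback are hypothetical stand-ins. It assumes that `release` discards a handle once its count reaches zero and that `stopTracking` simply forgets the handle, so later `release` calls for it have no effect.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Toy model of reference-counted handle tracking; handles are plain strings here. */
final class RefCountingTracker {
    private final Map<String, Long> refCounts = new HashMap<>();
    private final Consumer<String> discard;

    /** @param discard hypothetical callback invoked when a handle is no longer referenced */
    RefCountingTracker(Consumer<String> discard) {
        this.discard = discard;
    }

    /** Begins tracking a handle with the given initial reference count. */
    synchronized void startTracking(String handle, long refCount) {
        if (refCount <= 0) {
            throw new IllegalArgumentException("refCount must be positive: " + refCount);
        }
        refCounts.put(handle, refCount);
    }

    /** Forgets the handle without discarding it (ownership has moved elsewhere). */
    synchronized void stopTracking(String handle) {
        refCounts.remove(handle);
    }

    /** Decrements the count and discards the handle when the count reaches zero. */
    synchronized void release(String handle) {
        Long current = refCounts.get(handle);
        if (current == null) {
            return; // never tracked, already discarded, or ownership was handed over
        }
        long remaining = current - 1;
        if (remaining == 0) {
            refCounts.remove(handle);
            discard.accept(handle);
        } else {
            refCounts.put(handle, remaining);
        }
    }

    /** Returns true while the handle is still tracked. */
    synchronized boolean isTracked(String handle) {
        return refCounts.containsKey(handle);
    }
}
```

In this model a handle leaves the tracker in exactly one of two ways: it is discarded locally after its last release, or it is handed over and left untouched.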

### Local Changelog Registry

Registry for managing local changelog files when local recovery is enabled.

```java { .api }
/**
 * Registry for managing local changelog files during recovery
 */
public interface LocalChangelogRegistry extends AutoCloseable {

    /** No-op implementation when local recovery is disabled */
    LocalChangelogRegistry NO_OP = /* ... */;

    /**
     * Registers a local changelog handle for a checkpoint
     * @param handle Local stream state handle
     * @param checkpointId Checkpoint identifier
     */
    void register(StreamStateHandle handle, long checkpointId);

    /**
     * Discards local changelog files up to a checkpoint
     * @param checkpointId Checkpoint identifier (inclusive)
     */
    void discardUpToCheckpoint(long checkpointId);

    /**
     * Closes the registry and releases resources
     * @throws IOException if cleanup fails
     */
    void close() throws IOException;
}

/**
 * Implementation of LocalChangelogRegistry with async cleanup
 */
public class LocalChangelogRegistryImpl implements LocalChangelogRegistry {

    /**
     * Creates local registry with single-threaded executor
     * @param executor Executor for cleanup operations
     */
    public LocalChangelogRegistryImpl(Executor executor);
}
```
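
One way to picture `register` and `discardUpToCheckpoint` is as a map from each local file to the latest checkpoint that still uses it. The sketch below is a simplified model under that assumption, not the actual `LocalChangelogRegistryImpl`: `LocalFileTracker` and its `String` file names are hypothetical, and it follows the Javadoc above in treating the bound as inclusive.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Toy model of a local changelog registry; files are identified by name. */
final class LocalFileTracker {
    // Latest checkpoint id that references each file
    private final Map<String, Long> latestCheckpointByFile = new HashMap<>();

    /** Records that the file is used by the checkpoint, keeping the highest id seen. */
    void register(String file, long checkpointId) {
        latestCheckpointByFile.merge(file, checkpointId, Long::max);
    }

    /**
     * Removes every file whose latest checkpoint id is at most the given id
     * and returns the removed file names in sorted order.
     */
    List<String> discardUpToCheckpoint(long checkpointId) {
        List<String> discarded = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = latestCheckpointByFile.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= checkpointId) {
                discarded.add(entry.getKey());
                it.remove();
            }
        }
        Collections.sort(discarded);
        return discarded;
    }
}
```

Keeping only the highest checkpoint id per file means a file shared by several checkpoints survives until the newest of them falls within the discard bound.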

### Changelog Stream Wrapping

Wrapper components for managing changelog stream access and caching.

```java { .api }
/**
 * Wrapper for changelog streams providing additional functionality
 */
public class ChangelogStreamWrapper {

    /**
     * Wraps a changelog stream with additional features
     * @param inputStream Underlying input stream
     * @param streamStateHandle Handle for the stream
     */
    public ChangelogStreamWrapper(
        InputStream inputStream,
        StreamStateHandle streamStateHandle
    );
}
```

### Recovery Performance and Caching

`ChangelogStreamHandleReaderWithCache` keeps local cache files to speed up recovery and removes them after a configurable idle timeout:

```java
import java.time.Duration;
import org.apache.flink.changelog.fs.ChangelogStreamHandleReaderWithCache;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.configuration.Configuration;

// Configure cache timeout for recovery
Configuration config = new Configuration();
config.set(FsStateChangelogOptions.CACHE_IDLE_TIMEOUT, Duration.ofMinutes(10));

// Cached reader automatically manages local cache files
ChangelogStreamHandleReaderWithCache cachedReader =
    new ChangelogStreamHandleReaderWithCache(config);

// Cache files are automatically cleaned up after idle timeout
```
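
Idle-timeout eviction in general works by remembering when each entry was last used and dropping entries that have gone unused for at least the timeout. The sketch below illustrates that general technique only and makes no claim about how the cached reader is implemented: `IdleExpiringCache` is hypothetical, and the current time is passed in explicitly so the behaviour is deterministic.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Toy cache that evicts entries not accessed within an idle timeout. */
final class IdleExpiringCache<K, V> {

    /** A cached value together with the time of its most recent access. */
    private static final class Slot<T> {
        final T value;
        long lastAccessMillis;

        Slot(T value, long lastAccessMillis) {
            this.value = value;
            this.lastAccessMillis = lastAccessMillis;
        }
    }

    private final long idleTimeoutMillis;
    private final Map<K, Slot<V>> slots = new HashMap<>();

    IdleExpiringCache(Duration idleTimeout) {
        this.idleTimeoutMillis = idleTimeout.toMillis();
    }

    /** Stores a value and marks it as accessed at the given time. */
    void put(K key, V value, long nowMillis) {
        slots.put(key, new Slot<>(value, nowMillis));
    }

    /** Returns the cached value, refreshing its access time, or null if absent. */
    V get(K key, long nowMillis) {
        Slot<V> slot = slots.get(key);
        if (slot == null) {
            return null;
        }
        slot.lastAccessMillis = nowMillis;
        return slot.value;
    }

    /** Removes entries idle for at least the timeout and returns how many were removed. */
    int evictIdle(long nowMillis) {
        int evicted = 0;
        Iterator<Slot<V>> it = slots.values().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().lastAccessMillis >= idleTimeoutMillis) {
                it.remove();
                evicted++;
            }
        }
        return evicted;
    }
}
```

With a ten-minute timeout, an entry read five minutes ago survives an eviction pass while an entry untouched for ten minutes does not.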
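
### Error Handling During Recovery

Failures while reading the changelog surface as `IOException`; failures while applying a change surface as runtime exceptions. Both should be logged and propagated so that recovery fails visibly:
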
```java
// reader, handle, and log are placeholders supplied by the surrounding code
try {
    // Read changelog during recovery
    try (CloseableIterator<StateChange> iterator = reader.read(handle)) {
        while (iterator.hasNext()) {
            StateChange change = iterator.next();
            // Process change...
        }
    }
} catch (IOException e) {
    // Handle reading failures
    log.error("Failed to read changelog during recovery", e);
    throw new RuntimeException("Recovery failed", e);
} catch (RuntimeException e) {
    // Handle processing failures
    log.error("Failed to process state change during recovery", e);
    throw e;
} catch (Exception e) {
    // Covers close(), which AutoCloseable declares as throwing Exception
    log.error("Failed to close changelog iterator during recovery", e);
    throw new RuntimeException("Recovery failed", e);
}
```

### Integration with Checkpoint Lifecycle

The recovery system integrates with Flink's checkpoint lifecycle:

- **Checkpoint Creation**: Handles are created by writers and tracked by the registry
- **Checkpoint Confirmation**: The registry stops tracking confirmed handles, and the JM becomes their owner
- **Checkpoint Subsumption**: The registry releases old handles and discards unused state
- **Recovery**: The storage view provides read access to persisted handles
- **Cleanup**: The registries ensure that unused handles and local files are cleaned up