# Registry and Tracking

TaskManager-side registry for tracking changelog segments and managing their lifecycle. The registry keeps a reference count for each uploaded changelog segment and discards a segment asynchronously once every state backend sharing it has released it. It can also be told to stop tracking a segment whose ownership has passed to another component, such as the JobManager after a checkpoint is confirmed.

## Capabilities

### TaskChangelogRegistry Interface

Core interface for tracking changelog segments, with reference counting and asynchronous cleanup.

```java { .api }
/**
 * TaskManager-side registry for tracking changelog segments
 */
@Internal
public interface TaskChangelogRegistry {

    /** No-operation registry implementation for testing or disabled scenarios */
    TaskChangelogRegistry NO_OP = new TaskChangelogRegistry() { /* no-op implementation */ };

    /**
     * Starts tracking an uploaded changelog segment with an initial reference count,
     * typically the number of state backends whose changes the segment contains
     * @param handle StreamStateHandle representing the changelog segment
     * @param refCount Initial reference count for the segment; must be greater than zero
     */
    void startTracking(StreamStateHandle handle, long refCount);

    /**
     * Stops tracking a changelog segment without discarding it, because another
     * component (e.g. the JobManager, after a checkpoint is confirmed) now owns it
     * @param handle StreamStateHandle to stop tracking
     */
    void stopTracking(StreamStateHandle handle);

    /**
     * Decrements the reference count of a segment by one; when the count reaches zero,
     * the segment is discarded asynchronously (unless tracking was stopped earlier)
     * @param handle StreamStateHandle to release
     */
    void release(StreamStateHandle handle);

    /**
     * Creates the default registry backed by a fixed-size pool of discard threads
     * @param numAsyncDiscardThreads Number of threads for asynchronous cleanup
     * @return Configured TaskChangelogRegistry instance
     */
    static TaskChangelogRegistry defaultChangelogRegistry(int numAsyncDiscardThreads);

    /**
     * Creates the default registry with a custom executor, for testing
     * @param executor Custom executor for discard operations
     * @return TaskChangelogRegistry instance using the provided executor
     */
    @VisibleForTesting
    static TaskChangelogRegistry defaultChangelogRegistry(Executor executor);
}
```

**Basic Registry Usage Example:**

```java
import org.apache.flink.changelog.fs.TaskChangelogRegistry;
import org.apache.flink.runtime.state.StreamStateHandle;

// Create a registry with two background discard threads
TaskChangelogRegistry registry = TaskChangelogRegistry.defaultChangelogRegistry(2);

// Hypothetical helper: obtain the handle of an uploaded changelog segment
StreamStateHandle handle = createChangelogHandle();

// The segment holds changes of two state backends, so start with 2 references.
// Calling startTracking again would overwrite the count, not increment it.
registry.startTracking(handle, 2);

// Each backend releases the segment once it no longer needs it
registry.release(handle); // 2 -> 1
registry.release(handle); // 1 -> 0: discard is scheduled on a discard thread

// Alternatively, once a checkpoint containing the segment is confirmed,
// ownership moves to the JobManager and the registry must not discard it:
// registry.stopTracking(handle);
```

### Default Registry Creation

Factory methods for creating registry instances with different configuration options.

```java { .api }
/**
 * Creates the default registry with the specified number of async discard threads
 * @param numAsyncDiscardThreads Number of background threads for cleanup
 * @return TaskChangelogRegistry with a thread pool for async discards
 */
static TaskChangelogRegistry defaultChangelogRegistry(int numAsyncDiscardThreads);

/**
 * Creates a registry with a custom executor (primarily for testing)
 * @param executor Custom executor for discard operations
 * @return TaskChangelogRegistry using the provided executor
 */
@VisibleForTesting
static TaskChangelogRegistry defaultChangelogRegistry(Executor executor);
```

**Registry Creation Examples:**

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.changelog.fs.TaskChangelogRegistry;

// Production usage: thread count taken from the Flink Configuration `config`
int discardThreads = config.get(FsStateChangelogOptions.NUM_DISCARD_THREADS);
TaskChangelogRegistry registry = TaskChangelogRegistry.defaultChangelogRegistry(discardThreads);

// Testing usage: supply an executor that the test owns and shuts down afterwards
ExecutorService testExecutor = Executors.newSingleThreadExecutor();
TaskChangelogRegistry testRegistry = TaskChangelogRegistry.defaultChangelogRegistry(testExecutor);

// Disabled registry for scenarios where tracking is not needed
TaskChangelogRegistry disabledRegistry = TaskChangelogRegistry.NO_OP;
```

### Reference Counting and Lifecycle

The registry uses reference counting so that a changelog segment is discarded only after every state backend sharing it has released it, and never after its ownership has been handed over with `stopTracking`.

```java { .api }
/**
 * Starts tracking with an initial reference count
 * @param handle Changelog segment handle
 * @param refCount Initial number of references (must be greater than zero)
 */
void startTracking(StreamStateHandle handle, long refCount);

/**
 * Stops tracking the segment without discarding it (another component now owns it)
 * @param handle Changelog segment handle
 */
void stopTracking(StreamStateHandle handle);

/**
 * Decrements the reference count by one and schedules a discard when it reaches zero
 * @param handle Changelog segment handle
 */
void release(StreamStateHandle handle);
```

**Reference Counting Examples:**

```java
TaskChangelogRegistry registry = TaskChangelogRegistry.defaultChangelogRegistry(2);
// uploadResult, otherUploadResult, thirdUploadResult: hypothetical UploadResult instances
StreamStateHandle handle = uploadResult.streamStateHandle;

// Scenario 1: segment used by a single state backend
registry.startTracking(handle, 1);
// ... backend uses the segment ...
registry.release(handle); // Ref count: 1 -> 0, async discard scheduled

// Scenario 2: segment shared by three state backends
StreamStateHandle shared = otherUploadResult.streamStateHandle;
registry.startTracking(shared, 3);
registry.release(shared); // Ref count: 3 -> 2
registry.release(shared); // Ref count: 2 -> 1
registry.release(shared); // Ref count: 1 -> 0, async discard scheduled

// Scenario 3: a checkpoint referencing the segment is confirmed
StreamStateHandle confirmed = thirdUploadResult.streamStateHandle;
registry.startTracking(confirmed, 2);
registry.stopTracking(confirmed); // No longer tracked; the segment is NOT discarded
registry.release(confirmed); // Untracked: ignored (the default implementation logs a warning)
```

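The reference-counting rules can be checked with a small, self-contained model. This is a sketch, not Flink's `TaskChangelogRegistryImpl`: `Segment` and `RefCountingRegistry` are hypothetical stand-ins for `StreamStateHandle` and the registry, and discards run synchronously so the outcome is easy to observe. It assumes the semantics of the default implementation: `release` decrements the count and discards at zero, while `stopTracking` forgets the segment without discarding it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for StreamStateHandle: records whether it was discarded.
class Segment {
    final String id;
    volatile boolean discarded;

    Segment(String id) {
        this.id = id;
    }

    void discardState() {
        discarded = true;
    }
}

// Hypothetical model of the registry contract; discards run synchronously here.
class RefCountingRegistry {
    private final Map<String, Long> refCounts = new ConcurrentHashMap<>();

    void startTracking(Segment segment, long refCount) {
        if (refCount <= 0) {
            throw new IllegalStateException("Initial refCount must be greater than zero");
        }
        refCounts.put(segment.id, refCount); // overwrites any previous count
    }

    void stopTracking(Segment segment) {
        refCounts.remove(segment.id); // ownership moved elsewhere: forget, never discard
    }

    void release(Segment segment) {
        // computeIfPresent ignores untracked segments; returning null removes the entry
        refCounts.computeIfPresent(segment.id, (id, count) -> {
            if (count == 1) {
                segment.discardState(); // last reference released
                return null;
            }
            return count - 1;
        });
    }

    boolean isTracked(Segment segment) {
        return refCounts.containsKey(segment.id);
    }
}
```

With a count of 2, the first `release` leaves the segment tracked and the second discards it; a segment passed to `stopTracking` is never discarded, even if `release` is called afterwards. Using `computeIfPresent` keeps the check-and-decrement atomic and makes releases of untracked segments harmless.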
### TaskChangelogRegistryImpl Implementation

Internal implementation providing thread-safe reference counting and asynchronous cleanup. Discards are submitted to the supplied executor, so with a thread pool `release` returns without waiting for the segment to be deleted.

```java { .api }
/**
 * Default implementation of TaskChangelogRegistry with thread-safe operations
 */
@Internal
@ThreadSafe
class TaskChangelogRegistryImpl implements TaskChangelogRegistry {

    /**
     * Creates a registry with an executor for async discard operations
     * @param discardExecutor Executor for running cleanup tasks
     */
    public TaskChangelogRegistryImpl(Executor discardExecutor);
}
```

**Implementation Usage Example:**

```java
// TaskChangelogRegistryImpl is package-private: outside org.apache.flink.changelog.fs,
// obtain it through TaskChangelogRegistry.defaultChangelogRegistry(executor) instead.
ExecutorService discardExecutor = Executors.newFixedThreadPool(3);
TaskChangelogRegistry registry = new TaskChangelogRegistryImpl(discardExecutor);

// The registry uses the executor for asynchronous discards
StreamStateHandle handle = createHandle(); // hypothetical helper
registry.startTracking(handle, 1);
registry.release(handle); // 1 -> 0: discard runs asynchronously on discardExecutor

// Shut down the executor when done; already submitted discards still run
discardExecutor.shutdown();
```

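To illustrate the asynchronous part, the following sketch shows `release` handing the discard to an `Executor` instead of deleting the segment on the caller's thread. `SegmentHandle` and `AsyncDiscardRegistry` are hypothetical names, and the `Runnable` stands in for `StreamStateHandle#discardState`; this is not the Flink source.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

// Hypothetical stand-in for StreamStateHandle: an id plus the action discardState() would run.
final class SegmentHandle {
    final String id;
    final Runnable discardAction;

    SegmentHandle(String id, Runnable discardAction) {
        this.id = id;
        this.discardAction = discardAction;
    }
}

// Sketch: release() only schedules the discard; the executor performs it later.
final class AsyncDiscardRegistry {
    private final Map<String, Long> refCounts = new ConcurrentHashMap<>();
    private final Executor discardExecutor;

    AsyncDiscardRegistry(Executor discardExecutor) {
        this.discardExecutor = discardExecutor;
    }

    void startTracking(SegmentHandle handle, long refCount) {
        refCounts.put(handle.id, refCount);
    }

    void release(SegmentHandle handle) {
        refCounts.computeIfPresent(handle.id, (id, count) -> {
            if (count == 1) {
                // Hand the discard to the executor so the caller never waits for deletion
                discardExecutor.execute(handle.discardAction);
                return null; // stop tracking the segment
            }
            return count - 1;
        });
    }
}
```

Because the discard target is just an `Executor`, a test can pass `Runnable::run` to make discards synchronous and deterministic, which is the kind of substitution the `@VisibleForTesting` overload of `defaultChangelogRegistry` allows.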
### Integration with Storage Components

The registry is passed to the storage, which shares it with its uploader and with the writers it creates, so that the segment lifecycle is coordinated across them.

**Storage Integration Example:**

```java
// Create storage with an explicit registry
TaskChangelogRegistry registry = TaskChangelogRegistry.defaultChangelogRegistry(2);

FsStateChangelogStorage storage = new FsStateChangelogStorage(
        jobID,
        config,
        metricGroup,
        registry, // Pass registry to storage
        localRecoveryConfig);

// Writers created by the storage share the same registry
FsStateChangelogWriter writer = storage.createWriter(operatorId, keyGroupRange, mailboxExecutor);

// persist() uploads pending changes; the upload path starts tracking each uploaded segment
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> future =
        writer.persist(sequenceNumber, checkpointId);

future.thenAccept(result -> {
    ChangelogStateHandleStreamImpl changelogHandle = result.getJobManagerOwnedSnapshot();
    if (changelogHandle == null) {
        return; // empty snapshot result: nothing was persisted
    }
    // One changelog handle can reference several uploaded segments (handle, offset)
    for (Tuple2<StreamStateHandle, Long> segment : changelogHandle.getHandlesAndOffsets()) {
        System.out.println("Segment tracked by registry: " + segment.f0);
    }
});
```

### Upload Coordination

The registry coordinates with upload schedulers to manage segment references during upload operations. The upload path registers each uploaded segment with the registry itself, so upload callbacks should not call `startTracking` again: doing so would overwrite the reference count set during the upload.

**Upload Coordination Example:**

```java
TaskChangelogRegistry registry = TaskChangelogRegistry.defaultChangelogRegistry(2);

StateChangeUploadScheduler scheduler = StateChangeUploadScheduler.fromConfig(
        jobID, config, metricGroup, registry, localRecoveryConfig);

// Assumption: UploadTask takes the change sets plus success and failure callbacks
UploadTask task = new UploadTask(
        changeSets,
        uploadResults -> {
            // The uploaded segments are already tracked; do not call startTracking here
            for (UploadResult uploadResult : uploadResults) {
                System.out.println("Upload completed: " + uploadResult.streamStateHandle);
            }
        },
        (failedSequenceNumbers, error) ->
                System.err.println("Upload failed for " + failedSequenceNumbers + ": " + error));

scheduler.upload(task);
```

### No-Op Registry

Special implementation that performs no tracking, useful for testing or disabled scenarios. Since it never discards anything, segments registered with it are never cleaned up by the registry.

```java { .api }
/**
 * No-operation registry that performs no tracking
 */
TaskChangelogRegistry NO_OP = new TaskChangelogRegistry() { /* no-op implementation */ };
```

**No-Op Usage Example:**

```java
// Use NO_OP registry when tracking is not needed
TaskChangelogRegistry noOpRegistry = TaskChangelogRegistry.NO_OP;

// All operations are no-ops (safe, but nothing is tracked or discarded)
noOpRegistry.startTracking(handle, 1); // Does nothing
noOpRegistry.stopTracking(handle); // Does nothing
noOpRegistry.release(handle); // Does nothing

// Useful for testing or minimal configurations
FsStateChangelogStorage storage = new FsStateChangelogStorage(
        jobID, config, metricGroup,
        TaskChangelogRegistry.NO_OP, // Disable tracking
        localRecoveryConfig);
```

### Thread Safety and Concurrency

The default implementation is thread-safe: reference counts are updated atomically, so concurrent `release` calls on a shared segment schedule exactly one discard when the count reaches zero. Callers must still make sure `startTracking` happens before the matching `release` calls, because a release for a segment that is not yet tracked is ignored rather than buffered.

**Concurrent Usage Example:**

```java
TaskChangelogRegistry registry = TaskChangelogRegistry.defaultChangelogRegistry(4);
StreamStateHandle sharedHandle = createSharedHandle(); // hypothetical helper

// Start tracking before handing the segment to other threads,
// so that no release() can run before the count exists
registry.startTracking(sharedHandle, 5);

ExecutorService executor = Executors.newFixedThreadPool(5);

// Five threads each release one reference; the last one schedules the discard
for (int i = 0; i < 5; i++) {
    executor.submit(() -> {
        registry.release(sharedHandle);
        System.out.println("Released one reference");
    });
}

executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES); // may throw InterruptedException
```

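A self-contained way to check that concurrent releases are safe is to race several threads against one shared count and verify that exactly one of them triggers the discard. `ConcurrentReleaseDemo` is a hypothetical sketch, not Flink code; it relies on `ConcurrentHashMap.computeIfPresent` running atomically per key, and its counter stands in for scheduling `discardState()`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: N threads race to release one shared segment; exactly one must trigger the discard.
final class ConcurrentReleaseDemo {
    private final Map<String, Long> refCounts = new ConcurrentHashMap<>();
    final AtomicInteger discards = new AtomicInteger();

    void startTracking(String id, long refCount) {
        refCounts.put(id, refCount);
    }

    void release(String id) {
        // computeIfPresent is atomic per key, so two threads never both observe count == 1
        refCounts.computeIfPresent(id, (key, count) -> {
            if (count == 1) {
                discards.incrementAndGet(); // stands in for scheduling discardState()
                return null;
            }
            return count - 1;
        });
    }

    static int run(int threads) throws InterruptedException {
        ConcurrentReleaseDemo registry = new ConcurrentReleaseDemo();
        registry.startTracking("shared", threads); // track BEFORE any thread may release
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    start.await(); // line all threads up to maximise contention
                    registry.release("shared");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        start.countDown();
        pool.shutdown();
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("release threads did not finish");
        }
        return registry.discards.get();
    }
}
```

`ConcurrentReleaseDemo.run(8)` starts eight threads behind a `CountDownLatch` and returns the number of discards, which should always be 1. Tracking the segment before any thread starts matters: a `release` that runs before `startTracking` finds nothing to decrement.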
### Error Handling and Resilience

Registry calls are lightweight bookkeeping and declare no checked exceptions. The default implementation rejects a non-positive initial reference count in `startTracking` with an `IllegalStateException`, treats `stopTracking` on an untracked handle as a no-op, and ignores a `release` for a handle it is not tracking (logging a warning). Failures while discarding a segment happen on the discard executor; they are logged and not propagated to the caller of `release`.

**Error Handling Examples:**

```java
TaskChangelogRegistry registry = TaskChangelogRegistry.defaultChangelogRegistry(2);

// Defensive programming: only track non-null handles with a positive count
if (handle != null) {
    registry.startTracking(handle, 1);
    registry.release(handle); // discard failures surface only in the logs
}

// Invalid initial count: rejected by the default implementation
try {
    registry.startTracking(otherHandle, 0);
} catch (IllegalStateException e) {
    System.err.println("Rejected: " + e.getMessage());
}

// Operations on untracked handles are tolerated
registry.stopTracking(nonExistentHandle); // No effect
registry.release(alreadyReleasedHandle); // Ignored, logged as a warning
```

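Discard failures are contained on the executor side. The sketch below, with hypothetical `Discardable` and `SafeDiscarder` types, wraps each discard in a `try`/`catch` so that a failing delete (for example an `IOException` from the file system) is counted and reported instead of propagating, and later discards keep running on the same executor.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for StreamStateHandle#discardState, which may throw
interface Discardable {
    void discardState() throws Exception;
}

// Sketch: a failed discard is reported, never rethrown into the pool or the caller.
final class SafeDiscarder {
    final AtomicInteger failures = new AtomicInteger();
    private final ExecutorService executor;

    SafeDiscarder(ExecutorService executor) {
        this.executor = executor;
    }

    void scheduleDiscard(Discardable segment) {
        executor.execute(() -> {
            try {
                segment.discardState();
            } catch (Exception e) {
                failures.incrementAndGet();
                // A production registry would typically log a warning here
                System.err.println("Unable to discard unused changelog segment: " + e);
            }
        });
    }

    void shutdownAndWait() throws InterruptedException {
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

Catching inside the task, rather than around `scheduleDiscard`, is what keeps the failure away from the caller: by the time the delete runs, the `release` that triggered it has already returned.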
### Cleanup and Shutdown

`TaskChangelogRegistry` has no `close()` method, and `TaskChangelogRegistryImpl` is package-private, so the thread pool created by `defaultChangelogRegistry(int)` cannot be shut down through the registry. When explicit control over the discard threads is needed, create the executor yourself, pass it to `defaultChangelogRegistry(Executor)` (annotated `@VisibleForTesting`), and shut it down when done.

**Cleanup Example:**

```java
ExecutorService discardExecutor = Executors.newFixedThreadPool(3);
TaskChangelogRegistry registry = TaskChangelogRegistry.defaultChangelogRegistry(discardExecutor);

// Use registry during application lifecycle
// ...

// During shutdown: stop accepting new discards and let pending ones finish
discardExecutor.shutdown();
if (!discardExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
    discardExecutor.shutdownNow(); // give up on discards that are still running
}
```
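
When you own the discard executor, the order of shutdown matters. This sketch uses a plain `ExecutorService` in place of a registry and assumes that a registry schedules each discard directly on that executor, which the interface does not guarantee. Discards submitted before `shutdown()` still complete, while one submitted afterwards (for example by a late `release`) is rejected with `RejectedExecutionException` under the default rejection policy of the JDK's thread pools. `ShutdownOrderDemo` is a hypothetical name.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: why an owned discard executor should be shut down only after the last release.
final class ShutdownOrderDemo {
    // Returns {discards completed, discards rejected}
    static int[] run() throws InterruptedException {
        ExecutorService discardExecutor = Executors.newSingleThreadExecutor();
        AtomicInteger completed = new AtomicInteger();
        int rejected = 0;

        // Discards scheduled before shutdown() still run to completion
        for (int i = 0; i < 3; i++) {
            discardExecutor.execute(completed::incrementAndGet);
        }
        discardExecutor.shutdown();

        // A discard scheduled after shutdown() (e.g. by a late release) is rejected
        try {
            discardExecutor.execute(completed::incrementAndGet);
        } catch (RejectedExecutionException e) {
            rejected++;
        }

        if (!discardExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
            discardExecutor.shutdownNow(); // give up on discards that are still running
        }
        return new int[] {completed.get(), rejected};
    }
}
```

`ShutdownOrderDemo.run()` returns `{3, 1}`: three discards completed and one rejected. In practice, shut the executor down only after the last `release` has been issued.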