# Changelog Writers

Writers append state changes and manage their persistence, including preemptive flushing and coordination with checkpoints. Each writer accumulates, batches, and persists the state changes of a single operator.

## Capabilities

### StateChangelogWriter Interface

Core interface for writing state changes, coordinating with checkpoints, and managing the writer lifecycle.

```java { .api }
/**
 * Interface for writing state changes to changelog storage.
 * Provides methods for appending changes, managing persistence, and coordinating with checkpoints.
 */
public interface StateChangelogWriter<Handle extends ChangelogStateHandle> extends AutoCloseable {

    /**
     * Appends a state change for a specific key group. Nothing is durable until persisted.
     * @param keyGroup Key group identifier (0-based)
     * @param value Serialized state change data
     * @throws IOException if the append operation fails
     */
    void append(int keyGroup, byte[] value) throws IOException;

    /**
     * Appends a metadata change (not associated with any key group). Nothing is durable until persisted.
     * @param value Serialized metadata change data
     * @throws IOException if the append operation fails
     */
    void appendMeta(byte[] value) throws IOException;

    /**
     * Returns the sequence number used for the first appended change
     * @return Initial SequenceNumber (typically 0)
     */
    SequenceNumber initialSequenceNumber();

    /**
     * Returns the sequence number to be used for the next appended change, creating a rollover point:
     * changes appended before this call can be distinguished from changes appended after it
     * @return SequenceNumber for the next appended change
     */
    SequenceNumber nextSequenceNumber();

    /**
     * Durably persists appended changes from the specified sequence number up to the latest change.
     * Afterwards, confirm, reset, or truncate must eventually be called for the persisted range.
     * @param from Starting sequence number (inclusive)
     * @param checkpointId Checkpoint identifier for this persistence operation
     * @return CompletableFuture with a snapshot result containing changelog handles
     * @throws IOException if persistence fails
     */
    CompletableFuture<SnapshotResult<Handle>> persist(SequenceNumber from, long checkpointId)
            throws IOException;

    /**
     * Truncates changes up to the specified sequence number to free resources.
     * Called upon state materialization; ongoing persist calls are not affected.
     * @param to Sequence number to truncate up to (exclusive)
     */
    void truncate(SequenceNumber to);

    /**
     * Truncates changes from the specified sequence number and closes the writer.
     * Appended but not yet persisted changes are lost.
     * @param from Sequence number to truncate from (inclusive)
     */
    void truncateAndClose(SequenceNumber from);

    /**
     * Marks a range of state changes as confirmed after successful checkpoint completion
     * @param from Starting sequence number (inclusive)
     * @param to Ending sequence number (exclusive)
     * @param checkpointId Checkpoint identifier
     */
    void confirm(SequenceNumber from, SequenceNumber to, long checkpointId);

    /**
     * Resets a range of state changes after a checkpoint abort, so that they are
     * re-uploaded if requested later
     * @param from Starting sequence number (inclusive)
     * @param to Ending sequence number (exclusive)
     * @param checkpointId Checkpoint identifier
     */
    void reset(SequenceNumber from, SequenceNumber to, long checkpointId);

    /**
     * Closes the writer and releases resources. Appended but not yet persisted changes are lost.
     */
    void close();
}
```

### FsStateChangelogWriter Implementation

Filesystem-specific implementation with preemptive persistence and batch management.

```java { .api }
/**
 * Filesystem-based implementation of StateChangelogWriter.
 * Not thread-safe: designed for single-threaded use per operator.
 */
class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandleStreamImpl> {

    /**
     * Constructor for filesystem changelog writer
     * @param logId Unique identifier for this writer's log
     * @param keyGroupRange Key group range this writer handles
     * @param uploader Upload scheduler for persistence operations
     * @param preEmptivePersistThresholdInBytes Size threshold for preemptive persistence
     * @param mailboxExecutor Executor for callback processing
     * @param changelogRegistry Registry for managing changelog state lifecycle
     * @param localRecoveryConfig Configuration for local recovery
     * @param localChangelogRegistry Registry for local changelog files
     */
    FsStateChangelogWriter(
            UUID logId,
            KeyGroupRange keyGroupRange,
            StateChangeUploadScheduler uploader,
            long preEmptivePersistThresholdInBytes,
            MailboxExecutor mailboxExecutor,
            TaskChangelogRegistry changelogRegistry,
            LocalRecoveryConfig localRecoveryConfig,
            LocalChangelogRegistry localChangelogRegistry
    );
}
```
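
The constructor takes a `MailboxExecutor` for callback processing, which fits the single-threaded design: uploads can finish on other threads while the writer's own bookkeeping stays on the task thread. The following is a minimal sketch of that hand-off pattern using only the JDK, assuming a single-threaded `ExecutorService` as a stand-in for Flink's `MailboxExecutor`. `MailboxStyleWriter` and its methods are hypothetical names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: uploads run on a background pool, while all bookkeeping runs on
// a single-threaded executor standing in for Flink's MailboxExecutor (an assumption).
class MailboxStyleWriter {
    private final ExecutorService uploadPool = Executors.newFixedThreadPool(2);
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    // Only accessed from the mailbox thread, so no synchronization is needed.
    private final List<String> uploaded = new ArrayList<>();

    CompletableFuture<Void> upload(String changeSet) {
        return CompletableFuture
                .supplyAsync(() -> "handle-for-" + changeSet, uploadPool) // simulated remote write
                .thenAcceptAsync(uploaded::add, mailbox);                 // bookkeeping on the mailbox thread
    }

    List<String> uploadedHandles() {
        // Reads also go through the mailbox to keep access single-threaded.
        return CompletableFuture.supplyAsync(() -> new ArrayList<>(uploaded), mailbox).join();
    }

    void shutdown() {
        uploadPool.shutdown();
        mailbox.shutdown();
        try {
            uploadPool.awaitTermination(5, TimeUnit.SECONDS);
            mailbox.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because every mutation of `uploaded` runs on the same thread, the list needs no locking, which is the property a non-thread-safe writer relies on.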

**Usage Examples:**

```java
import java.util.concurrent.CompletableFuture;

import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;

// Writers are created by FsStateChangelogStorage; FsStateChangelogWriter itself is
// package-private, so callers use the StateChangelogWriter interface type
StateChangelogWriter<ChangelogStateHandleStreamImpl> writer = storage.createWriter(
        "my-operator",
        KeyGroupRange.of(0, 127),
        mailboxExecutor);

// Append state changes during processing
writer.append(5, stateChangeForKeyGroup5);
writer.append(10, stateChangeForKeyGroup10);
writer.appendMeta(operatorMetadata);

// On checkpoint: roll over so that later appends are not part of this checkpoint
SequenceNumber fromSqn = writer.initialSequenceNumber();
SequenceNumber toSqn = writer.nextSequenceNumber();

// Persist changes from fromSqn up to the latest appended change
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> future =
        writer.persist(fromSqn, checkpointId);

future.thenAccept(result -> {
    // Checkpoint snapshot completed; the handle may be null for an empty result
    ChangelogStateHandleStreamImpl handle = result.getJobManagerOwnedSnapshot();
    if (handle != null) {
        log.info("Persisted changelog of {} bytes", handle.getStateSize());
    }
});

// Once the checkpoint is reported complete, confirm the persisted range
writer.confirm(fromSqn, toSqn, checkpointId);
```

### Preemptive Persistence

Writers automatically trigger persistence when the size of accumulated, not yet persisted changes exceeds the configured threshold:

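```java
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;

// Configure the preemptive persistence threshold for changelog writers
Configuration config = new Configuration();
config.set(FsStateChangelogOptions.PREEMPTIVE_PERSIST_THRESHOLD, MemorySize.parse("5MB"));

// A writer created with this configuration flushes once the threshold is exceeded
writer.append(keyGroup, largeStateChange); // may trigger a preemptive flush
```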

Preemptive persistence:
- Reduces checkpoint duration by uploading most changes before the checkpoint starts, leaving less to upload during the checkpoint
- Provides quasi-continuous uploading of state changes
- Maintains ordering guarantees across preemptive and checkpoint-triggered persistence
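
The threshold check can be pictured as a byte counter over the changes appended since the last flush. The sketch below is a simplified, hypothetical model (`PreemptiveBuffer` is not a Flink class); it assumes a flush hands all pending changes to an uploader at once and ignores sequence numbers and asynchronous completion.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of threshold-triggered ("preemptive") persistence.
class PreemptiveBuffer {
    private final long thresholdBytes;
    private final List<byte[]> pending = new ArrayList<>();
    private long pendingBytes = 0;
    // Each entry records how many changes one flush handed to the (simulated) uploader.
    final List<Integer> flushes = new ArrayList<>();

    PreemptiveBuffer(long thresholdBytes) {
        this.thresholdBytes = thresholdBytes;
    }

    void append(byte[] change) {
        pending.add(change);
        pendingBytes += change.length;
        if (pendingBytes >= thresholdBytes) {
            flush(); // preemptive: not tied to any checkpoint
        }
    }

    // Also called at checkpoint time to upload whatever is still pending.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flushes.add(pending.size());
        pending.clear();
        pendingBytes = 0;
    }

    long pendingBytes() {
        return pendingBytes;
    }
}
```

With a 10-byte threshold, three 4-byte appends trigger one preemptive flush, so a checkpoint-time `flush()` after one more 3-byte append only has to ship that remainder.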

### Sequence Number Management

Writers use sequence numbers to track and coordinate batches of state changes:

```java
// Initial state
SequenceNumber initial = writer.initialSequenceNumber(); // typically SequenceNumber.of(0)

// Each nextSequenceNumber() call creates a rollover point between batches
writer.append(1, change1);
writer.append(2, change2);
SequenceNumber rollover1 = writer.nextSequenceNumber();

writer.append(3, change3);
writer.append(4, change4);
SequenceNumber rollover2 = writer.nextSequenceNumber();

// Persist changes from rollover1 onwards (change3 and change4)
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> result =
        writer.persist(rollover1, checkpointId);
```
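
To make the rollover semantics concrete, here is a hypothetical in-memory model (`SqnLog` is illustrative only, with plain `long` values standing in for `SequenceNumber`). It assumes that changes appended between two `nextSequenceNumber()` calls share one number, and that calling `nextSequenceNumber()` with nothing appended returns the same number again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical in-memory model of rollover-based sequence numbering.
class SqnLog {
    private static final long INITIAL = 0;
    private final TreeMap<Long, List<String>> sealed = new TreeMap<>();
    private List<String> active = new ArrayList<>();
    private long activeSqn = INITIAL;

    long initialSequenceNumber() {
        return INITIAL;
    }

    void append(String change) {
        active.add(change);
    }

    // Seals the active batch and returns the number the next change will get.
    // With nothing appended since the last call, the number does not move.
    long nextSequenceNumber() {
        if (active.isEmpty()) {
            return activeSqn;
        }
        sealed.put(activeSqn, active);
        active = new ArrayList<>();
        activeSqn++;
        return activeSqn;
    }

    // All changes with sequence number >= from, in append order.
    List<String> changesFrom(long from) {
        List<String> out = new ArrayList<>();
        sealed.tailMap(from, true).values().forEach(out::addAll);
        if (activeSqn >= from) {
            out.addAll(active);
        }
        return out;
    }
}
```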

### Error Handling and Recovery

Writers report failures synchronously, by throwing `IOException` from `append` or `persist`, or asynchronously, by completing the `persist` future exceptionally:

```java
import java.io.IOException;
import java.io.UncheckedIOException;

try {
    writer.append(keyGroup, stateChange);

    CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> persistFuture =
            writer.persist(sqn, checkpointId);

    persistFuture.whenComplete((result, throwable) -> {
        if (throwable != null) {
            // Handle persistence failure
            log.error("Changelog persistence failed for checkpoint {}", checkpointId, throwable);
            // The checkpoint fails; Flink handles it according to its checkpoint failure policy
        } else {
            // Persistence successful
            log.debug("Changelog persisted successfully for checkpoint {}", checkpointId);
        }
    });

} catch (IOException e) {
    // Handle synchronous append or persist failures
    log.error("Failed to append or persist state changes", e);
    throw new UncheckedIOException("State change append failed", e);
}
```
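
The callback above only observes the outcome. A hedged sketch of why `whenComplete` fits here: it runs the callback and returns a future with the same outcome, so a failure still reaches whoever waits on the result. `PersistOutcome` is a hypothetical helper, and the `CompletionException` unwrapping assumes failures may arrive wrapped after passing through dependent stages.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical helper: observes a persist-style future without swallowing its failure.
class PersistOutcome {
    volatile String lastLog;

    <T> CompletableFuture<T> observe(CompletableFuture<T> persistFuture, long checkpointId) {
        // whenComplete returns a future with the same result or failure as the input,
        // so callers waiting on it still see the error.
        return persistFuture.whenComplete((result, error) -> {
            if (error != null) {
                lastLog = "persist failed for checkpoint " + checkpointId + ": "
                        + rootCause(error).getMessage();
            } else {
                lastLog = "persisted checkpoint " + checkpointId;
            }
        });
    }

    static Throwable rootCause(Throwable t) {
        // Dependent stages wrap failures in CompletionException; unwrap for logging.
        while (t instanceof CompletionException && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }
}
```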

### Lifecycle Management

Writers coordinate with Flink's checkpoint lifecycle:

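```java
// During checkpoint: roll over, then persist from the first unconfirmed change
SequenceNumber toSqn = writer.nextSequenceNumber();
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> snapshotFuture =
        writer.persist(fromSqn, checkpointId);

// On checkpoint completion
writer.confirm(fromSqn, toSqn, checkpointId);

// Or, on checkpoint abort/failure: the changes are re-uploaded if requested later
writer.reset(fromSqn, toSqn, checkpointId);

// After state materialization: drop changes covered by the materialized state
// (the range must not include changes of any checkpoint not yet subsumed or aborted)
writer.truncate(truncateUpToSqn);

// On operator shutdown
writer.close();
```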
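
The calls above move each batch of changes through a small set of states. The following hypothetical model (`ChangeSetLifecycle` is not part of Flink, and the real writer tracks more detail) assumes three states per batch and follows the interface contract: `persist` uploads from a sequence number onwards, `confirm` and `reset` act on half-open ranges `[from, to)`, and `truncate` drops everything before its argument.

```java
import java.util.ArrayList;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model of per-batch lifecycle states keyed by sequence number.
class ChangeSetLifecycle {
    enum State { NOT_UPLOADED, UPLOADED, CONFIRMED }

    private final TreeMap<Long, State> sets = new TreeMap<>();

    void add(long sqn) {
        sets.put(sqn, State.NOT_UPLOADED);
    }

    // persist(from): everything at or after `from` gets uploaded.
    void persist(long from) {
        transition(sets.tailMap(from, true), State.NOT_UPLOADED, State.UPLOADED);
    }

    // confirm(from, to): the checkpoint completed; [from, to) is now referenced by it.
    void confirm(long from, long to) {
        transition(sets.subMap(from, true, to, false), State.UPLOADED, State.CONFIRMED);
    }

    // reset(from, to): the checkpoint was aborted; [from, to) will be uploaded again if requested.
    void reset(long from, long to) {
        transition(sets.subMap(from, true, to, false), State.UPLOADED, State.NOT_UPLOADED);
    }

    // truncate(to): after materialization, drop everything strictly before `to`.
    void truncate(long to) {
        sets.headMap(to, false).clear();
    }

    State stateOf(long sqn) {
        return sets.get(sqn);
    }

    int size() {
        return sets.size();
    }

    private void transition(NavigableMap<Long, State> range, State fromState, State toState) {
        // Copy the keys first so updating the backing map never disturbs the iteration.
        for (Long sqn : new ArrayList<>(range.keySet())) {
            if (sets.get(sqn) == fromState) {
                sets.put(sqn, toState);
            }
        }
    }
}
```

In this model `confirm` and `reset` are alternatives for a given checkpoint: each persisted range receives exactly one outcome.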

### Local Recovery Integration

When local recovery is enabled, writers coordinate with the local changelog registry, and persistence results carry a task-local handle in addition to the remote one:

```java
// Local recovery handles are managed automatically
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> result =
        writer.persist(sqn, checkpointId);

result.thenAccept(snapshotResult -> {
    // Remote handle, owned by the JobManager
    ChangelogStateHandleStreamImpl remoteHandle = snapshotResult.getJobManagerOwnedSnapshot();
    // Task-local handle; null when local recovery is disabled
    ChangelogStateHandleStreamImpl localHandle = snapshotResult.getTaskLocalSnapshot();

    // The local registry tracks local handles for recovery
});
```
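
On recovery, a task-local copy is preferred when one exists, with the remote copy as the fallback. The sketch below illustrates that choice with a hypothetical type (`ChangelogHandles` uses plain strings in place of real state handles) and assumes the local handle is `null` when local recovery is disabled.

```java
import java.util.Optional;

// Hypothetical pair of handles from one snapshot: the remote copy is authoritative,
// the task-local copy is an optional fast-path replica.
class ChangelogHandles {
    final String remote; // JobManager-owned handle
    final String local;  // task-local handle, null when local recovery is disabled

    ChangelogHandles(String remote, String local) {
        this.remote = remote;
        this.local = local;
    }

    // Prefer the local replica when present; otherwise fall back to the remote copy.
    String handleForRecovery() {
        return Optional.ofNullable(local).orElse(remote);
    }
}
```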