# State Change Writers

Writer implementation that persists state changes to a filesystem, with batching, upload coordination, and lifecycle management. A writer is created per operator and performs the actual writing of that operator's state changes.

## Capabilities

### FsStateChangelogWriter

Core writer implementation that persists state changes, tracks sequence numbers, and coordinates uploads.

```java { .api }
/**
 * Filesystem-based writer for state changes
 * Note: This class is not thread-safe and should be used from a single thread
 */
@NotThreadSafe
class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandleStreamImpl> {

    /**
     * Appends metadata to the changelog
     * @param value Metadata bytes to append
     * @throws IOException If the append operation fails
     */
    public void appendMeta(byte[] value) throws IOException;

    /**
     * Appends a state change for a specific key group
     * @param keyGroup Key group identifier (must be within the writer's key group range)
     * @param value State change data bytes
     * @throws IOException If the append operation fails
     */
    public void append(int keyGroup, byte[] value) throws IOException;

    /**
     * Returns the initial sequence number for this writer
     * @return SequenceNumber representing the starting point
     */
    public SequenceNumber initialSequenceNumber();

    /**
     * Returns the sequence number to be used for the next appended state change
     * @return SequenceNumber for the next state change
     */
    public SequenceNumber nextSequenceNumber();

    /**
     * Persists accumulated state changes from the given sequence number
     * through the most recently appended change
     * @param from Sequence number to persist from (inclusive)
     * @param checkpointId Checkpoint identifier for this persistence operation
     * @return CompletableFuture containing the snapshot result with the changelog handle
     */
    public CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> persist(
        SequenceNumber from,
        long checkpointId
    );

    /**
     * Closes the writer and releases all resources
     * @throws Exception If closing fails
     */
    public void close() throws Exception;

    /**
     * Truncates (discards) state changes before the given sequence number
     * @param to Sequence number to truncate to (exclusive)
     */
    public void truncate(SequenceNumber to);

    /**
     * Truncates state changes from the given sequence number onward and closes the writer
     * @param from Sequence number to truncate from (inclusive)
     */
    public void truncateAndClose(SequenceNumber from);

    /**
     * Marks the state changes in the given range as confirmed for a checkpoint
     * @param from Start sequence number (inclusive)
     * @param to End sequence number (exclusive)
     * @param checkpointId Checkpoint identifier
     */
    public void confirm(SequenceNumber from, SequenceNumber to, long checkpointId);

    /**
     * Marks the state changes in the given range as not confirmed for a checkpoint
     * (e.g., because the checkpoint was aborted)
     * @param from Start sequence number (inclusive)
     * @param to End sequence number (exclusive)
     * @param checkpointId Checkpoint identifier
     */
    public void reset(SequenceNumber from, SequenceNumber to, long checkpointId);
}
```

**Basic Writer Usage Example:**

```java
// Create a writer through the storage
FsStateChangelogStorage storage = new FsStateChangelogStorage(/* ... */);
FsStateChangelogWriter writer = storage.createWriter(
    "my-operator",
    KeyGroupRange.of(0, 127),
    mailboxExecutor
);

// Remember where this writer's changelog starts
SequenceNumber initialSeq = writer.initialSequenceNumber();

// Append metadata
byte[] metadata = "operator-metadata".getBytes(StandardCharsets.UTF_8);
writer.appendMeta(metadata);

// Append state changes for different key groups
// (serializeStateChange, state1 and state2 are placeholders for your own serialization)
byte[] stateChange1 = serializeStateChange(state1);
writer.append(5, stateChange1);

byte[] stateChange2 = serializeStateChange(state2);
writer.append(15, stateChange2);

// Mark the end (exclusive) of this range; keep it for confirm()/reset() later
SequenceNumber endSeq = writer.nextSequenceNumber();

// Persist everything appended since initialSeq
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> persistFuture =
    writer.persist(initialSeq, checkpointId);

// Handle the persistence result
persistFuture.thenAccept(result -> {
    ChangelogStateHandleStreamImpl handle = result.getJobManagerOwnedSnapshot();
    // The handle references the persisted changelog data
    System.out.println("Persisted changelog: " + handle);
});

// Closing may fail uploads that are still pending, so wait for the result first
persistFuture.join();
writer.close();
```
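
Because `FsStateChangelogWriter` is `@NotThreadSafe` while uploads finish asynchronously, follow-up writer calls should happen on the task thread; we assume this is the role of the `mailboxExecutor` passed to `createWriter`. The self-contained sketch below models the idea with JDK classes only: a single-thread `ExecutorService` stands in for the task mailbox (Flink's `MailboxExecutor` is a different interface), a second executor stands in for the uploader, and `thenApplyAsync(..., mailbox)` hops the completion callback back to the mailbox thread. `ThreadConfinementSketch` and the thread names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadConfinementSketch {
    /** Completes a fake upload on one thread and reports which thread ran the callback. */
    static String callbackThreadName() {
        // Stand-in for the task mailbox: every "writer" call runs on this single thread
        ExecutorService mailbox = Executors.newSingleThreadExecutor(r -> new Thread(r, "mailbox"));
        // Stand-in for the uploader: uploads complete on a different thread
        ExecutorService uploader = Executors.newSingleThreadExecutor(r -> new Thread(r, "uploader"));
        try {
            CompletableFuture<String> upload =
                CompletableFuture.supplyAsync(() -> "handle-1", uploader);
            // Hop back to the mailbox before touching writer state (e.g. confirm/truncate)
            CompletableFuture<String> callback = upload.thenApplyAsync(
                handle -> Thread.currentThread().getName(), mailbox);
            return callback.join();
        } finally {
            mailbox.shutdown();
            uploader.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Callback ran on: " + callbackThreadName());
    }
}
```

The design choice here is thread confinement: rather than locking inside the writer, all mutations are funneled through one thread, and asynchronous results are delivered back to it.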

### Sequence Number Management

Writers maintain sequence numbers to track the order of state changes and coordinate with checkpointing.

```java { .api }
/**
 * Gets the initial sequence number for this writer instance
 * @return SequenceNumber representing the starting point
 */
public SequenceNumber initialSequenceNumber();

/**
 * Gets the next available sequence number for new state changes
 * @return SequenceNumber for the next state change to be written
 */
public SequenceNumber nextSequenceNumber();
```

**Sequence Number Usage Example:**

```java
FsStateChangelogWriter writer = storage.createWriter(/* ... */);

// Track sequence numbers
SequenceNumber start = writer.initialSequenceNumber();
System.out.println("Starting from sequence: " + start);

// Write some state changes
writer.append(1, stateData1);
writer.append(2, stateData2);

// Get current position
SequenceNumber current = writer.nextSequenceNumber();
System.out.println("Next sequence will be: " + current);

// Persist from start to current
writer.persist(start, checkpointId).thenAccept(result -> {
    System.out.println("Persisted sequence range: " + start + " to " + current);
});
```
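
To make the boundary semantics concrete, here is a simplified, self-contained model of how a writer might assign sequence numbers; it is not Flink's implementation. Appends join the currently open change set, and `nextSequenceNumber()` closes that set and advances the counter only if the set is non-empty, so the returned number separates changes appended before the call from those appended after it. The class `SeqModel` and its use of a plain `long` instead of `SequenceNumber` are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class SeqModel {
    private final long initial;
    private long active;                                                // number for new appends
    private List<String> activeChanges = new ArrayList<>();            // the open change set
    private final TreeMap<Long, List<String>> closed = new TreeMap<>(); // closed sets by number

    SeqModel(long initial) {
        this.initial = initial;
        this.active = initial;
    }

    long initialSequenceNumber() { return initial; }

    void append(String change) { activeChanges.add(change); }

    /** Closes the open set (if non-empty) and returns the number for changes appended next. */
    long nextSequenceNumber() {
        if (!activeChanges.isEmpty()) {
            closed.put(active, activeChanges);
            activeChanges = new ArrayList<>();
            active++;
        }
        return active;
    }

    /** All closed changes whose sequence number lies in [from, to). */
    List<String> range(long from, long to) {
        List<String> out = new ArrayList<>();
        closed.subMap(from, true, to, false).values().forEach(out::addAll);
        return out;
    }

    public static void main(String[] args) {
        SeqModel w = new SeqModel(0);
        long start = w.initialSequenceNumber();
        w.append("k1=a");
        w.append("k2=b");
        long boundary = w.nextSequenceNumber();
        w.append("k1=c"); // belongs to the next range, not [start, boundary)
        System.out.println(start + " " + boundary + " " + w.range(start, boundary));
    }
}
```

Note that in this model calling `nextSequenceNumber()` twice with no appends in between returns the same number, so an idle operator does not burn through sequence numbers.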

### State Change Appending

Methods for appending the two kinds of changelog data: operator metadata and per-key-group state changes.

```java { .api }
/**
 * Appends operator metadata to the changelog
 * @param value Serialized metadata bytes
 * @throws IOException If the append operation fails
 */
public void appendMeta(byte[] value) throws IOException;

/**
 * Appends state change data for a specific key group
 * @param keyGroup Key group identifier (must be within the writer's assigned range)
 * @param value Serialized state change bytes
 * @throws IOException If the append operation fails
 */
public void append(int keyGroup, byte[] value) throws IOException;
```

**Appending Examples:**

```java
FsStateChangelogWriter writer = storage.createWriter(
    "operator-1",
    KeyGroupRange.of(0, 63), // Key groups 0-63 (both bounds inclusive)
    mailboxExecutor
);

// Append operator metadata (initialization info, configuration, etc.)
String metadataJson = "{\"operator\":\"MyOperator\",\"version\":\"1.0\"}";
writer.appendMeta(metadataJson.getBytes(StandardCharsets.UTF_8));

// Append state changes for every key group in the range
// (createStateChangeForKeyGroup is a placeholder for your own serialization)
for (int keyGroup = 0; keyGroup < 64; keyGroup++) {
    byte[] stateChange = createStateChangeForKeyGroup(keyGroup);
    writer.append(keyGroup, stateChange);
}

// Key group 100 lies outside 0-63, so this append is invalid and may be rejected
try {
    writer.append(100, someData);
} catch (IllegalArgumentException e) {
    System.err.println("Key group out of range: " + e.getMessage());
}
```
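
The key-group contract and the batching mentioned at the top of this page can be illustrated with a small model. The hypothetical `RangeCheckedBuffer` below is not Flink code: it treats the range as inclusive on both ends (as `KeyGroupRange.of(0, 63)` covers 64 groups), rejects out-of-range appends with `IllegalArgumentException` as an assumed policy, and tracks buffered bytes against an assumed size threshold to show where a writer could hand a batch to the uploader early.

```java
class RangeCheckedBuffer {
    private final int startKeyGroup;        // inclusive
    private final int endKeyGroup;          // inclusive
    private final long flushThresholdBytes; // hypothetical batching threshold
    private long bufferedBytes;
    private int flushes;

    RangeCheckedBuffer(int startKeyGroup, int endKeyGroup, long flushThresholdBytes) {
        this.startKeyGroup = startKeyGroup;
        this.endKeyGroup = endKeyGroup;
        this.flushThresholdBytes = flushThresholdBytes;
    }

    void append(int keyGroup, byte[] value) {
        if (keyGroup < startKeyGroup || keyGroup > endKeyGroup) {
            throw new IllegalArgumentException(
                "Key group " + keyGroup + " not in [" + startKeyGroup + ", " + endKeyGroup + "]");
        }
        bufferedBytes += value.length;
        if (bufferedBytes >= flushThresholdBytes) {
            // A real writer would hand the buffered batch to the uploader here
            flushes++;
            bufferedBytes = 0;
        }
    }

    int flushes() { return flushes; }

    long bufferedBytes() { return bufferedBytes; }

    public static void main(String[] args) {
        RangeCheckedBuffer buf = new RangeCheckedBuffer(0, 63, 100);
        for (int kg = 0; kg < 64; kg++) {
            buf.append(kg, new byte[10]);
        }
        System.out.println("flushes=" + buf.flushes() + " buffered=" + buf.bufferedBytes());
        try {
            buf.append(100, new byte[1]);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With 64 appends of 10 bytes and a 100-byte threshold, the model flushes six times and leaves 40 bytes buffered for the next `persist`.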

### Persistence Operations

Core persistence functionality that coordinates with the upload system to store state changes durably.

```java { .api }
/**
 * Persists accumulated state changes from the given sequence number
 * through the most recently appended change
 * @param from Starting sequence number (inclusive)
 * @param checkpointId Checkpoint identifier for tracking
 * @return CompletableFuture with snapshot result containing changelog handle
 */
public CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> persist(
    SequenceNumber from,
    long checkpointId
);
```

**Persistence Examples:**

```java
// Basic persistence
SequenceNumber startSeq = writer.initialSequenceNumber();
writer.append(1, stateData1);
writer.append(2, stateData2);

CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> future =
    writer.persist(startSeq, 12345L);

future.whenComplete((result, throwable) -> {
    if (throwable != null) {
        System.err.println("Persistence failed: " + throwable.getMessage());
    } else {
        ChangelogStateHandleStreamImpl handle = result.getJobManagerOwnedSnapshot();
        System.out.println("Successfully persisted to: " + handle.getHandlesAndOffsets());

        // Local copy (only present if local recovery is enabled)
        ChangelogStateHandleStreamImpl localHandle = result.getTaskLocalSnapshot();
        if (localHandle != null) {
            System.out.println("Local backup at: " + localHandle);
        }
    }
});

// Persist a second range. The writer is not thread-safe, so call it from the task
// thread rather than from future callbacks, and capture the start of the new range
// before appending to it.
SequenceNumber secondStart = writer.nextSequenceNumber();
writer.append(3, moreStateData);
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> secondFuture =
    writer.persist(secondStart, 12346L);

CompletableFuture<Void> bothPersisted = CompletableFuture.allOf(future, secondFuture)
    .thenRun(() -> System.out.println("Both persistence operations completed"));
```
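
The "from `from` (inclusive) through the latest change" semantics of `persist` can be modeled without Flink. In the hypothetical `PersistModel` below, each change gets its own sequence number (a simplification), `persist(from, checkpointId, uploader)` selects every change numbered at or above `from` on the caller's thread, and an assumed uploader executor completes a `CompletableFuture` with a summary string standing in for `ChangelogStateHandleStreamImpl`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class PersistModel {
    /** A change tagged with the sequence number it was appended under. */
    static final class Change {
        final long seq;
        final String data;

        Change(long seq, String data) {
            this.seq = seq;
            this.data = data;
        }
    }

    private final List<Change> log = new ArrayList<>();
    private long next = 0;

    /** Appends one change and advances the counter (one number per change, for simplicity). */
    long append(String data) {
        log.add(new Change(next, data));
        return next++;
    }

    /** Selects changes with seq >= from and "uploads" them asynchronously. */
    CompletableFuture<String> persist(long from, long checkpointId, ExecutorService uploader) {
        // Copy the selection on the caller's thread; the log itself is not thread-safe
        List<String> selected = log.stream()
            .filter(c -> c.seq >= from)
            .map(c -> c.data)
            .collect(Collectors.toList());
        return CompletableFuture.supplyAsync(() -> "cp" + checkpointId + ":" + selected, uploader);
    }

    public static void main(String[] args) {
        ExecutorService uploader = Executors.newSingleThreadExecutor();
        try {
            PersistModel w = new PersistModel();
            long first = w.append("a");
            w.append("b");
            long third = w.append("c");
            System.out.println(w.persist(first, 1L, uploader).join());
            System.out.println(w.persist(third, 2L, uploader).join());
        } finally {
            uploader.shutdown();
        }
    }
}
```

Copying the selected changes before handing off to the uploader keeps all access to the mutable log on one thread, mirroring the single-thread requirement of the real writer.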

### Lifecycle Management

Methods for managing the writer lifecycle, including truncation, confirmation, and cleanup.

```java { .api }
/**
 * Truncates (discards) the changelog before the given sequence number
 * @param to Sequence number to truncate to (exclusive)
 */
public void truncate(SequenceNumber to);

/**
 * Truncates the changelog from the given sequence number onward and closes the writer
 * @param from Sequence number to truncate from (inclusive)
 */
public void truncateAndClose(SequenceNumber from);

/**
 * Marks the state changes in the range as confirmed for the given checkpoint
 * @param from Start sequence number (inclusive)
 * @param to End sequence number (exclusive)
 * @param checkpointId Associated checkpoint ID
 */
public void confirm(SequenceNumber from, SequenceNumber to, long checkpointId);

/**
 * Marks the state changes in the range as not confirmed (e.g., because the checkpoint was aborted)
 * @param from Start sequence number (inclusive)
 * @param to End sequence number (exclusive)
 * @param checkpointId Associated checkpoint ID
 */
public void reset(SequenceNumber from, SequenceNumber to, long checkpointId);

/**
 * Closes writer and releases all resources
 * @throws Exception If cleanup fails
 */
public void close() throws Exception;
```

**Lifecycle Management Examples:**

```java
FsStateChangelogWriter writer = storage.createWriter(/* ... */);

// Declared outside the try block so the catch block can use them
SequenceNumber seq1 = writer.nextSequenceNumber(); // start of this checkpoint's range
SequenceNumber seq2 = null;                        // end of the range (exclusive)

try {
    // Normal operation
    writer.append(1, data1);
    writer.append(2, data2);
    seq2 = writer.nextSequenceNumber();

    writer.persist(seq1, checkpointId).get();

    // The checkpoint completed: confirm the range [seq1, seq2)
    writer.confirm(seq1, seq2, checkpointId);

    // Continue with more changes
    writer.append(3, data3);

    // Discard changes before seq2 once they are no longer needed
    writer.truncate(seq2);
} catch (Exception e) {
    // The checkpoint failed: mark the range as not confirmed
    if (seq2 != null) {
        writer.reset(seq1, seq2, checkpointId);
    }
} finally {
    // Always clean up
    writer.close();
}

// Alternative to close(): truncate from a sequence number and close in one call,
// e.g. writer.truncateAndClose(seq1); (never call it on an already closed writer)
```
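
All of the range operations above use half-open `[from, to)` intervals. The hypothetical `LifecycleModel` below is a simplified sketch of that bookkeeping, not Flink's implementation: `confirm` and `reset` flip a per-sequence-number status, `truncate(to)` drops everything before `to`, and `truncateAndClose(from)` drops everything from `from` onward and rejects further appends. A `TreeMap` keyed by sequence number makes each operation a `subMap`, `headMap`, or `tailMap` view.

```java
import java.util.TreeMap;

class LifecycleModel {
    enum Status { PENDING, CONFIRMED }

    private final TreeMap<Long, Status> changes = new TreeMap<>();
    private long next = 0;
    private boolean closed = false;

    long append() {
        if (closed) {
            throw new IllegalStateException("writer is closed");
        }
        changes.put(next, Status.PENDING);
        return next++;
    }

    void confirm(long from, long to) {
        changes.subMap(from, to).replaceAll((seq, s) -> Status.CONFIRMED); // [from, to)
    }

    void reset(long from, long to) {
        changes.subMap(from, to).replaceAll((seq, s) -> Status.PENDING);   // [from, to)
    }

    void truncate(long to) {
        changes.headMap(to).clear();   // discard everything before `to`
    }

    void truncateAndClose(long from) {
        changes.tailMap(from).clear(); // discard `from` and everything after it
        closed = true;
    }

    TreeMap<Long, Status> view() { return new TreeMap<>(changes); }

    public static void main(String[] args) {
        LifecycleModel w = new LifecycleModel();
        for (int i = 0; i < 4; i++) {
            w.append();                // sequence numbers 0..3
        }
        w.confirm(0, 2);               // 0 and 1 confirmed
        w.truncate(1);                 // drop 0
        w.truncateAndClose(3);         // drop 3 and close
        System.out.println(w.view());
    }
}
```

Because the views write through to the backing map, each lifecycle call touches only the keys in its range.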

### Error Handling

Common error scenarios and how to handle them.

**Error Handling Examples:**

```java
FsStateChangelogWriter writer = storage.createWriter(/* ... */);

try {
    // This may fail if the key group is outside the writer's range
    writer.append(keyGroup, stateData);
} catch (IllegalArgumentException e) {
    System.err.println("Invalid key group: " + e.getMessage());
}

try {
    // This may fail due to I/O issues
    writer.appendMeta(metadata);
} catch (IOException e) {
    System.err.println("Failed to write metadata: " + e.getMessage());
    // Fail the checkpoint; the writer may also need to be recreated
}

// Handle persistence failures
writer.persist(sequenceNumber, checkpointId)
    .exceptionally(throwable -> {
        // throwable may be a CompletionException wrapping the actual cause
        System.err.println("Persistence failed: " + throwable.getMessage());
        // Returning null hides the failure from later stages;
        // they must treat a null result as a failed snapshot
        return null;
    });

try {
    writer.close();
} catch (Exception e) {
    System.err.println("Failed to close writer cleanly: " + e.getMessage());
    // Log but continue with shutdown
}
```
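
A common pitfall with `exceptionally` is that a failure raised in an upstream stage may arrive wrapped in a `java.util.concurrent.CompletionException`, so `getMessage()` can describe the wrapper rather than the I/O error. The JDK-only sketch below, where `failedUpload` is a hypothetical stand-in for a failed `persist` call, unwraps the cause for logging and uses `handle` to re-raise the failure instead of returning `null`, so the caller (for example, checkpoint code) still observes the error.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class PersistFailureSketch {
    /** Hypothetical stand-in for a persist() call whose upload fails. */
    static CompletableFuture<String> failedUpload() {
        return CompletableFuture.supplyAsync(() -> {
            throw new UncheckedIOException(new IOException("disk quota exceeded"));
        });
    }

    /** Strips CompletionException wrappers to reach the original failure. */
    static Throwable unwrap(Throwable t) {
        while (t instanceof CompletionException && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    /** Logs the root cause and re-raises the failure rather than returning null. */
    static CompletableFuture<String> withLogging(CompletableFuture<String> persist) {
        return persist.handle((result, failure) -> {
            if (failure == null) {
                return result;
            }
            Throwable cause = unwrap(failure);
            System.err.println("Persistence failed: " + cause.getMessage());
            throw new CompletionException(cause);
        });
    }

    public static void main(String[] args) {
        try {
            withLogging(failedUpload()).join();
        } catch (CompletionException e) {
            System.out.println("Checkpoint fails with: " + unwrap(e));
        }
    }
}
```

Propagating the exception keeps the failure visible to whoever waits on the future, whereas a `null` result has to be checked for explicitly at every use site.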