Apache Flink DSTL (Durable Short-term Log) - A filesystem-based state changelog implementation for Flink's state management
Writer implementation for persisting state changes to filesystem with batching, upload coordination, and lifecycle management. Writers are created per operator and handle the actual writing of state changes.
Core writer implementation that handles state change persistence with sequence number tracking and upload coordination.
/**
 * Filesystem-based writer for state changes
 * Note: This class is not thread-safe and should be used from a single thread
 */
@NotThreadSafe
class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandleStreamImpl> {
    /**
     * Appends metadata to the changelog
     * @param value Metadata bytes to append
     * @throws IOException If append operation fails
     */
    public void appendMeta(byte[] value) throws IOException;

    /**
     * Appends a state change for a specific key group
     * @param keyGroup Key group identifier (must be within writer's key group range)
     * @param value State change data bytes
     * @throws IOException If append operation fails
     */
    public void append(int keyGroup, byte[] value) throws IOException;

    /**
     * Returns the initial sequence number for this writer
     * @return SequenceNumber representing the starting point
     */
    public SequenceNumber initialSequenceNumber();

    /**
     * Returns the next sequence number for new state changes
     * @return SequenceNumber for the next state change
     */
    public SequenceNumber nextSequenceNumber();

    /**
     * Persists accumulated state changes starting from the given sequence number
     * @param from Sequence number to persist from (inclusive)
     * @param checkpointId Checkpoint identifier for this persistence operation
     * @return CompletableFuture containing the snapshot result with changelog handle
     */
    public CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> persist(
        SequenceNumber from,
        long checkpointId
    );

    /**
     * Closes the writer and releases all resources
     * @throws Exception If closing fails
     */
    public void close() throws Exception;

    /**
     * Truncates state changes up to the given sequence number
     * @param to Sequence number to truncate to (exclusive)
     */
    public void truncate(SequenceNumber to);

    /**
     * Truncates state changes from the given sequence number and closes the writer
     * @param from Sequence number to truncate from (inclusive)
     */
    public void truncateAndClose(SequenceNumber from);

    /**
     * Confirms state changes in the given range for a checkpoint
     * @param from Start sequence number (inclusive)
     * @param to End sequence number (exclusive)
     * @param checkpointId Checkpoint identifier
     */
    public void confirm(SequenceNumber from, SequenceNumber to, long checkpointId);

    /**
     * Resets state changes in the given range for a checkpoint
     * @param from Start sequence number (inclusive)
     * @param to End sequence number (exclusive)
     * @param checkpointId Checkpoint identifier
     */
    public void reset(SequenceNumber from, SequenceNumber to, long checkpointId);
}

Basic Writer Usage Example:
// Create writer through storage
FsStateChangelogStorage storage = new FsStateChangelogStorage(/* ... */);
FsStateChangelogWriter writer = storage.createWriter(
    "my-operator",
    KeyGroupRange.of(0, 127),
    mailboxExecutor
);
// Get initial sequence number
SequenceNumber initialSeq = writer.initialSequenceNumber();
// Append metadata (use an explicit charset rather than the platform default)
byte[] metadata = "operator-metadata".getBytes(StandardCharsets.UTF_8);
writer.appendMeta(metadata);
// Append state changes for different key groups
byte[] stateChange1 = serializeStateChange(state1);
writer.append(5, stateChange1);
byte[] stateChange2 = serializeStateChange(state2);
writer.append(15, stateChange2);
// Persist changes
SequenceNumber currentSeq = writer.nextSequenceNumber();
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> persistFuture =
    writer.persist(initialSeq, checkpointId);
// Handle persistence result
persistFuture.thenAccept(result -> {
    ChangelogStateHandleStreamImpl handle = result.getJobManagerOwnedSnapshot();
    // Handle contains the persisted changelog data
    System.out.println("Persisted changelog: " + handle);
});
// Clean up
writer.close();

Writers maintain sequence numbers to track the order of state changes and coordinate with checkpointing.
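As a rough mental model of how an initial and a next sequence number relate, the sketch below keeps a counter that advances with every appended change. `ToySequenceLog` and all of its members are hypothetical names invented for illustration, not Flink classes, and the one-number-per-append rule is an assumption: the real writer may advance its sequence numbers differently (for example per batch).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of changelog positions; not a Flink class. */
class ToySequenceLog {
    private final long initial;                              // position the log started at
    private final List<byte[]> changes = new ArrayList<>();  // appended payloads, in order

    ToySequenceLog(long initial) {
        this.initial = initial;
    }

    /** Position of the first change this log can hold. */
    long initialSequenceNumber() {
        return initial;
    }

    /** Position the next appended change will receive. */
    long nextSequenceNumber() {
        return initial + changes.size();
    }

    /** Appends one change and returns the position it was stored at. */
    long append(byte[] value) {
        long assigned = nextSequenceNumber();
        changes.add(value);
        return assigned;
    }

    /** Number of changes in the half-open range [from, to). */
    static long rangeSize(long from, long to) {
        if (to < from) {
            throw new IllegalArgumentException("to must not be smaller than from");
        }
        return to - from;
    }
}
```

Under this model, the range between the initial and the next sequence number covers exactly the changes appended so far, which is why the start position has to be captured before appending.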
/**
* Gets the initial sequence number for this writer instance
* @return SequenceNumber representing the starting point
*/
public SequenceNumber initialSequenceNumber();
/**
* Gets the next available sequence number for new state changes
* @return SequenceNumber for the next state change to be written
*/
public SequenceNumber nextSequenceNumber();

Sequence Number Usage Example:
FsStateChangelogWriter writer = storage.createWriter(/* ... */);
// Track sequence numbers
SequenceNumber start = writer.initialSequenceNumber();
System.out.println("Starting from sequence: " + start);
// Write some state changes
writer.append(1, stateData1);
writer.append(2, stateData2);
// Get current position
SequenceNumber current = writer.nextSequenceNumber();
System.out.println("Next sequence will be: " + current);
// Persist from start to current
writer.persist(start, checkpointId).thenAccept(result -> {
    System.out.println("Persisted sequence range: " + start + " to " + current);
});

Methods for appending different types of state change data to the changelog.
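The `append` method in this section is documented as requiring a key group inside the writer's assigned range. How the real writer enforces that is not shown in this document, so the following is only a sketch of what such a check could look like. `ToyKeyGroupRange` is a hypothetical stand-in, not Flink's `KeyGroupRange`, and both bounds are assumed to be inclusive.

```java
/** Hypothetical inclusive key-group range used for illustration; not Flink's KeyGroupRange. */
final class ToyKeyGroupRange {
    private final int start; // first key group, inclusive
    private final int end;   // last key group, inclusive

    ToyKeyGroupRange(int start, int end) {
        if (start < 0 || end < start) {
            throw new IllegalArgumentException("invalid range: " + start + ".." + end);
        }
        this.start = start;
        this.end = end;
    }

    /** True when the key group lies between start and end, both inclusive. */
    boolean contains(int keyGroup) {
        return keyGroup >= start && keyGroup <= end;
    }

    /** Returns the key group unchanged, or throws if it is outside the range. */
    int requireInRange(int keyGroup) {
        if (!contains(keyGroup)) {
            throw new IllegalArgumentException(
                "key group " + keyGroup + " is outside " + start + ".." + end);
        }
        return keyGroup;
    }
}
```

Validating before serializing the state change keeps a bad key group from costing any I/O.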
/**
* Appends operator metadata to the changelog
* @param value Serialized metadata bytes
* @throws IOException If the append operation fails
*/
public void appendMeta(byte[] value) throws IOException;
/**
* Appends state change data for a specific key group
* @param keyGroup Key group identifier (must be within the writer's assigned range)
* @param value Serialized state change bytes
* @throws IOException If the append operation fails
*/
public void append(int keyGroup, byte[] value) throws IOException;

Appending Examples:
FsStateChangelogWriter writer = storage.createWriter(
    "operator-1",
    KeyGroupRange.of(0, 63), // Key groups 0-63
    mailboxExecutor
);
// Append operator metadata (initialization info, configuration, etc.)
String metadataJson = "{\"operator\":\"MyOperator\",\"version\":\"1.0\"}";
writer.appendMeta(metadataJson.getBytes(StandardCharsets.UTF_8));
// Append state changes for specific key groups
for (int keyGroup = 0; keyGroup < 64; keyGroup++) {
    byte[] stateChange = createStateChangeForKeyGroup(keyGroup);
    writer.append(keyGroup, stateChange);
}
// Attempting to append to key group outside range will fail
try {
    writer.append(100, someData); // Will throw exception since 100 > 63
} catch (IllegalArgumentException e) {
    System.err.println("Key group out of range: " + e.getMessage());
}

Core persistence functionality that coordinates with the upload system to store state changes durably.
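To make the "persist from a position" idea concrete, here is a small self-contained sketch. `ToyPersistingLog` is a hypothetical model, not the Flink writer: it uses plain integers as positions and strings as changes, and the "upload" is just an asynchronous task that hands back the batch. It illustrates one point only, that the start position must be captured before the changes are appended.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical model of persisting a changelog suffix; not the Flink implementation. */
class ToyPersistingLog {
    private final List<String> changes = new ArrayList<>();

    /** Position the next appended change will receive. */
    int nextSequenceNumber() {
        return changes.size();
    }

    void append(String change) {
        changes.add(change);
    }

    /** Asynchronously "uploads" every change from the given position (inclusive) onwards. */
    CompletableFuture<List<String>> persist(int from) {
        if (from < 0 || from > changes.size()) {
            CompletableFuture<List<String>> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                new IllegalArgumentException("unknown position: " + from));
            return failed;
        }
        // Copy on the calling thread so later appends cannot affect this batch
        List<String> batch = new ArrayList<>(changes.subList(from, changes.size()));
        return CompletableFuture.supplyAsync(() -> batch);
    }
}
```

In this model, calling `persist(log.nextSequenceNumber())` right after appending yields an empty batch, because the captured position already lies past the new changes.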
/**
* Persists accumulated state changes starting from the given sequence number
* @param from Starting sequence number (inclusive)
* @param checkpointId Checkpoint identifier for tracking
* @return CompletableFuture with snapshot result containing changelog handle
*/
public CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> persist(
    SequenceNumber from,
    long checkpointId
);

Persistence Examples:
// Basic persistence
SequenceNumber startSeq = writer.initialSequenceNumber();
writer.append(1, stateData1);
writer.append(2, stateData2);
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> future =
    writer.persist(startSeq, 12345L);
future.whenComplete((result, throwable) -> {
    if (throwable != null) {
        System.err.println("Persistence failed: " + throwable.getMessage());
    } else {
        ChangelogStateHandleStreamImpl handle = result.getJobManagerOwnedSnapshot();
        System.out.println("Successfully persisted to: " + handle);
        // Local handle (null unless local recovery is enabled); same type as the primary handle
        ChangelogStateHandleStreamImpl localHandle = result.getTaskLocalSnapshot();
        if (localHandle != null) {
            System.out.println("Local backup at: " + localHandle);
        }
    }
});
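// Chain multiple persist operations
// (the writer is not thread-safe, so the callbacks must run on the writer's thread)
CompletableFuture<Void> chainedPersistence = writer.persist(startSeq, 12345L)
    .thenCompose(result1 -> {
        // Capture the start of the second batch before appending to it
        SequenceNumber secondStart = writer.nextSequenceNumber();
        try {
            // Write more changes
            writer.append(3, moreStateData);
            return writer.persist(secondStart, 12346L);
        } catch (IOException e) {
            // Checked exceptions cannot escape the lambda
            throw new CompletionException(e);
        }
    })
    .thenAccept(result2 -> {
        System.out.println("Both persistence operations completed");
    });

Methods for managing the writer lifecycle including truncation, confirmation, and cleanup.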
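The interplay of truncation, confirmation and reset is easier to follow on a toy model. The sketch below is an assumption-laden illustration, not the Flink implementation: `ToyChangelogLifecycle` is a hypothetical class that stores one "confirmed" flag per sequence number, treats `reset` as simply clearing that flag, and uses the inclusive/exclusive bounds documented in this section.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of the writer lifecycle operations; not a Flink class. */
class ToyChangelogLifecycle {
    // sequence number -> whether a checkpoint has confirmed this change
    private final TreeMap<Long, Boolean> confirmedBySqn = new TreeMap<>();
    private long next = 0L;
    private boolean closed = false;

    /** Records one change and returns the sequence number it received. */
    long append() {
        if (closed) {
            throw new IllegalStateException("log is closed");
        }
        confirmedBySqn.put(next, false);
        return next++;
    }

    /** Marks changes in [from, to) as confirmed. */
    void confirm(long from, long to) {
        mark(from, to, true);
    }

    /** Clears the confirmed flag for changes in [from, to). */
    void reset(long from, long to) {
        mark(from, to, false);
    }

    /** Drops every change below {@code to} (exclusive). */
    void truncate(long to) {
        confirmedBySqn.headMap(to, false).clear();
    }

    /** Drops every change from {@code from} (inclusive) onwards and rejects further appends. */
    void truncateAndClose(long from) {
        confirmedBySqn.tailMap(from, true).clear();
        closed = true;
    }

    int size() {
        return confirmedBySqn.size();
    }

    long confirmedCount() {
        return confirmedBySqn.values().stream().filter(Boolean::booleanValue).count();
    }

    boolean isClosed() {
        return closed;
    }

    private void mark(long from, long to, boolean confirmed) {
        for (Map.Entry<Long, Boolean> entry
                : confirmedBySqn.subMap(from, true, to, false).entrySet()) {
            entry.setValue(confirmed);
        }
    }
}
```

Note the asymmetry the model makes visible: `truncate` discards the head of the log, while `truncateAndClose` discards the tail.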
/**
* Truncates changelog up to the given sequence number
* @param to Sequence number to truncate to (exclusive)
*/
public void truncate(SequenceNumber to);
/**
* Truncates from the given sequence number and closes writer
* @param from Sequence number to truncate from (inclusive)
*/
public void truncateAndClose(SequenceNumber from);
/**
* Confirms successful processing of state changes in range
* @param from Start sequence number (inclusive)
* @param to End sequence number (exclusive)
* @param checkpointId Associated checkpoint ID
*/
public void confirm(SequenceNumber from, SequenceNumber to, long checkpointId);
/**
* Resets state changes in range (e.g., after checkpoint failure)
* @param from Start sequence number (inclusive)
* @param to End sequence number (exclusive)
* @param checkpointId Associated checkpoint ID
*/
public void reset(SequenceNumber from, SequenceNumber to, long checkpointId);
/**
* Closes writer and releases all resources
* @throws Exception If cleanup fails
*/
public void close() throws Exception;

Lifecycle Management Examples:
FsStateChangelogWriter writer = storage.createWriter(/* ... */);
// Declared before the try block so the catch block can reference them
SequenceNumber seq1 = writer.nextSequenceNumber();
SequenceNumber seq2 = seq1;
try {
    // Normal operation
    writer.append(1, data1);
    writer.append(2, data2);
    seq2 = writer.nextSequenceNumber();
    writer.persist(seq1, checkpointId).get();
    // Confirm successful checkpoint
    writer.confirm(seq1, seq2, checkpointId);
    // Continue with more changes
    writer.append(3, data3);
    SequenceNumber seq3 = writer.nextSequenceNumber();
    // Truncate old data to save space
    writer.truncate(seq2);
} catch (Exception e) {
    // Reset on failure
    writer.reset(seq1, seq2, checkpointId);
} finally {
    // Always clean up
    writer.close();
}
// Alternative: truncate and close in one operation (use instead of close(), not after it)
writer.truncateAndClose(someSequenceNumber);

Common error scenarios and appropriate handling strategies.
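One detail worth keeping in mind for asynchronous failures: when a `CompletableFuture` fails, dependent stages see the original exception wrapped in a `CompletionException`. The sketch below uses only the JDK and shows one way to unwrap it and to map both outcomes to a non-null value with `handle`. `ToyPersistErrors` and its method names are hypothetical, and a plain `String` stands in for the changelog handle.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical helpers for inspecting failed persist futures; JDK only. */
class ToyPersistErrors {
    /** Strips CompletionException wrappers to reach the original failure. */
    static Throwable rootCause(Throwable failure) {
        Throwable current = failure;
        while (current instanceof CompletionException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /** Maps success and failure to a status string, so callers never receive null. */
    static CompletableFuture<String> describe(CompletableFuture<String> persistFuture) {
        return persistFuture.handle((handle, failure) ->
            failure == null
                ? "persisted: " + handle
                : "failed: " + rootCause(failure).getMessage());
    }
}
```

Compared with returning `null` from `exceptionally`, mapping both branches in `handle` avoids pushing a null check onto every downstream stage.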
Error Handling Examples:
FsStateChangelogWriter writer = storage.createWriter(/* ... */);
try {
    // This may fail if key group is out of range
    writer.append(keyGroup, stateData);
} catch (IllegalArgumentException e) {
    System.err.println("Invalid key group: " + e.getMessage());
}
try {
    // This may fail due to I/O issues
    writer.appendMeta(metadata);
} catch (IOException e) {
    System.err.println("Failed to write metadata: " + e.getMessage());
    // May need to recreate writer or fail the checkpoint
}
// Handle persistence failures
writer.persist(sequenceNumber, checkpointId)
    .exceptionally(throwable -> {
        System.err.println("Persistence failed: " + throwable.getMessage());
        // Return null result or take recovery action
        return null;
    });
try {
    writer.close();
} catch (Exception e) {
    System.err.println("Failed to close writer cleanly: " + e.getMessage());
    // Log but continue with shutdown
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-dstl