Distributed file system-based changelog storage implementation for Apache Flink's streaming state backend.
—
Writers append state changes and manage their persistence, including preemptive flushing and checkpoint coordination. Each writer accumulates, batches, and persists the state changes of an individual operator.
Core interface for writing state changes with checkpoint coordination and lifecycle management.
/**
 * Interface for writing state changes to changelog storage.
 *
 * <p>Provides methods for appending changes, managing persistence, and coordinating with
 * checkpoints. Ranges of {@link SequenceNumber}s delimit the batches of changes that are
 * persisted, confirmed, reset, or truncated.
 *
 * @param <T> type of the changelog handle carried by the {@link SnapshotResult} returned from
 *     {@link #persist(SequenceNumber, long)}
 */
public interface StateChangelogWriter<T> extends AutoCloseable {

    /**
     * Appends a state change for a specific key group.
     *
     * @param keyGroup key group identifier (0-based)
     * @param value serialized state change data
     * @throws IOException if the append operation fails
     */
    void append(int keyGroup, byte[] value) throws IOException;

    /**
     * Appends a metadata change that is not associated with any key group.
     *
     * @param value serialized metadata change data
     * @throws IOException if the append operation fails
     */
    void appendMeta(byte[] value) throws IOException;

    /**
     * Returns the initial sequence number of this writer.
     *
     * @return the initial {@link SequenceNumber} (typically 0)
     */
    SequenceNumber initialSequenceNumber();

    /**
     * Advances to the next sequence number, creating a rollover point.
     *
     * <p>Changes appended after this call are located at or after the returned sequence number,
     * which lets callers distinguish batches of changes.
     *
     * @return the next {@link SequenceNumber}
     */
    SequenceNumber nextSequenceNumber();

    /**
     * Persists the changes accumulated starting from the given sequence number.
     *
     * @param from starting sequence number (inclusive)
     * @param checkpointId checkpoint identifier for this persistence operation
     * @return a future completed with the snapshot result containing the changelog handles;
     *     failures that happen after this call returns complete the future exceptionally
     * @throws IOException if persistence fails synchronously
     */
    CompletableFuture<SnapshotResult<T>> persist(SequenceNumber from, long checkpointId)
            throws IOException;

    /**
     * Truncates the changes located before the given sequence number.
     *
     * @param to sequence number to truncate up to (exclusive)
     */
    void truncate(SequenceNumber to);

    /**
     * Truncates the changes starting from the given sequence number and closes this writer.
     *
     * @param from sequence number to truncate from (inclusive)
     */
    void truncateAndClose(SequenceNumber from);

    /**
     * Confirms successful checkpoint completion for a range of sequence numbers.
     *
     * @param from starting sequence number (inclusive)
     * @param to ending sequence number (exclusive)
     * @param checkpointId checkpoint identifier
     */
    void confirm(SequenceNumber from, SequenceNumber to, long checkpointId);

    /**
     * Resets the writer state for a range of sequence numbers after a checkpoint abort.
     *
     * @param from starting sequence number (inclusive)
     * @param to ending sequence number (exclusive)
     * @param checkpointId checkpoint identifier
     */
    void reset(SequenceNumber from, SequenceNumber to, long checkpointId);

    /**
     * Closes the writer and releases its resources.
     *
     * <p>Unlike {@link AutoCloseable#close()}, this override declares no checked exception, so
     * callers (including try-with-resources statements) do not have to handle one.
     */
    @Override
    void close();
}
Filesystem-specific implementation with preemptive persistence and batch management.
/**
* Filesystem-based implementation of {@link StateChangelogWriter}.
*
* <p>Not thread-safe: designed for single-threaded use per operator.
*
* <p>The class has no {@code public} modifier (package-private), so code outside its package
* should hold instances through the {@code StateChangelogWriter} interface type.
*
* <p>NOTE(review): this is an API outline. Only the constructor signature is listed and it has
* no body, so this listing is not compilable source.
*/
class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandleStreamImpl> {
/**
* Creates a filesystem changelog writer.
*
* @param logId unique identifier of this writer's log
* @param keyGroupRange key group range handled by this writer
* @param uploader upload scheduler used for persistence operations
* @param preEmptivePersistThresholdInBytes accumulated size, in bytes, above which changes are
*     persisted preemptively
* @param mailboxExecutor executor used for callback processing
* @param changelogRegistry registry managing the lifecycle of changelog state
* @param localRecoveryConfig configuration for local recovery
* @param localChangelogRegistry registry for local changelog files
*/
FsStateChangelogWriter(
UUID logId,
KeyGroupRange keyGroupRange,
StateChangeUploadScheduler uploader,
long preEmptivePersistThresholdInBytes,
MailboxExecutor mailboxExecutor,
TaskChangelogRegistry changelogRegistry,
LocalRecoveryConfig localRecoveryConfig,
LocalChangelogRegistry localChangelogRegistry
);
}Usage Examples:
import java.util.concurrent.CompletableFuture;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;

// Writer is typically created by FsStateChangelogStorage.
// FsStateChangelogWriter is package-private, so code outside its package
// refers to it through the StateChangelogWriter interface.
StateChangelogWriter<ChangelogStateHandleStreamImpl> writer = storage.createWriter(
        "my-operator",
        KeyGroupRange.of(0, 127),
        mailboxExecutor
);

// Append state changes during processing
writer.append(5, stateChangeForKeyGroup5);
writer.append(10, stateChangeForKeyGroup10);
writer.appendMeta(operatorMetadata);

// Delimit the checkpoint's changes: [fromSqn, toSqn)
SequenceNumber fromSqn = writer.initialSequenceNumber();
SequenceNumber toSqn = writer.nextSequenceNumber();

// Persist changes during checkpoint (persist(...) declares IOException)
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> future =
        writer.persist(fromSqn, checkpointId);
future.thenAccept(result -> {
    // Checkpoint snapshot completed
    ChangelogStateHandleStreamImpl handle = result.getJobManagerOwnedSnapshot();
    log.info("Persisted changelog with {} handles", handle.getStreamStateHandles().size());
});

// After the checkpoint completes, confirm the persisted range
writer.confirm(fromSqn, toSqn, checkpointId);
Writers automatically trigger persistence when accumulated changes exceed the configured threshold:
// Configure preemptive persistence threshold
Configuration config = new Configuration();
config.set(FsStateChangelogOptions.PREEMPTIVE_PERSIST_THRESHOLD, MemorySize.parse("5MB"));
// The threshold reaches writers through the storage that creates them
// (it is the preEmptivePersistThresholdInBytes constructor argument of FsStateChangelogWriter).
// Writers then flush automatically when the threshold is exceeded.
writer.append(keyGroup, largeStateChange); // May trigger preemptive flush
Preemptive persistence is triggered by size rather than by a checkpoint: once the changes accumulated by a writer exceed the configured threshold, the writer starts persisting them on its own.
Writers use sequence numbers to track and coordinate state change batches:
// Initial state
SequenceNumber initial = writer.initialSequenceNumber(); // Typically SequenceNumber.of(0)
// Advance sequence number to create rollover points
writer.append(1, change1);
writer.append(2, change2);
SequenceNumber rollover1 = writer.nextSequenceNumber();
writer.append(3, change3);
writer.append(4, change4);
SequenceNumber rollover2 = writer.nextSequenceNumber();
// Persist changes from specific sequence number
CompletableFuture<SnapshotResult> result = writer.persist(rollover1, checkpointId);Writers handle various error conditions and coordinate with Flink's fault tolerance:
try {
    writer.append(keyGroup, stateChange);
    CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> persistFuture =
            writer.persist(sqn, checkpointId);
    persistFuture.whenComplete((result, throwable) -> {
        if (throwable != null) {
            // Asynchronous persistence failure (reported through the future)
            log.error("Changelog persistence failed for checkpoint {}", checkpointId, throwable);
            // Flink will trigger checkpoint abort and recovery
        } else {
            // Persistence successful
            log.debug("Changelog persisted successfully for checkpoint {}", checkpointId);
        }
    });
} catch (IOException e) {
    // Both append(...) and persist(...) declare IOException, so this handler covers
    // synchronous failures of either call -- do not report them all as append failures.
    log.error("Failed to append or persist state change", e);
    throw new RuntimeException("State change append or persist failed", e);
}
Writers coordinate with Flink's checkpoint lifecycle:
// During checkpoint: persist changes accumulated from fromSqn (inclusive)
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> snapshotFuture =
        writer.persist(fromSqn, checkpointId);

// On checkpoint completion: confirm the range [fromSqn, toSqn)
writer.confirm(fromSqn, toSqn, checkpointId);

// On checkpoint abort/failure (instead of confirm): reset the range [fromSqn, toSqn)
writer.reset(fromSqn, toSqn, checkpointId);

// On state truncation (after successful checkpoint): drop changes before truncateUpToSqn
writer.truncate(truncateUpToSqn);

// On operator shutdown
writer.close();
When local recovery is enabled, writers coordinate with local changelog registries:
// Local recovery handles are automatically managed
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> result =
        writer.persist(sqn, checkpointId);
result.thenAccept(snapshotResult -> {
    // With local recovery enabled, both the remote and the local handle are produced
    ChangelogStateHandleStreamImpl remoteHandle = snapshotResult.getJobManagerOwnedSnapshot();
    // NOTE(review): presumably null when no task-local copy was written
    // (e.g. local recovery disabled) -- check before use
    ChangelogStateHandleStreamImpl localHandle = snapshotResult.getTaskLocalSnapshot();
    // Local registry tracks local handles for recovery
});
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-dstl-dfs