Distributed file system-based changelog storage implementation for Apache Flink's streaming state backend.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-dstl-dfs@2.1.0

Flink DSTL DFS provides a distributed file system-based implementation of changelog storage for Apache Flink's streaming state backend. It enables efficient state change tracking and recovery by persisting state modifications to distributed file systems such as HDFS or S3. The library supports fault tolerance through reliable state change persistence, with built-in metrics, configurable upload scheduling, and preemptive persistence optimizations.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-dstl-dfs</artifactId>
<version>2.1.0</version>
</dependency>

import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.changelog.fs.FsStateChangelogStorage;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;

// Basic usage (self-contained snippet)
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.changelog.fs.FsStateChangelogStorage;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;

import java.io.File;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

// Inputs supplied by the surrounding runtime or application code (not created here):
//   TaskManagerJobMetricGroup taskManagerJobMetricGroup
//   LocalRecoveryConfig       localRecoveryConfig
//   MailboxExecutor           mailboxExecutor
//   byte[]                    stateChangeBytes, metadataBytes
//   long                      checkpointId
// createStorage, append, appendMeta and persist are declared to throw IOException,
// so run this code where that exception is handled.

// Configure filesystem changelog storage.
// configure(...) takes a java.io.File, i.e. a local path. For a DFS location such as
// HDFS or S3, set FsStateChangelogOptions.BASE_PATH (a String option) to its URI.
Configuration config = new Configuration();
FsStateChangelogStorageFactory.configure(
        config,
        new File("/path/to/changelog/storage"),
        Duration.ofSeconds(10), // upload timeout
        3); // maximum number of upload attempts

// Create the storage factory and a storage instance. createStorage is declared to
// return StateChangelogStorage<?>, hence the cast to the filesystem implementation.
FsStateChangelogStorageFactory factory = new FsStateChangelogStorageFactory();
FsStateChangelogStorage storage =
        (FsStateChangelogStorage)
                factory.createStorage(
                        new JobID(), config, taskManagerJobMetricGroup, localRecoveryConfig);

// Create a changelog writer for one operator. The variable is typed as the
// StateChangelogWriter interface so the snippet does not depend on the concrete
// FsStateChangelogWriter class being importable from user code.
StateChangelogWriter<ChangelogStateHandleStreamImpl> writer =
        storage.createWriter("operator-id", KeyGroupRange.of(0, 127), mailboxExecutor);

// Remember where this batch of changes starts, then append state changes.
SequenceNumber sequenceNumber = writer.nextSequenceNumber();
writer.append(0, stateChangeBytes);
writer.appendMeta(metadataBytes);

// Persist changes (starting at sequenceNumber) during checkpoint checkpointId.
// Close the writer (close()) and the storage (close() throws Exception) once they are
// no longer needed, e.g. after the returned future has completed.
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> result =
        writer.persist(sequenceNumber, checkpointId);

The Flink DSTL DFS library is organized around several key components:
- `FsStateChangelogStorageFactory` creates storage instances and provides service loader integration.
- `FsStateChangelogStorage` manages changelog writers and upload scheduling.
- `FsStateChangelogWriter` handles state change appending and persistence operations.
- `FsStateChangelogStorageForRecovery` provides read-only access for checkpoint recovery.

Factory for creating filesystem-based changelog storage instances, with comprehensive configuration support for distributed file systems.
/**
 * Factory for filesystem-based changelog storage, identified by {@link #IDENTIFIER}
 * ({@code "filesystem"}).
 */
public class FsStateChangelogStorageFactory implements StateChangelogStorageFactory {
    /** Identifier of this changelog storage implementation. */
    public static final String IDENTIFIER = "filesystem";

    /** Returns this factory's identifier; presumably {@link #IDENTIFIER} — confirm. */
    public String getIdentifier();

    /**
     * Creates a writable changelog storage for a job.
     *
     * <p>The declared return type is {@code StateChangelogStorage<?>}; callers that need
     * {@code createWriter(...)} cast the result to {@code FsStateChangelogStorage}.
     *
     * @param jobID the job the storage is created for
     * @param configuration configuration to read the storage settings from
     * @param metricGroup job metric group on the TaskManager
     * @param localRecoveryConfig local recovery settings
     * @return the created storage
     * @throws IOException if the storage cannot be created
     */
    public StateChangelogStorage<?> createStorage(
            JobID jobID,
            Configuration configuration,
            TaskManagerJobMetricGroup metricGroup,
            LocalRecoveryConfig localRecoveryConfig
    ) throws IOException;

    /**
     * Creates a view of the changelog storage for reading persisted changes.
     *
     * <p>NOTE(review): presumably returns a read-only {@code FsStateChangelogStorageForRecovery}
     * used during checkpoint recovery — confirm.
     *
     * @param configuration configuration to read the storage settings from
     * @return the storage view
     */
    public StateChangelogStorageView<?> createStorageView(Configuration configuration);

    /**
     * Writes filesystem changelog settings into {@code configuration}.
     *
     * <p>NOTE(review): presumably sets the base path, upload timeout and maximum upload
     * attempts options of {@code FsStateChangelogOptions} — confirm. Because the folder is a
     * {@link File}, this helper only addresses local paths.
     *
     * @param configuration configuration to modify
     * @param newFolder directory to store changelog files in
     * @param uploadTimeout upload timeout
     * @param maxUploadAttempts maximum number of upload attempts
     */
    public static void configure(
            Configuration configuration,
            File newFolder,
            Duration uploadTimeout,
            int maxUploadAttempts
    );
}
Storage Factory and Configuration
Core filesystem-based implementation providing changelog writers and managing upload operations for high-throughput streaming applications.
/**
 * Filesystem-based changelog storage that creates changelog writers and manages their uploads.
 *
 * <p>It extends {@code FsStateChangelogStorageForRecovery}, so it also exposes the read-side
 * API ({@code createReader()}).
 */
public class FsStateChangelogStorage extends FsStateChangelogStorageForRecovery
        implements StateChangelogStorage<ChangelogStateHandleStreamImpl> {
    /**
     * Creates a changelog writer for one operator.
     *
     * @param operatorID identifier of the operator that owns the writer
     * @param keyGroupRange key groups the writer is created for
     * @param mailboxExecutor mailbox executor of the owning task; NOTE(review): presumably used
     *     to run upload callbacks on the task thread — confirm
     * @return the new writer
     */
    public FsStateChangelogWriter createWriter(
            String operatorID,
            KeyGroupRange keyGroupRange,
            MailboxExecutor mailboxExecutor
    );

    /**
     * Closes this storage.
     *
     * @throws Exception if closing fails
     */
    public void close() throws Exception;

    /**
     * Returns the availability of this storage. NOTE(review): presumably reflects upload
     * backpressure (see the in-flight data limit option) — confirm.
     */
    public AvailabilityProvider getAvailabilityProvider();
}
Writers for appending state changes and managing persistence operations with preemptive flushing and checkpoint coordination.
/**
 * Appends state changes to a changelog and persists them for checkpoints.
 *
 * @param <T> type of state handle returned (inside a {@code SnapshotResult}) by {@link #persist}
 */
interface StateChangelogWriter<T> {
    /**
     * Appends a state change belonging to {@code keyGroup}.
     *
     * @throws IOException if the change cannot be appended
     */
    void append(int keyGroup, byte[] value) throws IOException;

    /**
     * Appends metadata that is not associated with a key group.
     *
     * @throws IOException if the metadata cannot be appended
     */
    void appendMeta(byte[] value) throws IOException;

    /**
     * Returns the sequence number associated with the next appended change. NOTE(review): per
     * the method name; capture it before appending to mark the start of a range — confirm.
     */
    SequenceNumber nextSequenceNumber();

    /**
     * Persists the changes starting at {@code from} as part of checkpoint {@code checkpointId}.
     *
     * @return a future completed with the snapshot result
     * @throws IOException if persisting cannot be started
     */
    CompletableFuture<SnapshotResult<T>> persist(SequenceNumber from, long checkpointId) throws IOException;

    /** Truncates the changelog up to {@code to}; NOTE(review): inclusive or exclusive — confirm. */
    void truncate(SequenceNumber to);

    /**
     * Truncates the changelog from {@code from} onwards and closes the writer. NOTE(review):
     * per the method name — confirm whether unpersisted changes are dropped.
     */
    void truncateAndClose(SequenceNumber from);

    /**
     * Confirms the range {@code from}..{@code to} for checkpoint {@code checkpointId}.
     * NOTE(review): presumably called once the checkpoint is confirmed — confirm.
     */
    void confirm(SequenceNumber from, SequenceNumber to, long checkpointId);

    /**
     * Resets the range {@code from}..{@code to} for checkpoint {@code checkpointId}.
     * NOTE(review): presumably called when the checkpoint is aborted — confirm.
     */
    void reset(SequenceNumber from, SequenceNumber to, long checkpointId);

    /** Closes the writer. */
    void close();
}
Comprehensive configuration options for performance tuning, retry policies, and storage behavior optimization.
/**
 * Configuration options of the filesystem changelog storage.
 *
 * <p>NOTE(review): the per-option descriptions below are inferred from the option names and
 * value types only; defaults and exact semantics are not shown in this listing — confirm
 * against the option definitions.
 */
public class FsStateChangelogOptions {
    /** Base path (as a string) under which changelog data is stored. */
    public static final ConfigOption<String> BASE_PATH;
    /** Whether persisted changelog data is compressed. */
    public static final ConfigOption<Boolean> COMPRESSION_ENABLED;
    /** Amount of unpersisted changes above which they are persisted ahead of a checkpoint. */
    public static final ConfigOption<MemorySize> PREEMPTIVE_PERSIST_THRESHOLD;
    /** Delay applied before scheduled changes are persisted, presumably to batch uploads. */
    public static final ConfigOption<Duration> PERSIST_DELAY;
    /** Accumulated size at which persisting is presumably triggered without further delay. */
    public static final ConfigOption<MemorySize> PERSIST_SIZE_THRESHOLD;
    /** Buffer size used for uploads. */
    public static final ConfigOption<MemorySize> UPLOAD_BUFFER_SIZE;
    /** Number of threads used for uploads. */
    public static final ConfigOption<Integer> NUM_UPLOAD_THREADS;
    /** Limit on data in flight (being uploaded); presumably drives backpressure. */
    public static final ConfigOption<MemorySize> IN_FLIGHT_DATA_LIMIT;
    /** Name of the retry policy for failed uploads. */
    public static final ConfigOption<String> RETRY_POLICY;
    /** Timeout of an upload attempt. */
    public static final ConfigOption<Duration> UPLOAD_TIMEOUT;
    /** Maximum number of upload attempts. */
    public static final ConfigOption<Integer> RETRY_MAX_ATTEMPTS;
}
Upload scheduling system with batching, throttling, and retry capabilities for efficient distributed file system operations.
/** Schedules upload tasks for persisting state changes. */
public interface StateChangeUploadScheduler extends AutoCloseable {
    /** Schedules {@code task} for upload. */
    void upload(UploadTask task);

    /** Returns the availability of this scheduler (e.g. to signal backpressure — confirm). */
    AvailabilityProvider getAvailabilityProvider();

    /**
     * Closes the scheduler.
     *
     * @throws Exception if closing fails
     */
    void close() throws Exception;

    /**
     * Creates a scheduler from configuration.
     *
     * <p>NOTE(review): presumably reads the upload-related {@code FsStateChangelogOptions}
     * (threads, persist delay, size threshold, retry policy) — confirm.
     *
     * @param jobID the job the uploads belong to
     * @param config configuration to read settings from
     * @param metricGroup metric group for upload metrics
     * @param changelogRegistry registry of uploaded changelog handles
     * @param localRecoveryConfig local recovery settings
     * @return the scheduler
     * @throws IOException if the scheduler cannot be created
     */
    static StateChangeUploadScheduler fromConfig(
            JobID jobID,
            Configuration config,
            ChangelogStorageMetricGroup metricGroup,
            TaskChangelogRegistry changelogRegistry,
            LocalRecoveryConfig localRecoveryConfig
    ) throws IOException;
}
/** Performs the uploads of a collection of upload tasks. */
public interface StateChangeUploader extends AutoCloseable {
    /**
     * Uploads {@code tasks}.
     *
     * @return the result of the uploads
     * @throws IOException if uploading fails
     */
    UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException;
}
Recovery system providing read-only access to persisted changelog data and lifecycle management for state handles.
/** Read-only changelog storage view used to access persisted changelog data during recovery. */
public class FsStateChangelogStorageForRecovery
        implements StateChangelogStorageView<ChangelogStateHandleStreamImpl> {
    /** Creates a reader for persisted changelog state handles. */
    public StateChangelogHandleReader<ChangelogStateHandleStreamImpl> createReader();

    /**
     * Closes this view.
     *
     * @throws Exception if closing fails
     */
    public void close() throws Exception;
}
/**
 * Tracks uploaded changelog state handles.
 *
 * <p>NOTE(review): presumably a handle is discarded once all {@code refCount} references are
 * released and tracking was not stopped — confirm.
 */
public interface TaskChangelogRegistry {
    /** Starts tracking {@code handle} with the given initial reference count. */
    void startTracking(StreamStateHandle handle, long refCount);

    /** Stops tracking {@code handle}. */
    void stopTracking(StreamStateHandle handle);

    /** Releases one reference to {@code handle}. */
    void release(StreamStateHandle handle);
}
Comprehensive metrics collection for monitoring upload performance, failure rates, and system health in production environments.
/** Metric group for the changelog storage. */
public class ChangelogStorageMetricGroup extends ProxyMetricGroup<MetricGroup> {
    // Provides upload metrics: e.g. numbers of uploads and failures, and distributions of
    // batch sizes, latencies and attempts per upload.
    // NOTE(review): members are omitted in this listing; exact metric kinds
    // (counter vs. histogram) to be confirmed.
}

/** Result of uploading state changes: where they were written and which range they cover. */
public final class UploadResult {
    /** Handle of the stream the changes were uploaded to. */
    public final StreamStateHandle streamStateHandle;
    /** Handle of the local copy; NOTE(review): presumably only set with local recovery — confirm. */
    public final StreamStateHandle localStreamHandle;
    /** Offset of the uploaded changes within {@link #streamStateHandle}. */
    public final long offset;
    /** Offset of the changes within {@link #localStreamHandle}. */
    public final long localOffset;
    /** Sequence number of the uploaded changes. */
    public final SequenceNumber sequenceNumber;
    /** Size of the uploaded changes (presumably in bytes — confirm). */
    public final long size;

    /**
     * Creates a result without local copy information. NOTE(review): this constructor does not
     * take {@code localStreamHandle}/{@code localOffset}; the listing may omit another
     * constructor — confirm.
     */
    public UploadResult(StreamStateHandle streamStateHandle, long offset,
            SequenceNumber sequenceNumber, long size);

    public StreamStateHandle getStreamStateHandle();

    public long getOffset();

    public SequenceNumber getSequenceNumber();

    public long getSize();
}
/** A set of state changes of one log, associated with a sequence number. */
public class StateChangeSet {
    /**
     * @param logId identifier of the log the changes belong to
     * @param sequenceNumber sequence number of the changes
     * @param changes the state changes
     */
    public StateChangeSet(UUID logId, SequenceNumber sequenceNumber, List<StateChange> changes);

    public UUID getLogId();

    public List<StateChange> getChanges();

    public SequenceNumber getSequenceNumber();

    /** Returns the size of this set; NOTE(review): presumably total bytes of the changes — confirm. */
    public long getSize();
}
/** Policy deciding timeouts and retries of upload attempts. */
public interface RetryPolicy {
    /** Returns the timeout for the given attempt; NOTE(review): units presumably milliseconds — confirm. */
    long timeoutFor(int attempt);

    /**
     * Returns the delay before retrying after {@code failedAttempt} failed with {@code exception}.
     * NOTE(review): presumably a negative value means "do not retry" — confirm.
     */
    long retryAfter(int failedAttempt, Exception exception);

    /** Creates a policy from configuration (presumably the retry-related options — confirm). */
    static RetryPolicy fromConfig(ReadableConfig config);

    /**
     * Creates a policy with a fixed number of attempts, a fixed timeout and a fixed delay after
     * each failure.
     */
    static RetryPolicy fixed(int maxAttempts, long timeout, long delayAfterFailure);

    /** Policy that presumably performs no retries — confirm. */
    RetryPolicy NONE = /* ... */;
}