tessl/maven-org-apache-flink--flink-dstl-dfs

Distributed file system-based changelog storage implementation for Apache Flink's streaming state backend.

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.flink/flink-dstl-dfs@2.1.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-dstl-dfs@2.1.0

Flink DSTL DFS

Flink DSTL DFS provides a distributed file system-based implementation of changelog storage for Apache Flink's streaming state backend. It tracks state changes and makes them recoverable by persisting state modifications to distributed file systems such as HDFS or S3. The library supports Flink's fault-tolerance guarantees through reliable state change persistence, and it adds built-in metrics, configurable upload scheduling, and preemptive persistence.

Package Information

  • Package Name: flink-dstl-dfs
  • Package Type: maven
  • Language: Java
  • Group ID: org.apache.flink
  • Artifact ID: flink-dstl-dfs
  • Installation: Add to Maven dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-dstl-dfs</artifactId>
    <version>2.1.0</version>
</dependency>

Core Imports

import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.changelog.fs.FsStateChangelogStorage;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;

Basic Usage

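import java.io.File;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.changelog.fs.FsStateChangelogStorage;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;

// Placeholders: taskManagerJobMetricGroup, localRecoveryConfig, mailboxExecutor,
// stateChangeBytes, metadataBytes, sequenceNumber and checkpointId are expected
// to be supplied by the surrounding task code.

// Configure filesystem changelog storage
Configuration config = new Configuration();
FsStateChangelogStorageFactory.configure(
    config,
    new File("/path/to/changelog/storage"),
    Duration.ofSeconds(10), // upload timeout
    3  // max upload attempts
);

// Create storage factory
FsStateChangelogStorageFactory factory = new FsStateChangelogStorageFactory();

// Create storage instance (createStorage declares IOException)
FsStateChangelogStorage storage = (FsStateChangelogStorage) factory.createStorage(
    new JobID(),
    config,
    taskManagerJobMetricGroup,
    localRecoveryConfig
);

// Create changelog writer for an operator, typed against the
// StateChangelogWriter interface listed under Changelog Writers
StateChangelogWriter<ChangelogStateHandleStreamImpl> writer = storage.createWriter(
    "operator-id",
    KeyGroupRange.of(0, 127),
    mailboxExecutor
);

// Append state changes for key group 0, then metadata
writer.append(0, stateChangeBytes);
writer.appendMeta(metadataBytes);

// Persist changes during checkpoint (persist declares IOException)
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> result =
    writer.persist(sequenceNumber, checkpointId);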

Architecture

The Flink DSTL DFS library is organized around several key components:

  • Storage Factory: FsStateChangelogStorageFactory creates storage instances and provides service loader integration
  • Storage Implementation: FsStateChangelogStorage manages changelog writers and upload scheduling
  • Writers: FsStateChangelogWriter handles state change appending and persistence operations
  • Upload System: Configurable upload schedulers handle batching and asynchronous persistence to distributed file systems
  • Recovery System: FsStateChangelogStorageForRecovery provides read-only access for checkpoint recovery
  • Configuration: FsStateChangelogOptions defines the options for tuning performance, retry policies, and storage behavior

Capabilities

Storage Factory and Configuration

Factory for creating filesystem-based changelog storage instances with comprehensive configuration support for distributed file systems.

public class FsStateChangelogStorageFactory implements StateChangelogStorageFactory {
    public static final String IDENTIFIER = "filesystem";
    
    public String getIdentifier();
    public StateChangelogStorage<?> createStorage(
        JobID jobID,
        Configuration configuration,
        TaskManagerJobMetricGroup metricGroup,
        LocalRecoveryConfig localRecoveryConfig
    ) throws IOException;
    public StateChangelogStorageView<?> createStorageView(Configuration configuration);
    public static void configure(
        Configuration configuration,
        File newFolder,
        Duration uploadTimeout,
        int maxUploadAttempts
    );
}

Main Storage Implementation

Core filesystem-based implementation providing changelog writers and managing upload operations for high-throughput streaming applications.

public class FsStateChangelogStorage extends FsStateChangelogStorageForRecovery 
        implements StateChangelogStorage<ChangelogStateHandleStreamImpl> {
    
    public FsStateChangelogWriter createWriter(
        String operatorID, 
        KeyGroupRange keyGroupRange, 
        MailboxExecutor mailboxExecutor
    );
    public void close() throws Exception;
    public AvailabilityProvider getAvailabilityProvider();
}
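
The availability provider exposed by the storage suggests that callers can be held back while too much data is waiting to be uploaded. The following is a minimal sketch of that idea under stated assumptions: SketchUploadThrottle, seize, release and availableFuture are hypothetical names, and the rule "unavailable while in-flight bytes exceed the limit" is an assumed reading of IN_FLIGHT_DATA_LIMIT, not a description of the library's internals. The sketch is single-threaded for brevity.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical helper for illustration; not a class from flink-dstl-dfs.
final class SketchUploadThrottle {
    private final long maxBytesInFlight;
    private long inFlightBytes;
    // Completed while capacity is available, pending while the limit is exceeded.
    private CompletableFuture<Void> available = CompletableFuture.completedFuture(null);

    SketchUploadThrottle(long maxBytesInFlight) {
        this.maxBytesInFlight = maxBytesInFlight;
    }

    // Account for bytes handed to the uploader.
    void seize(long bytes) {
        inFlightBytes += bytes;
        if (inFlightBytes > maxBytesInFlight && available.isDone()) {
            available = new CompletableFuture<>();
        }
    }

    // Account for bytes whose upload finished (successfully or not).
    void release(long bytes) {
        inFlightBytes -= bytes;
        if (inFlightBytes <= maxBytesInFlight && !available.isDone()) {
            available.complete(null);
        }
    }

    // Callers can wait on this future before producing more changes.
    CompletableFuture<Void> availableFuture() {
        return available;
    }
}
```

A caller would check availableFuture() before appending more data and resume once it completes.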

Changelog Writers

Writers for appending state changes and managing persistence operations with preemptive flushing and checkpoint coordination.

interface StateChangelogWriter<T> {
    void append(int keyGroup, byte[] value) throws IOException;
    void appendMeta(byte[] value) throws IOException;
    SequenceNumber nextSequenceNumber();
    CompletableFuture<SnapshotResult<T>> persist(SequenceNumber from, long checkpointId) throws IOException;
    void truncate(SequenceNumber to);
    void truncateAndClose(SequenceNumber from);
    void confirm(SequenceNumber from, SequenceNumber to, long checkpointId);
    void reset(SequenceNumber from, SequenceNumber to, long checkpointId);
    void close();
}
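
To make the role of sequence numbers in this interface more concrete, here is a small in-memory model. It is a sketch only: SketchChangelogWriter is a hypothetical class, sequence numbers are plain long values, and the assumed semantics (persist covers changes from the given number inclusive, truncate drops changes below the given number) should be checked against the actual writer before relying on them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical in-memory model; not the library's FsStateChangelogWriter.
final class SketchChangelogWriter {
    // Changes grouped by the sequence number that was active when they were appended.
    private final TreeMap<Long, List<byte[]>> log = new TreeMap<>();
    private long activeSqn = 0;

    void append(byte[] change) {
        log.computeIfAbsent(activeSqn, k -> new ArrayList<>()).add(change);
    }

    // Returns the number that the next appended change will be stored under.
    long nextSequenceNumber() {
        rollover();
        return activeSqn;
    }

    // Assumed semantics: collect every change with sequence number >= from.
    List<byte[]> persist(long from) {
        rollover();
        List<byte[]> result = new ArrayList<>();
        for (List<byte[]> changes : log.tailMap(from, true).values()) {
            result.addAll(changes);
        }
        return result;
    }

    // Assumed semantics: drop every change with sequence number < to.
    void truncate(long to) {
        log.headMap(to, false).clear();
    }

    // Start a new sequence number only if the active one already holds changes.
    private void rollover() {
        if (log.containsKey(activeSqn)) {
            activeSqn++;
        }
    }
}
```

In this model a checkpoint would remember the value returned by nextSequenceNumber() and later pass it to persist or truncate.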

Configuration Options

Configuration options for performance tuning, retry policies, and storage behavior.

public class FsStateChangelogOptions {
    public static final ConfigOption<String> BASE_PATH;
    public static final ConfigOption<Boolean> COMPRESSION_ENABLED;
    public static final ConfigOption<MemorySize> PREEMPTIVE_PERSIST_THRESHOLD;
    public static final ConfigOption<Duration> PERSIST_DELAY;
    public static final ConfigOption<MemorySize> PERSIST_SIZE_THRESHOLD;
    public static final ConfigOption<MemorySize> UPLOAD_BUFFER_SIZE;
    public static final ConfigOption<Integer> NUM_UPLOAD_THREADS;
    public static final ConfigOption<MemorySize> IN_FLIGHT_DATA_LIMIT;
    public static final ConfigOption<String> RETRY_POLICY;
    public static final ConfigOption<Duration> UPLOAD_TIMEOUT;
    public static final ConfigOption<Integer> RETRY_MAX_ATTEMPTS;
}

Upload Scheduling and Management

Upload scheduling system with batching, throttling, and retry capabilities for efficient distributed file system operations.

public interface StateChangeUploadScheduler extends AutoCloseable {
    void upload(UploadTask task);
    AvailabilityProvider getAvailabilityProvider();
    void close() throws Exception;
    
    static StateChangeUploadScheduler fromConfig(
        JobID jobID,
        Configuration config,
        ChangelogStorageMetricGroup metricGroup,
        TaskChangelogRegistry changelogRegistry,
        LocalRecoveryConfig localRecoveryConfig
    ) throws IOException;
}

public interface StateChangeUploader extends AutoCloseable {
    UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException;
}
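
The batching behavior mentioned above can be illustrated with a size-based sketch. This is an assumption-laden illustration, not the library's scheduler: SketchBatchingScheduler is a hypothetical class, tasks are plain byte arrays, and only a size threshold (in the spirit of PERSIST_SIZE_THRESHOLD) is modeled. A delay-based trigger such as PERSIST_DELAY would need a timer and is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical size-based batcher; not the library's StateChangeUploadScheduler.
final class SketchBatchingScheduler {
    private final long sizeThresholdBytes;
    private final Consumer<List<byte[]>> uploader;
    private final List<byte[]> pending = new ArrayList<>();
    private long pendingBytes;

    SketchBatchingScheduler(long sizeThresholdBytes, Consumer<List<byte[]>> uploader) {
        this.sizeThresholdBytes = sizeThresholdBytes;
        this.uploader = uploader;
    }

    // Queue a task and flush once the accumulated size reaches the threshold.
    void upload(byte[] task) {
        pending.add(task);
        pendingBytes += task.length;
        if (pendingBytes >= sizeThresholdBytes) {
            flush();
        }
    }

    // Hand all pending tasks to the uploader as a single batch.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        uploader.accept(new ArrayList<>(pending));
        pending.clear();
        pendingBytes = 0;
    }
}
```

Grouping several small tasks into one upload call trades a little latency for fewer requests against the distributed file system.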

Recovery and State Management

Recovery system providing read-only access to persisted changelog data and lifecycle management for state handles.

public class FsStateChangelogStorageForRecovery 
        implements StateChangelogStorageView<ChangelogStateHandleStreamImpl> {
    
    public StateChangelogHandleReader<ChangelogStateHandleStreamImpl> createReader();
    public void close() throws Exception;
}

public interface TaskChangelogRegistry {
    void startTracking(StreamStateHandle handle, long refCount);
    void stopTracking(StreamStateHandle handle);
    void release(StreamStateHandle handle);
}
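
The startTracking signature takes a reference count, which suggests reference-counted cleanup of uploaded files. The sketch below models that reading with a hypothetical SketchChangelogRegistry that uses String identifiers in place of StreamStateHandle. The rules it applies (release decrements the count and discards at zero, stopTracking forgets the handle without discarding it) are assumptions, not confirmed library behavior.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model; not the library's TaskChangelogRegistry.
final class SketchChangelogRegistry {
    private final Map<String, Long> refCounts = new HashMap<>();
    private final List<String> discarded = new ArrayList<>();

    // Begin tracking a handle that is shared by refCount users.
    void startTracking(String handle, long refCount) {
        refCounts.put(handle, refCount);
    }

    // Forget the handle without discarding it (assumed: ownership moved elsewhere).
    void stopTracking(String handle) {
        refCounts.remove(handle);
    }

    // One user is done; discard the handle once no users remain.
    void release(String handle) {
        Long remaining = refCounts.computeIfPresent(handle, (h, n) -> n - 1);
        if (remaining != null && remaining <= 0) {
            refCounts.remove(handle);
            discarded.add(handle);
        }
    }

    boolean isTracked(String handle) {
        return refCounts.containsKey(handle);
    }

    List<String> discarded() {
        return discarded;
    }
}
```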

Metrics and Monitoring

Comprehensive metrics collection for monitoring upload performance, failure rates, and system health in production environments.

public class ChangelogStorageMetricGroup extends ProxyMetricGroup<MetricGroup> {
    // Provides counters for uploads, failures, batch sizes, latencies, and retry attempts
}

Types

public final class UploadResult {
    public final StreamStateHandle streamStateHandle;
    public final StreamStateHandle localStreamHandle;
    public final long offset;
    public final long localOffset;
    public final SequenceNumber sequenceNumber;
    public final long size;
    
    public UploadResult(StreamStateHandle streamStateHandle, long offset, 
                       SequenceNumber sequenceNumber, long size);
    public StreamStateHandle getStreamStateHandle();
    public long getOffset();
    public SequenceNumber getSequenceNumber();
    public long getSize();
}

public class StateChangeSet {
    public StateChangeSet(UUID logId, SequenceNumber sequenceNumber, List<StateChange> changes);
    public UUID getLogId();
    public List<StateChange> getChanges();
    public SequenceNumber getSequenceNumber();
    public long getSize();
}

public interface RetryPolicy {
    long timeoutFor(int attempt);
    long retryAfter(int failedAttempt, Exception exception);
    
    static RetryPolicy fromConfig(ReadableConfig config);
    static RetryPolicy fixed(int maxAttempts, long timeout, long delayAfterFailure);
    RetryPolicy NONE = /* ... */;
}
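
As an illustration of how a fixed policy with the shape of RetryPolicy.fixed(maxAttempts, timeout, delayAfterFailure) might behave, here is a self-contained sketch. SketchRetryPolicy and FixedSketchRetryPolicy are hypothetical stand-ins that only mirror the two method signatures listed above. The conventions used (times in milliseconds, a negative return value meaning "do not retry", only IOException and TimeoutException treated as retriable) are assumptions.

```java
import java.io.IOException;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in mirroring the shape of the RetryPolicy interface above.
interface SketchRetryPolicy {
    long timeoutFor(int attempt);

    long retryAfter(int failedAttempt, Exception exception);
}

// Assumed semantics: constant timeout, constant delay, negative result means give up.
final class FixedSketchRetryPolicy implements SketchRetryPolicy {
    private final int maxAttempts;
    private final long timeoutMs;
    private final long delayAfterFailureMs;

    FixedSketchRetryPolicy(int maxAttempts, long timeoutMs, long delayAfterFailureMs) {
        this.maxAttempts = maxAttempts;
        this.timeoutMs = timeoutMs;
        this.delayAfterFailureMs = delayAfterFailureMs;
    }

    @Override
    public long timeoutFor(int attempt) {
        // Every attempt gets the same timeout.
        return timeoutMs;
    }

    @Override
    public long retryAfter(int failedAttempt, Exception exception) {
        if (failedAttempt >= maxAttempts) {
            return -1L; // attempts exhausted
        }
        boolean retriable =
                exception instanceof IOException || exception instanceof TimeoutException;
        return retriable ? delayAfterFailureMs : -1L;
    }
}
```

With maxAttempts set to 3, the first two failed attempts are retried after the fixed delay and the third failure ends the upload.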