tessl/maven-org-apache-flink--flink-dstl

Apache Flink DSTL (Durable Short-term Log) - A filesystem-based state changelog implementation for Flink's state management

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.flink/flink-dstl@1.20.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-dstl@1.20.0


Apache Flink DSTL

Apache Flink DSTL (Durable Short-term Log) provides a filesystem-based state changelog implementation for Flink's state management system. It durably stores the state changes recorded by the changelog state backend so that, on recovery, they can be replayed to reconstruct operator state in distributed streaming jobs.

Package Information

  • Package Name: flink-dstl
  • Package Type: maven
  • Group ID: org.apache.flink
  • Artifact ID: flink-dstl-dfs
  • Language: Java
  • Installation: Add the Maven dependency org.apache.flink:flink-dstl-dfs:1.20.2

Core Imports

import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.changelog.fs.FsStateChangelogStorage;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.changelog.fs.StateChangeUploadScheduler;
import org.apache.flink.changelog.fs.TaskChangelogRegistry;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;

Basic Usage

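import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;

// jobID, metricGroup, localRecoveryConfig, operatorID, keyGroupRange and
// mailboxExecutor are assumed to be in scope (normally supplied by the Flink runtime).

// Configure the storage
Configuration config = new Configuration();
config.set(FsStateChangelogOptions.BASE_PATH, "/path/to/changelog");
config.set(FsStateChangelogOptions.COMPRESSION_ENABLED, true);

// Create the storage factory (identifier "filesystem")
FsStateChangelogStorageFactory factory = new FsStateChangelogStorageFactory();

// Create a storage instance; createStorage may throw IOException
StateChangelogStorage<?> storage = factory.createStorage(
    jobID,
    config,
    metricGroup,
    localRecoveryConfig
);

// Create a writer for an operator. FsStateChangelogWriter is package-private,
// so code outside org.apache.flink.changelog.fs uses the StateChangelogWriter interface.
StateChangelogWriter<?> writer = storage.createWriter(
    operatorID,
    keyGroupRange,
    mailboxExecutor
);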

Architecture

The DSTL module is built around several key components:

  • Storage Factory: FsStateChangelogStorageFactory creates storage instances and is registered under the identifier "filesystem"
  • Storage Implementation: FsStateChangelogStorage provides the main storage functionality and creates writers
  • Configuration: FsStateChangelogOptions defines all configuration parameters
  • Writers: FsStateChangelogWriter records state changes and persists them to the filesystem
  • Upload System: upload schedulers and uploaders that batch and persist state changes
  • Registry: TaskChangelogRegistry tracks uploaded changelog segments on the TaskManager side

Capabilities

Storage Factory and Configuration

Factory that creates filesystem-based changelog storage instances, and the storage views used to read changelogs during recovery, from a Flink Configuration.

@Internal
public class FsStateChangelogStorageFactory implements StateChangelogStorageFactory {
    public static final String IDENTIFIER = "filesystem";
    
    public String getIdentifier();
    public StateChangelogStorage<?> createStorage(
        JobID jobID, 
        Configuration configuration, 
        TaskManagerJobMetricGroup metricGroup, 
        LocalRecoveryConfig localRecoveryConfig
    ) throws IOException;
    public StateChangelogStorageView<?> createStorageView(Configuration configuration);
    public static void configure(
        Configuration configuration, 
        File newFolder, 
        Duration uploadTimeout, 
        int maxUploadAttempts
    );
}
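The identifier is what connects the configuration to this factory: in effect, the configured changelog storage type is matched against each factory's getIdentifier(), and FsStateChangelogStorageFactory answers to "filesystem". The following is a minimal sketch of that lookup only. ToyStorageFactory, ToyFsFactory and ToyFactoryRegistry are hypothetical stand-ins, and the plain map replaces whatever discovery mechanism Flink actually uses to find factories.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified factory contract: only the identifier matters here.
interface ToyStorageFactory {
    String getIdentifier();

    // Stands in for StateChangelogStorage<?>; returns a label instead of a real storage.
    String createStorage();
}

// Mirrors FsStateChangelogStorageFactory.IDENTIFIER = "filesystem".
final class ToyFsFactory implements ToyStorageFactory {
    static final String IDENTIFIER = "filesystem";

    @Override
    public String getIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public String createStorage() {
        return "fs-storage";
    }
}

// Indexes the available factories by identifier and resolves the configured one.
final class ToyFactoryRegistry {
    private final Map<String, ToyStorageFactory> byIdentifier = new HashMap<>();

    ToyFactoryRegistry(List<ToyStorageFactory> factories) {
        for (ToyStorageFactory factory : factories) {
            byIdentifier.put(factory.getIdentifier(), factory);
        }
    }

    // Empty if no factory is registered under the configured identifier.
    Optional<ToyStorageFactory> lookup(String configuredIdentifier) {
        return Optional.ofNullable(byIdentifier.get(configuredIdentifier));
    }
}
```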


Main Storage Implementation

Filesystem-based implementation of StateChangelogStorage. It is thread-safe and creates a writer for each operator, identified by its operator ID and key-group range.

@Experimental
@ThreadSafe
public class FsStateChangelogStorage 
    extends FsStateChangelogStorageForRecovery 
    implements StateChangelogStorage<ChangelogStateHandleStreamImpl> {
    
    public FsStateChangelogStorage(
        JobID jobID, 
        Configuration config, 
        TaskManagerJobMetricGroup metricGroup, 
        LocalRecoveryConfig localRecoveryConfig
    ) throws IOException;
    
    public FsStateChangelogWriter createWriter(
        String operatorID, 
        KeyGroupRange keyGroupRange, 
        MailboxExecutor mailboxExecutor
    );
    
    public void close() throws Exception;
    public AvailabilityProvider getAvailabilityProvider();
}
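getAvailabilityProvider() lets the storage signal whether it can currently accept more data, and the IN_FLIGHT_DATA_LIMIT option bounds how much data may be waiting for upload. The sketch below shows one way such a bound could be turned into an availability signal, assuming availability is modelled as a CompletableFuture that completes once capacity frees up. ToyInFlightLimiter and its methods are hypothetical and do not reproduce Flink's implementation.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical limiter: tracks bytes handed to uploads that have not finished yet and
// exposes availability as a future that is complete while the limit is not exceeded.
final class ToyInFlightLimiter {
    private final long limitBytes;
    private long inFlightBytes = 0L;
    private CompletableFuture<Void> available = CompletableFuture.completedFuture(null);

    ToyInFlightLimiter(long limitBytes) {
        this.limitBytes = limitBytes;
    }

    // Called when an upload of `bytes` is started.
    synchronized void acquire(long bytes) {
        inFlightBytes += bytes;
        if (inFlightBytes > limitBytes && available.isDone()) {
            available = new CompletableFuture<>(); // producers should pause
        }
    }

    // Called when an upload of `bytes` has finished, successfully or not.
    synchronized void release(long bytes) {
        inFlightBytes -= bytes;
        if (inFlightBytes <= limitBytes) {
            available.complete(null); // no-op if already complete
        }
    }

    synchronized boolean isAvailable() {
        return available.isDone();
    }

    // Callers can wait on (or chain on) this future before producing more data.
    synchronized CompletableFuture<Void> availableFuture() {
        return available;
    }
}
```

Returning a future rather than a boolean lets a caller register a continuation instead of polling.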


State Change Writers

Writer that records state changes and persists them to the filesystem, batching changes and coordinating their upload. It is not thread-safe, so each instance should be used from a single thread.

@NotThreadSafe
class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandleStreamImpl> {
    public void appendMeta(byte[] value) throws IOException;
    public void append(int keyGroup, byte[] value) throws IOException;
    public SequenceNumber initialSequenceNumber();
    public SequenceNumber nextSequenceNumber();
    public CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> persist(
        SequenceNumber from, 
        long checkpointId
    );
    public void close() throws Exception;
    public void truncate(SequenceNumber to);
    public void truncateAndClose(SequenceNumber from);
    public void confirm(SequenceNumber from, SequenceNumber to, long checkpointId);
    public void reset(SequenceNumber from, SequenceNumber to, long checkpointId);
}
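The sequence-number methods are easiest to follow on a toy model. The sketch below is a hypothetical ToyChangelogWriter with plain long sequence numbers, in-memory byte arrays and no uploads. It assumes that changes appended between two nextSequenceNumber() calls share one sequence number, that persist(from) returns everything recorded at or after from, and that truncate(to) drops everything before to. It illustrates the bookkeeping idea, not FsStateChangelogWriter's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, simplified model of a changelog writer's sequence-number bookkeeping.
// Sequence numbers are plain longs here; Flink uses the SequenceNumber interface.
class ToyChangelogWriter {
    private final NavigableMap<Long, List<byte[]>> sealed = new TreeMap<>();
    private List<byte[]> active = new ArrayList<>();
    private long activeSqn = 0L;

    long initialSequenceNumber() {
        return 0L;
    }

    // keyGroup is ignored in this toy model; the change joins the active batch.
    void append(int keyGroup, byte[] value) {
        active.add(value);
    }

    // Seals the active batch (if non-empty) and returns the number the next append will use.
    long nextSequenceNumber() {
        if (!active.isEmpty()) {
            sealed.put(activeSqn, active);
            active = new ArrayList<>();
            activeSqn++;
        }
        return activeSqn;
    }

    // Returns all changes with sequence number >= from, after sealing the active batch.
    List<byte[]> persist(long from) {
        nextSequenceNumber();
        List<byte[]> out = new ArrayList<>();
        for (List<byte[]> batch : sealed.tailMap(from, true).values()) {
            out.addAll(batch);
        }
        return out;
    }

    // Drops all sealed changes with sequence number < to.
    void truncate(long to) {
        sealed.headMap(to, false).clear();
    }

    int sealedBatches() {
        return sealed.size();
    }
}
```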


Upload Scheduling and Management

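Scheduler interface for batching and coordinating state change uploads. directScheduler(...) passes each task straight to the given uploader, while fromConfig(...) builds a scheduler from the job configuration.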

@Internal
public interface StateChangeUploadScheduler extends AutoCloseable {
    void upload(UploadTask uploadTask) throws IOException;
    
    static StateChangeUploadScheduler directScheduler(StateChangeUploader uploader);
    static StateChangeUploadScheduler fromConfig(
        JobID jobID, 
        ReadableConfig config, 
        ChangelogStorageMetricGroup metricGroup, 
        TaskChangelogRegistry changelogRegistry, 
        LocalRecoveryConfig localRecoveryConfig
    ) throws IOException;
    
    default AvailabilityProvider getAvailabilityProvider();
}
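A direct scheduler uploads every task on its own, whereas batching collects several tasks into one upload to reduce the number of files and requests. The sketch below contrasts the two using hypothetical ToyUploadTask, ToyUploader, ToyScheduler and ToyBatchingScheduler types. It flushes only on a size threshold; a real scheduler would presumably also flush after a delay (compare the PERSIST_DELAY and PERSIST_SIZE_THRESHOLD options).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical upload task: a payload size plus a callback receiving the uploaded "handle".
final class ToyUploadTask {
    final long size;
    final Consumer<String> onUploaded;

    ToyUploadTask(long size, Consumer<String> onUploaded) {
        this.size = size;
        this.onUploaded = onUploaded;
    }
}

// Uploads a batch of tasks and returns a fake handle naming the written file.
interface ToyUploader {
    String upload(List<ToyUploadTask> tasks);
}

interface ToyScheduler {
    void upload(ToyUploadTask task);

    // Direct scheduling: every task is uploaded immediately and alone.
    static ToyScheduler direct(ToyUploader uploader) {
        return task -> task.onUploaded.accept(uploader.upload(List.of(task)));
    }
}

// Size-threshold batching only; no timer-based flush in this sketch.
final class ToyBatchingScheduler implements ToyScheduler {
    private final ToyUploader uploader;
    private final long sizeThreshold;
    private final List<ToyUploadTask> pending = new ArrayList<>();
    private long pendingSize = 0L;

    ToyBatchingScheduler(ToyUploader uploader, long sizeThreshold) {
        this.uploader = uploader;
        this.sizeThreshold = sizeThreshold;
    }

    @Override
    public synchronized void upload(ToyUploadTask task) {
        pending.add(task);
        pendingSize += task.size;
        if (pendingSize >= sizeThreshold) {
            flush();
        }
    }

    // Uploads all pending tasks as one batch and notifies each of them.
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<ToyUploadTask> batch = new ArrayList<>(pending);
        pending.clear();
        pendingSize = 0L;
        String handle = uploader.upload(batch);
        for (ToyUploadTask t : batch) {
            t.onUploaded.accept(handle);
        }
    }
}
```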


Registry and Tracking

TaskManager-side registry that tracks uploaded changelog segments and discards them asynchronously once they are no longer in use.

@Internal
public interface TaskChangelogRegistry {
    TaskChangelogRegistry NO_OP = new TaskChangelogRegistry() { /* no-op implementation */ };
    
    void startTracking(StreamStateHandle handle, long refCount);
    void stopTracking(StreamStateHandle handle);
    void release(StreamStateHandle handle);
    
    static TaskChangelogRegistry defaultChangelogRegistry(int numAsyncDiscardThreads);
    static TaskChangelogRegistry defaultChangelogRegistry(Executor executor);
}
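The method names suggest reference counting: startTracking registers a segment with an initial reference count, release drops one reference, and stopTracking hands the segment over so that it is no longer discarded locally. The hypothetical ToyChangelogRegistry below sketches those assumed semantics, with segments identified by strings and discards dispatched to an Executor, as the defaultChangelogRegistry(Executor) factory hints.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical reference-counting registry; changelog segments are identified by strings.
final class ToyChangelogRegistry {
    private final Map<String, Long> refCounts = new HashMap<>();
    private final Executor discardExecutor;
    private final Consumer<String> discardAction;

    ToyChangelogRegistry(Executor discardExecutor, Consumer<String> discardAction) {
        this.discardExecutor = discardExecutor;
        this.discardAction = discardAction;
    }

    // Begin tracking a freshly uploaded segment that `refCount` users depend on.
    synchronized void startTracking(String segment, long refCount) {
        refCounts.put(segment, refCount);
    }

    // Hand ownership elsewhere: this registry will no longer discard the segment.
    synchronized void stopTracking(String segment) {
        refCounts.remove(segment);
    }

    // One user no longer needs the segment; discard it via the executor once unused.
    synchronized void release(String segment) {
        Long count = refCounts.get(segment);
        if (count == null) {
            return; // not tracked: never started, or tracking was stopped
        }
        if (count <= 1) {
            refCounts.remove(segment);
            discardExecutor.execute(() -> discardAction.accept(segment));
        } else {
            refCounts.put(segment, count - 1);
        }
    }
}
```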


Types

Configuration Options

@Experimental
public class FsStateChangelogOptions {
    public static final ConfigOption<String> BASE_PATH;
    public static final ConfigOption<Boolean> COMPRESSION_ENABLED;
    public static final ConfigOption<MemorySize> PREEMPTIVE_PERSIST_THRESHOLD;
    public static final ConfigOption<Duration> PERSIST_DELAY;
    public static final ConfigOption<MemorySize> PERSIST_SIZE_THRESHOLD;
    public static final ConfigOption<MemorySize> UPLOAD_BUFFER_SIZE;
    public static final ConfigOption<Integer> NUM_UPLOAD_THREADS;
    public static final ConfigOption<Integer> NUM_DISCARD_THREADS;
    public static final ConfigOption<MemorySize> IN_FLIGHT_DATA_LIMIT;
    public static final ConfigOption<String> RETRY_POLICY;
    public static final ConfigOption<Duration> UPLOAD_TIMEOUT;
    public static final ConfigOption<Integer> RETRY_MAX_ATTEMPTS;
    public static final ConfigOption<Duration> RETRY_DELAY_AFTER_FAILURE;
    public static final ConfigOption<Duration> CACHE_IDLE_TIMEOUT;
}
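A configuration fragment setting several of these options through the typed Configuration API. The values are illustrative only, not recommended defaults, and the comments paraphrase the intended role of each option; check the option descriptions before relying on them. ChangelogConfigExample is a hypothetical class name, and the fragment needs flink-dstl-dfs and flink-core on the classpath.

```java
import java.time.Duration;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;

class ChangelogConfigExample {
    // Illustrative values only.
    static Configuration build() {
        Configuration config = new Configuration();
        config.set(FsStateChangelogOptions.BASE_PATH, "/path/to/changelog");
        config.set(FsStateChangelogOptions.COMPRESSION_ENABLED, true);
        // Batching: wait briefly after a persist request, or upload once enough data accumulates
        config.set(FsStateChangelogOptions.PERSIST_DELAY, Duration.ofMillis(10));
        config.set(FsStateChangelogOptions.PERSIST_SIZE_THRESHOLD, MemorySize.parse("10mb"));
        // Upload parallelism and the bound on data waiting for upload
        config.set(FsStateChangelogOptions.NUM_UPLOAD_THREADS, 5);
        config.set(FsStateChangelogOptions.IN_FLIGHT_DATA_LIMIT, MemorySize.parse("100mb"));
        // Per-attempt timeout and retry budget
        config.set(FsStateChangelogOptions.UPLOAD_TIMEOUT, Duration.ofSeconds(1));
        config.set(FsStateChangelogOptions.RETRY_MAX_ATTEMPTS, 3);
        return config;
    }
}
```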

Core Data Structures

@ThreadSafe
@Internal
class StateChangeSet {
    public StateChangeSet(UUID logId, SequenceNumber sequenceNumber, List<StateChange> changes);
    
    public UUID getLogId();
    public SequenceNumber getSequenceNumber();
    public List<StateChange> getChanges();
    public long getSize();
}

@Internal
final class UploadResult {
    public final StreamStateHandle streamStateHandle;
    public final @Nullable StreamStateHandle localStreamHandle;
    public final long offset;
    public final long localOffset;
    public final SequenceNumber sequenceNumber;
    public final long size;
    
    public UploadResult(
        StreamStateHandle streamStateHandle,
        long offset,
        SequenceNumber sequenceNumber,
        long size
    );
    
    public UploadResult(
        StreamStateHandle streamStateHandle,
        @Nullable StreamStateHandle localStreamHandle,
        long offset,
        long localOffset,
        SequenceNumber sequenceNumber,
        long size
    );
    
    public static UploadResult of(
        StreamStateHandle streamStateHandle,
        StreamStateHandle localStreamHandle,
        StateChangeSet changeSet,
        long offset,
        long localOffset
    );
    
    public StreamStateHandle getStreamStateHandle();
    public StreamStateHandle getLocalStreamHandleStateHandle();
    public long getOffset();
    public long getLocalOffset();
    public SequenceNumber getSequenceNumber();
    public long getSize();
}
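The offset field (and localOffset for a local copy) suggests that several change sets can share one uploaded file, each UploadResult pointing at its own byte range. The sketch below models that layout with hypothetical ToyChangeSet, ToyUploadResult and ToySharedFileWriter types, using a ByteArrayOutputStream in place of a filesystem stream. It assumes size is the number of serialized bytes, which may differ from how Flink computes StateChangeSet.getSize().

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical change set: a sequence number plus its serialized changes.
final class ToyChangeSet {
    final long sequenceNumber;
    final byte[] payload;

    ToyChangeSet(long sequenceNumber, byte[] payload) {
        this.sequenceNumber = sequenceNumber;
        this.payload = payload;
    }
}

// Hypothetical counterpart of UploadResult: where one change set lives inside a shared file.
final class ToyUploadResult {
    final String file;
    final long offset;
    final long sequenceNumber;
    final long size;

    ToyUploadResult(String file, long offset, long sequenceNumber, long size) {
        this.file = file;
        this.offset = offset;
        this.sequenceNumber = sequenceNumber;
        this.size = size;
    }
}

final class ToySharedFileWriter {
    // Appends each change set to `out` and records the offset at which it starts.
    static List<ToyUploadResult> writeAll(
            String file, List<ToyChangeSet> changeSets, ByteArrayOutputStream out) {
        List<ToyUploadResult> results = new ArrayList<>();
        for (ToyChangeSet changeSet : changeSets) {
            long offset = out.size();
            out.write(changeSet.payload, 0, changeSet.payload.length);
            results.add(new ToyUploadResult(
                    file, offset, changeSet.sequenceNumber, changeSet.payload.length));
        }
        return results;
    }
}
```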

Upload Task Definition

@ThreadSafe
final class UploadTask {
    final Collection<StateChangeSet> changeSets;
    final Consumer<List<UploadResult>> successCallback;
    final BiConsumer<List<SequenceNumber>, Throwable> failureCallback;
    
    public UploadTask(
        Collection<StateChangeSet> changeSets,
        Consumer<List<UploadResult>> successCallback,
        BiConsumer<List<SequenceNumber>, Throwable> failureCallback
    );
    
    public void complete(List<UploadResult> results);
    public void fail(Throwable error);
    public long getSize();
    public Collection<StateChangeSet> getChangeSets();
}
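UploadTask routes its outcome to one of two callbacks: complete(results) to the success callback and fail(error) to the failure callback, which receives the affected sequence numbers. The hypothetical ToyTask below sketches that routing, with Long sequence numbers and String results. It also assumes that a task should be finished at most once, enforced here with an AtomicBoolean; that guard is an assumption of the sketch.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical task: sequence numbers stand in for change sets, strings for upload results.
final class ToyTask {
    private final List<Long> sequenceNumbers;
    private final Consumer<List<String>> successCallback;
    private final BiConsumer<List<Long>, Throwable> failureCallback;
    private final AtomicBoolean finished = new AtomicBoolean(false);

    ToyTask(
            List<Long> sequenceNumbers,
            Consumer<List<String>> successCallback,
            BiConsumer<List<Long>, Throwable> failureCallback) {
        this.sequenceNumbers = List.copyOf(sequenceNumbers);
        this.successCallback = successCallback;
        this.failureCallback = failureCallback;
    }

    // Returns true only for the call that actually finished the task.
    boolean complete(List<String> results) {
        if (!finished.compareAndSet(false, true)) {
            return false;
        }
        successCallback.accept(results);
        return true;
    }

    // Reports every sequence number in the task as failed, together with the cause.
    boolean fail(Throwable error) {
        if (!finished.compareAndSet(false, true)) {
            return false;
        }
        failureCallback.accept(sequenceNumbers, error);
        return true;
    }
}
```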

Retry Policy Interface

@Internal
public interface RetryPolicy {
    RetryPolicy NONE = new NoRetryPolicy();
    
    static RetryPolicy fromConfig(ReadableConfig config);
    static RetryPolicy fixed(int maxAttempts, long timeout, long delayAfterFailure);
    
    long timeoutFor(int attempt);
    long retryAfter(int failedAttempt, Exception exception);
}
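RetryPolicy.fixed(maxAttempts, timeout, delayAfterFailure) suggests a policy that allows a bounded number of attempts with a fixed delay between them. The sketch below mirrors that shape with a hypothetical ToyRetryPolicy and a small ToyRetrier loop. It assumes that a negative retryAfter value means "give up" and that delays are in milliseconds; per-attempt timeouts are reported by timeoutFor but not enforced.

```java
import java.util.function.Supplier;

// Hypothetical mirror of the RetryPolicy shape listed above.
interface ToyRetryPolicy {
    long timeoutFor(int attempt);

    // Delay in ms before the next attempt; a negative value means "give up" (assumption).
    long retryAfter(int failedAttempt, Exception exception);

    static ToyRetryPolicy fixed(int maxAttempts, long timeout, long delayAfterFailure) {
        return new ToyRetryPolicy() {
            @Override
            public long timeoutFor(int attempt) {
                return timeout; // same timeout for every attempt
            }

            @Override
            public long retryAfter(int failedAttempt, Exception exception) {
                return failedAttempt >= maxAttempts ? -1L : delayAfterFailure;
            }
        };
    }
}

final class ToyRetrier {
    // Runs `action` until it succeeds or the policy gives up.
    static <T> T run(ToyRetryPolicy policy, Supplier<T> action) {
        for (int attempt = 1; ; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                long delay = policy.retryAfter(attempt, e);
                if (delay < 0) {
                    throw e; // out of attempts: surface the last failure
                }
                sleep(delay);
            }
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting to retry", ie);
        }
    }
}
```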