CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-dstl

Apache Flink DSTL (Durable Short-Term Log) - A filesystem-based state changelog implementation for Flink's state management

Pending
Overview
Eval results
Files

docs/storage-factory.md

Storage Factory and Configuration

Factory for creating filesystem-based changelog storage instances with comprehensive configuration options. This is the main entry point for using the DSTL module.

Capabilities

FsStateChangelogStorageFactory

Factory class that implements the StateChangelogStorageFactory interface for creating filesystem-based changelog storage.

/**
 * Factory for creating FsStateChangelogStorage instances.
 *
 * <p>This listing is an API summary: only the public signatures are shown and the
 * method bodies are omitted.
 */
@Internal
public class FsStateChangelogStorageFactory implements StateChangelogStorageFactory {
    /** Identifier of this storage implementation; this is the value reported by {@link #getIdentifier()}. */
    public static final String IDENTIFIER = "filesystem";
    
    /**
     * Returns the identifier for this storage factory.
     *
     * @return the "filesystem" identifier string
     */
    public String getIdentifier();
    
    /**
     * Creates a new StateChangelogStorage instance.
     *
     * @param jobID The job identifier
     * @param configuration Configuration settings (see FsStateChangelogOptions for the available options)
     * @param metricGroup Metric group for monitoring
     * @param localRecoveryConfig Local recovery configuration
     * @return StateChangelogStorage instance
     * @throws IOException If storage creation fails
     */
    public StateChangelogStorage<?> createStorage(
        JobID jobID,
        Configuration configuration,
        TaskManagerJobMetricGroup metricGroup,
        LocalRecoveryConfig localRecoveryConfig
    ) throws IOException;
    
    /**
     * Creates a storage view for recovery operations, i.e. for reading existing changelog data.
     *
     * @param configuration Configuration settings
     * @return StateChangelogStorageView instance for recovery
     */
    public StateChangelogStorageView<?> createStorageView(Configuration configuration);
    
    /**
     * Helper method for programmatic configuration. The settings are written into the
     * supplied {@code configuration} object; nothing is returned.
     *
     * <p>NOTE(review): the parameters presumably map to the base-path, upload-timeout and
     * retry-max-attempts options of FsStateChangelogOptions; confirm against the implementation.
     *
     * @param configuration Configuration object to modify
     * @param newFolder Base folder for changelog storage
     * @param uploadTimeout Timeout for upload operations
     * @param maxUploadAttempts Maximum number of upload retry attempts
     */
    public static void configure(
        Configuration configuration,
        File newFolder,
        Duration uploadTimeout,
        int maxUploadAttempts
    );
}

Usage Example:

import java.io.File;
import java.time.Duration;

import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.configuration.Configuration;

// Create factory
FsStateChangelogStorageFactory factory = new FsStateChangelogStorageFactory();

// Verify identifier
String identifier = factory.getIdentifier(); // Returns "filesystem"

// Configure programmatically: configure(...) is static and writes the settings
// into the Configuration instance passed as its first argument.
Configuration config = new Configuration();
FsStateChangelogStorageFactory.configure(
    config,
    new File("/tmp/changelog"), // base folder for changelog storage
    Duration.ofSeconds(30),     // upload timeout
    5                           // maximum number of upload attempts
);

Configuration Options

All configuration options for the filesystem-based changelog storage, defined as static constants in FsStateChangelogOptions.

/**
 * Configuration options for FsStateChangelogStorage.
 *
 * <p>Every option key in this class starts with the prefix
 * {@code state.changelog.dstl.dfs.}. Each option except {@link #BASE_PATH} declares a default value.
 */
@Experimental
public class FsStateChangelogOptions {
    
    /**
     * Base path to store changelog files. Required setting: no default value is declared,
     * so it has to be set explicitly.
     */
    public static final ConfigOption<String> BASE_PATH =
        ConfigOptions.key("state.changelog.dstl.dfs.base-path")
            .stringType()
            .noDefaultValue()
            .withDescription("Base path to store changelog files.");
    
    /**
     * Whether to enable compression when serializing changelog. Default: false.
     */
    public static final ConfigOption<Boolean> COMPRESSION_ENABLED =
        ConfigOptions.key("state.changelog.dstl.dfs.compression.enabled")
            .booleanType()
            .defaultValue(false)
            .withDescription("Whether to enable compression when serializing changelog.");
    
    /**
     * Size threshold for preemptive persistence, expressed as a memory size. Default: 5MB.
     */
    public static final ConfigOption<MemorySize> PREEMPTIVE_PERSIST_THRESHOLD =
        ConfigOptions.key("state.changelog.dstl.dfs.preemptive-persist-threshold")
            .memoryType()
            .defaultValue(MemorySize.parse("5MB"))
            .withDescription("Size threshold for preemptive persistence.");
    
    /**
     * Delay before persisting changelog, expressed as a duration. Default: 10ms.
     */
    public static final ConfigOption<Duration> PERSIST_DELAY =
        ConfigOptions.key("state.changelog.dstl.dfs.persist-delay")
            .durationType()
            .defaultValue(Duration.ofMillis(10))
            .withDescription("Delay before persisting changelog.");
    
    /**
     * Size threshold for batch persistence, expressed as a memory size. Default: 10MB.
     */
    public static final ConfigOption<MemorySize> PERSIST_SIZE_THRESHOLD =
        ConfigOptions.key("state.changelog.dstl.dfs.persist-size-threshold")
            .memoryType()
            .defaultValue(MemorySize.parse("10MB"))
            .withDescription("Size threshold for batch persistence.");
    
    /**
     * Buffer size for uploads, expressed as a memory size. Default: 1MB.
     */
    public static final ConfigOption<MemorySize> UPLOAD_BUFFER_SIZE =
        ConfigOptions.key("state.changelog.dstl.dfs.upload-buffer-size")
            .memoryType()
            .defaultValue(MemorySize.parse("1MB"))
            .withDescription("Buffer size for uploads.");
    
    /**
     * Number of upload threads. Default: 5.
     */
    public static final ConfigOption<Integer> NUM_UPLOAD_THREADS =
        ConfigOptions.key("state.changelog.dstl.dfs.num-upload-threads")
            .intType()
            .defaultValue(5)
            .withDescription("Number of upload threads.");
    
    /**
     * Number of discard threads. Default: 1.
     */
    public static final ConfigOption<Integer> NUM_DISCARD_THREADS =
        ConfigOptions.key("state.changelog.dstl.dfs.num-discard-threads")
            .intType()
            .defaultValue(1)
            .withDescription("Number of discard threads.");
    
    /**
     * Maximum in-flight data, expressed as a memory size. Default: 100MB.
     */
    public static final ConfigOption<MemorySize> IN_FLIGHT_DATA_LIMIT =
        ConfigOptions.key("state.changelog.dstl.dfs.in-flight-data-limit")
            .memoryType()
            .defaultValue(MemorySize.parse("100MB"))
            .withDescription("Maximum in-flight data.");
    
    /**
     * Retry policy for uploads, given by name. Default: "fixed".
     * The set of other accepted policy names is not shown in this listing.
     */
    public static final ConfigOption<String> RETRY_POLICY =
        ConfigOptions.key("state.changelog.dstl.dfs.retry-policy")
            .stringType()
            .defaultValue("fixed")
            .withDescription("Retry policy for uploads.");
    
    /**
     * Upload timeout, expressed as a duration. Default: 1s.
     */
    public static final ConfigOption<Duration> UPLOAD_TIMEOUT =
        ConfigOptions.key("state.changelog.dstl.dfs.upload-timeout")
            .durationType()
            .defaultValue(Duration.ofSeconds(1))
            .withDescription("Upload timeout.");
    
    /**
     * Maximum retry attempts. Default: 3.
     */
    public static final ConfigOption<Integer> RETRY_MAX_ATTEMPTS =
        ConfigOptions.key("state.changelog.dstl.dfs.retry-max-attempts")
            .intType()
            .defaultValue(3)
            .withDescription("Maximum retry attempts.");
    
    /**
     * Delay between retries, expressed as a duration. Default: 500ms.
     */
    public static final ConfigOption<Duration> RETRY_DELAY_AFTER_FAILURE =
        ConfigOptions.key("state.changelog.dstl.dfs.retry-delay-after-failure")
            .durationType()
            .defaultValue(Duration.ofMillis(500))
            .withDescription("Delay between retries.");
    
    /**
     * Cache file idle timeout, expressed as a duration. Default: 10min.
     */
    public static final ConfigOption<Duration> CACHE_IDLE_TIMEOUT =
        ConfigOptions.key("state.changelog.dstl.dfs.cache-idle-timeout")
            .durationType()
            .defaultValue(Duration.ofMinutes(10))
            .withDescription("Cache file idle timeout.");
}

Configuration Usage Examples:

import java.time.Duration;

import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;

Configuration config = new Configuration();

// Required: Set base path (the option has no default value)
config.set(FsStateChangelogOptions.BASE_PATH, "/path/to/changelog/storage");

// Optional: Enable compression
config.set(FsStateChangelogOptions.COMPRESSION_ENABLED, true);

// Optional: Adjust thresholds
config.set(FsStateChangelogOptions.PREEMPTIVE_PERSIST_THRESHOLD, MemorySize.parse("10MB"));
config.set(FsStateChangelogOptions.PERSIST_SIZE_THRESHOLD, MemorySize.parse("20MB"));

// Optional: Configure upload behavior
config.set(FsStateChangelogOptions.NUM_UPLOAD_THREADS, 10);
config.set(FsStateChangelogOptions.UPLOAD_TIMEOUT, Duration.ofSeconds(30));
config.set(FsStateChangelogOptions.RETRY_MAX_ATTEMPTS, 5);

// Use configuration with factory.
// jobID, metricGroup and localRecoveryConfig are placeholders for values supplied by
// the surrounding runtime; they are not created in this snippet.
// createStorage declares IOException, so the caller must handle or propagate it.
FsStateChangelogStorageFactory factory = new FsStateChangelogStorageFactory();
StateChangelogStorage<?> storage = factory.createStorage(
    jobID, config, metricGroup, localRecoveryConfig
);

Recovery Storage View

Storage view implementation for recovery-only operations, used when reading existing changelog data.

/**
 * Recovery-only implementation of changelog storage: it exposes a reader for existing
 * changelog data and a {@code close()} method, and no write operations.
 *
 * <p>This listing is an API summary: only the public signatures are shown and the
 * method bodies are omitted.
 */
@Experimental
@ThreadSafe
public class FsStateChangelogStorageForRecovery 
    implements StateChangelogStorageView<ChangelogStateHandleStreamImpl> {
    
    /**
     * Creates a reader for changelog handles.
     *
     * @return StateChangelogHandleReader for reading changelog data
     */
    public StateChangelogHandleReader<ChangelogStateHandleStreamImpl> createReader();
    
    /**
     * Closes the storage view and releases resources.
     *
     * @throws Exception If closing fails
     */
    public void close() throws Exception;
}

Recovery Usage Example:

// Create storage view for recovery
// (`config` is an already-populated Configuration; it is not created in this snippet)
FsStateChangelogStorageFactory factory = new FsStateChangelogStorageFactory();
StateChangelogStorageView<?> storageView = factory.createStorageView(config);

// Create reader for recovery
StateChangelogHandleReader<?> reader = storageView.createReader();

// Use reader to iterate through state changes
// (reader usage depends on specific changelog handles)
// NOTE(review): the filesystem recovery view declares close() to release its resources;
// presumably the view should be closed once recovery is finished. Confirm who owns its lifecycle.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-dstl

docs

index.md

registry.md

storage-factory.md

storage-implementation.md

upload-scheduling.md

writers.md

tile.json