Distributed file system-based changelog storage implementation for Apache Flink's streaming state backend.
—
Comprehensive configuration options for performance tuning, retry policies, and storage behavior optimization. All configuration options use Flink's ConfigOption framework with support for deprecated keys and environment variable overrides.
Configuration options class containing all settings for filesystem-based changelog storage.
/**
* Configuration options for FsStateChangelogStorage.
* All options use the "state.changelog.dstl.dfs" prefix.
*/
public class FsStateChangelogOptions {
/** Base path for storing changelog files */
public static final ConfigOption<String> BASE_PATH;
/** Enable/disable compression for changelog serialization */
public static final ConfigOption<Boolean> COMPRESSION_ENABLED;
/** Size threshold for preemptive persistence */
public static final ConfigOption<MemorySize> PREEMPTIVE_PERSIST_THRESHOLD;
/** Delay before persisting changelog after checkpoint trigger */
public static final ConfigOption<Duration> PERSIST_DELAY;
/** Size threshold for batched persistence operations */
public static final ConfigOption<MemorySize> PERSIST_SIZE_THRESHOLD;
/** Buffer size for upload operations */
public static final ConfigOption<MemorySize> UPLOAD_BUFFER_SIZE;
/** Number of threads for upload operations */
public static final ConfigOption<Integer> NUM_UPLOAD_THREADS;
/** Number of threads for discarding unused changelog data */
public static final ConfigOption<Integer> NUM_DISCARD_THREADS;
/** Maximum amount of data allowed to be in-flight */
public static final ConfigOption<MemorySize> IN_FLIGHT_DATA_LIMIT;
/** Retry policy for failed uploads */
public static final ConfigOption<String> RETRY_POLICY;
/** Timeout for individual upload operations */
public static final ConfigOption<Duration> UPLOAD_TIMEOUT;
/** Maximum number of retry attempts */
public static final ConfigOption<Integer> RETRY_MAX_ATTEMPTS;
/** Delay before next retry attempt after failure */
public static final ConfigOption<Duration> RETRY_DELAY_AFTER_FAILURE;
/** Cache idle timeout for recovery operations */
public static final ConfigOption<Duration> CACHE_IDLE_TIMEOUT;
}
Configure the base path where changelog files are stored:
/**
* Base path to store changelog files
* Key: "state.changelog.dstl.dfs.base-path"
* Deprecated key: "dstl.dfs.base-path"
* Type: String
* Required: Yes
*/
public static final ConfigOption<String> BASE_PATH =
ConfigOptions.key("state.changelog.dstl.dfs.base-path")
.stringType()
.noDefaultValue()
.withDeprecatedKeys("dstl.dfs.base-path")
.withDescription("Base path to store changelog files.");
Usage examples:
Configuration config = new Configuration();
// HDFS path
config.set(FsStateChangelogOptions.BASE_PATH, "hdfs://namenode:8020/flink/changelog");
// S3 path
config.set(FsStateChangelogOptions.BASE_PATH, "s3://my-bucket/changelog");
// Local filesystem (for testing)
config.set(FsStateChangelogOptions.BASE_PATH, "file:///tmp/changelog");
Configure performance-related settings for optimal throughput:
/**
* Enable compression when serializing changelog
* Key: "state.changelog.dstl.dfs.compression.enabled"
* Default: false
*/
public static final ConfigOption<Boolean> COMPRESSION_ENABLED;
/**
* Size threshold for preemptive persistence
* Key: "state.changelog.dstl.dfs.preemptive-persist-threshold"
* Default: 5MB
*/
public static final ConfigOption<MemorySize> PREEMPTIVE_PERSIST_THRESHOLD;
/**
* Buffer size used when uploading change sets
* Key: "state.changelog.dstl.dfs.upload.buffer-size"
* Default: 1MB
*/
public static final ConfigOption<MemorySize> UPLOAD_BUFFER_SIZE;
/**
* Number of threads to use for upload operations
* Key: "state.changelog.dstl.dfs.upload.num-threads"
* Default: 5
*/
public static final ConfigOption<Integer> NUM_UPLOAD_THREADS;
Usage examples:
Configuration config = new Configuration();
// Enable compression for better storage efficiency
config.set(FsStateChangelogOptions.COMPRESSION_ENABLED, true);
// Reduce preemptive threshold for faster checkpoints
config.set(FsStateChangelogOptions.PREEMPTIVE_PERSIST_THRESHOLD, MemorySize.parse("2MB"));
// Increase buffer size for high-throughput workloads
config.set(FsStateChangelogOptions.UPLOAD_BUFFER_SIZE, MemorySize.parse("4MB"));
// Increase upload threads for better parallelism
config.set(FsStateChangelogOptions.NUM_UPLOAD_THREADS, 10);
Configure batching behavior and backpressure settings:
/**
* Delay before persisting changelog after receiving persist request
* Key: "state.changelog.dstl.dfs.batch.persist-delay"
* Default: 10ms
*/
public static final ConfigOption<Duration> PERSIST_DELAY;
/**
* Size threshold for accumulated changes waiting for persist delay
* Key: "state.changelog.dstl.dfs.batch.persist-size-threshold"
* Default: 10MB
*/
public static final ConfigOption<MemorySize> PERSIST_SIZE_THRESHOLD;
/**
* Maximum amount of data allowed to be in-flight
* Key: "state.changelog.dstl.dfs.upload.max-in-flight"
* Default: 100MB
*/
public static final ConfigOption<MemorySize> IN_FLIGHT_DATA_LIMIT;
Usage examples:
// Configure batching for better efficiency
config.set(FsStateChangelogOptions.PERSIST_DELAY, Duration.ofMillis(50));
config.set(FsStateChangelogOptions.PERSIST_SIZE_THRESHOLD, MemorySize.parse("20MB"));
// Configure backpressure limit
config.set(FsStateChangelogOptions.IN_FLIGHT_DATA_LIMIT, MemorySize.parse("200MB"));
Configure retry behavior for failed upload operations:
/**
* Retry policy for failed uploads
* Key: "state.changelog.dstl.dfs.upload.retry-policy"
* Default: "fixed"
* Valid values: "none", "fixed"
*/
public static final ConfigOption<String> RETRY_POLICY;
/**
* Upload timeout duration
* Key: "state.changelog.dstl.dfs.upload.timeout"
* Default: 1 second
*/
public static final ConfigOption<Duration> UPLOAD_TIMEOUT;
/**
* Maximum number of retry attempts
* Key: "state.changelog.dstl.dfs.upload.max-attempts"
* Default: 3
*/
public static final ConfigOption<Integer> RETRY_MAX_ATTEMPTS;
/**
* Delay before next retry attempt after failure
* Key: "state.changelog.dstl.dfs.upload.next-attempt-delay"
* Default: 500ms
*/
public static final ConfigOption<Duration> RETRY_DELAY_AFTER_FAILURE;
Usage examples:
// Configure aggressive retry policy
config.set(FsStateChangelogOptions.RETRY_POLICY, "fixed");
config.set(FsStateChangelogOptions.UPLOAD_TIMEOUT, Duration.ofSeconds(5));
config.set(FsStateChangelogOptions.RETRY_MAX_ATTEMPTS, 5);
config.set(FsStateChangelogOptions.RETRY_DELAY_AFTER_FAILURE, Duration.ofSeconds(1));
// Disable retries for fast-fail behavior
config.set(FsStateChangelogOptions.RETRY_POLICY, "none");
Configure recovery and cleanup behavior:
/**
* Number of threads for discarding unused changelog data
* Key: "state.changelog.dstl.dfs.discard.num-threads"
* Default: 1
*/
public static final ConfigOption<Integer> NUM_DISCARD_THREADS;
/**
* Cache idle timeout for recovery operations
* Key: "state.changelog.dstl.dfs.download.local-cache.idle-timeout-ms"
* Default: 10 minutes
*/
public static final ConfigOption<Duration> CACHE_IDLE_TIMEOUT;
Usage examples:
// Increase discard threads for faster cleanup
config.set(FsStateChangelogOptions.NUM_DISCARD_THREADS, 3);
// Reduce cache timeout for memory efficiency
config.set(FsStateChangelogOptions.CACHE_IDLE_TIMEOUT, Duration.ofMinutes(5));
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import static org.apache.flink.changelog.fs.FsStateChangelogOptions.*;
// Complete configuration for high-throughput workload
Configuration config = new Configuration();
// Storage location
config.set(BASE_PATH, "hdfs://namenode:8020/flink/changelog");
// Performance tuning
config.set(COMPRESSION_ENABLED, true);
config.set(PREEMPTIVE_PERSIST_THRESHOLD, MemorySize.parse("3MB"));
config.set(UPLOAD_BUFFER_SIZE, MemorySize.parse("2MB"));
config.set(NUM_UPLOAD_THREADS, 8);
// Batching configuration
config.set(PERSIST_DELAY, Duration.ofMillis(20));
config.set(PERSIST_SIZE_THRESHOLD, MemorySize.parse("15MB"));
config.set(IN_FLIGHT_DATA_LIMIT, MemorySize.parse("150MB"));
// Retry configuration
config.set(RETRY_POLICY, "fixed");
config.set(UPLOAD_TIMEOUT, Duration.ofSeconds(3));
config.set(RETRY_MAX_ATTEMPTS, 4);
config.set(RETRY_DELAY_AFTER_FAILURE, Duration.ofMillis(750));
// Cleanup configuration
config.set(NUM_DISCARD_THREADS, 2);
config.set(CACHE_IDLE_TIMEOUT, Duration.ofMinutes(15));
Important constraints and validation rules:
- PERSIST_SIZE_THRESHOLD must not exceed IN_FLIGHT_DATA_LIMIT.
- BASE_PATH is required and must be accessible by all TaskManagers.
- UPLOAD_TIMEOUT * RETRY_MAX_ATTEMPTS should be less than the checkpoint timeout.
Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-flink--flink-dstl-dfs