Apache Flink DSTL (Distributed State Timeline): a filesystem-based state changelog implementation for Flink's state management.

FsStateChangelogStorageFactory is the main entry point for using the DSTL module. It implements the StateChangelogStorageFactory interface and creates filesystem-based changelog storage instances with comprehensive configuration options.

The listing below shows signatures only; method bodies are omitted.
/**
 * Factory that produces {@code FsStateChangelogStorage} instances.
 *
 * <p>This is a signature listing: method bodies are omitted.
 */
@Internal
public class FsStateChangelogStorageFactory implements StateChangelogStorageFactory {

    /** Identifier string of this factory: {@code "filesystem"}. */
    public static final String IDENTIFIER = "filesystem";

    /**
     * Reports the identifier of this storage factory.
     *
     * @return the identifier string {@code "filesystem"}
     */
    public String getIdentifier();

    /**
     * Builds a new {@code StateChangelogStorage} instance.
     *
     * @param jobID identifier of the job
     * @param configuration configuration settings to apply
     * @param metricGroup metric group used for monitoring
     * @param localRecoveryConfig configuration of local recovery
     * @return the created {@code StateChangelogStorage}
     * @throws IOException if the storage cannot be created
     */
    public StateChangelogStorage<?> createStorage(
            JobID jobID,
            Configuration configuration,
            TaskManagerJobMetricGroup metricGroup,
            LocalRecoveryConfig localRecoveryConfig
    ) throws IOException;

    /**
     * Builds a storage view intended for recovery operations.
     *
     * @param configuration configuration settings to apply
     * @return a {@code StateChangelogStorageView} to be used for recovery
     */
    public StateChangelogStorageView<?> createStorageView(Configuration configuration);

    /**
     * Convenience helper for configuring the changelog storage programmatically.
     *
     * @param configuration the configuration object that gets modified
     * @param newFolder base folder in which the changelog is stored
     * @param uploadTimeout timeout applied to upload operations
     * @param maxUploadAttempts upper bound on the number of upload retry attempts
     */
    public static void configure(
            Configuration configuration,
            File newFolder,
            Duration uploadTimeout,
            int maxUploadAttempts
    );
}

Usage Example:
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.configuration.Configuration;

// File and Duration are used below, so they have to be imported as well.
import java.io.File;
import java.time.Duration;

// Create factory
FsStateChangelogStorageFactory factory = new FsStateChangelogStorageFactory();

// Verify identifier
String identifier = factory.getIdentifier(); // Returns "filesystem"

// Configure programmatically. Arguments, in order: the configuration to modify,
// the base folder for changelog storage, the upload timeout, and the maximum
// number of upload retry attempts.
Configuration config = new Configuration();
FsStateChangelogStorageFactory.configure(
        config,
        new File("/tmp/changelog"),
        Duration.ofSeconds(30),
        5
);

All configuration options for the filesystem-based changelog storage, defined as static constants in FsStateChangelogOptions.
/**
 * Configuration options understood by {@code FsStateChangelogStorage}.
 *
 * <p>Every option is exposed as a static {@code ConfigOption} constant.
 */
@Experimental
public class FsStateChangelogOptions {

    /** Base path under which changelog files are stored. Has no default value and is required. */
    public static final ConfigOption<String> BASE_PATH =
            ConfigOptions.key("state.changelog.dstl.dfs.base-path")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Base path to store changelog files.");

    /** Toggles compression while the changelog is serialized. Defaults to {@code false}. */
    public static final ConfigOption<Boolean> COMPRESSION_ENABLED =
            ConfigOptions.key("state.changelog.dstl.dfs.compression.enabled")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription("Whether to enable compression when serializing changelog.");

    /** Size threshold that applies to preemptive persistence. Defaults to 5MB. */
    public static final ConfigOption<MemorySize> PREEMPTIVE_PERSIST_THRESHOLD =
            ConfigOptions.key("state.changelog.dstl.dfs.preemptive-persist-threshold")
                    .memoryType()
                    .defaultValue(MemorySize.parse("5MB"))
                    .withDescription("Size threshold for preemptive persistence.");

    /** Delay applied before the changelog is persisted. Defaults to 10 milliseconds. */
    public static final ConfigOption<Duration> PERSIST_DELAY =
            ConfigOptions.key("state.changelog.dstl.dfs.persist-delay")
                    .durationType()
                    .defaultValue(Duration.ofMillis(10))
                    .withDescription("Delay before persisting changelog.");

    /** Size threshold that applies to batch persistence. Defaults to 10MB. */
    public static final ConfigOption<MemorySize> PERSIST_SIZE_THRESHOLD =
            ConfigOptions.key("state.changelog.dstl.dfs.persist-size-threshold")
                    .memoryType()
                    .defaultValue(MemorySize.parse("10MB"))
                    .withDescription("Size threshold for batch persistence.");

    /** Size of the buffer used for uploads. Defaults to 1MB. */
    public static final ConfigOption<MemorySize> UPLOAD_BUFFER_SIZE =
            ConfigOptions.key("state.changelog.dstl.dfs.upload-buffer-size")
                    .memoryType()
                    .defaultValue(MemorySize.parse("1MB"))
                    .withDescription("Buffer size for uploads.");

    /** How many threads perform uploads. Defaults to 5. */
    public static final ConfigOption<Integer> NUM_UPLOAD_THREADS =
            ConfigOptions.key("state.changelog.dstl.dfs.num-upload-threads")
                    .intType()
                    .defaultValue(5)
                    .withDescription("Number of upload threads.");

    /** How many threads perform discards. Defaults to 1. */
    public static final ConfigOption<Integer> NUM_DISCARD_THREADS =
            ConfigOptions.key("state.changelog.dstl.dfs.num-discard-threads")
                    .intType()
                    .defaultValue(1)
                    .withDescription("Number of discard threads.");

    /** Upper bound on in-flight data. Defaults to 100MB. */
    public static final ConfigOption<MemorySize> IN_FLIGHT_DATA_LIMIT =
            ConfigOptions.key("state.changelog.dstl.dfs.in-flight-data-limit")
                    .memoryType()
                    .defaultValue(MemorySize.parse("100MB"))
                    .withDescription("Maximum in-flight data.");

    /** Name of the retry policy used for uploads. Defaults to {@code "fixed"}. */
    public static final ConfigOption<String> RETRY_POLICY =
            ConfigOptions.key("state.changelog.dstl.dfs.retry-policy")
                    .stringType()
                    .defaultValue("fixed")
                    .withDescription("Retry policy for uploads.");

    /** Timeout of an upload. Defaults to 1 second. */
    public static final ConfigOption<Duration> UPLOAD_TIMEOUT =
            ConfigOptions.key("state.changelog.dstl.dfs.upload-timeout")
                    .durationType()
                    .defaultValue(Duration.ofSeconds(1))
                    .withDescription("Upload timeout.");

    /** Upper bound on retry attempts. Defaults to 3. */
    public static final ConfigOption<Integer> RETRY_MAX_ATTEMPTS =
            ConfigOptions.key("state.changelog.dstl.dfs.retry-max-attempts")
                    .intType()
                    .defaultValue(3)
                    .withDescription("Maximum retry attempts.");

    /** Pause between two retries. Defaults to 500 milliseconds. */
    public static final ConfigOption<Duration> RETRY_DELAY_AFTER_FAILURE =
            ConfigOptions.key("state.changelog.dstl.dfs.retry-delay-after-failure")
                    .durationType()
                    .defaultValue(Duration.ofMillis(500))
                    .withDescription("Delay between retries.");

    /** Idle timeout of a cache file. Defaults to 10 minutes. */
    public static final ConfigOption<Duration> CACHE_IDLE_TIMEOUT =
            ConfigOptions.key("state.changelog.dstl.dfs.cache-idle-timeout")
                    .durationType()
                    .defaultValue(Duration.ofMinutes(10))
                    .withDescription("Cache file idle timeout.");
}

Configuration Usage Examples:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;

// The remaining types used below have to be imported as well.
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;

import java.time.Duration;

Configuration config = new Configuration();

// Required: Set base path (this option has no default value)
config.set(FsStateChangelogOptions.BASE_PATH, "/path/to/changelog/storage");

// Optional: Enable compression
config.set(FsStateChangelogOptions.COMPRESSION_ENABLED, true);

// Optional: Adjust thresholds
config.set(FsStateChangelogOptions.PREEMPTIVE_PERSIST_THRESHOLD, MemorySize.parse("10MB"));
config.set(FsStateChangelogOptions.PERSIST_SIZE_THRESHOLD, MemorySize.parse("20MB"));

// Optional: Configure upload behavior
config.set(FsStateChangelogOptions.NUM_UPLOAD_THREADS, 10);
config.set(FsStateChangelogOptions.UPLOAD_TIMEOUT, Duration.ofSeconds(30));
config.set(FsStateChangelogOptions.RETRY_MAX_ATTEMPTS, 5);

// Use configuration with factory.
// jobID, metricGroup and localRecoveryConfig are not defined in this snippet; they must be
// supplied by the surrounding code. createStorage declares IOException, so the caller has
// to catch or propagate it.
FsStateChangelogStorageFactory factory = new FsStateChangelogStorageFactory();
StateChangelogStorage<?> storage = factory.createStorage(
        jobID, config, metricGroup, localRecoveryConfig
);

Storage view implementation for recovery-only operations, used when reading existing changelog data.
/**
 * Changelog storage implementation that supports recovery only.
 *
 * <p>This is a signature listing: method bodies are omitted.
 */
@Experimental
@ThreadSafe
public class FsStateChangelogStorageForRecovery
        implements StateChangelogStorageView<ChangelogStateHandleStreamImpl> {

    /**
     * Produces a reader for changelog handles.
     *
     * @return a {@code StateChangelogHandleReader} that reads changelog data
     */
    public StateChangelogHandleReader<ChangelogStateHandleStreamImpl> createReader();

    /**
     * Closes this storage view and releases its resources.
     *
     * @throws Exception if closing fails
     */
    public void close() throws Exception;
}

Recovery Usage Example:
// Create storage view for recovery
FsStateChangelogStorageFactory factory = new FsStateChangelogStorageFactory();
StateChangelogStorageView<?> storageView = factory.createStorageView(config);
// Create reader for recovery
StateChangelogHandleReader<?> reader = storageView.createReader();
// Use reader to iterate through state changes
// (reader usage depends on specific changelog handles)Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-dstl