Distributed file system-based changelog storage implementation for Apache Flink's streaming state backend.
Factory for creating filesystem-based changelog storage instances on distributed file systems. The factory is discovered through Java's service loading mechanism and provides a static helper method for setting the storage directory, upload timeout, and upload retry limit.
Main factory class implementing the StateChangelogStorageFactory interface for filesystem-based changelog storage.
/**
* Factory for creating FsStateChangelogStorage instances.
* Registered as a service for automatic discovery by Flink's StateChangelogStorageFactory loading mechanism.
*/
public class FsStateChangelogStorageFactory implements StateChangelogStorageFactory {
/** Identifier for filesystem-based changelog storage */
public static final String IDENTIFIER = "filesystem";
/**
* Returns the identifier for this storage factory
* @return "filesystem" identifier
*/
public String getIdentifier();
/**
* Creates a new changelog storage instance for write operations
* @param jobID Job identifier for the storage
* @param configuration Flink configuration containing storage settings
* @param metricGroup Metric group for collecting storage metrics
* @param localRecoveryConfig Configuration for local recovery features
* @return FsStateChangelogStorage instance for the job
* @throws IOException if storage initialization fails
*/
public StateChangelogStorage<?> createStorage(
JobID jobID,
Configuration configuration,
TaskManagerJobMetricGroup metricGroup,
LocalRecoveryConfig localRecoveryConfig
) throws IOException;
/**
* Creates a storage view for recovery operations (read-only)
* @param configuration Flink configuration
* @return FsStateChangelogStorageForRecovery instance for reading persisted changelogs
*/
public StateChangelogStorageView<?> createStorageView(Configuration configuration);
/**
* Static helper method to configure changelog storage settings
* @param configuration Configuration object to modify
* @param newFolder Base directory for changelog files
* @param uploadTimeout Timeout for upload operations
* @param maxUploadAttempts Maximum number of retry attempts for failed uploads
*/
public static void configure(
Configuration configuration,
File newFolder,
Duration uploadTimeout,
int maxUploadAttempts
);
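}
Usage Examples:
import java.io.File;
import java.time.Duration;
import org.apache.flink.api.common.JobID;
import org.apache.flink.changelog.fs.FsStateChangelogStorage;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.changelog.fs.FsStateChangelogStorageForRecovery;
import org.apache.flink.configuration.Configuration;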
// Configure storage using the static helper
Configuration config = new Configuration();
FsStateChangelogStorageFactory.configure(
config,
new File("/hdfs/changelog"),
Duration.ofSeconds(30),
5
);
// Create factory instance
FsStateChangelogStorageFactory factory = new FsStateChangelogStorageFactory();
// Verify identifier
assert "filesystem".equals(factory.getIdentifier());
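// Create storage for write operations
// (taskManagerJobMetricGroup and localRecoveryConfig are assumed to be provided by the surrounding runtime code)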
FsStateChangelogStorage storage = (FsStateChangelogStorage) factory.createStorage(
new JobID(),
config,
taskManagerJobMetricGroup,
localRecoveryConfig
);
// Create storage view for recovery
FsStateChangelogStorageForRecovery recoveryStorage =
(FsStateChangelogStorageForRecovery) factory.createStorageView(config);
The factory is automatically registered via Java's ServiceLoader mechanism:
// META-INF/services/org.apache.flink.runtime.state.changelog.StateChangelogStorageFactory
org.apache.flink.changelog.fs.FsStateChangelogStorageFactory
This enables automatic discovery by Flink's changelog storage loading system when the "filesystem" identifier is specified in configuration.
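To make the identifier-based discovery concrete, here is a minimal, Flink-independent sketch of selecting a factory by its identifier. `IdentifiedFactory` and `find` are hypothetical names introduced only for illustration, and the exact-match comparison is an assumption; Flink's own loader may differ in its details. In a real setup the candidates would typically come from `java.util.ServiceLoader.load(...)` rather than a hand-built list.

```java
import java.util.List;
import java.util.Optional;

public class FactoryLookupSketch {

    /** Hypothetical stand-in for a factory interface that exposes an identifier. */
    interface IdentifiedFactory {
        String getIdentifier();
    }

    /**
     * Returns the first candidate whose identifier equals the requested one.
     * The candidates are passed in explicitly here; a service-loading setup
     * would obtain them from java.util.ServiceLoader instead.
     */
    static Optional<IdentifiedFactory> find(
            Iterable<? extends IdentifiedFactory> candidates, String identifier) {
        for (IdentifiedFactory candidate : candidates) {
            if (candidate.getIdentifier().equals(identifier)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Two illustrative factories; only their identifiers matter for the lookup.
        List<IdentifiedFactory> candidates = List.of(() -> "memory", () -> "filesystem");

        System.out.println(find(candidates, "filesystem").isPresent()); // prints true
        System.out.println(find(candidates, "unknown").isPresent());    // prints false
    }
}
```

Returning an `Optional` leaves it to the caller to decide whether a missing identifier is a configuration error or a reason to fall back to a default.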
The factory integrates with Flink's configuration system using the options defined in FsStateChangelogOptions:
import java.time.Duration;
import org.apache.flink.configuration.Configuration;
import static org.apache.flink.changelog.fs.FsStateChangelogOptions.*;
import static org.apache.flink.configuration.StateChangelogOptions.STATE_CHANGE_LOG_STORAGE;
// Manual configuration
Configuration config = new Configuration();
config.set(STATE_CHANGE_LOG_STORAGE, "filesystem");
config.set(BASE_PATH, "/path/to/changelog/storage");
config.set(UPLOAD_TIMEOUT, Duration.ofSeconds(10));
config.set(RETRY_MAX_ATTEMPTS, 3);
config.set(COMPRESSION_ENABLED, true);
The factory methods handle various initialization errors:
IOException: thrown by createStorage() when filesystem access fails or configuration is invalid
try {
StateChangelogStorage<?> storage = factory.createStorage(
jobId, config, metricGroup, localRecoveryConfig
);
} catch (IOException e) {
// Handle storage initialization failure
log.error("Failed to initialize changelog storage", e);
throw new RuntimeException("Changelog storage setup failed", e);
}
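The error-handling pattern above can be factored into a small reusable helper. The following is a sketch under stated assumptions, not part of Flink: `IoSupplier` and `initOrFail` are hypothetical names, and `UncheckedIOException` is used in place of a plain `RuntimeException` so that callers can still tell I/O failures apart from other errors.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

public class StorageInitSketch {

    /** Hypothetical functional interface for an initializer that may throw IOException. */
    @FunctionalInterface
    interface IoSupplier<T> {
        T get() throws IOException;
    }

    /** Runs the initializer and rethrows any IOException as an UncheckedIOException with context. */
    static <T> T initOrFail(String what, IoSupplier<T> initializer) {
        try {
            return initializer.get();
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to initialize " + what, e);
        }
    }

    public static void main(String[] args) {
        // Successful initialization simply returns the created value.
        String ok = initOrFail("changelog storage", () -> "storage-ready");
        System.out.println(ok);

        // A failing initializer surfaces as an unchecked exception that keeps the cause.
        try {
            initOrFail("changelog storage", () -> {
                throw new IOException("base path not writable");
            });
        } catch (UncheckedIOException e) {
            System.out.println(e.getMessage() + ": " + e.getCause().getMessage());
        }
    }
}
```

With such a helper, the call from the example above could be written as `initOrFail("changelog storage", () -> factory.createStorage(jobId, config, metricGroup, localRecoveryConfig))`, keeping the original `IOException` available as the cause.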