Apache Flink DSTL (Durable Short-term Log) - A filesystem-based state changelog implementation for Flink's state management
Filesystem-based implementation of StateChangelogStorage: it creates per-operator changelog writers, coordinates the upload of their state changes to a filesystem, and reports availability for backpressure. The class is annotated @ThreadSafe, so one instance can serve several writers concurrently.
FsStateChangelogStorage is the main implementation. It extends FsStateChangelogStorageForRecovery, which reads changelogs back during recovery, and adds writer creation.
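To make the writer contract concrete, here is a minimal in-memory sketch of the append / sequence-number / persist cycle that the usage example below relies on. InMemoryChangelogWriter and its Change record are hypothetical illustrations, not flink-dstl types. No upload, batching, or file format is modelled, and every append gets its own sequence number, whereas the real writer may let several appends share one.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of a changelog writer; not part of flink-dstl. */
final class InMemoryChangelogWriter {
    /** One state change: its sequence number, key group, and serialized bytes. */
    record Change(long sqn, int keyGroup, byte[] value) {}

    private final int firstKeyGroup;
    private final int lastKeyGroup; // inclusive, mirroring KeyGroupRange.of(first, last)
    private final List<Change> log = new ArrayList<>();
    private long nextSqn = 0;

    InMemoryChangelogWriter(int firstKeyGroup, int lastKeyGroup) {
        this.firstKeyGroup = firstKeyGroup;
        this.lastKeyGroup = lastKeyGroup;
    }

    /** Sequence number that the next appended change will receive. */
    long nextSequenceNumber() {
        return nextSqn;
    }

    /** Records a change; this sketch rejects key groups outside the writer's range. */
    void append(int keyGroup, byte[] value) {
        if (keyGroup < firstKeyGroup || keyGroup > lastKeyGroup) {
            throw new IllegalArgumentException("key group " + keyGroup + " out of range");
        }
        log.add(new Change(nextSqn++, keyGroup, value));
    }

    /** Returns every change with sequence number >= from, i.e. what a checkpoint would persist. */
    List<Change> persist(long from) {
        List<Change> result = new ArrayList<>();
        for (Change change : log) {
            if (change.sqn() >= from) {
                result.add(change);
            }
        }
        return result;
    }
}
```

Taking a sequence number before a batch of appends and passing it to persist() is what lets a checkpoint cover exactly the changes made since that marker.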
/**
* Filesystem-based implementation of StateChangelogStorage
*/
@Experimental
@ThreadSafe
public class FsStateChangelogStorage
extends FsStateChangelogStorageForRecovery
implements StateChangelogStorage<ChangelogStateHandleStreamImpl> {
/**
* Creates storage with default changelog registry
* @param jobID The job identifier
* @param config Configuration settings
* @param metricGroup Metric group for monitoring
* @param localRecoveryConfig Local recovery configuration
* @throws IOException If storage initialization fails
*/
public FsStateChangelogStorage(
JobID jobID,
Configuration config,
TaskManagerJobMetricGroup metricGroup,
LocalRecoveryConfig localRecoveryConfig
) throws IOException;
/**
* Creates storage with custom changelog registry
* @param jobID The job identifier
* @param config Configuration settings
* @param metricGroup Metric group for monitoring
* @param changelogRegistry Custom changelog registry
* @param localRecoveryConfig Local recovery configuration
* @throws IOException If storage initialization fails
*/
public FsStateChangelogStorage(
JobID jobID,
Configuration config,
TaskManagerJobMetricGroup metricGroup,
TaskChangelogRegistry changelogRegistry,
LocalRecoveryConfig localRecoveryConfig
) throws IOException;
/**
* Creates a writer for state changes for a specific operator
* @param operatorID Identifier of the operator
* @param keyGroupRange Range of key groups handled by this writer
* @param mailboxExecutor Executor for asynchronous operations
* @return FsStateChangelogWriter instance
*/
public FsStateChangelogWriter createWriter(
String operatorID,
KeyGroupRange keyGroupRange,
MailboxExecutor mailboxExecutor
);
/**
* Closes the storage and releases all resources
* @throws Exception If closing fails
*/
public void close() throws Exception;
/**
* Returns availability provider for backpressure handling
* @return AvailabilityProvider instance
*/
public AvailabilityProvider getAvailabilityProvider();
}

Usage Examples:
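import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.changelog.fs.FsStateChangelogStorage;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
// metricGroup (TaskManagerJobMetricGroup), localRecoveryConfig (LocalRecoveryConfig) and
// mailboxExecutor (MailboxExecutor) are placeholders normally supplied by the task runtime.
// Basic storage creation
Configuration config = new Configuration();
config.set(FsStateChangelogOptions.BASE_PATH, "/path/to/changelog");
FsStateChangelogStorage storage = new FsStateChangelogStorage(
new JobID(),
config,
metricGroup,
localRecoveryConfig
);
// Create a writer for one operator, covering key groups 0..127 (inclusive)
StateChangelogWriter<ChangelogStateHandleStreamImpl> writer = storage.createWriter(
"my-operator-id",
KeyGroupRange.of(0, 127),
mailboxExecutor
);
// Mark where this batch of changes starts, then append serialized changes
SequenceNumber from = writer.nextSequenceNumber();
byte[] stateChangeData = serializeChange(); // hypothetical helper returning serialized change bytes
writer.append(5, stateChangeData); // key group 5
// Persist all changes appended since `from` as part of checkpoint 42
long checkpointId = 42L;
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> result =
writer.persist(from, checkpointId);
// Clean up
writer.close();
storage.close();

Constructor provided for tests: it takes the base path, compression flag, and upload buffer size directly instead of reading them from a Configuration.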
/**
* Testing constructor with direct uploader configuration
* @param jobID The job identifier
* @param basePath Base path for changelog storage
* @param compression Whether to enable compression
* @param bufferSize Upload buffer size in bytes
* @param metricGroup Metric group for monitoring
* @param changelogRegistry Changelog registry for tracking
* @param localRecoveryConfig Local recovery configuration
* @throws IOException If storage initialization fails
*/
@VisibleForTesting
public FsStateChangelogStorage(
JobID jobID,
Path basePath,
boolean compression,
int bufferSize,
ChangelogStorageMetricGroup metricGroup,
TaskChangelogRegistry changelogRegistry,
LocalRecoveryConfig localRecoveryConfig
) throws IOException;

Low-level constructor for advanced testing scenarios with a custom upload scheduler.
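The preEmptivePersistThresholdInBytes parameter controls pre-emptive persistence: once a writer's not-yet-uploaded changes grow beyond the threshold, they are uploaded without waiting for a checkpoint, so less data is left to upload when the checkpoint arrives. The sketch below models only that decision. PreEmptivePersistSketch and its list of upload sizes are hypothetical; the exact comparison used by flink-dstl (greater-than versus greater-or-equal) is an assumption here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of pre-emptive persistence; not the flink-dstl implementation. */
final class PreEmptivePersistSketch {
    private final long thresholdBytes;
    private long notUploadedBytes = 0;
    // Sizes of the uploads that were started, in order.
    private final List<Long> uploads = new ArrayList<>();

    PreEmptivePersistSketch(long thresholdBytes) {
        this.thresholdBytes = thresholdBytes;
    }

    /** Accumulates a change and starts an early upload once the threshold is exceeded. */
    void append(byte[] change) {
        notUploadedBytes += change.length;
        if (notUploadedBytes > thresholdBytes) {
            upload();
        }
    }

    /** On checkpoint, only the changes since the last pre-emptive upload remain; returns their size. */
    long persistOnCheckpoint() {
        long remaining = notUploadedBytes;
        upload();
        return remaining;
    }

    private void upload() {
        if (notUploadedBytes == 0) {
            return; // nothing pending
        }
        uploads.add(notUploadedBytes);
        notUploadedBytes = 0;
    }

    List<Long> uploads() {
        return uploads;
    }
}
```

The trade-off is more, smaller uploads during normal processing in exchange for a shorter upload at checkpoint time.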
/**
* Low-level constructor with custom upload scheduler
* @param uploader Custom state change upload scheduler
* @param preEmptivePersistThresholdInBytes Size of not-yet-persisted changes beyond which a writer persists them without waiting for a checkpoint
* @param changelogRegistry Changelog registry for tracking
* @param localRecoveryConfig Local recovery configuration
*/
@VisibleForTesting
public FsStateChangelogStorage(
StateChangeUploadScheduler uploader,
long preEmptivePersistThresholdInBytes,
TaskChangelogRegistry changelogRegistry,
LocalRecoveryConfig localRecoveryConfig
);

Testing Usage Example:
// Testing with custom parameters
// metricGroup and localRecoveryConfig are placeholders supplied by the test setup
ChangelogStorageMetricGroup testMetricGroup = new ChangelogStorageMetricGroup(metricGroup);
TaskChangelogRegistry testRegistry = TaskChangelogRegistry.defaultChangelogRegistry(1);
FsStateChangelogStorage testStorage = new FsStateChangelogStorage(
new JobID(),
new Path("file:///tmp/test-changelog"), // org.apache.flink.core.fs.Path, not java.nio.file.Path
true, // compression enabled
1024 * 1024, // 1 MiB upload buffer
testMetricGroup,
testRegistry,
localRecoveryConfig
);

The storage exposes availability information for backpressure: when too much data is in flight to the filesystem, it reports itself as unavailable and the task is back-pressured until uploads catch up.
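The availability signal can be pictured as a future that is already complete while there is capacity and is replaced by a pending one once in-flight data exceeds a limit. The sketch below assumes availability is driven by an in-flight byte limit and uses only java.util.concurrent.CompletableFuture; InFlightLimiterSketch, uploadStarted, and uploadFinished are hypothetical names, while isAvailable and getAvailableFuture mirror the shape of Flink's AvailabilityProvider.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical availability signalling based on an in-flight byte limit; not flink-dstl code. */
final class InFlightLimiterSketch {
    private static final CompletableFuture<Void> AVAILABLE = CompletableFuture.completedFuture(null);

    private final long maxInFlightBytes;
    private long inFlightBytes = 0;
    private CompletableFuture<Void> availability = AVAILABLE;

    InFlightLimiterSketch(long maxInFlightBytes) {
        this.maxInFlightBytes = maxInFlightBytes;
    }

    synchronized boolean isAvailable() {
        return availability.isDone();
    }

    synchronized CompletableFuture<Void> getAvailableFuture() {
        return availability;
    }

    /** Called when an upload starts; switches to a pending future once over the limit. */
    synchronized void uploadStarted(long bytes) {
        inFlightBytes += bytes;
        if (inFlightBytes > maxInFlightBytes && availability.isDone()) {
            availability = new CompletableFuture<>(); // signal backpressure
        }
    }

    /** Called when an upload finishes; completes the pending future once back under the limit. */
    void uploadFinished(long bytes) {
        CompletableFuture<Void> toComplete = null;
        synchronized (this) {
            inFlightBytes -= bytes;
            if (inFlightBytes <= maxInFlightBytes && !availability.isDone()) {
                toComplete = availability;
                availability = AVAILABLE;
            }
        }
        // Complete outside the lock so that callbacks do not run while holding it.
        if (toComplete != null) {
            toComplete.complete(null);
        }
    }
}
```

Completing the future outside the lock matters because dependent callbacks (such as thenRun) may execute synchronously on the completing thread.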
/**
* Returns availability provider for coordinating backpressure
* @return AvailabilityProvider that signals when storage is available for writes
*/
public AvailabilityProvider getAvailabilityProvider();

Availability Usage Example:
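FsStateChangelogStorage storage = new FsStateChangelogStorage(/* ... */);
// writer, keyGroup, stateChangeData and mailboxExecutor come from the surrounding task code
// Check whether the storage currently accepts more writes
AvailabilityProvider availability = storage.getAvailabilityProvider();
if (availability.isAvailable()) {
writer.append(keyGroup, stateChangeData);
} else {
// Uploads are backlogged: resume once the availability future completes.
// The future may complete on an upload thread, so hand the work back to the
// task's mailbox instead of touching the writer directly in the callback.
availability.getAvailableFuture().thenRun(() ->
mailboxExecutor.execute(
() -> writer.append(keyGroup, stateChangeData),
"resume changelog append"));
}

The storage integrates with Flink's task-local recovery: in addition to the remote copy, changelog data can be kept on the TaskManager's local disk, so that a restart on the same TaskManager can restore without reading everything back from the remote filesystem.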
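The local-first idea behind task-local recovery can be sketched with plain java.nio.file: the remote copy is written first, a local copy is kept on a best-effort basis, and reads prefer the local copy and fall back to the remote one. LocalFirstRestoreSketch is hypothetical and uses two ordinary directories as stand-ins for the local recovery directory and the remote base path; it does not reproduce how flink-dstl lays out or tracks its files.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch of local-first restore with a remote fallback; not flink-dstl code. */
final class LocalFirstRestoreSketch {
    private final Path localDir;  // stands in for the task-local recovery directory
    private final Path remoteDir; // stands in for the remote (DFS) base path

    LocalFirstRestoreSketch(Path localDir, Path remoteDir) {
        this.localDir = localDir;
        this.remoteDir = remoteDir;
    }

    /** Writes the remote copy first; the local copy is best effort. */
    void write(String fileName, byte[] data) throws IOException {
        Files.write(remoteDir.resolve(fileName), data);
        try {
            Files.write(localDir.resolve(fileName), data);
        } catch (IOException e) {
            // In this sketch a failed local copy is ignored; restore falls back to remote.
        }
    }

    /** Reads the local copy if it is readable, otherwise the remote copy. */
    byte[] read(String fileName) throws IOException {
        Path local = localDir.resolve(fileName);
        if (Files.isReadable(local)) {
            try {
                return Files.readAllBytes(local);
            } catch (IOException e) {
                // Local file vanished or is unreadable: fall through to the remote copy.
            }
        }
        return Files.readAllBytes(remoteDir.resolve(fileName));
    }
}
```

Writing the remote copy first keeps it the source of truth; losing the local disk then costs only restore speed, not data.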
Local Recovery Configuration Example:
// Configure local recovery. In a running job the TaskManager builds this object from its
// local-recovery settings; constructor signatures may differ between Flink versions.
// jobID, jobVertexID and subtaskIndex identify the subtask whose state is kept locally.
LocalRecoveryDirectoryProvider directoryProvider = new LocalRecoveryDirectoryProviderImpl(
new File("/local/recovery"), // allocation base directory on the TaskManager
jobID,
jobVertexID,
subtaskIndex
);
LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(
true, // enable local recovery
directoryProvider
);
// Create storage with local recovery enabled
FsStateChangelogStorage storage = new FsStateChangelogStorage(
jobID, config, metricGroup, localRecoveryConfig
);

The storage reports failures through exceptions: the constructors throw IOException if initialization fails, and close() may throw any Exception raised while shutting down uploads and releasing resources.
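Because close() is declared on the storage, try-with-resources is a compact alternative to separate try blocks: close() always runs, and if both the body and close() fail, the close() failure is attached as a suppressed exception instead of being lost. The sketch below uses a hypothetical StorageStandIn class in place of FsStateChangelogStorage so that it runs without Flink; it shows Java's standard try-with-resources semantics, not flink-dstl behavior.

```java
import java.io.IOException;

/** Hypothetical stand-in for a storage whose close() can fail; not the flink-dstl class. */
final class StorageStandIn implements AutoCloseable {
    private final boolean failOnClose;
    boolean closed = false;

    StorageStandIn(boolean failOnClose) {
        this.failOnClose = failOnClose;
    }

    void write(String change) throws IOException {
        if (change.isEmpty()) {
            throw new IOException("empty change");
        }
    }

    @Override
    public void close() throws Exception {
        closed = true;
        if (failOnClose) {
            throw new IllegalStateException("upload scheduler did not shut down");
        }
    }
}

final class CleanupSketch {
    /** Writes one change and closes the storage; returns a short description of the outcome. */
    static String run(StorageStandIn storage, String change) {
        try (storage) { // Java 9+: effectively final variable as the resource
            storage.write(change);
            return "ok";
        } catch (Exception e) {
            // A close() failure arrives either as the primary exception or as a suppressed one.
            return e.getMessage() + " suppressed=" + e.getSuppressed().length;
        }
    }
}
```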
Common Error Scenarios:
FsStateChangelogStorage storage = null;
try {
storage = new FsStateChangelogStorage(
jobID, config, metricGroup, localRecoveryConfig
);
} catch (IOException e) {
// Handle storage initialization failure
// Common causes: invalid base path, missing permissions, filesystem errors
log.error("Failed to initialize changelog storage", e);
}
try {
if (storage != null) {
storage.close();
}
} catch (Exception e) {
// Handle cleanup failure
// May include upload scheduler shutdown errors or other resource cleanup problems
log.warn("Error during storage cleanup", e);
}