Distributed file system-based changelog storage implementation for Apache Flink's streaming state backend.
Core filesystem-based implementation providing changelog writers and managing upload operations for high-throughput streaming applications. The storage implementation coordinates between writers, upload schedulers, and recovery components.
Main storage implementation that manages changelog writers and upload operations for active streaming jobs.
/**
* Filesystem-based implementation of StateChangelogStorage for write operations.
* Thread-safe and manages multiple changelog writers for different operators.
*/
public class FsStateChangelogStorage extends FsStateChangelogStorageForRecovery
implements StateChangelogStorage<ChangelogStateHandleStreamImpl> {
/**
* Creates a new changelog writer for a specific operator and key group range
* @param operatorID Unique identifier for the operator
* @param keyGroupRange Key group range this writer handles
* @param mailboxExecutor Executor for callback processing
* @return FsStateChangelogWriter instance for the operator
*/
public FsStateChangelogWriter createWriter(
String operatorID,
KeyGroupRange keyGroupRange,
MailboxExecutor mailboxExecutor
);
/**
* Closes the storage and all associated resources
* @throws Exception if cleanup fails
*/
public void close() throws Exception;
/**
* Returns availability provider for backpressure control
* @return AvailabilityProvider indicating when storage can accept more data
*/
public AvailabilityProvider getAvailabilityProvider();
}
Multiple constructor variants support different initialization scenarios:
/**
* Main constructor for production use
*/
public FsStateChangelogStorage(
JobID jobID,
Configuration config,
TaskManagerJobMetricGroup metricGroup,
LocalRecoveryConfig localRecoveryConfig
) throws IOException;
/**
* Constructor with custom changelog registry
*/
public FsStateChangelogStorage(
JobID jobID,
Configuration config,
TaskManagerJobMetricGroup metricGroup,
TaskChangelogRegistry changelogRegistry,
LocalRecoveryConfig localRecoveryConfig
) throws IOException;
/**
* Testing constructor with direct parameters
*/
public FsStateChangelogStorage(
JobID jobID,
Path basePath,
boolean compression,
int bufferSize,
ChangelogStorageMetricGroup metricGroup,
TaskChangelogRegistry changelogRegistry,
LocalRecoveryConfig localRecoveryConfig
) throws IOException;
/**
* Advanced constructor with custom upload scheduler
*/
public FsStateChangelogStorage(
StateChangeUploadScheduler uploader,
long preEmptivePersistThresholdInBytes,
TaskChangelogRegistry changelogRegistry,
LocalRecoveryConfig localRecoveryConfig
);
Usage Examples:
import org.apache.flink.changelog.fs.FsStateChangelogStorage;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.state.KeyGroupRange;
// Create storage instance (typically done by factory)
FsStateChangelogStorage storage = new FsStateChangelogStorage(
jobId, config, metricGroup, localRecoveryConfig
);
// Create writers for different operators
FsStateChangelogWriter operatorWriter1 = storage.createWriter(
"map-operator",
KeyGroupRange.of(0, 63),
mailboxExecutor
);
FsStateChangelogWriter operatorWriter2 = storage.createWriter(
"filter-operator",
KeyGroupRange.of(64, 127),
mailboxExecutor
);
// Check if storage can accept more data
AvailabilityProvider availability = storage.getAvailabilityProvider();
if (availability.isAvailable()) {
// Safe to write more data
operatorWriter1.append(keyGroup, stateChangeBytes);
}
// Cleanup when done
storage.close();
The storage implementation integrates with the upload scheduling system:
/**
* Internal components managed by FsStateChangelogStorage
*/
class InternalComponents {
private final StateChangeUploadScheduler uploader;
private final long preEmptivePersistThresholdInBytes;
private final TaskChangelogRegistry changelogRegistry;
private final AtomicInteger logIdGenerator;
private final LocalChangelogRegistry localChangelogRegistry;
}
The storage manages these components automatically.
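As a rough illustration of how two of these fields could interact, the sketch below hands out unique log IDs from an `AtomicInteger` and persists early once a writer's buffered bytes reach a threshold. `SketchStorage` and `SketchWriter` are hypothetical stand-ins, not the Flink classes, and the real persist logic may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a changelog writer; not the Flink class. */
class SketchWriter {
    private final int logId;
    private final long preEmptivePersistThresholdInBytes;
    private final List<byte[]> buffered = new ArrayList<>();
    private long bufferedBytes = 0;
    private int persistCount = 0;

    SketchWriter(int logId, long preEmptivePersistThresholdInBytes) {
        this.logId = logId;
        this.preEmptivePersistThresholdInBytes = preEmptivePersistThresholdInBytes;
    }

    /** Buffers a change and persists early once the buffered size reaches the threshold. */
    void append(byte[] change) {
        buffered.add(change);
        bufferedBytes += change.length;
        if (bufferedBytes >= preEmptivePersistThresholdInBytes) {
            persist();
        }
    }

    /** Stand-in for handing the buffered changes to an upload scheduler. */
    void persist() {
        if (buffered.isEmpty()) {
            return;
        }
        buffered.clear();
        bufferedBytes = 0;
        persistCount++;
    }

    int logId() { return logId; }
    int persistCount() { return persistCount; }
    long bufferedBytes() { return bufferedBytes; }
}

/** Hypothetical stand-in for the storage: creates writers with unique log IDs. */
class SketchStorage {
    private final AtomicInteger logIdGenerator = new AtomicInteger(0);
    private final long preEmptivePersistThresholdInBytes;

    SketchStorage(long preEmptivePersistThresholdInBytes) {
        this.preEmptivePersistThresholdInBytes = preEmptivePersistThresholdInBytes;
    }

    SketchWriter createWriter() {
        // incrementAndGet is atomic, so concurrent callers never receive the same ID
        return new SketchWriter(logIdGenerator.incrementAndGet(), preEmptivePersistThresholdInBytes);
    }
}
```

Keeping the counter in the storage rather than in each writer is what lets IDs stay unique across all writers created for a job.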
When local recovery is enabled, the storage manages local changelog registries:
// Local recovery configuration (how LocalRecoveryConfig is built varies by Flink version; enabled(...) is shown as a placeholder)
LocalRecoveryConfig localRecoveryConfig = LocalRecoveryConfig.enabled(localStateDirectory);
FsStateChangelogStorage storage = new FsStateChangelogStorage(
jobId, config, metricGroup, localRecoveryConfig
);
// Storage automatically creates LocalChangelogRegistryImpl when enabled
// Handles both remote and local persistence of changelog data
The storage provides backpressure mechanisms through availability providers.
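The mechanism can be sketched without Flink on the classpath. The block below is a minimal illustration, assuming a simple in-flight byte limit: `InFlightLimiter`, `onUploadStarted`, and `onUploadFinished` are hypothetical names, not part of the Flink API, and the real uploader's accounting may differ.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical availability tracker; mirrors the idea, not Flink's AvailabilityProvider. */
class InFlightLimiter {
    private final long maxInFlightBytes;
    private long inFlightBytes = 0;
    // Completed while capacity remains; swapped for a pending future when the limit is hit
    private CompletableFuture<Void> available = CompletableFuture.completedFuture(null);

    InFlightLimiter(long maxInFlightBytes) {
        this.maxInFlightBytes = maxInFlightBytes;
    }

    synchronized boolean isAvailable() {
        return available.isDone();
    }

    synchronized CompletableFuture<Void> getAvailableFuture() {
        return available;
    }

    /** Called when bytes are handed to the uploader. */
    synchronized void onUploadStarted(long bytes) {
        inFlightBytes += bytes;
        if (inFlightBytes >= maxInFlightBytes && available.isDone()) {
            available = new CompletableFuture<>();
        }
    }

    /** Called when an upload finishes; releases waiting callers once below the limit. */
    synchronized void onUploadFinished(long bytes) {
        inFlightBytes -= bytes;
        if (inFlightBytes < maxInFlightBytes && !available.isDone()) {
            // Callbacks registered on the future run here, on the completing thread
            available.complete(null);
        }
    }
}
```

A caller that finds the limiter unavailable registers a callback on the future instead of blocking, which is the same shape as the calls on the storage's own provider: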
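import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.flink.runtime.io.AvailabilityProvider;
// Monitor storage availability
AvailabilityProvider availability = storage.getAvailabilityProvider();
// Use in async context
availability.getAvailableFuture().thenRun(() -> {
    try {
        // Storage is available, safe to continue writing
        writer.append(keyGroup, data);
    } catch (IOException e) {
        // append declares IOException, which a Runnable lambda cannot propagate
        throw new UncheckedIOException(e);
    }
});
The storage handles various error conditions and lifecycle events.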
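One way to guarantee cleanup on every path is try-with-resources, which closes the resource before any catch block runs. The sketch below shows that ordering with a hypothetical `TrackedResource` standing in for a storage instance. `TrackedResource` and `LifecycleSketch` are illustrative names only.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical resource that records lifecycle events; stands in for a storage instance. */
class TrackedResource implements AutoCloseable {
    private final List<String> events;

    TrackedResource(List<String> events) {
        this.events = events;
        events.add("open");
    }

    void write(boolean fail) throws IOException {
        if (fail) {
            throw new IOException("simulated write failure");
        }
        events.add("write");
    }

    @Override
    public void close() {
        events.add("close");
    }
}

class LifecycleSketch {
    /** Runs one open/write/close cycle and returns the recorded events in order. */
    static List<String> run(boolean failWrite) {
        List<String> events = new ArrayList<>();
        try (TrackedResource resource = new TrackedResource(events)) {
            resource.write(failWrite);
        } catch (IOException e) {
            // close() has already run by the time this handler executes
            events.add("error");
        }
        return events;
    }
}
```

The storage-specific handling follows the same open, use, close shape: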
// StateChangelogStorage extends AutoCloseable, so try-with-resources closes it on every path
try (FsStateChangelogStorage storage = new FsStateChangelogStorage(
        jobId, config, metricGroup, localRecoveryConfig)) {
    // Use storage...
} catch (IOException e) {
    // Handle initialization or operation errors
    log.error("Storage operation failed", e);
} catch (Exception e) {
    // close() is declared to throw Exception, so a broader handler is required
    log.error("Storage cleanup or operation failed", e);
}
The storage ensures:
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-dstl-dfs