Distributed file system-based changelog storage implementation for Apache Flink's streaming state backend.
—
The recovery system provides read-only access to persisted changelog data, which is used to restore state from checkpoints, and manages the lifecycle of changelog state handles (changelog state objects).
Read-only storage implementation for recovery operations during checkpoint restoration.
/**
 * Filesystem-based implementation of {@link StateChangelogStorageView} for recovery operations.
 *
 * <p>Provides read-only access to persisted changelog data during checkpoint restoration.
 */
public class FsStateChangelogStorageForRecovery
        implements StateChangelogStorageView<ChangelogStateHandleStreamImpl> {
    /**
     * Creates recovery storage that reads persisted changelog streams through the given reader.
     *
     * @param changelogStreamHandleReader reader for accessing persisted changelog streams, e.g.
     *     {@link ChangelogStreamHandleReader#DIRECT_READER} or a
     *     {@link ChangelogStreamHandleReaderWithCache}
     */
    public FsStateChangelogStorageForRecovery(
            ChangelogStreamHandleReader changelogStreamHandleReader
    );
    /**
     * Creates a reader for accessing changelog handles during recovery.
     *
     * @return a {@link StateChangelogHandleReader} for reading persisted changelog data
     */
    public StateChangelogHandleReader<ChangelogStateHandleStreamImpl> createReader();
    /**
     * Closes the recovery storage and releases its resources.
     *
     * <p>NOTE(review): presumably this also closes the {@link ChangelogStreamHandleReader}
     * passed to the constructor. Confirm this before relying on it.
     *
     * @throws Exception if cleanup fails
     */
    public void close() throws Exception;
}
Components for reading persisted changelog data from distributed file systems.
/**
 * Reader for changelog stream handles with direct file system access.
 *
 * <p>This reader is {@link AutoCloseable} and must be closed when it is no longer needed.
 */
public class ChangelogStreamHandleReader implements AutoCloseable {
    /** Shared ({@code static final}) reader that reads directly, without local caching. */
    public static final ChangelogStreamHandleReader DIRECT_READER;
    /**
     * Reads the state changes referenced by a changelog handle.
     *
     * <p>The returned iterator must be closed by the caller, e.g. with try-with-resources.
     *
     * @param handle changelog handle containing the stream references to read
     * @return a {@link CloseableIterator} over the state changes
     * @throws IOException if reading fails
     */
    public CloseableIterator<StateChange> read(ChangelogStateHandleStreamImpl handle)
            throws IOException;
    /**
     * Closes the reader and releases its resources.
     *
     * @throws IOException if cleanup fails
     */
    public void close() throws IOException;
}
/**
 * {@link ChangelogStreamHandleReader} with local caching support for improved performance.
 *
 * <p>Local cache files are managed by the reader. They are cleaned up after they have been
 * idle for the timeout configured via {@code FsStateChangelogOptions.CACHE_IDLE_TIMEOUT}.
 */
public class ChangelogStreamHandleReaderWithCache extends ChangelogStreamHandleReader {
    /**
     * Creates a cached reader from the given configuration.
     *
     * @param configuration Flink configuration containing the cache settings, such as
     *     {@code FsStateChangelogOptions.CACHE_IDLE_TIMEOUT}
     */
    public ChangelogStreamHandleReaderWithCache(Configuration configuration);
}
Iterator implementation for traversing state changes during recovery.
/**
 * {@link CloseableIterator} over the {@link StateChange}s stored in changelog streams.
 *
 * <p>Instances must be closed after use, e.g. with try-with-resources.
 */
public class StateChangeIteratorImpl implements CloseableIterator<StateChange> {
    /**
     * Creates an iterator that reads changelog streams through the given reader.
     *
     * @param changelogStreamHandleReader reader for accessing changelog streams, either direct
     *     ({@link ChangelogStreamHandleReader#DIRECT_READER}) or cached
     *     ({@link ChangelogStreamHandleReaderWithCache})
     */
    public StateChangeIteratorImpl(ChangelogStreamHandleReader changelogStreamHandleReader);
    /**
     * Checks whether more state changes are available.
     *
     * @return {@code true} if at least one more change can be returned by {@link #next()}
     */
    public boolean hasNext();
    /**
     * Returns the next state change.
     *
     * @return the next {@link StateChange} in the iteration
     * @throws NoSuchElementException if no more changes exist
     */
    public StateChange next();
    /**
     * Closes the iterator and releases its resources.
     *
     * @throws IOException if cleanup fails
     */
    public void close() throws IOException;
}
Usage Examples:
import org.apache.flink.changelog.fs.*;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
import org.apache.flink.util.CloseableIterator;

// Create the stream reader; recovery storage is typically created by a factory.
ChangelogStreamHandleReader reader = new ChangelogStreamHandleReaderWithCache(config);

// try-with-resources closes the storage even if recovery fails part-way.
// NOTE(review): closing the storage is assumed to also close `reader`; confirm this.
try (FsStateChangelogStorageForRecovery recoveryStorage =
        new FsStateChangelogStorageForRecovery(reader)) {
    // Create the handle reader used for recovery
    StateChangelogHandleReader<ChangelogStateHandleStreamImpl> handleReader =
            recoveryStorage.createReader();

    // Read state changes from the checkpoint handle through the handle reader
    ChangelogStateHandleStreamImpl checkpointHandle = getCheckpointHandle();
    try (CloseableIterator<StateChange> iterator = handleReader.getChanges(checkpointHandle)) {
        while (iterator.hasNext()) {
            StateChange change = iterator.next();
            // Process the state change during recovery
            if (change.getKeyGroup() != StateChange.META_KEY_GROUP) {
                // Regular state change for a specific key group
                int keyGroup = change.getKeyGroup();
                byte[] changeData = change.getChange();
                applyStateChange(keyGroup, changeData);
            } else {
                // Metadata change (uses the reserved META_KEY_GROUP)
                byte[] metadata = change.getChange();
                applyMetadataChange(metadata);
            }
        }
    }
}
Registry for managing the lifecycle of changelog state handles and coordinating between job manager and task manager ownership.
/**
 * Registry for tracking changelog state objects and managing their lifecycle.
 *
 * <p>Coordinates ownership of state handles between the task manager (TM) and the job manager
 * (JM):
 * <ul>
 *   <li>While a handle is tracked, the registry discards it once its reference count drops to
 *       zero.
 *   <li>After {@link #stopTracking} the JM becomes the owner.
 * </ul>
 */
public interface TaskChangelogRegistry {
    /**
     * Starts tracking a state handle with reference counting.
     *
     * @param handle state handle to track
     * @param refCount initial reference count, i.e. the number of changelog segments stored in
     *     this handle; each {@link #release} call decrements it by one
     */
    void startTracking(StreamStateHandle handle, long refCount);
    /**
     * Stops tracking a state handle because the JM becomes its owner.
     *
     * @param handle state handle to stop tracking
     */
    void stopTracking(StreamStateHandle handle);
    /**
     * Releases one reference to a state handle by decrementing its reference count.
     *
     * <p>When the count reaches zero, the handle is discarded. Discards run asynchronously
     * (see the discard threads of {@link #defaultChangelogRegistry(int)}).
     *
     * @param handle state handle to release
     */
    void release(StreamStateHandle handle);
    /**
     * Creates the default registry.
     *
     * @param numDiscardThreads number of threads used for asynchronous discard operations
     * @return a new {@link TaskChangelogRegistry} instance
     */
    static TaskChangelogRegistry defaultChangelogRegistry(int numDiscardThreads);
}
Default implementation of the changelog registry with reference counting and async cleanup.
/**
 * Default implementation of {@link TaskChangelogRegistry} based on reference counting.
 *
 * <p>Discards of state handles are executed on the executor passed to the constructor.
 */
public class TaskChangelogRegistryImpl implements TaskChangelogRegistry {
    /**
     * Creates a registry that runs discard operations on a caller-supplied executor.
     *
     * @param discardExecutor executor for running discard operations
     */
    public TaskChangelogRegistryImpl(Executor discardExecutor);
    /**
     * Closes the registry and shuts down discard operations.
     *
     * @throws Exception if cleanup fails
     */
    public void close() throws Exception;
}
Usage Examples:
// Create a changelog registry with 2 async discard threads (typically done by the storage)
TaskChangelogRegistry registry = TaskChangelogRegistry.defaultChangelogRegistry(2);
// Start tracking an uploaded state handle; the registry now owns it on the TM side
StreamStateHandle uploadedHandle = uploadResult.getStreamStateHandle();
long refCount = 3; // Number of state change sets (changelog segments) in this handle
registry.startTracking(uploadedHandle, refCount);
// Release references as state changes become unused; each call decrements refCount by one
registry.release(uploadedHandle); // refCount becomes 2
registry.release(uploadedHandle); // refCount becomes 1
registry.release(uploadedHandle); // refCount becomes 0, handle is discarded (asynchronously)
// Stop tracking when the JM becomes the owner (e.g., after checkpoint completion).
// NOTE: confirmedHandle is a placeholder for a different, still-tracked handle; it is not
// defined in this snippet.
registry.stopTracking(confirmedHandle);
Registry for managing local changelog files when local recovery is enabled.
/**
 * Registry for managing local changelog files when local recovery is enabled.
 *
 * <p>Local handles are registered per checkpoint. They are removed with
 * {@link #discardUpToCheckpoint(long)}.
 */
public interface LocalChangelogRegistry extends AutoCloseable {
    /** No-op implementation, used when local recovery is disabled. */
    LocalChangelogRegistry NO_OP = /* ... */;
    /**
     * Registers a local changelog handle for a checkpoint.
     *
     * @param handle local stream state handle
     * @param checkpointId identifier of the checkpoint the handle belongs to
     */
    void register(StreamStateHandle handle, long checkpointId);
    /**
     * Discards the registered local changelog files up to and including a checkpoint.
     *
     * @param checkpointId checkpoint identifier (inclusive)
     */
    void discardUpToCheckpoint(long checkpointId);
    /**
     * Closes the registry and releases its resources.
     *
     * <p>Narrows {@link AutoCloseable#close()} to throw only {@link IOException}.
     *
     * @throws IOException if cleanup fails
     */
    void close() throws IOException;
}
/**
 * Implementation of {@link LocalChangelogRegistry} that performs cleanup asynchronously.
 */
public class LocalChangelogRegistryImpl implements LocalChangelogRegistry {
    /**
     * Creates a local registry that runs cleanup operations on the given executor.
     *
     * <p>NOTE(review): a single-threaded executor is presumably expected, e.g. so that cleanups
     * run in order. The signature accepts any {@link Executor}, so confirm whether callers
     * must guarantee this.
     *
     * @param executor executor for cleanup operations
     */
    public LocalChangelogRegistryImpl(Executor executor);
}
Wrapper components for managing changelog stream access and caching.
/**
 * Wrapper that pairs a changelog input stream with the {@link StreamStateHandle} it belongs to.
 *
 * <p>NOTE(review): the additional functionality this wrapper provides is not visible here.
 * Confirm it against the implementation.
 */
public class ChangelogStreamWrapper {
    /**
     * Wraps a changelog input stream together with its state handle.
     *
     * @param inputStream underlying input stream
     * @param streamStateHandle handle for the stream
     */
    public ChangelogStreamWrapper(
            InputStream inputStream,
            StreamStateHandle streamStateHandle
    );
}
The recovery system includes caching for improved performance:
// Configure the idle timeout of the local cache used during recovery
Configuration config = new Configuration();
config.set(FsStateChangelogOptions.CACHE_IDLE_TIMEOUT, Duration.ofMinutes(10));

// The cached reader manages its local cache files automatically. Cache files are cleaned up
// after the idle timeout. The reader is AutoCloseable, so release it with try-with-resources
// once it is no longer needed instead of leaking it.
try (ChangelogStreamHandleReaderWithCache cachedReader =
        new ChangelogStreamHandleReaderWithCache(config)) {
    // ... read changelog handles via cachedReader ...
}
Recovery operations handle various failure scenarios:
try {
    // Read the changelog during recovery; try-with-resources always closes the iterator
    try (CloseableIterator<StateChange> iterator = reader.read(handle)) {
        while (iterator.hasNext()) {
            StateChange change = iterator.next();
            // Process change...
        }
    }
} catch (IOException e) {
    // Reading failed. Rethrow unchecked: UncheckedIOException keeps the I/O nature of the
    // failure visible and preserves the cause. It is still a RuntimeException, so existing
    // callers that catch RuntimeException keep working.
    log.error("Failed to read changelog during recovery", e);
    throw new java.io.UncheckedIOException("Recovery failed", e);
} catch (RuntimeException e) {
    // Processing failed: log and rethrow unchanged. The exception thrown by the IOException
    // handler above is NOT caught here, because sibling catch clauses do not see each other's
    // throws.
    log.error("Failed to process state change during recovery", e);
    throw e;
}
The recovery system integrates with Flink's checkpoint lifecycle:
Install with the Tessl CLI:
npx tessl i tessl/maven-org-apache-flink--flink-dstl-dfs