
tessl/maven-org-apache-flink--flink-dstl-dfs

Distributed file system-based changelog storage implementation for Apache Flink's streaming state backend.


Recovery and State Management

The recovery system gives read-only access to persisted changelog data during checkpoint restoration and manages the lifecycle of changelog state handles, including reference counting and discarding handles that are no longer needed.

Capabilities

FsStateChangelogStorageForRecovery

Read-only storage implementation for recovery operations during checkpoint restoration.

/**
 * Filesystem-based implementation of StateChangelogStorageView for recovery operations.
 * Provides read-only access to persisted changelog data during checkpoint restoration.
 */
public class FsStateChangelogStorageForRecovery 
        implements StateChangelogStorageView<ChangelogStateHandleStreamImpl> {
    
    /**
     * Creates recovery storage with changelog stream handle reader
     * @param changelogStreamHandleReader Reader for accessing persisted changelog streams
     */
    public FsStateChangelogStorageForRecovery(
        ChangelogStreamHandleReader changelogStreamHandleReader
    );
    
    /**
     * Creates a reader for accessing changelog handles during recovery
     * @return StateChangelogHandleReader for reading persisted changelog data
     */
    public StateChangelogHandleReader<ChangelogStateHandleStreamImpl> createReader();
    
    /**
     * Closes the recovery storage and releases resources
     * @throws Exception if cleanup fails
     */
    public void close() throws Exception;
}

Changelog Stream Reading

Components for reading persisted changelog data from distributed file systems.

/**
 * Reader for changelog stream handles with direct file system access
 */
public class ChangelogStreamHandleReader implements AutoCloseable {
    
    /** Direct reader without caching */
    public static final ChangelogStreamHandleReader DIRECT_READER;
    
    /**
     * Reads state changes from a changelog handle
     * @param handle Changelog handle containing stream references
     * @return CloseableIterator for iterating over state changes
     * @throws IOException if reading fails
     */
    public CloseableIterator<StateChange> read(ChangelogStateHandleStreamImpl handle) 
        throws IOException;
    
    /**
     * Closes the reader and releases resources
     * @throws IOException if cleanup fails
     */
    public void close() throws IOException;
}

/**
 * Reader with local caching support for improved performance
 */
public class ChangelogStreamHandleReaderWithCache extends ChangelogStreamHandleReader {
    
    /**
     * Creates cached reader with configuration
     * @param configuration Flink configuration containing cache settings
     */
    public ChangelogStreamHandleReaderWithCache(Configuration configuration);
}

State Change Iteration

Iterator implementation for traversing state changes during recovery.

/**
 * Iterator implementation for reading state changes from changelog streams
 */
public class StateChangeIteratorImpl implements CloseableIterator<StateChange> {
    
    /**
     * Creates iterator with changelog stream reader
     * @param changelogStreamHandleReader Reader for accessing changelog streams
     */
    public StateChangeIteratorImpl(ChangelogStreamHandleReader changelogStreamHandleReader);
    
    /**
     * Checks if more state changes are available
     * @return true if more changes exist
     */
    public boolean hasNext();
    
    /**
     * Returns the next state change
     * @return Next StateChange in the iteration
     * @throws NoSuchElementException if no more changes exist
     */
    public StateChange next();
    
    /**
     * Closes the iterator and releases resources
     * @throws IOException if cleanup fails
     */
    public void close() throws IOException;
}
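
The iterator contract described above (`hasNext()`, `next()` throwing `NoSuchElementException`, and `close()` releasing the underlying stream) can be tried out with a small stand-in that has no Flink dependency. This is only a sketch under stated assumptions: `ChangeIteratorSketch`, `Change`, `StreamIterator`, and the `[keyGroup][length][payload]` record layout are all invented for illustration and are not Flink's classes or serialization format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Stand-in sketch: the record layout is invented for illustration and is NOT Flink's format. */
final class ChangeIteratorSketch {

    /** Hypothetical change record: a key group plus an opaque payload. */
    static final class Change {
        final int keyGroup;
        final byte[] payload;

        Change(int keyGroup, byte[] payload) {
            this.keyGroup = keyGroup;
            this.payload = payload;
        }
    }

    /** Iterates over [int keyGroup][int length][payload] records, reading one record ahead. */
    static final class StreamIterator implements Iterator<Change>, AutoCloseable {
        private final DataInputStream in;
        private Change buffered;   // record fetched by hasNext() and not yet returned
        private boolean exhausted; // set at end of stream or after close()

        StreamIterator(byte[] data) {
            this.in = new DataInputStream(new ByteArrayInputStream(data));
        }

        @Override
        public boolean hasNext() {
            if (buffered == null && !exhausted) {
                buffered = readOne();
                exhausted = (buffered == null);
            }
            return buffered != null;
        }

        @Override
        public Change next() {
            if (!hasNext()) {
                throw new NoSuchElementException("no more changes");
            }
            Change result = buffered;
            buffered = null;
            return result;
        }

        /** Returns the next record, or null at end of stream (a truncated tail also ends iteration). */
        private Change readOne() {
            try {
                int keyGroup = in.readInt();
                byte[] payload = new byte[in.readInt()];
                in.readFully(payload);
                return new Change(keyGroup, payload);
            } catch (EOFException endOfStream) {
                return null;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        /** Wraps IOException so the sketch stays free of checked exceptions. */
        @Override
        public void close() {
            exhausted = true;
            buffered = null;
            try {
                in.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    /** Encodes records in the hypothetical layout; used only to build demo input. */
    static byte[] encode(Change... changes) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (Change c : changes) {
                out.writeInt(c.keyGroup);
                out.writeInt(c.payload.length);
                out.write(c.payload);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        byte[] data = encode(new Change(3, new byte[] {1, 2}), new Change(7, new byte[] {9}));
        try (StreamIterator it = new StreamIterator(data)) {
            while (it.hasNext()) {
                Change c = it.next();
                System.out.println("keyGroup=" + c.keyGroup + ", bytes=" + c.payload.length);
            }
        }
    }
}
```

Reading one record ahead in `hasNext()` keeps end-of-stream detection in one place, so `next()` only has to hand over the buffered record or fail.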

Usage Examples:

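import org.apache.flink.changelog.fs.*;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
import org.apache.flink.util.CloseableIterator;

// Create recovery storage (typically done by factory).
// `config` is an existing Flink Configuration.
ChangelogStreamHandleReader reader = new ChangelogStreamHandleReaderWithCache(config);
FsStateChangelogStorageForRecovery recoveryStorage = 
    new FsStateChangelogStorageForRecovery(reader);

// Create handle reader for recovery.
// It is not used further here; this example reads through `reader` directly.
StateChangelogHandleReader<ChangelogStateHandleStreamImpl> handleReader = 
    recoveryStorage.createReader();

// Read state changes from checkpoint handle.
// getCheckpointHandle(), applyStateChange(...) and applyMetadataChange(...) are placeholders.
ChangelogStateHandleStreamImpl checkpointHandle = getCheckpointHandle();
try (CloseableIterator<StateChange> iterator = reader.read(checkpointHandle)) {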
    while (iterator.hasNext()) {
        StateChange change = iterator.next();
        
        // Process state change during recovery
        if (change.getKeyGroup() != StateChange.META_KEY_GROUP) {
            // Regular state change for specific key group
            int keyGroup = change.getKeyGroup();
            byte[] changeData = change.getChange();
            applyStateChange(keyGroup, changeData);
        } else {
            // Metadata change
            byte[] metadata = change.getChange();
            applyMetadataChange(metadata);
        }
    }
}

// Cleanup
recoveryStorage.close();

TaskChangelogRegistry

Registry for managing the lifecycle of changelog state handles and coordinating between job manager and task manager ownership.

/**
 * Registry for tracking changelog state objects and managing their lifecycle.
 * Coordinates between task manager and job manager ownership of state handles.
 */
public interface TaskChangelogRegistry {
    
    /**
     * Starts tracking a state handle with reference counting
     * @param handle StreamStateHandle to track
     * @param refCount Initial reference count (number of changelog segments)
     */
    void startTracking(StreamStateHandle handle, long refCount);
    
    /**
     * Stops tracking a state handle (JM becomes owner)
     * @param handle StreamStateHandle to stop tracking
     */
    void stopTracking(StreamStateHandle handle);
    
    /**
     * Releases a reference to a state handle (decrements ref count)
     * @param handle StreamStateHandle to release
     */
    void release(StreamStateHandle handle);
    
    /**
     * Creates default registry with specified number of discard threads
     * @param numDiscardThreads Number of threads for async discard operations
     * @return TaskChangelogRegistry instance
     */
    static TaskChangelogRegistry defaultChangelogRegistry(int numDiscardThreads);
}

TaskChangelogRegistryImpl

Default implementation of the changelog registry with reference counting and async cleanup.

/**
 * Default implementation of TaskChangelogRegistry with reference counting
 */
public class TaskChangelogRegistryImpl implements TaskChangelogRegistry {
    
    /**
     * Creates registry with custom executor for discard operations
     * @param discardExecutor Executor for running discard operations
     */
    public TaskChangelogRegistryImpl(Executor discardExecutor);
    
    /**
     * Closes the registry and shuts down discard operations
     * @throws Exception if cleanup fails
     */
    public void close() throws Exception;
}

Usage Examples:

import org.apache.flink.changelog.fs.TaskChangelogRegistry;
import org.apache.flink.runtime.state.StreamStateHandle;

// Create changelog registry (typically done by storage)
TaskChangelogRegistry registry = TaskChangelogRegistry.defaultChangelogRegistry(2);

// Start tracking uploaded state handle (`uploadResult` comes from a completed upload)
StreamStateHandle uploadedHandle = uploadResult.getStreamStateHandle();
long refCount = 3; // Number of state change sets in this handle
registry.startTracking(uploadedHandle, refCount);

// Release references as state changes become unused
registry.release(uploadedHandle); // refCount becomes 2
registry.release(uploadedHandle); // refCount becomes 1
registry.release(uploadedHandle); // refCount becomes 0, handle is discarded

// Stop tracking when JM becomes owner (e.g., after checkpoint completion).
// `confirmedHandle` is a placeholder for another handle that is still tracked.
registry.stopTracking(confirmedHandle);
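
The reference-counting behavior shown above can be modeled without Flink. The following is a minimal sketch, not the actual `TaskChangelogRegistryImpl`: handles are plain strings, the discard runs synchronously through a hypothetical `discardAction` callback (the real registry is documented as discarding asynchronously), and treating a release of an untracked handle as a no-op is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Stand-in sketch of reference-counted tracking; handles are plain strings here. */
final class RefCountRegistrySketch {
    private final Map<String, Long> refCounts = new HashMap<>();
    private final Consumer<String> discardAction; // hypothetical hook, e.g. "delete the file"

    RefCountRegistrySketch(Consumer<String> discardAction) {
        this.discardAction = discardAction;
    }

    /** Begins tracking with an initial count; the local side owns the handle from here on. */
    synchronized void startTracking(String handle, long refCount) {
        if (refCount <= 0) {
            throw new IllegalArgumentException("refCount must be positive");
        }
        refCounts.put(handle, refCount);
    }

    /** Forgets the handle without discarding it: ownership has moved elsewhere. */
    synchronized void stopTracking(String handle) {
        refCounts.remove(handle);
    }

    /** Drops one reference; the last release of a tracked handle triggers the discard. */
    synchronized void release(String handle) {
        Long count = refCounts.get(handle);
        if (count == null) {
            return; // not tracked (handed over or already discarded): nothing to do
        }
        if (count > 1) {
            refCounts.put(handle, count - 1);
        } else {
            refCounts.remove(handle);
            discardAction.accept(handle);
        }
    }

    synchronized boolean isTracked(String handle) {
        return refCounts.containsKey(handle);
    }

    public static void main(String[] args) {
        List<String> discarded = new ArrayList<>();
        RefCountRegistrySketch registry = new RefCountRegistrySketch(discarded::add);

        registry.startTracking("segment-a", 2);
        registry.release("segment-a");
        registry.release("segment-a"); // last reference: discard runs

        registry.startTracking("segment-b", 2);
        registry.stopTracking("segment-b"); // handed over: later releases are no-ops
        registry.release("segment-b");

        System.out.println("discarded = " + discarded); // prints: discarded = [segment-a]
    }
}
```

The point of `stopTracking` in this model is that a handle owned by another party must never be discarded locally, even if stale `release` calls arrive later.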

Local Changelog Registry

Registry for managing local changelog files when local recovery is enabled.

/**
 * Registry for managing local changelog files during recovery
 */
public interface LocalChangelogRegistry extends AutoCloseable {
    
    /** No-op implementation when local recovery is disabled */
    LocalChangelogRegistry NO_OP = /* ... */;
    
    /**
     * Registers a local changelog handle for a checkpoint
     * @param handle Local stream state handle
     * @param checkpointId Checkpoint identifier
     */
    void register(StreamStateHandle handle, long checkpointId);
    
    /**
     * Discards local changelog files up to a checkpoint
     * @param checkpointId Checkpoint identifier (inclusive)
     */
    void discardUpToCheckpoint(long checkpointId);
    
    /**
     * Closes the registry and releases resources
     * @throws IOException if cleanup fails
     */
    void close() throws IOException;
}

/**
 * Implementation of LocalChangelogRegistry with async cleanup
 */
public class LocalChangelogRegistryImpl implements LocalChangelogRegistry {
    
    /**
     * Creates a local registry that runs cleanup on the given executor (typically single-threaded)
     * @param executor Executor for cleanup operations
     */
    public LocalChangelogRegistryImpl(Executor executor);
}
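
The register/discard pattern can be illustrated with a sorted map keyed by checkpoint ID. This is a simplified stand-in, not `LocalChangelogRegistryImpl`: file names are plain strings, `discardUpToCheckpoint` returns the discarded names instead of deleting files asynchronously, and the inclusive boundary simply follows the Javadoc wording above, so check it against the Flink version in use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Stand-in sketch of per-checkpoint tracking of local changelog files. */
final class LocalRegistrySketch {
    // Sorted by checkpoint ID so that "everything up to N" is a single range view.
    private final NavigableMap<Long, List<String>> filesByCheckpoint = new TreeMap<>();

    /** Remembers that a local file belongs to the given checkpoint. */
    synchronized void register(String localFile, long checkpointId) {
        filesByCheckpoint.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(localFile);
    }

    /** Removes and returns all files registered for checkpoints <= checkpointId. */
    synchronized List<String> discardUpToCheckpoint(long checkpointId) {
        NavigableMap<Long, List<String>> stale = filesByCheckpoint.headMap(checkpointId, true);
        List<String> discarded = new ArrayList<>();
        for (List<String> files : stale.values()) {
            discarded.addAll(files);
        }
        stale.clear(); // clearing the view removes the entries from the backing map
        return discarded;
    }

    synchronized int trackedCheckpoints() {
        return filesByCheckpoint.size();
    }

    public static void main(String[] args) {
        LocalRegistrySketch registry = new LocalRegistrySketch();
        registry.register("chk-1.log", 1);
        registry.register("chk-2.log", 2);
        registry.register("chk-3.log", 3);
        System.out.println(registry.discardUpToCheckpoint(2)); // prints: [chk-1.log, chk-2.log]
        System.out.println(registry.trackedCheckpoints());     // prints: 1
    }
}
```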

Changelog Stream Wrapping

Wrapper components for managing changelog stream access and caching.

/**
 * Wrapper for changelog streams providing additional functionality
 */
public class ChangelogStreamWrapper {
    
    /**
     * Wraps a changelog stream with additional features
     * @param inputStream Underlying input stream
     * @param streamStateHandle Handle for the stream
     */
    public ChangelogStreamWrapper(
        InputStream inputStream,
        StreamStateHandle streamStateHandle
    );
}

Recovery Performance and Caching

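ChangelogStreamHandleReaderWithCache keeps local cache files for changelog streams read during recovery and removes them once they have been idle for the configured timeout: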

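import java.time.Duration;
import org.apache.flink.changelog.fs.ChangelogStreamHandleReaderWithCache;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.configuration.Configuration;

// Configure cache timeout for recovery
Configuration config = new Configuration();
config.set(FsStateChangelogOptions.CACHE_IDLE_TIMEOUT, Duration.ofMinutes(10));

// Cached reader automatically manages local cache files
ChangelogStreamHandleReaderWithCache cachedReader = 
    new ChangelogStreamHandleReaderWithCache(config);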

// Cache files are automatically cleaned up after idle timeout
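
Idle-timeout eviction, as referred to in the comment above, can be sketched independently of Flink. The class below is a hypothetical model, not the cached reader's implementation: it tracks only the last access time per cache key, takes the clock as a parameter so the behavior is easy to test, and treats an entry as idle once the full timeout has elapsed since its last access.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.LongSupplier;

/** Stand-in sketch of idle-timeout eviction; keys stand for cached changelog files. */
final class IdleTimeoutCacheSketch {
    private final long idleTimeoutMillis;
    private final LongSupplier clockMillis; // injectable clock (assumption, for testability)
    private final Map<String, Long> lastAccess = new HashMap<>();

    IdleTimeoutCacheSketch(Duration idleTimeout, LongSupplier clockMillis) {
        this.idleTimeoutMillis = idleTimeout.toMillis();
        this.clockMillis = clockMillis;
    }

    /** Records an access; returns true if the key was already cached (a hit). */
    synchronized boolean access(String key) {
        return lastAccess.put(key, clockMillis.getAsLong()) != null;
    }

    /** Removes entries whose last access is at least the timeout ago; returns how many. */
    synchronized int evictIdle() {
        long now = clockMillis.getAsLong();
        int evicted = 0;
        Iterator<Map.Entry<String, Long>> it = lastAccess.entrySet().iterator();
        while (it.hasNext()) {
            if (now - it.next().getValue() >= idleTimeoutMillis) {
                it.remove();
                evicted++;
            }
        }
        return evicted;
    }

    synchronized boolean isCached(String key) {
        return lastAccess.containsKey(key);
    }

    public static void main(String[] args) {
        long[] now = {0L}; // manually advanced clock for the demo
        IdleTimeoutCacheSketch cache =
            new IdleTimeoutCacheSketch(Duration.ofMinutes(10), () -> now[0]);
        cache.access("file-a");
        now[0] = Duration.ofMinutes(8).toMillis();
        cache.access("file-b");
        now[0] = Duration.ofMinutes(10).toMillis();
        System.out.println(cache.evictIdle());        // prints: 1
        System.out.println(cache.isCached("file-b")); // prints: true
    }
}
```

Each access resets the idle timer for that key, so files that recovery keeps reading stay cached while untouched ones age out.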

Error Handling During Recovery

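Recovery can fail while the changelog is being read (IOException) or while a state change is being applied (RuntimeException). The pattern below logs each case and fails the recovery: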

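// `reader`, `handle`, and `log` are assumed to be in scope
try (CloseableIterator<StateChange> iterator = reader.read(handle)) {
    // Read changelog during recovery
    while (iterator.hasNext()) {
        StateChange change = iterator.next();
        // Process change...
    }
} catch (IOException e) {
    // Handle reading failures
    log.error("Failed to read changelog during recovery", e);
    throw new RuntimeException("Recovery failed", e);
} catch (RuntimeException e) {
    // Handle processing failures
    log.error("Failed to process state change during recovery", e);
    throw e;
} catch (Exception e) {
    // CloseableIterator.close() is declared to throw Exception
    log.error("Failed to close changelog iterator during recovery", e);
    throw new RuntimeException("Recovery failed", e);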
}
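
One detail of the pattern above is ordering: with try-with-resources, the iterator is closed before any `catch` block runs, so the resource is released even when processing fails. The following plain-Java sketch demonstrates this with a hypothetical `LoggingResource` that records events; it does not use Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in sketch showing that close() runs before the catch block. */
final class CloseOrderSketch {

    /** Hypothetical resource that records when it is opened and closed. */
    static final class LoggingResource implements AutoCloseable {
        private final List<String> events;

        LoggingResource(List<String> events) {
            this.events = events;
            events.add("open");
        }

        @Override
        public void close() {
            events.add("close");
        }
    }

    /** Runs a fake recovery step and returns the ordered list of events. */
    static List<String> recover(boolean failWhileProcessing) {
        List<String> events = new ArrayList<>();
        try (LoggingResource resource = new LoggingResource(events)) {
            if (failWhileProcessing) {
                throw new IllegalStateException("bad state change");
            }
            events.add("processed");
        } catch (RuntimeException e) {
            events.add("caught: " + e.getMessage());
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(recover(false)); // prints: [open, processed, close]
        System.out.println(recover(true));  // prints: [open, close, caught: bad state change]
    }
}
```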

Integration with Checkpoint Lifecycle

The recovery system integrates with Flink's checkpoint lifecycle:

  • Checkpoint Creation: Handles are created by writers and tracked by registry
  • Checkpoint Confirmation: Registry stops tracking confirmed handles (JM ownership)
  • Checkpoint Subsumption: Registry releases old handles and discards unused state
  • Recovery: Storage view provides read access to persisted handles
  • Cleanup: Registry ensures proper cleanup of unused handles and local files

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-dstl-dfs
