
tessl/maven-org-apache-flink--flink-dstl

Apache Flink DSTL (Durable Short-term Log) - A filesystem-based state changelog implementation for Flink's state management



Registry and Tracking

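TaskManager-side registry that tracks uploaded changelog segments until their ownership is settled. It keeps a reference count per segment and asynchronously discards a segment once every user has released it, unless tracking was stopped because another component (for example, the JobManager after a checkpoint is confirmed) now owns the segment.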

Capabilities

TaskChangelogRegistry Interface

Core interface for tracking and managing changelog segments with reference counting and cleanup coordination.

/**
 * TaskManager-side registry for tracking changelog segments
 */
@Internal
public interface TaskChangelogRegistry {
    
    /** No-operation registry implementation for testing or disabled scenarios */
    TaskChangelogRegistry NO_OP = new TaskChangelogRegistry() { /* no-op implementation */ };
    
    /**
     * Starts tracking a changelog segment with an initial reference count
     * @param handle StreamStateHandle representing the changelog segment
     * @param refCount Initial reference count, i.e. the number of users that will later call release(); must be positive
     */
    void startTracking(StreamStateHandle handle, long refCount);
    
    /**
     * Stops tracking a changelog segment without discarding it, because another
     * component (e.g. the JobManager after checkpoint confirmation) now owns it
     * @param handle StreamStateHandle to stop tracking
     */
    void stopTracking(StreamStateHandle handle);
    
    /**
     * Decrements the segment's reference count by one; when it reaches zero, the
     * segment is discarded asynchronously (unless tracking was stopped earlier)
     * @param handle StreamStateHandle to release
     */
    void release(StreamStateHandle handle);
    
    /**
     * Creates default registry with specified number of async discard threads
     * @param numAsyncDiscardThreads Number of threads for asynchronous cleanup
     * @return Configured TaskChangelogRegistry instance
     */
    static TaskChangelogRegistry defaultChangelogRegistry(int numAsyncDiscardThreads);
    
    /**
     * Creates default registry with custom executor for testing
     * @param executor Custom executor for discard operations
     * @return TaskChangelogRegistry instance using the provided executor
     */
    @VisibleForTesting
    static TaskChangelogRegistry defaultChangelogRegistry(Executor executor);
}

Basic Registry Usage Example:

import org.apache.flink.changelog.fs.TaskChangelogRegistry;
import org.apache.flink.runtime.state.StreamStateHandle;

// Create a registry backed by a pool of 2 discard threads
TaskChangelogRegistry registry = TaskChangelogRegistry.defaultChangelogRegistry(2);

// Track a newly uploaded changelog segment
StreamStateHandle handle = createChangelogHandle(); // hypothetical helper
registry.startTracking(handle, 2); // Two state backends reference this segment

// Each backend releases the segment when it no longer needs it
registry.release(handle); // Ref count: 2 -> 1
registry.release(handle); // Ref count: 1 -> 0, discard scheduled on a discard thread

// Alternatively, once a checkpoint containing the segment is confirmed, ownership
// passes to the JobManager and the TaskManager must not delete the segment:
// registry.stopTracking(handle); // Forgotten by the registry, never discarded by it
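To make the contract concrete, here is a minimal JDK-only model of the three operations: `startTracking` sets a positive count, `stopTracking` forgets a segment without deleting it, and `release` decrements and discards at zero. `SegmentRegistryModel` is a hypothetical class, not Flink's `TaskChangelogRegistryImpl`; segments are identified by plain `String` keys instead of `StreamStateHandle`, and "discarding" only records the key.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;

// Hypothetical model of the registry contract; NOT Flink's TaskChangelogRegistryImpl.
class SegmentRegistryModel {
    private final Map<String, Long> refCounts = new ConcurrentHashMap<>();
    private final Executor discardExecutor;
    // Records which segments were "discarded" (stands in for deleting the file).
    final List<String> discarded = new CopyOnWriteArrayList<>();

    SegmentRegistryModel(Executor discardExecutor) {
        this.discardExecutor = discardExecutor;
    }

    void startTracking(String segment, long refCount) {
        if (refCount <= 0) {
            throw new IllegalStateException("Initial refCount must be positive");
        }
        refCounts.put(segment, refCount);
    }

    // Ownership moved elsewhere (e.g. a confirmed checkpoint): forget the segment, never discard it.
    void stopTracking(String segment) {
        refCounts.remove(segment);
    }

    // One user is done; the call that takes the count to zero removes the entry and schedules a discard.
    void release(String segment) {
        refCounts.computeIfPresent(segment, (key, count) -> {
            if (count > 1) {
                return count - 1;
            }
            discardExecutor.execute(() -> discarded.add(key));
            return null; // returning null removes the mapping
        });
    }

    boolean isTracked(String segment) {
        return refCounts.containsKey(segment);
    }
}

class SegmentRegistryModelDemo {
    public static void main(String[] args) {
        SegmentRegistryModel registry = new SegmentRegistryModel(Runnable::run); // discards run inline
        registry.startTracking("segment-1", 2); // two state backends share the segment
        registry.release("segment-1");          // 2 -> 1, nothing discarded yet
        registry.release("segment-1");          // 1 -> 0, discard scheduled
        System.out.println(registry.discarded); // [segment-1]

        registry.startTracking("segment-2", 1);
        registry.stopTracking("segment-2");     // e.g. checkpoint confirmed
        registry.release("segment-2");          // untracked: ignored
        System.out.println(registry.discarded); // [segment-1]
    }
}
```

The model deliberately ignores `release` calls for untracked keys, which is what makes `stopTracking` a safe hand-off: late releases from backends cannot delete a segment that someone else now owns.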

Default Registry Creation

Factory methods for creating registry instances with different configuration options.

/**
 * Creates default registry with specified async discard threads
 * @param numAsyncDiscardThreads Number of background threads for cleanup
 * @return TaskChangelogRegistry with thread pool for async operations
 */
static TaskChangelogRegistry defaultChangelogRegistry(int numAsyncDiscardThreads);

/**
 * Creates registry with custom executor (primarily for testing)
 * @param executor Custom executor for discard operations
 * @return TaskChangelogRegistry using provided executor
 */
@VisibleForTesting
static TaskChangelogRegistry defaultChangelogRegistry(Executor executor);

Registry Creation Examples:

// Production usage: thread count taken from configuration
int discardThreads = config.get(FsStateChangelogOptions.NUM_DISCARD_THREADS);
TaskChangelogRegistry registry = TaskChangelogRegistry.defaultChangelogRegistry(discardThreads);

// Testing usage: a direct executor runs each discard synchronously on the calling thread
TaskChangelogRegistry testRegistry = TaskChangelogRegistry.defaultChangelogRegistry(Runnable::run);

// Or a dedicated executor that the test owns and shuts down itself
ExecutorService testExecutor = Executors.newSingleThreadExecutor();
TaskChangelogRegistry pooledTestRegistry = TaskChangelogRegistry.defaultChangelogRegistry(testExecutor);
// ... later: testExecutor.shutdown();

// Disabled registry for scenarios where tracking is not needed
TaskChangelogRegistry disabledRegistry = TaskChangelogRegistry.NO_OP;
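The choice of executor decides when a discard actually runs. The sketch below uses only `java.util.concurrent`: with a direct executor (`Runnable::run`) the task has finished before `execute` returns, which keeps tests deterministic; with a fixed thread pool (likely what the `int` overload creates internally) the task runs later on a pool thread, and whoever owns the pool must shut it down. `DiscardExecutorDemo` is illustrative only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class DiscardExecutorDemo {
    // Submits a fake discard task and reports how many have completed immediately afterwards.
    static int discardsCompletedRightAfterExecute(Executor executor, AtomicInteger counter) {
        executor.execute(counter::incrementAndGet);
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Direct executor: the task runs on the calling thread before execute() returns.
        AtomicInteger direct = new AtomicInteger();
        System.out.println(discardsCompletedRightAfterExecute(Runnable::run, direct)); // 1

        // Thread pool: the task runs asynchronously, so wait for it explicitly.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch done = new CountDownLatch(1);
        pool.execute(done::countDown);
        boolean finished = done.await(5, TimeUnit.SECONDS);
        System.out.println(finished); // true
        pool.shutdown(); // the owner of the pool is responsible for shutting it down
    }
}
```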

Reference Counting and Lifecycle

The registry counts references so that a changelog segment is discarded only after every user has released it, and never after tracking was stopped because ownership moved elsewhere.

/**
 * Starts tracking with an initial reference count
 * @param handle Changelog segment handle
 * @param refCount Initial number of references (must be greater than zero)
 */
void startTracking(StreamStateHandle handle, long refCount);

/**
 * Stops tracking; the segment is no longer counted or discarded by this registry
 * @param handle Changelog segment handle
 */
void stopTracking(StreamStateHandle handle);

/**
 * Decrements the reference count by one and schedules a discard when it reaches zero
 * @param handle Changelog segment handle
 */
void release(StreamStateHandle handle);

Reference Counting Examples:

TaskChangelogRegistry registry = TaskChangelogRegistry.defaultChangelogRegistry(2);
StreamStateHandle handle = uploadResult.streamStateHandle;
// Each scenario below assumes a freshly uploaded segment

// Scenario 1: a single state backend uses the segment
registry.startTracking(handle, 1);
// ... backend uses segment ...
registry.release(handle); // Ref count: 1 -> 0, discard scheduled

// Scenario 2: three state backends share the segment
registry.startTracking(handle, 3);

// First backend is done
registry.release(handle); // Ref count: 3 -> 2

// Second backend is done
registry.release(handle); // Ref count: 2 -> 1

// Third backend is done
registry.release(handle); // Ref count: 1 -> 0, discard scheduled

// Scenario 3: a checkpoint containing the segment is confirmed first
registry.startTracking(handle, 2);
registry.stopTracking(handle); // The JobManager owns the segment now
registry.release(handle);      // No longer tracked: ignored, nothing is discarded

TaskChangelogRegistryImpl Implementation

Internal implementation providing thread-safe reference counting and asynchronous cleanup.

/**
 * Default implementation of TaskChangelogRegistry with thread-safe operations
 */
@Internal
@ThreadSafe
class TaskChangelogRegistryImpl implements TaskChangelogRegistry {
    
    /**
     * Creates registry with executor for async discard operations
     * @param discardExecutor Executor for running cleanup tasks
     */
    public TaskChangelogRegistryImpl(Executor discardExecutor);
}

Implementation Usage Example:

// TaskChangelogRegistryImpl is package-private: this compiles only inside
// org.apache.flink.changelog.fs (elsewhere, use defaultChangelogRegistry(Executor))
ExecutorService discardExecutor = Executors.newFixedThreadPool(3);
TaskChangelogRegistry registry = new TaskChangelogRegistryImpl(discardExecutor);

// Registry uses the executor for async cleanup operations
StreamStateHandle handle = createHandle(); // hypothetical helper
registry.startTracking(handle, 1);
registry.release(handle); // Ref count reaches 0: discard runs asynchronously on discardExecutor

// Shut down the executor when done (already-submitted discards still run)
discardExecutor.shutdown();
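Because a discard runs on the executor after `release()` has returned, an I/O failure during deletion cannot be reported to the caller. Below is a JDK-only sketch of the usual wrapping pattern, assuming failures are simply logged (which appears to be what Flink's implementation does); `Discardable`, `DiscardScheduler`, and `scheduleDiscard` are hypothetical names, and `java.util.logging` stands in for Flink's logger.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for StreamStateHandle#discardState().
interface Discardable {
    void discardState() throws Exception;
}

class DiscardScheduler {
    private static final Logger LOG = Logger.getLogger(DiscardScheduler.class.getName());
    static final AtomicInteger failures = new AtomicInteger();

    // Submits the discard; any exception is logged and swallowed so that a failing
    // delete neither kills the worker thread nor surfaces in release() callers.
    static void scheduleDiscard(Discardable handle, Executor executor) {
        executor.execute(() -> {
            try {
                handle.discardState();
            } catch (Exception e) {
                failures.incrementAndGet();
                LOG.log(Level.WARNING, "Unable to discard " + handle, e);
            }
        });
    }

    public static void main(String[] args) {
        scheduleDiscard(() -> { throw new java.io.IOException("file system unavailable"); }, Runnable::run);
        scheduleDiscard(() -> System.out.println("deleted segment"), Runnable::run);
        System.out.println("failures: " + failures.get()); // failures: 1
    }
}
```

A consequence of this design is that a failed delete leaves an orphaned file behind rather than failing the task; the log warning is the only trace.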

Integration with Storage Components

The registry integrates with storage and upload components to coordinate segment lifecycle.

Storage Integration Example:

// Create storage with registry
TaskChangelogRegistry registry = TaskChangelogRegistry.defaultChangelogRegistry(2);

FsStateChangelogStorage storage = new FsStateChangelogStorage(
    jobID,
    config,
    metricGroup,
    registry, // Pass registry to storage
    localRecoveryConfig
);

// Writer operations coordinate with the registry internally
FsStateChangelogWriter writer = storage.createWriter(operatorId, keyGroupRange, mailboxExecutor);

// Persist changes appended since sequenceNumber for the given checkpoint
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> future =
    writer.persist(sequenceNumber, checkpointId);

future.thenAccept(result -> {
    // One changelog handle may reference several uploaded files (segments)
    result.getJobManagerOwnedSnapshot().getHandlesAndOffsets().forEach(handleAndOffset -> {
        StreamStateHandle handle = handleAndOffset.f0;
        // Registered with the registry by the storage when the upload completed
        System.out.println("Registry now tracking: " + handle);
    });
});

Upload Coordination

The registry coordinates with upload schedulers to manage segment references during upload operations.

Upload Coordination Example:

TaskChangelogRegistry registry = TaskChangelogRegistry.defaultChangelogRegistry(2);

StateChangeUploadScheduler scheduler = StateChangeUploadScheduler.fromConfig(
    jobID, config, metricGroup, registry, localRecoveryConfig
);

// An UploadTask bundles change sets with success and failure callbacks
UploadTask task = new UploadTask(
    Collections.singletonList(changeSet),
    uploadResults -> {
        // Upload completion already updated the registry, so the callback
        // must not call startTracking() again (that would reset the count)
        for (UploadResult uploadResult : uploadResults) {
            System.out.println("Upload completed and registered: " + uploadResult.streamStateHandle);
        }
    },
    (failedSequenceNumbers, error) ->
        System.err.println("Upload failed for " + failedSequenceNumbers + ": " + error)
);

scheduler.upload(task);
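A single upload may batch change sets from several writers into one file, so the initial reference count has to cover every backend that will later release that file. Below is a sketch of deriving the count, under the assumption that each change set carries the ID of the log (writer) that produced it; `ChangeSetInfo` and `initialRefCount` are hypothetical, and this is not taken from Flink's uploader.

```java
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

class RefCountForBatch {
    // Hypothetical: the minimal information needed from one change set in an uploaded file.
    static final class ChangeSetInfo {
        final UUID logId;          // which writer (log) produced the change set
        final long sequenceNumber; // position within that writer's log

        ChangeSetInfo(UUID logId, long sequenceNumber) {
            this.logId = logId;
            this.sequenceNumber = sequenceNumber;
        }
    }

    // One reference per distinct writer whose changes landed in the uploaded file.
    static long initialRefCount(List<ChangeSetInfo> changeSetsInFile) {
        return changeSetsInFile.stream().map(cs -> cs.logId).distinct().count();
    }

    public static void main(String[] args) {
        UUID writerA = UUID.randomUUID();
        UUID writerB = UUID.randomUUID();
        List<ChangeSetInfo> batch = Arrays.asList(
                new ChangeSetInfo(writerA, 10),
                new ChangeSetInfo(writerA, 11), // same writer, later change set
                new ChangeSetInfo(writerB, 7));
        System.out.println(initialRefCount(batch)); // 2
    }
}
```

Counting distinct writers rather than change sets matters: a writer that contributed two change sets still releases the file once.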

No-Op Registry

Special implementation that performs no tracking, useful for testing or disabled scenarios.

/**
 * No-operation registry that performs no tracking
 */
TaskChangelogRegistry NO_OP = new TaskChangelogRegistry() { /* all methods are empty */ };

No-Op Usage Example:

// Use NO_OP registry when tracking is not needed
TaskChangelogRegistry noOpRegistry = TaskChangelogRegistry.NO_OP;

// All operations are no-ops: nothing is tracked and nothing is ever discarded
noOpRegistry.startTracking(handle, 1); // Does nothing
noOpRegistry.stopTracking(handle);     // Does nothing
noOpRegistry.release(handle);          // Does nothing

// Useful for testing or minimal configurations; note that the TaskManager then
// never cleans up uploaded segments that no checkpoint ends up owning
FsStateChangelogStorage storage = new FsStateChangelogStorage(
    jobID, config, metricGroup, 
    TaskChangelogRegistry.NO_OP, // Disable tracking
    localRecoveryConfig
);

Thread Safety and Concurrency

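Both registry implementations are safe to call from multiple threads. The default implementation updates each segment's count atomically, so exactly one release() call observes the count reaching zero and schedules the discard. Callers must still make sure startTracking() happens before the matching release() calls.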

Concurrent Usage Example:

TaskChangelogRegistry registry = TaskChangelogRegistry.defaultChangelogRegistry(4);
StreamStateHandle sharedHandle = createSharedHandle(); // hypothetical helper

// Start tracking on this thread BEFORE handing the segment to other threads,
// so no release() can run against an untracked handle
registry.startTracking(sharedHandle, 5);

ExecutorService executor = Executors.newFixedThreadPool(10);

// Five users release concurrently; exactly one call brings the count to 0
// and schedules the discard, so no sleep or extra coordination is needed
for (int i = 0; i < 5; i++) {
    executor.submit(() -> {
        registry.release(sharedHandle);
        System.out.println("Released one reference");
    });
}

executor.shutdown();
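The property that matters under concurrency is that the discard fires exactly once, on whichever `release` brings the count to zero. The JDK-only sketch below demonstrates that guarantee with an atomic per-key `ConcurrentHashMap.computeIfPresent`; `ConcurrentReleaseDemo` is hypothetical and models only the counting, not Flink's class.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ConcurrentReleaseDemo {
    // Tracks one segment with `users` references, releases it once from each of `users`
    // tasks running in parallel, and returns how many times the discard fired.
    static int releaseConcurrently(int users) throws InterruptedException {
        Map<String, Long> refCounts = new ConcurrentHashMap<>();
        AtomicInteger discards = new AtomicInteger();
        refCounts.put("shared-segment", (long) users); // tracked BEFORE any release is submitted

        ExecutorService pool = Executors.newFixedThreadPool(8);
        CountDownLatch start = new CountDownLatch(1);
        for (int i = 0; i < users; i++) {
            pool.execute(() -> {
                try {
                    start.await(); // line the threads up to increase contention
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                // Atomic decrement; the call that reaches zero removes the key and "discards".
                refCounts.computeIfPresent("shared-segment", (key, count) -> {
                    if (count > 1) {
                        return count - 1;
                    }
                    discards.incrementAndGet();
                    return null;
                });
            });
        }
        start.countDown();
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return discards.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(releaseConcurrently(100)); // 1
    }
}
```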

Error Handling and Resilience

The registry tolerates most misuse without throwing; the examples below show which calls are ignored and which are rejected.

Error Handling Examples:

TaskChangelogRegistry registry = TaskChangelogRegistry.defaultChangelogRegistry(2);

// The initial reference count must be positive; the default implementation
// rejects 0 or negative values with an IllegalStateException
try {
    registry.startTracking(handle, 0);
} catch (IllegalStateException e) {
    System.err.println("Rejected invalid ref count: " + e.getMessage());
}

// Calls for handles that are not (or no longer) tracked are ignored
registry.stopTracking(nonExistentHandle); // No-op
registry.release(alreadyReleasedHandle);  // No-op, logged as a warning by the default implementation

// Defensive programming: the registry dereferences the handle, so skip nulls
if (handle != null) {
    registry.startTracking(handle, 1);
    // Failures while discarding happen on a background thread and are logged
    // there; they are not thrown from release()
    registry.release(handle);
}

Cleanup and Shutdown

The registry interface has no close() method, so the discard executor must be shut down by whoever creates it.

Cleanup Example:

// Own the discard executor so that it can be shut down explicitly
// (defaultChangelogRegistry(Executor) is marked @VisibleForTesting, but it is
// the only factory method that accepts an executor)
ExecutorService discardExecutor = Executors.newFixedThreadPool(3);
TaskChangelogRegistry registry = TaskChangelogRegistry.defaultChangelogRegistry(discardExecutor);

// Use registry during application lifecycle
// ...

// During shutdown: stop accepting new discards and let queued ones finish
discardExecutor.shutdown();
if (!discardExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
    discardExecutor.shutdownNow(); // Interrupt discards that are still running
}

// With defaultChangelogRegistry(int), the registry creates its own pool and
// exposes no way to shut it down
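Since the registry interface exposes no shutdown method, one way to keep control over pending discards is to own the executor and drain it at shutdown. The JDK-only sketch below isolates that step; `DiscardExecutorShutdown` and `drainDiscards` are hypothetical names, and the submitted tasks only stand in for `discardState()` calls.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class DiscardExecutorShutdown {
    // Stops accepting new discards, waits for queued ones, then interrupts stragglers.
    // Returns true if every submitted discard finished within the timeout.
    static boolean drainDiscards(ExecutorService discardExecutor, long timeoutSeconds)
            throws InterruptedException {
        discardExecutor.shutdown();
        if (discardExecutor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
            return true;
        }
        discardExecutor.shutdownNow();
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService discardExecutor = Executors.newFixedThreadPool(3);
        AtomicInteger deleted = new AtomicInteger();
        for (int i = 0; i < 10; i++) {
            discardExecutor.execute(deleted::incrementAndGet); // stands in for discardState()
        }
        boolean clean = drainDiscards(discardExecutor, 5);
        System.out.println(clean + " " + deleted.get()); // true 10
    }
}
```

`shutdown()` still runs tasks that were already queued, so segments released before shutdown are deleted; only discards still running after the timeout are interrupted.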

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-dstl
