tessl/maven-org-apache-flink--flink-dstl

Apache Flink DSTL (Durable Short-term Log) - A filesystem-based state changelog implementation for Flink's state management

Main Storage Implementation

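Filesystem-based implementation of StateChangelogStorage with thread-safe operations, writer creation, and availability tracking. This is the core storage component that coordinates state change persistence: it creates one writer per operator, routes the writers' uploads through a shared upload scheduler, and reports that scheduler's availability for backpressure.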

Capabilities

FsStateChangelogStorage

Main filesystem-based implementation. It extends FsStateChangelogStorageForRecovery, which provides read access to changelog data during recovery, and adds writer creation on top of it.

/**
 * Filesystem-based implementation of StateChangelogStorage
 */
@Experimental
@ThreadSafe
public class FsStateChangelogStorage 
    extends FsStateChangelogStorageForRecovery 
    implements StateChangelogStorage<ChangelogStateHandleStreamImpl> {
    
    /**
     * Creates storage with default changelog registry
     * @param jobID The job identifier
     * @param config Configuration settings
     * @param metricGroup Metric group for monitoring
     * @param localRecoveryConfig Local recovery configuration
     * @throws IOException If storage initialization fails
     */
    public FsStateChangelogStorage(
        JobID jobID,
        Configuration config,
        TaskManagerJobMetricGroup metricGroup,
        LocalRecoveryConfig localRecoveryConfig
    ) throws IOException;
    
    /**
     * Creates storage with custom changelog registry
     * @param jobID The job identifier
     * @param config Configuration settings
     * @param metricGroup Metric group for monitoring
     * @param changelogRegistry Custom changelog registry
     * @param localRecoveryConfig Local recovery configuration
     * @throws IOException If storage initialization fails
     */
    public FsStateChangelogStorage(
        JobID jobID,
        Configuration config,
        TaskManagerJobMetricGroup metricGroup,
        TaskChangelogRegistry changelogRegistry,
        LocalRecoveryConfig localRecoveryConfig
    ) throws IOException;
    
    /**
     * Creates a writer for state changes for a specific operator
     * @param operatorID Identifier of the operator
     * @param keyGroupRange Range of key groups handled by this writer
     * @param mailboxExecutor Mailbox executor of the owning task, used to handle upload results on the task thread
     * @return FsStateChangelogWriter instance
     */
    public FsStateChangelogWriter createWriter(
        String operatorID,
        KeyGroupRange keyGroupRange,
        MailboxExecutor mailboxExecutor
    );
    
    /**
     * Closes the storage and releases all resources
     * @throws Exception If closing fails
     */
    public void close() throws Exception;
    
    /**
     * Returns availability provider for backpressure handling
     * @return AvailabilityProvider instance
     */
    public AvailabilityProvider getAvailabilityProvider();
}

Usage Examples:

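import org.apache.flink.api.common.JobID;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.changelog.fs.FsStateChangelogStorage;
import org.apache.flink.changelog.fs.FsStateChangelogWriter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.SequenceNumber;

import java.util.concurrent.CompletableFuture;

// Assumed to be provided by the surrounding task runtime: metricGroup
// (TaskManagerJobMetricGroup), localRecoveryConfig, mailboxExecutor,
// stateChangeData (byte[]) and checkpointId (long).

// Basic storage creation
Configuration config = new Configuration();
config.set(FsStateChangelogOptions.BASE_PATH, "/path/to/changelog");

FsStateChangelogStorage storage = new FsStateChangelogStorage(
    new JobID(),
    config,
    metricGroup,
    localRecoveryConfig
);

// Create a writer for one operator, covering key groups 0 to 127 (inclusive)
FsStateChangelogWriter writer = storage.createWriter(
    "my-operator-id",
    KeyGroupRange.of(0, 127),
    mailboxExecutor
);

// Record where the new changes start, then append a change for key group 5
SequenceNumber sequenceNumber = writer.nextSequenceNumber();
writer.append(5, stateChangeData);

// Upload the changes from sequenceNumber onward; the future completes with
// a handle to the persisted changelog
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> result =
    writer.persist(sequenceNumber, checkpointId);

// Clean up: close the writer before the storage that owns the uploader
writer.close();
storage.close();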
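
The call persist(sequenceNumber, checkpointId) selects changes by sequence number: it uploads what was appended at or after the given sequence number. The plain-Java model below sketches that contract. ChangeLogModel is a hypothetical class, not part of Flink, and it assumes for simplicity that every append gets its own increasing sequence number; the real writer's numbering scheme may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the writer's append/persist contract (not Flink code).
// Simplification: every append receives its own, increasing sequence number.
final class ChangeLogModel {
    static final class Change {
        final long sqn;
        final int keyGroup;
        final byte[] data;

        Change(long sqn, int keyGroup, byte[] data) {
            this.sqn = sqn;
            this.keyGroup = keyGroup;
            this.data = data;
        }
    }

    private final List<Change> changes = new ArrayList<>();
    private long nextSqn = 0;

    /** Sequence number the next appended change will receive. */
    long nextSequenceNumber() {
        return nextSqn;
    }

    void append(int keyGroup, byte[] data) {
        changes.add(new Change(nextSqn++, keyGroup, data));
    }

    /** Changes a persist(from) call would upload: everything with sqn >= from. */
    List<Change> persist(long from) {
        List<Change> selected = new ArrayList<>();
        for (Change c : changes) {
            if (c.sqn >= from) {
                selected.add(c);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        ChangeLogModel log = new ChangeLogModel();
        log.append(3, new byte[] {1});        // sqn 0: belongs to earlier work
        long from = log.nextSequenceNumber(); // 1
        log.append(5, new byte[] {2});        // sqn 1
        log.append(7, new byte[] {3});        // sqn 2
        System.out.println(log.persist(from).size()); // prints 2
    }
}
```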

Testing Constructor

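Constructor intended for tests. Instead of reading a Configuration, it takes the base path, compression flag, and buffer size as arguments and configures the uploader directly from them.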

/**
 * Testing constructor with direct uploader configuration
 * @param jobID The job identifier
 * @param basePath Base path for changelog storage
 * @param compression Whether to enable compression
 * @param bufferSize Buffer size, in bytes, used when writing changelog files
 * @param metricGroup Metric group for monitoring
 * @param changelogRegistry Changelog registry for tracking
 * @param localRecoveryConfig Local recovery configuration
 * @throws IOException If storage initialization fails
 */
@VisibleForTesting
public FsStateChangelogStorage(
    JobID jobID,
    Path basePath,
    boolean compression,
    int bufferSize,
    ChangelogStorageMetricGroup metricGroup,
    TaskChangelogRegistry changelogRegistry,
    LocalRecoveryConfig localRecoveryConfig
) throws IOException;

Low-Level Constructor

Low-level constructor for advanced testing scenarios with custom upload scheduler.

/**
 * Low-level constructor with custom upload scheduler
 * @param uploader Custom state change upload scheduler
 * @param preEmptivePersistThresholdInBytes Size, in bytes, of not-yet-uploaded changes above which a writer persists them without waiting for a checkpoint
 * @param changelogRegistry Changelog registry for tracking
 * @param localRecoveryConfig Local recovery configuration
 */
@VisibleForTesting
public FsStateChangelogStorage(
    StateChangeUploadScheduler uploader,
    long preEmptivePersistThresholdInBytes,
    TaskChangelogRegistry changelogRegistry,
    LocalRecoveryConfig localRecoveryConfig
);

Testing Usage Example:

// Testing with custom parameters. metricGroup (a MetricGroup) and
// localRecoveryConfig are assumed to come from the test setup; Path is
// org.apache.flink.core.fs.Path.
ChangelogStorageMetricGroup testMetricGroup = new ChangelogStorageMetricGroup(metricGroup);
TaskChangelogRegistry testRegistry = TaskChangelogRegistry.defaultChangelogRegistry(1);

FsStateChangelogStorage testStorage = new FsStateChangelogStorage(
    new JobID(),
    new Path("file:///tmp/test-changelog"),
    true, // compression enabled
    1024 * 1024, // 1 MiB buffer
    testMetricGroup,
    testRegistry,
    localRecoveryConfig
);
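
The preEmptivePersistThresholdInBytes argument of the low-level constructor lets a writer start uploading before a checkpoint is triggered, so that less data is left to upload when the checkpoint arrives; with the Configuration-based constructors the value is, to our understanding, read from FsStateChangelogOptions.PREEMPTIVE_PERSIST_THRESHOLD. The following sketch models the idea with a hypothetical PreemptiveWriterModel class (not Flink's writer), assuming a simple check of pending bytes against the threshold.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of pre-emptive persistence (not Flink code): the writer
// counts bytes appended since its last upload and uploads early once the
// count exceeds the threshold, instead of waiting for the next checkpoint.
final class PreemptiveWriterModel {
    private final long thresholdBytes;
    private long pendingBytes = 0;
    private final List<Long> uploadSizes = new ArrayList<>();

    PreemptiveWriterModel(long thresholdBytes) {
        this.thresholdBytes = thresholdBytes;
    }

    void append(byte[] change) {
        pendingBytes += change.length;
        if (pendingBytes > thresholdBytes) {
            // Stand-in for handing the pending changes to the upload scheduler
            uploadSizes.add(pendingBytes);
            pendingBytes = 0;
        }
    }

    long pendingBytes() {
        return pendingBytes;
    }

    List<Long> uploadSizes() {
        return uploadSizes;
    }

    public static void main(String[] args) {
        PreemptiveWriterModel writer = new PreemptiveWriterModel(10);
        writer.append(new byte[6]); // 6 pending, below the threshold
        writer.append(new byte[6]); // 12 > 10: pre-emptive upload of 12 bytes
        writer.append(new byte[4]); // 4 pending
        System.out.println(writer.uploadSizes() + " pending=" + writer.pendingBytes());
        // prints [12] pending=4
    }
}
```

A larger threshold means fewer, bigger uploads; a smaller one spreads upload work more evenly between checkpoints.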

Availability and Backpressure Handling

The storage exposes an AvailabilityProvider, taken from its upload scheduler, so that callers can apply backpressure when uploads fall behind, for example when too much data is already in flight.

/**
 * Returns availability provider for coordinating backpressure
 * @return AvailabilityProvider that signals when storage is available for writes
 */
public AvailabilityProvider getAvailabilityProvider();

Availability Usage Example:

import org.apache.flink.runtime.io.AvailabilityProvider;

FsStateChangelogStorage storage = new FsStateChangelogStorage(/* ... */);

// The provider reflects the state of the shared upload scheduler
AvailabilityProvider availability = storage.getAvailabilityProvider();

if (availability.isAvailable()) {
    // Upload capacity is free: append and persist state changes as usual
} else {
    // Uploads are backpressured: resume once the availability future completes.
    // The callback runs on whichever thread completes the future, so hand the
    // work back to the task's mailbox instead of touching the writer directly.
    availability.getAvailableFuture().thenRun(() ->
        mailboxExecutor.execute(
            () -> { /* resume appending and persisting */ },
            "changelog storage available"));
}
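
A common way to drive this kind of availability signal is a limit on bytes in flight: once running uploads reach the limit, the provider reports unavailable and hands out a future that completes when enough uploads finish. The sketch below models that mechanism. InFlightThrottleModel is hypothetical; its isAvailable() and getAvailableFuture() methods only mirror the names on Flink's AvailabilityProvider, and the corresponding Flink limit is, to our understanding, set with dstl.dfs.upload.max-in-flight.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical in-flight limit (not Flink code). Uploads reserve capacity when
// they start and release it when they finish; while the limit is reached,
// getAvailableFuture() returns a future that completes once capacity frees up.
final class InFlightThrottleModel {
    private final long maxInFlightBytes;
    private long inFlightBytes = 0;
    private CompletableFuture<Void> availableFuture = CompletableFuture.completedFuture(null);

    InFlightThrottleModel(long maxInFlightBytes) {
        this.maxInFlightBytes = maxInFlightBytes;
    }

    synchronized boolean isAvailable() {
        return inFlightBytes < maxInFlightBytes;
    }

    synchronized CompletableFuture<Void> getAvailableFuture() {
        return availableFuture;
    }

    /** Called when an upload of the given size starts. */
    synchronized void seizeCapacity(long bytes) {
        inFlightBytes += bytes;
        if (!isAvailable() && availableFuture.isDone()) {
            availableFuture = new CompletableFuture<>(); // switch to "unavailable"
        }
    }

    /** Called when an upload finishes, successfully or not. */
    synchronized void releaseCapacity(long bytes) {
        inFlightBytes -= bytes;
        if (isAvailable()) {
            // Completing under the lock keeps the counter and the future
            // consistent; callbacks registered with thenRun run on this thread.
            availableFuture.complete(null);
        }
    }

    public static void main(String[] args) {
        InFlightThrottleModel throttle = new InFlightThrottleModel(100);
        throttle.seizeCapacity(60);
        throttle.seizeCapacity(50); // 110 bytes in flight: unavailable
        throttle.getAvailableFuture().thenRun(() -> System.out.println("available again"));
        throttle.releaseCapacity(50); // 60 bytes in flight: prints "available again"
    }
}
```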

Local Recovery Integration

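The storage integrates with Flink's local recovery mechanism: when local recovery is enabled, uploaded changelog data is also kept on the TaskManager's local disk, so a task restarted on the same TaskManager can restore from that copy instead of reading it back from the remote filesystem.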

Local Recovery Configuration Example:

// In a running job, the TaskManager creates LocalRecoveryConfig and passes it
// to the storage; local recovery is switched on with the configuration option
// state.backend.local-recovery: true. Building one by hand (e.g. in a test)
// could look like this. The constructors of LocalRecoveryConfig and of the
// directory provider differ between Flink versions, so check your version.
// jobID, jobVertexID, subtaskIndex, config and metricGroup are assumed in scope.
LocalRecoveryDirectoryProvider directoryProvider = new LocalRecoveryDirectoryProviderImpl(
    new File("/local/recovery"), // allocation base directory on local disk
    jobID,
    jobVertexID,
    subtaskIndex
);

LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(
    true, // enable local recovery
    directoryProvider
);

// Create storage with local recovery enabled
FsStateChangelogStorage storage = new FsStateChangelogStorage(
    jobID, config, metricGroup, localRecoveryConfig
);
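
Conceptually, local recovery means changelog data is written twice: to the durable base path, which remains the source of truth, and to local disk, where a task restarted on the same TaskManager can read it back quickly. The sketch below models that duplication with a hypothetical DuplicatingStream class (not Flink's uploader). As an assumption of the sketch, a failed local write is recorded and tolerated, while a failed remote write propagates.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical model (not Flink code): every byte goes to the primary (remote)
// stream and, best-effort, to a secondary (local) copy.
final class DuplicatingStream extends OutputStream {
    @FunctionalInterface
    private interface IoAction {
        void run() throws IOException;
    }

    private final OutputStream primary;   // durable copy: failures propagate
    private final OutputStream secondary; // local copy: failures are recorded
    private IOException secondaryFailure;

    DuplicatingStream(OutputStream primary, OutputStream secondary) {
        this.primary = primary;
        this.secondary = secondary;
    }

    /** True while the local copy has received every byte written so far. */
    boolean localCopyComplete() {
        return secondaryFailure == null;
    }

    private void onSecondary(IoAction action) {
        if (secondaryFailure != null) {
            return; // local copy already broken: stop writing to it
        }
        try {
            action.run();
        } catch (IOException e) {
            secondaryFailure = e;
        }
    }

    @Override
    public void write(int b) throws IOException {
        primary.write(b);
        onSecondary(() -> secondary.write(b));
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        primary.write(b, off, len);
        onSecondary(() -> secondary.write(b, off, len));
    }

    @Override
    public void flush() throws IOException {
        primary.flush();
        onSecondary(secondary::flush);
    }

    @Override
    public void close() throws IOException {
        try {
            primary.close();
        } finally {
            try {
                secondary.close(); // always release the local file handle
            } catch (IOException e) {
                if (secondaryFailure == null) {
                    secondaryFailure = e;
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream remote = new ByteArrayOutputStream();
        ByteArrayOutputStream local = new ByteArrayOutputStream();
        try (DuplicatingStream out = new DuplicatingStream(remote, local)) {
            out.write(new byte[] {1, 2, 3});
        }
        System.out.println(remote.size() + " " + local.size()); // prints 3 3
    }
}
```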

Error Handling

The constructors throw IOException when the storage cannot be initialized, and close() declares Exception because releasing its resources, such as the upload scheduler, can fail.

Common Error Scenarios:

// log is assumed to be an SLF4J Logger; jobID, config, metricGroup and
// localRecoveryConfig are assumed to be in scope.
FsStateChangelogStorage storage;
try {
    storage = new FsStateChangelogStorage(
        jobID, config, metricGroup, localRecoveryConfig
    );
} catch (IOException e) {
    // Handle storage initialization failure
    // Common causes: invalid base path, permissions, filesystem issues
    log.error("Failed to initialize changelog storage", e);
    throw e; // the storage is unusable, so do not continue with it
}

try {
    storage.close();
} catch (Exception e) {
    // Handle cleanup failure, e.g. while shutting down the upload scheduler
    // or releasing other resources
    log.warn("Error during storage cleanup", e);
}
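
When a writer and the storage are shut down together, one failing close() should not prevent the other resources from being released. The sketch below shows that pattern with a hypothetical CloseAll.closeAll helper written for this example; Flink's org.apache.flink.util.IOUtils.closeAll takes a similar approach. In main, the two lambdas stand in for a writer and the storage, closed in the same order as in the usage example.

```java
// Hypothetical helper (not Flink code): closes every resource in order, even
// if an earlier close() throws, then rethrows the first failure with any later
// failures attached as suppressed exceptions.
final class CloseAll {
    static void closeAll(AutoCloseable... resources) throws Exception {
        Exception first = null;
        for (AutoCloseable resource : resources) {
            if (resource == null) {
                continue; // tolerate resources that were never created
            }
            try {
                resource.close();
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }

    public static void main(String[] args) {
        AutoCloseable writer = () -> {
            throw new IllegalStateException("writer close failed");
        };
        AutoCloseable storage = () -> System.out.println("storage closed");
        try {
            closeAll(writer, storage); // prints "storage closed" despite the writer failure
        } catch (Exception e) {
            System.out.println("first failure: " + e.getMessage());
        }
    }
}
```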

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-dstl
