tessl/maven-org-apache-flink--flink-dstl-dfs

Distributed file system-based changelog storage implementation for Apache Flink's streaming state backend.


Upload System

The upload system schedules state change uploads to a distributed file system, with batching, throttling, and retry support. It coordinates uploads from multiple changelog writers and applies backpressure when in-flight data grows too large.

Capabilities

StateChangeUploadScheduler Interface

Core interface for scheduling and managing upload operations with backpressure support.

/**
 * Interface for scheduling state change uploads with backpressure control.
 * Implementations handle batching, scheduling, and coordination between multiple writers.
 */
public interface StateChangeUploadScheduler extends AutoCloseable {
    
    /**
     * Schedules an upload task for execution
     * @param task Upload task containing change sets and completion callbacks
     */
    void upload(UploadTask task);
    
    /**
     * Returns availability provider for backpressure control
     * @return AvailabilityProvider indicating when scheduler can accept more tasks
     */
    AvailabilityProvider getAvailabilityProvider();
    
    /**
     * Closes the scheduler and releases all resources
     * @throws Exception if cleanup fails
     */
    void close() throws Exception;
    
    /**
     * Creates a scheduler from configuration
     * @param jobID Job identifier
     * @param config Flink configuration
     * @param metricGroup Metric group for collecting upload metrics
     * @param changelogRegistry Registry for managing state handle lifecycle
     * @param localRecoveryConfig Configuration for local recovery
     * @return Configured StateChangeUploadScheduler instance
     * @throws IOException if scheduler creation fails
     */
    static StateChangeUploadScheduler fromConfig(
        JobID jobID,
        Configuration config,
        ChangelogStorageMetricGroup metricGroup,
        TaskChangelogRegistry changelogRegistry,
        LocalRecoveryConfig localRecoveryConfig
    ) throws IOException;
    
    /**
     * Creates a direct scheduler that uploads immediately
     * @param uploader State change uploader implementation
     * @return Direct scheduler with no batching
     */
    static StateChangeUploadScheduler directScheduler(StateChangeUploader uploader);
}

StateChangeUploader Interface

Interface for the actual upload operations to distributed file systems.

/**
 * Interface for uploading state changes to distributed file systems.
 * Implementations handle the actual persistence operations.
 */
public interface StateChangeUploader extends AutoCloseable {
    
    /**
     * Executes upload tasks and returns results
     * @param tasks Collection of upload tasks to execute
     * @return UploadTasksResult containing successful and failed uploads
     * @throws IOException if upload execution fails
     */
    UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException;
    
    /**
     * Closes the uploader and releases resources
     * @throws Exception if cleanup fails
     */
    void close() throws Exception;
}

Upload Task Structure

Upload tasks encapsulate the work to be performed and completion callbacks.

/**
 * Represents an upload task containing change sets and completion callbacks
 */
public class UploadTask {
    
    /**
     * Creates an upload task
     * @param changeSets Collection of state change sets to upload
     * @param successCallback Callback for successful uploads
     * @param failureCallback Callback for failed uploads
     */
    public UploadTask(
        Collection<StateChangeSet> changeSets,
        Consumer<List<UploadResult>> successCallback,
        BiConsumer<List<SequenceNumber>, Throwable> failureCallback
    );
    
    /**
     * Completes the task with upload results
     * @param results List of upload results
     */
    public void complete(List<UploadResult> results);
    
    /**
     * Fails the task with an exception
     * @param exception Failure cause
     */
    public void fail(Throwable exception);
}

/**
 * Result of executing upload tasks
 */
public class UploadTasksResult {
    
    /**
     * Creates upload result
     * @param successful Map of successfully uploaded tasks to their results
     * @param failed Map of failed tasks to their exceptions
     */
    public UploadTasksResult(
        Map<UploadTask, List<UploadResult>> successful,
        Map<UploadTask, Throwable> failed
    );
    
    public Map<UploadTask, List<UploadResult>> getSuccessful();
    public Map<UploadTask, Throwable> getFailed();
}

Usage Examples:

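import org.apache.flink.changelog.fs.*;
import org.apache.flink.runtime.io.AvailabilityProvider;

import java.util.Arrays;
import java.util.Collection;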

// Create scheduler from configuration
StateChangeUploadScheduler scheduler = StateChangeUploadScheduler.fromConfig(
    jobId, config, metricGroup, changelogRegistry, localRecoveryConfig
);

// Create upload task
Collection<StateChangeSet> changeSets = Arrays.asList(changeSet1, changeSet2);
UploadTask task = new UploadTask(
    changeSets,
    results -> {
        // Handle successful upload
        log.info("Uploaded {} change sets", results.size());
        for (UploadResult result : results) {
            log.debug("Uploaded sequence {}, size {}", 
                     result.getSequenceNumber(), result.getSize());
        }
    },
    (failedSequenceNumbers, throwable) -> {
        // Handle upload failure
        log.error("Upload failed for sequences: {}", failedSequenceNumbers, throwable);
    }
);

// Schedule upload
scheduler.upload(task);

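// Check backpressure before scheduling the next task
// (nextTask stands for the next UploadTask to schedule; its creation is not shown)
AvailabilityProvider availability = scheduler.getAvailabilityProvider();
if (availability.isAvailable()) {
    scheduler.upload(nextTask);
} else {
    // Wait until the scheduler can accept more tasks
    availability.getAvailableFuture().thenRun(() -> scheduler.upload(nextTask));
}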

BatchingStateChangeUploadScheduler

Implementation that batches multiple upload requests for efficiency.

/**
 * Upload scheduler that batches requests to reduce the number of upload operations.
 * Collects upload tasks for a configurable delay period before executing them together.
 */
public class BatchingStateChangeUploadScheduler implements StateChangeUploadScheduler {
    
    /**
     * Creates batching scheduler
     * @param uploader Underlying uploader for executing batched requests
     * @param persistDelay Delay before executing batched uploads
     * @param persistSizeThreshold Size threshold to trigger immediate upload
     * @param inFlightDataLimit Maximum in-flight data for backpressure
     * @param executor Executor for running upload operations
     * @param metricGroup Metrics for tracking upload performance
     */
    public BatchingStateChangeUploadScheduler(
        StateChangeUploader uploader,
        Duration persistDelay,
        long persistSizeThreshold,
        long inFlightDataLimit,
        Executor executor,
        ChangelogStorageMetricGroup metricGroup
    );
}

The batching scheduler:

  • Collects upload tasks for the configured persistDelay period
  • Triggers immediate upload when accumulated size exceeds persistSizeThreshold
  • Provides backpressure when in-flight data exceeds inFlightDataLimit
  • Merges compatible tasks to reduce filesystem operations
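The batching behaviour listed above can be sketched with plain JDK types. This is a minimal illustration under stated assumptions, not Flink's implementation: `BatchingSketch`, `Task`, and `onDelayElapsed` are hypothetical names, task sizes are plain byte counts, and the persist-delay timer is replaced by an explicit method call so the example stays deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, simplified batching scheduler; not the Flink class. */
class BatchingSketch {

    /** Hypothetical stand-in for an upload task: an id and a size in bytes. */
    static final class Task {
        final String id;
        final long sizeBytes;

        Task(String id, long sizeBytes) {
            this.id = id;
            this.sizeBytes = sizeBytes;
        }
    }

    private final long sizeThresholdBytes;
    private final Consumer<List<Task>> uploader;
    private final List<Task> pending = new ArrayList<>();
    private long pendingBytes;

    BatchingSketch(long sizeThresholdBytes, Consumer<List<Task>> uploader) {
        this.sizeThresholdBytes = sizeThresholdBytes;
        this.uploader = uploader;
    }

    /** Buffers the task and flushes at once if the accumulated size exceeds the threshold. */
    synchronized void upload(Task task) {
        pending.add(task);
        pendingBytes += task.sizeBytes;
        if (pendingBytes > sizeThresholdBytes) {
            flush();
        }
    }

    /** Stands in for the persist-delay timer firing; a real scheduler would use a timer. */
    synchronized void onDelayElapsed() {
        flush();
    }

    /** Hands all buffered tasks to the uploader as one batch. */
    private void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<Task> batch = new ArrayList<>(pending);
        pending.clear();
        pendingBytes = 0;
        uploader.accept(batch);
    }
}
```

In this sketch a batch leaves the buffer for one of two reasons: the size threshold was crossed, or the delay elapsed. Either way the uploader sees a single call per batch rather than one call per task.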

StateChangeFsUploader

Filesystem-specific implementation for uploading to distributed file systems.

/**
 * Filesystem-based uploader for state changes.
 * Handles serialization, compression, and persistence to distributed file systems.
 */
public class StateChangeFsUploader extends AbstractStateChangeFsUploader {
    
    /**
     * Creates filesystem uploader
     * @param jobID Job identifier for organizing files
     * @param basePath Base path for changelog files
     * @param fileSystem FileSystem instance for the base path
     * @param compression Whether to enable compression
     * @param bufferSize Buffer size for write operations
     * @param metricGroup Metrics for tracking upload performance
     * @param changelogRegistry Registry for managing uploaded state
     */
    public StateChangeFsUploader(
        JobID jobID,
        Path basePath,
        org.apache.flink.core.fs.FileSystem fileSystem,
        boolean compression,
        int bufferSize,
        ChangelogStorageMetricGroup metricGroup,
        TaskChangelogRegistry changelogRegistry
    );
}

DuplicatingStateChangeFsUploader

Specialized uploader that creates both remote and local copies for recovery.

/**
 * Uploader that creates duplicates for local recovery.
 * Writes to both distributed file system and local storage simultaneously.
 */
public class DuplicatingStateChangeFsUploader extends AbstractStateChangeFsUploader {
    
    /**
     * Creates duplicating uploader
     * @param remoteUploader Primary uploader for distributed file system
     * @param localUploader Secondary uploader for local storage
     */
    public DuplicatingStateChangeFsUploader(
        StateChangeFsUploader remoteUploader,
        StateChangeFsUploader localUploader
    );
}
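The idea of writing the same bytes to remote and local storage can be illustrated with a small tee stream built on the JDK. This is only a sketch: `DuplicatingStreamSketch` is a hypothetical name, and the source does not describe how the real uploader reacts when one of the two writes fails, so this version simply propagates any `IOException`.

```java
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical tee stream: every write goes to both a primary and a secondary stream. */
class DuplicatingStreamSketch extends OutputStream {
    private final OutputStream primary;   // e.g. a stream to the distributed file system
    private final OutputStream secondary; // e.g. a stream to local storage

    DuplicatingStreamSketch(OutputStream primary, OutputStream secondary) {
        this.primary = primary;
        this.secondary = secondary;
    }

    @Override
    public void write(int b) throws IOException {
        primary.write(b);
        secondary.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        primary.write(b, off, len);
        secondary.write(b, off, len);
    }

    @Override
    public void flush() throws IOException {
        primary.flush();
        secondary.flush();
    }

    @Override
    public void close() throws IOException {
        // Close both streams even if closing the first one fails.
        try {
            primary.close();
        } finally {
            secondary.close();
        }
    }
}
```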

Retry and Error Handling

The upload system integrates with retry policies and error handling:

/**
 * Executor that applies retry policies to upload operations
 */
public class RetryingExecutor {
    
    /**
     * Executes operation with retry policy
     * @param operation Operation to execute
     * @param retryPolicy Retry policy for handling failures
     * @return Result of successful execution
     * @throws Exception if all retry attempts fail
     */
    public <T> T execute(
        Callable<T> operation,
        RetryPolicy retryPolicy
    ) throws Exception;
}

Error Handling Examples:

// Configure a retry policy (this snippet only constructs it; the upload call below does not apply it)
RetryPolicy retryPolicy = RetryPolicy.fixed(
    3,  // max attempts
    Duration.ofSeconds(5).toMillis(),  // timeout (ms)
    Duration.ofMillis(500).toMillis()  // delay after failure (ms)
);

// Upload and dispatch per-task results to the task callbacks
try {
    UploadTasksResult result = uploader.upload(tasks);
    
    // Process successful uploads
    result.getSuccessful().forEach((task, uploadResults) -> {
        task.complete(uploadResults);
    });
    
    // Handle failed uploads
    result.getFailed().forEach((task, exception) -> {
        task.fail(exception);
    });
    
} catch (IOException e) {
    log.error("Upload operation failed", e);
    // Trigger checkpoint failure and recovery
}
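To show what a fixed retry policy amounts to, here is a minimal fixed-delay retry loop using only the JDK. It is a sketch under assumptions rather than the `RetryingExecutor` described above: `FixedRetrySketch` and `callWithRetry` are hypothetical names, and the per-attempt timeout from the policy is deliberately not modelled.

```java
import java.util.concurrent.Callable;

/** Hypothetical fixed-delay retry helper; not Flink's RetryingExecutor. */
class FixedRetrySketch {

    /**
     * Calls the operation up to maxAttempts times, sleeping delayMillis between
     * failed attempts, and rethrows the last failure if every attempt fails.
     */
    static <T> T callWithRetry(Callable<T> operation, int maxAttempts, long delayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.call();
            } catch (InterruptedException e) {
                // Do not retry on interruption; restore the flag and give up.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMillis);
                }
            }
        }
        throw last;
    }
}
```

A caller would wrap the upload in the callable, for example `callWithRetry(() -> uploader.upload(tasks), 3, 500)`, so that only a failure of the final attempt reaches the `catch` block.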

Throttling and Flow Control

Upload throttling prevents overwhelming the distributed file system:

/**
 * Throttle for controlling upload rate and preventing system overload
 */
public class UploadThrottle {
    
    /**
     * Requests permission to upload data
     * @param size Size of data to upload
     * @return CompletableFuture that completes when upload is permitted
     */
    public CompletableFuture<Void> requestUpload(long size);
    
    /**
     * Notifies throttle of completed upload
     * @param size Size of completed upload
     */
    public void uploadCompleted(long size);
}

The throttling system:

  • Limits concurrent in-flight data based on IN_FLIGHT_DATA_LIMIT
  • Provides backpressure to prevent memory exhaustion
  • Coordinates across multiple writers and operators
  • Integrates with Flink's availability provider system
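The in-flight limit and its link to an availability future can be sketched as follows. This is an illustration, not the `UploadThrottle` class: `InFlightLimitSketch`, `seize`, and `release` are hypothetical names, and the sketch tracks a single byte counter guarded by one lock.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical in-flight byte limiter; not Flink's UploadThrottle. */
class InFlightLimitSketch {
    private final long maxInFlightBytes;
    private long inFlightBytes;
    // Completed while capacity is available; swapped for a pending future when the limit is exceeded.
    private CompletableFuture<Void> available = CompletableFuture.completedFuture(null);

    InFlightLimitSketch(long maxInFlightBytes) {
        this.maxInFlightBytes = maxInFlightBytes;
    }

    /** Records newly scheduled bytes; the limiter becomes unavailable once the limit is exceeded. */
    synchronized void seize(long bytes) {
        inFlightBytes += bytes;
        if (inFlightBytes > maxInFlightBytes && available.isDone()) {
            available = new CompletableFuture<>();
        }
    }

    /** Records completed bytes; signals availability once usage is back within the limit. */
    synchronized void release(long bytes) {
        inFlightBytes -= bytes;
        if (inFlightBytes <= maxInFlightBytes && !available.isDone()) {
            available.complete(null);
        }
    }

    synchronized boolean isAvailable() {
        return available.isDone();
    }

    /** Future that completes when more data may be scheduled. */
    synchronized CompletableFuture<Void> getAvailableFuture() {
        return available;
    }
}
```

Writers in this sketch never block: they check `isAvailable()` and, when it is false, attach a continuation to the future, which mirrors the availability-provider style of backpressure described above.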

Performance Optimization

The upload system includes several optimizations:

  • Batching: Reduces filesystem operation overhead
  • Compression: Reduces network and storage usage
  • Parallel uploads: Multiple threads for concurrent operations
  • Buffer management: Configurable buffer sizes for different workloads
  • Connection pooling: Reuses filesystem connections when possible
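As a rough illustration of why compression reduces network and storage usage, the sketch below compresses a repetitive payload with the JDK's GZIP stream. GZIP is used here only as a stand-in that ships with the JDK; the source does not say which codec the uploader uses, and `CompressionSketch` is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

/** Hypothetical helper showing the size effect of compressing a payload before upload. */
class CompressionSketch {

    /** Returns the GZIP-compressed form of the given bytes. */
    static byte[] gzip(byte[] raw) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // Closing the GZIP stream writes the trailer, so read the buffer only afterwards.
        try (GZIPOutputStream gz = new GZIPOutputStream(buffer)) {
            gz.write(raw);
        }
        return buffer.toByteArray();
    }
}
```

The gain depends on the data: repetitive payloads shrink considerably, while already-compressed or random data may not shrink at all.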

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-dstl-dfs
