tessl/maven-org-apache-flink--flink-dstl

Apache Flink DSTL (Distributed State Timeline) - A filesystem-based state changelog implementation for Flink's state management

Upload Scheduling and Management

Upload scheduler interfaces and implementations that batch and coordinate state change uploads. Schedulers are pluggable: a direct scheduler uploads each task immediately, while a scheduler created from configuration adds batching, upload threads, retries, and availability tracking for backpressure.

Capabilities

StateChangeUploadScheduler Interface

Core interface for scheduling upload tasks with support for batching and backpressure handling.

/**
 * Interface for scheduling upload tasks for state changes
 */
@Internal
public interface StateChangeUploadScheduler extends AutoCloseable {
    
    /**
     * Schedules an upload task for execution
     * @param uploadTask Task containing state changes to upload
     * @throws IOException If the upload cannot be scheduled
     */
    void upload(UploadTask uploadTask) throws IOException;
    
    /**
     * Creates a direct scheduler that executes uploads immediately
     * @param uploader The uploader to use for executing tasks
     * @return StateChangeUploadScheduler that uploads directly
     */
    static StateChangeUploadScheduler directScheduler(StateChangeUploader uploader);
    
    /**
     * Creates a scheduler from configuration with batching and threading
     * @param jobID Job identifier
     * @param config Configuration containing scheduler settings
     * @param metricGroup Metrics for monitoring upload behavior
     * @param changelogRegistry Registry for tracking changelog segments
     * @param localRecoveryConfig Local recovery configuration
     * @return Configured StateChangeUploadScheduler instance
     * @throws IOException If scheduler creation fails
     */
    static StateChangeUploadScheduler fromConfig(
        JobID jobID,
        ReadableConfig config,
        ChangelogStorageMetricGroup metricGroup,
        TaskChangelogRegistry changelogRegistry,
        LocalRecoveryConfig localRecoveryConfig
    ) throws IOException;
    
    /**
     * Returns availability provider for backpressure coordination
     * @return AvailabilityProvider indicating when scheduler can accept more uploads
     */
    default AvailabilityProvider getAvailabilityProvider() {
        return AvailabilityProvider.AVAILABLE;
    }
}

Basic Scheduler Usage Example:

import org.apache.flink.changelog.fs.StateChangeUploadScheduler;
import org.apache.flink.changelog.fs.StateChangeFsUploader;

import java.util.Collections;

// Create direct scheduler for immediate uploads
StateChangeFsUploader uploader = new StateChangeFsUploader(/* ... */);
StateChangeUploadScheduler directScheduler = 
    StateChangeUploadScheduler.directScheduler(uploader);

// Create upload task: change sets and both callbacks are passed to the constructor
StateChangeSet changeSet = new StateChangeSet(logId, sequenceNumber, changes);
UploadTask task = new UploadTask(
    Collections.singletonList(changeSet),
    results -> System.out.println("Upload completed: " + results),
    (sequenceNumbers, throwable) -> System.err.println("Upload failed: " + throwable)
);

// Schedule upload
directScheduler.upload(task);

// Clean up
directScheduler.close();

UploadTask Definition

Task structure containing collections of state changes and completion callbacks for upload operations.

/**
 * Upload Task for StateChangeUploadScheduler
 */
@ThreadSafe
final class UploadTask {
    /** Collection of state change sets to upload */
    final Collection<StateChangeSet> changeSets;
    
    /** Callback invoked when upload completes successfully */
    final Consumer<List<UploadResult>> successCallback;
    
    /** Callback invoked when upload fails */
    final BiConsumer<List<SequenceNumber>, Throwable> failureCallback;
    
    /**
     * Creates upload task with change sets and callbacks
     * @param changeSets Collection of state change sets to upload
     * @param successCallback Callback for successful upload with results
     * @param failureCallback Callback for failed upload with sequence numbers and error
     */
    public UploadTask(
        Collection<StateChangeSet> changeSets,
        Consumer<List<UploadResult>> successCallback,
        BiConsumer<List<SequenceNumber>, Throwable> failureCallback
    );
    
    /**
     * Completes the task with successful results
     * @param results List of upload results
     */
    public void complete(List<UploadResult> results);
    
    /**
     * Fails the task with an error
     * @param error Throwable representing the failure
     */
    public void fail(Throwable error);
    
    /**
     * Gets total size of all change sets in this task
     * @return Total size in bytes
     */
    public long getSize();
    
    /**
     * Gets the collection of change sets
     * @return Collection of StateChangeSet objects
     */
    public Collection<StateChangeSet> getChangeSets();
}

UploadTask Usage Examples:

// Create upload task with multiple change sets
Collection<StateChangeSet> changeSets = Arrays.asList(
    new StateChangeSet(logId1, sequenceNumber1, stateChanges1),
    new StateChangeSet(logId2, sequenceNumber2, stateChanges2)
);

// Success callback - receives list of results
Consumer<List<UploadResult>> successCallback = uploadResults -> {
    for (UploadResult result : uploadResults) {
        System.out.println("Uploaded to: " + result.streamStateHandle);
        System.out.println("Offset: " + result.offset);
        System.out.println("Size: " + result.size);
    }
    // Update tracking or notify other components
    updateCheckpointTracking(uploadResults);
};

// Failure callback - receives sequence numbers and error
BiConsumer<List<SequenceNumber>, Throwable> failureCallback = (sequenceNumbers, throwable) -> {
    System.err.println("Upload failed for sequences: " + sequenceNumbers);
    System.err.println("Error: " + throwable.getMessage());
    
    // Handle failure: retry, fail checkpoint, etc.
    handleUploadFailure(sequenceNumbers, throwable);
};

// Create and schedule the task
UploadTask task = new UploadTask(changeSets, successCallback, failureCallback);
scheduler.upload(task);

Configuration-Based Scheduler Creation

Factory method for creating schedulers with batching, threading, and retry configuration.

/**
 * Creates scheduler from configuration with batching and advanced features
 * @param jobID Job identifier for naming and metrics
 * @param config Configuration containing scheduler settings
 * @param metricGroup Metrics group for monitoring
 * @param changelogRegistry Registry for tracking uploaded segments
 * @param localRecoveryConfig Local recovery configuration
 * @return Configured scheduler with batching and threading
 * @throws IOException If scheduler creation fails
 */
static StateChangeUploadScheduler fromConfig(
    JobID jobID,
    ReadableConfig config,
    ChangelogStorageMetricGroup metricGroup,
    TaskChangelogRegistry changelogRegistry,
    LocalRecoveryConfig localRecoveryConfig
) throws IOException;

Configuration-Based Scheduler Example:

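import org.apache.flink.api.common.JobID;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;

import java.time.Duration;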

// Configure upload behavior
Configuration config = new Configuration();
config.set(FsStateChangelogOptions.NUM_UPLOAD_THREADS, 10);
config.set(FsStateChangelogOptions.UPLOAD_BUFFER_SIZE, MemorySize.parse("2MB"));
config.set(FsStateChangelogOptions.IN_FLIGHT_DATA_LIMIT, MemorySize.parse("200MB"));
config.set(FsStateChangelogOptions.RETRY_MAX_ATTEMPTS, 5);
config.set(FsStateChangelogOptions.UPLOAD_TIMEOUT, Duration.ofSeconds(30));

// Create scheduler with configuration
StateChangeUploadScheduler scheduler = StateChangeUploadScheduler.fromConfig(
    new JobID(),
    config,
    metricGroup,
    changelogRegistry,
    localRecoveryConfig
);

// Use scheduler with batching and threading
for (StateChangeSet changeSet : changeSets) {
    UploadTask task = createUploadTask(changeSet);
    scheduler.upload(task);
}

StateChangeUploader Interface

Core uploader interface that handles the actual upload execution for collections of tasks.

/**
 * Interface for uploading state change tasks
 */
@Internal
public interface StateChangeUploader extends AutoCloseable {
    
    /**
     * Uploads a collection of tasks and returns results
     * @param tasks Collection of upload tasks to execute
     * @return UploadTasksResult containing individual results
     * @throws IOException If upload operation fails
     */
    UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException;
}

UploadTasksResult Structure

Result structure containing outcomes of batch upload operations with task-to-offset mappings.

/**
 * Result of executing one or more upload tasks
 */
final class UploadTasksResult {
    /** Mapping of tasks to their state change set offsets in the uploaded stream */
    private final Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> tasksOffsets;
    
    /** Handle to the uploaded remote stream */
    private final StreamStateHandle handle;
    
    /** Handle to the local backup stream (if local recovery enabled) */
    private final StreamStateHandle localHandle;
    
    /**
     * Creates result with task offsets and remote handle
     * @param tasksOffsets Mapping of tasks to their offsets in the stream
     * @param handle Remote stream handle
     */
    public UploadTasksResult(
        Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> tasksOffsets,
        StreamStateHandle handle
    );
    
    /**
     * Creates result with task offsets, remote and local handles
     * @param tasksOffsets Mapping of tasks to their offsets in the stream
     * @param handle Remote stream handle
     * @param localHandle Local stream handle (nullable)
     */
    public UploadTasksResult(
        Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> tasksOffsets,
        StreamStateHandle handle,
        @Nullable StreamStateHandle localHandle
    );
    
    /**
     * Completes all tasks in this result by calling their completion callbacks
     */
    public void complete();
    
    /**
     * Gets the total state size of the uploaded stream
     * @return Size in bytes
     */
    public long getStateSize();
    
    /**
     * Discards the uploaded state handle
     * @throws Exception If discard fails
     */
    public void discard() throws Exception;
}
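
The result keeps one stream handle plus per-change-set offsets rather than one handle per change set. A minimal, self-contained sketch of that idea follows. It uses plain Java only; the SharedStreamWriter class and its methods are hypothetical and are not part of flink-dstl. Several payloads are appended to one shared stream, and the start offset of each is recorded so that it can be located later.

```java
import java.io.ByteArrayOutputStream;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical model: many payloads, one stream, one start offset per payload. */
final class SharedStreamWriter {
    private final ByteArrayOutputStream stream = new ByteArrayOutputStream();
    private final Map<String, Long> offsets = new LinkedHashMap<>();

    /** Appends a payload and remembers the offset at which it starts. */
    void append(String payloadId, byte[] payload) {
        offsets.put(payloadId, (long) stream.size());
        stream.write(payload, 0, payload.length);
    }

    /** Start offset of each payload inside the shared stream, in write order. */
    Map<String, Long> offsets() {
        return offsets;
    }

    /** Total bytes written, analogous to the size of the uploaded stream. */
    long totalSize() {
        return stream.size();
    }
}
```

Sharing one stream keeps the number of created files low when many small change sets are uploaded together; the offsets are what make each change set addressable inside that stream.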

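Uploader Implementation Example:

// Custom uploader that writes all change sets of a batch into one stream.
// openStream, writeChangeSet and closeAndGetHandle are placeholders for
// implementation-specific logic.
public class MyStateChangeUploader implements StateChangeUploader {
    
    @Override
    public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {
        // Offsets of every change set, grouped by the task it belongs to
        Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> tasksOffsets = new HashMap<>();
        FSDataOutputStream out = openStream();
        try {
            for (UploadTask task : tasks) {
                Map<StateChangeSet, Tuple2<Long, Long>> offsets = new HashMap<>();
                for (StateChangeSet changeSet : task.getChangeSets()) {
                    long offset = out.getPos();
                    writeChangeSet(out, changeSet);
                    // Assumption: the tuple holds (remote offset, local offset);
                    // no local copy is written here, so the local offset is 0
                    offsets.put(changeSet, Tuple2.of(offset, 0L));
                }
                tasksOffsets.put(task, offsets);
            }
            // Callbacks are not invoked here: UploadTasksResult.complete() triggers them
            return new UploadTasksResult(tasksOffsets, closeAndGetHandle(out));
        } catch (IOException e) {
            IOUtils.closeQuietly(out);
            throw e; // propagate so the caller can handle the failed batch
        }
    }
    
    private FSDataOutputStream openStream() throws IOException {
        throw new UnsupportedOperationException("implementation-specific");
    }
    
    private void writeChangeSet(FSDataOutputStream out, StateChangeSet changeSet) throws IOException {
        throw new UnsupportedOperationException("implementation-specific");
    }
    
    private StreamStateHandle closeAndGetHandle(FSDataOutputStream out) throws IOException {
        throw new UnsupportedOperationException("implementation-specific");
    }
    
    @Override
    public void close() throws Exception {
        // Clean up resources
    }
}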

Batching Upload Scheduler

Internal implementation that batches upload tasks for efficiency.

/**
 * Upload scheduler that batches tasks for efficient uploading
 */
@ThreadSafe
class BatchingStateChangeUploadScheduler implements StateChangeUploadScheduler {
    
    /**
     * Creates batching scheduler with configuration
     * @param uploader Underlying uploader for executing batches
     * @param maxBatchSize Maximum number of tasks per batch
     * @param batchTimeout Timeout for incomplete batches
     * @param executor Executor for upload operations
     */
    public BatchingStateChangeUploadScheduler(
        StateChangeUploader uploader,
        int maxBatchSize,
        Duration batchTimeout,
        Executor executor
    );
}
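
The batching idea can be illustrated with a small, self-contained sketch. It is a simplified model in plain Java, not the flink-dstl implementation: the CountBatcher class is hypothetical, it batches by task count only, and the batch timeout is represented by an explicit flush() call that a timer would make in a real scheduler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical model of count-based batching; not the flink-dstl implementation. */
final class CountBatcher<T> {
    private final int maxBatchSize;
    private final Consumer<List<T>> uploader; // receives one complete batch
    private final List<T> pending = new ArrayList<>();

    CountBatcher(int maxBatchSize, Consumer<List<T>> uploader) {
        if (maxBatchSize < 1) {
            throw new IllegalArgumentException("maxBatchSize must be at least 1");
        }
        this.maxBatchSize = maxBatchSize;
        this.uploader = uploader;
    }

    /** Buffers a task and flushes as soon as the batch is full. */
    synchronized void add(T task) {
        pending.add(task);
        if (pending.size() >= maxBatchSize) {
            flush();
        }
    }

    /** Hands everything buffered so far to the uploader; a timer would call this on timeout. */
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(pending);
        pending.clear();
        uploader.accept(batch);
    }
}
```

With a maximum batch size of 3, adding three tasks triggers one upload of all three, while a fourth task stays buffered until flush() is called.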

Availability and Backpressure

Upload schedulers support backpressure through availability providers to coordinate with upstream components.

/**
 * Returns availability provider for backpressure coordination
 * @return AvailabilityProvider indicating scheduler capacity
 */
default AvailabilityProvider getAvailabilityProvider();

Backpressure Handling Example:

StateChangeUploadScheduler scheduler = StateChangeUploadScheduler.fromConfig(/* ... */);

// Check availability before scheduling uploads
AvailabilityProvider availability = scheduler.getAvailabilityProvider();

if (availability.isAvailable()) {
    // Scheduler can accept more uploads
    scheduler.upload(uploadTask);
} else {
    // Wait for availability
    availability.getAvailableFuture().thenRun(() -> {
        try {
            scheduler.upload(uploadTask);
        } catch (IOException e) {
            System.err.println("Upload failed: " + e.getMessage());
        }
    });
}
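
How a scheduler might derive its availability can be sketched without any Flink classes. The model below is an assumption made for illustration, not the flink-dstl implementation: the InFlightLimiter class is hypothetical and tracks the bytes currently being uploaded against a fixed limit, exposing a future that is completed while capacity is free and pending while the limit is reached.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical model of an in-flight byte limit exposed as an availability future. */
final class InFlightLimiter {
    private final long maxBytesInFlight;
    private long bytesInFlight;
    private CompletableFuture<Void> available = CompletableFuture.completedFuture(null);

    InFlightLimiter(long maxBytesInFlight) {
        this.maxBytesInFlight = maxBytesInFlight;
    }

    /** Completed while below the limit; pending while the limit is reached. */
    synchronized CompletableFuture<Void> getAvailableFuture() {
        return available;
    }

    /** Accounts for a newly scheduled upload and becomes unavailable at the limit. */
    synchronized void onScheduled(long bytes) {
        bytesInFlight += bytes;
        if (bytesInFlight >= maxBytesInFlight && available.isDone()) {
            available = new CompletableFuture<>();
        }
    }

    /** Accounts for a finished upload and wakes up waiters once below the limit. */
    synchronized void onFinished(long bytes) {
        bytesInFlight -= bytes;
        if (bytesInFlight < maxBytesInFlight && !available.isDone()) {
            available.complete(null);
        }
    }
}
```

A producer that registers a callback on the pending future is resumed when enough uploads have finished, which is the same waiting pattern as in the example above.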

Retry Policy Integration

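Upload schedulers integrate with retry policies for handling transient failures. Schedulers created with fromConfig read the retry policy, the maximum number of attempts, the delay after a failure, and the upload timeout from the configuration.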

Retry Configuration Example:

Configuration config = new Configuration();
config.set(FsStateChangelogOptions.RETRY_POLICY, "fixed");
config.set(FsStateChangelogOptions.RETRY_MAX_ATTEMPTS, 5);
config.set(FsStateChangelogOptions.RETRY_DELAY_AFTER_FAILURE, Duration.ofSeconds(2));
config.set(FsStateChangelogOptions.UPLOAD_TIMEOUT, Duration.ofSeconds(30));

StateChangeUploadScheduler scheduler = StateChangeUploadScheduler.fromConfig(
    jobID, config, metricGroup, changelogRegistry, localRecoveryConfig
);

// Scheduler will automatically retry failed uploads according to policy
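
As a rough illustration of what a fixed retry policy means, the following self-contained sketch retries an action a bounded number of times with a constant delay between attempts. The FixedDelayRetry class is hypothetical and is not the retry implementation used by flink-dstl; treating only IOException as transient is also an assumption of this sketch.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

/** Hypothetical model of a fixed-delay retry loop; not the flink-dstl retry policy. */
final class FixedDelayRetry {
    private FixedDelayRetry() {}

    /**
     * Runs the action up to maxAttempts times, sleeping delayMillis after each
     * failed attempt except the last. Rethrows the last IOException if all
     * attempts fail; any other exception is propagated immediately.
     */
    static <T> T run(Callable<T> action, int maxAttempts, long delayMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (IOException e) {
                last = e; // treated as transient in this sketch
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMillis);
                }
            }
        }
        throw last;
    }
}
```

With maxAttempts set to 5 and a delay of 2 seconds, an upload that fails twice and then succeeds returns its result on the third attempt after roughly 4 seconds of waiting.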

Error Handling and Monitoring

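Failures surface in two places: upload throws an IOException when a task cannot be scheduled, and the task's failure callback is invoked when a scheduled upload fails. Upload behavior is monitored through the ChangelogStorageMetricGroup passed to fromConfig.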

Error Handling Example:

StateChangeUploadScheduler scheduler = StateChangeUploadScheduler.fromConfig(/* ... */);

// Handle different failure scenarios; the callback receives the affected
// sequence numbers and the cause
BiConsumer<List<SequenceNumber>, Throwable> failureCallback = (sequenceNumbers, throwable) -> {
    if (throwable instanceof IOException) {
        System.err.println("I/O error during upload: " + throwable.getMessage());
        // May retry or fail checkpoint
    } else if (throwable instanceof TimeoutException) {
        System.err.println("Upload timed out: " + throwable.getMessage());
        // May increase timeout or fail
    } else {
        System.err.println("Unexpected upload failure: " + throwable.getMessage());
        // Log and fail checkpoint
    }
};

UploadTask task = new UploadTask(
    Collections.singletonList(changeSet),
    results -> System.out.println("Upload completed: " + results),
    failureCallback
);

try {
    scheduler.upload(task);
} catch (IOException e) {
    System.err.println("Failed to schedule upload: " + e.getMessage());
    // Scheduler may be overloaded or closed
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-dstl
