
tessl/maven-org-apache-flink--flink-dstl-dfs

Distributed file system-based changelog storage implementation for Apache Flink's streaming state backend.


docs/storage-implementation.md

Main Storage Implementation

The core filesystem-based implementation that provides changelog writers and manages upload operations for high-throughput streaming applications. It coordinates between writers, upload schedulers, and recovery components.

Capabilities

FsStateChangelogStorage

Main storage implementation that manages changelog writers and upload operations for active streaming jobs.

/**
 * Filesystem-based implementation of StateChangelogStorage for write operations.
 * Thread-safe and manages multiple changelog writers for different operators.
 */
public class FsStateChangelogStorage extends FsStateChangelogStorageForRecovery
        implements StateChangelogStorage<ChangelogStateHandleStreamImpl> {
    
    /**
     * Creates a new changelog writer for a specific operator and key group range
     * @param operatorID Unique identifier for the operator
     * @param keyGroupRange Key group range this writer handles
     * @param mailboxExecutor Executor for callback processing
     * @return FsStateChangelogWriter instance for the operator
     */
    public FsStateChangelogWriter createWriter(
        String operatorID, 
        KeyGroupRange keyGroupRange, 
        MailboxExecutor mailboxExecutor
    );
    
    /**
     * Closes the storage and all associated resources
     * @throws Exception if cleanup fails
     */
    public void close() throws Exception;
    
    /**
     * Returns availability provider for backpressure control
     * @return AvailabilityProvider indicating when storage can accept more data
     */
    public AvailabilityProvider getAvailabilityProvider();
}

Constructors

Multiple constructor variants support different initialization scenarios:

/**
 * Main constructor for production use
 */
public FsStateChangelogStorage(
    JobID jobID,
    Configuration config,
    TaskManagerJobMetricGroup metricGroup,
    LocalRecoveryConfig localRecoveryConfig
) throws IOException;

/**
 * Constructor with custom changelog registry
 */
public FsStateChangelogStorage(
    JobID jobID,
    Configuration config,
    TaskManagerJobMetricGroup metricGroup,
    TaskChangelogRegistry changelogRegistry,
    LocalRecoveryConfig localRecoveryConfig
) throws IOException;

/**
 * Testing constructor with direct parameters
 */
public FsStateChangelogStorage(
    JobID jobID,
    Path basePath,
    boolean compression,
    int bufferSize,
    ChangelogStorageMetricGroup metricGroup,
    TaskChangelogRegistry changelogRegistry,
    LocalRecoveryConfig localRecoveryConfig
) throws IOException;

/**
 * Advanced constructor with custom upload scheduler
 */
public FsStateChangelogStorage(
    StateChangeUploadScheduler uploader,
    long preEmptivePersistThresholdInBytes,
    TaskChangelogRegistry changelogRegistry,
    LocalRecoveryConfig localRecoveryConfig
);

Usage Examples:

import org.apache.flink.changelog.fs.FsStateChangelogStorage;
import org.apache.flink.changelog.fs.FsStateChangelogWriter;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.state.KeyGroupRange;

// Create storage instance (typically done by factory)
FsStateChangelogStorage storage = new FsStateChangelogStorage(
    jobId, config, metricGroup, localRecoveryConfig
);

// Create writers for different operators
FsStateChangelogWriter operatorWriter1 = storage.createWriter(
    "map-operator", 
    KeyGroupRange.of(0, 63),
    mailboxExecutor
);

FsStateChangelogWriter operatorWriter2 = storage.createWriter(
    "filter-operator",
    KeyGroupRange.of(64, 127), 
    mailboxExecutor
);

// Check if storage can accept more data
AvailabilityProvider availability = storage.getAvailabilityProvider();
if (availability.isAvailable()) {
    // Safe to write more data
    operatorWriter1.append(keyGroup, stateChangeBytes);
}

// Cleanup when done
storage.close();

Integration with Upload System

The storage implementation integrates with the upload scheduling system:

/**
 * Internal components managed by FsStateChangelogStorage
 */
class InternalComponents {
    private final StateChangeUploadScheduler uploader;
    private final long preEmptivePersistThresholdInBytes;
    private final TaskChangelogRegistry changelogRegistry;
    private final AtomicInteger logIdGenerator;
    private final LocalChangelogRegistry localChangelogRegistry;
}

The storage automatically:

  • Creates upload schedulers based on configuration
  • Manages unique log IDs for different writers
  • Coordinates with the changelog registry for state lifecycle management
  • Handles local recovery integration when enabled
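The unique-log-ID bookkeeping above can be sketched with a plain `AtomicInteger`; the class and method names here are illustrative, not Flink's internals:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: each createWriter call draws the next unique log ID.
// The getAndIncrement operation is atomic, so concurrent writer creation
// never hands out the same ID twice.
class LogIdAllocator {
    private final AtomicInteger logIdGenerator = new AtomicInteger(0);

    int nextLogId() {
        return logIdGenerator.getAndIncrement();
    }
}
```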

Local Recovery Support

When local recovery is enabled, the storage manages local changelog registries:

// Local recovery configuration
LocalRecoveryConfig localRecoveryConfig = LocalRecoveryConfig.enabled(localStateDirectory);

FsStateChangelogStorage storage = new FsStateChangelogStorage(
    jobId, config, metricGroup, localRecoveryConfig
);

// Storage automatically creates LocalChangelogRegistryImpl when enabled
// Handles both remote and local persistence of changelog data
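The dual-persistence behavior can be illustrated with a minimal sketch; the class and field names are hypothetical and stand in for the remote upload path and the local recovery directory, not Flink's actual API:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: with local recovery enabled, every change is
// persisted remotely (DFS upload) and additionally kept on local disk
// so a restarted task can recover without re-downloading state.
class DualPersistence {
    final List<byte[]> remote = new ArrayList<>();
    final List<byte[]> local = new ArrayList<>();
    private final boolean localRecoveryEnabled;

    DualPersistence(boolean localRecoveryEnabled) {
        this.localRecoveryEnabled = localRecoveryEnabled;
    }

    void persist(byte[] change) {
        remote.add(change);            // always uploaded to remote storage
        if (localRecoveryEnabled) {
            local.add(change);         // extra local copy for fast recovery
        }
    }
}
```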

Backpressure and Flow Control

The storage provides backpressure mechanisms through availability providers:

import org.apache.flink.runtime.io.AvailabilityProvider;

// Monitor storage availability
AvailabilityProvider availability = storage.getAvailabilityProvider();

// Use in async context
availability.getAvailabilityFuture().thenRun(() -> {
    // Storage is available, safe to continue writing
    writer.append(keyGroup, data);
});
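The availability contract can be demonstrated with a self-contained sketch that gates writers on in-flight upload bytes. This mirrors the backpressure pattern only; the threshold logic and names are assumptions, not Flink's implementation:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical availability provider: "available" while in-flight upload
// bytes stay under a threshold; callers block (or chain) on the future
// until enough uploads finish.
class ThrottlingAvailability {
    private final long maxInFlightBytes;
    private long inFlightBytes = 0;
    private CompletableFuture<Void> available =
            CompletableFuture.completedFuture(null);

    ThrottlingAvailability(long maxInFlightBytes) {
        this.maxInFlightBytes = maxInFlightBytes;
    }

    synchronized boolean isAvailable() {
        return available.isDone();
    }

    synchronized CompletableFuture<Void> getAvailableFuture() {
        return available;
    }

    // An upload started: over the threshold, flip to "unavailable".
    synchronized void uploadStarted(long bytes) {
        inFlightBytes += bytes;
        if (inFlightBytes > maxInFlightBytes && available.isDone()) {
            available = new CompletableFuture<>();
        }
    }

    // An upload finished: back under the threshold, complete the future.
    synchronized void uploadFinished(long bytes) {
        inFlightBytes -= bytes;
        if (inFlightBytes <= maxInFlightBytes && !available.isDone()) {
            available.complete(null);
        }
    }
}
```

A writer would check `isAvailable()` before an append, or register a continuation on `getAvailableFuture()` as in the snippet above.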

Error Handling and Lifecycle

The storage handles various error conditions and lifecycle events:

FsStateChangelogStorage storage = null;
try {
    storage = new FsStateChangelogStorage(
        jobId, config, metricGroup, localRecoveryConfig
    );
    
    // Use storage...
    
} catch (IOException e) {
    // Handle initialization or operation errors
    log.error("Storage operation failed", e);
} finally {
    // Always close to release resources
    if (storage != null) {
        try {
            storage.close();
        } catch (Exception e) {
            log.warn("Failed to close changelog storage", e);
        }
    }
}

The storage ensures:

  • Proper cleanup of upload threads and resources
  • Graceful handling of filesystem failures
  • Coordination with Flink's checkpoint lifecycle
  • Thread-safe operations across multiple writers
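The last point, thread-safe writer management, follows the standard one-entry-per-key registration pattern. A minimal sketch with hypothetical names:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch: a concurrent map guarantees at most one writer slot
// per operator ID even when registration races across task threads;
// computeIfAbsent runs the factory exactly once per key.
class WriterRegistry {
    private final ConcurrentMap<String, Integer> writers = new ConcurrentHashMap<>();
    private final AtomicInteger ids = new AtomicInteger();

    int getOrCreateWriter(String operatorId) {
        return writers.computeIfAbsent(operatorId, id -> ids.getAndIncrement());
    }
}
```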

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-dstl-dfs
