CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-runtime

The runtime module of Apache Flink that provides the core execution engine, scheduling, fault tolerance, checkpointing, and resource management capabilities for distributed stream processing applications

Pending
Overview
Eval results
Files

docs/state-checkpointing.md

State Management and Checkpointing

Comprehensive state management with pluggable backends and distributed checkpointing for fault tolerance. Flink provides exactly-once state consistency through its distributed checkpointing mechanism and flexible state backends; end-to-end exactly-once results additionally require replayable sources and transactional or idempotent sinks.

Capabilities

CheckpointCoordinator

Central coordinator for checkpointing in Flink jobs, managing checkpoint triggering, completion, and recovery across all tasks.

/**
 * The checkpoint coordinator coordinates the distributed snapshots of operators and state.
 * It triggers the checkpoint by sending messages to the relevant tasks and collects the
 * checkpoint acknowledgements. It also maintains and cleans up the checkpoint meta data.
 *
 * <p>Only method signatures are listed here; implementations are omitted.
 */
public class CheckpointCoordinator {
    /**
     * Triggers a checkpoint for the job.
     *
     * @param isPeriodic flag indicating whether this triggered checkpoint is periodic
     *     ({@code false} for a one-off/manual trigger)
     * @return a future for the resulting {@link CompletedCheckpoint}
     */
    public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(boolean isPeriodic);
    
    /**
     * Triggers a checkpoint of the given type.
     *
     * @param checkpointType the type of checkpoint to trigger
     * @return a future for the resulting {@link CompletedCheckpoint}
     */
    public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType);
    
    /**
     * Triggers a savepoint.
     *
     * @param targetLocation where to write the savepoint; may be {@code null}
     *     (presumably falls back to a configured default location — TODO confirm)
     * @param formatType the format in which the savepoint is written
     * @return a future for the resulting {@link CompletedCheckpoint}
     */
    public CompletableFuture<CompletedCheckpoint> triggerSavepoint(
        @Nullable String targetLocation,
        SavepointFormatType formatType
    );
    
    
    /** Shuts down the checkpoint coordinator. */
    public void shutdown() throws Exception;
    
    /** Returns the store that holds completed checkpoints. */
    public CompletedCheckpointStore getCheckpointStore();
    
    /** Starts the scheduler that triggers periodic checkpoints. */
    public void startCheckpointScheduler();
    
    /** Returns whether the periodic checkpoint scheduler has been started. */
    public boolean isPeriodicCheckpointingStarted();
    
    /**
     * Restores the latest checkpointed state to the given job vertices.
     *
     * @param tasks the job vertices whose state should be restored
     * @param errorIfNoCheckpoint whether the absence of a checkpoint should be treated as an error
     * @return presumably {@code true} if state was restored — TODO confirm
     */
    public boolean restoreLatestCheckpointedStateToAll(
        Set<ExecutionJobVertex> tasks,
        boolean errorIfNoCheckpoint
    );
    
    /**
     * Processes a checkpoint acknowledgement sent by a task.
     *
     * @param message the acknowledgement
     * @param taskManagerLocationInfo description of the sending task manager's location
     *     (presumably for logging/diagnostics)
     * @throws CheckpointException if the acknowledgement cannot be processed
     */
    public void receiveAcknowledgeMessage(
        AcknowledgeCheckpoint message,
        String taskManagerLocationInfo
    ) throws CheckpointException;
    
    /**
     * Processes a task's decline of a checkpoint.
     *
     * @param message the decline message
     * @param taskManagerLocationInfo description of the sending task manager's location
     */
    public void receiveDeclineMessage(
        DeclineCheckpoint message,
        String taskManagerLocationInfo
    );
    
    /** Returns the number of successfully completed checkpoints currently retained. */
    public int getNumberOfRetainedSuccessfulCheckpoints();
    
    /** Returns the number of checkpoints that have been triggered but not yet completed. */
    public int getNumberOfPendingCheckpoints();
    
    /** Returns the checkpoint timeout (presumably in milliseconds — TODO confirm). */
    public long getCheckpointTimeout();
    
    /**
     * Returns whether periodic checkpointing is configured. Whether the scheduler is
     * actually running is reported separately by {@link #isPeriodicCheckpointingStarted()}.
     */
    public boolean isPeriodicCheckpointingConfigured();
}

Usage Examples:

// Create checkpoint coordinator configuration
CheckpointCoordinatorConfiguration checkpointConfig = 
    new CheckpointCoordinatorConfiguration.Builder()
        .setCheckpointInterval(5000L)  // 5 seconds
        .setCheckpointTimeout(60000L)  // 1 minute
        .setMaxConcurrentCheckpoints(1)
        .setMinPauseBetweenCheckpoints(1000L)
        // NOTE(review): this option is deprecated/removed in newer Flink releases — confirm
        // it exists in the targeted version before relying on it.
        .setPreferCheckpointForRecovery(true)
        .setTolerableCheckpointFailureNumber(3)
        .build();

// Enable checkpointing on execution graph
executionGraph.enableCheckpointing(
    checkpointConfig,
    masterTriggerRestoreHooks,
    checkpointIdCounter,
    completedCheckpointStore,
    stateBackend,
    checkpointStorage,
    statsTracker,
    checkpointsCleaner
);

// Trigger a one-off (non-periodic) checkpoint. CheckpointCoordinator exposes
// triggerCheckpoint(boolean isPeriodic) and triggerCheckpoint(CheckpointType);
// there is no overload taking properties, a target location, and a timestamp.
CheckpointCoordinator coordinator = executionGraph.getCheckpointCoordinator();
CompletableFuture<CompletedCheckpoint> checkpointFuture =
    coordinator.triggerCheckpoint(false); // false = not periodic

// Report both outcomes: a failed checkpoint completes the future exceptionally,
// which thenAccept alone would silently ignore.
checkpointFuture.whenComplete((checkpoint, failure) -> {
    if (failure != null) {
        System.err.println("Checkpoint failed: " + failure);
    } else {
        System.out.println("Checkpoint " + checkpoint.getCheckpointId() + " completed");
    }
});

KeyedStateBackend

Backend for managing keyed state (state associated with keys) with support for different storage engines.

/**
 * A keyed state backend provides methods for managing keyed state, i.e. state that is
 * scoped to the key most recently set via {@link #setCurrentKey(Object)}.
 *
 * @param <K> the type of the keys
 */
public interface KeyedStateBackend<K> extends KeyedState, Disposable {
    /**
     * Creates or retrieves the internal keyed state described by {@code stateDescriptor}.
     *
     * @param <N> the namespace type
     * @param <T> the value type of the state
     * @param stateDescriptor describes the state's name and value type
     * @param namespaceSerializer serializer for the namespace type {@code N}
     * @return the internal key/value state for keys {@code K}, namespace {@code N}, values {@code T}
     */
    <N, T> InternalKvState<K, N, T> createState(
        StateDescriptor<?, T> stateDescriptor,
        TypeSerializer<N> namespaceSerializer
    );
    
    /**
     * Returns the state described by {@code stateDescriptor} in the given namespace.
     *
     * @param <N> the namespace type
     * @param <S> the state type produced by the descriptor
     * @param <T> the value type of the state
     */
    <N, S extends State, T> S getPartitionedState(
        N namespace,
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, T> stateDescriptor
    );
    
    /**
     * Takes a snapshot of the keyed state.
     *
     * @return a {@link RunnableFuture} that, when run, produces the snapshot result
     */
    RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
        long checkpointId,
        long timestamp,
        CheckpointStreamFactory streamFactory,
        CheckpointOptions checkpointOptions
    );
    
    /**
     * Restores the backend from previously snapshotted state handles.
     *
     * <p>NOTE(review): {@code StateHandles} is not defined in this listing — confirm the
     * actual restore input type.
     */
    void restore(StateHandles<KeyedStateHandle> restoredState);
    
    /** Returns the key that state accesses are currently scoped to. */
    K getCurrentKey();
    
    /** Sets the key that subsequent state accesses are scoped to. */
    void setCurrentKey(K newKey);
    
    /** Returns the serializer for keys of type {@code K}. */
    TypeSerializer<K> getKeySerializer();
    
    /** Returns the range of key groups handled by this backend. */
    KeyGroupRange getKeyGroupRange();
    
    /** Returns the total number of key groups. */
    int getNumberOfKeyGroups();
    
    /** Closes the backend and releases its resources. */
    void close();
    
    /** Disposes the backend. */
    void dispose();
    
    /**
     * Applies {@code function} to the state of every key in the given namespace.
     *
     * @param <N> the namespace type
     * @param <S> the state type produced by {@code stateDescriptor}
     * @param <T> the value type of the state
     * @param namespace the namespace whose keys are visited
     * @param namespaceSerializer serializer for the namespace type
     * @param stateDescriptor identifies the state to apply the function to
     * @param function callback invoked per key with that key's state
     */
    <N, S extends State, T> void applyToAllKeys(
        N namespace,
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, T> stateDescriptor,
        KeyedStateFunction<K, S> function
    );
    
    /** Returns an approximation of the memory used by this backend (units not specified here). */
    long getApproximateMemoryUsage();
}

OperatorStateBackend

Backend for managing operator state (state not associated with keys) including list state and broadcast state.

/**
 * Interface for operator state backends. Operator state is state that is associated with
 * parallel instances of an operator (tasks), as opposed to keyed state.
 */
public interface OperatorStateBackend extends Disposable {
    /**
     * Returns a list-valued operator state registered under the descriptor's name.
     *
     * @param <S> element type of the list
     */
    <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor);
    
    /**
     * Returns a union list state, whose contents combine the state of all parallel instances.
     *
     * @param <S> element type of the list
     */
    <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor);
    
    /**
     * Returns a map-shaped broadcast state used to share the same data across parallel instances.
     *
     * @param <K> map key type
     * @param <V> map value type
     */
    <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor);
    
    /**
     * Takes a snapshot of the operator state.
     *
     * @return a {@link RunnableFuture} that, when run, produces the snapshot result
     */
    RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
        long checkpointId,
        long timestamp,
        CheckpointStreamFactory factory,
        CheckpointOptions checkpointOptions
    );
    
    /**
     * Restores the backend from previously snapshotted state handles.
     * NOTE(review): {@code StateHandles} is not defined in this listing — confirm the type.
     */
    void restore(StateHandles<OperatorStateHandle> stateHandles);
    
    /** Closes the backend and releases its resources. */
    void close();
    
    /** Disposes the backend. */
    void dispose();
}

Usage Examples:

// Using KeyedStateBackend in a task
public class MyKeyedTask extends AbstractInvokable {
    private KeyedStateBackend<String> keyedStateBackend;
    private ValueState<Integer> countState;

    /**
     * Registers an Integer counter named {@code "count"} and increments it for the key
     * {@code "user123"}, starting from 1 when no value is stored yet.
     *
     * <p>NOTE(review): confirm that the hosting class really exposes
     * {@code getRuntimeContext().getKeyedStateBackend()}; this example assumes it does.
     */
    @Override
    public void invoke() throws Exception {
        keyedStateBackend = getRuntimeContext().getKeyedStateBackend();

        ValueStateDescriptor<Integer> counterDescriptor =
            new ValueStateDescriptor<>("count", Integer.class);

        // No namespace is needed here, hence the void namespace and its serializer.
        countState = keyedStateBackend.getPartitionedState(
            VoidNamespace.INSTANCE,
            VoidNamespaceSerializer.INSTANCE,
            counterDescriptor
        );

        // State reads and writes below apply to this key only.
        keyedStateBackend.setCurrentKey("user123");

        Integer previous = countState.value();
        int next;
        if (previous == null) {
            next = 1;
        } else {
            next = previous + 1;
        }
        countState.update(next);
    }
}

// Using OperatorStateBackend
public class MyOperatorTask extends AbstractInvokable {
    private OperatorStateBackend operatorStateBackend;
    private ListState<String> bufferState;

    /**
     * Registers a list-valued operator state named {@code "buffer"}, appends one element
     * to it, and then reads the buffered contents back.
     */
    @Override
    public void invoke() throws Exception {
        operatorStateBackend = getRuntimeContext().getOperatorStateBackend();

        ListStateDescriptor<String> bufferDescriptor =
            new ListStateDescriptor<>("buffer", String.class);
        bufferState = operatorStateBackend.getListState(bufferDescriptor);

        bufferState.add("new_item");

        // Read access to the buffered elements.
        Iterable<String> contents = bufferState.get();
    }
}

CompletedCheckpoint

Represents a successfully completed checkpoint with metadata and state handles.

/**
 * A completed checkpoint represents a snapshot of the state of all operators
 * that has been acknowledged by all tasks.
 */
public class CompletedCheckpoint implements Serializable {
    /** Returns the checkpoint's ID. */
    public long getCheckpointId();
    
    /** Returns the checkpoint's timestamp (presumably epoch milliseconds — TODO confirm). */
    public long getTimestamp();
    
    /** Returns how long the checkpoint took, in milliseconds. */
    public long getDuration();
    
    /** Returns the total size of the checkpoint, in bytes. */
    public long getSize();
    
    /** Returns the external pointer (path) under which this checkpoint is stored. */
    public String getExternalPointer();
    
    /** Returns the snapshotted state of each operator, keyed by operator ID. */
    public Map<OperatorID, OperatorState> getOperatorStates();
    
    /** Returns the states captured by master hooks. */
    public Collection<MasterState> getMasterHookStates();
    
    /** Returns the properties this checkpoint was taken with. */
    public CheckpointProperties getProperties();
    
    /** Returns whether this checkpoint has been discarded. */
    public boolean isDiscarded();
    
    /**
     * Discards this checkpoint when it is subsumed (per the method name, presumably by a
     * newer completed checkpoint — TODO confirm) and cleans up its resources.
     */
    public CompletableFuture<Void> discardOnSubsume();
    
    /** Discards this checkpoint on cancellation. */
    public CompletableFuture<Void> discardOnCancellation();
    
    /**
     * Discards this checkpoint on shutdown.
     *
     * @param jobStatus the job's status at shutdown (presumably decides whether the
     *     checkpoint is retained per its retention policy — TODO confirm)
     */
    public CompletableFuture<Void> discardOnShutdown(JobStatus jobStatus);
    
    /** Returns state size statistics for this checkpoint. */
    public CheckpointStatsSummarySnapshot getStatsSummary();
}

StateBackend Configuration

Configuration and factory classes for different state backends.

/**
 * Base class for state backends that can derive a configured instance from a
 * {@link ReadableConfig}.
 */
public abstract class ConfigurableStateBackend implements StateBackend, Configurable {
    /**
     * Creates a state backend configured from {@code config}.
     *
     * @param config configuration to read settings from
     * @param classLoader class loader for loading user/backend classes
     * @return the configured state backend (whether this is a new instance or {@code this}
     *     is not specified here)
     */
    public abstract StateBackend configure(ReadableConfig config, ClassLoader classLoader);
    
    /** Returns the default savepoint directory. */
    public abstract String getDefaultSavepointDirectory();
    
    /** Returns whether this state backend supports asynchronous snapshots. */
    public abstract boolean supportsAsynchronousSnapshots();
}

/**
 * Utility for instantiating a {@link StateBackend} from an application setting or configuration.
 */
public class StateBackendLoader {
    /**
     * Loads the state backend selected in the configuration.
     *
     * @param config configuration to read the state backend settings from
     * @param classLoader class loader used to load the backend class
     * @param defaultStateBackend NOTE(review): in some Flink releases the third parameter
     *     is a nullable {@code Logger}, not a default backend name — confirm against the
     *     targeted version
     * @return the loaded state backend
     */
    public static StateBackend loadStateBackendFromConfig(
        ReadableConfig config,
        ClassLoader classLoader,
        String defaultStateBackend
    );
    
    /**
     * Judging by its name, resolves the state backend in this order: the backend supplied by
     * the application, otherwise one loaded from {@code config}, otherwise a default
     * (TODO confirm the exact precedence and default).
     *
     * @param fromApplication backend set by the application, if any
     * @param config configuration to fall back to
     * @param classLoader class loader used to load backend classes
     * @param defaultStateBackend see the note on {@code loadStateBackendFromConfig}
     * @return the resolved state backend
     */
    public static StateBackend fromApplicationOrConfigOrDefault(
        StateBackend fromApplication,
        Configuration config,
        ClassLoader classLoader,
        String defaultStateBackend
    );
}

Checkpoint Storage

Configuration for where checkpoints are stored (filesystem, S3, etc.).

/**
 * Checkpoint storage defines how checkpoint data and metadata are persisted
 */
public interface CheckpointStorage {
    /** Returns whether this storage can be used as highly available storage. */
    boolean supportsHighlyAvailableStorage();
    
    /** Returns whether a default savepoint location is configured. */
    boolean hasDefaultSavepointLocation();
    
    /** Returns the default savepoint directory. */
    String getDefaultSavepointDirectory();
    
    /**
     * Returns the checkpoint storage access for the given job.
     *
     * @param jobId the job whose checkpoints are accessed
     * @param coordinatorView coordinator-side view of the storage
     * NOTE(review): signature as listed here — confirm against the targeted Flink release.
     */
    CheckpointStorageAccess resolveCheckpointStorageAccess(
        JobID jobId,
        CheckpointStorageAccessCoordinatorView coordinatorView
    );
    
    /**
     * Creates the worker-side (task manager) view of the checkpoint storage.
     *
     * @param configuration configuration to read storage settings from
     * @param resourceId the resource (worker) the view is created for
     */
    CheckpointStorageWorkerView createCheckpointStorageWorkerView(
        Configuration configuration,
        ResourceID resourceId
    );
}

/**
 * Checkpoint storage that can derive a configured instance from a {@link ReadableConfig}.
 */
public abstract class ConfigurableCheckpointStorage 
    implements CheckpointStorage, Configurable {
    
    /**
     * Creates checkpoint storage configured from {@code config}.
     *
     * @param config configuration to read settings from
     * @param classLoader class loader for loading user/storage classes
     * @return the configured checkpoint storage
     */
    public abstract CheckpointStorage configure(
        ReadableConfig config, 
        ClassLoader classLoader
    );
}

Usage Examples:

// Configure a state backend. The three variants below are alternatives: each one writes
// the same STATE_BACKEND key, so in this snippet only the last one (rocksdb) takes effect.
Configuration config = new Configuration();

// HashMap state backend (working state kept on the JVM heap)
config.setString(StateBackendOptions.STATE_BACKEND, "hashmap");

// Filesystem state backend
// NOTE(review): "filesystem" is a legacy shortcut in newer Flink releases, where "hashmap"
// plus a checkpoint directory is recommended — confirm for the targeted version.
config.setString(StateBackendOptions.STATE_BACKEND, "filesystem");
// The checkpoint directory option lives in CheckpointingOptions ("state.checkpoints.dir"),
// not in StateBackendOptions.
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "hdfs://cluster/checkpoints");

// RocksDB state backend
config.setString(StateBackendOptions.STATE_BACKEND, "rocksdb");
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "s3://bucket/checkpoints");
config.setBoolean(RocksDBOptions.USE_MANAGED_MEMORY, true);

// Load state backend
StateBackend stateBackend = StateBackendLoader.loadStateBackendFromConfig(
    config,
    classLoader,
    null
);

Types

// Checkpoint types and properties

/** The kind of snapshot being taken: a regular checkpoint or a savepoint. */
public enum CheckpointType implements SnapshotType {
    CHECKPOINT("Checkpoint"),
    SAVEPOINT("Savepoint");
    
    /** Returns the display name (presumably the string passed to the constant, e.g. "Checkpoint"). */
    public String getName();
    /** Returns whether this type is a savepoint. */
    public boolean isSavepoint();
}

/** Properties of a checkpoint: its type, retention policy, and whether it is forced. */
public class CheckpointProperties implements Serializable {
    /**
     * Creates properties for a regular checkpoint.
     *
     * @param retentionPolicy when the checkpoint is retained after the job terminates
     */
    public static CheckpointProperties forCheckpoint(CheckpointRetentionPolicy retentionPolicy);
    /**
     * Creates properties for a savepoint.
     *
     * @param forced whether the savepoint is forced
     */
    public static CheckpointProperties forSavepoint(boolean forced);
    
    /** Returns the checkpoint type. */
    public CheckpointType getCheckpointType();
    /** Returns the retention policy. */
    public CheckpointRetentionPolicy getRetentionPolicy();
    /** Returns whether the checkpoint is forced. */
    public boolean isForced();
}

/** Controls whether a completed checkpoint is retained after the job terminates. */
public enum CheckpointRetentionPolicy {
    NEVER_RETAIN_AFTER_TERMINATION,
    RETAIN_ON_CANCELLATION,
    RETAIN_ON_FAILURE
}

// State handles
// NOTE(review): SnapshotResult below is bounded by StateObject, which this listing never
// declares; confirm how StateHandle relates to StateObject in the actual API.

/** Handle to a piece of persisted state. */
public interface StateHandle extends Serializable {
    /** Discards the state referenced by this handle. */
    void discardState();
    /** Returns the size of the referenced state (presumably in bytes — TODO confirm). */
    long getStateSize();
}

/** Handle to keyed state covering a range of key groups. */
public interface KeyedStateHandle extends StateHandle {
    /** Returns the key groups covered by this handle. */
    KeyGroupRange getKeyGroupRange();
    /** Returns a handle restricted to the given key groups (its intersection with this one). */
    KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange);
}

/** Handle to operator state stored in a single stream. */
public interface OperatorStateHandle extends StateHandle {
    /** Returns, per state name, the meta info describing its partition offsets. */
    Map<String, OperatorStateHandle.StateMetaInfo> getStateNameToPartitionOffsets();
    /** Opens a stream for reading the stored state. */
    FSDataInputStream openInputStream();
}

// Checkpoint options and metadata

/** Options passed to tasks for a single checkpoint: its type and storage target. */
public class CheckpointOptions implements Serializable {
    /**
     * @param checkpointType the type of checkpoint being taken
     * @param targetLocation reference to where the checkpoint should be written
     */
    public CheckpointOptions(
        CheckpointType checkpointType,
        CheckpointStorageLocationReference targetLocation
    );
    
    /** Returns the checkpoint type. */
    public CheckpointType getCheckpointType();
    /** Returns the reference to the target storage location. */
    public CheckpointStorageLocationReference getTargetLocation();
    /** Returns whether the checkpoint uses exactly-once mode. */
    public boolean isExactlyOnceMode();
}

/** Identifying metadata of a checkpoint: its ID and timestamp. */
public class CheckpointMetaData implements Serializable {
    public CheckpointMetaData(long checkpointId, long timestamp);
    
    /** Returns the checkpoint ID. */
    public long getCheckpointId();
    /** Returns the checkpoint timestamp. */
    public long getTimestamp();
}

// Snapshot results

/**
 * Result of a snapshot: the state reported to the job manager plus an optional
 * task-local copy.
 *
 * @param <T> the type of the snapshotted state object
 */
public class SnapshotResult<T extends StateObject> implements Serializable {
    /** Returns a result that contains no state. */
    public static <T extends StateObject> SnapshotResult<T> empty();
    /** Returns a result with only job-manager-owned state. */
    public static <T extends StateObject> SnapshotResult<T> of(T jobManagerState);
    /** Returns a result with job-manager-owned state and a task-local copy. */
    public static <T extends StateObject> SnapshotResult<T> withLocalState(
        T jobManagerState, 
        T taskLocalState
    );
    
    /** Returns the state owned by the job manager. */
    public T getJobManagerOwnedSnapshot();
    /** Returns the task-local state (presumably {@code null} when created via {@code of} — TODO confirm). */
    public T getTaskLocalSnapshot();
    /** Returns the size of the snapshotted state. */
    public long getStateSize();
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-runtime

docs

execution-scheduling.md

high-availability.md

index.md

job-graph.md

resource-management.md

state-checkpointing.md

task-execution.md

tile.json