The runtime module of Apache Flink that provides the core execution engine, scheduling, fault tolerance, checkpointing, and resource management capabilities for distributed stream processing applications
—
State management with pluggable state backends and distributed checkpointing for fault tolerance. Through its checkpointing mechanism and configurable state backends, Flink provides exactly-once consistency guarantees for managed state.
Central coordinator for checkpointing in Flink jobs, managing checkpoint triggering, completion, and recovery across all tasks.
/**
* The checkpoint coordinator coordinates the distributed snapshots of operators and state.
* It triggers the checkpoint by sending messages to the relevant tasks and collects the
* checkpoint acknowledgements. It also maintains and cleans up the checkpoint meta data.
*
* <p>This listing shows method signatures only; bodies are omitted.
*/
public class CheckpointCoordinator {
/**
* Triggers a checkpoint for the job.
*
* @param isPeriodic whether the trigger comes from the periodic checkpoint scheduler;
*     pass {@code false} for a manually requested checkpoint
* @return a future for the resulting {@link CompletedCheckpoint}
*/
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(boolean isPeriodic);
/**
* Triggers a checkpoint of the given type.
*
* @param checkpointType the kind of snapshot to take
* @return a future for the resulting {@link CompletedCheckpoint}
*/
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType);
/**
* Triggers a savepoint.
*
* @param targetLocation where the savepoint should be written; may be {@code null}
*     (NOTE(review): presumably falls back to a configured default savepoint directory — confirm)
* @param formatType the format in which the savepoint is written
* @return a future for the resulting {@link CompletedCheckpoint}
*/
public CompletableFuture<CompletedCheckpoint> triggerSavepoint(
@Nullable String targetLocation,
SavepointFormatType formatType
);
/**
* Shuts down the checkpoint coordinator.
*
* @throws Exception declared by this method; the failure conditions are not shown here
*/
public void shutdown() throws Exception;
/** Returns the store holding completed checkpoints. */
public CompletedCheckpointStore getCheckpointStore();
/** Starts the scheduler that triggers periodic checkpoints. */
public void startCheckpointScheduler();
/** Returns whether periodic checkpointing has been started (see {@link #startCheckpointScheduler()}). */
public boolean isPeriodicCheckpointingStarted();
/**
* Restores the latest checkpointed state to the given tasks.
*
* @param tasks the job vertices whose state should be restored
* @param errorIfNoCheckpoint whether the absence of any checkpoint should be treated as an error
*     (NOTE(review): exact failure mode not shown here — confirm)
* @return NOTE(review): presumably whether state was restored — confirm
*/
public boolean restoreLatestCheckpointedStateToAll(
Set<ExecutionJobVertex> tasks,
boolean errorIfNoCheckpoint
);
/**
* Processes a task's acknowledgement of a checkpoint.
*
* @param message the acknowledgement sent by the task
* @param taskManagerLocationInfo textual description of the sending TaskManager's location
*     (presumably for diagnostics — confirm)
* @throws CheckpointException declared by this method; the failure conditions are not shown here
*/
public void receiveAcknowledgeMessage(
AcknowledgeCheckpoint message,
String taskManagerLocationInfo
) throws CheckpointException;
/**
* Processes a task's decline of a checkpoint.
*
* @param message the decline sent by the task
* @param taskManagerLocationInfo textual description of the sending TaskManager's location
*/
public void receiveDeclineMessage(
DeclineCheckpoint message,
String taskManagerLocationInfo
);
/** Returns the number of successfully completed checkpoints currently retained. */
public int getNumberOfRetainedSuccessfulCheckpoints();
/** Returns the number of checkpoints that are triggered but not yet completed or declined. */
public int getNumberOfPendingCheckpoints();
/** Returns the checkpoint timeout in milliseconds (the configuration example sets 60000L as "1 minute"). */
public long getCheckpointTimeout();
/** Returns whether periodic checkpointing is configured (as opposed to started, see {@link #isPeriodicCheckpointingStarted()}). */
public boolean isPeriodicCheckpointingConfigured();
}Usage Examples:
// Create checkpoint coordinator configuration
CheckpointCoordinatorConfiguration checkpointConfig =
        new CheckpointCoordinatorConfiguration.Builder()
                .setCheckpointInterval(5000L) // 5 seconds
                .setCheckpointTimeout(60000L) // 1 minute
                .setMaxConcurrentCheckpoints(1)
                .setMinPauseBetweenCheckpoints(1000L)
                .setPreferCheckpointForRecovery(true)
                .setTolerableCheckpointFailureNumber(3)
                .build();
// Enable checkpointing on execution graph
executionGraph.enableCheckpointing(
        checkpointConfig,
        masterTriggerRestoreHooks,
        checkpointIdCounter,
        completedCheckpointStore,
        stateBackend,
        checkpointStorage,
        statsTracker,
        checkpointsCleaner);
// Trigger a manual checkpoint through the public triggerCheckpoint(boolean isPeriodic) overload;
// false marks it as not coming from the periodic scheduler.
CheckpointCoordinator coordinator = executionGraph.getCheckpointCoordinator();
CompletableFuture<CompletedCheckpoint> checkpointFuture = coordinator.triggerCheckpoint(false);
// whenComplete (unlike thenAccept) also observes failures, which would otherwise be dropped silently.
checkpointFuture.whenComplete((checkpoint, failure) -> {
    if (failure != null) {
        System.err.println("Checkpoint failed: " + failure);
    } else {
        System.out.println("Checkpoint " + checkpoint.getCheckpointId() + " completed");
    }
});
Backend for managing keyed state (state associated with keys) with support for different storage engines.
/**
 * A keyed state backend provides methods for managing keyed state.
 *
 * <p>Keyed state is accessed in the context of a current key, which is set via
 * {@link #setCurrentKey(Object)}.
 *
 * @param <K> the type of the keys
 */
public interface KeyedStateBackend<K> extends KeyedState, Disposable {

    /**
     * Creates or retrieves a keyed state.
     *
     * <p>The namespace type is independent of the state value type, matching
     * {@link #getPartitionedState}.
     *
     * @param stateDescriptor descriptor of the state to create or retrieve
     * @param namespaceSerializer serializer for the state's namespace
     * @param <N> the namespace type
     * @param <T> the state value type, as declared by the descriptor
     * @return the internal key/value state for this backend's key type
     */
    <N, T> InternalKvState<K, N, T> createState(
            StateDescriptor<?, T> stateDescriptor,
            TypeSerializer<N> namespaceSerializer);

    /**
     * Returns the partitioned state for the given namespace.
     *
     * @param namespace the namespace the state is scoped to
     * @param namespaceSerializer serializer for {@code namespace}
     * @param stateDescriptor descriptor of the requested state
     * @param <N> the namespace type
     * @param <S> the state interface type (e.g. a value or list state)
     * @param <T> the state value type
     * @return the state object, typed by the descriptor
     */
    <N, S extends State, T> S getPartitionedState(
            N namespace,
            TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, T> stateDescriptor);

    /**
     * Takes a snapshot of the keyed state.
     *
     * @param checkpointId id of the checkpoint the snapshot belongs to
     * @param timestamp timestamp of the checkpoint
     * @param streamFactory factory for the streams the snapshot is written to
     * @param checkpointOptions options of the checkpoint
     * @return a runnable future that yields the snapshot result when run
     */
    RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
            long checkpointId,
            long timestamp,
            CheckpointStreamFactory streamFactory,
            CheckpointOptions checkpointOptions);

    /** Restores this backend from previously snapshotted keyed state handles. */
    void restore(StateHandles<KeyedStateHandle> restoredState);

    /** Returns the key that keyed-state access is currently scoped to. */
    K getCurrentKey();

    /** Sets the key that subsequent keyed-state access is scoped to. */
    void setCurrentKey(K newKey);

    /** Returns the serializer for keys of type {@code K}. */
    TypeSerializer<K> getKeySerializer();

    /** Returns the range of key groups handled by this backend. */
    KeyGroupRange getKeyGroupRange();

    /** Returns the total number of key groups. */
    int getNumberOfKeyGroups();

    /** Closes the backend and releases resources. */
    void close();

    /** Disposes the backend. */
    void dispose();

    /**
     * Applies {@code function} to the state described by {@code stateDescriptor} for all keys
     * of this backend, within the given namespace.
     *
     * @param namespace the namespace whose state is visited
     * @param namespaceSerializer serializer for {@code namespace}
     * @param stateDescriptor descriptor of the state to visit
     * @param function callback invoked per key
     * @param <N> the namespace type (previously used without being declared)
     */
    <N> void applyToAllKeys(
            N namespace,
            TypeSerializer<N> namespaceSerializer,
            StateDescriptor<?, ?> stateDescriptor,
            KeyedStateFunction<K, ?> function);

    /**
     * Returns the approximate memory usage of this backend.
     *
     * <p>NOTE(review): units are not stated; presumably bytes — confirm.
     */
    long getApproximateMemoryUsage();
}
Backend for managing operator state (state not associated with keys) including list state and broadcast state.
/**
* Interface for operator state backends. Operator state is state that is associated with
* parallel instances of an operator (tasks), as opposed to keyed state.
*
* <p>This listing shows method signatures only.
*/
public interface OperatorStateBackend extends Disposable {
/**
* Returns the operator list state for the given descriptor.
*
* @param stateDescriptor descriptor naming the list state and its element type
* @param <S> the element type
*/
<S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor);
/**
* Returns union list state for the given descriptor: list state combined from all parallel
* instances (NOTE(review): presumably on restore — confirm).
*
* @param stateDescriptor descriptor naming the list state and its element type
* @param <S> the element type
*/
<S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor);
/**
* Returns broadcast state, i.e. map-shaped state for coordinating across parallel instances.
*
* @param stateDescriptor map state descriptor naming the state and its key/value types
* @param <K> the map key type
* @param <V> the map value type
*/
<K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor);
/**
* Takes a snapshot of the operator state.
*
* @param checkpointId id of the checkpoint the snapshot belongs to
* @param timestamp timestamp of the checkpoint
* @param factory factory for the streams the snapshot is written to
* @param checkpointOptions options of the checkpoint
* @return a runnable future that yields the snapshot result when run
*/
RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
long checkpointId,
long timestamp,
CheckpointStreamFactory factory,
CheckpointOptions checkpointOptions
);
/** Restores this backend from previously snapshotted operator state handles. */
void restore(StateHandles<OperatorStateHandle> stateHandles);
/** Closes the backend. */
void close();
/** Disposes the backend. */
void dispose();
}Usage Examples:
// Using KeyedStateBackend in a task
public class MyKeyedTask extends AbstractInvokable {
    private KeyedStateBackend<String> keyedStateBackend;
    private ValueState<Integer> countState;

    /** Increments a counter stored in keyed value state for the key "user123". */
    @Override
    public void invoke() throws Exception {
        // Obtain the keyed state backend from the runtime context
        keyedStateBackend = getRuntimeContext().getKeyedStateBackend();

        // Describe a value state named "count" holding Integers
        ValueStateDescriptor<Integer> countDescriptor =
                new ValueStateDescriptor<>("count", Integer.class);

        // Look the state up in the void (default) namespace
        countState = keyedStateBackend.getPartitionedState(
                VoidNamespace.INSTANCE,
                VoidNamespaceSerializer.INSTANCE,
                countDescriptor);

        // Scope access to one key, then read-modify-write: a missing value starts the count at 1
        keyedStateBackend.setCurrentKey("user123");
        Integer previous = countState.value();
        int next;
        if (previous != null) {
            next = previous + 1;
        } else {
            next = 1;
        }
        countState.update(next);
    }
}
// Using OperatorStateBackend
public class MyOperatorTask extends AbstractInvokable {
    private OperatorStateBackend operatorStateBackend;
    private ListState<String> bufferState;

    /** Appends an element to operator list state and reads back everything buffered so far. */
    @Override
    public void invoke() throws Exception {
        operatorStateBackend = getRuntimeContext().getOperatorStateBackend();
        // Create list state for buffering
        ListStateDescriptor<String> descriptor =
                new ListStateDescriptor<>("buffer", String.class);
        bufferState = operatorStateBackend.getListState(descriptor);
        // Use state: append, then actually consume the read-back (previously an unused local)
        bufferState.add("new_item");
        for (String bufferedItem : bufferState.get()) {
            System.out.println("Buffered: " + bufferedItem);
        }
    }
}
Represents a successfully completed checkpoint with metadata and state handles.
/**
* A completed checkpoint represents a snapshot of the state of all operators
* that has been acknowledged by all tasks.
*
* <p>This listing shows method signatures only.
*/
public class CompletedCheckpoint implements Serializable {
/** Returns the checkpoint ID. */
public long getCheckpointId();
/** Returns the checkpoint timestamp. */
public long getTimestamp();
/** Returns the checkpoint duration in milliseconds. */
public long getDuration();
/** Returns the total checkpoint size in bytes. */
public long getSize();
/** Returns the external pointer (path) of this checkpoint. */
public String getExternalPointer();
/** Returns the state of each operator, keyed by operator ID. */
public Map<OperatorID, OperatorState> getOperatorStates();
/** Returns the states captured by master hooks. */
public Collection<MasterState> getMasterHookStates();
/** Returns the properties this checkpoint was taken with. */
public CheckpointProperties getProperties();
/** Returns whether this checkpoint has been discarded. */
public boolean isDiscarded();
/**
* Discards this checkpoint once it is subsumed (per the method name, by a newer checkpoint)
* and cleans up its resources.
*
* @return a future that completes when discarding is done
*/
public CompletableFuture<Void> discardOnSubsume();
/**
* Discards this checkpoint on cancellation.
*
* @return a future that completes when discarding is done
*/
public CompletableFuture<Void> discardOnCancellation();
/**
* Discards this checkpoint on shutdown.
*
* @param jobStatus the status the job terminated with
*     (NOTE(review): presumably checked against the retention policy — confirm)
* @return a future that completes when discarding is done
*/
public CompletableFuture<Void> discardOnShutdown(JobStatus jobStatus);
/** Returns summary statistics about this checkpoint's state size. */
public CheckpointStatsSummarySnapshot getStatsSummary();
}Configuration and factory classes for different state backends.
/**
* Base class for state backends that can be configured from a configuration.
*/
public abstract class ConfigurableStateBackend implements StateBackend, Configurable {
/**
* Configures the state backend from the given configuration.
*
* @param config configuration to read settings from
* @param classLoader class loader to use (presumably for user classes — confirm)
* @return the configured backend (NOTE(review): whether this is a new instance or {@code this}
*     is not shown here — confirm)
*/
public abstract StateBackend configure(ReadableConfig config, ClassLoader classLoader);
/** Returns the default savepoint directory. */
public abstract String getDefaultSavepointDirectory();
/** Returns whether this state backend supports asynchronous snapshots. */
public abstract boolean supportsAsynchronousSnapshots();
}
/**
* Utility for loading state backends from application settings and configuration.
*/
public class StateBackendLoader {
/**
* Loads a state backend from the configuration.
*
* @param config configuration to read the state backend settings from
* @param classLoader class loader to use (presumably for user-provided backend classes — confirm)
* @param defaultStateBackend fallback backend; may be passed as {@code null}
* @return the loaded state backend
*/
public static StateBackend loadStateBackendFromConfig(
ReadableConfig config,
ClassLoader classLoader,
String defaultStateBackend
);
/**
* Chooses the state backend from, in order of precedence (per the method name — confirm):
* the one set by the application, the one from the configuration, or the default.
*
* @param fromApplication backend set by the application, if any
* @param config configuration to consult when no application backend is set
* @param classLoader class loader to use when loading from configuration
* @param defaultStateBackend fallback backend
* @return the selected state backend
*/
public static StateBackend fromApplicationOrConfigOrDefault(
StateBackend fromApplication,
Configuration config,
ClassLoader classLoader,
String defaultStateBackend
);
}Configuration for where checkpoints are stored (filesystem, S3, etc.).
/**
* Checkpoint storage defines how checkpoint data and metadata are persisted.
*/
public interface CheckpointStorage {
/** Returns whether this storage supports highly available storage. */
boolean supportsHighlyAvailableStorage();
/** Returns whether a default savepoint location is configured. */
boolean hasDefaultSavepointLocation();
/** Returns the default savepoint directory. */
String getDefaultSavepointDirectory();
/**
* Resolves the coordinator-side access to checkpoint storage for the given job.
*
* @param jobId the job whose checkpoints are stored
* @param coordinatorView coordinator-side view of the checkpoint storage
* @return access to checkpoint storage for that job
*/
CheckpointStorageAccess resolveCheckpointStorageAccess(
JobID jobId,
CheckpointStorageAccessCoordinatorView coordinatorView
);
/**
* Creates the worker-side view of the checkpoint storage.
*
* @param configuration configuration to read storage settings from
* @param resourceId ID of the worker resource the view is created for
* @return the worker-side storage view
*/
CheckpointStorageWorkerView createCheckpointStorageWorkerView(
Configuration configuration,
ResourceID resourceId
);
}
/**
* Base class for checkpoint storage that can be configured from a configuration.
*/
public abstract class ConfigurableCheckpointStorage
implements CheckpointStorage, Configurable {
/**
* Configures the checkpoint storage from the given configuration.
*
* @param config configuration to read settings from
* @param classLoader class loader to use (presumably for user classes — confirm)
* @return the configured checkpoint storage
*/
public abstract CheckpointStorage configure(
ReadableConfig config,
ClassLoader classLoader
);
}Usage Examples:
// Configure different state backends. Each Configuration below is an independent
// alternative: setting STATE_BACKEND repeatedly on a single Configuration would
// simply overwrite the previous value.

// "hashmap" state backend (working state kept in memory)
Configuration hashMapConfig = new Configuration();
hashMapConfig.setString(StateBackendOptions.STATE_BACKEND, "hashmap");

// Filesystem state backend
Configuration filesystemConfig = new Configuration();
filesystemConfig.setString(StateBackendOptions.STATE_BACKEND, "filesystem");
// The checkpoint directory option lives in CheckpointingOptions, not StateBackendOptions.
filesystemConfig.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "hdfs://cluster/checkpoints");

// RocksDB state backend (the configuration loaded below)
Configuration config = new Configuration();
config.setString(StateBackendOptions.STATE_BACKEND, "rocksdb");
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "s3://bucket/checkpoints");
config.setBoolean(RocksDBOptions.USE_MANAGED_MEMORY, true);

// Load state backend from the chosen configuration
StateBackend stateBackend = StateBackendLoader.loadStateBackendFromConfig(
        config,
        classLoader,
        null);
// Checkpoint types and properties
/** Kind of snapshot: a regular checkpoint or a savepoint. */
public enum CheckpointType implements SnapshotType {
CHECKPOINT("Checkpoint"),
SAVEPOINT("Savepoint");
/** Returns the display name given to the constant (e.g. {@code "Checkpoint"}). */
public String getName();
/** Returns whether this type is {@link #SAVEPOINT}-like (NOTE(review): presumably true only for SAVEPOINT — confirm). */
public boolean isSavepoint();
}
/** Properties a checkpoint or savepoint is taken with: its type, retention policy, and whether it is forced. */
public class CheckpointProperties implements Serializable {
/** Creates properties for a regular checkpoint with the given retention policy. */
public static CheckpointProperties forCheckpoint(CheckpointRetentionPolicy retentionPolicy);
/** Creates properties for a savepoint; {@code forced} is exposed via {@link #isForced()}. */
public static CheckpointProperties forSavepoint(boolean forced);
/** Returns the snapshot type. */
public CheckpointType getCheckpointType();
/** Returns the retention policy. */
public CheckpointRetentionPolicy getRetentionPolicy();
/** Returns whether the snapshot is forced (NOTE(review): exact effect not shown here — confirm). */
public boolean isForced();
}
/**
* Whether a checkpoint is retained after the job terminates. Constant order is part of the
* enum's ordinal values and is kept as-is.
*/
public enum CheckpointRetentionPolicy {
// Checkpoint is cleaned up whenever the job terminates.
NEVER_RETAIN_AFTER_TERMINATION,
// Checkpoint is kept when the job is cancelled.
RETAIN_ON_CANCELLATION,
// Checkpoint is kept when the job fails.
RETAIN_ON_FAILURE
}
// State handles
/** Handle to a piece of snapshotted state. */
public interface StateHandle extends Serializable {
/** Discards the state this handle refers to. */
void discardState();
/** Returns the size of the referenced state (NOTE(review): presumably bytes — confirm). */
long getStateSize();
}
/** Handle to keyed state covering a range of key groups. */
public interface KeyedStateHandle extends StateHandle {
/** Returns the key-group range covered by this handle. */
KeyGroupRange getKeyGroupRange();
/**
* Returns a handle restricted to the intersection of this handle's range with
* {@code keyGroupRange} (NOTE(review): behavior for an empty intersection not shown — confirm).
*/
KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange);
}
/** Handle to operator state stored in a single stream, partitioned by state name. */
public interface OperatorStateHandle extends StateHandle {
/** Returns, per state name, the metadata describing that state's partition offsets. */
Map<String, OperatorStateHandle.StateMetaInfo> getStateNameToPartitionOffsets();
/** Opens a stream for reading the referenced state. */
FSDataInputStream openInputStream();
}
// Checkpoint options and metadata
/** Options passed along with a checkpoint: its type and target storage location. */
public class CheckpointOptions implements Serializable {
/**
* @param checkpointType the kind of snapshot
* @param targetLocation reference to where the snapshot should be stored
*/
public CheckpointOptions(
CheckpointType checkpointType,
CheckpointStorageLocationReference targetLocation
);
/** Returns the snapshot type. */
public CheckpointType getCheckpointType();
/** Returns the target storage location reference. */
public CheckpointStorageLocationReference getTargetLocation();
/** Returns whether the checkpoint runs in exactly-once mode. */
public boolean isExactlyOnceMode();
}
/** Identifying metadata of a checkpoint: its ID and timestamp. */
public class CheckpointMetaData implements Serializable {
/**
* @param checkpointId the checkpoint ID
* @param timestamp the checkpoint timestamp
*/
public CheckpointMetaData(long checkpointId, long timestamp);
/** Returns the checkpoint ID. */
public long getCheckpointId();
/** Returns the checkpoint timestamp. */
public long getTimestamp();
}
// Snapshot results
/**
* Result of a snapshot: the state reported to the JobManager and, optionally, a task-local copy.
*
* @param <T> the type of state object
*/
public class SnapshotResult<T extends StateObject> implements Serializable {
/** Returns an empty result (NOTE(review): presumably with no state on either side — confirm). */
public static <T extends StateObject> SnapshotResult<T> empty();
/** Returns a result with only JobManager-owned state. */
public static <T extends StateObject> SnapshotResult<T> of(T jobManagerState);
/** Returns a result with both JobManager-owned state and a task-local copy. */
public static <T extends StateObject> SnapshotResult<T> withLocalState(
T jobManagerState,
T taskLocalState
);
/** Returns the state owned by the JobManager. */
public T getJobManagerOwnedSnapshot();
/** Returns the task-local state (NOTE(review): presumably null when none was provided — confirm). */
public T getTaskLocalSnapshot();
/** Returns the state size (NOTE(review): which part(s) it counts is not shown — confirm). */
public long getStateSize();
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-runtime