Apache Flink runtime engine providing core distributed streaming dataflow execution, task scheduling, state management, and fault tolerance capabilities.
—
The state management system provides pluggable state backends and checkpointing mechanisms for fault tolerance and exactly-once processing guarantees. This system enables stateful stream processing applications to maintain state consistently across failures and restarts.
`StateBackend` is the primary interface for state storage and checkpointing backends. State backends determine where and how state is stored during execution and checkpointing.
/**
 * Primary interface for state storage and checkpointing backends. It extends
 * {@link Serializable} and exposes factory methods for the keyed and operator
 * state backends, plus a lookup for an existing checkpoint.
 */
public interface StateBackend extends Serializable {
    /**
     * Creates the keyed state backend for an operator.
     *
     * @param <K> type of the keys handled by {@code keySerializer}
     * @param env environment handed to the backend (presumably the task environment; confirm)
     * @param jobID ID of the job
     * @param operatorIdentifier string identifying the operator
     * @param keySerializer serializer for keys of type {@code K}
     * @param numberOfKeyGroups number of key groups (presumably the job-wide total; confirm)
     * @param keyGroupRange key-group range passed to the backend
     * @param kvStateRegistry registry for key/value state
     * @return the created keyed state backend
     * @throws Exception declared by the signature
     */
    <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
    Environment env,
    JobID jobID,
    String operatorIdentifier,
    TypeSerializer<K> keySerializer,
    int numberOfKeyGroups,
    KeyGroupRange keyGroupRange,
    TaskKvStateRegistry kvStateRegistry
    ) throws Exception;
    /**
     * Creates the operator state backend for an operator.
     *
     * @param env environment handed to the backend
     * @param operatorIdentifier string identifying the operator
     * @return the created operator state backend
     * @throws Exception declared by the signature
     */
    OperatorStateBackend createOperatorStateBackend(
    Environment env,
    String operatorIdentifier
    ) throws Exception;
    /**
     * Resolves a checkpoint pointer to a checkpoint storage location, delivered
     * through a {@link CompletableFuture}.
     *
     * <p>NOTE(review): upstream Flink appears to declare this method with a
     * synchronous return type; confirm the signature against the targeted version.
     *
     * @param checkpointPointer pointer string identifying the checkpoint
     * @return future carrying the resolved storage location
     * @throws IOException declared by the signature
     */
    CompletableFuture<CheckpointStorageLocation> resolveCheckpoint(String checkpointPointer) throws IOException;
}Factory interface for creating state backend instances from configuration.
/**
 * Factory for creating state backend instances from configuration.
 *
 * @param <T> concrete {@link StateBackend} type produced by the factory
 */
public interface StateBackendFactory<T extends StateBackend> {
    /**
     * Builds a state backend from the given configuration.
     *
     * @param config configuration to build the backend from
     * @return the created backend
     * @throws IllegalConfigurationException declared by the signature (presumably for invalid settings; confirm)
     */
    T createFromConfig(Configuration config) throws IllegalConfigurationException;
    /** Returns this factory's identifier string (presumably the name used to select it; confirm). */
    String getIdentifier();
}Context provided to user functions during initialization to set up managed and keyed state.
/**
 * Context provided to user functions during initialization to set up managed
 * and keyed state.
 */
public interface FunctionInitializationContext {
    /** Returns whether state was restored (the usage example in this document checks it to detect recovery from a checkpoint). */
    boolean isRestored();
    /** Returns the store used to register operator state. */
    OperatorStateStore getOperatorStateStore();
    /** Returns the store used to register keyed state. */
    KeyedStateStore getKeyedStateStore();
    /**
     * Returns the managed initialization context.
     * NOTE(review): confirm this accessor exists in the targeted Flink version.
     */
    ManagedInitializationContext getManagedInitializationContext();
}Context provided to user functions during state snapshotting operations.
/** Context provided to user functions during state snapshotting operations. */
public interface FunctionSnapshotContext {
    /** Returns the ID of the checkpoint being taken. */
    long getCheckpointId();
    /** Returns the checkpoint's timestamp as a {@code long} (unit not shown here; presumably epoch milliseconds; confirm). */
    long getCheckpointTimestamp();
}General context interface for state snapshotting operations.
/** General context interface for state snapshotting operations. */
public interface StateSnapshotContext {
    /** Returns the ID of the checkpoint being taken. */
    long getCheckpointId();
    /** Returns the checkpoint's timestamp as a {@code long} (unit not shown here; presumably epoch milliseconds; confirm). */
    long getCheckpointTimestamp();
    /** Returns the factory for checkpoint streams associated with this snapshot. */
    CheckpointStreamFactory getCheckpointStreamFactory();
}Store for operator state that is partitioned per parallel operator instance.
/** Store for operator state that is partitioned per parallel operator instance. */
public interface OperatorStateStore {
    /**
     * Returns the list state described by {@code stateDescriptor}.
     *
     * @param <T> element type of the list state
     * @throws Exception declared by the signature
     */
    <T> ListState<T> getListState(ListStateDescriptor<T> stateDescriptor) throws Exception;
    /**
     * Returns the union list state described by {@code stateDescriptor}.
     *
     * @param <T> element type of the list state
     * @throws Exception declared by the signature
     */
    <T> ListState<T> getUnionListState(ListStateDescriptor<T> stateDescriptor) throws Exception;
    /**
     * Returns the broadcast state described by {@code stateDescriptor}.
     *
     * <p>NOTE(review): this listing pins the key type to {@code String}; upstream
     * Flink appears to be generic in both key and value; confirm.
     *
     * @param <T> value type of the broadcast state
     * @throws Exception declared by the signature
     */
    <T> BroadcastState<String, T> getBroadcastState(MapStateDescriptor<String, T> stateDescriptor) throws Exception;
    /** Returns the names of the registered states. */
    Set<String> getRegisteredStateNames();
    /** Returns the names of the registered broadcast states. */
    Set<String> getRegisteredBroadcastStateNames();
}Store for keyed state that is partitioned and scoped by key.
/**
 * Store for keyed state that is partitioned and scoped by key. Each accessor
 * takes a state descriptor and returns the matching state handle; all of them
 * are declared to throw {@link Exception}.
 */
public interface KeyedStateStore {
    /** Returns the value state described by {@code stateDescriptor}. */
    <T> ValueState<T> getState(ValueStateDescriptor<T> stateDescriptor) throws Exception;
    /** Returns the list state described by {@code stateDescriptor}. */
    <T> ListState<T> getListState(ListStateDescriptor<T> stateDescriptor) throws Exception;
    /** Returns the reducing state described by {@code stateDescriptor}. */
    <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateDescriptor) throws Exception;
    /** Returns the aggregating state described by {@code stateDescriptor}; the handle exposes only the input and output types. */
    <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateDescriptor) throws Exception;
    /**
     * Returns the folding state described by {@code stateDescriptor}.
     * NOTE(review): folding state may be deprecated or absent in newer Flink versions; confirm.
     */
    <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateDescriptor) throws Exception;
    /** Returns the map state described by {@code stateDescriptor}. */
    <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateDescriptor) throws Exception;
}Provides access to state partition streams during restore operations.
/** Provides access to state partition streams during restore operations. */
public interface StatePartitionStreamProvider {
    /**
     * Returns the input stream for the state partition.
     *
     * @throws IOException declared by the signature
     */
    FSDataInputStream getStream() throws IOException;
}Coordinates the distributed checkpointing process across all operators in a job.
/**
 * Coordinates the distributed checkpointing process across all operators in a job.
 * This is a declaration-only listing; method bodies are not shown.
 */
public class CheckpointCoordinator {
    /**
     * Creates a coordinator for one job.
     *
     * @param job ID of the job
     * @param chkConfig checkpoint configuration
     * @param tasksToTrigger execution vertices to trigger
     * @param tasksToWaitFor execution vertices to wait for (presumably for acknowledgements; confirm)
     * @param tasksToCommitTo execution vertices to commit to (presumably notified on completion; confirm)
     * @param userClassLoader class loader for user code
     * @param checkpointIdCounter counter supplying checkpoint IDs
     * @param completedCheckpointStore store for completed checkpoints
     * @param checkpointStateBackend state backend used for checkpoints
     * @param executor executor handed to the coordinator
     * @param failureManager manager for checkpoint failures
     */
    public CheckpointCoordinator(
    JobID job,
    CheckpointConfig chkConfig,
    ExecutionVertex[] tasksToTrigger,
    ExecutionVertex[] tasksToWaitFor,
    ExecutionVertex[] tasksToCommitTo,
    ClassLoader userClassLoader,
    CheckpointIDCounter checkpointIdCounter,
    CompletedCheckpointStore completedCheckpointStore,
    StateBackend checkpointStateBackend,
    Executor executor,
    CheckpointFailureManager failureManager
    );
    /** Starts the checkpoint scheduler (the usage example calls this to begin periodic checkpointing). */
    public void startCheckpointScheduler();
    /** Stops the checkpoint scheduler. */
    public void stopCheckpointScheduler();
    /**
     * Triggers a checkpoint.
     *
     * @param props properties of the checkpoint to trigger
     * @param externalSavepointLocation external savepoint location; the usage example passes {@code null} for "none"
     * @param isPeriodic whether the trigger is periodic
     * @return future carrying the completed checkpoint
     */
    public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
    CheckpointProperties props,
    String externalSavepointLocation,
    boolean isPeriodic
    );
    /** Receives a checkpoint acknowledge message. */
    public void receiveAcknowledgeMessage(AcknowledgeCheckpoint message);
    /** Receives a checkpoint decline message. */
    public void receiveDeclineMessage(DeclineCheckpoint message);
    /**
     * Restores the latest checkpointed state to the given tasks.
     *
     * @param tasks execution job vertices keyed by job vertex ID
     * @param errorIfNoCheckpoint presumably makes a missing checkpoint an error; confirm
     * @param allOrNothingState presumably requires all state to be assignable; confirm
     * @throws Exception declared by the signature
     */
    public void restoreLatestCheckpointedState(
    Map<JobVertexID, ExecutionJobVertex> tasks,
    boolean errorIfNoCheckpoint,
    boolean allOrNothingState
    ) throws Exception;
}Metadata associated with checkpoints.
/** Serializable metadata associated with a checkpoint: its ID and timestamp. */
public class CheckpointMetaData implements Serializable {
    /**
     * @param checkpointId ID of the checkpoint
     * @param timestamp timestamp of the checkpoint (unit not shown here)
     */
    public CheckpointMetaData(long checkpointId, long timestamp);
    /** Returns the checkpoint ID. */
    public long getCheckpointId();
    /** Returns the checkpoint timestamp. */
    public long getTimestamp();
}Configuration options for checkpoint behavior.
/**
 * Serializable configuration options for checkpoint behavior: a checkpoint type
 * paired with a target storage location reference.
 */
public class CheckpointOptions implements Serializable {
    /**
     * @param checkpointType type of the checkpoint
     * @param targetLocation reference to the target storage location
     */
    public CheckpointOptions(CheckpointType checkpointType, CheckpointStorageLocationReference targetLocation);
    /** Returns the checkpoint type. */
    public CheckpointType getCheckpointType();
    /** Returns the target storage location reference. */
    public CheckpointStorageLocationReference getTargetLocation();
    /** Creates options for a checkpoint that uses the default location. */
    public static CheckpointOptions forCheckpointWithDefaultLocation();
    /** Creates options for a savepoint written to {@code location}. */
    public static CheckpointOptions forSavepoint(CheckpointStorageLocationReference location);
}Enumeration of checkpoint types.
/**
 * Enumeration of checkpoint types. Each constant carries a savepoint flag:
 * {@code CHECKPOINT} is declared with {@code false}, {@code SAVEPOINT} with {@code true}.
 */
public enum CheckpointType {
    CHECKPOINT(false),
    SAVEPOINT(true);
    // Savepoint flag supplied with each constant above.
    private final boolean isSavepoint;
    /** Returns the savepoint flag (presumably the value of the field above; the body is not shown). */
    public boolean isSavepoint();
}Exception for checkpoint-related failures.
/**
 * Exception for checkpoint-related failures. It extends {@code FlinkException}
 * and can be built from either a message or a {@link CheckpointFailureReason},
 * each with an optional cause.
 */
public class CheckpointException extends FlinkException {
    /** @param message failure message */
    public CheckpointException(String message);
    /**
     * @param message failure message
     * @param cause underlying cause
     */
    public CheckpointException(String message, Throwable cause);
    /** @param reason failure reason */
    public CheckpointException(CheckpointFailureReason reason);
    /**
     * @param reason failure reason
     * @param cause underlying cause
     */
    public CheckpointException(CheckpointFailureReason reason, Throwable cause);
    /** Returns the failure reason (its value for the message-only constructors is not shown here). */
    public CheckpointFailureReason getCheckpointFailureReason();
}Enumeration of checkpoint failure reasons.
/**
 * Enumeration of checkpoint failure reasons, as carried by
 * {@code CheckpointException}. The listing shows plain constants without
 * per-constant messages.
 */
public enum CheckpointFailureReason {
    CHECKPOINT_COORDINATOR_SHUTDOWN,
    CHECKPOINT_COORDINATOR_SUSPEND,
    CHECKPOINT_DECLINED_TASK_NOT_READY,
    CHECKPOINT_DECLINED_SUBSUMED,
    CHECKPOINT_DECLINED_TIME_OUT,
    CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED,
    CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER,
    CHECKPOINT_DECLINED_INPUT_END_OF_STREAM,
    CHECKPOINT_ASYNC_EXCEPTION,
    CHECKPOINT_EXPIRED,
    TASK_CHECKPOINT_FAILURE,
    TASK_FAILURE_DURING_CHECKPOINT,
    UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE,
    TRIGGER_CHECKPOINT_FAILURE,
    FINALIZE_CHECKPOINT_FAILURE,
    IO_EXCEPTION;
}import org.apache.flink.runtime.state.*;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.TypeHint;
/**
 * Example map function combining both kinds of managed state: a counter held in
 * keyed value state and a list of seen inputs held in operator list state.
 *
 * <p>NOTE(review): the keyed state store is used here, which presumably requires
 * the function to run on a keyed stream; confirm in the calling job.
 */
public class StatefulMapFunction extends RichMapFunction<String, String>
        implements CheckpointedFunction {

    private ValueState<Integer> countState;
    private ListState<String> bufferState;

    /**
     * Registers the "count" value state and the "buffer" list state, then logs
     * when the context reports restored state.
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Keyed state: registered through the keyed state store
        countState = context.getKeyedStateStore()
                .getState(new ValueStateDescriptor<>("count", Integer.class));

        // Operator state: registered through the operator state store
        bufferState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("buffer", String.class));

        // Only report recovery; the registered state handles need no manual reload here
        boolean recovering = context.isRestored();
        if (recovering) {
            System.out.println("Restoring state from checkpoint");
        }
    }

    /** Logs the checkpoint ID; the registered state handles are not touched here. */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        long checkpointId = context.getCheckpointId();
        System.out.println("Taking snapshot for checkpoint: " + checkpointId);
    }

    /**
     * Increments the counter, records the input in the buffer state, and returns
     * the input annotated with the new count.
     */
    @Override
    public String map(String value) throws Exception {
        // A missing counter value is treated as zero
        Integer stored = countState.value();
        int updatedCount = (stored == null ? 0 : stored) + 1;
        countState.update(updatedCount);

        // Every input is appended to the operator list state
        bufferState.add(value);

        return value + " (processed " + updatedCount + " times)";
    }
}
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
// Directories shared by the backend instance and the configuration keys below
String checkpointDir = "file:///checkpoints";
String savepointDir = "file:///savepoints";

// Memory state backend (for testing/development)
StateBackend memoryBackend = new MemoryStateBackend();

// Filesystem state backend (for production)
StateBackend fsBackend = new FsStateBackend(checkpointDir);

// The same choice expressed as job configuration entries
Configuration jobConfig = new Configuration();
jobConfig.setString("state.backend", "filesystem");
jobConfig.setString("state.checkpoints.dir", checkpointDir);
jobConfig.setString("state.savepoints.dir", savepointDir);

// Advanced checkpointing options (the numeric values mirror the
// 30 s / 10 min / 5 s settings used in the CheckpointConfig example)
long intervalMs = 30000L;
long timeoutMs = 600000L;
long minPauseMs = 5000L;
jobConfig.setLong("execution.checkpointing.interval", intervalMs);
jobConfig.setString("execution.checkpointing.mode", "EXACTLY_ONCE");
jobConfig.setLong("execution.checkpointing.timeout", timeoutMs);
jobConfig.setInteger("execution.checkpointing.max-concurrent-checkpoints", 1);
jobConfig.setLong("execution.checkpointing.min-pause", minPauseMs);
import org.apache.flink.runtime.checkpoint.*;
// Timing values used for the checkpoint configuration
long checkpointIntervalMs = 30000;      // 30 seconds
long checkpointTimeoutMs = 600000;      // 10 minutes
long minPauseMs = 5000;                 // 5 seconds
int maxConcurrent = 1;

// Build the checkpoint configuration
CheckpointConfig checkpointConfig = new CheckpointConfig();
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
checkpointConfig.setCheckpointInterval(checkpointIntervalMs);
checkpointConfig.setCheckpointTimeout(checkpointTimeoutMs);
checkpointConfig.setMaxConcurrentCheckpoints(maxConcurrent);
checkpointConfig.setMinPauseBetweenCheckpoints(minPauseMs);

// Checkpoint retention: keep externalized checkpoints on cancellation
checkpointConfig.enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// Checkpoint storage location
checkpointConfig.setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoints"));

// Wire the coordinator; the remaining arguments are assumed to be provided by
// the surrounding job setup, which this snippet does not show
CheckpointCoordinator coordinator = new CheckpointCoordinator(
        jobGraph.getJobID(),
        checkpointConfig,
        tasksToTrigger,
        tasksToWaitFor,
        tasksToCommitTo,
        userClassLoader,
        checkpointIdCounter,
        completedCheckpointStore,
        stateBackend,
        executor,
        failureManager);

// Start periodic checkpointing
coordinator.startCheckpointScheduler();
// Trigger a checkpoint manually
CheckpointProperties properties = CheckpointProperties.forCheckpoint(
        CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);

// Named arguments for readability: no external savepoint location, not periodic
String noSavepointLocation = null;
boolean periodic = false;
CompletableFuture<CompletedCheckpoint> checkpointFuture =
        coordinator.triggerCheckpoint(properties, noSavepointLocation, periodic);

// Report the outcome once the future completes
checkpointFuture.whenComplete((checkpoint, throwable) -> {
    if (throwable == null) {
        System.out.println("Checkpoint completed: " + checkpoint.getCheckpointID());
        return;
    }
    System.err.println("Checkpoint failed: " + throwable.getMessage());
});
// Handle state schema evolution
/**
 * Registers the keyed value state "myState" with an explicitly supplied
 * serializer.
 *
 * <p>The serializer is handed to the descriptor's constructor, so the descriptor
 * needs no further serializer initialization. The earlier call to
 * {@code initializeSerializerUnlessSet} was removed: that method takes an
 * {@code ExecutionConfig} rather than a serializer, and it only matters for
 * descriptors created from type information.
 *
 * <p>NOTE(review): schema evolution for custom serializers is presumably driven
 * by the serializer's own snapshot/compatibility logic, not by the descriptor;
 * confirm against the Flink version in use.
 *
 * @param context initialization context giving access to the keyed state store
 * @throws Exception if the state cannot be registered
 */
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    // Custom serializer supplied up front; it is the only serializer configuration needed
    ValueStateDescriptor<MyState> descriptor = new ValueStateDescriptor<>(
            "myState",
            new MyStateTypeSerializer());
    state = context.getKeyedStateStore().getState(descriptor);
}
// Set up broadcast state for configuration
// Key under which the shared configuration is stored
String configKey = "global-config";

// Describe and obtain the broadcast state from the operator state store
MapStateDescriptor<String, Configuration> configDescriptor =
        new MapStateDescriptor<>("config", String.class, Configuration.class);
BroadcastState<String, Configuration> broadcastState =
        context.getOperatorStateStore().getBroadcastState(configDescriptor);

// Update broadcast state
broadcastState.put(configKey, newConfiguration);

// Read from broadcast state in processing function
Configuration config = broadcastState.get(configKey);
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-runtime-2-10