The runtime module of Apache Flink that provides the core execution engine, scheduling, fault tolerance, checkpointing, and resource management capabilities for distributed stream processing applications
npx @tessl/cli install tessl/maven-org-apache-flink--flink-runtime@2.1.0

Apache Flink Runtime is the core execution engine that powers the Apache Flink distributed stream processing framework. It provides essential infrastructure for executing stream and batch processing applications at scale, including job scheduling, task coordination, fault tolerance through checkpointing, state management, and resource allocation across distributed environments.
Declare the dependency in your Maven pom.xml or Gradle build.gradle.

// Job graph construction
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
// Execution graph and scheduling
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
// State management and checkpointing
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackend;
// Task execution
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
// Resource management
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
// Testing and development
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
// Job management and monitoring
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;

// Create a simple job graph
// Usage walkthrough: build a two-vertex job graph, attach checkpoint settings,
// and run the job on a local MiniCluster.
// NOTE(review): several names used below (DistributionPattern, ResultPartitionType,
// JobCheckpointingSettings, CheckpointCoordinatorConfiguration, Arrays, CompletableFuture,
// JobSubmissionResult, JobResult) are not in the import list shown earlier in this document.
// MySourceTask and MyMapTask are not defined here either — presumably user-supplied placeholders.
JobGraph jobGraph = new JobGraph("MyFlinkJob");
// Create and configure a job vertex
JobVertex sourceVertex = new JobVertex("Source");
sourceVertex.setParallelism(4);
sourceVertex.setInvokableClass(MySourceTask.class);
// Second vertex, configured with the same parallelism as the source.
JobVertex mapVertex = new JobVertex("Map");
mapVertex.setParallelism(4);
mapVertex.setInvokableClass(MyMapTask.class);
// Connect vertices
// The call is made on the "Map" vertex and passes the "Source" vertex as its input.
mapVertex.connectNewDataSetAsInput(
sourceVertex,
DistributionPattern.ALL_TO_ALL,
ResultPartitionType.PIPELINED
);
// Add vertices to job graph
jobGraph.addVertex(sourceVertex);
jobGraph.addVertex(mapVertex);
// Configure checkpointing
// NOTE(review): the same two vertex IDs are passed for each of the three list arguments;
// the role of each list and of the trailing null is not shown in this document —
// confirm against the JobCheckpointingSettings constructor.
JobCheckpointingSettings checkpointingSettings = new JobCheckpointingSettings(
Arrays.asList(sourceVertex.getID(), mapVertex.getID()),
Arrays.asList(sourceVertex.getID(), mapVertex.getID()),
Arrays.asList(sourceVertex.getID(), mapVertex.getID()),
new CheckpointCoordinatorConfiguration.Builder()
// presumably milliseconds (5 s interval, 60 s timeout) — TODO confirm units
.setCheckpointInterval(5000L)
.setCheckpointTimeout(60000L)
.build(),
null
);
jobGraph.setSnapshotSettings(checkpointingSettings);
// For testing - execute job in local MiniCluster
// NOTE(review): the builder is created with `new MiniClusterConfiguration.Builder()`, while the
// MiniClusterConfiguration listing later in this document shows a static newBuilder() —
// confirm which entry point exists.
MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()
.setNumTaskManagers(1)
.setNumSlotsPerTaskManager(4)
.build();
MiniCluster miniCluster = new MiniCluster(config);
// The finally branch below shuts the cluster down whether or not the steps in try throw.
try {
miniCluster.start();
CompletableFuture<JobSubmissionResult> submission = miniCluster.submitJob(jobGraph);
// get() blocks until the submission future completes.
JobSubmissionResult result = submission.get();
System.out.println("Job submitted with ID: " + result.getJobID());
// Wait for job completion or trigger savepoint
CompletableFuture<JobResult> jobResult = miniCluster.requestJobResult(result.getJobID());
JobResult finalResult = jobResult.get();
// NOTE(review): the message says "status" but the value printed is getJobExecutionResult();
// the JobResult listing in this document also declares getApplicationStatus() — confirm intent.
System.out.println("Job finished with status: " + finalResult.getJobExecutionResult());
} finally {
// closeAsync() returns a future; get() waits for that future to complete.
miniCluster.closeAsync().get();
}

Apache Flink Runtime is built around several key architectural components:
The runtime operates as a distributed system where JobManager coordinates execution while TaskManagers execute the actual data processing tasks.
Core APIs for defining and configuring Flink jobs as directed acyclic graphs (DAGs) of operations.
// API summary of JobGraph as listed in this document (declarations only; bodies omitted).
// The preceding text introduces it under the core APIs for defining and configuring
// Flink jobs as directed acyclic graphs (DAGs) of operations.
public class JobGraph implements ExecutionPlan {
// Constructors: by job name alone, or with an explicit JobID plus job name.
public JobGraph(String jobName);
public JobGraph(JobID jobId, String jobName);
// Vertex registration and enumeration.
public void addVertex(JobVertex vertex);
public Iterable<JobVertex> getVertices();
// Identity accessors.
public JobID getJobID();
public String getName();
// Job type accessor pair (see the JobType enum listed in this document).
public void setJobType(JobType jobType);
public JobType getJobType();
// Lookup by vertex ID; behavior for an unknown ID is not shown here.
public JobVertex findVertexByID(JobVertexID id);
public void setJobConfiguration(Configuration jobConfiguration);
public Configuration getJobConfiguration();
// NOTE(review): the setter is named setSnapshotSettings while the getter is
// getCheckpointingSettings; both use JobCheckpointingSettings — presumably the same
// property, confirm.
public void setSnapshotSettings(JobCheckpointingSettings settings);
public JobCheckpointingSettings getCheckpointingSettings();
}
// API summary of JobVertex (declarations only; bodies omitted).
// In this document's usage example each vertex is given a name, a parallelism and an
// invokable class, and is then connected to another vertex as its input.
public class JobVertex implements Serializable {
// Constructors: by name alone, or with an explicit JobVertexID.
public JobVertex(String name);
public JobVertex(String name, JobVertexID id);
// NOTE(review): the usage example in this document calls getID() on vertices, not getId() —
// confirm which spelling matches the real API.
public JobVertexID getId();
public String getName();
public void setParallelism(int parallelism);
public int getParallelism();
// Accepts any class that extends TaskInvokable.
public void setInvokableClass(Class<? extends TaskInvokable> invokable);
// Presumably wires `input` in as an upstream of this vertex: the usage example calls it
// on the "Map" vertex with the "Source" vertex as `input` — confirm exact semantics.
public void connectNewDataSetAsInput(
JobVertex input,
DistributionPattern distPattern,
ResultPartitionType partitionType
);
}

Advanced scheduling system for distributed job execution with support for batch and streaming workloads.
// API summary of the SchedulerNG interface (declarations only).
// It extends GlobalFailureHandler and AutoCloseableAsync, as declared below.
public interface SchedulerNG extends GlobalFailureHandler, AutoCloseableAsync {
void startScheduling();
void cancel();
// Future carrying a JobStatus; presumably completes when the job reaches a terminal
// state — confirm.
CompletableFuture<JobStatus> getJobTerminationFuture();
// The meaning of the boolean return value is not shown in this document.
boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState);
// Asynchronous: the completed checkpoint is delivered through the returned future.
CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType);
// Returns a future of String — presumably the savepoint location, TODO confirm.
// Unlike DispatcherGateway.stopWithSavepoint in this document, this variant takes a
// `terminate` flag.
CompletableFuture<String> stopWithSavepoint(
String targetDirectory,
boolean terminate,
SavepointFormatType formatType
);
}
// API summary of the ExecutionGraph interface (declarations only).
// getJobName/getJobID/getState also appear on the AccessExecutionGraph listing in this
// document; the relationship between the two interfaces is not shown here.
public interface ExecutionGraph {
String getJobName();
JobID getJobID();
JobStatus getState();
// Per-vertex lookup and enumeration, typed as ExecutionJobVertex.
ExecutionJobVertex getJobVertex(JobVertexID id);
Iterable<ExecutionJobVertex> getAllVertices();
// Whether this can be null (e.g. when checkpointing is not configured) is not shown — confirm.
CheckpointCoordinator getCheckpointCoordinator();
}

Comprehensive state management with pluggable backends and distributed checkpointing for fault tolerance.
// API summary of CheckpointCoordinator (declarations only; bodies omitted).
public class CheckpointCoordinator {
// Two triggerCheckpoint overloads: one keyed by a periodic flag, one by CheckpointType.
// Both deliver the CompletedCheckpoint through a future.
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(boolean isPeriodic);
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType);
// targetLocation is annotated @Nullable; what a null location falls back to is not
// shown in this document.
public CompletableFuture<CompletedCheckpoint> triggerSavepoint(
@Nullable String targetLocation,
SavepointFormatType formatType
);
// Declared to throw a checked Exception.
public void shutdown() throws Exception;
public CompletedCheckpointStore getCheckpointStore();
}
// API summary of the KeyedStateBackend interface, generic in the key type K (declarations only).
public interface KeyedStateBackend<K> extends KeyedState {
// NOTE(review): namespaceSerializer is typed TypeSerializer<T>, the same T used for the
// state descriptor's second type argument — verify this signature against the real interface.
<T> InternalKvState<K, ?, T> createState(
StateDescriptor<?, T> stateDescriptor,
TypeSerializer<T> namespaceSerializer
);
// Returns a RunnableFuture, so the snapshot work is presumably executed by whoever runs
// the returned future — confirm.
RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
long checkpointId,
long timestamp,
CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions
);
}
// API summary of the OperatorStateBackend interface (declarations only).
public interface OperatorStateBackend {
// State accessors keyed by descriptor: list state and broadcast (map-described) state.
<S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor);
<K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor);
// Same parameter list as KeyedStateBackend.snapshot in this document, but the result
// carries an OperatorStateHandle.
RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
long checkpointId,
long timestamp,
CheckpointStreamFactory factory,
CheckpointOptions checkpointOptions
);
}

State Management and Checkpointing
Distributed task execution engine with slot management and lifecycle coordination.
// API summary of the TaskExecutorGateway RPC interface (declarations only).
// Every method listed returns CompletableFuture<Acknowledge> and takes a trailing
// Duration timeout.
public interface TaskExecutorGateway extends RpcGateway {
// Slot request identified by slot, job and allocation IDs, with the requested resource profile.
// NOTE(review): whose address `targetAddress` is (presumably the requesting job master) is
// not shown in this document — confirm.
CompletableFuture<Acknowledge> requestSlot(
SlotID slotId,
JobID jobId,
AllocationID allocationId,
ResourceProfile resourceProfile,
String targetAddress,
ResourceManagerId resourceManagerId,
Duration timeout
);
// Task submission described by a TaskDeploymentDescriptor.
CompletableFuture<Acknowledge> submitTask(
TaskDeploymentDescriptor tdd,
JobMasterId jobMasterId,
Duration timeout
);
// Cancellation addressed by execution attempt ID.
CompletableFuture<Acknowledge> cancelTask(
ExecutionAttemptID executionAttemptID,
Duration timeout
);
// Checkpoint trigger addressed to a single execution attempt.
CompletableFuture<Acknowledge> triggerCheckpoint(
ExecutionAttemptID executionAttemptID,
long checkpointId,
long checkpointTimestamp,
CheckpointOptions checkpointOptions,
Duration timeout
);
}

Cluster resource allocation and TaskExecutor lifecycle management for different deployment environments.
// API summary of the ResourceManagerGateway RPC interface (declarations only).
// It is a FencedRpcGateway parameterized by ResourceManagerId.
public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManagerId> {
// Both registration calls return a future of RegistrationResponse.
CompletableFuture<RegistrationResponse> registerJobMaster(
JobMasterId jobMasterId,
ResourceID jobMasterResourceId,
String jobMasterAddress,
JobID jobId,
Duration timeout
);
// TaskExecutor registration carries a SlotReport and the executor's total ResourceProfile.
CompletableFuture<RegistrationResponse> registerTaskExecutor(
String taskExecutorAddress,
ResourceID resourceId,
SlotReport slotReport,
ResourceProfile totalResourceProfile,
Duration timeout
);
// Slot request issued on behalf of a job master; acknowledged asynchronously.
CompletableFuture<Acknowledge> requestSlot(
JobMasterId jobMasterId,
SlotRequest slotRequest,
Duration timeout
);
}
// API summary of WorkerResourceSpec (declarations only; bodies omitted).
public class WorkerResourceSpec {
// Static factory taking a total ResourceProfile plus a network memory size;
// no public constructor is listed in this document.
public static WorkerResourceSpec fromTotalResourceProfile(
ResourceProfile totalResourceProfile,
MemorySize networkMemorySize
);
public ResourceProfile getTotalResourceProfile();
// Memory accessors, all typed MemorySize.
public MemorySize getTaskHeapSize();
public MemorySize getTaskOffHeapSize();
public MemorySize getNetworkMemSize();
public MemorySize getManagedMemSize();
}

Leader election, service discovery, and coordination services for fault-tolerant distributed operation.
// API summary of the HighAvailabilityServices interface (declarations only).
public interface HighAvailabilityServices extends AutoCloseableAsync {
// Leader election services: resource manager, dispatcher, and per-job (keyed by JobID).
LeaderElectionService getResourceManagerLeaderElectionService();
LeaderElectionService getDispatcherLeaderElectionService();
LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);
// Leader retrieval services. Only resource manager and dispatcher retrievers are listed
// here; no per-job retriever appears in this summary.
LeaderRetrievalService getResourceManagerLeaderRetriever();
LeaderRetrievalService getDispatcherLeaderRetriever();
CheckpointRecoveryFactory getCheckpointRecoveryFactory();
}
// API summary of the LeaderElectionService interface (declarations only).
public interface LeaderElectionService {
// Lifecycle: start with a LeaderContender, then stop.
void start(LeaderContender contender);
void stop();
// Both methods below take a UUID session identifier; note the parameter spelling
// differs between them (leaderSessionID vs leaderSessionId).
void confirmLeadership(UUID leaderSessionID);
boolean hasLeadership(UUID leaderSessionId);
}
// API summary of the LeaderRetrievalService interface (declarations only).
public interface LeaderRetrievalService {
// Lifecycle: start with a LeaderRetrievalListener, then stop. How and when the listener
// is notified is not shown in this document.
void start(LeaderRetrievalListener listener);
void stop();
}

High Availability and Coordination
MiniCluster provides an embedded Flink cluster for local testing and development, allowing full Flink functionality in a single JVM.
// API summary of MiniCluster (declarations only; bodies omitted).
// The preceding text describes it as an embedded Flink cluster for local testing and
// development in a single JVM.
public class MiniCluster implements AutoCloseableAsync {
public MiniCluster(MiniClusterConfiguration configuration);
// Declared to throw a checked Exception.
public void start() throws Exception;
// Asynchronous shutdown; the usage example in this document waits on it with get().
public CompletableFuture<Void> closeAsync();
// Job lifecycle operations, all asynchronous.
public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph);
public CompletableFuture<JobResult> requestJobResult(JobID jobId);
public CompletableFuture<Acknowledge> cancelJob(JobID jobId);
// Returns a future of String — presumably the savepoint path, TODO confirm.
public CompletableFuture<String> triggerSavepoint(
JobID jobId,
String targetDirectory,
SavepointFormatType formatType
);
public ClusterClient<MiniClusterJobClient> getClusterClient();
// REST endpoint accessors.
public String getRestAddress();
public int getRestPort();
}
// API summary of MiniClusterConfiguration and its nested Builder (declarations only).
public class MiniClusterConfiguration {
// NOTE(review): the usage example in this document constructs the builder with
// `new MiniClusterConfiguration.Builder()` rather than this static method — confirm which
// entry point the real class exposes.
public static Builder newBuilder();
public static class Builder {
// Each setter returns Builder, so calls can be chained as in the usage example.
public Builder setNumTaskManagers(int numTaskManagers);
public Builder setNumSlotsPerTaskManager(int numSlotsPerTaskManager);
public Builder setConfiguration(Configuration configuration);
public Builder setRpcServiceSharing(RpcServiceSharing rpcServiceSharing);
public MiniClusterConfiguration build();
}
}

DispatcherGateway provides the main interface for job submission, management, and monitoring in Flink clusters.
// API summary of the DispatcherGateway RPC interface (declarations only).
// It is a FencedRpcGateway parameterized by DispatcherId; every method listed takes a
// trailing Duration timeout and returns a CompletableFuture.
public interface DispatcherGateway extends FencedRpcGateway<DispatcherId> {
// Note: submission here is acknowledged with Acknowledge, whereas MiniCluster.submitJob in
// this document returns a JobSubmissionResult.
CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Duration timeout);
CompletableFuture<Collection<JobStatusMessage>> listJobs(Duration timeout);
CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, Duration timeout);
CompletableFuture<Acknowledge> cancelJob(JobID jobId, Duration timeout);
// Returns a future of String — presumably the savepoint path, TODO confirm.
CompletableFuture<String> triggerSavepoint(
JobID jobId,
String targetDirectory,
SavepointFormatType formatType,
Duration timeout
);
// NOTE(review): unlike SchedulerNG.stopWithSavepoint in this document, no `terminate`
// flag is listed here — confirm against the real interface.
CompletableFuture<String> stopWithSavepoint(
JobID jobId,
String targetDirectory,
SavepointFormatType formatType,
Duration timeout
);
CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath, Duration timeout);
CompletableFuture<Integer> getBlobServerPort(Duration timeout);
}

AccessExecutionGraph provides read-only access to execution graph information for monitoring and inspection.
// API summary of the AccessExecutionGraph interface (declarations only).
// The preceding text describes it as read-only access to execution graph information
// for monitoring and inspection.
public interface AccessExecutionGraph {
// Job identity, state and type.
String getJobName();
JobID getJobID();
JobStatus getState();
JobType getJobType();
// Timestamp associated with a given JobStatus; units and the value for a status that
// was never reached are not shown in this document.
long getStatusTimestamp(JobStatus status);
// Failure information, as a Throwable and as a String.
Throwable getFailureCause();
String getFailureCauseAsString();
// Vertex views, typed as AccessExecutionJobVertex.
Iterable<AccessExecutionJobVertex> getAllVertices();
AccessExecutionJobVertex getJobVertex(JobVertexID vertexId);
Map<JobVertexID, AccessExecutionJobVertex> getAllVerticesAsMap();
// Checkpoint-related accessors.
long getCheckpointCoordinatorCheckpointId();
CheckpointStatsSnapshot getCheckpointStatsSnapshot();
Configuration getJobConfiguration();
SerializedValue<ExecutionConfig> getSerializedExecutionConfig();
boolean isStoppable();
// Returns an ArchivedExecutionGraph, the same type DispatcherGateway.requestJob yields
// in this document.
ArchivedExecutionGraph archive();
}

// Core identifiers
// API summary of JobID (declarations only). Only static factories are listed here;
// no public constructor appears in this summary.
public class JobID implements Serializable {
public static JobID generate();
// Builds a JobID from a hex string; the expected length/format is not shown in this document.
public static JobID fromHexString(String hexString);
}
// API summary of JobVertexID (declarations only): a no-argument constructor and one
// taking raw bytes. The required byte-array length is not shown in this document.
public class JobVertexID implements Serializable {
public JobVertexID();
public JobVertexID(byte[] bytes);
}
// API summary of ExecutionAttemptID (declarations only). It is the identifier type used by
// TaskExecutorGateway.cancelTask and triggerCheckpoint in this document.
public class ExecutionAttemptID implements Serializable {
public ExecutionAttemptID();
// Built from an ExecutionVertexID plus an attempt number.
public ExecutionAttemptID(ExecutionVertexID vertexId, int attemptNumber);
}
// Resource specifications
// API summary of ResourceProfile (declarations only; bodies omitted).
public class ResourceProfile implements Serializable {
// Predefined profiles; their exact matching semantics are not shown in this document.
public static final ResourceProfile ZERO;
public static final ResourceProfile ANY;
// Static factory taking CPU cores plus four memory sizes.
public static ResourceProfile fromResources(
double cpuCores,
MemorySize taskHeapMemory,
MemorySize taskOffHeapMemory,
MemorySize managedMemory,
MemorySize networkMemory
);
// Note: getters are listed only for CPU and the two task memory sizes; no managed or
// network memory getter appears in this summary.
public CPUResource getCpuCores();
public MemorySize getTaskHeapMemory();
public MemorySize getTaskOffHeapMemory();
}
// Job configuration
// Job type constants as listed in this document; the type used by
// JobGraph.setJobType/getJobType and AccessExecutionGraph.getJobType.
public enum JobType {
BATCH, STREAMING
}
// Distribution pattern constants as listed in this document; passed to
// JobVertex.connectNewDataSetAsInput (the usage example uses ALL_TO_ALL).
public enum DistributionPattern {
POINTWISE, ALL_TO_ALL
}
// Checkpoint types
// Checkpoint type constants as listed in this document.
// NOTE(review): each constant passes a String argument, but no constructor or field is
// listed — presumably omitted from this summary; confirm against the real type.
public enum CheckpointType implements SnapshotType {
CHECKPOINT("Checkpoint"),
SAVEPOINT("Savepoint");
}
// Job execution results
// API summary of JobSubmissionResult (declarations only); the type carried by the future
// that MiniCluster.submitJob returns in this document.
public class JobSubmissionResult {
// The usage example reads this ID and passes it to requestJobResult.
public JobID getJobID();
public boolean isJobSubmitted();
}
// API summary of JobResult (declarations only); the type carried by the future that
// MiniCluster.requestJobResult returns in this document.
public class JobResult {
public JobID getJobID();
// Status accessor; the usage example prints getJobExecutionResult() under a "status"
// label instead of calling this.
public ApplicationStatus getApplicationStatus();
public JobExecutionResult getJobExecutionResult();
// Units of the runtime value are not shown in this document.
public long getNetRuntime();
// Accumulator results keyed by String, with values wrapped in SerializedValue.
public Map<String, SerializedValue<Object>> getAccumulatorResults();
}
// Job status constants as listed in this document; the type returned by getState() on
// ExecutionGraph and AccessExecutionGraph and carried by SchedulerNG.getJobTerminationFuture.
// Which constants are terminal is not shown in this document.
public enum JobStatus {
INITIALIZING, CREATED, RUNNING, FAILING, FAILED,
CANCELLING, CANCELED, FINISHED, RESTARTING, SUSPENDED
}
// Mini cluster types
// RPC service sharing constants as listed in this document; accepted by
// MiniClusterConfiguration.Builder.setRpcServiceSharing.
public enum RpcServiceSharing {
SHARED, DEDICATED
}
// Savepoint format types
// Savepoint format constants as listed in this document; the formatType parameter of the
// triggerSavepoint and stopWithSavepoint signatures shown here.
public enum SavepointFormatType {
CANONICAL, NATIVE
}