tessl/maven-org-apache-flink--flink-runtime

The runtime module of Apache Flink that provides the core execution engine, scheduling, fault tolerance, checkpointing, and resource management capabilities for distributed stream processing applications

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.flink/flink-runtime@2.1.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-runtime@2.1.0

Apache Flink Runtime

Apache Flink Runtime is the core execution engine that powers the Apache Flink distributed stream processing framework. It provides essential infrastructure for executing stream and batch processing applications at scale, including job scheduling, task coordination, fault tolerance through checkpointing, state management, and resource allocation across distributed environments.

Package Information

  • Package Name: org.apache.flink:flink-runtime
  • Package Type: Maven (Java)
  • Language: Java
  • Version: 2.1.0
  • Installation: Add dependency to Maven pom.xml or Gradle build.gradle

Core Imports

// Job graph construction
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;

// Execution graph and scheduling
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.DefaultScheduler;

// State management and checkpointing
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackend;

// Task execution
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;

// Resource management
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;

// Testing and development
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

// Job management and monitoring
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;

Basic Usage

// Imports needed in addition to the core imports above. MySourceTask and
// MyMapTask are placeholders for your own AbstractInvokable subclasses, and the
// enclosing method is assumed to declare `throws Exception`.
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmaster.JobResult;

// Create a simple job graph
JobGraph jobGraph = new JobGraph("MyFlinkJob");

// Create and configure the job vertices
JobVertex sourceVertex = new JobVertex("Source");
sourceVertex.setParallelism(4);
sourceVertex.setInvokableClass(MySourceTask.class);

JobVertex mapVertex = new JobVertex("Map");
mapVertex.setParallelism(4);
mapVertex.setInvokableClass(MyMapTask.class);

// Connect the vertices: every Source subtask sends to every Map subtask
mapVertex.connectNewDataSetAsInput(
    sourceVertex,
    DistributionPattern.ALL_TO_ALL,
    ResultPartitionType.PIPELINED
);

// Add vertices to the job graph
jobGraph.addVertex(sourceVertex);
jobGraph.addVertex(mapVertex);

// Configure checkpointing: trigger every 5 s, time out after 60 s. Recent Flink
// versions no longer take explicit trigger/acknowledge/confirm vertex lists here;
// a null state backend means the cluster default is used.
JobCheckpointingSettings checkpointingSettings = new JobCheckpointingSettings(
    CheckpointCoordinatorConfiguration.builder()
        .setCheckpointInterval(5000L)
        .setCheckpointTimeout(60000L)
        .build(),
    null
);
jobGraph.setSnapshotSettings(checkpointingSettings);

// For testing: execute the job in a local MiniCluster. 8 slots let all
// 4 Source and 4 Map subtasks run at once even without slot sharing.
MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()
    .setNumTaskManagers(1)
    .setNumSlotsPerTaskManager(8)
    .build();

MiniCluster miniCluster = new MiniCluster(config);
try {
    miniCluster.start();
    CompletableFuture<JobSubmissionResult> submission = miniCluster.submitJob(jobGraph);
    JobSubmissionResult result = submission.get();
    System.out.println("Job submitted with ID: " + result.getJobID());

    // Block until the job reaches a terminal state
    CompletableFuture<JobResult> jobResult = miniCluster.requestJobResult(result.getJobID());
    JobResult finalResult = jobResult.get();
    System.out.println("Job finished with status: " + finalResult.getApplicationStatus());
} finally {
    miniCluster.closeAsync().get();
}

Architecture

Apache Flink Runtime is built around several key architectural components:

  • Job Management Layer: JobGraph and ExecutionGraph represent job structure at different abstraction levels
  • Scheduling Layer: SchedulerNG implementations coordinate job execution and resource allocation
  • Execution Layer: TaskExecutor (TaskManager) processes run parallel task instances in task slots and report their execution state back to the JobManager
  • State Management: Pluggable state backends with checkpointing for exactly-once processing guarantees
  • Resource Management: ResourceManager handles cluster resources and TaskExecutor lifecycle
  • Network Layer: High-throughput, low-latency data exchange between distributed tasks
  • High Availability: Leader election and service discovery for fault-tolerant cluster coordination

The runtime operates as a distributed system in which the JobManager (hosting the Dispatcher, the ResourceManager, and one JobMaster per running job) coordinates execution, while TaskManagers execute the actual data processing tasks.

Capabilities

Job Graph Management

Core APIs for defining and configuring Flink jobs as directed acyclic graphs (DAGs) of operations.

public class JobGraph implements ExecutionPlan {
    public JobGraph(String jobName);
    public JobGraph(JobID jobId, String jobName);
    public void addVertex(JobVertex vertex);
    public Iterable<JobVertex> getVertices();
    public JobID getJobID();
    public String getName();
    public void setJobType(JobType jobType);
    public JobType getJobType();
    public JobVertex findVertexByID(JobVertexID id);
    public void setJobConfiguration(Configuration jobConfiguration);
    public Configuration getJobConfiguration();
    public void setSnapshotSettings(JobCheckpointingSettings settings);
    public JobCheckpointingSettings getCheckpointingSettings();
}

public class JobVertex implements Serializable {
    public JobVertex(String name);
    public JobVertex(String name, JobVertexID id);
    public JobVertexID getID();
    public String getName();
    public void setParallelism(int parallelism);
    public int getParallelism();
    public void setInvokableClass(Class<? extends TaskInvokable> invokable);
    public JobEdge connectNewDataSetAsInput(
        JobVertex input, 
        DistributionPattern distPattern, 
        ResultPartitionType partitionType
    );
}
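To make the two DistributionPattern values concrete, the following JDK-only sketch computes which producer subtasks each consumer subtask reads from. ConnectionPatternSketch is a hypothetical class, and the contiguous-range index math used for POINTWISE is an assumption chosen to illustrate the idea, not a copy of Flink's internal edge-building code.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only: which producer subtasks feed each consumer subtask.
final class ConnectionPatternSketch {

    // ALL_TO_ALL: every consumer subtask reads from every producer subtask.
    static List<List<Integer>> allToAll(int producers, int consumers) {
        List<List<Integer>> inputs = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            List<Integer> fromProducers = new ArrayList<>();
            for (int p = 0; p < producers; p++) {
                fromProducers.add(p);
            }
            inputs.add(fromProducers);
        }
        return inputs;
    }

    // POINTWISE (assumed index math): subtasks are matched in contiguous ranges.
    static List<List<Integer>> pointwise(int producers, int consumers) {
        List<List<Integer>> inputs = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            inputs.add(new ArrayList<>());
        }
        if (producers >= consumers) {
            // Each consumer reads a contiguous block of producers.
            for (int c = 0; c < consumers; c++) {
                int start = c * producers / consumers;
                int end = (c + 1) * producers / consumers;
                for (int p = start; p < end; p++) {
                    inputs.get(c).add(p);
                }
            }
        } else {
            // Each producer feeds a contiguous block of consumers.
            for (int p = 0; p < producers; p++) {
                int start = p * consumers / producers;
                int end = (p + 1) * consumers / producers;
                for (int c = start; c < end; c++) {
                    inputs.get(c).add(p);
                }
            }
        }
        return inputs;
    }

    public static void main(String[] args) {
        System.out.println(allToAll(2, 2));  // [[0, 1], [0, 1]]
        System.out.println(pointwise(4, 2)); // [[0, 1], [2, 3]]
        System.out.println(pointwise(2, 4)); // [[0], [0], [1], [1]]
    }
}
```

With parallelism 4 on both sides, as in the Basic Usage example, ALL_TO_ALL produces 16 subtask-to-subtask connections while POINTWISE produces 4.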

Execution and Scheduling

Scheduling infrastructure that expands a JobGraph into a parallel ExecutionGraph, allocates slots, and deploys tasks for both batch and streaming workloads.

public interface SchedulerNG extends GlobalFailureHandler, AutoCloseableAsync {
    void startScheduling();
    void cancel();
    CompletableFuture<JobStatus> getJobTerminationFuture();
    boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState);
    CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType);
    CompletableFuture<String> stopWithSavepoint(
        String targetDirectory, 
        boolean terminate,
        SavepointFormatType formatType
    );
}

public interface ExecutionGraph {
    String getJobName();
    JobID getJobID();
    JobStatus getState();
    ExecutionJobVertex getJobVertex(JobVertexID id);
    Iterable<ExecutionJobVertex> getAllVertices();
    CheckpointCoordinator getCheckpointCoordinator();
}
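SchedulerNG reports job completion through a CompletableFuture<JobStatus> and learns about task state changes through updateTaskExecutionState. The sketch below is a hypothetical, JDK-only model of that bookkeeping: ToySchedulerSketch, TaskState, ToyJobStatus, and updateTaskState are invented names. It assumes no restart strategy, so the first task failure fails the job, whereas Flink's schedulers apply a failover strategy and restart policy first.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of scheduler-side job status tracking (not Flink code).
final class ToySchedulerSketch {
    enum TaskState { RUNNING, FINISHED, FAILED }

    enum ToyJobStatus { FINISHED, FAILED }

    private final TaskState[] tasks;
    private final CompletableFuture<ToyJobStatus> terminationFuture = new CompletableFuture<>();

    ToySchedulerSketch(int numTasks) {
        tasks = new TaskState[numTasks];
        Arrays.fill(tasks, TaskState.RUNNING);
    }

    // Loosely analogous to SchedulerNG#updateTaskExecutionState: returns false
    // if the update was not applied (unknown task or job already terminated).
    boolean updateTaskState(int taskIndex, TaskState newState) {
        if (taskIndex < 0 || taskIndex >= tasks.length || terminationFuture.isDone()) {
            return false;
        }
        tasks[taskIndex] = newState;
        if (newState == TaskState.FAILED) {
            // No restart strategy in this model: one failure fails the job.
            terminationFuture.complete(ToyJobStatus.FAILED);
        } else if (Arrays.stream(tasks).allMatch(s -> s == TaskState.FINISHED)) {
            terminationFuture.complete(ToyJobStatus.FINISHED);
        }
        return true;
    }

    CompletableFuture<ToyJobStatus> getJobTerminationFuture() {
        return terminationFuture;
    }

    public static void main(String[] args) {
        ToySchedulerSketch scheduler = new ToySchedulerSketch(2);
        scheduler.getJobTerminationFuture()
                .thenAccept(status -> System.out.println("Job terminated: " + status));
        scheduler.updateTaskState(0, TaskState.FINISHED);
        scheduler.updateTaskState(1, TaskState.FINISHED); // prints "Job terminated: FINISHED"
    }
}
```

Exposing termination as a future lets callers chain follow-up work (cleanup, result reporting) without blocking a thread while the job runs.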

State Management and Checkpointing

State management through pluggable keyed and operator state backends, plus distributed checkpointing for fault tolerance.

public class CheckpointCoordinator {
    public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(boolean isPeriodic);
    public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType);
    public CompletableFuture<CompletedCheckpoint> triggerSavepoint(
        @Nullable String targetLocation,
        SavepointFormatType formatType
    );
    public void shutdown() throws Exception;
    public CompletedCheckpointStore getCheckpointStore();
}

public interface KeyedStateBackend<K> extends KeyedState {
    <T> InternalKvState<K, ?, T> createState(
        StateDescriptor<?, T> stateDescriptor,
        TypeSerializer<T> namespaceSerializer
    );
    RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
        long checkpointId,
        long timestamp,
        CheckpointStreamFactory streamFactory,
        CheckpointOptions checkpointOptions
    );
}

public interface OperatorStateBackend {
    <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor);
    <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor);
    RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
        long checkpointId,
        long timestamp,
        CheckpointStreamFactory factory,
        CheckpointOptions checkpointOptions
    );
}
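The CheckpointCoordinator triggers a checkpoint on the tasks and declares it complete only once every expected task has acknowledged it. A minimal JDK-only sketch of that acknowledgement bookkeeping follows. PendingCheckpointSketch is a hypothetical class with tasks identified by plain strings; a real pending checkpoint additionally collects state handles, handles declines and timeouts, and hands the completed checkpoint to the CompletedCheckpointStore.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical model: a checkpoint completes once all expected tasks acknowledge it.
final class PendingCheckpointSketch {
    private final long checkpointId;
    private final Set<String> notYetAcknowledged;
    private final CompletableFuture<Long> completion = new CompletableFuture<>();

    PendingCheckpointSketch(long checkpointId, Set<String> tasksToAcknowledge) {
        this.checkpointId = checkpointId;
        this.notYetAcknowledged = new HashSet<>(tasksToAcknowledge);
    }

    // Returns true if this acknowledgement was new; the last one completes the checkpoint.
    boolean acknowledge(String taskId) {
        if (completion.isDone() || !notYetAcknowledged.remove(taskId)) {
            return false; // duplicate, unknown, or late acknowledgement
        }
        if (notYetAcknowledged.isEmpty()) {
            completion.complete(checkpointId);
        }
        return true;
    }

    // A declining task (or a timeout) aborts the whole checkpoint.
    void abort(String reason) {
        completion.completeExceptionally(new IllegalStateException(
                "Checkpoint " + checkpointId + " aborted: " + reason));
    }

    CompletableFuture<Long> completionFuture() {
        return completion;
    }
}
```

Because completion depends on the full set of acknowledgements, a single slow or declining task holds back or aborts the whole checkpoint, which is why the checkpoint timeout setting matters.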

Task Execution

Distributed task execution engine with slot management and lifecycle coordination.

public interface TaskExecutorGateway extends RpcGateway {
    CompletableFuture<Acknowledge> requestSlot(
        SlotID slotId,
        JobID jobId,
        AllocationID allocationId,
        ResourceProfile resourceProfile,
        String targetAddress,
        ResourceManagerId resourceManagerId,
        Duration timeout
    );
    
    CompletableFuture<Acknowledge> submitTask(
        TaskDeploymentDescriptor tdd,
        JobMasterId jobMasterId,
        Duration timeout
    );
    
    CompletableFuture<Acknowledge> cancelTask(
        ExecutionAttemptID executionAttemptID,
        Duration timeout
    );
    
    CompletableFuture<Acknowledge> triggerCheckpoint(
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        long checkpointTimestamp,
        CheckpointOptions checkpointOptions,
        Duration timeout
    );
}
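requestSlot and submitTask imply an ordering: a slot must first be allocated to a job under an AllocationID before tasks of that job can be deployed into it. The hypothetical SlotTableSketch below models that rule with plain strings standing in for JobID and AllocationID; the real task executor also tracks slot timeouts, resource profiles, and its connection to the JobMaster.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a task executor's slot bookkeeping (IDs are plain strings).
final class SlotTableSketch {
    private final int numSlots;
    // slot index -> (jobId, allocationId)
    private final Map<Integer, Map.Entry<String, String>> allocated = new HashMap<>();

    SlotTableSketch(int numSlots) {
        this.numSlots = numSlots;
    }

    // Idempotent for the same allocation; rejects a slot held by a different allocation.
    boolean requestSlot(int slotIndex, String jobId, String allocationId) {
        if (slotIndex < 0 || slotIndex >= numSlots) {
            return false;
        }
        Map.Entry<String, String> requested = Map.entry(jobId, allocationId);
        Map.Entry<String, String> existing = allocated.putIfAbsent(slotIndex, requested);
        return existing == null || existing.equals(requested);
    }

    // A task may only be deployed into a slot allocated to its job under that allocation ID.
    boolean canSubmitTask(String jobId, String allocationId) {
        return allocated.containsValue(Map.entry(jobId, allocationId));
    }

    int freeSlots() {
        return numSlots - allocated.size();
    }
}
```

Making a repeated request for the same allocation succeed is a deliberate choice in this sketch: RPCs can be retried after a timeout, so a duplicate request should not be treated as a conflict.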

Resource Management

Cluster resource allocation and TaskExecutor lifecycle management for different deployment environments.

public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManagerId> {
    CompletableFuture<RegistrationResponse> registerJobMaster(
        JobMasterId jobMasterId,
        ResourceID jobMasterResourceId,
        String jobMasterAddress,
        JobID jobId,
        Duration timeout
    );
    
    CompletableFuture<RegistrationResponse> registerTaskExecutor(
        TaskExecutorRegistration taskExecutorRegistration,
        Duration timeout
    );
    
    CompletableFuture<Acknowledge> declareRequiredResources(
        JobMasterId jobMasterId,
        ResourceRequirements resourceRequirements,
        Duration timeout
    );
}

public class WorkerResourceSpec {
    public static WorkerResourceSpec fromTotalResourceProfile(
        ResourceProfile totalResourceProfile,
        MemorySize networkMemorySize
    );
    
    public ResourceProfile getTotalResourceProfile();
    public MemorySize getTaskHeapSize();
    public MemorySize getTaskOffHeapSize();
    public MemorySize getNetworkMemSize();
    public MemorySize getManagedMemSize();
}
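A WorkerResourceSpec describes the total resources of one TaskExecutor, and slot demand is met by fitting slot-sized requests into workers of that size. The arithmetic below is a simplified, hypothetical sketch (SlotFitSketch, with CPU in millicores and memory in MiB) of how many equal slots fit into one worker and how many workers a demand needs. Flink's own slot managers work on full ResourceProfiles, including network and managed memory, so treat this only as the underlying idea.

```java
// Hypothetical sketch: fitting fixed-size slot requests into fixed-size workers.
final class SlotFitSketch {

    // Number of (slotMilliCpu, slotMemMiB) slots that fit into one worker.
    static int slotsPerWorker(long workerMilliCpu, long workerMemMiB,
                              long slotMilliCpu, long slotMemMiB) {
        if (slotMilliCpu <= 0 || slotMemMiB <= 0) {
            throw new IllegalArgumentException("slot resources must be positive");
        }
        long byCpu = workerMilliCpu / slotMilliCpu;
        long byMem = workerMemMiB / slotMemMiB;
        // The scarcer resource decides.
        return (int) Math.min(byCpu, byMem);
    }

    // Workers needed to host the requested number of slots (ceiling division).
    static int workersNeeded(int requestedSlots, int slotsPerWorker) {
        if (slotsPerWorker <= 0) {
            throw new IllegalArgumentException("a worker must host at least one slot");
        }
        return (requestedSlots + slotsPerWorker - 1) / slotsPerWorker;
    }

    public static void main(String[] args) {
        int perWorker = slotsPerWorker(4000, 8192, 1000, 3072); // min(4, 2) = 2
        System.out.println(perWorker + " slots per worker, "
                + workersNeeded(8, perWorker) + " workers for 8 slots");
        // prints "2 slots per worker, 4 workers for 8 slots"
    }
}
```

CPU is kept in integer millicores here so that the division is exact instead of being subject to floating-point rounding.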

High Availability and Coordination

Leader election, service discovery, and coordination services for fault-tolerant distributed operation.

public interface HighAvailabilityServices extends AutoCloseableAsync {
    LeaderElectionService getResourceManagerLeaderElectionService();
    LeaderElectionService getDispatcherLeaderElectionService();
    LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);
    LeaderRetrievalService getResourceManagerLeaderRetriever();
    LeaderRetrievalService getDispatcherLeaderRetriever();
    CheckpointRecoveryFactory getCheckpointRecoveryFactory();
}

public interface LeaderElectionService {
    void start(LeaderContender contender);
    void stop();
    void confirmLeadership(UUID leaderSessionID);
    boolean hasLeadership(UUID leaderSessionId);
}

public interface LeaderRetrievalService {
    void start(LeaderRetrievalListener listener);
    void stop();
}
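Leader session IDs are what make leadership checks safe: a component that has lost leadership may still hold an old UUID, so a check such as hasLeadership(sessionId) must compare against the current session. The following single-process sketch (LeaderSessionSketch, a hypothetical class) models grant, confirm, revoke, and that fencing check. Real Flink deployments back leader election with ZooKeeper or Kubernetes and notify contenders and listeners asynchronously.

```java
import java.util.UUID;

// Hypothetical in-memory model of leader sessions and fencing; not a Flink API.
final class LeaderSessionSketch {
    private UUID issuedSession;    // handed to the contender when leadership is granted
    private UUID confirmedSession; // published to retrievers once the contender confirms

    // The election backend grants leadership under a fresh session ID.
    synchronized UUID grantLeadership() {
        issuedSession = UUID.randomUUID();
        confirmedSession = null;
        return issuedSession;
    }

    // The contender may only confirm the session it was actually granted.
    synchronized boolean confirmLeadership(UUID sessionId) {
        if (sessionId == null || !sessionId.equals(issuedSession)) {
            return false;
        }
        confirmedSession = sessionId;
        return true;
    }

    synchronized void revokeLeadership() {
        issuedSession = null;
        confirmedSession = null;
    }

    // Fencing check: a session ID from an earlier term is rejected.
    synchronized boolean hasLeadership(UUID sessionId) {
        return sessionId != null && sessionId.equals(issuedSession);
    }

    // What a leader-retrieval listener would be told: only confirmed sessions.
    synchronized UUID publishedSession() {
        return confirmedSession;
    }
}
```

Separating "granted" from "confirmed" models the idea that a new leader should only be advertised to retrievers once it is ready to serve requests.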

Testing and Development

MiniCluster runs an embedded Flink cluster, including JobManager and TaskManager components, inside a single JVM for local testing and development.

public class MiniCluster implements AutoCloseableAsync {
    public MiniCluster(MiniClusterConfiguration configuration);
    
    public void start() throws Exception;
    public CompletableFuture<Void> closeAsync();
    
    public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph);
    public CompletableFuture<JobResult> requestJobResult(JobID jobId);
    public CompletableFuture<Acknowledge> cancelJob(JobID jobId);
    
    public CompletableFuture<String> triggerSavepoint(
        JobID jobId,
        String targetDirectory,
        SavepointFormatType formatType
    );
    
    public ClusterClient<MiniClusterJobClient> getClusterClient();
    public String getRestAddress();
    public int getRestPort();
}

public class MiniClusterConfiguration {
    public static Builder newBuilder();
    
    public static class Builder {
        public Builder setNumTaskManagers(int numTaskManagers);
        public Builder setNumSlotsPerTaskManager(int numSlotsPerTaskManager);
        public Builder setConfiguration(Configuration configuration);
        public Builder setRpcServiceSharing(RpcServiceSharing rpcServiceSharing);
        public MiniClusterConfiguration build();
    }
}

Job Management and Submission

DispatcherGateway provides the main interface for job submission, management, and monitoring in Flink clusters.

public interface DispatcherGateway extends FencedRpcGateway<DispatcherId> {
    CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Duration timeout);
    
    CompletableFuture<Collection<JobStatusMessage>> listJobs(Duration timeout);
    
    CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, Duration timeout);
    
    CompletableFuture<Acknowledge> cancelJob(JobID jobId, Duration timeout);
    
    CompletableFuture<String> triggerSavepoint(
        JobID jobId,
        String targetDirectory,
        SavepointFormatType formatType,
        Duration timeout
    );
    
    CompletableFuture<String> stopWithSavepoint(
        JobID jobId,
        String targetDirectory,
        SavepointFormatType formatType,
        Duration timeout
    );
    
    CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath, Duration timeout);
    
    CompletableFuture<Integer> getBlobServerPort(Duration timeout);
}
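Every DispatcherGateway call is asynchronous and carries a Duration timeout, so client code typically chains calls on the returned futures instead of blocking between them. The sketch below shows that style against a hypothetical stand-in: JobGatewaySketch, InMemoryJobGateway, and GatewayClientSketch are invented for illustration and use plain strings where the real gateway uses JobID, JobGraph, and richer result types.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an RPC gateway: every call is asynchronous and carries a timeout.
interface JobGatewaySketch {
    CompletableFuture<String> submitJob(String jobName, Duration timeout);

    CompletableFuture<String> requestJobStatus(String jobId, Duration timeout);
}

// In-memory fake for exercising client code without a cluster (not thread-safe ID generation).
final class InMemoryJobGateway implements JobGatewaySketch {
    private final Map<String, String> statusByJobId = new ConcurrentHashMap<>();

    @Override
    public CompletableFuture<String> submitJob(String jobName, Duration timeout) {
        String jobId = jobName + "-" + statusByJobId.size();
        statusByJobId.put(jobId, "RUNNING");
        return CompletableFuture.completedFuture(jobId);
    }

    @Override
    public CompletableFuture<String> requestJobStatus(String jobId, Duration timeout) {
        String status = statusByJobId.get(jobId);
        return status == null
                ? CompletableFuture.failedFuture(new IllegalArgumentException("Unknown job " + jobId))
                : CompletableFuture.completedFuture(status);
    }
}

final class GatewayClientSketch {
    // Submit, then query the status, without blocking between the two calls.
    static CompletableFuture<String> submitAndQuery(
            JobGatewaySketch gateway, String jobName, Duration timeout) {
        return gateway.submitJob(jobName, timeout)
                .thenCompose(jobId -> gateway.requestJobStatus(jobId, timeout))
                // Local safety net in case the remote side never answers.
                .orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS);
    }
}
```

Programming against the interface rather than a concrete gateway is what makes the in-memory fake possible; the same client code would work unchanged with a real RPC-backed implementation of the interface.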

Execution Monitoring

AccessExecutionGraph provides read-only access to execution graph information for monitoring and inspection.

public interface AccessExecutionGraph {
    String getJobName();
    JobID getJobID();
    JobStatus getState();
    JobType getJobType();
    
    long getStatusTimestamp(JobStatus status);
    Throwable getFailureCause();
    String getFailureCauseAsString();
    
    Iterable<AccessExecutionJobVertex> getAllVertices();
    AccessExecutionJobVertex getJobVertex(JobVertexID vertexId);
    
    Map<JobVertexID, AccessExecutionJobVertex> getAllVerticesAsMap();
    
    long getCheckpointCoordinatorCheckpointId();
    CheckpointStatsSnapshot getCheckpointStatsSnapshot();
    
    Configuration getJobConfiguration();
    SerializedValue<ExecutionConfig> getSerializedExecutionConfig();
    
    boolean isStoppable();
    ArchivedExecutionGraph archive();
}
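getStatusTimestamp(JobStatus) lets monitoring code derive durations from the times at which the job entered each status. The hypothetical StatusTimelineSketch below models this with a local Status enum covering a subset of the JobStatus values, and it assumes that a timestamp of 0 means the status was never entered.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical model of per-status timestamps, in the spirit of getStatusTimestamp(JobStatus).
final class StatusTimelineSketch {
    enum Status { CREATED, RUNNING, FINISHED, FAILED, CANCELED }

    private final Map<Status, Long> enteredAt = new EnumMap<>(Status.class);

    void transitionTo(Status status, long timestampMillis) {
        enteredAt.put(status, timestampMillis);
    }

    // 0 means "never entered" in this sketch.
    long getStatusTimestamp(Status status) {
        return enteredAt.getOrDefault(status, 0L);
    }

    // Time from RUNNING to the terminal status, or -1 if the job has not terminated yet.
    long runningDurationMillis() {
        long start = getStatusTimestamp(Status.RUNNING);
        for (Status terminal : new Status[] {Status.FINISHED, Status.FAILED, Status.CANCELED}) {
            long end = getStatusTimestamp(terminal);
            if (start > 0 && end > 0) {
                return end - start;
            }
        }
        return -1;
    }
}
```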

Types

// Core identifiers
public class JobID implements Serializable {
    public static JobID generate();
    public static JobID fromHexString(String hexString);
}

public class JobVertexID implements Serializable {
    public JobVertexID();
    public JobVertexID(byte[] bytes);
}

public class ExecutionAttemptID implements Serializable {
    public ExecutionAttemptID();
    public ExecutionAttemptID(ExecutionVertexID vertexId, int attemptNumber);
}

// Resource specifications
public class ResourceProfile implements Serializable {
    public static final ResourceProfile ZERO;
    public static final ResourceProfile ANY;
    
    public static ResourceProfile fromResources(
        double cpuCores,
        MemorySize taskHeapMemory,
        MemorySize taskOffHeapMemory,
        MemorySize managedMemory,
        MemorySize networkMemory
    );
    
    public CPUResource getCpuCores();
    public MemorySize getTaskHeapMemory();
    public MemorySize getTaskOffHeapMemory();
}

// Job configuration
public enum JobType {
    BATCH, STREAMING
}

public enum DistributionPattern {
    POINTWISE, ALL_TO_ALL
}

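// Checkpoint types (a final class with constants, not an enum; savepoints use the separate SavepointType)
public final class CheckpointType implements SnapshotType {
    public static final CheckpointType CHECKPOINT;
    public static final CheckpointType FULL_CHECKPOINT;
}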

// Job execution results
public class JobSubmissionResult {
    public JobID getJobID();
    public boolean isJobExecutionResult();
    public JobExecutionResult getJobExecutionResult();
}

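public class JobResult {
    public JobID getJobId();
    public boolean isSuccess();
    public ApplicationStatus getApplicationStatus();
    public JobExecutionResult toJobExecutionResult(ClassLoader classLoader)
        throws JobExecutionException, IOException, ClassNotFoundException;
    public long getNetRuntime();
    public Map<String, SerializedValue<OptionalFailure<Object>>> getAccumulatorResults();
}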

public enum JobStatus {
    INITIALIZING, CREATED, RUNNING, FAILING, FAILED, 
    CANCELLING, CANCELED, FINISHED, RESTARTING, SUSPENDED
}

// Mini cluster types
public enum RpcServiceSharing {
    SHARED, DEDICATED
}

// Savepoint format types
public enum SavepointFormatType {
    CANONICAL, NATIVE
}
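When waiting on a job, for example through getJobTerminationFuture or requestJobResult, it matters which JobStatus values are final. The sketch below mirrors the JobStatus values listed above in a standalone enum, JobStatusSketch (a hypothetical name). It tags each value the way Flink's JobStatus does through isTerminalState() and isGloballyTerminalState(): FAILED, CANCELED, and FINISHED are globally terminal, while SUSPENDED is only locally terminal because another JobManager may recover the job under high availability.

```java
// Hypothetical mirror of the JobStatus values listed above, tagged with how final each one is.
enum JobStatusSketch {
    INITIALIZING(Terminal.NO), CREATED(Terminal.NO), RUNNING(Terminal.NO),
    FAILING(Terminal.NO), FAILED(Terminal.GLOBALLY),
    CANCELLING(Terminal.NO), CANCELED(Terminal.GLOBALLY),
    FINISHED(Terminal.GLOBALLY), RESTARTING(Terminal.NO),
    SUSPENDED(Terminal.LOCALLY);

    enum Terminal { NO, LOCALLY, GLOBALLY }

    private final Terminal terminal;

    JobStatusSketch(Terminal terminal) {
        this.terminal = terminal;
    }

    // Terminal on this JobManager; a LOCALLY terminal job may still be recovered elsewhere.
    boolean isTerminalState() {
        return terminal != Terminal.NO;
    }

    // Terminal for the whole cluster: the job will not run again.
    boolean isGloballyTerminalState() {
        return terminal == Terminal.GLOBALLY;
    }
}
```

Code that cleans up job artifacts, such as checkpoints or HA metadata, should key off the globally terminal states only, since a SUSPENDED job may still resume.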