Apache Flink runtime engine providing core distributed streaming dataflow execution, task scheduling, state management, and fault tolerance capabilities.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-runtime-2-10@1.3.0

Apache Flink Runtime is the core execution engine component of the Apache Flink distributed stream processing framework. This library provides essential services for orchestrating distributed dataflow execution across clusters, including task scheduling and deployment, operator lifecycle management, inter-task communication, fault tolerance with exactly-once processing guarantees, custom memory management for efficient data processing, and state management capabilities.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.10</artifactId>
<version>1.3.3</version>
</dependency>

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.minicluster.MiniCluster;

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
// Create a mini cluster for local execution
MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()
.setNumTaskManagers(1)
.setNumSlotsPerTaskManager(4)
.build();
MiniCluster miniCluster = new MiniCluster(config);
miniCluster.start();
// Create and submit a job
JobGraph jobGraph = new JobGraph("My Flink Job");
// ... configure job graph with vertices and edges
// Execute job and wait for completion
JobExecutionResult result = miniCluster.runJobBlocking(jobGraph);
miniCluster.close();

Apache Flink Runtime is built around several key architectural components:
Core APIs for job lifecycle management, execution planning, and monitoring. Essential for submitting and controlling Flink dataflow programs.
/**
 * Signature summary of JobGraph as presented in this reference: a Serializable class with
 * three constructors plus members for vertices, attached jars/blobs/classpaths, and
 * scheduling, session-timeout, savepoint-restore, execution-config and checkpointing settings.
 * Method bodies are not shown in this listing.
 */
public class JobGraph implements Serializable {
// Constructors: by job name, by explicit JobID and job name, or from a varargs list of vertices.
public JobGraph(String jobName);
public JobGraph(JobID jobId, String jobName);
public JobGraph(JobVertex... vertices);
// Registration of vertices and of jar/blob artifacts.
public void addVertex(JobVertex vertex);
public void addJar(Path jar);
public void addBlob(BlobKey key);
// Identity and job-level configuration accessors.
public JobID getJobID();
public String getName();
public Configuration getJobConfiguration();
// Vertex access in array, topologically sorted list, count and Iterable form.
// Only the topologically sorted variant declares InvalidProgramException.
public JobVertex[] getVerticesAsArray();
public List<JobVertex> getVerticesSortedTopologicallyFromSources() throws InvalidProgramException;
public int getNumberOfVertices();
public Iterable<JobVertex> getVertices();
// Schedule mode setter/getter pair.
public void setScheduleMode(ScheduleMode scheduleMode);
public ScheduleMode getScheduleMode();
// Session timeout setter/getter pair; the value is a long (unit not stated in this listing).
public void setSessionTimeout(long sessionTimeout);
public long getSessionTimeout();
// Savepoint restore settings getter/setter pair.
public SavepointRestoreSettings getSavepointRestoreSettings();
public void setSavepointRestoreSettings(SavepointRestoreSettings settings);
// The setter takes an ExecutionConfig and declares IOException; the getter returns it
// wrapped in a SerializedValue.
public void setExecutionConfig(ExecutionConfig executionConfig) throws IOException;
public SerializedValue<ExecutionConfig> getSerializedExecutionConfig();
// Note the asymmetric names: the getter is getCheckpointingSettings while the setter is
// setSnapshotSettings; both use the JobCheckpointingSettings type.
public JobCheckpointingSettings getCheckpointingSettings();
public void setSnapshotSettings(JobCheckpointingSettings settings);
// Classpath URLs getter/setter pair.
public List<URL> getClasspaths();
public void setClasspaths(List<URL> paths);
}
public class JobClient {
public static ActorSystem startJobClientActorSystem(Configuration config) throws IOException;
public static JobListeningContext submitJob(ActorSystem actorSystem, Configuration config, HighAvailabilityServices highAvailabilityServices, ActorGateway jobManagerGateway, JobGraph jobGraph, Time timeout, boolean sysoutLogUpdates, ClassLoader userCodeClassLoader) throws JobExecutionException;
public static JobListeningContext attachToRunningJob(JobID jobID, ActorGateway jobManagerGateway, Configuration configuration, ActorSystem actorSystem, HighAvailabilityServices highAvailabilityServices, Time timeout, boolean sysoutLogUpdates);
public static JobExecutionResult awaitJobResult(JobListeningContext listeningContext) throws JobExecutionException;
public static JobExecutionResult submitJobAndWait(ActorSystem actorSystem, Configuration config, HighAvailabilityServices highAvailabilityServices, ActorGateway jobManagerGateway, JobGraph jobGraph, Time timeout, boolean sysoutLogUpdates, ClassLoader userCodeClassLoader) throws JobExecutionException;
public static void submitJobDetached(ActorGateway jobManagerGateway, Configuration config, JobGraph jobGraph, Time timeout, ClassLoader userCodeClassLoader) throws JobExecutionException;
}

Pluggable state backend system with checkpointing mechanisms for fault tolerance and exactly-once processing guarantees.
/**
 * Signature summary of the StateBackend interface: it extends Serializable and, in this
 * listing, exposes a single generic factory method for a keyed state backend.
 */
public interface StateBackend extends Serializable {
// Generic in the key type K: takes a TypeSerializer<K> and returns an
// AbstractKeyedStateBackend<K>. The remaining parameters are the environment, job ID,
// operator identifier, key-group count and range, and a TaskKvStateRegistry.
// Declares the broad checked type Exception.
<K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env,
JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
TaskKvStateRegistry kvStateRegistry
) throws Exception;
}
public interface FunctionInitializationContext {
boolean isRestored();
OperatorStateStore getOperatorStateStore();
KeyedStateStore getKeyedStateStore();
}

Framework for implementing and executing user-defined tasks with resource management and environment access.
/**
 * Signature summary of AbstractInvokable: an abstract class with one abstract entry point
 * (invoke), a cancel method, and a final accessor for the Environment.
 */
public abstract class AbstractInvokable {
// Abstract: concrete subclasses must provide the implementation. Declares Exception.
public abstract void invoke() throws Exception;
// Non-abstract in this listing; also declares Exception.
public void cancel() throws Exception;
// Declared final, so subclasses cannot override it.
public final Environment getEnvironment();
}
public interface Environment {
JobID getJobID();
JobVertexID getJobVertexId();
ExecutionAttemptID getExecutionId();
TaskInfo getTaskInfo();
Configuration getTaskConfiguration();
}

Embedded Flink cluster implementation for testing, development, and local execution scenarios.
/**
 * Signature summary of MiniCluster: no-arg and configuration-based constructors, a start
 * method, detached and blocking job submission, and close.
 */
public class MiniCluster {
// Constructors: default, or from an explicit MiniClusterConfiguration.
public MiniCluster();
public MiniCluster(MiniClusterConfiguration configuration);
// Declares the broad checked type Exception.
public void start() throws Exception;
// Detached submission returns void; the blocking variant returns a JobExecutionResult and
// additionally declares InterruptedException.
public void runDetached(JobGraph job) throws JobExecutionException;
public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException;
// Listed here without a throws clause.
public void close();
}
public class MiniClusterConfiguration {
public static class Builder {
public Builder setNumTaskManagers(int numTaskManagers);
public Builder setNumSlotsPerTaskManager(int numSlotsPerTaskManager);
public MiniClusterConfiguration build();
}
}

Cluster coordination and high availability infrastructure for leader election and distributed storage.
public interface HighAvailabilityServices extends AutoCloseable {
LeaderRetrievalService getResourceManagerLeaderRetriever();
LeaderRetrievalService getDispatcherLeaderRetriever();
LeaderElectionService getResourceManagerLeaderElectionService();
LeaderElectionService getDispatcherLeaderElectionService();
}

Remote procedure call infrastructure for cluster-wide communication and distributed coordination.
/**
 * Signature summary of the RpcService interface: it extends AutoCloseable and lists two
 * connect overloads plus a method to stop the server of an endpoint.
 */
public interface RpcService extends AutoCloseable {
// Connects by address and gateway class; the result is delivered as a CompletableFuture
// of the requested gateway type C.
<C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz);
// Fenced overload: additionally takes a Serializable fencing token F, and the gateway
// type must be a FencedRpcGateway<F>.
<F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(
String address, F fencingToken, Class<C> clazz);
// Takes the RpcEndpoint whose server should be stopped.
void stopServer(RpcEndpoint endpoint);
}
public abstract class RpcEndpoint {
protected RpcEndpoint(RpcService rpcService);
public void start() throws Exception;
public CompletableFuture<Void> closeAsync();
}

Comprehensive metrics collection, registration, and reporting infrastructure for monitoring runtime behavior.
/**
 * Signature summary of MetricRegistry: registration and removal of a metric under a name
 * within an AbstractMetricGroup, and starting reporters from a Configuration.
 *
 * <p>The former {@code implements MetricRegistryImpl} clause was removed: a class cannot
 * "implement" a class, and an "Impl" type is the implementor rather than the supertype,
 * so the relationship was inverted.
 */
public class MetricRegistry {
// register and unregister take the same (metric, metricName, group) triple.
public void register(Metric metric, String metricName, AbstractMetricGroup group);
public void unregister(Metric metric, String metricName, AbstractMetricGroup group);
// Takes the Configuration from which reporters are started.
public void startReporters(Configuration config);
}
public class MetricRegistryConfiguration {
public static MetricRegistryConfiguration fromConfiguration(Configuration config);
public long getQueryServiceUpdateInterval();
public int getQueryServicePort();
}

Read-only access interfaces for execution graph inspection, monitoring, and runtime introspection.
/**
 * Signature summary of the AccessExecutionGraph interface. Every member listed here is a
 * getter-style accessor; no mutating methods appear in this listing.
 */
public interface AccessExecutionGraph {
// Job identity.
JobID getJobID();
String getJobName();
// Current JobStatus, and a long timestamp looked up per JobStatus value.
JobStatus getState();
long getStatusTimestamp(JobStatus status);
// Iteration over the job vertices; ordering is implied by the method name only.
Iterable<AccessExecutionJobVertex> getVerticesTopologically();
}
public interface AccessExecutionVertex {
AccessExecutionJobVertex getJobVertex();
int getParallelSubtaskIndex();
ExecutionState getExecutionState();
AccessExecution getCurrentExecutionAttempt();
}

Network communication patterns and data exchange mechanisms for inter-task communication.
/**
 * Signature summary of the DataExchangeMode enum: three constants plus three static
 * lookups that each map an ExecutionMode to a DataExchangeMode.
 */
public enum DataExchangeMode {
PIPELINED, // Streamed data exchange with back-pressure
BATCH, // Decoupled data exchange with full result materialization
PIPELINE_WITH_BATCH_FALLBACK; // Pipelined with batch fallback for recovery
// Static lookups keyed by ExecutionMode, one per exchange situation named in the method.
public static DataExchangeMode getForForwardExchange(ExecutionMode mode);
public static DataExchangeMode getForShuffleOrBroadcast(ExecutionMode mode);
public static DataExchangeMode getPipelineBreakingExchange(ExecutionMode mode);
}
public enum ResultPartitionType {
BLOCKING(true, false, false),
PIPELINED(false, true, false),
PIPELINED_BOUNDED(false, true, true);
public boolean isBlocking();
public boolean isPipelined();
public boolean isBounded();
}

Scala-based message definitions for actor-based communication within the Flink runtime cluster.
// Signature summary of the JobManagerMessages Scala object: a container for case-class
// message types. Only the four messages below are listed here.
object JobManagerMessages {
// Carries the JobGraph to submit together with a ListeningBehaviour.
case class SubmitJob(jobGraph: JobGraph, listeningBehaviour: ListeningBehaviour)
// Both messages are keyed by the JobID they refer to.
case class CancelJob(jobID: JobID)
case class RequestJobStatus(jobID: JobID)
// Pairs a JobID with a JobStatus.
case class JobStatusResponse(jobID: JobID, status: JobStatus)
}
object TaskManagerMessages {
case class SubmitTask(tdd: TaskDeploymentDescriptor)
case class CancelTask(executionAttemptID: ExecutionAttemptID)
case class TaskInFinalState(executionAttemptID: ExecutionAttemptID)
}

public class JobVertexID implements Comparable<JobVertexID>, Serializable {
public JobVertexID();
public JobVertexID(byte[] bytes);
public static JobVertexID fromHexString(String hexString);
}
/**
 * Signature summary of the JobStatus enum: nine constants and two boolean predicates
 * about terminal states.
 */
public enum JobStatus {
// Note the double-L spelling of CANCELLING next to the single-L CANCELED.
CREATED, RUNNING, FAILING, FAILED, CANCELLING, CANCELED, FINISHED, RESTARTING, SUSPENDED;
// Two distinct predicates are listed: "globally terminal" and "terminal". Which constants
// satisfy each is not shown in this listing.
public boolean isGloballyTerminalState();
public boolean isTerminalState();
}
/**
 * Signature summary of the ExecutionState enum: eight constants and one boolean predicate.
 */
public enum ExecutionState {
// CANCELING is spelled with a single L here, unlike JobStatus.CANCELLING.
CREATED, SCHEDULED, DEPLOYING, RUNNING, FINISHED, CANCELING, CANCELED, FAILED;
// Which constants count as terminal is not shown in this listing.
public boolean isTerminal();
}
/**
 * Signature summary of JobExecutionException: a FlinkException subclass whose constructors
 * all take a JobID, with an accessor for that ID.
 */
public class JobExecutionException extends FlinkException {
// Constructors: job ID plus message, optionally with a Throwable cause.
public JobExecutionException(JobID jobId, String msg);
public JobExecutionException(JobID jobId, String msg, Throwable cause);
// Accessor returning a JobID.
public JobID getJobID();
}