# Apache Flink Runtime

Apache Flink Runtime is the core execution engine of the Apache Flink distributed stream processing framework. It orchestrates distributed dataflow execution across a cluster: task scheduling and deployment, operator lifecycle management, inter-task communication, fault tolerance with exactly-once processing guarantees, custom memory management, and state management.

## Package Information

- **Package Name**: flink-runtime_2.10
- **Package Type**: maven
- **Language**: Java/Scala
- **Installation**:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime_2.10</artifactId>
    <version>1.3.3</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.minicluster.MiniCluster;
```

## Basic Usage

```java
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

// Create a mini cluster for local execution
MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()
    .setNumTaskManagers(1)
    .setNumSlotsPerTaskManager(4)
    .build();

MiniCluster miniCluster = new MiniCluster(config);
miniCluster.start();

try {
    // Create a job graph
    JobGraph jobGraph = new JobGraph("My Flink Job");
    // ... configure job graph with vertices and edges

    // Execute job and wait for completion
    JobExecutionResult result = miniCluster.runJobBlocking(jobGraph);
} finally {
    // Release cluster resources even if the job fails
    miniCluster.close();
}
```

## Architecture

Apache Flink Runtime is built around several key architectural components:

- **Job Execution Engine**: Core job lifecycle management with JobGraph representation and client APIs
- **Task Execution Framework**: Base classes and interfaces for implementing user-defined tasks
- **State Management System**: Pluggable state backends with checkpointing and recovery mechanisms
- **Distributed Communication**: RPC framework for cluster-wide communication and message passing
- **Cluster Coordination**: High availability services for leader election and distributed coordination
- **Resource Management**: Task scheduling, memory management, and resource allocation
- **Fault Tolerance**: Checkpointing coordinator and recovery mechanisms for exactly-once processing
- **Monitoring & Metrics**: Comprehensive metrics collection and reporting system

## Capabilities

### Job Execution and Management

Core APIs for job lifecycle management, execution planning, and monitoring. Essential for submitting and controlling Flink dataflow programs.

```java { .api }
public class JobGraph implements Serializable {
    public JobGraph(String jobName);
    public JobGraph(JobID jobId, String jobName);
    public JobGraph(JobVertex... vertices);

    public void addVertex(JobVertex vertex);
    public void addJar(Path jar);
    public void addBlob(BlobKey key);

    public JobID getJobID();
    public String getName();
    public Configuration getJobConfiguration();
    public JobVertex[] getVerticesAsArray();
    public List<JobVertex> getVerticesSortedTopologicallyFromSources() throws InvalidProgramException;
    public int getNumberOfVertices();
    public Iterable<JobVertex> getVertices();

    public void setScheduleMode(ScheduleMode scheduleMode);
    public ScheduleMode getScheduleMode();

    public void setSessionTimeout(long sessionTimeout);
    public long getSessionTimeout();

    public SavepointRestoreSettings getSavepointRestoreSettings();
    public void setSavepointRestoreSettings(SavepointRestoreSettings settings);

    public void setExecutionConfig(ExecutionConfig executionConfig) throws IOException;
    public SerializedValue<ExecutionConfig> getSerializedExecutionConfig();

    public JobCheckpointingSettings getCheckpointingSettings();
    public void setSnapshotSettings(JobCheckpointingSettings settings);

    public List<URL> getClasspaths();
    public void setClasspaths(List<URL> paths);
}

public class JobClient {
    public static ActorSystem startJobClientActorSystem(Configuration config) throws IOException;

    public static JobListeningContext submitJob(
        ActorSystem actorSystem,
        Configuration config,
        HighAvailabilityServices highAvailabilityServices,
        ActorGateway jobManagerGateway,
        JobGraph jobGraph,
        Time timeout,
        boolean sysoutLogUpdates,
        ClassLoader userCodeClassLoader
    ) throws JobExecutionException;

    public static JobListeningContext attachToRunningJob(
        JobID jobID,
        ActorGateway jobManagerGateway,
        Configuration configuration,
        ActorSystem actorSystem,
        HighAvailabilityServices highAvailabilityServices,
        Time timeout,
        boolean sysoutLogUpdates
    );

    public static JobExecutionResult awaitJobResult(JobListeningContext listeningContext) throws JobExecutionException;

    public static JobExecutionResult submitJobAndWait(
        ActorSystem actorSystem,
        Configuration config,
        HighAvailabilityServices highAvailabilityServices,
        ActorGateway jobManagerGateway,
        JobGraph jobGraph,
        Time timeout,
        boolean sysoutLogUpdates,
        ClassLoader userCodeClassLoader
    ) throws JobExecutionException;

    public static void submitJobDetached(
        ActorGateway jobManagerGateway,
        Configuration config,
        JobGraph jobGraph,
        Time timeout,
        ClassLoader userCodeClassLoader
    ) throws JobExecutionException;
}
```
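
`getVerticesSortedTopologicallyFromSources()` returns the vertices so that every producer precedes its consumers, and it reports an invalid program when no such order exists. The self-contained sketch below illustrates that ordering with Kahn's algorithm on plain string vertex names. `TopologicalOrderSketch`, `connect`, `sortFromSources`, and the use of `IllegalStateException` are hypothetical stand-ins, not Flink classes, and Flink's own implementation may differ.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a job graph: vertex names plus directed edges.
final class TopologicalOrderSketch {
    private final Map<String, List<String>> successors = new LinkedHashMap<>();
    private final Map<String, Integer> inDegree = new LinkedHashMap<>();

    void addVertex(String name) {
        successors.putIfAbsent(name, new ArrayList<>());
        inDegree.putIfAbsent(name, 0);
    }

    // Adds a producer -> consumer edge, registering both vertices if needed.
    void connect(String producer, String consumer) {
        addVertex(producer);
        addVertex(consumer);
        successors.get(producer).add(consumer);
        inDegree.merge(consumer, 1, Integer::sum);
    }

    // Kahn's algorithm: repeatedly emit vertices whose inputs were all emitted.
    List<String> sortFromSources() {
        Map<String, Integer> remaining = new LinkedHashMap<>(inDegree);
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> entry : remaining.entrySet()) {
            if (entry.getValue() == 0) {
                ready.add(entry.getKey());
            }
        }
        List<String> sorted = new ArrayList<>();
        while (!ready.isEmpty()) {
            String vertex = ready.poll();
            sorted.add(vertex);
            for (String next : successors.get(vertex)) {
                if (remaining.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next);
                }
            }
        }
        // Vertices on a cycle never reach in-degree zero and are left over.
        if (sorted.size() != successors.size()) {
            throw new IllegalStateException("The job graph is cyclic.");
        }
        return sorted;
    }
}
```

Connecting `source -> map`, `map -> sink`, and `source -> sink` yields the order `source`, `map`, `sink`; a two-vertex cycle makes `sortFromSources()` throw.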

[Job Management](./job-management.md)

### State Management and Checkpointing

Pluggable state backend system with checkpointing mechanisms for fault tolerance and exactly-once processing guarantees.

```java { .api }
public interface StateBackend extends Serializable {
    <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
        Environment env,
        JobID jobID,
        String operatorIdentifier,
        TypeSerializer<K> keySerializer,
        int numberOfKeyGroups,
        KeyGroupRange keyGroupRange,
        TaskKvStateRegistry kvStateRegistry
    ) throws Exception;
}

public interface FunctionInitializationContext {
    boolean isRestored();
    OperatorStateStore getOperatorStateStore();
    KeyedStateStore getKeyedStateStore();
}
```
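
`createKeyedStateBackend` receives a `numberOfKeyGroups` and a `KeyGroupRange`: keyed state is split into a fixed number of key groups, and each parallel operator instance is handed a contiguous range of them. The sketch below shows one way such an assignment can be computed. `KeyGroupSketch` and its methods are hypothetical; the range arithmetic is an assumption about how Flink assigns key groups and should be checked against the source, and a plain `hashCode()` stands in for whatever hash function Flink applies.

```java
// Hypothetical helper, not a Flink class.
final class KeyGroupSketch {
    // Maps a key to a key group in [0, numberOfKeyGroups).
    // Assumption: plain hashCode(); Flink may apply an additional hash.
    static int keyGroupFor(Object key, int numberOfKeyGroups) {
        return Math.floorMod(key.hashCode(), numberOfKeyGroups);
    }

    // First key group (inclusive) owned by the given parallel instance.
    static int rangeStart(int numberOfKeyGroups, int parallelism, int operatorIndex) {
        return (operatorIndex * numberOfKeyGroups + parallelism - 1) / parallelism;
    }

    // Last key group (inclusive) owned by the given parallel instance.
    static int rangeEnd(int numberOfKeyGroups, int parallelism, int operatorIndex) {
        return ((operatorIndex + 1) * numberOfKeyGroups - 1) / parallelism;
    }

    // Parallel instance that owns the given key group.
    static int operatorIndexFor(int numberOfKeyGroups, int parallelism, int keyGroup) {
        return keyGroup * parallelism / numberOfKeyGroups;
    }
}
```

With 10 key groups and a parallelism of 3, these formulas give the ranges [0, 3], [4, 6], and [7, 9]. Because the number of key groups stays fixed, changing the parallelism only moves whole key groups between instances.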

[State Management](./state-management.md)

### Task Execution Framework

Framework for implementing and executing user-defined tasks with resource management and environment access.

```java { .api }
public abstract class AbstractInvokable {
    public abstract void invoke() throws Exception;
    public void cancel() throws Exception;
    public final Environment getEnvironment();
}

public interface Environment {
    JobID getJobID();
    JobVertexID getJobVertexId();
    ExecutionAttemptID getExecutionId();
    TaskInfo getTaskInfo();
    Configuration getTaskConfiguration();
}
```
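
`AbstractInvokable` separates the task body (`invoke()`) from `cancel()`, which is typically called from a different thread than the one running the task. A common way to honor that contract is a volatile flag checked inside the processing loop. The sketch below uses a hypothetical `SketchInvokable` base class in place of Flink's `AbstractInvokable` so that it compiles without Flink on the classpath; `CountingTask` and its record limit are likewise illustrative.

```java
// Hypothetical stand-in mirroring the invoke()/cancel() shape listed above.
abstract class SketchInvokable {
    public abstract void invoke() throws Exception;

    public void cancel() throws Exception {
        // Default: nothing to clean up.
    }
}

// Illustrative task: counts "records" until it reaches the limit or is canceled.
final class CountingTask extends SketchInvokable {
    private final int limit;
    private volatile boolean running = true; // written by cancel(), read by invoke()
    private int processed;

    CountingTask(int limit) {
        this.limit = limit;
    }

    @Override
    public void invoke() {
        while (running && processed < limit) {
            processed++; // stand-in for reading and handling one record
        }
    }

    @Override
    public void cancel() {
        running = false; // the loop in invoke() exits at its next check
    }

    int processed() {
        return processed;
    }
}
```

A task that blocks on I/O would also need to interrupt or close the blocking resource in `cancel()`; the flag alone only helps loops that re-check it regularly.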

[Task Execution](./task-execution.md)

### Mini Cluster (Testing/Embedded)

Embedded Flink cluster implementation for testing, development, and local execution scenarios.

```java { .api }
public class MiniCluster {
    public MiniCluster();
    public MiniCluster(MiniClusterConfiguration configuration);
    public void start() throws Exception;
    public void runDetached(JobGraph job) throws JobExecutionException;
    public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException;
    public void close();
}

public class MiniClusterConfiguration {
    public static class Builder {
        public Builder setNumTaskManagers(int numTaskManagers);
        public Builder setNumSlotsPerTaskManager(int numSlotsPerTaskManager);
        public MiniClusterConfiguration build();
    }
}
```

[Mini Cluster](./mini-cluster.md)

### High Availability Services

Cluster coordination and high availability infrastructure for leader election and distributed storage.

```java { .api }
public interface HighAvailabilityServices extends AutoCloseable {
    LeaderRetrievalService getResourceManagerLeaderRetriever();
    LeaderRetrievalService getDispatcherLeaderRetriever();
    LeaderElectionService getResourceManagerLeaderElectionService();
    LeaderElectionService getDispatcherLeaderElectionService();
}
```
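
The retrieval services let a component follow whichever instance currently leads, while the election services let a component contend for leadership. The in-memory sketch below illustrates only the retrieval side: listeners are told the leader's address and a session ID whenever leadership changes. `InMemoryLeaderSketch`, `LeaderListener`, and all method names are hypothetical and far simpler than Flink's `LeaderRetrievalService`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical listener: notified whenever a new leader is announced.
interface LeaderListener {
    void onLeaderChanged(String leaderAddress, UUID leaderSessionId);
}

// Hypothetical in-memory stand-in for a leader retrieval service.
final class InMemoryLeaderSketch {
    private final List<LeaderListener> listeners = new ArrayList<>();
    private String leaderAddress;
    private UUID leaderSessionId;

    // Registers a listener and replays the current leader, if there is one.
    synchronized void start(LeaderListener listener) {
        listeners.add(listener);
        if (leaderAddress != null) {
            listener.onLeaderChanged(leaderAddress, leaderSessionId);
        }
    }

    // Announces a new leader under a fresh session ID and notifies listeners.
    synchronized UUID grantLeadership(String address) {
        leaderAddress = address;
        leaderSessionId = UUID.randomUUID();
        for (LeaderListener listener : listeners) {
            listener.onLeaderChanged(leaderAddress, leaderSessionId);
        }
        return leaderSessionId;
    }
}
```

Issuing a fresh session ID on every leadership change lets followers tell a new leader apart from an old one that happens to reuse the same address.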

[High Availability](./high-availability.md)

### RPC Framework

Remote procedure call infrastructure for cluster-wide communication and distributed coordination.

```java { .api }
public interface RpcService extends AutoCloseable {
    <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz);
    <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(
        String address, F fencingToken, Class<C> clazz);
    void stopServer(RpcEndpoint endpoint);
}

public abstract class RpcEndpoint {
    protected RpcEndpoint(RpcService rpcService);
    public void start() throws Exception;
    public CompletableFuture<Void> closeAsync();
}
```
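
The fenced `connect` overload takes a `fencingToken`, which suggests that calls carrying a stale token (for example, from a client still addressing a former leader) are meant to be rejected. The sketch below illustrates that idea with a hypothetical `FencedCounter` endpoint. The class, its methods, and the choice of `IllegalStateException` are assumptions made for illustration, not Flink's RPC implementation.

```java
import java.util.UUID;

// Hypothetical endpoint that accepts calls only under the current token.
final class FencedCounter {
    private UUID fencingToken;
    private long value;

    FencedCounter(UUID initialToken) {
        this.fencingToken = initialToken;
    }

    // Called when leadership changes; every older token becomes stale.
    synchronized void setFencingToken(UUID newToken) {
        fencingToken = newToken;
    }

    // Applies the increment only if the caller's token matches the current one.
    synchronized long increment(UUID callerToken) {
        if (!fencingToken.equals(callerToken)) {
            throw new IllegalStateException("Fencing token mismatch: " + callerToken);
        }
        return ++value;
    }
}
```

Checking the token on the receiving side means a delayed message from an old session cannot mutate state after leadership has moved on.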

[RPC Framework](./rpc-framework.md)

### Metrics System

Comprehensive metrics collection, registration, and reporting infrastructure for monitoring runtime behavior.

```java { .api }
public class MetricRegistry {
    public void register(Metric metric, String metricName, AbstractMetricGroup group);
    public void unregister(Metric metric, String metricName, AbstractMetricGroup group);
    public void startReporters(Configuration config);
}

public class MetricRegistryConfiguration {
    public static MetricRegistryConfiguration fromConfiguration(Configuration config);
    public long getQueryServiceUpdateInterval();
    public int getQueryServicePort();
}
```
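
`register` and `unregister` take the metric, its name, and the owning group, so a registry can derive a fully qualified identifier for reporters. A minimal sketch of that bookkeeping follows. `RegistrySketch`, the dot-separated scope format, and the use of `Supplier<Long>` as a gauge are all hypothetical simplifications rather than Flink's types.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Supplier;

// Hypothetical registry keyed by "<group scope>.<metric name>".
final class RegistrySketch {
    private final Map<String, Supplier<Long>> metrics = new TreeMap<>();

    private static String identifier(String groupScope, String metricName) {
        return groupScope + "." + metricName;
    }

    synchronized void register(Supplier<Long> gauge, String metricName, String groupScope) {
        metrics.put(identifier(groupScope, metricName), gauge);
    }

    synchronized void unregister(String metricName, String groupScope) {
        metrics.remove(identifier(groupScope, metricName));
    }

    // What a reporter might do periodically: read every registered value.
    synchronized Map<String, Long> snapshot() {
        Map<String, Long> values = new TreeMap<>();
        metrics.forEach((id, gauge) -> values.put(id, gauge.get()));
        return values;
    }
}
```

Registering gauges as suppliers means the value is read at report time, so the registry never holds a stale copy.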

[Metrics System](./metrics.md)

### Execution Graph Access

Read-only access interfaces for execution graph inspection, monitoring, and runtime introspection.

```java { .api }
public interface AccessExecutionGraph {
    JobID getJobID();
    String getJobName();
    JobStatus getState();
    long getStatusTimestamp(JobStatus status);
    Iterable<AccessExecutionJobVertex> getVerticesTopologically();
}

public interface AccessExecutionVertex {
    AccessExecutionJobVertex getJobVertex();
    int getParallelSubtaskIndex();
    ExecutionState getExecutionState();
    AccessExecution getCurrentExecutionAttempt();
}
```
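
Monitoring code built on these interfaces often walks the graph and summarizes the states of its subtasks. The sketch below shows that aggregation on a plain list. The nested `State` enum copies the `ExecutionState` constants listed under Common Types, and `StateSummarySketch` is a hypothetical helper rather than part of the Flink API.

```java
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper that summarizes subtask states.
final class StateSummarySketch {
    // Local copy of the ExecutionState constants, for illustration only.
    enum State { CREATED, SCHEDULED, DEPLOYING, RUNNING, FINISHED, CANCELING, CANCELED, FAILED }

    // Counts how many subtasks are in each state; absent states have no entry.
    static Map<State, Integer> countByState(List<State> subtaskStates) {
        Map<State, Integer> counts = new EnumMap<>(State.class);
        for (State state : subtaskStates) {
            counts.merge(state, 1, Integer::sum);
        }
        return counts;
    }

    // True only when there is at least one subtask and all have FINISHED.
    static boolean allFinished(List<State> subtaskStates) {
        return !subtaskStates.isEmpty()
            && countByState(subtaskStates).getOrDefault(State.FINISHED, 0) == subtaskStates.size();
    }
}
```

In real code the list would be filled by reading `getExecutionState()` from each `AccessExecutionVertex`.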

[Execution Graph](./execution-graph.md)

### Data Exchange and Networking

Network communication patterns and data exchange mechanisms for inter-task communication.

```java { .api }
public enum DataExchangeMode {
    PIPELINED,                    // Streamed data exchange with back-pressure
    BATCH,                        // Decoupled data exchange with full result materialization
    PIPELINE_WITH_BATCH_FALLBACK; // Pipelined with batch fallback for recovery

    public static DataExchangeMode getForForwardExchange(ExecutionMode mode);
    public static DataExchangeMode getForShuffleOrBroadcast(ExecutionMode mode);
    public static DataExchangeMode getPipelineBreakingExchange(ExecutionMode mode);
}

public enum ResultPartitionType {
    BLOCKING(true, false, false),
    PIPELINED(false, true, false),
    PIPELINED_BOUNDED(false, true, true);

    public boolean isBlocking();
    public boolean isPipelined();
    public boolean isBounded();
}
```
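
The three static helpers choose an exchange mode from the job's `ExecutionMode`, depending on the kind of connection. The sketch below encodes one plausible mapping as lookup tables. The enums are local copies (the sketch omits `PIPELINE_WITH_BATCH_FALLBACK`), `ExchangeSelectionSketch` is hypothetical, and the table entries are an assumption about Flink's behavior that should be verified against the `DataExchangeMode` source before being relied on.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical sketch; the enums are local copies, not Flink's types.
final class ExchangeSelectionSketch {
    enum ExecutionMode { PIPELINED, PIPELINED_FORCED, BATCH, BATCH_FORCED }
    enum ExchangeMode { PIPELINED, BATCH }

    private static final Map<ExecutionMode, ExchangeMode> FORWARD = new EnumMap<>(ExecutionMode.class);
    private static final Map<ExecutionMode, ExchangeMode> SHUFFLE = new EnumMap<>(ExecutionMode.class);
    private static final Map<ExecutionMode, ExchangeMode> BREAKING = new EnumMap<>(ExecutionMode.class);

    // One row per execution mode: forward, shuffle/broadcast, pipeline-breaking.
    private static void row(ExecutionMode mode, ExchangeMode forward,
                            ExchangeMode shuffle, ExchangeMode breaking) {
        FORWARD.put(mode, forward);
        SHUFFLE.put(mode, shuffle);
        BREAKING.put(mode, breaking);
    }

    static {
        // ASSUMPTION: these entries are illustrative and not verified against Flink.
        row(ExecutionMode.PIPELINED, ExchangeMode.PIPELINED, ExchangeMode.PIPELINED, ExchangeMode.BATCH);
        row(ExecutionMode.PIPELINED_FORCED, ExchangeMode.PIPELINED, ExchangeMode.PIPELINED, ExchangeMode.PIPELINED);
        row(ExecutionMode.BATCH, ExchangeMode.PIPELINED, ExchangeMode.BATCH, ExchangeMode.BATCH);
        row(ExecutionMode.BATCH_FORCED, ExchangeMode.BATCH, ExchangeMode.BATCH, ExchangeMode.BATCH);
    }

    static ExchangeMode forForwardExchange(ExecutionMode mode) {
        return FORWARD.get(mode);
    }

    static ExchangeMode forShuffleOrBroadcast(ExecutionMode mode) {
        return SHUFFLE.get(mode);
    }

    static ExchangeMode forPipelineBreakingExchange(ExecutionMode mode) {
        return BREAKING.get(mode);
    }
}
```

Keeping the policy in tables rather than nested conditionals makes each combination of execution mode and connection kind visible at a glance.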

[Data Exchange](./data-exchange.md)

### Message Passing (Scala APIs)

Scala-based message definitions for actor-based communication within the Flink runtime cluster.

```scala { .api }
object JobManagerMessages {
  case class SubmitJob(jobGraph: JobGraph, listeningBehaviour: ListeningBehaviour)
  case class CancelJob(jobID: JobID)
  case class RequestJobStatus(jobID: JobID)
  case class JobStatusResponse(jobID: JobID, status: JobStatus)
}

object TaskManagerMessages {
  case class SubmitTask(tdd: TaskDeploymentDescriptor)
  case class CancelTask(executionAttemptID: ExecutionAttemptID)
  case class TaskInFinalState(executionAttemptID: ExecutionAttemptID)
}
```
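
These case classes are plain immutable messages exchanged between actors. For readers working from Java, the sketch below mirrors the `RequestJobStatus`/`JobStatusResponse` pair with hypothetical final classes and a handler that answers from an in-memory map. String job IDs and status names stand in for Flink's `JobID` and `JobStatus`, the `"UNKNOWN"` fallback is invented for the example, and no actor system is involved.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical Java counterpart of a request message.
final class SketchRequestJobStatus {
    final String jobId;

    SketchRequestJobStatus(String jobId) {
        this.jobId = jobId;
    }
}

// Hypothetical Java counterpart of the matching response message.
final class SketchJobStatusResponse {
    final String jobId;
    final String status;

    SketchJobStatusResponse(String jobId, String status) {
        this.jobId = jobId;
        this.status = status;
    }
}

// Hypothetical handler that dispatches on the message type.
final class JobStatusHandlerSketch {
    private final Map<String, String> statuses = new HashMap<>();

    void update(String jobId, String status) {
        statuses.put(jobId, status);
    }

    // Returns a response for known message types and null for anything else.
    Object handle(Object message) {
        if (message instanceof SketchRequestJobStatus) {
            String jobId = ((SketchRequestJobStatus) message).jobId;
            return new SketchJobStatusResponse(jobId, statuses.getOrDefault(jobId, "UNKNOWN"));
        }
        return null;
    }
}
```

The `instanceof` dispatch plays the role that pattern matching on case classes plays in a Scala actor's `receive` block.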

[Message Passing](./message-passing.md)

## Common Types

```java { .api }
public class JobVertexID implements Comparable<JobVertexID>, Serializable {
    public JobVertexID();
    public JobVertexID(byte[] bytes);
    public static JobVertexID fromHexString(String hexString);
}

public enum JobStatus {
    CREATED, RUNNING, FAILING, FAILED, CANCELLING, CANCELED, FINISHED, RESTARTING, SUSPENDED;

    public boolean isGloballyTerminalState();
    public boolean isTerminalState();
}

public enum ExecutionState {
    CREATED, SCHEDULED, DEPLOYING, RUNNING, FINISHED, CANCELING, CANCELED, FAILED;

    public boolean isTerminal();
}

public class JobExecutionException extends FlinkException {
    public JobExecutionException(JobID jobId, String msg);
    public JobExecutionException(JobID jobId, String msg, Throwable cause);
    public JobID getJobID();
}
```
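
`JobStatus` exposes two notions of "terminal". The sketch below models them under the assumption that `FINISHED`, `CANCELED`, and `FAILED` are globally terminal, while `SUSPENDED` is terminal only locally, meaning the job may still be recovered by another process. `SketchJobStatus` is a local copy for illustration, not the Flink enum.

```java
// Hypothetical copy of the JobStatus constants with assumed terminal semantics.
enum SketchJobStatus {
    CREATED(Terminal.NO),
    RUNNING(Terminal.NO),
    FAILING(Terminal.NO),
    FAILED(Terminal.GLOBALLY),
    CANCELLING(Terminal.NO),
    CANCELED(Terminal.GLOBALLY),
    FINISHED(Terminal.GLOBALLY),
    RESTARTING(Terminal.NO),
    SUSPENDED(Terminal.LOCALLY);

    private enum Terminal { NO, LOCALLY, GLOBALLY }

    private final Terminal terminal;

    SketchJobStatus(Terminal terminal) {
        this.terminal = terminal;
    }

    // The job is done everywhere and cannot be recovered.
    boolean isGloballyTerminalState() {
        return terminal == Terminal.GLOBALLY;
    }

    // The job is done on this process, globally or only locally.
    boolean isTerminalState() {
        return terminal != Terminal.NO;
    }
}
```

Code that waits for a job to end would poll until `isGloballyTerminalState()` holds, whereas code that releases local resources can act as soon as `isTerminalState()` does.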