The runtime module of Apache Flink that provides the core execution engine, scheduling, fault tolerance, checkpointing, and resource management capabilities for distributed stream processing applications
npx @tessl/cli install tessl/maven-org-apache-flink--flink-runtime@2.1.0
# Apache Flink Runtime
1
2
Apache Flink Runtime is the core execution engine that powers the Apache Flink distributed stream processing framework. It provides essential infrastructure for executing stream and batch processing applications at scale, including job scheduling, task coordination, fault tolerance through checkpointing, state management, and resource allocation across distributed environments.
3
4
## Package Information
5
6
- **Package Name**: org.apache.flink:flink-runtime
7
- **Package Type**: Maven (Java)
8
- **Language**: Java
9
- **Version**: 2.1.0
10
- **Installation**: Add dependency to Maven `pom.xml` or Gradle `build.gradle`
11
12
## Core Imports
13
14
```java
15
// Job graph construction
16
import org.apache.flink.runtime.jobgraph.JobGraph;
17
import org.apache.flink.runtime.jobgraph.JobVertex;
18
import org.apache.flink.runtime.jobgraph.JobVertexID;
19
20
// Execution graph and scheduling
21
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
22
import org.apache.flink.runtime.scheduler.SchedulerNG;
23
import org.apache.flink.runtime.scheduler.DefaultScheduler;
24
25
// State management and checkpointing
26
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
27
import org.apache.flink.runtime.state.KeyedStateBackend;
28
import org.apache.flink.runtime.state.OperatorStateBackend;
29
30
// Task execution
31
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
32
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
33
34
// Resource management
35
import org.apache.flink.runtime.resourcemanager.ResourceManager;
36
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
37
38
// Testing and development
39
import org.apache.flink.runtime.minicluster.MiniCluster;
40
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
41
42
// Job management and monitoring
43
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
44
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
45
```
46
47
## Basic Usage
48
49
```java
50
// Create a simple job graph
51
JobGraph jobGraph = new JobGraph("MyFlinkJob");
52
53
// Create and configure a job vertex
54
JobVertex sourceVertex = new JobVertex("Source");
55
sourceVertex.setParallelism(4);
56
sourceVertex.setInvokableClass(MySourceTask.class);
57
58
JobVertex mapVertex = new JobVertex("Map");
59
mapVertex.setParallelism(4);
60
mapVertex.setInvokableClass(MyMapTask.class);
61
62
// Connect vertices
63
mapVertex.connectNewDataSetAsInput(
64
sourceVertex,
65
DistributionPattern.ALL_TO_ALL,
66
ResultPartitionType.PIPELINED
67
);
68
69
// Add vertices to job graph
70
jobGraph.addVertex(sourceVertex);
71
jobGraph.addVertex(mapVertex);
72
73
// Configure checkpointing
74
JobCheckpointingSettings checkpointingSettings = new JobCheckpointingSettings(
75
Arrays.asList(sourceVertex.getID(), mapVertex.getID()),
76
Arrays.asList(sourceVertex.getID(), mapVertex.getID()),
77
Arrays.asList(sourceVertex.getID(), mapVertex.getID()),
78
new CheckpointCoordinatorConfiguration.Builder()
79
.setCheckpointInterval(5000L)
80
.setCheckpointTimeout(60000L)
81
.build(),
82
null
83
);
84
jobGraph.setSnapshotSettings(checkpointingSettings);
85
86
// For testing - execute job in local MiniCluster
87
MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()
88
.setNumTaskManagers(1)
89
.setNumSlotsPerTaskManager(4)
90
.build();
91
92
MiniCluster miniCluster = new MiniCluster(config);
93
try {
94
miniCluster.start();
95
CompletableFuture<JobSubmissionResult> submission = miniCluster.submitJob(jobGraph);
96
JobSubmissionResult result = submission.get();
97
System.out.println("Job submitted with ID: " + result.getJobID());
98
99
// Wait for job completion
100
CompletableFuture<JobResult> jobResult = miniCluster.requestJobResult(result.getJobID());
101
JobResult finalResult = jobResult.get();
102
System.out.println("Job finished with status: " + finalResult.getJobExecutionResult());
103
} finally {
104
miniCluster.closeAsync().get();
105
}
106
```
107
108
## Architecture
109
110
Apache Flink Runtime is built around several key architectural components:
111
112
- **Job Management Layer**: JobGraph and ExecutionGraph represent job structure at different abstraction levels
113
- **Scheduling Layer**: SchedulerNG implementations coordinate job execution and resource allocation
114
- **Execution Layer**: TaskExecutor instances run individual tasks with full isolation and fault tolerance
115
- **State Management**: Pluggable state backends with checkpointing for exactly-once processing guarantees
116
- **Resource Management**: ResourceManager handles cluster resources and TaskExecutor lifecycle
117
- **Network Layer**: High-throughput, low-latency data exchange between distributed tasks
118
- **High Availability**: Leader election and service discovery for fault-tolerant cluster coordination
119
120
The runtime operates as a distributed system in which the JobManager coordinates execution while the TaskManagers execute the actual data processing tasks.
121
122
## Capabilities
123
124
### Job Graph Management
125
126
Core APIs for defining and configuring Flink jobs as directed acyclic graphs (DAGs) of operations.
127
128
```java { .api }
129
public class JobGraph implements ExecutionPlan {
130
public JobGraph(String jobName);
131
public JobGraph(JobID jobId, String jobName);
132
public void addVertex(JobVertex vertex);
133
public Iterable<JobVertex> getVertices();
134
public JobID getJobID();
135
public String getName();
136
public void setJobType(JobType jobType);
137
public JobType getJobType();
138
public JobVertex findVertexByID(JobVertexID id);
139
public void setJobConfiguration(Configuration jobConfiguration);
140
public Configuration getJobConfiguration();
141
public void setSnapshotSettings(JobCheckpointingSettings settings);
142
public JobCheckpointingSettings getCheckpointingSettings();
143
}
144
145
public class JobVertex implements Serializable {
146
public JobVertex(String name);
147
public JobVertex(String name, JobVertexID id);
148
public JobVertexID getID();
149
public String getName();
150
public void setParallelism(int parallelism);
151
public int getParallelism();
152
public void setInvokableClass(Class<? extends TaskInvokable> invokable);
153
public void connectNewDataSetAsInput(
154
JobVertex input,
155
DistributionPattern distPattern,
156
ResultPartitionType partitionType
157
);
158
}
159
```
160
161
[Job Graph Management](./job-graph.md)
162
163
### Execution and Scheduling
164
165
Advanced scheduling system for distributed job execution with support for batch and streaming workloads.
166
167
```java { .api }
168
public interface SchedulerNG extends GlobalFailureHandler, AutoCloseableAsync {
169
void startScheduling();
170
void cancel();
171
CompletableFuture<JobStatus> getJobTerminationFuture();
172
boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState);
173
CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType);
174
CompletableFuture<String> stopWithSavepoint(
175
String targetDirectory,
176
boolean terminate,
177
SavepointFormatType formatType
178
);
179
}
180
181
public interface ExecutionGraph {
182
String getJobName();
183
JobID getJobID();
184
JobStatus getState();
185
ExecutionJobVertex getJobVertex(JobVertexID id);
186
Iterable<ExecutionJobVertex> getAllVertices();
187
CheckpointCoordinator getCheckpointCoordinator();
188
}
189
```
190
191
[Execution and Scheduling](./execution-scheduling.md)
192
193
### State Management and Checkpointing
194
195
Comprehensive state management with pluggable backends and distributed checkpointing for fault tolerance.
196
197
```java { .api }
198
public class CheckpointCoordinator {
199
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(boolean isPeriodic);
200
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType);
201
public CompletableFuture<CompletedCheckpoint> triggerSavepoint(
202
@Nullable String targetLocation,
203
SavepointFormatType formatType
204
);
205
public void shutdown() throws Exception;
206
public CompletedCheckpointStore getCheckpointStore();
207
}
208
209
public interface KeyedStateBackend<K> extends KeyedState {
210
<T> InternalKvState<K, ?, T> createState(
211
StateDescriptor<?, T> stateDescriptor,
212
TypeSerializer<T> namespaceSerializer
213
);
214
RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
215
long checkpointId,
216
long timestamp,
217
CheckpointStreamFactory streamFactory,
218
CheckpointOptions checkpointOptions
219
);
220
}
221
222
public interface OperatorStateBackend {
223
<S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor);
224
<K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor);
225
RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
226
long checkpointId,
227
long timestamp,
228
CheckpointStreamFactory factory,
229
CheckpointOptions checkpointOptions
230
);
231
}
232
```
233
234
[State Management and Checkpointing](./state-checkpointing.md)
235
236
### Task Execution
237
238
Distributed task execution engine with slot management and lifecycle coordination.
239
240
```java { .api }
241
public interface TaskExecutorGateway extends RpcGateway {
242
CompletableFuture<Acknowledge> requestSlot(
243
SlotID slotId,
244
JobID jobId,
245
AllocationID allocationId,
246
ResourceProfile resourceProfile,
247
String targetAddress,
248
ResourceManagerId resourceManagerId,
249
Duration timeout
250
);
251
252
CompletableFuture<Acknowledge> submitTask(
253
TaskDeploymentDescriptor tdd,
254
JobMasterId jobMasterId,
255
Duration timeout
256
);
257
258
CompletableFuture<Acknowledge> cancelTask(
259
ExecutionAttemptID executionAttemptID,
260
Duration timeout
261
);
262
263
CompletableFuture<Acknowledge> triggerCheckpoint(
264
ExecutionAttemptID executionAttemptID,
265
long checkpointId,
266
long checkpointTimestamp,
267
CheckpointOptions checkpointOptions,
268
Duration timeout
269
);
270
}
271
```
272
273
[Task Execution](./task-execution.md)
274
275
### Resource Management
276
277
Cluster resource allocation and TaskExecutor lifecycle management for different deployment environments.
278
279
```java { .api }
280
public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManagerId> {
281
CompletableFuture<RegistrationResponse> registerJobMaster(
282
JobMasterId jobMasterId,
283
ResourceID jobMasterResourceId,
284
String jobMasterAddress,
285
JobID jobId,
286
Duration timeout
287
);
288
289
CompletableFuture<RegistrationResponse> registerTaskExecutor(
290
String taskExecutorAddress,
291
ResourceID resourceId,
292
SlotReport slotReport,
293
ResourceProfile totalResourceProfile,
294
Duration timeout
295
);
296
297
CompletableFuture<Acknowledge> requestSlot(
298
JobMasterId jobMasterId,
299
SlotRequest slotRequest,
300
Duration timeout
301
);
302
}
303
304
public class WorkerResourceSpec {
305
public static WorkerResourceSpec fromTotalResourceProfile(
306
ResourceProfile totalResourceProfile,
307
MemorySize networkMemorySize
308
);
309
310
public ResourceProfile getTotalResourceProfile();
311
public MemorySize getTaskHeapSize();
312
public MemorySize getTaskOffHeapSize();
313
public MemorySize getNetworkMemSize();
314
public MemorySize getManagedMemSize();
315
}
316
```
317
318
[Resource Management](./resource-management.md)
319
320
### High Availability and Coordination
321
322
Leader election, service discovery, and coordination services for fault-tolerant distributed operation.
323
324
```java { .api }
325
public interface HighAvailabilityServices extends AutoCloseableAsync {
326
LeaderElectionService getResourceManagerLeaderElectionService();
327
LeaderElectionService getDispatcherLeaderElectionService();
328
LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);
329
LeaderRetrievalService getResourceManagerLeaderRetriever();
330
LeaderRetrievalService getDispatcherLeaderRetriever();
331
CheckpointRecoveryFactory getCheckpointRecoveryFactory();
332
}
333
334
public interface LeaderElectionService {
335
void start(LeaderContender contender);
336
void stop();
337
void confirmLeadership(UUID leaderSessionID);
338
boolean hasLeadership(UUID leaderSessionId);
339
}
340
341
public interface LeaderRetrievalService {
342
void start(LeaderRetrievalListener listener);
343
void stop();
344
}
345
```
346
347
[High Availability and Coordination](./high-availability.md)
348
349
### Testing and Development
350
351
MiniCluster provides an embedded Flink cluster for local testing and development, allowing full Flink functionality in a single JVM.
352
353
```java { .api }
354
public class MiniCluster implements AutoCloseableAsync {
355
public MiniCluster(MiniClusterConfiguration configuration);
356
357
public void start() throws Exception;
358
public CompletableFuture<Void> closeAsync();
359
360
public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph);
361
public CompletableFuture<JobResult> requestJobResult(JobID jobId);
362
public CompletableFuture<Acknowledge> cancelJob(JobID jobId);
363
364
public CompletableFuture<String> triggerSavepoint(
365
JobID jobId,
366
String targetDirectory,
367
SavepointFormatType formatType
368
);
369
370
public ClusterClient<MiniClusterJobClient> getClusterClient();
371
public String getRestAddress();
372
public int getRestPort();
373
}
374
375
public class MiniClusterConfiguration {
376
public static Builder newBuilder();
377
378
public static class Builder {
379
public Builder setNumTaskManagers(int numTaskManagers);
380
public Builder setNumSlotsPerTaskManager(int numSlotsPerTaskManager);
381
public Builder setConfiguration(Configuration configuration);
382
public Builder setRpcServiceSharing(RpcServiceSharing rpcServiceSharing);
383
public MiniClusterConfiguration build();
384
}
385
}
386
```
387
388
### Job Management and Submission
389
390
DispatcherGateway provides the main interface for job submission, management, and monitoring in Flink clusters.
391
392
```java { .api }
393
public interface DispatcherGateway extends FencedRpcGateway<DispatcherId> {
394
CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Duration timeout);
395
396
CompletableFuture<Collection<JobStatusMessage>> listJobs(Duration timeout);
397
398
CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, Duration timeout);
399
400
CompletableFuture<Acknowledge> cancelJob(JobID jobId, Duration timeout);
401
402
CompletableFuture<String> triggerSavepoint(
403
JobID jobId,
404
String targetDirectory,
405
SavepointFormatType formatType,
406
Duration timeout
407
);
408
409
CompletableFuture<String> stopWithSavepoint(
410
JobID jobId,
411
String targetDirectory,
412
SavepointFormatType formatType,
413
Duration timeout
414
);
415
416
CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath, Duration timeout);
417
418
CompletableFuture<Integer> getBlobServerPort(Duration timeout);
419
}
420
```
421
422
### Execution Monitoring
423
424
AccessExecutionGraph provides read-only access to execution graph information for monitoring and inspection.
425
426
```java { .api }
427
public interface AccessExecutionGraph {
428
String getJobName();
429
JobID getJobID();
430
JobStatus getState();
431
JobType getJobType();
432
433
long getStatusTimestamp(JobStatus status);
434
Throwable getFailureCause();
435
String getFailureCauseAsString();
436
437
Iterable<AccessExecutionJobVertex> getAllVertices();
438
AccessExecutionJobVertex getJobVertex(JobVertexID vertexId);
439
440
Map<JobVertexID, AccessExecutionJobVertex> getAllVerticesAsMap();
441
442
long getCheckpointCoordinatorCheckpointId();
443
CheckpointStatsSnapshot getCheckpointStatsSnapshot();
444
445
Configuration getJobConfiguration();
446
SerializedValue<ExecutionConfig> getSerializedExecutionConfig();
447
448
boolean isStoppable();
449
ArchivedExecutionGraph archive();
450
}
451
```
452
453
## Types
454
455
```java { .api }
456
// Core identifiers
457
public class JobID implements Serializable {
458
public static JobID generate();
459
public static JobID fromHexString(String hexString);
460
}
461
462
public class JobVertexID implements Serializable {
463
public JobVertexID();
464
public JobVertexID(byte[] bytes);
465
}
466
467
public class ExecutionAttemptID implements Serializable {
468
public ExecutionAttemptID();
469
public ExecutionAttemptID(ExecutionVertexID vertexId, int attemptNumber);
470
}
471
472
// Resource specifications
473
public class ResourceProfile implements Serializable {
474
public static final ResourceProfile ZERO;
475
public static final ResourceProfile ANY;
476
477
public static ResourceProfile fromResources(
478
double cpuCores,
479
MemorySize taskHeapMemory,
480
MemorySize taskOffHeapMemory,
481
MemorySize managedMemory,
482
MemorySize networkMemory
483
);
484
485
public CPUResource getCpuCores();
486
public MemorySize getTaskHeapMemory();
487
public MemorySize getTaskOffHeapMemory();
488
}
489
490
// Job configuration
491
public enum JobType {
492
BATCH, STREAMING
493
}
494
495
public enum DistributionPattern {
496
POINTWISE, ALL_TO_ALL
497
}
498
499
// Checkpoint types
500
public enum CheckpointType implements SnapshotType {
501
CHECKPOINT("Checkpoint"),
502
SAVEPOINT("Savepoint");
503
}
504
505
// Job execution results
506
public class JobSubmissionResult {
507
public JobID getJobID();
508
public boolean isJobSubmitted();
509
}
510
511
public class JobResult {
512
public JobID getJobID();
513
public ApplicationStatus getApplicationStatus();
514
public JobExecutionResult getJobExecutionResult();
515
public long getNetRuntime();
516
public Map<String, SerializedValue<Object>> getAccumulatorResults();
517
}
518
519
public enum JobStatus {
520
INITIALIZING, CREATED, RUNNING, FAILING, FAILED,
521
CANCELLING, CANCELED, FINISHED, RESTARTING, SUSPENDED
522
}
523
524
// Mini cluster types
525
public enum RpcServiceSharing {
526
SHARED, DEDICATED
527
}
528
529
// Savepoint format types
530
public enum SavepointFormatType {
531
CANONICAL, NATIVE
532
}
533
```