or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

- execution-scheduling.md
- high-availability.md
- index.md
- job-graph.md
- resource-management.md
- state-checkpointing.md
- task-execution.md

docs/index.md

# Apache Flink Runtime

Apache Flink Runtime is the core execution engine of the Apache Flink distributed stream processing framework. It provides the infrastructure for running stream and batch processing applications at scale, including job scheduling, task coordination, fault tolerance through checkpointing, state management, and resource allocation across distributed environments.

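## Package Information

- **Package Name**: org.apache.flink:flink-runtime
- **Package Type**: Maven (Java)
- **Language**: Java
- **Version**: 2.1.0
- **Installation**: Add the dependency to your Maven `pom.xml` (`<groupId>org.apache.flink</groupId>`, `<artifactId>flink-runtime</artifactId>`, `<version>2.1.0</version>`) or Gradle `build.gradle` (`implementation "org.apache.flink:flink-runtime:2.1.0"`).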

## Core Imports

```java

15

// Job graph construction

16

import org.apache.flink.runtime.jobgraph.JobGraph;

17

import org.apache.flink.runtime.jobgraph.JobVertex;

18

import org.apache.flink.runtime.jobgraph.JobVertexID;

19

20

// Execution graph and scheduling

21

import org.apache.flink.runtime.executiongraph.ExecutionGraph;

22

import org.apache.flink.runtime.scheduler.SchedulerNG;

23

import org.apache.flink.runtime.scheduler.DefaultScheduler;

24

25

// State management and checkpointing

26

import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;

27

import org.apache.flink.runtime.state.KeyedStateBackend;

28

import org.apache.flink.runtime.state.OperatorStateBackend;

29

30

// Task execution

31

import org.apache.flink.runtime.taskexecutor.TaskExecutor;

32

import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;

33

34

// Resource management

35

import org.apache.flink.runtime.resourcemanager.ResourceManager;

36

import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;

37

38

// Testing and development

39

import org.apache.flink.runtime.minicluster.MiniCluster;

40

import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

41

42

// Job management and monitoring

43

import org.apache.flink.runtime.dispatcher.DispatcherGateway;

44

import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;

45

```

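## Basic Usage

The example below builds a two-vertex job graph, enables periodic checkpointing, and runs it on a local `MiniCluster`.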

```java

50

// Create a simple job graph

51

JobGraph jobGraph = new JobGraph("MyFlinkJob");

52

53

// Create and configure a job vertex

54

JobVertex sourceVertex = new JobVertex("Source");

55

sourceVertex.setParallelism(4);

56

sourceVertex.setInvokableClass(MySourceTask.class);

57

58

JobVertex mapVertex = new JobVertex("Map");

59

mapVertex.setParallelism(4);

60

mapVertex.setInvokableClass(MyMapTask.class);

61

62

// Connect vertices

63

mapVertex.connectNewDataSetAsInput(

64

sourceVertex,

65

DistributionPattern.ALL_TO_ALL,

66

ResultPartitionType.PIPELINED

67

);

68

69

// Add vertices to job graph

70

jobGraph.addVertex(sourceVertex);

71

jobGraph.addVertex(mapVertex);

72

73

// Configure checkpointing

74

JobCheckpointingSettings checkpointingSettings = new JobCheckpointingSettings(

75

Arrays.asList(sourceVertex.getID(), mapVertex.getID()),

76

Arrays.asList(sourceVertex.getID(), mapVertex.getID()),

77

Arrays.asList(sourceVertex.getID(), mapVertex.getID()),

78

new CheckpointCoordinatorConfiguration.Builder()

79

.setCheckpointInterval(5000L)

80

.setCheckpointTimeout(60000L)

81

.build(),

82

null

83

);

84

jobGraph.setSnapshotSettings(checkpointingSettings);

85

86

// For testing - execute job in local MiniCluster

87

MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()

88

.setNumTaskManagers(1)

89

.setNumSlotsPerTaskManager(4)

90

.build();

91

92

MiniCluster miniCluster = new MiniCluster(config);

93

try {

94

miniCluster.start();

95

CompletableFuture<JobSubmissionResult> submission = miniCluster.submitJob(jobGraph);

96

JobSubmissionResult result = submission.get();

97

System.out.println("Job submitted with ID: " + result.getJobID());

98

99

// Wait for job completion or trigger savepoint

100

CompletableFuture<JobResult> jobResult = miniCluster.requestJobResult(result.getJobID());

101

JobResult finalResult = jobResult.get();

102

System.out.println("Job finished with status: " + finalResult.getJobExecutionResult());

103

} finally {

104

miniCluster.closeAsync().get();

105

}

106

```

## Architecture

Apache Flink Runtime is built around several key architectural components:

- **Job Management Layer**: `JobGraph` and `ExecutionGraph` represent the job structure at different levels of abstraction: the `JobGraph` is the logical dataflow that is submitted, and the `ExecutionGraph` is its parallel runtime representation
- **Scheduling Layer**: `SchedulerNG` implementations coordinate job execution and resource allocation
- **Execution Layer**: `TaskExecutor` instances run individual tasks in task slots and report task state back for fault tolerance
- **State Management**: Pluggable state backends, combined with checkpointing, provide exactly-once state consistency
- **Resource Management**: `ResourceManager` handles cluster resources and the `TaskExecutor` lifecycle
- **Network Layer**: High-throughput, low-latency data exchange between distributed tasks
- **High Availability**: Leader election and service discovery for fault-tolerant cluster coordination

The runtime operates as a distributed system: the JobManager coordinates execution, while TaskManagers execute the actual data processing tasks.

## Capabilities

### Job Graph Management

Core APIs for defining and configuring Flink jobs as directed acyclic graphs (DAGs) of operations.

```java { .api }

129

public class JobGraph implements ExecutionPlan {

130

public JobGraph(String jobName);

131

public JobGraph(JobID jobId, String jobName);

132

public void addVertex(JobVertex vertex);

133

public Iterable<JobVertex> getVertices();

134

public JobID getJobID();

135

public String getName();

136

public void setJobType(JobType jobType);

137

public JobType getJobType();

138

public JobVertex findVertexByID(JobVertexID id);

139

public void setJobConfiguration(Configuration jobConfiguration);

140

public Configuration getJobConfiguration();

141

public void setSnapshotSettings(JobCheckpointingSettings settings);

142

public JobCheckpointingSettings getCheckpointingSettings();

143

}

144

145

public class JobVertex implements Serializable {

146

public JobVertex(String name);

147

public JobVertex(String name, JobVertexID id);

148

public JobVertexID getId();

149

public String getName();

150

public void setParallelism(int parallelism);

151

public int getParallelism();

152

public void setInvokableClass(Class<? extends TaskInvokable> invokable);

153

public void connectNewDataSetAsInput(

154

JobVertex input,

155

DistributionPattern distPattern,

156

ResultPartitionType partitionType

157

);

158

}

159

```

See [Job Graph Management](./job-graph.md) for details.

### Execution and Scheduling

Scheduling system for distributed job execution, with support for both batch and streaming workloads.

```java { .api }

168

public interface SchedulerNG extends GlobalFailureHandler, AutoCloseableAsync {

169

void startScheduling();

170

void cancel();

171

CompletableFuture<JobStatus> getJobTerminationFuture();

172

boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState);

173

CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType);

174

CompletableFuture<String> stopWithSavepoint(

175

String targetDirectory,

176

boolean terminate,

177

SavepointFormatType formatType

178

);

179

}

180

181

public interface ExecutionGraph {

182

String getJobName();

183

JobID getJobID();

184

JobStatus getState();

185

ExecutionJobVertex getJobVertex(JobVertexID id);

186

Iterable<ExecutionJobVertex> getAllVertices();

187

CheckpointCoordinator getCheckpointCoordinator();

188

}

189

```

See [Execution and Scheduling](./execution-scheduling.md) for details.

### State Management and Checkpointing

State management with pluggable backends and distributed checkpointing for fault tolerance.

```java { .api }

198

public class CheckpointCoordinator {

199

public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(boolean isPeriodic);

200

public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType);

201

public CompletableFuture<CompletedCheckpoint> triggerSavepoint(

202

@Nullable String targetLocation,

203

SavepointFormatType formatType

204

);

205

public void shutdown() throws Exception;

206

public CompletedCheckpointStore getCheckpointStore();

207

}

208

209

public interface KeyedStateBackend<K> extends KeyedState {

210

<T> InternalKvState<K, ?, T> createState(

211

StateDescriptor<?, T> stateDescriptor,

212

TypeSerializer<T> namespaceSerializer

213

);

214

RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(

215

long checkpointId,

216

long timestamp,

217

CheckpointStreamFactory streamFactory,

218

CheckpointOptions checkpointOptions

219

);

220

}

221

222

public interface OperatorStateBackend {

223

<S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor);

224

<K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor);

225

RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(

226

long checkpointId,

227

long timestamp,

228

CheckpointStreamFactory factory,

229

CheckpointOptions checkpointOptions

230

);

231

}

232

```

See [State Management and Checkpointing](./state-checkpointing.md) for details.

### Task Execution

Distributed task execution engine with slot management and task lifecycle coordination.

```java { .api }

241

public interface TaskExecutorGateway extends RpcGateway {

242

CompletableFuture<Acknowledge> requestSlot(

243

SlotID slotId,

244

JobID jobId,

245

AllocationID allocationId,

246

ResourceProfile resourceProfile,

247

String targetAddress,

248

ResourceManagerId resourceManagerId,

249

Duration timeout

250

);

251

252

CompletableFuture<Acknowledge> submitTask(

253

TaskDeploymentDescriptor tdd,

254

JobMasterId jobMasterId,

255

Duration timeout

256

);

257

258

CompletableFuture<Acknowledge> cancelTask(

259

ExecutionAttemptID executionAttemptID,

260

Duration timeout

261

);

262

263

CompletableFuture<Acknowledge> triggerCheckpoint(

264

ExecutionAttemptID executionAttemptID,

265

long checkpointId,

266

long checkpointTimestamp,

267

CheckpointOptions checkpointOptions,

268

Duration timeout

269

);

270

}

271

```

See [Task Execution](./task-execution.md) for details.

### Resource Management

Cluster resource allocation and `TaskExecutor` lifecycle management across different deployment environments.

```java { .api }

280

public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManagerId> {

281

CompletableFuture<RegistrationResponse> registerJobMaster(

282

JobMasterId jobMasterId,

283

ResourceID jobMasterResourceId,

284

String jobMasterAddress,

285

JobID jobId,

286

Duration timeout

287

);

288

289

CompletableFuture<RegistrationResponse> registerTaskExecutor(

290

String taskExecutorAddress,

291

ResourceID resourceId,

292

SlotReport slotReport,

293

ResourceProfile totalResourceProfile,

294

Duration timeout

295

);

296

297

CompletableFuture<Acknowledge> requestSlot(

298

JobMasterId jobMasterId,

299

SlotRequest slotRequest,

300

Duration timeout

301

);

302

}

303

304

public class WorkerResourceSpec {

305

public static WorkerResourceSpec fromTotalResourceProfile(

306

ResourceProfile totalResourceProfile,

307

MemorySize networkMemorySize

308

);

309

310

public ResourceProfile getTotalResourceProfile();

311

public MemorySize getTaskHeapSize();

312

public MemorySize getTaskOffHeapSize();

313

public MemorySize getNetworkMemSize();

314

public MemorySize getManagedMemSize();

315

}

316

```

See [Resource Management](./resource-management.md) for details.

### High Availability and Coordination

Leader election, service discovery, and coordination services for fault-tolerant distributed operation.

```java { .api }

325

public interface HighAvailabilityServices extends AutoCloseableAsync {

326

LeaderElectionService getResourceManagerLeaderElectionService();

327

LeaderElectionService getDispatcherLeaderElectionService();

328

LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);

329

LeaderRetrievalService getResourceManagerLeaderRetriever();

330

LeaderRetrievalService getDispatcherLeaderRetriever();

331

CheckpointRecoveryFactory getCheckpointRecoveryFactory();

332

}

333

334

public interface LeaderElectionService {

335

void start(LeaderContender contender);

336

void stop();

337

void confirmLeadership(UUID leaderSessionID);

338

boolean hasLeadership(UUID leaderSessionId);

339

}

340

341

public interface LeaderRetrievalService {

342

void start(LeaderRetrievalListener listener);

343

void stop();

344

}

345

```

See [High Availability and Coordination](./high-availability.md) for details.

### Testing and Development

`MiniCluster` provides an embedded Flink cluster for local testing and development, offering full Flink functionality within a single JVM.

```java { .api }

354

public class MiniCluster implements AutoCloseableAsync {

355

public MiniCluster(MiniClusterConfiguration configuration);

356

357

public void start() throws Exception;

358

public CompletableFuture<Void> closeAsync();

359

360

public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph);

361

public CompletableFuture<JobResult> requestJobResult(JobID jobId);

362

public CompletableFuture<Acknowledge> cancelJob(JobID jobId);

363

364

public CompletableFuture<String> triggerSavepoint(

365

JobID jobId,

366

String targetDirectory,

367

SavepointFormatType formatType

368

);

369

370

public ClusterClient<MiniClusterJobClient> getClusterClient();

371

public String getRestAddress();

372

public int getRestPort();

373

}

374

375

public class MiniClusterConfiguration {

376

public static Builder newBuilder();

377

378

public static class Builder {

379

public Builder setNumTaskManagers(int numTaskManagers);

380

public Builder setNumSlotsPerTaskManager(int numSlotsPerTaskManager);

381

public Builder setConfiguration(Configuration configuration);

382

public Builder setRpcServiceSharing(RpcServiceSharing rpcServiceSharing);

383

public MiniClusterConfiguration build();

384

}

385

}

386

```

### Job Management and Submission

`DispatcherGateway` is the main interface for job submission, management, and monitoring in a Flink cluster.

```java { .api }

393

public interface DispatcherGateway extends FencedRpcGateway<DispatcherId> {

394

CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Duration timeout);

395

396

CompletableFuture<Collection<JobStatusMessage>> listJobs(Duration timeout);

397

398

CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, Duration timeout);

399

400

CompletableFuture<Acknowledge> cancelJob(JobID jobId, Duration timeout);

401

402

CompletableFuture<String> triggerSavepoint(

403

JobID jobId,

404

String targetDirectory,

405

SavepointFormatType formatType,

406

Duration timeout

407

);

408

409

CompletableFuture<String> stopWithSavepoint(

410

JobID jobId,

411

String targetDirectory,

412

SavepointFormatType formatType,

413

Duration timeout

414

);

415

416

CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath, Duration timeout);

417

418

CompletableFuture<Integer> getBlobServerPort(Duration timeout);

419

}

420

```

### Execution Monitoring

`AccessExecutionGraph` provides read-only access to execution graph information for monitoring and inspection.

```java { .api }

427

public interface AccessExecutionGraph {

428

String getJobName();

429

JobID getJobID();

430

JobStatus getState();

431

JobType getJobType();

432

433

long getStatusTimestamp(JobStatus status);

434

Throwable getFailureCause();

435

String getFailureCauseAsString();

436

437

Iterable<AccessExecutionJobVertex> getAllVertices();

438

AccessExecutionJobVertex getJobVertex(JobVertexID vertexId);

439

440

Map<JobVertexID, AccessExecutionJobVertex> getAllVerticesAsMap();

441

442

long getCheckpointCoordinatorCheckpointId();

443

CheckpointStatsSnapshot getCheckpointStatsSnapshot();

444

445

Configuration getJobConfiguration();

446

SerializedValue<ExecutionConfig> getSerializedExecutionConfig();

447

448

boolean isStoppable();

449

ArchivedExecutionGraph archive();

450

}

451

```

## Types

```java { .api }

456

// Core identifiers

457

public class JobID implements Serializable {

458

public static JobID generate();

459

public static JobID fromHexString(String hexString);

460

}

461

462

public class JobVertexID implements Serializable {

463

public JobVertexID();

464

public JobVertexID(byte[] bytes);

465

}

466

467

public class ExecutionAttemptID implements Serializable {

468

public ExecutionAttemptID();

469

public ExecutionAttemptID(ExecutionVertexID vertexId, int attemptNumber);

470

}

471

472

// Resource specifications

473

public class ResourceProfile implements Serializable {

474

public static final ResourceProfile ZERO;

475

public static final ResourceProfile ANY;

476

477

public static ResourceProfile fromResources(

478

double cpuCores,

479

MemorySize taskHeapMemory,

480

MemorySize taskOffHeapMemory,

481

MemorySize managedMemory,

482

MemorySize networkMemory

483

);

484

485

public CPUResource getCpuCores();

486

public MemorySize getTaskHeapMemory();

487

public MemorySize getTaskOffHeapMemory();

488

}

489

490

// Job configuration

491

public enum JobType {

492

BATCH, STREAMING

493

}

494

495

public enum DistributionPattern {

496

POINTWISE, ALL_TO_ALL

497

}

498

499

// Checkpoint types

500

public enum CheckpointType implements SnapshotType {

501

CHECKPOINT("Checkpoint"),

502

SAVEPOINT("Savepoint");

503

}

504

505

// Job execution results

506

public class JobSubmissionResult {

507

public JobID getJobID();

508

public boolean isJobSubmitted();

509

}

510

511

public class JobResult {

512

public JobID getJobID();

513

public ApplicationStatus getApplicationStatus();

514

public JobExecutionResult getJobExecutionResult();

515

public long getNetRuntime();

516

public Map<String, SerializedValue<Object>> getAccumulatorResults();

517

}

518

519

public enum JobStatus {

520

INITIALIZING, CREATED, RUNNING, FAILING, FAILED,

521

CANCELLING, CANCELED, FINISHED, RESTARTING, SUSPENDED

522

}

523

524

// Mini cluster types

525

public enum RpcServiceSharing {

526

SHARED, DEDICATED

527

}

528

529

// Savepoint format types

530

public enum SavepointFormatType {

531

CANONICAL, NATIVE

532

}

533

```