or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

execution-scheduling.md high-availability.md index.md job-graph.md resource-management.md state-checkpointing.md task-execution.md

docs/execution-scheduling.md

0

# Execution and Scheduling

1

2

This module provides an advanced scheduling system for distributed job execution that supports both batch and streaming workloads. The execution layer transforms JobGraphs into ExecutionGraphs and coordinates task execution across the cluster.

3

4

## Capabilities

5

6

### SchedulerNG

7

8

Main interface for job scheduling with support for different scheduling strategies including adaptive scheduling.

9

10

```java { .api }

11

/**

12

* Interface for scheduling Flink jobs. Implementations receive a JobGraph when instantiated

13

* and coordinate the distributed execution of the job.

14

*/

15

public interface SchedulerNG extends GlobalFailureHandler, AutoCloseableAsync {

16

/** Start scheduling the job */

17

void startScheduling();

18

19

/** Cancel the job execution */

20

void cancel();

21

22

/** Get future that completes when job terminates */

23

CompletableFuture<JobStatus> getJobTerminationFuture();

24

25

/** Update task execution state from TaskExecutor */

26

boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState);

27

28

/** Request next input split for a task */

29

SerializedInputSplit requestNextInputSplit(

30

JobVertexID vertexID,

31

ExecutionAttemptID executionAttempt

32

);

33

34

/** Trigger a checkpoint */

35

CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType);

36

37

/** Stop job with savepoint */

38

CompletableFuture<String> stopWithSavepoint(

39

String targetDirectory,

40

boolean terminate,

41

SavepointFormatType formatType

42

);

43

44

/** Trigger savepoint */

45

CompletableFuture<String> triggerSavepoint(

46

String targetDirectory,

47

SavepointFormatType formatType

48

);

49

50

/** Deliver a coordination request to the coordinator of the given operator and return its response */

51

CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(

52

OperatorID operatorId,

53

CoordinationRequest request

54

);

55

56

/** Handle operator event from task */

57

void handleOperatorEvent(

58

ExecutionAttemptID task,

59

OperatorID operatorId,

60

OperatorEvent evt

61

);

62

63

/** Get current job status */

64

JobStatus getJobStatus();

65

66

/** Suspend the job */

67

CompletableFuture<Void> suspend(Throwable cause);

68

69

/** Close the scheduler */

70

CompletableFuture<Void> closeAsync();

71

}

72

```

73

74

**Usage Examples:**

75

76

```java

77

// Create scheduler configuration

78

SchedulerNGFactory schedulerFactory = new DefaultSchedulerFactory();

79

ComponentMainThreadExecutor mainThreadExecutor = // ... get executor

80

81

// Create scheduler

82

SchedulerNG scheduler = schedulerFactory.createInstance(

83

log,

84

jobGraph,

85

ioExecutor,

86

jobMasterConfiguration,

87

slotPoolServiceFactory,

88

mainThreadExecutor,

89

heartbeatServices,

90

jobManagerJobMetricGroup,

91

shuffleMaster,

92

jobMasterPartitionTracker,

93

executionGraph -> {}, // execution graph handler

94

fatalErrorHandler

95

);

96

97

// Start scheduling

98

scheduler.startScheduling();

99

100

// Trigger checkpoint

101

CompletableFuture<CompletedCheckpoint> checkpointFuture =

102

scheduler.triggerCheckpoint(CheckpointType.CHECKPOINT);

103

104

// Stop with savepoint

105

CompletableFuture<String> savepointFuture = scheduler.stopWithSavepoint(

106

"hdfs://cluster/savepoints/",

107

true, // terminate after savepoint

108

SavepointFormatType.CANONICAL

109

);

110

```

111

112

### ExecutionGraph

113

114

The execution graph represents the parallel execution of a job, containing detailed information about tasks, their current state, and execution history.

115

116

```java { .api }

117

/**

118

* The execution graph is the central data structure that coordinates the distributed

119

* execution of a data flow. It keeps representations of each parallel task, each

120

* intermediate stream, and the communication between them.

121

*/

122

public interface ExecutionGraph {

123

/** Get the job name */

124

String getJobName();

125

126

/** Get the job ID */

127

JobID getJobID();

128

129

/** Get current job status */

130

JobStatus getState();

131

132

/** Get job vertex by ID */

133

ExecutionJobVertex getJobVertex(JobVertexID id);

134

135

/** Get all job vertices */

136

Map<JobVertexID, ExecutionJobVertex> getAllVertices();

137

138

/** Get vertices in topological order */

139

Iterable<ExecutionJobVertex> getVerticesTopologically();

140

141

/** Get all execution vertices */

142

Iterable<ExecutionVertex> getAllExecutionVertices();

143

144

/** Get total number of vertices */

145

int getTotalNumberOfVertices();

146

147

/** Get checkpoint coordinator */

148

CheckpointCoordinator getCheckpointCoordinator();

149

150

/** Get job configuration */

151

Configuration getJobConfiguration();

152

153

/** Get job class loader */

154

ClassLoader getUserClassLoader();

155

156

/** Get execution config */

157

ExecutionConfig getExecutionConfig();

158

159

/** Get JSON execution plan */

160

String getJsonPlan();

161

162

/** Get failure cause if job failed */

163

Throwable getFailureCause();

164

165

/** Get the timestamp recorded for the given job status */

166

long getStatusTimestamp(JobStatus status);

167

168

/** Get job state history */

169

List<ExecutionGraphHistoryEntry> getHistoryEntries();

170

171

/** Check if job is stoppable */

172

boolean isStoppable();

173

174

/** Get KV state location registry */

175

KvStateLocationRegistry getKvStateLocationRegistry();

176

177

/** Enable checkpointing */

178

void enableCheckpointing(

179

CheckpointCoordinatorConfiguration chkConfig,

180

List<MasterTriggerRestoreHook<?>> masterHooks,

181

CheckpointIDCounter checkpointIDCounter,

182

CompletedCheckpointStore checkpointStore,

183

StateBackend checkpointStateBackend,

184

CheckpointStorage checkpointStorage,

185

CheckpointStatsTracker statsTracker,

186

CheckpointsCleaner checkpointsCleaner

187

);

188

}

189

```

190

191

### ExecutionJobVertex

192

193

Represents a job vertex during execution, managing all parallel subtasks (ExecutionVertices) for a single operation.

194

195

```java { .api }

196

/**

197

* Represents one vertex from the JobGraph during execution. Holds the aggregated

198

* state of all parallel subtasks.

199

*/

200

public class ExecutionJobVertex {

201

/** Get the job vertex ID */

202

public JobVertexID getJobVertexId();

203

204

/** Get the job vertex name */

205

public String getName();

206

207

/** Get current parallelism */

208

public int getParallelism();

209

210

/** Get maximum parallelism */

211

public int getMaxParallelism();

212

213

/** Get resource profile */

214

public ResourceProfile getResourceProfile();

215

216

/** Get all task vertices (parallel subtasks) */

217

public ExecutionVertex[] getTaskVertices();

218

219

/** Get specific task vertex by subtask index */

220

public ExecutionVertex getTaskVertex(int subtask);

221

222

/** Get operator coordinators */

223

public Collection<OperatorCoordinatorHolder> getOperatorCoordinators();

224

225

/** Get produced data sets */

226

public IntermediateResult[] getProducedDataSets();

227

228

/** Get input edges */

229

public List<IntermediateResult> getInputs();

230

231

/** Get aggregated task resource profile */

232

public ResourceProfile getAggregatedTaskResourceProfile();

233

234

/** Get slot sharing group */

235

public SlotSharingGroup getSlotSharingGroup();

236

237

/** Get co-location group */

238

public CoLocationGroup getCoLocationGroup();

239

240

/** Get the aggregated execution state of all parallel subtasks */

241

public ExecutionState getAggregateState();

242

243

/** Check if vertex is finished */

244

public boolean isFinished();

245

246

/** Get input split assigner */

247

public InputSplitAssigner getSplitAssigner();

248

}

249

```

250

251

### ExecutionVertex

252

253

Represents one parallel subtask of an ExecutionJobVertex, containing execution attempts and current state.

254

255

```java { .api }

256

/**

257

* Represents one parallel subtask. For each ExecutionJobVertex, there are as many

258

* ExecutionVertices as the parallelism.

259

*/

260

public class ExecutionVertex {

261

/** Get the job vertex this belongs to */

262

public ExecutionJobVertex getJobVertex();

263

264

/** Get subtask index */

265

public int getParallelSubtaskIndex();

266

267

/** Get current execution attempt */

268

public Execution getCurrentExecutionAttempt();

269

270

/** Get prior execution attempts */

271

public ExecutionHistory getPriorExecutionHistory();

272

273

/** Get execution state */

274

public ExecutionState getExecutionState();

275

276

/** Get failure cause if failed */

277

public Throwable getFailureCause();

278

279

/** Get assigned slot */

280

public LogicalSlot getCurrentAssignedResource();

281

282

/** Get current assigned resource location */

283

public TaskManagerLocation getCurrentAssignedResourceLocation();

284

285

/** Get preferred locations for scheduling */

286

public CompletableFuture<Collection<TaskManagerLocation>> getPreferredLocationsBasedOnInputs();

287

288

/** Check if execution is finished */

289

public boolean isFinished();

290

291

/** Get task name with subtask info */

292

public String getTaskNameWithSubtaskIndex();

293

294

/** Create deployment descriptor */

295

public TaskDeploymentDescriptor createDeploymentDescriptor(

296

ExecutionAttemptID executionId,

297

LogicalSlot slot,

298

TaskManagerGateway taskManagerGateway,

299

int attemptNumber

300

);

301

302

/** Reset for new execution attempt */

303

public void resetForNewExecution();

304

}

305

```

306

307

### Execution

308

309

Represents one attempt to execute an ExecutionVertex, tracking deployment, state transitions, and task lifecycle.

310

311

```java { .api }

312

/**

313

* One attempt to execute an ExecutionVertex. There may be multiple Executions

314

* for each ExecutionVertex in case of failures or restarts.

315

*/

316

public class Execution {

317

/** Get execution attempt ID */

318

public ExecutionAttemptID getAttemptId();

319

320

/** Get execution vertex this belongs to */

321

public ExecutionVertex getVertex();

322

323

/** Get execution state */

324

public ExecutionState getState();

325

326

/** Get assigned logical slot */

327

public LogicalSlot getAssignedResource();

328

329

/** Get assigned resource location */

330

public TaskManagerLocation getAssignedResourceLocation();

331

332

/** Get failure cause if failed */

333

public Throwable getFailureCause();

334

335

/** Get state timestamps */

336

public long[] getStateTimestamps();

337

338

/** Get state timestamp for specific state */

339

public long getStateTimestamp(ExecutionState state);

340

341

/** Get state end timestamp for specific state */

342

public long getStateEndTimestamp(ExecutionState state);

343

344

/** Deploy execution to assigned TaskManager */

345

public CompletableFuture<Void> deploy();

346

347

/** Cancel execution */

348

public void cancel();

349

350

/** Fail execution with cause */

351

public void fail(Throwable t);

352

353

/** Mark execution as finished */

354

public void markFinished();

355

356

/** Update execution state */

357

public boolean updateState(TaskExecutionState state);

358

359

/** Trigger checkpoint for this execution */

360

public void triggerCheckpoint(

361

long checkpointId,

362

long timestamp,

363

CheckpointOptions checkpointOptions

364

);

365

366

/** Notify checkpoint complete */

367

public void notifyCheckpointComplete(

368

long checkpointId,

369

long timestamp

370

);

371

372

/** Notify checkpoint aborted */

373

public void notifyCheckpointAborted(

374

long checkpointId,

375

long timestamp

376

);

377

}

378

```

379

380

**Usage Examples:**

381

382

```java

383

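// Access execution graph information
// NOTE: getExecutionGraph() is not declared on the SchedulerNG interface listed above;
// confirm it is available on the concrete scheduler implementation in use.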

384

ExecutionGraph executionGraph = scheduler.getExecutionGraph();

385

386

// Get job status

387

JobStatus status = executionGraph.getState();

388

System.out.println("Job status: " + status);

389

390

// Iterate through vertices

391

for (ExecutionJobVertex jobVertex : executionGraph.getVerticesTopologically()) {

392

System.out.println("Vertex: " + jobVertex.getName() +

393

", Parallelism: " + jobVertex.getParallelism());

394

395

// Check individual subtasks

396

for (ExecutionVertex execVertex : jobVertex.getTaskVertices()) {

397

ExecutionState state = execVertex.getExecutionState();

398

System.out.println(" Subtask " + execVertex.getParallelSubtaskIndex() +

399

": " + state);

400

}

401

}

402

403

// Get checkpoint coordinator

404

CheckpointCoordinator coordinator = executionGraph.getCheckpointCoordinator();

405

if (coordinator != null) {

406

CompletedCheckpointStore store = coordinator.getCheckpointStore();

407

System.out.println("Latest checkpoint: " + store.getLatestCheckpoint());

408

}

409

```

410

411

## Types

412

413

```java { .api }

414

// Execution identifiers

415

public class ExecutionAttemptID implements Serializable {

416

public ExecutionAttemptID();

417

public ExecutionAttemptID(ExecutionVertexID vertexId, int attemptNumber);

418

public static ExecutionAttemptID randomId();

419

420

public ExecutionVertexID getExecutionVertexId();

421

public int getAttemptNumber();

422

}

423

424

public class ExecutionVertexID implements Serializable {

425

public ExecutionVertexID(JobVertexID jobVertexId, int subtaskIndex);

426

427

public JobVertexID getJobVertexId();

428

public int getSubtaskIndex();

429

}

430

431

// Execution states

432

public enum ExecutionState {

433

CREATED,

434

SCHEDULED,

435

DEPLOYING,

436

INITIALIZING,

437

RUNNING,

438

FINISHED,

439

CANCELING,

440

CANCELED,

441

FAILED;

442

443

public boolean isTerminal();

444

public boolean isRunning();

445

}

446

447

// Task execution state transition

448

public class TaskExecutionStateTransition implements Serializable {

449

public TaskExecutionStateTransition(TaskExecutionState taskExecutionState);

450

451

public ExecutionAttemptID getID();

452

public ExecutionState getExecutionState();

453

public Throwable getError();

454

public TaskExecutionState getTaskExecutionState();

455

}

456

457

// Scheduler configurations

458

public class DefaultSchedulerComponents {

459

public static DefaultSchedulerComponents createSchedulerComponents(

460

JobType jobType,

461

boolean isApproximateLocalRecoveryEnabled,

462

Configuration jobMasterConfiguration,

463

SlotPool slotPool,

464

Duration slotRequestTimeout

465

);

466

467

public ExecutionSlotAllocatorFactory getAllocatorFactory();

468

public RestartStrategy getRestartStrategy();

469

public ExecutionVertexVersioner getVersioner();

470

}

471

472

// Scheduling strategies

473

public enum SchedulingStrategy {

474

LEGACY_SCHEDULER,

475

DEFAULT,

476

ADAPTIVE_BATCH

477

}

478

```