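# Execution and Scheduling

The execution and scheduling layer runs distributed jobs on the cluster and supports both batch and streaming workloads. It transforms a JobGraph (the dataflow of job vertices, each possibly containing several chained operators) into an ExecutionGraph (its parallel, physical form) and coordinates the execution of the resulting tasks across the cluster.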
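
The core of the JobGraph-to-ExecutionGraph transformation is a fan-out: a job vertex with parallelism *p* becomes *p* parallel subtasks, each identified by its job vertex ID and subtask index (the same pair that `ExecutionVertexID` carries in the Types section). The following is a minimal Java sketch of only that step. The class and record names are hypothetical and are not Flink classes. Edges, intermediate results, and slot assignment are deliberately omitted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model for illustration; these are not Flink classes.
final class GraphExpansionSketch {

    /** A vertex of the JobGraph with its configured parallelism. */
    record JobVertexSketch(String id, String name, int parallelism) {}

    /** One parallel subtask, identified by (job vertex id, subtask index). */
    record SubtaskSketch(String jobVertexId, int subtaskIndex) {}

    /** Expands every job vertex into `parallelism` subtasks, keeping the input order. */
    static Map<String, List<SubtaskSketch>> expand(List<JobVertexSketch> jobVertices) {
        Map<String, List<SubtaskSketch>> expanded = new LinkedHashMap<>();
        for (JobVertexSketch vertex : jobVertices) {
            if (vertex.parallelism() < 1) {
                throw new IllegalArgumentException("parallelism must be >= 1: " + vertex.name());
            }
            List<SubtaskSketch> subtasks = new ArrayList<>(vertex.parallelism());
            for (int i = 0; i < vertex.parallelism(); i++) {
                subtasks.add(new SubtaskSketch(vertex.id(), i));
            }
            expanded.put(vertex.id(), subtasks);
        }
        return expanded;
    }

    public static void main(String[] args) {
        List<JobVertexSketch> jobGraph = List.of(
                new JobVertexSketch("src", "Source", 2),
                new JobVertexSketch("map", "Map", 4),
                new JobVertexSketch("sink", "Sink", 1));
        expand(jobGraph).forEach((id, subtasks) ->
                System.out.println(id + " -> " + subtasks.size() + " subtask(s)"));
    }
}
```

The real ExecutionGraph additionally wires the subtasks together through intermediate results according to each edge's distribution pattern. This sketch stops at the per-vertex fan-out.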
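
## Capabilities

### SchedulerNG

Main interface that the JobMaster uses to schedule a job and to react to events during its execution. Implementations include `DefaultScheduler`, `AdaptiveScheduler` (adapts the job's parallelism to the available slots), and `AdaptiveBatchScheduler` (derives the parallelism of batch job vertices from the amount of data they process).
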
```java { .api }
/**
 * Interface for scheduling Flink jobs. Implementations receive a JobGraph when instantiated
 * and coordinate the distributed execution of the job.
 */
public interface SchedulerNG extends GlobalFailureHandler, AutoCloseableAsync {
    /** Start scheduling the job */
    void startScheduling();

    /** Cancel the job execution */
    void cancel();

    /** Get a future that completes with the final job status when the job terminates */
    CompletableFuture<JobStatus> getJobTerminationFuture();

    /** Apply a task execution state update reported by a TaskExecutor; returns true if it was applied */
    boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState);

    /** Request the next input split for a task (tasks pull splits on demand) */
    SerializedInputSplit requestNextInputSplit(
            JobVertexID vertexID,
            ExecutionAttemptID executionAttempt);

    /** Trigger a checkpoint of the given type */
    CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType);

    /**
     * Take a savepoint and stop the job; the future completes with the savepoint path.
     * If terminate is true, the job is drained (MAX_WATERMARK is emitted) before the savepoint.
     */
    CompletableFuture<String> stopWithSavepoint(
            String targetDirectory,
            boolean terminate,
            SavepointFormatType formatType);

    /** Trigger a savepoint, optionally cancelling the job afterwards */
    CompletableFuture<String> triggerSavepoint(
            String targetDirectory,
            boolean cancelJob,
            SavepointFormatType formatType);

    /** Forward a coordination request to an operator coordinator and return its response */
    CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
            OperatorID operatorId,
            CoordinationRequest request);

    /** Deliver an operator event sent by a task to the corresponding operator coordinator */
    void deliverOperatorEventToCoordinator(
            ExecutionAttemptID task,
            OperatorID operatorId,
            OperatorEvent evt);

    /** Get current job status */
    JobStatus requestJobStatus();

    /** Suspend the job */
    CompletableFuture<Void> suspend(Throwable cause);

    /** Close the scheduler asynchronously (inherited from AutoCloseableAsync) */
    CompletableFuture<Void> closeAsync();
}
```
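
`requestNextInputSplit` implements pull-based split assignment: a task asks for a split whenever it is ready for more work, and the listing pairs this with `ExecutionJobVertex#getSplitAssigner()`. The following is a minimal sketch of that idea, assuming a simple FIFO assigner that ignores locality. `SplitAssignmentSketch`, `FifoSplitAssigner`, and `Split` are hypothetical names and are not Flink's `InputSplitAssigner` API. The `drain` helper simulates tasks that request splits in turn until none remain.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch of pull-based split assignment; not Flink's InputSplitAssigner.
final class SplitAssignmentSketch {

    record Split(int splitNumber) {}

    /** Hands out each split exactly once, in order; returns null when exhausted. */
    static final class FifoSplitAssigner {
        private final Queue<Split> remaining = new ArrayDeque<>();

        FifoSplitAssigner(int numSplits) {
            for (int i = 0; i < numSplits; i++) {
                remaining.add(new Split(i));
            }
        }

        /** Called when a subtask asks for more work (synchronized: tasks may ask concurrently). */
        synchronized Split nextSplit(int subtaskIndex) {
            // A locality-aware assigner would prefer splits stored on the requester's host;
            // this sketch ignores the requester and serves the next split in FIFO order.
            return remaining.poll();
        }
    }

    /** Simulates subtasks requesting splits in turn until the assigner is drained. */
    static List<List<Split>> drain(FifoSplitAssigner assigner, int parallelism) {
        List<List<Split>> perTask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            perTask.add(new ArrayList<>());
        }
        boolean progress = true;
        while (progress) {
            progress = false;
            for (int task = 0; task < parallelism; task++) {
                Split split = assigner.nextSplit(task);
                if (split != null) {
                    perTask.get(task).add(split);
                    progress = true;
                }
            }
        }
        return perTask;
    }

    public static void main(String[] args) {
        System.out.println(drain(new FifoSplitAssigner(5), 2));
    }
}
```

In a real cluster, faster tasks request more often and therefore receive more splits. This lazy, on-demand assignment is the main advantage of pulling over assigning all splits up front.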

**Usage Examples:**

```java
// log, jobGraph, ioExecutor, mainThreadExecutor, etc. are assumed to be in scope
// (they are normally provided by the JobMaster).
SchedulerNGFactory schedulerFactory = new DefaultSchedulerFactory();

// Create the scheduler. The argument list is abbreviated; the full
// SchedulerNGFactory#createInstance signature differs between Flink versions.
SchedulerNG scheduler = schedulerFactory.createInstance(
        log,
        jobGraph,
        ioExecutor,
        jobMasterConfiguration,
        slotPoolServiceFactory,
        mainThreadExecutor,
        heartbeatServices,
        jobManagerJobMetricGroup,
        shuffleMaster,
        jobMasterPartitionTracker,
        executionGraph -> {}, // execution graph handler
        fatalErrorHandler);

// Scheduler methods are expected to be called from the JobMaster's main thread.
// Start scheduling
scheduler.startScheduling();

// Trigger a checkpoint
CompletableFuture<CompletedCheckpoint> checkpointFuture =
        scheduler.triggerCheckpoint(CheckpointType.CHECKPOINT);

// Stop with savepoint
CompletableFuture<String> savepointFuture = scheduler.stopWithSavepoint(
        "hdfs://cluster/savepoints/",
        true, // terminate: drain the job (emit MAX_WATERMARK) before the final savepoint
        SavepointFormatType.CANONICAL);
```
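
Because `getJobTerminationFuture()` returns a `CompletableFuture`, callers usually react to job termination asynchronously, often with a deadline. The following JDK-only sketch shows that pattern. `JobStatusSketch` is a hypothetical stand-in for Flink's `JobStatus`, and the 100 ms timeout is arbitrary. The sketch applies `copy()` before `orTimeout(...)` so that a timeout completes only the copy and never the scheduler's own termination future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch using only the JDK; JobStatusSketch stands in for Flink's JobStatus.
final class TerminationFutureSketch {

    enum JobStatusSketch { FINISHED, CANCELED, FAILED, SUSPENDED }

    /** Maps the termination outcome (or a timeout) to a message without blocking. */
    static CompletableFuture<String> describeTermination(
            CompletableFuture<JobStatusSketch> terminationFuture, long timeoutMillis) {
        return terminationFuture
                .copy() // a timeout must not complete the original termination future
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                .handle((status, error) -> {
                    if (error == null) {
                        return "job terminated with status " + status;
                    }
                    Throwable cause = error instanceof CompletionException ? error.getCause() : error;
                    if (cause instanceof TimeoutException) {
                        return "job still running after " + timeoutMillis + " ms";
                    }
                    return "termination future failed: " + cause;
                });
    }

    public static void main(String[] args) {
        CompletableFuture<JobStatusSketch> finished =
                CompletableFuture.completedFuture(JobStatusSketch.FINISHED);
        System.out.println(describeTermination(finished, 100).join());

        CompletableFuture<JobStatusSketch> pending = new CompletableFuture<>();
        System.out.println(describeTermination(pending, 100).join());
        System.out.println("original future still pending: " + !pending.isDone());
    }
}
```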

### ExecutionGraph

The execution graph represents the parallel execution of a job. It holds every parallel task and intermediate result, their current states, and the history of their execution attempts.

```java { .api }
/**
 * The execution graph is the central data structure that coordinates the distributed
 * execution of a data flow. It keeps representations of each parallel task, each
 * intermediate stream, and the communication between them.
 */
public interface ExecutionGraph {
    /** Get the job name */
    String getJobName();

    /** Get the job ID */
    JobID getJobID();

    /** Get current job status */
    JobStatus getState();

    /** Get job vertex by ID */
    ExecutionJobVertex getJobVertex(JobVertexID id);

    /** Get all job vertices */
    Map<JobVertexID, ExecutionJobVertex> getAllVertices();

    /** Get job vertices in topological order (producers before consumers) */
    Iterable<ExecutionJobVertex> getVerticesTopologically();

    /** Get all execution vertices (parallel subtasks) */
    Iterable<ExecutionVertex> getAllExecutionVertices();

    /** Get total number of execution vertices */
    int getTotalNumberOfVertices();

    /** Get checkpoint coordinator */
    CheckpointCoordinator getCheckpointCoordinator();

    /** Get job configuration */
    Configuration getJobConfiguration();

    /** Get the user-code class loader of the job */
    ClassLoader getUserClassLoader();

    /** Get execution config */
    ExecutionConfig getExecutionConfig();

    /** Get JSON execution plan */
    String getJsonPlan();

    /** Get failure cause if the job failed */
    Throwable getFailureCause();

    /** Get the timestamp at which the job entered the given status */
    long getStatusTimestamp(JobStatus status);

    /** Get job state history */
    List<ExecutionGraphHistoryEntry> getHistoryEntries();

    /** Check if the job is stoppable */
    boolean isStoppable();

    /** Get KV state location registry */
    KvStateLocationRegistry getKvStateLocationRegistry();

    /** Enable checkpointing */
    void enableCheckpointing(
            CheckpointCoordinatorConfiguration chkConfig,
            List<MasterTriggerRestoreHook<?>> masterHooks,
            CheckpointIDCounter checkpointIDCounter,
            CompletedCheckpointStore checkpointStore,
            StateBackend checkpointStateBackend,
            CheckpointStorage checkpointStorage,
            CheckpointStatsTracker statsTracker,
            CheckpointsCleaner checkpointsCleaner);
}
```
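
`getVerticesTopologically()` returns job vertices so that every vertex comes after the vertices whose results it consumes. The following is a minimal sketch of how such an order can be computed with Kahn's algorithm, assuming vertices are plain string IDs and edges map a producer to its consumers. It illustrates the ordering guarantee only and is not necessarily how Flink computes its list. `TopologicalOrderSketch` is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of Kahn's algorithm over string vertex ids; not Flink's code.
final class TopologicalOrderSketch {

    /**
     * Orders vertices so that every producer precedes its consumers.
     * consumersByProducer maps a vertex id to the ids that consume its output.
     */
    static List<String> topologicalOrder(List<String> vertices, Map<String, List<String>> consumersByProducer) {
        // Count how many producers each vertex still waits for.
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (String v : vertices) {
            inDegree.put(v, 0);
        }
        for (List<String> consumers : consumersByProducer.values()) {
            for (String consumer : consumers) {
                inDegree.merge(consumer, 1, Integer::sum);
            }
        }

        // Start with sources: vertices without inputs.
        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((v, degree) -> {
            if (degree == 0) {
                ready.add(v);
            }
        });

        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String v = ready.poll();
            order.add(v);
            for (String consumer : consumersByProducer.getOrDefault(v, List.of())) {
                // A consumer becomes ready once all of its producers are ordered.
                if (inDegree.merge(consumer, -1, Integer::sum) == 0) {
                    ready.add(consumer);
                }
            }
        }
        if (order.size() != inDegree.size()) {
            throw new IllegalStateException("graph contains a cycle");
        }
        return order;
    }

    public static void main(String[] args) {
        List<String> order = topologicalOrder(
                List.of("sink", "map", "src"),
                Map.of("src", List.of("map"), "map", List.of("sink")));
        System.out.println(order); // [src, map, sink]
    }
}
```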

### ExecutionJobVertex

Represents a job vertex during execution and manages all parallel subtasks (ExecutionVertices) of that single JobGraph vertex.

```java { .api }
/**
 * Represents one vertex from the JobGraph during execution. Holds the aggregated
 * state of all parallel subtasks.
 */
public class ExecutionJobVertex {
    /** Get the job vertex ID */
    public JobVertexID getJobVertexId();

    /** Get the job vertex name */
    public String getName();

    /** Get current parallelism */
    public int getParallelism();

    /** Get maximum parallelism */
    public int getMaxParallelism();

    /** Get resource profile */
    public ResourceProfile getResourceProfile();

    /** Get all task vertices (parallel subtasks) */
    public ExecutionVertex[] getTaskVertices();

    /** Get specific task vertex by subtask index */
    public ExecutionVertex getTaskVertex(int subtask);

    /** Get operator coordinators */
    public Collection<OperatorCoordinatorHolder> getOperatorCoordinators();

    /** Get produced data sets */
    public IntermediateResult[] getProducedDataSets();

    /** Get the consumed intermediate results (inputs) */
    public List<IntermediateResult> getInputs();

    /** Get aggregated task resource profile */
    public ResourceProfile getAggregatedTaskResourceProfile();

    /** Get slot sharing group */
    public SlotSharingGroup getSlotSharingGroup();

    /** Get co-location group */
    public CoLocationGroup getCoLocationGroup();

    /** Get the execution state aggregated over all subtasks */
    public ExecutionState getAggregateState();

    /** Check if all subtasks of the vertex are finished */
    public boolean isFinished();

    /** Get input split assigner */
    public InputSplitAssigner getSplitAssigner();
}
```
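
`getAggregateState()` condenses the states of all subtasks into a single vertex state. The following Java sketch shows one plausible aggregation rule under stated assumptions. If all subtasks agree, that state wins. Otherwise, failure and cancellation states take precedence over progress states. A mix of finished and not-yet-running subtasks counts as RUNNING, and any other mix reports the least advanced state. The precedence order and all names are hypothetical, and Flink's exact rule may differ.

```java
import java.util.EnumSet;
import java.util.List;

// Hypothetical aggregation rule for illustration; Flink's exact precedence may differ.
final class AggregateStateSketch {

    enum State { CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING, FINISHED, CANCELING, CANCELED, FAILED }

    /** States checked in order when subtasks disagree: the first one present wins. */
    private static final List<State> PRECEDENCE =
            List.of(State.FAILED, State.CANCELING, State.CANCELED, State.INITIALIZING, State.RUNNING);

    static State aggregate(List<State> subtaskStates) {
        if (subtaskStates.isEmpty()) {
            throw new IllegalArgumentException("a job vertex has at least one subtask");
        }
        EnumSet<State> present = EnumSet.copyOf(subtaskStates);
        // All subtasks agree: that state is the vertex state.
        if (present.size() == 1) {
            return subtaskStates.get(0);
        }
        for (State state : PRECEDENCE) {
            if (present.contains(state)) {
                return state;
            }
        }
        // Some subtasks finished while others are still being scheduled or deployed.
        if (present.contains(State.FINISHED)) {
            return State.RUNNING;
        }
        // Otherwise report the least advanced state (EnumSet iterates in declaration order).
        return present.iterator().next();
    }

    public static void main(String[] args) {
        System.out.println(aggregate(List.of(State.RUNNING, State.RUNNING, State.FAILED)));
    }
}
```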

### ExecutionVertex

Represents one parallel subtask of an ExecutionJobVertex. It holds the current execution attempt, the prior attempts, and the subtask's current state.

```java { .api }
/**
 * Represents one parallel subtask. For each ExecutionJobVertex, there are as many
 * ExecutionVertices as the parallelism.
 */
public class ExecutionVertex {
    /** Get the job vertex this belongs to */
    public ExecutionJobVertex getJobVertex();

    /** Get subtask index */
    public int getParallelSubtaskIndex();

    /** Get current execution attempt */
    public Execution getCurrentExecutionAttempt();

    /** Get prior execution attempts */
    public ExecutionHistory getPriorExecutionHistory();

    /** Get execution state of the current attempt */
    public ExecutionState getExecutionState();

    /** Get failure cause if failed */
    public Throwable getFailureCause();

    /** Get the slot assigned to the current attempt */
    public LogicalSlot getCurrentAssignedResource();

    /** Get the TaskManager location of the current attempt's slot */
    public TaskManagerLocation getCurrentAssignedResourceLocation();

    /** Get preferred locations for scheduling, based on where the inputs are produced */
    public CompletableFuture<Collection<TaskManagerLocation>> getPreferredLocationsBasedOnInputs();

    /** Check if execution is finished */
    public boolean isFinished();

    /** Get task name with subtask info */
    public String getTaskNameWithSubtaskIndex();

    /** Create deployment descriptor */
    public TaskDeploymentDescriptor createDeploymentDescriptor(
            ExecutionAttemptID executionId,
            LogicalSlot slot,
            TaskManagerGateway taskManagerGateway,
            int attemptNumber);

    /** Reset for new execution attempt */
    public void resetForNewExecution();
}
```
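
The relationship between a subtask and its attempts can be modeled with two identifiers that mirror `ExecutionVertexID` and `ExecutionAttemptID` from the Types section. The vertex ID (job vertex, subtask index) stays fixed, while each restart archives the current attempt and creates a new one with the next attempt number. The following is a minimal sketch of that bookkeeping. `VertexSketch` and its records are hypothetical. `resetForNewExecution` borrows the listing's method name, not its implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of attempt bookkeeping; not Flink's ExecutionVertex implementation.
final class ExecutionAttemptSketch {

    /** Stable identity of a subtask: (job vertex id, subtask index). */
    record VertexId(String jobVertexId, int subtaskIndex) {}

    /** Identity of one attempt of that subtask. */
    record AttemptId(VertexId vertexId, int attemptNumber) {}

    /** Simplified ExecutionVertex: one current attempt plus the attempts it replaced. */
    static final class VertexSketch {
        private final VertexId id;
        private final List<AttemptId> priorAttempts = new ArrayList<>();
        private AttemptId current;

        VertexSketch(VertexId id) {
            this.id = id;
            this.current = new AttemptId(id, 0); // first attempt is number 0
        }

        /** Archives the current attempt and creates the next one, as a restart would. */
        AttemptId resetForNewExecution() {
            priorAttempts.add(current);
            current = new AttemptId(id, current.attemptNumber() + 1);
            return current;
        }

        AttemptId currentAttempt() {
            return current;
        }

        List<AttemptId> priorAttempts() {
            return Collections.unmodifiableList(priorAttempts);
        }
    }

    public static void main(String[] args) {
        VertexSketch vertex = new VertexSketch(new VertexId("map", 3));
        vertex.resetForNewExecution(); // first failure
        vertex.resetForNewExecution(); // second failure
        System.out.println("current: " + vertex.currentAttempt());
        System.out.println("prior:   " + vertex.priorAttempts());
    }
}
```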

### Execution

Represents one attempt to execute an ExecutionVertex. It tracks the attempt's deployment, its state transitions, and the lifecycle of the resulting task.

```java { .api }
/**
 * One attempt to execute an ExecutionVertex. There may be multiple Executions
 * for each ExecutionVertex in case of failures or restarts.
 */
public class Execution {
    /** Get execution attempt ID */
    public ExecutionAttemptID getAttemptId();

    /** Get execution vertex this belongs to */
    public ExecutionVertex getVertex();

    /** Get execution state */
    public ExecutionState getState();

    /** Get assigned logical slot */
    public LogicalSlot getAssignedResource();

    /** Get assigned resource location */
    public TaskManagerLocation getAssignedResourceLocation();

    /** Get failure cause if failed */
    public Throwable getFailureCause();

    /** Get the timestamps of all state transitions, indexed by ExecutionState ordinal */
    public long[] getStateTimestamps();

    /** Get the timestamp at which the execution entered the given state */
    public long getStateTimestamp(ExecutionState state);

    /** Get the timestamp at which the execution left the given state */
    public long getStateEndTimestamp(ExecutionState state);

    /** Deploy execution to assigned TaskManager */
    public CompletableFuture<Void> deploy();

    /** Cancel execution */
    public void cancel();

    /** Fail execution with cause */
    public void fail(Throwable t);

    /** Mark execution as finished */
    public void markFinished();

    /** Update execution state */
    public boolean updateState(TaskExecutionState state);

    /** Trigger checkpoint for this execution */
    public void triggerCheckpoint(
            long checkpointId,
            long timestamp,
            CheckpointOptions checkpointOptions);

    /** Notify checkpoint complete */
    public void notifyCheckpointComplete(
            long checkpointId,
            long timestamp);

    /** Notify checkpoint aborted */
    public void notifyCheckpointAborted(
            long checkpointId,
            long timestamp);
}
```
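
An Execution moves through the `ExecutionState` values listed under Types. The following sketch encodes a simplified transition table as an `EnumMap`. The normal path is CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING, and then FINISHED. Deployed tasks are cancelled through CANCELING, and FINISHED, CANCELED, and FAILED are terminal. The table is an assumption for illustration, and Flink's actual `Execution` handles more edge cases. `ExecutionStateMachineSketch` and `AttemptSketch` are hypothetical names.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified transition table; Flink's real Execution allows more edge cases.
final class ExecutionStateMachineSketch {

    enum State {
        CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING, FINISHED, CANCELING, CANCELED, FAILED;

        boolean isTerminal() {
            return this == FINISHED || this == CANCELED || this == FAILED;
        }
    }

    private static final Map<State, Set<State>> ALLOWED = new EnumMap<>(State.class);

    static {
        // Normal path: CREATED -> SCHEDULED -> DEPLOYING -> INITIALIZING -> RUNNING -> FINISHED
        ALLOWED.put(State.CREATED, EnumSet.of(State.SCHEDULED, State.CANCELED, State.FAILED));
        ALLOWED.put(State.SCHEDULED, EnumSet.of(State.DEPLOYING, State.CANCELED, State.FAILED));
        ALLOWED.put(State.DEPLOYING, EnumSet.of(State.INITIALIZING, State.CANCELING, State.FAILED));
        ALLOWED.put(State.INITIALIZING, EnumSet.of(State.RUNNING, State.CANCELING, State.FAILED));
        ALLOWED.put(State.RUNNING, EnumSet.of(State.FINISHED, State.CANCELING, State.FAILED));
        // Cancelling a deployed task goes through CANCELING.
        ALLOWED.put(State.CANCELING, EnumSet.of(State.CANCELED, State.FAILED));
        // Terminal states have no entry, hence no outgoing transitions.
    }

    static boolean canTransition(State from, State to) {
        return ALLOWED.getOrDefault(from, EnumSet.noneOf(State.class)).contains(to);
    }

    /** Mutable holder for one attempt's state that rejects illegal transitions. */
    static final class AttemptSketch {
        private State state = State.CREATED;

        void transitionTo(State target) {
            if (!canTransition(state, target)) {
                throw new IllegalStateException(state + " -> " + target + " is not allowed");
            }
            state = target;
        }

        State state() {
            return state;
        }
    }

    public static void main(String[] args) {
        AttemptSketch attempt = new AttemptSketch();
        for (State next : new State[] {State.SCHEDULED, State.DEPLOYING, State.INITIALIZING, State.RUNNING}) {
            attempt.transitionTo(next);
        }
        attempt.transitionTo(State.CANCELING);
        attempt.transitionTo(State.CANCELED);
        System.out.println(attempt.state() + " terminal=" + attempt.state().isTerminal());
    }
}
```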

**Usage Examples:**

```java
// Access execution graph information. SchedulerNG itself does not expose the
// ExecutionGraph; SchedulerBase implementations (such as DefaultScheduler) do.
ExecutionGraph executionGraph = ((SchedulerBase) scheduler).getExecutionGraph();

// Get job status
JobStatus status = executionGraph.getState();
System.out.println("Job status: " + status);

// Iterate through vertices in topological order
for (ExecutionJobVertex jobVertex : executionGraph.getVerticesTopologically()) {
    System.out.println("Vertex: " + jobVertex.getName()
            + ", Parallelism: " + jobVertex.getParallelism());

    // Check individual subtasks
    for (ExecutionVertex execVertex : jobVertex.getTaskVertices()) {
        ExecutionState state = execVertex.getExecutionState();
        System.out.println("  Subtask " + execVertex.getParallelSubtaskIndex() + ": " + state);
    }
}

// Get checkpoint coordinator (may be null)
CheckpointCoordinator coordinator = executionGraph.getCheckpointCoordinator();
if (coordinator != null) {
    CompletedCheckpointStore store = coordinator.getCheckpointStore();
    System.out.println("Latest checkpoint: " + store.getLatestCheckpoint());
}
```

## Types

```java { .api }
// Execution identifiers
public class ExecutionAttemptID implements Serializable {
    public ExecutionAttemptID();
    public ExecutionAttemptID(ExecutionVertexID vertexId, int attemptNumber);
    public static ExecutionAttemptID randomId();

    public ExecutionVertexID getExecutionVertexId();
    public int getAttemptNumber();
}

public class ExecutionVertexID implements Serializable {
    public ExecutionVertexID(JobVertexID jobVertexId, int subtaskIndex);

    public JobVertexID getJobVertexId();
    public int getSubtaskIndex();
}

// Execution states
public enum ExecutionState {
    CREATED,
    SCHEDULED,
    DEPLOYING,
    INITIALIZING,
    RUNNING,
    FINISHED,
    CANCELING,
    CANCELED,
    FAILED,
    RECONCILING;

    /** FINISHED, CANCELED and FAILED are terminal */
    public boolean isTerminal();
    public boolean isRunning();
}

// Task execution state transition
public class TaskExecutionStateTransition implements Serializable {
    public TaskExecutionStateTransition(TaskExecutionState taskExecutionState);

    public ExecutionAttemptID getID();
    public ExecutionState getExecutionState();
    /** The error is deserialized with the job's user-code class loader */
    public Throwable getError(ClassLoader userCodeClassloader);
    public TaskExecutionState getTaskExecutionState();
}

// Scheduler configurations
public class DefaultSchedulerComponents {
    public static DefaultSchedulerComponents createSchedulerComponents(
            JobType jobType,
            boolean isApproximateLocalRecoveryEnabled,
            Configuration jobMasterConfiguration,
            SlotPool slotPool,
            Duration slotRequestTimeout);

    public ExecutionSlotAllocatorFactory getAllocatorFactory();
    public RestartStrategy getRestartStrategy();
    public ExecutionVertexVersioner getVersioner();
}

// Scheduler types, selected via the `jobmanager.scheduler` option
// (JobManagerOptions.SchedulerType). Not to be confused with the SchedulingStrategy
// interface (e.g. PipelinedRegionSchedulingStrategy), which decides which
// execution vertices to schedule next.
public enum SchedulerType {
    Default,
    Adaptive,
    AdaptiveBatch
}
```