# Task Execution

Distributed task execution engine with slot management and lifecycle coordination. TaskExecutors are the worker processes responsible for executing individual tasks and managing their computational slots.

## Capabilities

### TaskExecutorGateway

RPC gateway interface for communicating with TaskExecutor instances, enabling task deployment, slot management, and coordination.

```java { .api }
/**
12
* TaskExecutor RPC gateway interface for remote communication with TaskExecutor instances.
13
* Provides methods for slot management, task lifecycle, and coordination.
14
*/
15
public interface TaskExecutorGateway
16
extends RpcGateway, TaskExecutorOperatorEventGateway, TaskExecutorThreadInfoGateway {
17
18
/** Request a slot from the TaskManager */
19
CompletableFuture<Acknowledge> requestSlot(
20
SlotID slotId,
21
JobID jobId,
22
AllocationID allocationId,
23
ResourceProfile resourceProfile,
24
String targetAddress,
25
ResourceManagerId resourceManagerId,
26
Duration timeout
27
);
28
29
/** Submit a task for execution */
30
CompletableFuture<Acknowledge> submitTask(
31
TaskDeploymentDescriptor tdd,
32
JobMasterId jobMasterId,
33
Duration timeout
34
);
35
36
/** Update partitions for a task */
37
CompletableFuture<Acknowledge> updatePartitions(
38
ExecutionAttemptID executionAttemptID,
39
Iterable<PartitionInfo> partitionInfos,
40
Duration timeout
41
);
42
43
/** Release partitions */
44
void releasePartitions(
45
JobID jobId,
46
Set<ResultPartitionID> partitionIds
47
);
48
49
/** Promote partitions */
50
CompletableFuture<Acknowledge> promotePartitions(
51
JobID jobId,
52
Set<ResultPartitionID> partitionIds,
53
Duration timeout
54
);
55
56
/** Cancel a task */
57
CompletableFuture<Acknowledge> cancelTask(
58
ExecutionAttemptID executionAttemptID,
59
Duration timeout
60
);
61
62
/** Trigger checkpoint for specific task */
63
CompletableFuture<Acknowledge> triggerCheckpoint(
64
ExecutionAttemptID executionAttemptID,
65
long checkpointId,
66
long checkpointTimestamp,
67
CheckpointOptions checkpointOptions,
68
Duration timeout
69
);
70
71
/** Confirm checkpoint complete */
72
CompletableFuture<Acknowledge> confirmCheckpoint(
73
ExecutionAttemptID executionAttemptID,
74
long checkpointId,
75
long checkpointTimestamp,
76
Duration timeout
77
);
78
79
/** Abort checkpoint for specific task */
80
CompletableFuture<Acknowledge> abortCheckpoint(
81
ExecutionAttemptID executionAttemptID,
82
long checkpointId,
83
long checkpointTimestamp,
84
Duration timeout
85
);
86
87
/** Free allocated slot */
88
CompletableFuture<Acknowledge> freeSlot(
89
AllocationID allocationId,
90
Throwable cause,
91
Duration timeout
92
);
93
94
/** Request slot report */
95
CompletableFuture<SlotReport> requestSlotReport(Duration timeout);
96
97
/** Heartbeat from JobManager */
98
void heartbeatFromJobManager(
99
ResourceID resourceID,
100
AllocatedSlotReport allocatedSlotReport
101
);
102
103
/** Heartbeat from ResourceManager */
104
void heartbeatFromResourceManager(ResourceID resourceID);
105
106
/** Disconnect JobManager */
107
void disconnectJobManager(JobID jobId, Exception cause);
108
109
/** Disconnect ResourceManager */
110
void disconnectResourceManager(Exception cause);
111
112
/** Request thread dump info */
113
CompletableFuture<ThreadDumpInfo> requestThreadDump(Duration timeout);
114
115
/** Request profiling info */
116
CompletableFuture<Collection<ProfilingInfo>> requestProfiling(
117
Duration timeout,
118
ProfilingMode mode,
119
Duration profilingDuration
120
);
121
122
/** Request log list */
123
CompletableFuture<Collection<LogInfo>> requestLogList(Duration timeout);
124
125
/** Request specific log file */
126
CompletableFuture<TransientBlobKey> requestFileUpload(
127
FileType fileType,
128
Duration timeout
129
);
130
}
```

**Usage Examples:**

```java
// Request slot allocation
137
CompletableFuture<Acknowledge> slotFuture = taskExecutorGateway.requestSlot(
138
new SlotID(resourceId, 0), // slot ID
139
jobId, // job ID
140
allocationId, // allocation ID
141
ResourceProfile.fromResources(2.0, 1024), // resource requirements
142
jobMasterAddress, // target address
143
resourceManagerId, // resource manager ID
144
Duration.ofMinutes(1) // timeout
145
);
146
147
// Submit task for execution
148
TaskDeploymentDescriptor descriptor = new TaskDeploymentDescriptor(
149
jobInformation,
150
taskInformation,
151
executionAttemptId,
152
allocationId,
153
subpartitionIndexRange,
154
targetSlotNumber,
155
taskStateSnapshot,
156
inputGateDeploymentDescriptors,
157
resultPartitionDeploymentDescriptors
158
);
159
160
CompletableFuture<Acknowledge> submitFuture = taskExecutorGateway.submitTask(
161
descriptor,
162
jobMasterId,
163
Duration.ofMinutes(5)
164
);
165
166
// Trigger checkpoint
167
CompletableFuture<Acknowledge> checkpointFuture = taskExecutorGateway.triggerCheckpoint(
168
executionAttemptId,
169
checkpointId,
170
System.currentTimeMillis(),
171
CheckpointOptions.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
172
Duration.ofSeconds(30)
173
);
```

### TaskExecutor

Main TaskExecutor implementation, responsible for executing tasks and managing computational slots on worker nodes.

```java { .api }
/**
182
* TaskExecutor implementation. The TaskExecutor is responsible for the execution of multiple
183
* tasks and manages slots. It offers the slots to the ResourceManager and executes
184
* tasks when the JobManager requests it.
185
*/
186
public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> implements TaskExecutorGateway {
187
/** Start the TaskExecutor service */
188
public void start();
189
190
/** Get the TaskExecutor's resource ID */
191
public ResourceID getResourceID();
192
193
/** Get address of this TaskExecutor */
194
public String getAddress();
195
196
/** Get data port of this TaskExecutor */
197
public int getDataPort();
198
199
/** Get number of slots */
200
public int getNumberOfSlots();
201
202
/** Get hardware description */
203
public HardwareDescription getHardwareDescription();
204
205
/** Get memory configuration */
206
public TaskExecutorMemoryConfiguration getMemoryConfiguration();
207
208
/** Get network environment */
209
public NetworkEnvironment getNetworkEnvironment();
210
211
/** Get blob cache service */
212
public BlobCacheService getBlobCacheService();
213
214
/** Get slot table */
215
public TaskSlotTable<Task> getSlotTable();
216
217
/** Get job table */
218
public JobTable getJobTable();
219
220
/** Get job leader service */
221
public JobLeaderService getJobLeaderService();
222
223
/** Connect to ResourceManager */
224
public void connectToResourceManager(
225
ResourceManagerAddress resourceManagerAddress,
226
ResourceID resourceManagerResourceId
227
);
228
229
/** Establish job manager connection */
230
public void establishedJobManagerConnection(
231
JobID jobId,
232
JobMasterGateway jobMasterGateway,
233
TaskManagerActions taskManagerActions
234
);
235
236
/** Close job manager connection */
237
public void closeJobManagerConnection(JobID jobId, Exception cause);
238
239
/** Get current task slot utilization */
240
public SlotReport getCurrentSlotReport();
241
242
/** Register timeout for slot */
243
public void scheduleSlotTimeout(AllocationID allocationId, Duration timeout);
244
}
```

### TaskManagerServices

Container for the essential TaskManager services, including networking, memory management, and I/O.

```java { .api }
/**
253
* Container for the TaskManager services like network environment, memory manager,
254
* IOManager, and related components.
255
*/
256
public class TaskManagerServices {
257
/** Get the task manager configuration */
258
public TaskManagerServicesConfiguration getTaskManagerServicesConfiguration();
259
260
/** Get the network environment */
261
public NetworkEnvironment getNetworkEnvironment();
262
263
/** Get the shuffle environment */
264
public ShuffleEnvironment<?, ?> getShuffleEnvironment();
265
266
/** Get the KvState service */
267
public KvStateService getKvStateService();
268
269
/** Get the broadcast variable manager */
270
public BroadcastVariableManager getBroadcastVariableManager();
271
272
/** Get the task slot table */
273
public TaskSlotTable<Task> getTaskSlotTable();
274
275
/** Get the job table */
276
public JobTable getJobTable();
277
278
/** Get the job leader service */
279
public JobLeaderService getJobLeaderService();
280
281
/** Get the task state manager */
282
public TaskExecutorLocalStateStoresManager getTaskManagerStateStore();
283
284
/** Get the memory manager */
285
public MemoryManager getMemoryManager();
286
287
/** Get the IO manager */
288
public IOManager getIOManager();
289
290
/** Get the libraries cache manager */
291
public BlobCacheService getLibraryCacheManager();
292
293
/** Get the task manager metric group */
294
public TaskManagerMetricGroup getTaskManagerMetricGroup();
295
296
/** Get the executor service for async operations */
297
public Executor getIOExecutor();
298
299
/** Get the fatal error handler */
300
public FatalErrorHandler getFatalErrorHandler();
301
302
/** Get the partition tracker */
303
public TaskExecutorPartitionTracker getPartitionTracker();
304
305
/** Get the backpressure sample service */
306
public BackPressureSampleService getBackPressureSampleService();
307
308
/** Shutdown all services */
309
public void shutDown();
310
}
```

### SlotReport

Report describing the status and allocation of each slot on a TaskExecutor.

```java { .api }
/**
319
* A slot report contains information about which slots are available and allocated
320
* on a TaskManager.
321
*/
322
public class SlotReport implements Serializable {
323
/** Create a new slot report */
324
public SlotReport();
325
326
/** Create slot report with initial slots */
327
public SlotReport(Collection<SlotStatus> slotStatuses);
328
329
/** Add slot status to the report */
330
public void addSlotStatus(SlotStatus slotStatus);
331
332
/** Get all slot statuses */
333
public Collection<SlotStatus> getSlotsStatus();
334
335
/** Get number of slots */
336
public int getNumSlotStatus();
337
338
/** Check if report is empty */
339
public boolean isEmpty();
340
341
/** Get iterator over slot statuses */
342
public Iterator<SlotStatus> iterator();
343
}
344
345
/**
346
* Status of a single slot on a TaskManager
347
*/
348
public class SlotStatus implements Serializable {
349
/** Create slot status */
350
public SlotStatus(
351
SlotID slotID,
352
ResourceProfile resourceProfile,
353
JobID jobID,
354
AllocationID allocationID
355
);
356
357
/** Get slot ID */
358
public SlotID getSlotID();
359
360
/** Get resource profile of this slot */
361
public ResourceProfile getResourceProfile();
362
363
/** Get job ID if slot is allocated */
364
public JobID getJobID();
365
366
/** Get allocation ID if slot is allocated */
367
public AllocationID getAllocationID();
368
369
/** Check if slot is allocated */
370
public boolean isAllocated();
371
}
```

### TaskDeploymentDescriptor

Complete descriptor containing all the information necessary to deploy and execute a task on a TaskExecutor.

```java { .api }
/**
380
* A TaskDeploymentDescriptor contains all the information necessary to deploy a task
381
* on a TaskManager.
382
*/
383
public class TaskDeploymentDescriptor implements Serializable {
384
/** Create task deployment descriptor */
385
public TaskDeploymentDescriptor(
386
JobInformation jobInformation,
387
TaskInformation taskInformation,
388
ExecutionAttemptID executionAttemptID,
389
AllocationID allocationID,
390
SubpartitionIndexRange subpartitionIndexRange,
391
int targetSlotNumber,
392
TaskStateSnapshot taskStateSnapshot,
393
List<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors,
394
List<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors
395
);
396
397
/** Get job information */
398
public JobInformation getJobInformation();
399
400
/** Get task information */
401
public TaskInformation getTaskInformation();
402
403
/** Get execution attempt ID */
404
public ExecutionAttemptID getExecutionAttemptID();
405
406
/** Get allocation ID for the slot */
407
public AllocationID getAllocationID();
408
409
/** Get subpartition index range */
410
public SubpartitionIndexRange getSubpartitionIndexRange();
411
412
/** Get target slot number */
413
public int getTargetSlotNumber();
414
415
/** Get task state snapshot for recovery */
416
public TaskStateSnapshot getTaskStateSnapshot();
417
418
/** Get input gate deployment descriptors */
419
public List<InputGateDeploymentDescriptor> getInputGateDeploymentDescriptors();
420
421
/** Get result partition deployment descriptors */
422
public List<ResultPartitionDeploymentDescriptor> getResultPartitionDeploymentDescriptors();
423
424
/** Get produced partition IDs */
425
public Collection<ResultPartitionID> getProducedPartitions();
426
427
/** Get consumed partition IDs */
428
public Collection<ResultPartitionID> getConsumedPartitions();
429
430
/** Get job ID */
431
public JobID getJobId();
432
433
/** Get job vertex ID */
434
public JobVertexID getJobVertexId();
435
436
/** Get attempt number */
437
public int getAttemptNumber();
438
439
/** Get subtask index */
440
public int getSubtaskIndex();
441
}
```

**Usage Examples (TaskManagerServices, TaskExecutor, and SlotReport):**

```java
// Create and configure TaskManager services
448
TaskManagerServicesConfiguration serviceConfig =
449
TaskManagerServicesConfiguration.fromConfiguration(
450
configuration,
451
resourceId,
452
externalAddress,
453
localCommunicationOnly,
454
taskManagerMetricGroup,
455
tmpDirPaths
456
);
457
458
TaskManagerServices taskManagerServices = TaskManagerServices.createTaskManagerServices(
459
serviceConfig,
460
resourceId,
461
rpcService,
462
highAvailabilityServices,
463
heartbeatServices,
464
metricRegistry,
465
blobCacheService,
466
localRecoveryDirectoryProvider,
467
fatalErrorHandler
468
);
469
470
// Create and start TaskExecutor
471
TaskExecutor taskExecutor = new TaskExecutor(
472
rpcService,
473
taskManagerConfiguration,
474
haServices,
475
taskManagerServices,
476
externalResourceInfoProvider,
477
heartbeatServices,
478
tokenManager,
479
aggregateManager,
480
fatalErrorHandler
481
);
482
483
taskExecutor.start();
484
485
// Monitor slot utilization
486
SlotReport slotReport = taskExecutor.getCurrentSlotReport();
487
for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {
488
System.out.println("Slot " + slotStatus.getSlotID() +
489
" allocated: " + slotStatus.isAllocated());
490
if (slotStatus.isAllocated()) {
491
System.out.println(" Job: " + slotStatus.getJobID());
492
System.out.println(" Allocation: " + slotStatus.getAllocationID());
493
}
494
}
```

## Types

```java { .api }
// Slot and allocation identifiers
501
public class SlotID implements Serializable {
502
public SlotID(ResourceID resourceId, int slotNumber);
503
504
public ResourceID getResourceID();
505
public int getSlotNumber();
506
}
507
508
public class AllocationID implements Serializable {
509
public AllocationID();
510
public AllocationID(byte[] bytes);
511
public static AllocationID generate();
512
513
public byte[] getBytes();
514
}
515
516
// Resource specifications
517
public class ResourceID implements Serializable {
518
public ResourceID(String resourceId);
519
public static ResourceID generate();
520
521
public String getResourceIdString();
522
public String getStringWithMetadata();
523
}
524
525
// Task execution states
526
public enum ExecutionState {
527
CREATED,
528
SCHEDULED,
529
DEPLOYING,
530
INITIALIZING,
531
RUNNING,
532
FINISHED,
533
CANCELING,
534
CANCELED,
535
FAILED;
536
537
public boolean isTerminal();
538
public boolean isRunning();
539
}
540
541
// Task manager configuration
542
public class TaskManagerConfiguration {
543
public static TaskManagerConfiguration fromConfiguration(
544
Configuration configuration,
545
TaskManagerOptions.TaskManagerLoadBalanceMode loadBalanceMode,
546
WorkerResourceSpec workerResourceSpec,
547
InetAddress remoteAddress,
548
boolean localCommunicationOnly
549
);
550
551
public String getTmpDirectoryPath();
552
public Time getTaskCancellationTimeout();
553
public Time getTaskCancellationInterval();
554
public Duration getSlotTimeout();
555
public boolean isExitJvmOnOutOfMemoryError();
556
public float getNetworkBuffersMemoryFraction();
557
public int getNetworkBuffersMemoryMin();
558
public int getNetworkBuffersMemoryMax();
559
public int getNetworkBuffersPerChannel();
560
public int getFloatingNetworkBuffersPerGate();
561
public Duration getPartitionRequestInitialBackoff();
562
public Duration getPartitionRequestMaxBackoff();
563
public int getNetworkRequestBackoffMultiplier();
564
}
565
566
// Hardware and memory descriptions
567
public class HardwareDescription implements Serializable {
568
public HardwareDescription(
569
int numberOfCPUCores,
570
long sizeOfPhysicalMemory,
571
long sizeOfJvmHeap,
572
long sizeOfJvmDirectMemory
573
);
574
575
public int getNumberOfCPUCores();
576
public long getSizeOfPhysicalMemory();
577
public long getSizeOfJvmHeap();
578
public long getSizeOfJvmDirectMemory();
579
}
580
581
public class TaskExecutorMemoryConfiguration {
582
public MemorySize getFrameworkHeapSize();
583
public MemorySize getFrameworkOffHeapSize();
584
public MemorySize getTaskHeapSize();
585
public MemorySize getTaskOffHeapSize();
586
public MemorySize getNetworkMemorySize();
587
public MemorySize getManagedMemorySize();
588
public MemorySize getJvmMetaspaceSize();
589
public MemorySize getJvmOverheadSize();
590
public MemorySize getTotalFlinkMemorySize();
591
public MemorySize getTotalProcessMemorySize();
592
}
593
```