or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

execution-scheduling.md, high-availability.md, index.md, job-graph.md, resource-management.md, state-checkpointing.md, task-execution.md

docs/task-execution.md

# Task Execution

Distributed task execution engine with slot management and lifecycle coordination. TaskExecutors are the worker processes responsible for executing individual tasks and managing computational slots.

## Capabilities

### TaskExecutorGateway

RPC gateway interface for communication with TaskExecutor instances, enabling task deployment, slot management, checkpoint coordination, heartbeats, and diagnostics.

```java { .api }
/**

12

* TaskExecutor RPC gateway interface for remote communication with TaskExecutor instances.

13

* Provides methods for slot management, task lifecycle, and coordination.

14

*/

15

public interface TaskExecutorGateway

16

extends RpcGateway, TaskExecutorOperatorEventGateway, TaskExecutorThreadInfoGateway {

17

18

/** Request a slot from the TaskManager */

19

CompletableFuture<Acknowledge> requestSlot(

20

SlotID slotId,

21

JobID jobId,

22

AllocationID allocationId,

23

ResourceProfile resourceProfile,

24

String targetAddress,

25

ResourceManagerId resourceManagerId,

26

Duration timeout

27

);

28

29

/** Submit a task for execution */

30

CompletableFuture<Acknowledge> submitTask(

31

TaskDeploymentDescriptor tdd,

32

JobMasterId jobMasterId,

33

Duration timeout

34

);

35

36

/** Update partitions for a task */

37

CompletableFuture<Acknowledge> updatePartitions(

38

ExecutionAttemptID executionAttemptID,

39

Iterable<PartitionInfo> partitionInfos,

40

Duration timeout

41

);

42

43

/** Release partitions */

44

void releasePartitions(

45

JobID jobId,

46

Set<ResultPartitionID> partitionIds

47

);

48

49

/** Promote partitions */

50

CompletableFuture<Acknowledge> promotePartitions(

51

JobID jobId,

52

Set<ResultPartitionID> partitionIds,

53

Duration timeout

54

);

55

56

/** Cancel a task */

57

CompletableFuture<Acknowledge> cancelTask(

58

ExecutionAttemptID executionAttemptID,

59

Duration timeout

60

);

61

62

/** Trigger checkpoint for specific task */

63

CompletableFuture<Acknowledge> triggerCheckpoint(

64

ExecutionAttemptID executionAttemptID,

65

long checkpointId,

66

long checkpointTimestamp,

67

CheckpointOptions checkpointOptions,

68

Duration timeout

69

);

70

71

/** Confirm checkpoint complete */

72

CompletableFuture<Acknowledge> confirmCheckpoint(

73

ExecutionAttemptID executionAttemptID,

74

long checkpointId,

75

long checkpointTimestamp,

76

Duration timeout

77

);

78

79

/** Abort checkpoint for specific task */

80

CompletableFuture<Acknowledge> abortCheckpoint(

81

ExecutionAttemptID executionAttemptID,

82

long checkpointId,

83

long checkpointTimestamp,

84

Duration timeout

85

);

86

87

/** Free allocated slot */

88

CompletableFuture<Acknowledge> freeSlot(

89

AllocationID allocationId,

90

Throwable cause,

91

Duration timeout

92

);

93

94

/** Request slot report */

95

CompletableFuture<SlotReport> requestSlotReport(Duration timeout);

96

97

/** Heartbeat from JobManager */

98

void heartbeatFromJobManager(

99

ResourceID resourceID,

100

AllocatedSlotReport allocatedSlotReport

101

);

102

103

/** Heartbeat from ResourceManager */

104

void heartbeatFromResourceManager(ResourceID resourceID);

105

106

/** Disconnect JobManager */

107

void disconnectJobManager(JobID jobId, Exception cause);

108

109

/** Disconnect ResourceManager */

110

void disconnectResourceManager(Exception cause);

111

112

/** Request thread dump info */

113

CompletableFuture<ThreadDumpInfo> requestThreadDump(Duration timeout);

114

115

/** Request profiling info */

116

CompletableFuture<Collection<ProfilingInfo>> requestProfiling(

117

Duration timeout,

118

ProfilingMode mode,

119

Duration profilingDuration

120

);

121

122

/** Request log list */

123

CompletableFuture<Collection<LogInfo>> requestLogList(Duration timeout);

124

125

/** Request specific log file */

126

CompletableFuture<TransientBlobKey> requestFileUpload(

127

FileType fileType,

128

Duration timeout

129

);

130

}

131

```

**Usage Examples:**

The following example requests a slot, submits a task, and triggers a checkpoint through a `TaskExecutorGateway`.

```java
// Request slot allocation

137

CompletableFuture<Acknowledge> slotFuture = taskExecutorGateway.requestSlot(

138

new SlotID(resourceId, 0), // slot ID

139

jobId, // job ID

140

allocationId, // allocation ID

141

ResourceProfile.fromResources(2.0, 1024), // resource requirements

142

jobMasterAddress, // target address

143

resourceManagerId, // resource manager ID

144

Duration.ofMinutes(1) // timeout

145

);

146

147

// Submit task for execution

148

TaskDeploymentDescriptor descriptor = new TaskDeploymentDescriptor(

149

jobInformation,

150

taskInformation,

151

executionAttemptId,

152

allocationId,

153

subpartitionIndexRange,

154

targetSlotNumber,

155

taskStateSnapshot,

156

inputGateDeploymentDescriptors,

157

resultPartitionDeploymentDescriptors

158

);

159

160

CompletableFuture<Acknowledge> submitFuture = taskExecutorGateway.submitTask(

161

descriptor,

162

jobMasterId,

163

Duration.ofMinutes(5)

164

);

165

166

// Trigger checkpoint

167

CompletableFuture<Acknowledge> checkpointFuture = taskExecutorGateway.triggerCheckpoint(

168

executionAttemptId,

169

checkpointId,

170

System.currentTimeMillis(),

171

CheckpointOptions.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),

172

Duration.ofSeconds(30)

173

);

174

```

### TaskExecutor

Main TaskExecutor implementation responsible for executing tasks and managing computational slots on worker nodes.

```java { .api }
/**

182

* TaskExecutor implementation. The TaskExecutor is responsible for the execution of multiple

183

* tasks and manages slots. It offers the slots to the ResourceManager and executes

184

* tasks when the JobManager requests it.

185

*/

186

public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> implements TaskExecutorGateway {

187

/** Start the TaskExecutor service */

188

public void start();

189

190

/** Get the TaskExecutor's resource ID */

191

public ResourceID getResourceID();

192

193

/** Get address of this TaskExecutor */

194

public String getAddress();

195

196

/** Get data port of this TaskExecutor */

197

public int getDataPort();

198

199

/** Get number of slots */

200

public int getNumberOfSlots();

201

202

/** Get hardware description */

203

public HardwareDescription getHardwareDescription();

204

205

/** Get memory configuration */

206

public TaskExecutorMemoryConfiguration getMemoryConfiguration();

207

208

/** Get network environment */

209

public NetworkEnvironment getNetworkEnvironment();

210

211

/** Get blob cache service */

212

public BlobCacheService getBlobCacheService();

213

214

/** Get slot table */

215

public TaskSlotTable<Task> getSlotTable();

216

217

/** Get job table */

218

public JobTable getJobTable();

219

220

/** Get job leader service */

221

public JobLeaderService getJobLeaderService();

222

223

/** Connect to ResourceManager */

224

public void connectToResourceManager(

225

ResourceManagerAddress resourceManagerAddress,

226

ResourceID resourceManagerResourceId

227

);

228

229

/** Establish job manager connection */

230

public void establishedJobManagerConnection(

231

JobID jobId,

232

JobMasterGateway jobMasterGateway,

233

TaskManagerActions taskManagerActions

234

);

235

236

/** Close job manager connection */

237

public void closeJobManagerConnection(JobID jobId, Exception cause);

238

239

/** Get current task slot utilization */

240

public SlotReport getCurrentSlotReport();

241

242

/** Register timeout for slot */

243

public void scheduleSlotTimeout(AllocationID allocationId, Duration timeout);

244

}

245

```

### TaskManagerServices

Container for the essential TaskManager services, including networking, memory management, and I/O.

```java { .api }
/**

253

* Container for the TaskManager services like network environment, memory manager,

254

* IOManager, and related components.

255

*/

256

public class TaskManagerServices {

257

/** Get the task manager configuration */

258

public TaskManagerServicesConfiguration getTaskManagerServicesConfiguration();

259

260

/** Get the network environment */

261

public NetworkEnvironment getNetworkEnvironment();

262

263

/** Get the shuffle environment */

264

public ShuffleEnvironment<?, ?> getShuffleEnvironment();

265

266

/** Get the KvState service */

267

public KvStateService getKvStateService();

268

269

/** Get the broadcast variable manager */

270

public BroadcastVariableManager getBroadcastVariableManager();

271

272

/** Get the task slot table */

273

public TaskSlotTable<Task> getTaskSlotTable();

274

275

/** Get the job table */

276

public JobTable getJobTable();

277

278

/** Get the job leader service */

279

public JobLeaderService getJobLeaderService();

280

281

/** Get the task state manager */

282

public TaskExecutorLocalStateStoresManager getTaskManagerStateStore();

283

284

/** Get the memory manager */

285

public MemoryManager getMemoryManager();

286

287

/** Get the IO manager */

288

public IOManager getIOManager();

289

290

/** Get the libraries cache manager */

291

public BlobCacheService getLibraryCacheManager();

292

293

/** Get the task manager metric group */

294

public TaskManagerMetricGroup getTaskManagerMetricGroup();

295

296

/** Get the executor service for async operations */

297

public Executor getIOExecutor();

298

299

/** Get the fatal error handler */

300

public FatalErrorHandler getFatalErrorHandler();

301

302

/** Get the partition tracker */

303

public TaskExecutorPartitionTracker getPartitionTracker();

304

305

/** Get the backpressure sample service */

306

public BackPressureSampleService getBackPressureSampleService();

307

308

/** Shutdown all services */

309

public void shutDown();

310

}

311

```

### SlotReport

Report containing information about the status and allocation of slots on a TaskExecutor.

```java { .api }
/**

319

* A slot report contains information about which slots are available and allocated

320

* on a TaskManager.

321

*/

322

public class SlotReport implements Serializable {

323

/** Create a new slot report */

324

public SlotReport();

325

326

/** Create slot report with initial slots */

327

public SlotReport(Collection<SlotStatus> slotStatuses);

328

329

/** Add slot status to the report */

330

public void addSlotStatus(SlotStatus slotStatus);

331

332

/** Get all slot statuses */

333

public Collection<SlotStatus> getSlotsStatus();

334

335

/** Get number of slots */

336

public int getNumSlotStatus();

337

338

/** Check if report is empty */

339

public boolean isEmpty();

340

341

/** Get iterator over slot statuses */

342

public Iterator<SlotStatus> iterator();

343

}

344

345

/**

346

* Status of a single slot on a TaskManager

347

*/

348

public class SlotStatus implements Serializable {

349

/** Create slot status */

350

public SlotStatus(

351

SlotID slotID,

352

ResourceProfile resourceProfile,

353

JobID jobID,

354

AllocationID allocationID

355

);

356

357

/** Get slot ID */

358

public SlotID getSlotID();

359

360

/** Get resource profile of this slot */

361

public ResourceProfile getResourceProfile();

362

363

/** Get job ID if slot is allocated */

364

public JobID getJobID();

365

366

/** Get allocation ID if slot is allocated */

367

public AllocationID getAllocationID();

368

369

/** Check if slot is allocated */

370

public boolean isAllocated();

371

}

372

```

### TaskDeploymentDescriptor

Complete descriptor containing all information necessary to deploy and execute a task on a TaskExecutor.

```java { .api }
/**

380

* A TaskDeploymentDescriptor contains all the information necessary to deploy a task

381

* on a TaskManager.

382

*/

383

public class TaskDeploymentDescriptor implements Serializable {

384

/** Create task deployment descriptor */

385

public TaskDeploymentDescriptor(

386

JobInformation jobInformation,

387

TaskInformation taskInformation,

388

ExecutionAttemptID executionAttemptID,

389

AllocationID allocationID,

390

SubpartitionIndexRange subpartitionIndexRange,

391

int targetSlotNumber,

392

TaskStateSnapshot taskStateSnapshot,

393

List<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors,

394

List<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors

395

);

396

397

/** Get job information */

398

public JobInformation getJobInformation();

399

400

/** Get task information */

401

public TaskInformation getTaskInformation();

402

403

/** Get execution attempt ID */

404

public ExecutionAttemptID getExecutionAttemptID();

405

406

/** Get allocation ID for the slot */

407

public AllocationID getAllocationID();

408

409

/** Get subpartition index range */

410

public SubpartitionIndexRange getSubpartitionIndexRange();

411

412

/** Get target slot number */

413

public int getTargetSlotNumber();

414

415

/** Get task state snapshot for recovery */

416

public TaskStateSnapshot getTaskStateSnapshot();

417

418

/** Get input gate deployment descriptors */

419

public List<InputGateDeploymentDescriptor> getInputGateDeploymentDescriptors();

420

421

/** Get result partition deployment descriptors */

422

public List<ResultPartitionDeploymentDescriptor> getResultPartitionDeploymentDescriptors();

423

424

/** Get produced partition IDs */

425

public Collection<ResultPartitionID> getProducedPartitions();

426

427

/** Get consumed partition IDs */

428

public Collection<ResultPartitionID> getConsumedPartitions();

429

430

/** Get job ID */

431

public JobID getJobId();

432

433

/** Get job vertex ID */

434

public JobVertexID getJobVertexId();

435

436

/** Get attempt number */

437

public int getAttemptNumber();

438

439

/** Get subtask index */

440

public int getSubtaskIndex();

441

}

442

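```

**Usage Examples:**

The following example sets up TaskManager services, creates and starts a TaskExecutor, and then inspects its current slot report.

```java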
// Create and configure TaskManager services

448

TaskManagerServicesConfiguration serviceConfig =

449

TaskManagerServicesConfiguration.fromConfiguration(

450

configuration,

451

resourceId,

452

externalAddress,

453

localCommunicationOnly,

454

taskManagerMetricGroup,

455

tmpDirPaths

456

);

457

458

TaskManagerServices taskManagerServices = TaskManagerServices.createTaskManagerServices(

459

serviceConfig,

460

resourceId,

461

rpcService,

462

highAvailabilityServices,

463

heartbeatServices,

464

metricRegistry,

465

blobCacheService,

466

localRecoveryDirectoryProvider,

467

fatalErrorHandler

468

);

469

470

// Create and start TaskExecutor

471

TaskExecutor taskExecutor = new TaskExecutor(

472

rpcService,

473

taskManagerConfiguration,

474

haServices,

475

taskManagerServices,

476

externalResourceInfoProvider,

477

heartbeatServices,

478

tokenManager,

479

aggregateManager,

480

fatalErrorHandler

481

);

482

483

taskExecutor.start();

484

485

// Monitor slot utilization

486

SlotReport slotReport = taskExecutor.getCurrentSlotReport();

487

for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {

488

System.out.println("Slot " + slotStatus.getSlotID() +

489

" allocated: " + slotStatus.isAllocated());

490

if (slotStatus.isAllocated()) {

491

System.out.println(" Job: " + slotStatus.getJobID());

492

System.out.println(" Allocation: " + slotStatus.getAllocationID());

493

}

494

}

495

```

## Types

```java { .api }
// Slot and allocation identifiers

501

public class SlotID implements Serializable {

502

public SlotID(ResourceID resourceId, int slotNumber);

503

504

public ResourceID getResourceID();

505

public int getSlotNumber();

506

}

507

508

public class AllocationID implements Serializable {

509

public AllocationID();

510

public AllocationID(byte[] bytes);

511

public static AllocationID generate();

512

513

public byte[] getBytes();

514

}

515

516

// Resource specifications

517

public class ResourceID implements Serializable {

518

public ResourceID(String resourceId);

519

public static ResourceID generate();

520

521

public String getResourceIdString();

522

public String getStringWithMetadata();

523

}

524

525

// Task execution states

526

public enum ExecutionState {

527

CREATED,

528

SCHEDULED,

529

DEPLOYING,

530

INITIALIZING,

531

RUNNING,

532

FINISHED,

533

CANCELING,

534

CANCELED,

535

FAILED;

536

537

public boolean isTerminal();

538

public boolean isRunning();

539

}

540

541

// Task manager configuration

542

public class TaskManagerConfiguration {

543

public static TaskManagerConfiguration fromConfiguration(

544

Configuration configuration,

545

TaskManagerOptions.TaskManagerLoadBalanceMode loadBalanceMode,

546

WorkerResourceSpec workerResourceSpec,

547

InetAddress remoteAddress,

548

boolean localCommunicationOnly

549

);

550

551

public String getTmpDirectoryPath();

552

public Time getTaskCancellationTimeout();

553

public Time getTaskCancellationInterval();

554

public Duration getSlotTimeout();

555

public boolean isExitJvmOnOutOfMemoryError();

556

public float getNetworkBuffersMemoryFraction();

557

public int getNetworkBuffersMemoryMin();

558

public int getNetworkBuffersMemoryMax();

559

public int getNetworkBuffersPerChannel();

560

public int getFloatingNetworkBuffersPerGate();

561

public Duration getPartitionRequestInitialBackoff();

562

public Duration getPartitionRequestMaxBackoff();

563

public int getNetworkRequestBackoffMultiplier();

564

}

565

566

// Hardware and memory descriptions

567

public class HardwareDescription implements Serializable {

568

public HardwareDescription(

569

int numberOfCPUCores,

570

long sizeOfPhysicalMemory,

571

long sizeOfJvmHeap,

572

long sizeOfJvmDirectMemory

573

);

574

575

public int getNumberOfCPUCores();

576

public long getSizeOfPhysicalMemory();

577

public long getSizeOfJvmHeap();

578

public long getSizeOfJvmDirectMemory();

579

}

580

581

public class TaskExecutorMemoryConfiguration {

582

public MemorySize getFrameworkHeapSize();

583

public MemorySize getFrameworkOffHeapSize();

584

public MemorySize getTaskHeapSize();

585

public MemorySize getTaskOffHeapSize();

586

public MemorySize getNetworkMemorySize();

587

public MemorySize getManagedMemorySize();

588

public MemorySize getJvmMetaspaceSize();

589

public MemorySize getJvmOverheadSize();

590

public MemorySize getTotalFlinkMemorySize();

591

public MemorySize getTotalProcessMemorySize();

592

}

593

```