or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

execution-scheduling.md high-availability.md index.md job-graph.md resource-management.md state-checkpointing.md task-execution.md

docs/resource-management.md

0

# Resource Management

1

2

Cluster resource allocation and TaskExecutor lifecycle management for different deployment environments. The ResourceManager is responsible for managing cluster resources, allocating slots, and coordinating with TaskExecutors.

3

4

## Capabilities

5

6

### ResourceManagerGateway

7

8

RPC gateway interface for communication with ResourceManager, handling job registration, slot allocation, and cluster coordination.

9

10

```java { .api }

11

/**

12

* The ResourceManager's RPC gateway interface for communication with JobMasters

13

* and TaskExecutors.

14

*/

15

public interface ResourceManagerGateway

16

extends FencedRpcGateway<ResourceManagerId>, ClusterPartitionManager, BlocklistListener {

17

18

/** Register a JobMaster with the ResourceManager */

19

CompletableFuture<RegistrationResponse> registerJobMaster(

20

JobMasterId jobMasterId,

21

ResourceID jobMasterResourceId,

22

String jobMasterAddress,

23

JobID jobId,

24

Duration timeout

25

);

26

27

/** Register a TaskExecutor with the ResourceManager */

28

CompletableFuture<RegistrationResponse> registerTaskExecutor(

29

String taskExecutorAddress,

30

ResourceID resourceId,

31

SlotReport slotReport,

32

ResourceProfile totalResourceProfile,

33

Duration timeout

34

);

35

36

/** Send slot report from TaskExecutor */

37

CompletableFuture<Acknowledge> sendSlotReport(

38

ResourceID taskManagerResourceId,

39

InstanceID taskManagerRegistrationId,

40

SlotReport slotReport,

41

Duration timeout

42

);

43

44

/** Request slot allocation */

45

CompletableFuture<Acknowledge> requestSlot(

46

JobMasterId jobMasterId,

47

SlotRequest slotRequest,

48

Duration timeout

49

);

50

51

/** Cancel slot request */

52

void cancelSlotRequest(SlotRequestId slotRequestId);

53

54

/** Notify slot available */

55

CompletableFuture<Acknowledge> notifySlotAvailable(

56

InstanceID instanceID,

57

SlotID slotId,

58

AllocationID allocationId

59

);

60

61

/** Deregister application (for per-job clusters) */

62

CompletableFuture<Acknowledge> deregisterApplication(

63

ApplicationStatus finalStatus,

64

String diagnostics

65

);

66

67

/** Get number of registered task managers */

68

CompletableFuture<Integer> getNumberOfRegisteredTaskManagers();

69

70

/** Heartbeat from JobMaster */

71

void heartbeatFromJobManager(ResourceID resourceID, JobMasterIdWithResourceRequirements heartbeatPayload);

72

73

/** Heartbeat from TaskExecutor */

74

void heartbeatFromTaskManager(ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload);

75

76

/** Disconnect JobMaster */

77

void disconnectJobManager(JobID jobId, Exception cause);

78

79

/** Disconnect TaskExecutor */

80

void disconnectTaskManager(ResourceID resourceId, Exception cause);

81

82

/** Request thread dump from TaskExecutor */

83

CompletableFuture<ThreadDumpInfo> requestThreadDump(

84

ResourceID taskManagerId,

85

Duration timeout

86

);

87

88

/** Request profiling from TaskExecutor */

89

CompletableFuture<Collection<ProfilingInfo>> requestProfiling(

90

ResourceID taskManagerId,

91

Duration timeout,

92

ProfilingMode mode,

93

Duration profilingDuration

94

);

95

96

/** Request resource overview */

97

CompletableFuture<ResourceOverview> requestResourceOverview(Duration timeout);

98

99

/** Request task manager info */

100

CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(Duration timeout);

101

102

/** Request detailed task manager info */

103

CompletableFuture<TaskManagerInfoWithSlots> requestTaskManagerDetailsInfo(

104

ResourceID taskManagerId,

105

Duration timeout

106

);

107

}

108

```

109

110

**Usage Examples:**

111

112

```java

113

// Register JobMaster with ResourceManager

114

CompletableFuture<RegistrationResponse> registrationFuture =

115

resourceManagerGateway.registerJobMaster(

116

jobMasterId,

117

jobMasterResourceId,

118

jobMasterAddress,

119

jobId,

120

Duration.ofMinutes(1)

121

);

122

123

registrationFuture.thenAccept(response -> {

124

if (response instanceof JobMasterRegistrationSuccess) {

125

JobMasterRegistrationSuccess success = (JobMasterRegistrationSuccess) response;

126

System.out.println("JobMaster registered successfully");

127

System.out.println("ResourceManager ID: " + success.getResourceManagerId());

128

} else {

129

System.out.println("Registration failed: " + response);

130

}

131

});

132

133

// Request slot allocation

134

SlotRequest slotRequest = new SlotRequest(

135

jobId,

136

allocationId,

137

resourceProfile,

138

targetAddress,

139

resourceManagerId

140

);

141

142

CompletableFuture<Acknowledge> slotFuture = resourceManagerGateway.requestSlot(

143

jobMasterId,

144

slotRequest,

145

Duration.ofMinutes(2)

146

);

147

148

// Get cluster resource overview

149

CompletableFuture<ResourceOverview> overviewFuture =

150

resourceManagerGateway.requestResourceOverview(Duration.ofSeconds(30));

151

152

overviewFuture.thenAccept(overview -> {

153

System.out.println("Available slots: " + overview.getNumberOfAvailableSlots());

154

System.out.println("Total slots: " + overview.getNumberOfTotalSlots());

155

System.out.println("Free slots: " + overview.getNumberOfFreeSlots());

156

});

157

```

158

159

### ResourceManager

160

161

Base class for resource managers handling resource allocation and TaskExecutor lifecycle management.

162

163

```java { .api }

164

/**

165

* ResourceManager implementation. The ResourceManager is responsible for resource allocation

166

* and bookkeeping. It offers unused slots to the JobManager and keeps track of which

167

* slots are available, and which slots are being used.

168

*/

169

public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>

170

extends FencedRpcEndpoint<ResourceManagerId> implements ResourceManagerGateway {

171

172

/** Start the ResourceManager service */

173

public void start();

174

175

/** Get the ResourceManager's resource ID */

176

public ResourceID getResourceId();

177

178

/** Get the ResourceManager's address */

179

public String getAddress();

180

181

/** Get the ResourceManager's ID */

182

public ResourceManagerId getFencingToken();

183

184

/** Get information about the registered task managers */

185

public CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(Duration timeout);

186

187

/** Get resource overview */

188

public CompletableFuture<ResourceOverview> requestResourceOverview(Duration timeout);

189

190

/** Start new worker */

191

protected abstract CompletableFuture<WorkerType> requestNewWorker(WorkerResourceSpec workerResourceSpec);

192

193

/** Stop worker */

194

protected abstract void stopWorker(WorkerType worker);

195

196

/** Get worker resource specification factory */

197

protected abstract WorkerResourceSpecFactory getWorkerResourceSpecFactory();

198

199

/** Get number of slots per worker */

200

protected abstract int getNumberSlotsPerWorker();

201

202

/** Initialize the ResourceManager */

203

protected void initialize();

204

205

/** Terminate the ResourceManager */

206

protected void terminate();

207

208

/** Handle new TaskExecutor registration */

209

protected void onTaskExecutorRegistration(TaskExecutorConnection taskExecutorConnection);

210

211

/** Handle TaskExecutor disconnection */

212

protected void onTaskExecutorDisconnection(ResourceID resourceId, Exception cause);

213

214

/** Handle slot report from TaskExecutor */

215

protected void onSlotReport(ResourceID resourceId, SlotReport slotReport);

216

217

/** Handle heartbeat from TaskExecutor */

218

protected void onTaskExecutorHeartbeat(ResourceID resourceId, TaskExecutorHeartbeatPayload heartbeatPayload);

219

220

/** Handle job leader notification */

221

protected void onJobLeaderIdChanged(JobID jobId, JobMasterId newJobMasterId);

222

223

/** Get slot manager */

224

protected SlotManager getSlotManager();

225

226

/** Get job leader id service */

227

protected JobLeaderIdService getJobLeaderIdService();

228

229

/** Get cluster partition manager */

230

protected ClusterPartitionManager getClusterPartitionManager();

231

}

232

```

233

234

### WorkerResourceSpec

235

236

Specification of computational resources required for a worker/TaskExecutor instance.

237

238

```java { .api }

239

/**

240

* Specification of worker resources. This class describes the resources of a worker,

241

* including CPU, memory, and other computational resources.

242

*/

243

public class WorkerResourceSpec implements Serializable {

244

/** Create worker resource spec from total resource profile */

245

public static WorkerResourceSpec fromTotalResourceProfile(

246

ResourceProfile totalResourceProfile,

247

MemorySize networkMemorySize

248

);

249

250

/** Create worker resource spec with specific memory allocations */

251

public static WorkerResourceSpec fromTaskExecutorResourceSpec(

252

CPUResource cpuCores,

253

MemorySize taskHeapSize,

254

MemorySize taskOffHeapSize,

255

MemorySize networkMemorySize,

256

MemorySize managedMemorySize

257

);

258

259

/** Get total resource profile */

260

public ResourceProfile getTotalResourceProfile();

261

262

/** Get CPU cores */

263

public CPUResource getCpuCores();

264

265

/** Get task heap memory size */

266

public MemorySize getTaskHeapSize();

267

268

/** Get task off-heap memory size */

269

public MemorySize getTaskOffHeapSize();

270

271

/** Get network memory size */

272

public MemorySize getNetworkMemSize();

273

274

/** Get managed memory size */

275

public MemorySize getManagedMemSize();

276

277

/** Get JVM heap memory size (framework + task heap) */

278

public MemorySize getJvmHeapMemorySize();

279

280

/** Get total memory size */

281

public MemorySize getTotalMemSize();

282

283

/** Get number of slots this worker can provide */

284

public int getNumSlots();

285

286

/** Check equality with another spec */

287

public boolean equals(Object obj);

288

289

/** Get hash code */

290

public int hashCode();

291

292

/** Convert to string representation */

293

public String toString();

294

}

295

```

296

297

### SlotManager

298

299

Central component for managing slot allocation and TaskExecutor coordination within the ResourceManager.

300

301

```java { .api }

302

/**

303

* The slot manager is responsible for maintaining a view on all registered task managers

304

* and their available slots. It offers unused slots to the slot pool and keeps track of

305

* allocations and deallocations.

306

*/

307

public interface SlotManager extends AutoCloseable {

308

/** Start the slot manager */

309

void start(

310

ResourceManagerId newResourceManagerId,

311

Executor newMainThreadExecutor,

312

ResourceActions resourceActions

313

);

314

315

/** Suspend the slot manager */

316

void suspend();

317

318

/** Register task manager with slot manager */

319

boolean registerTaskManager(

320

TaskExecutorConnection taskExecutorConnection,

321

SlotReport initialSlotReport,

322

ResourceProfile totalResourceProfile,

323

ResourceProfile defaultSlotResourceProfile

324

);

325

326

/** Unregister task manager */

327

boolean unregisterTaskManager(InstanceID instanceId, Exception cause);

328

329

/** Report slot status from task manager */

330

boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport);

331

332

/** Request slot allocation */

333

CompletableFuture<Void> requestResource(ResourceRequirements resourceRequirements);

334

335

/** Cancel the resource requirements of the given job */

336

void cancelResourceRequirements(JobID jobId);

337

338

/** Free slot */

339

void freeSlot(SlotID slotId, AllocationID allocationId);

340

341

/** Get number of registered slots */

342

int getNumberRegisteredSlots();

343

344

/** Get number of registered slots of a specific task manager instance */

345

int getNumberRegisteredSlotsOf(InstanceID instanceId);

346

347

/** Get number of free slots */

348

int getNumberFreeSlots();

349

350

/** Get number of free slots of a specific task manager instance */

351

int getNumberFreeSlotsOf(InstanceID instanceId);

352

353

/** Get registered task managers */

354

Collection<TaskManagerInfo> getRegisteredTaskManagers();

355

356

/** Get task manager info */

357

Optional<TaskManagerInfo> getTaskManagerInfo(InstanceID instanceId);

358

359

/** Set failing allocated slot */

360

boolean setFailingAllocatedSlot(SlotID slotId);

361

362

/** Clear slot for task manager */

363

void clearSlotFor(SlotID slotId);

364

365

/** Get allocations for job */

366

ResourceRequirements getAllocations(JobID jobId);

367

368

/** Close the slot manager */

369

void close();

370

}

371

```

372

373

### ResourceOverview

374

375

Summary of cluster resource utilization and availability.

376

377

```java { .api }

378

/**

379

* An overview over the resources in the cluster.

380

*/

381

public class ResourceOverview implements Serializable {

382

/** Create resource overview */

383

public ResourceOverview(

384

int numberOfTaskManagers,

385

int numberOfAvailableSlots,

386

int numberOfTotalSlots,

387

ResourceProfile availableResourceProfile,

388

ResourceProfile totalResourceProfile

389

);

390

391

/** Get number of registered task managers */

392

public int getNumberOfTaskManagers();

393

394

/** Get number of available slots */

395

public int getNumberOfAvailableSlots();

396

397

/** Get number of total slots */

398

public int getNumberOfTotalSlots();

399

400

/** Get number of free slots */

401

public int getNumberOfFreeSlots();

402

403

/** Get available resource profile */

404

public ResourceProfile getAvailableResource();

405

406

/** Get total resource profile */

407

public ResourceProfile getTotalResource();

408

409

/** Check if cluster has sufficient resources */

410

public boolean hasSufficientResources(ResourceProfile requiredResources);

411

412

/** Get resource utilization ratio */

413

public double getUtilizationRatio();

414

}

415

```

416

417

### StandaloneResourceManager

418

419

ResourceManager implementation for standalone Flink deployments without external resource orchestration.

420

421

```java { .api }

422

/**

423

* ResourceManager for standalone Flink deployments. In standalone mode,

424

* TaskExecutors are started manually and register themselves with the ResourceManager.

425

*/

426

public class StandaloneResourceManager extends ResourceManager<ResourceID> {

427

/** Create standalone resource manager */

428

public StandaloneResourceManager(

429

RpcService rpcService,

430

ResourceManagerConfiguration resourceManagerConfiguration,

431

HighAvailabilityServices highAvailabilityServices,

432

SlotManager slotManager,

433

ResourceManagerPartitionTracker partitionTracker,

434

BlocklistHandler.Factory blocklistHandlerFactory,

435

JobLeaderIdService jobLeaderIdService,

436

ClusterInformation clusterInformation,

437

FatalErrorHandler fatalErrorHandler,

438

ResourceManagerMetricGroup resourceManagerMetricGroup,

439

Time rpcTimeout,

440

Time previousAttemptTimeout

441

);

442

443

/** Initialize the resource manager */

444

protected void initialize();

445

446

/** Terminate the resource manager */

447

protected void terminate();

448

449

/** Request new worker (not supported in standalone mode) */

450

protected CompletableFuture<ResourceID> requestNewWorker(WorkerResourceSpec workerResourceSpec);

451

452

/** Stop worker (not supported in standalone mode) */

453

protected void stopWorker(ResourceID worker);

454

455

/** Get worker resource spec factory */

456

protected WorkerResourceSpecFactory getWorkerResourceSpecFactory();

457

458

/** Get number of slots per worker */

459

protected int getNumberSlotsPerWorker();

460

}

461

```

462

463

**Usage Examples:**

464

465

```java

466

// Configure resource manager

467

ResourceManagerConfiguration rmConfig = ResourceManagerConfiguration.fromConfiguration(

468

configuration,

469

ResourceID.fromString("resource-manager")

470

);

471

472

// Create slot manager

473

SlotManagerConfiguration slotManagerConfig = SlotManagerConfiguration.fromConfiguration(

474

configuration,

475

WorkerResourceSpec.fromTotalResourceProfile(

476

ResourceProfile.fromResources(4.0, 8192),

477

MemorySize.ofMebiBytes(1024)

478

)

479

);

480

481

SlotManager slotManager = SlotManagerBuilder

482

.newBuilder()

483

.setSlotManagerConfiguration(slotManagerConfig)

484

.setResourceManagerId(resourceManagerId)

485

.setMainThreadExecutor(mainThreadExecutor)

486

.setResourceActions(resourceActions)

487

.build();

488

489

// Create standalone resource manager

490

StandaloneResourceManager resourceManager = new StandaloneResourceManager(

491

rpcService,

492

rmConfig,

493

highAvailabilityServices,

494

slotManager,

495

partitionTracker,

496

blocklistHandlerFactory,

497

jobLeaderIdService,

498

clusterInformation,

499

fatalErrorHandler,

500

resourceManagerMetricGroup,

501

rpcTimeout,

502

previousAttemptTimeout

503

);

504

505

// Start resource manager

506

resourceManager.start();

507

508

// Monitor resource utilization

509

resourceManager.requestResourceOverview(Duration.ofSeconds(10))

510

.thenAccept(overview -> {

511

System.out.println("Cluster Overview:");

512

System.out.println(" Task Managers: " + overview.getNumberOfTaskManagers());

513

System.out.println(" Total Slots: " + overview.getNumberOfTotalSlots());

514

System.out.println(" Available Slots: " + overview.getNumberOfAvailableSlots());

515

System.out.println(" Utilization: " +

516

String.format("%.2f%%", overview.getUtilizationRatio() * 100));

517

});

518

```

519

520

## Types

521

522

```java { .api }

523

// Resource manager identifiers

524

public class ResourceManagerId implements Serializable {

525

public ResourceManagerId();

526

public ResourceManagerId(UUID uuid);

527

public static ResourceManagerId generate();

528

529

public UUID getUuid();

530

}

531

532

public class InstanceID implements Serializable {

533

public InstanceID();

534

public InstanceID(byte[] instanceId);

535

public static InstanceID generate();

536

537

public byte[] getBytes();

538

}

539

540

// Registration responses

541

public abstract class RegistrationResponse implements Serializable {

542

public abstract boolean isSuccess();

543

public abstract boolean isFailure();

544

}

545

546

public class JobMasterRegistrationSuccess extends RegistrationResponse {

547

public JobMasterRegistrationSuccess(ResourceManagerId resourceManagerId);

548

549

public ResourceManagerId getResourceManagerId();

550

public boolean isSuccess();

551

}

552

553

public class TaskExecutorRegistrationSuccess extends RegistrationResponse {

554

public TaskExecutorRegistrationSuccess(

555

InstanceID registrationId,

556

ResourceID resourceManagerResourceId,

557

ClusterInformation clusterInformation

558

);

559

560

public InstanceID getRegistrationId();

561

public ResourceID getResourceManagerResourceId();

562

public ClusterInformation getClusterInformation();

563

public boolean isSuccess();

564

}

565

566

public class RegistrationResponse.Failure extends RegistrationResponse {

567

public Failure(String reason);

568

569

public String getReason();

570

public boolean isFailure();

571

}

572

573

// Resource requirements and allocation

574

public class ResourceRequirements implements Serializable {

575

public static ResourceRequirements create(

576

JobID jobId,

577

String targetAddress,

578

Collection<ResourceRequirement> resourceRequirements

579

);

580

581

public JobID getJobId();

582

public String getTargetAddress();

583

public Collection<ResourceRequirement> getResourceRequirements();

584

public int getTotalRequiredResources();

585

}

586

587

public class ResourceRequirement implements Serializable {

588

public ResourceRequirement(ResourceProfile resourceProfile, int numberOfRequiredSlots);

589

590

public ResourceProfile getResourceProfile();

591

public int getNumberOfRequiredSlots();

592

}

593

594

// Slot requests

595

public class SlotRequest implements Serializable {

596

public SlotRequest(

597

JobID jobId,

598

AllocationID allocationId,

599

ResourceProfile resourceProfile,

600

String targetAddress,

601

ResourceManagerId resourceManagerId

602

);

603

604

public JobID getJobId();

605

public AllocationID getAllocationId();

606

public ResourceProfile getResourceProfile();

607

public String getTargetAddress();

608

public ResourceManagerId getResourceManagerId();

609

}

610

611

public class SlotRequestId implements Serializable {

612

public SlotRequestId();

613

public static SlotRequestId generate();

614

615

public UUID getUuid();

616

}

617

618

// Task manager information

619

public class TaskManagerInfo implements Serializable {

620

public TaskManagerInfo(

621

ResourceID resourceId,

622

String address,

623

int dataPort,

624

int jmxPort,

625

long lastHeartbeat,

626

int numberSlots,

627

int numberAvailableSlots,

628

ResourceProfile totalResource,

629

ResourceProfile availableResource,

630

HardwareDescription hardwareDescription,

631

TaskExecutorMemoryConfiguration memoryConfiguration

632

);

633

634

public ResourceID getResourceId();

635

public String getAddress();

636

public int getDataPort();

637

public long getLastHeartbeat();

638

public int getNumberSlots();

639

public int getNumberAvailableSlots();

640

public ResourceProfile getTotalResource();

641

public ResourceProfile getAvailableResource();

642

public HardwareDescription getHardwareDescription();

643

public TaskExecutorMemoryConfiguration getMemoryConfiguration();

644

}

645

646

// Application status for per-job clusters

647

public enum ApplicationStatus {

648

SUCCEEDED,

649

FAILED,

650

KILLED,

651

UNKNOWN

652

}

653

```