0
# Resource Management
1
2
Cluster resource allocation and TaskExecutor lifecycle management for different deployment environments. The ResourceManager is responsible for managing cluster resources, allocating slots, and coordinating with TaskExecutors.
3
4
## Capabilities
5
6
### ResourceManagerGateway
7
8
RPC gateway interface for communication with the ResourceManager, handling job registration, slot allocation, and cluster coordination.
9
10
```java { .api }
11
/**
12
* The ResourceManager's RPC gateway interface for communication with JobMasters
13
* and TaskExecutors.
14
*/
15
public interface ResourceManagerGateway
16
extends FencedRpcGateway<ResourceManagerId>, ClusterPartitionManager, BlocklistListener {
17
18
/** Register a JobMaster with the ResourceManager */
19
CompletableFuture<RegistrationResponse> registerJobMaster(
20
JobMasterId jobMasterId,
21
ResourceID jobMasterResourceId,
22
String jobMasterAddress,
23
JobID jobId,
24
Duration timeout
25
);
26
27
/** Register a TaskExecutor with the ResourceManager */
28
CompletableFuture<RegistrationResponse> registerTaskExecutor(
29
String taskExecutorAddress,
30
ResourceID resourceId,
31
SlotReport slotReport,
32
ResourceProfile totalResourceProfile,
33
Duration timeout
34
);
35
36
/** Send slot report from TaskExecutor */
37
CompletableFuture<Acknowledge> sendSlotReport(
38
ResourceID taskManagerResourceId,
39
InstanceID taskManagerRegistrationId,
40
SlotReport slotReport,
41
Duration timeout
42
);
43
44
/** Request slot allocation */
45
CompletableFuture<Acknowledge> requestSlot(
46
JobMasterId jobMasterId,
47
SlotRequest slotRequest,
48
Duration timeout
49
);
50
51
/** Cancel slot request */
52
void cancelSlotRequest(SlotRequestId slotRequestId);
53
54
/** Notify slot available */
55
CompletableFuture<Acknowledge> notifySlotAvailable(
56
InstanceID instanceID,
57
SlotID slotId,
58
AllocationID allocationId
59
);
60
61
/** Deregister application (for per-job clusters) */
62
CompletableFuture<Acknowledge> deregisterApplication(
63
ApplicationStatus finalStatus,
64
String diagnostics
65
);
66
67
/** Get number of registered task managers */
68
CompletableFuture<Integer> getNumberOfRegisteredTaskManagers();
69
70
/** Heartbeat from JobMaster */
71
void heartbeatFromJobManager(ResourceID resourceID, JobMasterIdWithResourceRequirements heartbeatPayload);
72
73
/** Heartbeat from TaskExecutor */
74
void heartbeatFromTaskManager(ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload);
75
76
/** Disconnect JobMaster */
77
void disconnectJobManager(JobID jobId, Exception cause);
78
79
/** Disconnect TaskExecutor */
80
void disconnectTaskManager(ResourceID resourceId, Exception cause);
81
82
/** Request thread dump from TaskExecutor */
83
CompletableFuture<ThreadDumpInfo> requestThreadDump(
84
ResourceID taskManagerId,
85
Duration timeout
86
);
87
88
/** Request profiling from TaskExecutor */
89
CompletableFuture<Collection<ProfilingInfo>> requestProfiling(
90
ResourceID taskManagerId,
91
Duration timeout,
92
ProfilingMode mode,
93
Duration profilingDuration
94
);
95
96
/** Request resource overview */
97
CompletableFuture<ResourceOverview> requestResourceOverview(Duration timeout);
98
99
/** Request task manager info */
100
CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(Duration timeout);
101
102
/** Request detailed task manager info */
103
CompletableFuture<TaskManagerInfoWithSlots> requestTaskManagerDetailsInfo(
104
ResourceID taskManagerId,
105
Duration timeout
106
);
107
}
108
```
109
110
**Usage Examples:**
111
112
```java
113
// Register JobMaster with ResourceManager
114
CompletableFuture<RegistrationResponse> registrationFuture =
115
resourceManagerGateway.registerJobMaster(
116
jobMasterId,
117
jobMasterResourceId,
118
jobMasterAddress,
119
jobId,
120
Duration.ofMinutes(1)
121
);
122
123
registrationFuture.thenAccept(response -> {
124
if (response instanceof JobMasterRegistrationSuccess) {
125
JobMasterRegistrationSuccess success = (JobMasterRegistrationSuccess) response;
126
System.out.println("JobMaster registered successfully");
127
System.out.println("ResourceManager ID: " + success.getResourceManagerId());
128
} else {
129
System.out.println("Registration failed: " + response);
130
}
131
});
132
133
// Request slot allocation
134
SlotRequest slotRequest = new SlotRequest(
135
jobId,
136
allocationId,
137
resourceProfile,
138
targetAddress,
139
resourceManagerId
140
);
141
142
CompletableFuture<Acknowledge> slotFuture = resourceManagerGateway.requestSlot(
143
jobMasterId,
144
slotRequest,
145
Duration.ofMinutes(2)
146
);
147
148
// Get cluster resource overview
149
CompletableFuture<ResourceOverview> overviewFuture =
150
resourceManagerGateway.requestResourceOverview(Duration.ofSeconds(30));
151
152
overviewFuture.thenAccept(overview -> {
153
System.out.println("Available slots: " + overview.getNumberOfAvailableSlots());
154
System.out.println("Total slots: " + overview.getNumberOfTotalSlots());
155
System.out.println("Free slots: " + overview.getNumberOfFreeSlots());
156
});
157
```
158
159
### ResourceManager
160
161
Base class for resource managers handling resource allocation and TaskExecutor lifecycle management.
162
163
```java { .api }
164
/**
165
* ResourceManager implementation. The ResourceManager is responsible for resource allocation
166
* and bookkeeping. It offers unused slots to the JobManager and keeps track of which
167
* slots are available, and which slots are being used.
168
*/
169
public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
170
extends FencedRpcEndpoint<ResourceManagerId> implements ResourceManagerGateway {
171
172
/** Start the ResourceManager service */
173
public void start();
174
175
/** Get the ResourceManager's resource ID */
176
public ResourceID getResourceId();
177
178
/** Get the ResourceManager's address */
179
public String getAddress();
180
181
/** Get the ResourceManager's fencing token (its ResourceManagerId) */
182
public ResourceManagerId getFencingToken();
183
184
/** Request information about the registered task managers */
185
public CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(Duration timeout);
186
187
/** Get resource overview */
188
public CompletableFuture<ResourceOverview> requestResourceOverview(Duration timeout);
189
190
/** Start new worker */
191
protected abstract CompletableFuture<WorkerType> requestNewWorker(WorkerResourceSpec workerResourceSpec);
192
193
/** Stop worker */
194
protected abstract void stopWorker(WorkerType worker);
195
196
/** Get worker resource specification factory */
197
protected abstract WorkerResourceSpecFactory getWorkerResourceSpecFactory();
198
199
/** Get number of slots per worker */
200
protected abstract int getNumberSlotsPerWorker();
201
202
/** Initialize the ResourceManager */
203
protected void initialize();
204
205
/** Terminate the ResourceManager */
206
protected void terminate();
207
208
/** Handle new TaskExecutor registration */
209
protected void onTaskExecutorRegistration(TaskExecutorConnection taskExecutorConnection);
210
211
/** Handle TaskExecutor disconnection */
212
protected void onTaskExecutorDisconnection(ResourceID resourceId, Exception cause);
213
214
/** Handle slot report from TaskExecutor */
215
protected void onSlotReport(ResourceID resourceId, SlotReport slotReport);
216
217
/** Handle heartbeat from TaskExecutor */
218
protected void onTaskExecutorHeartbeat(ResourceID resourceId, TaskExecutorHeartbeatPayload heartbeatPayload);
219
220
/** Handle job leader notification */
221
protected void onJobLeaderIdChanged(JobID jobId, JobMasterId newJobMasterId);
222
223
/** Get slot manager */
224
protected SlotManager getSlotManager();
225
226
/** Get job leader id service */
227
protected JobLeaderIdService getJobLeaderIdService();
228
229
/** Get cluster partition manager */
230
protected ClusterPartitionManager getClusterPartitionManager();
231
}
232
```
233
234
### WorkerResourceSpec
235
236
Specification of computational resources required for a worker/TaskExecutor instance.
237
238
```java { .api }
239
/**
240
* Specification of worker resources. This class describes the resources of a worker,
241
* including CPU, memory, and other computational resources.
242
*/
243
public class WorkerResourceSpec implements Serializable {
244
/** Create worker resource spec from total resource profile */
245
public static WorkerResourceSpec fromTotalResourceProfile(
246
ResourceProfile totalResourceProfile,
247
MemorySize networkMemorySize
248
);
249
250
/** Create worker resource spec with specific memory allocations */
251
public static WorkerResourceSpec fromTaskExecutorResourceSpec(
252
CPUResource cpuCores,
253
MemorySize taskHeapSize,
254
MemorySize taskOffHeapSize,
255
MemorySize networkMemorySize,
256
MemorySize managedMemorySize
257
);
258
259
/** Get total resource profile */
260
public ResourceProfile getTotalResourceProfile();
261
262
/** Get CPU cores */
263
public CPUResource getCpuCores();
264
265
/** Get task heap memory size */
266
public MemorySize getTaskHeapSize();
267
268
/** Get task off-heap memory size */
269
public MemorySize getTaskOffHeapSize();
270
271
/** Get network memory size */
272
public MemorySize getNetworkMemSize();
273
274
/** Get managed memory size */
275
public MemorySize getManagedMemSize();
276
277
/** Get JVM heap memory size (framework + task heap) */
278
public MemorySize getJvmHeapMemorySize();
279
280
/** Get total memory size */
281
public MemorySize getTotalMemSize();
282
283
/** Get number of slots this worker can provide */
284
public int getNumSlots();
285
286
/** Check equality with another spec */
287
public boolean equals(Object obj);
288
289
/** Get hash code */
290
public int hashCode();
291
292
/** Convert to string representation */
293
public String toString();
294
}
295
```
296
297
### SlotManager
298
299
Central component for managing slot allocation and TaskExecutor coordination within the ResourceManager.
300
301
```java { .api }
302
/**
303
* The slot manager is responsible for maintaining a view on all registered task managers
304
* and their available slots. It offers unused slots to the slot pool and keeps track of
305
* allocations and deallocations.
306
*/
307
public interface SlotManager extends AutoCloseable {
308
/** Start the slot manager */
309
void start(
310
ResourceManagerId newResourceManagerId,
311
Executor newMainThreadExecutor,
312
ResourceActions resourceActions
313
);
314
315
/** Suspend the slot manager */
316
void suspend();
317
318
/** Register task manager with slot manager */
319
boolean registerTaskManager(
320
TaskExecutorConnection taskExecutorConnection,
321
SlotReport initialSlotReport,
322
ResourceProfile totalResourceProfile,
323
ResourceProfile defaultSlotResourceProfile
324
);
325
326
/** Unregister task manager */
327
boolean unregisterTaskManager(InstanceID instanceId, Exception cause);
328
329
/** Report slot status from task manager */
330
boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport);
331
332
/** Request resources matching the given resource requirements */
333
CompletableFuture<Void> requestResource(ResourceRequirements resourceRequirements);
334
335
/** Cancel the resource requirements of the given job */
336
void cancelResourceRequirements(JobID jobId);
337
338
/** Free slot */
339
void freeSlot(SlotID slotId, AllocationID allocationId);
340
341
/** Get number of registered slots */
342
int getNumberRegisteredSlots();
343
344
/** Get number of registered slots of a specific task manager instance */
345
int getNumberRegisteredSlotsOf(InstanceID instanceId);
346
347
/** Get number of free slots */
348
int getNumberFreeSlots();
349
350
/** Get number of free slots of a specific task manager instance */
351
int getNumberFreeSlotsOf(InstanceID instanceId);
352
353
/** Get registered task managers */
354
Collection<TaskManagerInfo> getRegisteredTaskManagers();
355
356
/** Get task manager info */
357
Optional<TaskManagerInfo> getTaskManagerInfo(InstanceID instanceId);
358
359
/** Set failing allocated slot */
360
boolean setFailingAllocatedSlot(SlotID slotId);
361
362
/** Clear slot for task manager */
363
void clearSlotFor(SlotID slotId);
364
365
/** Get allocations for job */
366
ResourceRequirements getAllocations(JobID jobId);
367
368
/** Close the slot manager */
369
void close();
370
}
371
```
372
373
### ResourceOverview
374
375
Summary of cluster resource utilization and availability.
376
377
```java { .api }
378
/**
379
* An overview of the resources in the cluster.
380
*/
381
public class ResourceOverview implements Serializable {
382
/** Create resource overview */
383
public ResourceOverview(
384
int numberOfTaskManagers,
385
int numberOfAvailableSlots,
386
int numberOfTotalSlots,
387
ResourceProfile availableResourceProfile,
388
ResourceProfile totalResourceProfile
389
);
390
391
/** Get number of registered task managers */
392
public int getNumberOfTaskManagers();
393
394
/** Get number of available slots */
395
public int getNumberOfAvailableSlots();
396
397
/** Get number of total slots */
398
public int getNumberOfTotalSlots();
399
400
/** Get number of free slots */
401
public int getNumberOfFreeSlots();
402
403
/** Get available resource profile */
404
public ResourceProfile getAvailableResource();
405
406
/** Get total resource profile */
407
public ResourceProfile getTotalResource();
408
409
/** Check if cluster has sufficient resources */
410
public boolean hasSufficientResources(ResourceProfile requiredResources);
411
412
/** Get resource utilization ratio */
413
public double getUtilizationRatio();
414
}
415
```
416
417
### StandaloneResourceManager
418
419
ResourceManager implementation for standalone Flink deployments without external resource orchestration.
420
421
```java { .api }
422
/**
423
* ResourceManager for standalone Flink deployments. In standalone mode,
424
* TaskExecutors are started manually and register themselves with the ResourceManager.
425
*/
426
public class StandaloneResourceManager extends ResourceManager<ResourceID> {
427
/** Create standalone resource manager */
428
public StandaloneResourceManager(
429
RpcService rpcService,
430
ResourceManagerConfiguration resourceManagerConfiguration,
431
HighAvailabilityServices highAvailabilityServices,
432
SlotManager slotManager,
433
ResourceManagerPartitionTracker partitionTracker,
434
BlocklistHandler.Factory blocklistHandlerFactory,
435
JobLeaderIdService jobLeaderIdService,
436
ClusterInformation clusterInformation,
437
FatalErrorHandler fatalErrorHandler,
438
ResourceManagerMetricGroup resourceManagerMetricGroup,
439
Time rpcTimeout,
440
Time previousAttemptTimeout
441
);
442
443
/** Initialize the resource manager */
444
protected void initialize();
445
446
/** Terminate the resource manager */
447
protected void terminate();
448
449
/** Request new worker (not supported in standalone mode) */
450
protected CompletableFuture<ResourceID> requestNewWorker(WorkerResourceSpec workerResourceSpec);
451
452
/** Stop worker (not supported in standalone mode) */
453
protected void stopWorker(ResourceID worker);
454
455
/** Get worker resource spec factory */
456
protected WorkerResourceSpecFactory getWorkerResourceSpecFactory();
457
458
/** Get number of slots per worker */
459
protected int getNumberSlotsPerWorker();
460
}
461
```
462
463
**Usage Examples:**
464
465
```java
466
// Configure resource manager
467
ResourceManagerConfiguration rmConfig = ResourceManagerConfiguration.fromConfiguration(
468
configuration,
469
ResourceID.fromString("resource-manager")
470
);
471
472
// Create slot manager
473
SlotManagerConfiguration slotManagerConfig = SlotManagerConfiguration.fromConfiguration(
474
configuration,
475
WorkerResourceSpec.fromTotalResourceProfile(
476
ResourceProfile.fromResources(4.0, 8192),
477
MemorySize.ofMebiBytes(1024)
478
)
479
);
480
481
SlotManager slotManager = SlotManagerBuilder
482
.newBuilder()
483
.setSlotManagerConfiguration(slotManagerConfig)
484
.setResourceManagerId(resourceManagerId)
485
.setMainThreadExecutor(mainThreadExecutor)
486
.setResourceActions(resourceActions)
487
.build();
488
489
// Create standalone resource manager
490
StandaloneResourceManager resourceManager = new StandaloneResourceManager(
491
rpcService,
492
rmConfig,
493
highAvailabilityServices,
494
slotManager,
495
partitionTracker,
496
blocklistHandlerFactory,
497
jobLeaderIdService,
498
clusterInformation,
499
fatalErrorHandler,
500
resourceManagerMetricGroup,
501
rpcTimeout,
502
previousAttemptTimeout
503
);
504
505
// Start resource manager
506
resourceManager.start();
507
508
// Monitor resource utilization
509
resourceManager.requestResourceOverview(Duration.ofSeconds(10))
510
.thenAccept(overview -> {
511
System.out.println("Cluster Overview:");
512
System.out.println(" Task Managers: " + overview.getNumberOfTaskManagers());
513
System.out.println(" Total Slots: " + overview.getNumberOfTotalSlots());
514
System.out.println(" Available Slots: " + overview.getNumberOfAvailableSlots());
515
System.out.println(" Utilization: " +
516
String.format("%.2f%%", overview.getUtilizationRatio() * 100));
517
});
518
```
519
520
## Types
521
522
```java { .api }
523
// Resource manager identifiers
524
public class ResourceManagerId implements Serializable {
525
public ResourceManagerId();
526
public ResourceManagerId(UUID uuid);
527
public static ResourceManagerId generate();
528
529
public UUID getUuid();
530
}
531
532
public class InstanceID implements Serializable {
533
public InstanceID();
534
public InstanceID(byte[] instanceId);
535
public static InstanceID generate();
536
537
public byte[] getBytes();
538
}
539
540
// Registration responses
541
public abstract class RegistrationResponse implements Serializable {
542
public abstract boolean isSuccess();
543
public abstract boolean isFailure();
544
}
545
546
public class JobMasterRegistrationSuccess extends RegistrationResponse {
547
public JobMasterRegistrationSuccess(ResourceManagerId resourceManagerId);
548
549
public ResourceManagerId getResourceManagerId();
550
public boolean isSuccess();
551
}
552
553
public class TaskExecutorRegistrationSuccess extends RegistrationResponse {
554
public TaskExecutorRegistrationSuccess(
555
InstanceID registrationId,
556
ResourceID resourceManagerResourceId,
557
ClusterInformation clusterInformation
558
);
559
560
public InstanceID getRegistrationId();
561
public ResourceID getResourceManagerResourceId();
562
public ClusterInformation getClusterInformation();
563
public boolean isSuccess();
564
}
565
566
public class RegistrationResponse.Failure extends RegistrationResponse {
567
public Failure(String reason);
568
569
public String getReason();
570
public boolean isFailure();
571
}
572
573
// Resource requirements and allocation
574
public class ResourceRequirements implements Serializable {
575
public static ResourceRequirements create(
576
JobID jobId,
577
String targetAddress,
578
Collection<ResourceRequirement> resourceRequirements
579
);
580
581
public JobID getJobId();
582
public String getTargetAddress();
583
public Collection<ResourceRequirement> getResourceRequirements();
584
public int getTotalRequiredResources();
585
}
586
587
public class ResourceRequirement implements Serializable {
588
public ResourceRequirement(ResourceProfile resourceProfile, int numberOfRequiredSlots);
589
590
public ResourceProfile getResourceProfile();
591
public int getNumberOfRequiredSlots();
592
}
593
594
// Slot requests
595
public class SlotRequest implements Serializable {
596
public SlotRequest(
597
JobID jobId,
598
AllocationID allocationId,
599
ResourceProfile resourceProfile,
600
String targetAddress,
601
ResourceManagerId resourceManagerId
602
);
603
604
public JobID getJobId();
605
public AllocationID getAllocationId();
606
public ResourceProfile getResourceProfile();
607
public String getTargetAddress();
608
public ResourceManagerId getResourceManagerId();
609
}
610
611
public class SlotRequestId implements Serializable {
612
public SlotRequestId();
613
public static SlotRequestId generate();
614
615
public UUID getUuid();
616
}
617
618
// Task manager information
619
public class TaskManagerInfo implements Serializable {
620
public TaskManagerInfo(
621
ResourceID resourceId,
622
String address,
623
int dataPort,
624
int jmxPort,
625
long lastHeartbeat,
626
int numberSlots,
627
int numberAvailableSlots,
628
ResourceProfile totalResource,
629
ResourceProfile availableResource,
630
HardwareDescription hardwareDescription,
631
TaskExecutorMemoryConfiguration memoryConfiguration
632
);
633
634
public ResourceID getResourceId();
635
public String getAddress();
636
public int getDataPort();
637
public long getLastHeartbeat();
638
public int getNumberSlots();
639
public int getNumberAvailableSlots();
640
public ResourceProfile getTotalResource();
641
public ResourceProfile getAvailableResource();
642
public HardwareDescription getHardwareDescription();
643
public TaskExecutorMemoryConfiguration getMemoryConfiguration();
644
}
645
646
// Application status for per-job clusters
647
public enum ApplicationStatus {
648
SUCCEEDED,
649
FAILED,
650
KILLED,
651
UNKNOWN
652
}
653
```