The runtime module of Apache Flink, which provides the core execution engine, scheduling, fault tolerance, checkpointing, and resource management capabilities for distributed stream processing applications.
—
Cluster resource allocation and TaskExecutor lifecycle management for different deployment environments. The ResourceManager is responsible for managing cluster resources, allocating slots, and coordinating with TaskExecutors.
RPC gateway interface for communication with the ResourceManager, handling JobMaster and TaskExecutor registration, slot allocation, and cluster coordination.
/**
 * The ResourceManager's RPC gateway interface for communication with JobMasters
 * and TaskExecutors.
 *
 * <p>Request/response operations return a {@link CompletableFuture} that carries the result;
 * operations declared {@code void} return nothing to the caller.
 */
public interface ResourceManagerGateway
        extends FencedRpcGateway<ResourceManagerId>, ClusterPartitionManager, BlocklistListener {

    // ------------------------------------------------------------------ registration

    /**
     * Registers a JobMaster with the ResourceManager.
     *
     * @param jobMasterId id of the registering JobMaster
     * @param jobMasterResourceId resource id of the registering JobMaster
     * @param jobMasterAddress address under which the JobMaster can be reached
     * @param jobId id of the job the JobMaster registers for
     * @param timeout timeout for this call
     * @return future completing with the registration response, e.g. a
     *     {@code JobMasterRegistrationSuccess} or a {@code RegistrationResponse.Failure}
     */
    CompletableFuture<RegistrationResponse> registerJobMaster(
            JobMasterId jobMasterId,
            ResourceID jobMasterResourceId,
            String jobMasterAddress,
            JobID jobId,
            Duration timeout);

    /**
     * Registers a TaskExecutor with the ResourceManager.
     *
     * @param taskExecutorAddress address under which the TaskExecutor can be reached
     * @param resourceId resource id of the registering TaskExecutor
     * @param slotReport initial report of the TaskExecutor's slots
     * @param totalResourceProfile total resources offered by the TaskExecutor
     * @param timeout timeout for this call
     * @return future completing with the registration response (presumably a
     *     {@code TaskExecutorRegistrationSuccess} on success — confirm)
     */
    CompletableFuture<RegistrationResponse> registerTaskExecutor(
            String taskExecutorAddress,
            ResourceID resourceId,
            SlotReport slotReport,
            ResourceProfile totalResourceProfile,
            Duration timeout);

    // ------------------------------------------------------------------ slots

    /**
     * Sends a slot report from a registered TaskExecutor.
     *
     * @param taskManagerResourceId resource id of the reporting TaskExecutor
     * @param taskManagerRegistrationId instance id the TaskExecutor received on registration
     * @param slotReport current report of the TaskExecutor's slots
     * @param timeout timeout for this call
     * @return future completing with an {@link Acknowledge}
     */
    CompletableFuture<Acknowledge> sendSlotReport(
            ResourceID taskManagerResourceId,
            InstanceID taskManagerRegistrationId,
            SlotReport slotReport,
            Duration timeout);

    /**
     * Requests the allocation of a slot.
     *
     * @param jobMasterId id of the requesting JobMaster
     * @param slotRequest job, allocation id, resource profile and target address of the request
     * @param timeout timeout for this call
     * @return future completing with an {@link Acknowledge}
     */
    CompletableFuture<Acknowledge> requestSlot(
            JobMasterId jobMasterId,
            SlotRequest slotRequest,
            Duration timeout);

    /**
     * Cancels a slot request.
     *
     * <p>NOTE(review): {@code SlotRequest} exposes an {@code AllocationID} but no
     * {@code SlotRequestId}; confirm how callers obtain the id passed here.
     *
     * @param slotRequestId id of the request to cancel
     */
    void cancelSlotRequest(SlotRequestId slotRequestId);

    /**
     * Notifies the ResourceManager that a slot is available.
     *
     * @param instanceID instance id of the TaskExecutor reporting the slot
     * @param slotId the available slot
     * @param allocationId allocation id associated with the slot
     * @return future completing with an {@link Acknowledge}
     */
    CompletableFuture<Acknowledge> notifySlotAvailable(
            InstanceID instanceID,
            SlotID slotId,
            AllocationID allocationId);

    // ------------------------------------------------------------------ cluster lifecycle

    /**
     * Deregisters the application (for per-job clusters).
     *
     * @param finalStatus final status of the application
     * @param diagnostics diagnostic message accompanying the final status
     * @return future completing with an {@link Acknowledge}
     */
    CompletableFuture<Acknowledge> deregisterApplication(
            ApplicationStatus finalStatus,
            String diagnostics);

    /** Returns a future with the number of currently registered TaskManagers. */
    CompletableFuture<Integer> getNumberOfRegisteredTaskManagers();

    // ------------------------------------------------------------------ heartbeats / disconnects

    /**
     * Receives a heartbeat from a JobMaster.
     *
     * @param resourceID resource id of the sending JobMaster
     * @param heartbeatPayload payload sent with the heartbeat
     */
    void heartbeatFromJobManager(ResourceID resourceID, JobMasterIdWithResourceRequirements heartbeatPayload);

    /**
     * Receives a heartbeat from a TaskExecutor.
     *
     * @param resourceID resource id of the sending TaskExecutor
     * @param heartbeatPayload payload sent with the heartbeat
     */
    void heartbeatFromTaskManager(ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload);

    /**
     * Disconnects the JobMaster of the given job.
     *
     * @param jobId job whose JobMaster is disconnected
     * @param cause reason for the disconnect
     */
    void disconnectJobManager(JobID jobId, Exception cause);

    /**
     * Disconnects a TaskExecutor.
     *
     * @param resourceId resource id of the TaskExecutor to disconnect
     * @param cause reason for the disconnect
     */
    void disconnectTaskManager(ResourceID resourceId, Exception cause);

    // ------------------------------------------------------------------ diagnostics / queries

    /**
     * Requests a thread dump from a TaskExecutor.
     *
     * @param taskManagerId resource id of the TaskExecutor
     * @param timeout timeout for this call
     * @return future completing with the thread dump
     */
    CompletableFuture<ThreadDumpInfo> requestThreadDump(
            ResourceID taskManagerId,
            Duration timeout);

    /**
     * Requests profiling of a TaskExecutor.
     *
     * @param taskManagerId resource id of the TaskExecutor
     * @param timeout timeout for this call
     * @param mode profiling mode to use
     * @param profilingDuration how long to profile
     * @return future completing with the profiling results
     */
    CompletableFuture<Collection<ProfilingInfo>> requestProfiling(
            ResourceID taskManagerId,
            Duration timeout,
            ProfilingMode mode,
            Duration profilingDuration);

    /**
     * Requests an overview of the cluster's resources.
     *
     * @param timeout timeout for this call
     * @return future completing with the resource overview
     */
    CompletableFuture<ResourceOverview> requestResourceOverview(Duration timeout);

    /**
     * Requests information about all registered TaskManagers.
     *
     * @param timeout timeout for this call
     * @return future completing with one {@link TaskManagerInfo} per TaskManager
     */
    CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(Duration timeout);

    /**
     * Requests detailed information, including slots, about a single TaskManager.
     *
     * @param taskManagerId resource id of the TaskManager
     * @param timeout timeout for this call
     * @return future completing with the detailed TaskManager information
     */
    CompletableFuture<TaskManagerInfoWithSlots> requestTaskManagerDetailsInfo(
            ResourceID taskManagerId,
            Duration timeout);
}
Usage Examples:
// Register JobMaster with ResourceManager
CompletableFuture<RegistrationResponse> registrationFuture =
        resourceManagerGateway.registerJobMaster(
                jobMasterId,
                jobMasterResourceId,
                jobMasterAddress,
                jobId,
                Duration.ofMinutes(1));
// whenComplete (unlike thenAccept) also runs when the future completes exceptionally,
// so a failed call is reported instead of being silently dropped.
registrationFuture.whenComplete((response, failure) -> {
    if (failure != null) {
        System.out.println("Registration call failed: " + failure);
    } else if (response instanceof JobMasterRegistrationSuccess) {
        JobMasterRegistrationSuccess success = (JobMasterRegistrationSuccess) response;
        System.out.println("JobMaster registered successfully");
        System.out.println("ResourceManager ID: " + success.getResourceManagerId());
    } else {
        System.out.println("Registration failed: " + response);
    }
});
// Request slot allocation
SlotRequest slotRequest = new SlotRequest(
        jobId,
        allocationId,
        resourceProfile,
        targetAddress,
        resourceManagerId);
CompletableFuture<Acknowledge> slotFuture = resourceManagerGateway.requestSlot(
        jobMasterId,
        slotRequest,
        Duration.ofMinutes(2));
// Observe the outcome; otherwise a rejected or timed-out request goes unnoticed.
slotFuture.whenComplete((ack, failure) -> {
    if (failure != null) {
        System.out.println("Slot request failed: " + failure);
    } else {
        System.out.println("Slot request acknowledged");
    }
});
// Get cluster resource overview
CompletableFuture<ResourceOverview> overviewFuture =
        resourceManagerGateway.requestResourceOverview(Duration.ofSeconds(30));
overviewFuture.whenComplete((overview, failure) -> {
    if (failure != null) {
        System.out.println("Resource overview request failed: " + failure);
        return;
    }
    System.out.println("Available slots: " + overview.getNumberOfAvailableSlots());
    System.out.println("Total slots: " + overview.getNumberOfTotalSlots());
    System.out.println("Free slots: " + overview.getNumberOfFreeSlots());
});
Base class for resource managers handling resource allocation and TaskExecutor lifecycle management.
/**
 * ResourceManager implementation. The ResourceManager is responsible for resource allocation
 * and bookkeeping. It offers unused slots to the JobManager and keeps track of which
 * slots are available, and which slots are being used.
 *
 * <p>Deployment-specific subclasses implement how workers are requested
 * ({@link #requestNewWorker}) and stopped ({@link #stopWorker}).
 *
 * @param <WorkerType> type representing a worker, bounded by {@link ResourceIDRetrievable}
 */
public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
        extends FencedRpcEndpoint<ResourceManagerId> implements ResourceManagerGateway {

    // ------------------------------------------------------------------ lifecycle / identity

    /** Starts the ResourceManager service. */
    public void start();

    /** Returns the resource ID of this ResourceManager. */
    public ResourceID getResourceId();

    /** Returns the address under which this ResourceManager can be reached. */
    public String getAddress();

    /** Returns the fencing token of this endpoint, i.e. its {@link ResourceManagerId}. */
    public ResourceManagerId getFencingToken();

    // ------------------------------------------------------------------ cluster queries

    /**
     * Requests information about all registered TaskManagers.
     *
     * @param timeout timeout for this call
     * @return future completing with one {@link TaskManagerInfo} per TaskManager
     */
    @Override
    public CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(Duration timeout);

    /**
     * Requests an overview of the cluster's resources.
     *
     * @param timeout timeout for this call
     * @return future completing with the resource overview
     */
    @Override
    public CompletableFuture<ResourceOverview> requestResourceOverview(Duration timeout);

    // ------------------------------------------------------------------ deployment hooks

    /**
     * Starts a new worker.
     *
     * @param workerResourceSpec resources the new worker should provide
     * @return future completing with the requested worker
     */
    protected abstract CompletableFuture<WorkerType> requestNewWorker(WorkerResourceSpec workerResourceSpec);

    /**
     * Stops a worker.
     *
     * @param worker the worker to stop
     */
    protected abstract void stopWorker(WorkerType worker);

    /** Returns the factory used to create {@link WorkerResourceSpec}s. */
    protected abstract WorkerResourceSpecFactory getWorkerResourceSpecFactory();

    /** Returns the number of slots each worker provides. */
    protected abstract int getNumberSlotsPerWorker();

    /** Initializes the ResourceManager. */
    protected void initialize();

    /** Terminates the ResourceManager. */
    protected void terminate();

    // ------------------------------------------------------------------ TaskExecutor events

    /**
     * Handles the registration of a new TaskExecutor.
     *
     * @param taskExecutorConnection connection to the registered TaskExecutor
     */
    protected void onTaskExecutorRegistration(TaskExecutorConnection taskExecutorConnection);

    /**
     * Handles the disconnection of a TaskExecutor.
     *
     * @param resourceId resource id of the disconnected TaskExecutor
     * @param cause reason for the disconnection
     */
    protected void onTaskExecutorDisconnection(ResourceID resourceId, Exception cause);

    /**
     * Handles a slot report from a TaskExecutor.
     *
     * @param resourceId resource id of the reporting TaskExecutor
     * @param slotReport the reported slot states
     */
    protected void onSlotReport(ResourceID resourceId, SlotReport slotReport);

    /**
     * Handles a heartbeat from a TaskExecutor.
     *
     * @param resourceId resource id of the sending TaskExecutor
     * @param heartbeatPayload payload sent with the heartbeat
     */
    protected void onTaskExecutorHeartbeat(ResourceID resourceId, TaskExecutorHeartbeatPayload heartbeatPayload);

    /**
     * Handles a change of the job leader of a job.
     *
     * @param jobId the job whose leader changed
     * @param newJobMasterId id of the new JobMaster
     */
    protected void onJobLeaderIdChanged(JobID jobId, JobMasterId newJobMasterId);

    // ------------------------------------------------------------------ components

    /** Returns the slot manager used by this ResourceManager. */
    protected SlotManager getSlotManager();

    /** Returns the job leader id service used by this ResourceManager. */
    protected JobLeaderIdService getJobLeaderIdService();

    /** Returns the cluster partition manager used by this ResourceManager. */
    protected ClusterPartitionManager getClusterPartitionManager();
}
Specification of computational resources required for a worker/TaskExecutor instance.
/**
 * Specification of worker resources. This class describes the resources of a worker,
 * including CPU, memory, and other computational resources.
 */
public class WorkerResourceSpec implements Serializable {

    // ------------------------------------------------------------------ factories

    /**
     * Creates a worker resource spec from a total resource profile.
     *
     * @param totalResourceProfile total resources of the worker
     * @param networkMemorySize network memory of the worker
     * @return the corresponding worker resource spec
     */
    public static WorkerResourceSpec fromTotalResourceProfile(
            ResourceProfile totalResourceProfile,
            MemorySize networkMemorySize);

    /**
     * Creates a worker resource spec from explicit CPU and memory components.
     *
     * @param cpuCores CPU cores of the worker
     * @param taskHeapSize task heap memory
     * @param taskOffHeapSize task off-heap memory
     * @param networkMemorySize network memory
     * @param managedMemorySize managed memory
     * @return the corresponding worker resource spec
     */
    public static WorkerResourceSpec fromTaskExecutorResourceSpec(
            CPUResource cpuCores,
            MemorySize taskHeapSize,
            MemorySize taskOffHeapSize,
            MemorySize networkMemorySize,
            MemorySize managedMemorySize);

    // ------------------------------------------------------------------ accessors

    /** Returns the total resource profile of the worker. */
    public ResourceProfile getTotalResourceProfile();

    /** Returns the CPU cores of the worker. */
    public CPUResource getCpuCores();

    /** Returns the task heap memory size. */
    public MemorySize getTaskHeapSize();

    /** Returns the task off-heap memory size. */
    public MemorySize getTaskOffHeapSize();

    /** Returns the network memory size. */
    public MemorySize getNetworkMemSize();

    /** Returns the managed memory size. */
    public MemorySize getManagedMemSize();

    /** Returns the JVM heap memory size (framework + task heap). */
    public MemorySize getJvmHeapMemorySize();

    /** Returns the total memory size. */
    public MemorySize getTotalMemSize();

    /** Returns the number of slots this worker can provide. */
    public int getNumSlots();

    // ------------------------------------------------------------------ Object contract

    /** Compares this spec with another object for equality; consistent with {@link #hashCode()}. */
    @Override
    public boolean equals(Object obj);

    /** Returns a hash code consistent with {@link #equals(Object)}. */
    @Override
    public int hashCode();

    /** Returns a string representation of this spec. */
    @Override
    public String toString();
}
Central component for managing slot allocation and TaskExecutor coordination within the ResourceManager.
/**
 * The slot manager is responsible for maintaining a view on all registered task managers
 * and their available slots. It offers unused slots to the slot pool and keeps track of
 * allocations and deallocations.
 */
public interface SlotManager extends AutoCloseable {

    // ------------------------------------------------------------------ lifecycle

    /**
     * Starts the slot manager.
     *
     * @param newResourceManagerId id of the ResourceManager the slot manager works for
     * @param newMainThreadExecutor executor to use as the main thread executor
     * @param resourceActions resource actions available to the slot manager
     */
    void start(
            ResourceManagerId newResourceManagerId,
            Executor newMainThreadExecutor,
            ResourceActions resourceActions);

    /** Suspends the slot manager. */
    void suspend();

    /**
     * Closes the slot manager.
     *
     * <p>Unlike {@link AutoCloseable#close()}, this declaration throws no checked exception.
     */
    @Override
    void close();

    // ------------------------------------------------------------------ task managers

    /**
     * Registers a task manager with the slot manager.
     *
     * @param taskExecutorConnection connection to the task manager
     * @param initialSlotReport slot report sent on registration
     * @param totalResourceProfile total resources of the task manager
     * @param defaultSlotResourceProfile resource profile of a default slot
     * @return registration result (NOTE(review): confirm the meaning of {@code false})
     */
    boolean registerTaskManager(
            TaskExecutorConnection taskExecutorConnection,
            SlotReport initialSlotReport,
            ResourceProfile totalResourceProfile,
            ResourceProfile defaultSlotResourceProfile);

    /**
     * Unregisters a task manager.
     *
     * @param instanceId instance id of the task manager
     * @param cause reason for the unregistration
     * @return unregistration result
     */
    boolean unregisterTaskManager(InstanceID instanceId, Exception cause);

    /**
     * Reports the slot status of a task manager.
     *
     * @param instanceId instance id of the reporting task manager
     * @param slotReport the reported slot states
     * @return result of processing the report
     */
    boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport);

    /** Returns information about all registered task managers. */
    Collection<TaskManagerInfo> getRegisteredTaskManagers();

    /**
     * Returns information about a single task manager.
     *
     * @param instanceId instance id of the task manager
     * @return the task manager's info, or empty if none is known for {@code instanceId}
     */
    Optional<TaskManagerInfo> getTaskManagerInfo(InstanceID instanceId);

    // ------------------------------------------------------------------ requirements / slots

    /**
     * Submits the resource requirements of a job.
     *
     * @param resourceRequirements the job's requirements
     * @return future completing when the requirements have been processed
     */
    CompletableFuture<Void> requestResource(ResourceRequirements resourceRequirements);

    /**
     * Cancels the resource requirements of a job.
     *
     * @param jobId the job whose requirements are cancelled
     */
    void cancelResourceRequirements(JobID jobId);

    /**
     * Returns the allocations of a job, expressed as {@link ResourceRequirements}.
     *
     * @param jobId the job to query
     */
    ResourceRequirements getAllocations(JobID jobId);

    /**
     * Frees a slot.
     *
     * @param slotId the slot to free
     * @param allocationId allocation the slot is freed from
     */
    void freeSlot(SlotID slotId, AllocationID allocationId);

    /**
     * Marks an allocated slot as failing.
     *
     * @param slotId the affected slot
     * @return result of the operation
     */
    boolean setFailingAllocatedSlot(SlotID slotId);

    /**
     * Clears the given slot.
     *
     * @param slotId the slot to clear
     */
    void clearSlotFor(SlotID slotId);

    // ------------------------------------------------------------------ metrics

    /** Returns the number of registered slots across all task managers. */
    int getNumberRegisteredSlots();

    /**
     * Returns the number of registered slots of a specific task manager.
     *
     * @param instanceId instance id of the task manager
     */
    int getNumberRegisteredSlotsOf(InstanceID instanceId);

    /** Returns the number of free slots across all task managers. */
    int getNumberFreeSlots();

    /**
     * Returns the number of free slots of a specific task manager.
     *
     * @param instanceId instance id of the task manager
     */
    int getNumberFreeSlotsOf(InstanceID instanceId);
}
Summary of cluster resource utilization and availability.
/**
 * An overview of the resources in the cluster: task managers, slots, and resource profiles.
 */
public class ResourceOverview implements Serializable {

    /**
     * Creates a resource overview.
     *
     * @param numberOfTaskManagers number of registered task managers
     * @param numberOfAvailableSlots number of available slots
     * @param numberOfTotalSlots total number of slots
     * @param availableResourceProfile available resources
     * @param totalResourceProfile total resources
     */
    public ResourceOverview(
            int numberOfTaskManagers,
            int numberOfAvailableSlots,
            int numberOfTotalSlots,
            ResourceProfile availableResourceProfile,
            ResourceProfile totalResourceProfile);

    /** Returns the number of registered task managers. */
    public int getNumberOfTaskManagers();

    /** Returns the number of available slots. */
    public int getNumberOfAvailableSlots();

    /** Returns the total number of slots. */
    public int getNumberOfTotalSlots();

    /**
     * Returns the number of free slots.
     *
     * <p>NOTE(review): the constructor only receives an available-slot count; confirm how
     * "free" differs from {@link #getNumberOfAvailableSlots()}.
     */
    public int getNumberOfFreeSlots();

    /** Returns the available resource profile. */
    public ResourceProfile getAvailableResource();

    /** Returns the total resource profile. */
    public ResourceProfile getTotalResource();

    /**
     * Checks whether the cluster has sufficient resources.
     *
     * @param requiredResources the resources that are needed
     * @return whether the cluster can satisfy {@code requiredResources}
     */
    public boolean hasSufficientResources(ResourceProfile requiredResources);

    /**
     * Returns the resource utilization ratio.
     *
     * <p>NOTE(review): presumably a fraction in [0, 1] (example callers multiply it by 100 to
     * print a percentage); confirm the exact definition.
     */
    public double getUtilizationRatio();
}
ResourceManager implementation for standalone Flink deployments without external resource orchestration.
/**
 * ResourceManager for standalone Flink deployments. In standalone mode,
 * TaskExecutors are started manually and register themselves with the ResourceManager.
 *
 * <p>Workers are identified directly by their {@link ResourceID}.
 */
public class StandaloneResourceManager extends ResourceManager<ResourceID> {

    /**
     * Creates a standalone resource manager.
     *
     * <p>NOTE(review): {@code rpcTimeout} and {@code previousAttemptTimeout} are typed
     * {@code Time}, while the gateway methods take {@code java.time.Duration}; confirm the
     * intended parameter types.
     */
    public StandaloneResourceManager(
            RpcService rpcService,
            ResourceManagerConfiguration resourceManagerConfiguration,
            HighAvailabilityServices highAvailabilityServices,
            SlotManager slotManager,
            ResourceManagerPartitionTracker partitionTracker,
            BlocklistHandler.Factory blocklistHandlerFactory,
            JobLeaderIdService jobLeaderIdService,
            ClusterInformation clusterInformation,
            FatalErrorHandler fatalErrorHandler,
            ResourceManagerMetricGroup resourceManagerMetricGroup,
            Time rpcTimeout,
            Time previousAttemptTimeout);

    /** Initializes the resource manager. */
    @Override
    protected void initialize();

    /** Terminates the resource manager. */
    @Override
    protected void terminate();

    /** Requests a new worker; not supported in standalone mode. */
    @Override
    protected CompletableFuture<ResourceID> requestNewWorker(WorkerResourceSpec workerResourceSpec);

    /** Stops a worker; not supported in standalone mode. */
    @Override
    protected void stopWorker(ResourceID worker);

    /** Returns the worker resource spec factory. */
    @Override
    protected WorkerResourceSpecFactory getWorkerResourceSpecFactory();

    /** Returns the number of slots per worker. */
    @Override
    protected int getNumberSlotsPerWorker();
}
Usage Examples:
// Configure resource manager
ResourceManagerConfiguration rmConfig = ResourceManagerConfiguration.fromConfiguration(
        configuration,
        ResourceID.fromString("resource-manager"));
// Create slot manager
// NOTE(review): confirm the units of ResourceProfile.fromResources(4.0, 8192).
SlotManagerConfiguration slotManagerConfig = SlotManagerConfiguration.fromConfiguration(
        configuration,
        WorkerResourceSpec.fromTotalResourceProfile(
                ResourceProfile.fromResources(4.0, 8192),
                MemorySize.ofMebiBytes(1024)));
SlotManager slotManager = SlotManagerBuilder
        .newBuilder()
        .setSlotManagerConfiguration(slotManagerConfig)
        .setResourceManagerId(resourceManagerId)
        .setMainThreadExecutor(mainThreadExecutor)
        .setResourceActions(resourceActions)
        .build();
// Create standalone resource manager
StandaloneResourceManager resourceManager = new StandaloneResourceManager(
        rpcService,
        rmConfig,
        highAvailabilityServices,
        slotManager,
        partitionTracker,
        blocklistHandlerFactory,
        jobLeaderIdService,
        clusterInformation,
        fatalErrorHandler,
        resourceManagerMetricGroup,
        rpcTimeout,
        previousAttemptTimeout);
// Start resource manager
resourceManager.start();
// Monitor resource utilization.
// Issue the query through the endpoint's self gateway so it is sent as an RPC to the
// ResourceManager rather than invoking the endpoint object directly from this thread.
ResourceManagerGateway gateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
gateway.requestResourceOverview(Duration.ofSeconds(10))
        // whenComplete also observes a failed request, which thenAccept would silently skip.
        .whenComplete((overview, failure) -> {
            if (failure != null) {
                System.out.println("Resource overview request failed: " + failure);
                return;
            }
            System.out.println("Cluster Overview:");
            System.out.println("  Task Managers: " + overview.getNumberOfTaskManagers());
            System.out.println("  Total Slots: " + overview.getNumberOfTotalSlots());
            System.out.println("  Available Slots: " + overview.getNumberOfAvailableSlots());
            System.out.println("  Utilization: " +
                    String.format("%.2f%%", overview.getUtilizationRatio() * 100));
        });
// Resource manager identifiers
/** Identifier (and fencing token) of a ResourceManager, backed by a {@link UUID}. */
public class ResourceManagerId implements Serializable {

    /** Creates a new id. */
    public static ResourceManagerId generate();

    /** Creates an id. */
    public ResourceManagerId();

    /**
     * Creates an id wrapping the given UUID.
     *
     * @param uuid the UUID backing this id
     */
    public ResourceManagerId(UUID uuid);

    /** Returns the UUID backing this id. */
    public UUID getUuid();
}
/** Identifier of a registered TaskExecutor instance, represented as a byte array. */
public class InstanceID implements Serializable {

    /** Creates a new id. */
    public static InstanceID generate();

    /** Creates an id. */
    public InstanceID();

    /**
     * Creates an id from its byte representation.
     *
     * @param instanceId bytes of the id
     */
    public InstanceID(byte[] instanceId);

    /** Returns the byte representation of this id. */
    public byte[] getBytes();
}
// Registration responses
public abstract class RegistrationResponse implements Serializable {
public abstract boolean isSuccess();
public abstract boolean isFailure();
}
public class JobMasterRegistrationSuccess extends RegistrationResponse {
public JobMasterRegistrationSuccess(ResourceManagerId resourceManagerId);
public ResourceManagerId getResourceManagerId();
public boolean isSuccess();
}
public class TaskExecutorRegistrationSuccess extends RegistrationResponse {
public TaskExecutorRegistrationSuccess(
InstanceID registrationId,
ResourceID resourceManagerResourceId,
ClusterInformation clusterInformation
);
public InstanceID getRegistrationId();
public ResourceID getResourceManagerResourceId();
public ClusterInformation getClusterInformation();
public boolean isSuccess();
}
public class RegistrationResponse.Failure extends RegistrationResponse {
public Failure(String reason);
public String getReason();
public boolean isFailure();
}
// Resource requirements and allocation
/** The resource requirements of a job, together with the address they are associated with. */
public class ResourceRequirements implements Serializable {

    /**
     * Creates the requirements of a job.
     *
     * @param jobId the job the requirements belong to
     * @param targetAddress address associated with the requirements
     * @param resourceRequirements the individual requirements
     * @return the combined requirements
     */
    public static ResourceRequirements create(
            JobID jobId, String targetAddress, Collection<ResourceRequirement> resourceRequirements);

    /** Returns the job the requirements belong to. */
    public JobID getJobId();

    /** Returns the address associated with the requirements. */
    public String getTargetAddress();

    /** Returns the individual requirements. */
    public Collection<ResourceRequirement> getResourceRequirements();

    /**
     * Returns the total amount of required resources.
     *
     * <p>NOTE(review): presumably the sum of required slots over all requirements; confirm.
     */
    public int getTotalRequiredResources();
}
/** A number of required slots that share one resource profile. */
public class ResourceRequirement implements Serializable {

    /**
     * Creates a requirement.
     *
     * @param resourceProfile profile each required slot must satisfy
     * @param numberOfRequiredSlots how many such slots are required
     */
    public ResourceRequirement(
            ResourceProfile resourceProfile,
            int numberOfRequiredSlots);

    /** Returns the profile each required slot must satisfy. */
    public ResourceProfile getResourceProfile();

    /** Returns how many slots with this profile are required. */
    public int getNumberOfRequiredSlots();
}
// Slot requests
/** A request for a slot, sent to a ResourceManager on behalf of a job. */
public class SlotRequest implements Serializable {

    /**
     * Creates a slot request.
     *
     * @param jobId the requesting job
     * @param allocationId allocation id of the request
     * @param resourceProfile resources the slot must provide
     * @param targetAddress address associated with the request
     * @param resourceManagerId the ResourceManager the request is addressed to
     */
    public SlotRequest(
            JobID jobId,
            AllocationID allocationId,
            ResourceProfile resourceProfile,
            String targetAddress,
            ResourceManagerId resourceManagerId);

    /** Returns the requesting job. */
    public JobID getJobId();

    /** Returns the allocation id of the request. */
    public AllocationID getAllocationId();

    /** Returns the resources the slot must provide. */
    public ResourceProfile getResourceProfile();

    /** Returns the address associated with the request. */
    public String getTargetAddress();

    /** Returns the ResourceManager the request is addressed to. */
    public ResourceManagerId getResourceManagerId();
}
/** Identifier of a slot request, backed by a {@link UUID}. */
public class SlotRequestId implements Serializable {

    /** Creates a new id. */
    public static SlotRequestId generate();

    /** Creates an id. */
    public SlotRequestId();

    /** Returns the UUID backing this id. */
    public UUID getUuid();
}
// Task manager information
/** Information about a registered TaskManager: address, ports, slots, and resources. */
public class TaskManagerInfo implements Serializable {

    /**
     * Creates the TaskManager information.
     *
     * @param resourceId resource id of the TaskManager
     * @param address address of the TaskManager
     * @param dataPort data port of the TaskManager
     * @param jmxPort JMX port of the TaskManager
     * @param lastHeartbeat time of the last heartbeat
     * @param numberSlots total number of slots
     * @param numberAvailableSlots number of available slots
     * @param totalResource total resources
     * @param availableResource available resources
     * @param hardwareDescription description of the TaskManager's hardware
     * @param memoryConfiguration memory configuration of the TaskManager
     */
    public TaskManagerInfo(
            ResourceID resourceId,
            String address,
            int dataPort,
            int jmxPort,
            long lastHeartbeat,
            int numberSlots,
            int numberAvailableSlots,
            ResourceProfile totalResource,
            ResourceProfile availableResource,
            HardwareDescription hardwareDescription,
            TaskExecutorMemoryConfiguration memoryConfiguration);

    /** Returns the resource id of the TaskManager. */
    public ResourceID getResourceId();

    /** Returns the address of the TaskManager. */
    public String getAddress();

    /** Returns the data port of the TaskManager. */
    public int getDataPort();

    /**
     * Returns the JMX port of the TaskManager.
     *
     * <p>Added so that every constructor argument has an accessor; confirm it exists in the
     * targeted Flink version.
     */
    public int getJmxPort();

    /** Returns the time of the last heartbeat. */
    public long getLastHeartbeat();

    /** Returns the total number of slots. */
    public int getNumberSlots();

    /** Returns the number of available slots. */
    public int getNumberAvailableSlots();

    /** Returns the total resources. */
    public ResourceProfile getTotalResource();

    /** Returns the available resources. */
    public ResourceProfile getAvailableResource();

    /** Returns the description of the TaskManager's hardware. */
    public HardwareDescription getHardwareDescription();

    /** Returns the memory configuration of the TaskManager. */
    public TaskExecutorMemoryConfiguration getMemoryConfiguration();
}
// Application status for per-job clusters
/** Final status of an application, as passed to {@code deregisterApplication}. */
public enum ApplicationStatus {
    /** The application succeeded. */
    SUCCEEDED,
    /** The application failed. */
    FAILED,
    /** The application was killed. */
    KILLED,
    /** The final status is unknown. */
    UNKNOWN
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-runtime