Container allocation and executor lifecycle management within YARN resource constraints and scheduling policies. This module handles the complex coordination between Spark's resource needs and YARN's resource management system.
Core resource allocator that manages container requests, allocation tracking, and executor lifecycle within YARN clusters.
class YarnAllocator(
driverUrl: String,
driverRef: RpcEndpointRef,
conf: YarnConfiguration,
sparkConf: SparkConf,
amClient: AMRMClient[ContainerRequest],
appAttemptId: ApplicationAttemptId,
securityMgr: SecurityManager,
localResources: Map[String, LocalResource],
resolver: SparkRackResolver,
clock: Clock = new SystemClock()
) {
def getNumExecutorsRunning: Int
def getNumExecutorsFailed: Int
def numContainersPendingAllocate: Int
def allocateResources(): Unit
def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit
def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit
def updateResourceRequests(): Unit
def killExecutor(executorId: String): Unit
def stop(): Unit
}
Constructor Parameters:
driverUrl: RPC URL for driver communication
driverRef: RPC endpoint reference for driver
conf: YARN configuration from cluster
sparkConf: Spark configuration with resource settings
amClient: YARN ApplicationMaster ResourceManager client
appAttemptId: YARN application attempt identifier
securityMgr: Spark security manager instance
localResources: Staged resources for executor containers
resolver: Rack resolver for locality awareness
clock: Clock instance for time-based operations (defaults to SystemClock)
Status Query Methods:
getNumExecutorsRunning: Int
getNumExecutorsFailed: Int
numContainersPendingAllocate: Int
Resource Management Methods:
allocateResources(): Unit
handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit
processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit
updateResourceRequests(): Unit
killExecutor(executorId: String): Unit
stop(): Unit
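The request bookkeeping behind updateResourceRequests() can be pictured as a simple delta calculation. The following is a minimal, self-contained illustration, not the actual YarnAllocator code: AllocatorState, requestDelta and toCancel are hypothetical names, and the formula (target minus pending minus running) is an assumption about how the counters combine.

```scala
// Hypothetical snapshot of the allocator's counters (not a Spark class).
final case class AllocatorState(target: Int, running: Int, pending: Int)

object RequestDelta {
  // Positive result: containers that still need to be requested.
  // Negative result: more requests are outstanding than the target requires.
  def requestDelta(s: AllocatorState): Int = s.target - s.pending - s.running

  // Number of pending requests to cancel; never more than are actually pending.
  def toCancel(s: AllocatorState): Int = {
    val delta = requestDelta(s)
    if (delta < 0) math.min(-delta, s.pending) else 0
  }
}
```

With a target of 10 executors, 4 running and 3 pending, the delta is 3 new requests; lowering the target to 5 turns it into 2 cancellations.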
Usage Example:
// YarnAllocator is used internally by the ApplicationMaster; its state is
// managed automatically and is not accessible from application code.
// Monitor executor status through SparkContext instead.
val statusTracker = sc.statusTracker
val execInfo = statusTracker.getExecutorInfos
execInfo.foreach { exec =>
  println(s"Executor ${exec.host}:${exec.port}: ${exec.numRunningTasks} running tasks, ${exec.cacheSize} bytes cached")
}
Manages executor container launch and configuration within YARN containers.
class ExecutorRunnable(
container: Container,
conf: Configuration,
sparkConf: SparkConf,
masterAddress: String,
executorId: String,
hostname: String,
executorMemory: Int,
executorCores: Int,
appId: String,
securityMgr: SecurityManager,
localResources: Map[String, LocalResource]
) {
def run(): Unit
}
Responsibilities:
Container Launch Process:
Handles ResourceManager registration and communication for the ApplicationMaster.
class YarnRMClient(amClient: AMRMClient[ContainerRequest]) {
def register(
driverHost: String,
driverPort: Int,
conf: YarnConfiguration,
sparkConf: SparkConf,
uiAddress: Option[String],
uiHistoryAddress: String
): Unit
def createAllocator(
conf: YarnConfiguration,
sparkConf: SparkConf,
driverUrl: String,
driverRef: RpcEndpointRef,
securityMgr: SecurityManager,
localResources: Map[String, LocalResource]
): YarnAllocator
def unregister(status: FinalApplicationStatus, diagnostics: String = ""): Unit
def getAttemptId(): ApplicationAttemptId
def getMaxRegAttempts(sparkConf: SparkConf, yarnConf: YarnConfiguration): Int
}
ResourceManager Integration:
register(...): Unit
createAllocator(...): YarnAllocator
unregister(status, diagnostics): Unit
getAttemptId(): ApplicationAttemptId
getMaxRegAttempts(sparkConf, yarnConf): Int
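The logic behind getMaxRegAttempts can be pictured as taking the stricter of the Spark and YARN limits. This is a sketch under that assumption rather than the actual implementation: RegAttempts and maxRegAttempts are hypothetical names, with sparkMax standing for an optional spark.yarn.maxAppAttempts setting and yarnMax for the cluster-wide limit.

```scala
object RegAttempts {
  // Use the Spark setting when present, but never exceed the YARN limit.
  def maxRegAttempts(sparkMax: Option[Int], yarnMax: Int): Int =
    sparkMax.map(s => math.min(s, yarnMax)).getOrElse(yarnMax)
}
```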
Manages executor container launch and configuration within YARN containers. This class handles the low-level details of starting Spark executors in YARN containers.
class ExecutorRunnable(
container: Container,
conf: Configuration,
sparkConf: SparkConf,
masterAddress: String,
executorId: String,
hostname: String,
executorMemory: Int,
executorCores: Int,
appId: String,
securityMgr: SecurityManager,
localResources: Map[String, LocalResource]
) {
def run(): Unit
def launchContextDebugInfo(): String
def startContainer(): java.util.Map[String, ByteBuffer]
}
Container Launch Management:
run(): Unit
launchContextDebugInfo(): String
startContainer(): java.util.Map[String, ByteBuffer]
Container Launch Process:
Usage Example:
// ExecutorRunnable is typically used internally by YarnAllocator
// when launching new executor containers
def launchExecutorContainer(
container: Container,
executorId: String): Unit = {
val runnable = new ExecutorRunnable(
container = container,
conf = yarnConf,
sparkConf = sparkConf,
masterAddress = driverUrl,
executorId = executorId,
hostname = container.getNodeId.getHost,
executorMemory = executorMemoryMB,
executorCores = executorCores,
appId = appId,
securityMgr = securityManager,
localResources = localResourceMap
)
// Launch in separate thread
launcherPool.execute(runnable)
}
Integration with Spark's dynamic allocation for elastic resource scaling.
// Configuration for dynamic allocation
val conf = new SparkConf()
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.minExecutors", "2")
.set("spark.dynamicAllocation.maxExecutors", "100")
.set("spark.dynamicAllocation.initialExecutors", "10")
.set("spark.dynamicAllocation.targetUtilization", "0.8")
Scaling Behavior:
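As a rough illustration of how the minExecutors and maxExecutors bounds configured above constrain scaling, the sketch below clamps a desired executor count to the configured range. ScalingBounds and clampTarget are hypothetical names, and the real allocation manager applies additional timing rules that are not modelled here.

```scala
// Hypothetical holder for the dynamic allocation bounds (not a Spark class).
final case class ScalingBounds(minExecutors: Int, maxExecutors: Int) {
  require(minExecutors >= 0 && minExecutors <= maxExecutors, "invalid bounds")
}

object Scaling {
  // Clamp the executor count suggested by the workload to [min, max].
  def clampTarget(desired: Int, b: ScalingBounds): Int =
    math.max(b.minExecutors, math.min(desired, b.maxExecutors))
}
```

With bounds of 2 and 100, a workload asking for 250 executors is capped at 100, and an idle application is held at 2.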
Leverages YARN's rack awareness for optimal data locality.
class SparkRackResolver(conf: SparkConf, yarnConf: YarnConfiguration) {
def resolve(hostName: String): String
}
Locality Levels:
Container Request Strategy:
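One way to picture a locality-aware request strategy is to group the preferred hosts by rack before building container requests. The sketch below is an illustration under assumptions: rackOf stands in for a resolver such as SparkRackResolver.resolve, LocalityGrouping and LocalityExample are hypothetical names, and the host and rack names are made up.

```scala
object LocalityGrouping {
  // Group the distinct preferred hosts by the rack that the resolver reports.
  def hostsByRack(hosts: Seq[String], rackOf: String => String): Map[String, Seq[String]] =
    hosts.distinct.groupBy(rackOf)
}

object LocalityExample {
  // Made-up topology used only for this example.
  private val topology =
    Map("node1" -> "/rack-a", "node2" -> "/rack-a", "node3" -> "/rack-b")

  val grouped: Map[String, Seq[String]] = LocalityGrouping.hostsByRack(
    Seq("node1", "node2", "node3", "node1"),
    host => topology.getOrElse(host, "/default-rack"))
}
```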
YARN-specific resource constraint handling.
Memory Management:
// Memory overhead calculation (all sizes in MB)
val executorMemory = conf.getSizeAsMb("spark.executor.memory", "1g")
// Default overhead: 10% of executor memory, with a 384 MB floor
val defaultOverhead = math.max((executorMemory * 0.1).toLong, 384L)
val memoryOverhead = conf.getSizeAsMb("spark.yarn.executor.memoryOverhead", s"${defaultOverhead}m")
val totalMemory = executorMemory + memoryOverhead
CPU Allocation:
// Core allocation with YARN constraints
val executorCores = conf.getInt("spark.executor.cores", 1)
val maxCores = yarnConf.getInt("yarn.scheduler.maximum-allocation-vcores", Int.MaxValue)
val requestCores = math.min(executorCores, maxCores)
Container Lifecycle:
Failure Categories:
Recovery Strategies:
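A minimal sketch of how an executor failure threshold might be enforced. FailureTracker is a hypothetical class, and excluding preempted containers from the count is an assumption made for illustration; -102 is the value of YARN's ContainerExitStatus.PREEMPTED.

```scala
// Hypothetical tracker (not a Spark class) for executor failures.
final class FailureTracker(maxFailures: Int) {
  private var failures = 0

  // Value of YARN's ContainerExitStatus.PREEMPTED.
  private val Preempted = -102

  // Record a completed container; successful exits and preemptions are not counted.
  def record(exitStatus: Int): Unit =
    if (exitStatus != 0 && exitStatus != Preempted) failures += 1

  def failureCount: Int = failures

  // True once the number of counted failures exceeds the configured maximum.
  def thresholdExceeded: Boolean = failures > maxFailures
}
```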
// Retry configuration
val maxExecutorFailures = conf.getInt("spark.yarn.max.executor.failures",
math.max(2 * numExecutors, 3))
// Blacklist configuration
val nodeBlacklistEnabled = conf.getBoolean("spark.blacklist.enabled", false)
val maxNodeBlacklist = conf.getInt("spark.blacklist.application.maxFailedExecutorsPerNode", 2)
Health Monitoring:
Metrics Integration:
// Monitoring through ApplicationMaster metrics
val source = new ApplicationMasterSource()
source.registerGauge("numExecutorsRunning", () => numExecutorsRunning)
source.registerGauge("numExecutorsFailed", () => numExecutorsFailed)
source.registerGauge("numPendingContainers", () => numPendingAllocate)
Graceful Shutdown:
Emergency Shutdown:
// Executor memory settings
"spark.executor.memory" -> "4g" // Heap memory
"spark.yarn.executor.memoryOverhead" -> "1g" // Off-heap overhead
"spark.memory.fraction" -> "0.8" // Fraction of heap shared by execution and storage
// ApplicationMaster memory
"spark.yarn.am.memory" -> "2g" // AM heap memory
"spark.yarn.am.memoryOverhead" -> "384m" // AM overhead
// Executor CPU settings
"spark.executor.cores" -> "4" // Cores per executor
"spark.executor.instances" -> "10" // Static executor count
// ApplicationMaster CPU
"spark.yarn.am.cores" -> "2" // AM CPU cores
// YARN queue and scheduling
"spark.yarn.queue" -> "production" // YARN queue name
"spark.yarn.priority" -> "1" // Application priority
"spark.yarn.maxAppAttempts" -> "3" // Max application attempts
Insufficient Resources:
throw new YarnException("Could not allocate container within timeout")
throw new IllegalStateException("Requested memory exceeds queue maximum")
Container Launch Failures:
throw new IOException("Failed to launch container on node")
throw new SecurityException("Container launch denied due to security policy")
Resource Constraint Violations:
throw new IllegalArgumentException("Executor memory must be at least 384MB")
throw new SparkException("Total requested cores exceed cluster capacity")
Resource Status Reporting:
def getResourceStatus: String = {
s"""
|Executors Running: $getNumExecutorsRunning
|Executors Failed: $getNumExecutorsFailed
|Containers Pending: $numContainersPendingAllocate
|Total Containers: $getNumContainersRunning
""".stripMargin
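}
Container Exit Analysis:
def analyzeContainerExit(status: ContainerStatus): String = {
  // Negative codes are the constants defined in YARN's ContainerExitStatus
  status.getExitStatus match {
    case 0 => "Container completed successfully"
    case -100 => "Container aborted or released by the framework"
    case -101 => "Container failed because of disk failures on the node"
    case -102 => "Container killed by YARN (preemption)"
    case -103 | -104 => "Container killed due to exceeding memory limits"
    case other => s"Container failed with exit code: $other"
  }
}
Memory Optimization: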
CPU Optimization:
Container Sizing:
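To make container sizing concrete, the sketch below works through the arithmetic under some assumptions: overhead is 10% of executor memory with a 384 MB floor (the rule used in the memory overhead calculation earlier), and YARN rounds each request up to a multiple of its minimum allocation. ContainerSizing and its parameters are hypothetical names; the 1024 MB increment and 64 GB node size used in the note after the code are example values.

```scala
object ContainerSizing {
  // Overhead: 10% of executor memory, but at least 384 MB.
  def overheadMb(executorMb: Long): Long =
    math.max((executorMb * 0.1).toLong, 384L)

  // Round a request up to the next multiple of the scheduler's minimum allocation.
  def roundUp(requestMb: Long, incrementMb: Long): Long =
    ((requestMb + incrementMb - 1) / incrementMb) * incrementMb

  // Memory YARN would reserve for one executor container.
  def containerMb(executorMb: Long, incrementMb: Long): Long =
    roundUp(executorMb + overheadMb(executorMb), incrementMb)

  // How many such containers fit in the memory a NodeManager offers.
  def executorsPerNode(nodeMb: Long, executorMb: Long, incrementMb: Long): Long =
    nodeMb / containerMb(executorMb, incrementMb)
}
```

For a 4096 MB executor with a 1024 MB increment, this gives 409 MB of overhead and a 5120 MB container, so a node offering 65536 MB to YARN fits 12 executors.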
Data Locality:
Network Locality: