Resource Management

Container allocation and executor lifecycle management within YARN resource constraints and scheduling policies. This module handles the complex coordination between Spark's resource needs and YARN's resource management system.

Capabilities

YarnAllocator

Core resource allocator that manages container requests, allocation tracking, and executor lifecycle within YARN clusters.

class YarnAllocator(
  driverUrl: String,
  driverRef: RpcEndpointRef,
  conf: YarnConfiguration,
  sparkConf: SparkConf,
  amClient: AMRMClient[ContainerRequest],
  appAttemptId: ApplicationAttemptId,
  securityMgr: SecurityManager,
  localResources: Map[String, LocalResource],
  resolver: SparkRackResolver,
  clock: Clock = new SystemClock()
) {
  def getNumExecutorsRunning: Int
  def getNumExecutorsFailed: Int
  def numContainersPendingAllocate: Int
  def allocateResources(): Unit
  def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit
  def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit
  def updateResourceRequests(): Unit
  def killExecutor(executorId: String): Unit
  def stop(): Unit
}

Constructor Parameters:

  • driverUrl: RPC URL for driver communication
  • driverRef: RPC endpoint reference for driver
  • conf: YARN configuration from cluster
  • sparkConf: Spark configuration with resource settings
  • amClient: YARN ApplicationMaster ResourceManager client
  • appAttemptId: YARN application attempt identifier
  • securityMgr: Spark security manager instance
  • localResources: Staged resources for executor containers
  • resolver: Rack resolver for locality awareness
  • clock: Clock instance for time-based operations (defaults to SystemClock)

Status Query Methods:

getNumExecutorsRunning: Int

  • Returns count of currently running executors
  • Includes executors in RUNNING state only

getNumExecutorsFailed: Int

  • Returns count of failed executor attempts
  • Includes permanent failures and exceeded retry attempts

numContainersPendingAllocate: Int

  • Returns count of outstanding container requests
  • Requests submitted but not yet allocated by YARN
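
Taken together, these counters give a point-in-time snapshot of allocation progress. A minimal sketch, assuming an allocator: YarnAllocator reference is in scope (the class is only visible inside the ApplicationMaster):

// Illustrative allocation snapshot; `allocator` is assumed in scope
println(s"executors running=${allocator.getNumExecutorsRunning} " +
  s"failed=${allocator.getNumExecutorsFailed} " +
  s"pending containers=${allocator.numContainersPendingAllocate}")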

Resource Management Methods:

allocateResources(): Unit

  • Requests containers from YARN ResourceManager based on current needs
  • Considers executor targets, pending requests, and failed containers
  • Updates container allocation state

handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit

  • Processes newly allocated containers from YARN
  • Launches executor processes in allocated containers
  • Updates tracking state for launched executors

processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit

  • Handles container completion events from YARN
  • Distinguishes between normal completion and failures
  • Triggers replacement requests for failed executors

updateResourceRequests(): Unit

  • Updates container resource requests with YARN ResourceManager
  • Adjusts requested resources based on current executor targets
  • Called periodically to sync allocation state

killExecutor(executorId: String): Unit

  • Kills a specific executor by terminating its container
  • Updates internal tracking state
  • Used for explicit executor removal

stop(): Unit

  • Stops the allocator and releases all resources
  • Cancels pending container requests
  • Performs cleanup of internal state
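
Inside the ApplicationMaster these methods are driven by a heartbeat loop. A simplified sketch of that loop's shape (not Spark's actual implementation; finished and heartbeatIntervalMs are assumed to be defined by the surrounding code):

// Simplified reporter-loop shape
while (!finished) {
  allocator.allocateResources() // sync requests with the RM; launch newly allocated containers
  Thread.sleep(heartbeatIntervalMs)
}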

Usage Example:

// YarnAllocator is private to Spark and is driven internally by the
// ApplicationMaster; applications observe its effects indirectly.

// Monitor executor status through SparkContext's status tracker
val statusTracker = sc.statusTracker
val execInfo = statusTracker.getExecutorInfos

execInfo.foreach { exec =>
  println(s"Executor at ${exec.host}:${exec.port}, running tasks: ${exec.numRunningTasks}")
}

YarnRMClient

Handles ResourceManager registration and communication for the ApplicationMaster.

class YarnRMClient(amClient: AMRMClient[ContainerRequest]) {
  def register(
    driverHost: String, 
    driverPort: Int, 
    conf: YarnConfiguration, 
    sparkConf: SparkConf, 
    uiAddress: Option[String], 
    uiHistoryAddress: String
  ): Unit
  def createAllocator(
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    driverUrl: String,
    driverRef: RpcEndpointRef,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource]
  ): YarnAllocator
  def unregister(status: FinalApplicationStatus, diagnostics: String = ""): Unit
  def getAttemptId(): ApplicationAttemptId
  def getMaxRegAttempts(sparkConf: SparkConf, yarnConf: YarnConfiguration): Int
}

ResourceManager Integration:

register(...): Unit

  • Registers ApplicationMaster with YARN ResourceManager
  • Provides driver connection details and web UI URLs
  • Establishes communication channel for resource requests

createAllocator(...): YarnAllocator

  • Creates and configures YarnAllocator instance
  • Sets up container allocation and executor lifecycle management
  • Integrates with security manager and local resources

unregister(status, diagnostics): Unit

  • Unregisters ApplicationMaster from ResourceManager
  • Reports final application status and diagnostic information
  • Performs cleanup of ResourceManager communication

getAttemptId(): ApplicationAttemptId

  • Returns current YARN application attempt identifier
  • Used for tracking and logging purposes

getMaxRegAttempts(sparkConf, yarnConf): Int

  • Determines maximum ApplicationMaster registration retry attempts
  • Based on Spark and YARN configuration settings
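
A sketch of how these calls fit together during ApplicationMaster startup, assuming the driver endpoint details, configurations, security manager, and local resources are already prepared (names such as driverHost and webUiUrl are placeholders):

// Hypothetical startup sequence
rmClient.register(driverHost, driverPort, yarnConf, sparkConf,
  uiAddress = Some(webUiUrl), uiHistoryAddress = historyUrl)

val allocator = rmClient.createAllocator(
  yarnConf, sparkConf, driverUrl, driverRef, securityMgr, localResources)
allocator.allocateResources() // first allocation cycle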

ExecutorRunnable

Manages executor container launch and configuration within YARN containers. This class handles the low-level details of starting Spark executors in YARN containers.

class ExecutorRunnable(
  container: Container,
  conf: Configuration,
  sparkConf: SparkConf,
  masterAddress: String,
  executorId: String,
  hostname: String,
  executorMemory: Int,
  executorCores: Int,
  appId: String,
  securityMgr: SecurityManager,
  localResources: Map[String, LocalResource]
) {
  def run(): Unit
  def launchContextDebugInfo(): String
  def startContainer(): java.util.Map[String, ByteBuffer]
}

Container Launch Management:

run(): Unit

  • Main execution method that launches executor in YARN container
  • Sets up container environment and security context
  • Submits container launch request to NodeManager
  • Handles launch failures and error reporting

launchContextDebugInfo(): String

  • Returns detailed debug information about container launch context
  • Includes environment variables, command line, and resource information
  • Used for troubleshooting container launch issues

startContainer(): java.util.Map[String, ByteBuffer]

  • Submits the prepared launch context to the NodeManager
  • Returns auxiliary service responses keyed by service name (e.g., shuffle service metadata)
  • Called from run() once the security context is established

Container Launch Process:

  1. Environment Setup: Configures PATH, JAVA_HOME, and other environment variables
  2. Command Construction: Builds executor command line with JVM options and classpath
  3. Security Context: Sets up Kerberos credentials and security tokens
  4. Resource Configuration: Configures local resources and file permissions
  5. Container Submission: Submits launch request to NodeManager
  6. Monitoring: Tracks container launch status and handles failures

Usage Example:

// ExecutorRunnable is typically used internally by YarnAllocator
// when launching new executor containers

def launchExecutorContainer(
    container: Container,
    executorId: String): Unit = {
  
  val runnable = new ExecutorRunnable(
    container = container,
    conf = yarnConf,
    sparkConf = sparkConf,
    masterAddress = driverUrl,
    executorId = executorId,
    hostname = container.getNodeId.getHost,
    executorMemory = executorMemoryMB,
    executorCores = executorCores,
    appId = appId,
    securityMgr = securityManager,
    localResources = localResourceMap
  )
  
  // Launch asynchronously; ExecutorRunnable does not itself extend Runnable
  launcherPool.execute(() => runnable.run())
}

Resource Allocation Strategies

Dynamic Allocation

Integration with Spark's dynamic allocation for elastic resource scaling.

// Configuration for dynamic allocation
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "100")
  .set("spark.dynamicAllocation.initialExecutors", "10")
  .set("spark.dynamicAllocation.targetUtilization", "0.8")

Scaling Behavior:

  • Requests additional containers when task queue grows
  • Releases idle containers after configured timeout
  • Respects YARN queue and cluster resource limits
  • Coordinates with YARN fair/capacity schedulers
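
When dynamic allocation is disabled, the same elasticity can be exercised manually through SparkContext's developer API, which the YARN backend translates into container requests:

// Developer API for manual scaling (requires a coarse-grained backend such as YARN)
sc.requestExecutors(5)          // ask the allocator for five additional containers
sc.killExecutors(Seq("3", "7")) // release specific executors back to YARN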

Locality Preferences

Leverages YARN's rack awareness for optimal data locality.

class SparkRackResolver(conf: SparkConf, yarnConf: YarnConfiguration) {
  def resolve(hostName: String): String
}

Locality Levels:

  1. NODE_LOCAL: Same node as data
  2. RACK_LOCAL: Same rack as data
  3. ANY: Any available node

Container Request Strategy:

  • Requests containers with locality preferences
  • Falls back to lower locality levels if needed
  • Balances locality with resource availability
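
At the YARN API level, these preferences map onto AMRMClient container requests. A hedged sketch with hypothetical host and rack names:

import org.apache.hadoop.yarn.api.records.{Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

// Request a 4 GiB / 4-vcore container, preferring a specific node and rack
val capability = Resource.newInstance(4096, 4)
val request = new ContainerRequest(
  capability,
  Array("host-1.example.com"), // preferred nodes (hypothetical)
  Array("/rack-a"),            // preferred racks (hypothetical)
  Priority.newInstance(1))
amClient.addContainerRequest(request)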

Resource Constraints

YARN-specific resource constraint handling.

Memory Management:

// Memory overhead calculation (values in MiB)
val executorMemoryMiB = sparkConf.getSizeAsMb("spark.executor.memory", "1g")
val memoryOverheadMiB = sparkConf.getLong("spark.yarn.executor.memoryOverhead",
  math.max((executorMemoryMiB * 0.10).toLong, 384L))
val totalMemoryMiB = executorMemoryMiB + memoryOverheadMiB

CPU Allocation:

// Core allocation with YARN constraints
val executorCores = conf.getInt("spark.executor.cores", 1)
val maxCores = yarnConf.getInt("yarn.scheduler.maximum-allocation-vcores", Int.MaxValue)
val requestCores = math.min(executorCores, maxCores)

Container Lifecycle Management

Container States

Container Lifecycle:

  1. REQUESTED: Container request submitted to YARN
  2. ALLOCATED: Container allocated by ResourceManager
  3. LAUNCHING: Container being launched on NodeManager
  4. RUNNING: Executor process running successfully
  5. COMPLETED: Container finished (success or failure)
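
For bookkeeping, the lifecycle can be modeled as a simple algebraic data type; this is an illustrative model, not Spark's internal representation:

// Illustrative lifecycle model (not Spark's internal types)
sealed trait ContainerState
case object Requested extends ContainerState
case object Allocated extends ContainerState
case object Launching extends ContainerState
case object Running extends ContainerState
final case class Completed(exitCode: Int) extends ContainerState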

Failure Handling

Failure Categories:

  • Preemption: Container killed by YARN for resource rebalancing
  • Node Failure: NodeManager or hardware failure
  • Application Error: Executor process crashed or failed
  • Resource Exhaustion: Out of memory or disk space

Recovery Strategies:

// Retry configuration
val maxExecutorFailures = conf.getInt("spark.yarn.max.executor.failures", 
  math.max(2 * numExecutors, 3))

// Blacklist configuration  
val blacklistEnabled = conf.getBoolean("spark.blacklist.enabled", false)
val maxFailedExecutorsPerNode = conf.getInt("spark.blacklist.application.maxFailedExecutorsPerNode", 2)

Container Monitoring

Health Monitoring:

  • Container resource usage tracking
  • Executor heartbeat monitoring
  • Node failure detection
  • Container exit code analysis

Metrics Integration:

// Spark's ApplicationMasterSource exposes these counts through the metrics
// system; the underlying Dropwizard pattern looks like this (assuming an
// `allocator: YarnAllocator` in scope):
import com.codahale.metrics.{Gauge, MetricRegistry}

val registry = new MetricRegistry()
registry.register("numExecutorsRunning",
  new Gauge[Int] { def getValue: Int = allocator.getNumExecutorsRunning })
registry.register("numExecutorsFailed",
  new Gauge[Int] { def getValue: Int = allocator.getNumExecutorsFailed })

Cleanup and Termination

Graceful Shutdown:

  1. Stop accepting new container requests
  2. Complete running tasks where possible
  3. Release allocated but unused containers
  4. Clean up staged resources
  5. Unregister from ResourceManager
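
The final two steps map directly onto the APIs documented above; a minimal sketch, assuming allocator and rmClient are in scope:

import org.apache.hadoop.yarn.api.records.FinalApplicationStatus

// Release containers and report a final status (sketch, not Spark's shutdown hook)
allocator.stop()
rmClient.unregister(FinalApplicationStatus.SUCCEEDED, "Application completed")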

Emergency Shutdown:

  1. Immediately kill all executor containers
  2. Force cleanup of staged resources
  3. Report failure status to ResourceManager

Configuration Integration

Memory Configuration

// Executor memory settings
"spark.executor.memory" -> "4g"                    // Heap memory
"spark.yarn.executor.memoryOverhead" -> "1g"       // Off-heap overhead
"spark.executor.memoryFraction" -> "0.8"           // Storage/execution split

// ApplicationMaster memory  
"spark.yarn.am.memory" -> "2g"                     // AM heap memory
"spark.yarn.am.memoryOverhead" -> "384m"           // AM overhead

CPU Configuration

// Executor CPU settings
"spark.executor.cores" -> "4"                      // Cores per executor
"spark.executor.instances" -> "10"                 // Static executor count

// ApplicationMaster CPU
"spark.yarn.am.cores" -> "2"                       // AM CPU cores

Resource Queue Configuration

// YARN queue and scheduling
"spark.yarn.queue" -> "production"                 // YARN queue name
"spark.yarn.priority" -> "1"                       // Application priority
"spark.yarn.maxAppAttempts" -> "3"                 // Max application attempts

Error Handling and Diagnostics

Common Resource Errors

Insufficient Resources:

throw new YarnException("Could not allocate container within timeout")
throw new IllegalStateException("Requested memory exceeds queue maximum")

Container Launch Failures:

throw new IOException("Failed to launch container on node")
throw new SecurityException("Container launch denied due to security policy")

Resource Constraint Violations:

throw new IllegalArgumentException("Executor memory must be at least 384MB")
throw new SparkException("Total requested cores exceed cluster capacity")
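
Such violations can be caught early by validating requests against YARN's advertised per-container maximum; a sketch assuming yarnConf and the memory values computed earlier:

// Fail fast if the container request exceeds the scheduler's per-container cap
val maxAllocMiB = yarnConf.getInt("yarn.scheduler.maximum-allocation-mb", 8192)
val requestedMiB = executorMemoryMiB + memoryOverheadMiB
if (requestedMiB > maxAllocMiB) {
  throw new IllegalArgumentException(
    s"Requested executor memory ($requestedMiB MiB) exceeds the YARN maximum ($maxAllocMiB MiB)")
}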

Diagnostic Information

Resource Status Reporting:

def getResourceStatus: String = {
  s"""
    |Executors Running: $getNumExecutorsRunning
    |Executors Failed: $getNumExecutorsFailed
    |Containers Pending: $numContainersPendingAllocate
  """.stripMargin
}

Container Exit Analysis:

import org.apache.hadoop.yarn.api.records.{ContainerExitStatus, ContainerStatus}

def analyzeContainerExit(status: ContainerStatus): String = {
  status.getExitStatus match {
    case ContainerExitStatus.SUCCESS => "Container completed successfully"
    case ContainerExitStatus.PREEMPTED => "Container preempted by the YARN scheduler"
    case ContainerExitStatus.KILLED_EXCEEDED_PMEM => "Container killed for exceeding physical memory limits"
    case ContainerExitStatus.KILLED_EXCEEDED_VMEM => "Container killed for exceeding virtual memory limits"
    case other => s"Container failed with exit code: $other"
  }
}

Performance Optimization

Resource Tuning Guidelines

Memory Optimization:

  • Set executor memory to 80-90% of container memory
  • Reserve sufficient overhead for off-heap operations
  • Consider GC tuning for large heap sizes
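
For example, an executor targeting a 16 GiB YARN container might be split as follows (illustrative numbers):

// ~87% heap plus off-heap headroom within a 16 GiB container
val conf = new SparkConf()
  .set("spark.executor.memory", "14g")
  .set("spark.yarn.executor.memoryOverhead", "2g")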

CPU Optimization:

  • Balance executor cores with parallelism needs
  • Avoid over-subscribing CPU resources
  • Consider NUMA topology on large nodes

Container Sizing:

  • Larger containers reduce scheduling overhead
  • Smaller containers provide better resource utilization
  • Balance based on workload characteristics

Locality Optimization

Data Locality:

  • Co-locate executors with data when possible
  • Use rack-aware placement for distributed data
  • Consider storage system topology
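
How aggressively the scheduler insists on locality is tunable through its wait settings (real Spark configuration keys; values illustrative):

// Time the scheduler waits before relaxing to the next locality level
val conf = new SparkConf()
  .set("spark.locality.wait", "3s")
  .set("spark.locality.wait.node", "3s")
  .set("spark.locality.wait.rack", "3s")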

Network Locality:

  • Minimize cross-rack network traffic
  • Optimize for cluster network topology
  • Consider bandwidth constraints