Apache Spark YARN resource manager integration module that enables Spark applications to run on YARN clusters
—
Components responsible for allocating and managing YARN containers for Spark executors. These classes handle resource allocation strategies, placement policies, container management, and communication with YARN ResourceManager and NodeManagers.
Core component that manages resource allocation and executor lifecycle on YARN. Handles container requests, allocation monitoring, and executor management.
/**
 * Manages resource allocation and executor lifecycle on YARN.
 * Handles container requests, allocation monitoring, and executor management.
 * (API summary — method bodies omitted.)
 */
class YarnAllocator {
  /** Allocate containers from the YARN ResourceManager. */
  def allocateResources(): Unit
  /**
   * Request executors with locality preferences.
   * @return true if the request was successful
   */
  def requestTotalExecutorsWithPreferredLocalities(): Boolean
  /**
   * Kill a specific executor.
   * @param executorId ID of the executor to kill
   */
  def killExecutor(executorId: String): Unit
  /** Get the number of failed executors. */
  def getNumExecutorsFailed: Int
  /** Stop the allocator and clean up resources. */
  def stop(): Unit
}

Usage Pattern:
// YarnAllocator is typically used internally by scheduler backends,
// but understanding its role is important for configuration.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("ResourceAllocationExample")
  // Configure executor resources
  .set("spark.executor.instances", "10")
  .set("spark.executor.cores", "2")
  .set("spark.executor.memory", "4g")
  .set("spark.executor.memoryOverhead", "512m")
  // Configure allocation behavior
  .set("spark.yarn.executor.failuresValidityInterval", "1h")
  .set("spark.yarn.max.executor.failures", "3")
// YarnAllocator will use these settings for resource allocation

Client for communicating with YARN ResourceManager. Handles ApplicationMaster registration and resource allocator creation.
/**
 * Client for communicating with the YARN ResourceManager.
 * Handles ApplicationMaster registration and resource-allocation setup.
 * (API summary — method bodies omitted.)
 */
class YarnRMClient {
  /** Register the ApplicationMaster with the ResourceManager. */
  def register(): Unit
  /** Create a resource allocator instance. */
  def createAllocator(): YarnAllocator
  /**
   * Unregister from the ResourceManager.
   * @param status Final application status
   * @param diagnostics Diagnostic message
   */
  def unregister(status: FinalApplicationStatus, diagnostics: String): Unit
}

Helper object for creating YARN resource requests with proper resource profiles and constraints.
/**
 * Helper for creating YARN resource requests.
 * Handles resource profiles, constraints, and locality preferences.
 * (API summary — method bodies omitted.)
 */
object ResourceRequestHelper {
  /**
   * Create a resource request for executors.
   * @param resource Resource requirements
   * @param nodes Preferred node list
   * @param racks Preferred rack list
   * @param numContainers Number of containers requested
   * @return YARN ResourceRequest
   */
  def createResourceRequest(
    resource: Resource,
    nodes: List[String],
    racks: List[String],
    numContainers: Int
  ): ResourceRequest
  /**
   * Build a resource profile for a container.
   * @param cores Number of CPU cores
   * @param memory Memory in MB
   * @param customResources Additional resource requirements
   * @return YARN resource profile
   */
  def buildResourceProfile(
    cores: Int,
    memory: Int,
    customResources: Map[String, Long] = Map.empty
  ): Resource
}

Usage Example:
import org.apache.spark.deploy.yarn.ResourceRequestHelper
import org.apache.hadoop.yarn.api.records.Resource

// Create a resource profile for executors
val executorResource = ResourceRequestHelper.buildResourceProfile(
  cores = 2,
  memory = 4096, // 4GB in MB
  customResources = Map(
    "yarn.io/gpu" -> 1L,     // Request 1 GPU
    "example.com/fpga" -> 2L // Request 2 FPGAs
  )
)

// Create a resource request with locality preferences
val resourceRequest = ResourceRequestHelper.createResourceRequest(
  resource = executorResource,
  nodes = List("worker1.example.com", "worker2.example.com"),
  racks = List("/rack1", "/rack2"),
  numContainers = 5
)

Container placement strategy that optimizes for data locality and resource utilization.
/**
 * Container placement strategy with locality preferences.
 * Optimizes container placement based on data locality and cluster topology.
 * (API summary — method bodies omitted.)
 */
class LocalityPreferredContainerPlacementStrategy {
  /**
   * Determine optimal container placement.
   * @param availableNodes Available cluster nodes
   * @param requestedContainers Number of containers to place
   * @param localityPreferences Data locality preferences
   * @return Placement recommendations (node -> container count)
   */
  def getContainerPlacement(
    availableNodes: Seq[String],
    requestedContainers: Int,
    localityPreferences: Map[String, Seq[String]]
  ): Map[String, Int]
}

Tracks node health and executor failure patterns to optimize resource allocation decisions.
/**
 * Tracks node health and executor failure patterns.
 * Used by YarnAllocator for intelligent resource allocation.
 * (API summary — method bodies omitted.)
 */
class YarnAllocatorNodeHealthTracker {
  /**
   * Record an executor failure on a node.
   * @param nodeId Node identifier
   * @param executorId Failed executor ID
   */
  def recordFailure(nodeId: String, executorId: String): Unit
  /**
   * Check whether a node should be avoided for new allocations.
   * @param nodeId Node to check
   * @return true if the node should be avoided
   */
  def shouldAvoidNode(nodeId: String): Boolean
  /**
   * Get healthy nodes for allocation.
   * @param candidateNodes Candidate nodes to filter
   * @return Filtered list of healthy nodes
   */
  def getHealthyNodes(candidateNodes: Seq[String]): Seq[String]
}

import org.apache.spark.SparkConf
// Configure dynamic allocation with YARN
val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("DynamicAllocationExample")
  // Enable dynamic allocation
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "50")
  .set("spark.dynamicAllocation.initialExecutors", "10")
  // Configure allocation behavior
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
  .set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "300s")
  .set("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
  .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "5s")
  // Dynamic allocation on YARN requires the external shuffle service
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.yarn.shuffle.stopOnFailure", "false")

import org.apache.spark.SparkConf
// Configure custom resource types (GPUs, FPGAs, etc.)
val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("CustomResourceExample")
  // Configure GPU resources
  .set("spark.executor.resource.gpu.amount", "1")
  .set("spark.executor.resource.gpu.discoveryScript", "/opt/spark/scripts/gpu_discovery.sh")
  .set("spark.executor.resource.gpu.vendor", "nvidia.com")
  // Configure custom FPGA resources
  .set("spark.executor.resource.fpga.amount", "2")
  .set("spark.executor.resource.fpga.discoveryScript", "/opt/spark/scripts/fpga_discovery.sh")
  .set("spark.executor.resource.fpga.vendor", "xilinx.com")
  // Task-level resource requests
  .set("spark.task.resource.gpu.amount", "0.5") // Tasks can share GPUs
  .set("spark.task.resource.fpga.amount", "1")  // Tasks use dedicated FPGAs

import org.apache.spark.{SparkConf, SparkContext}
// Static resource allocation - fixed number of executors
val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("StaticAllocationExample")
  .set("spark.dynamicAllocation.enabled", "false")
  .set("spark.executor.instances", "20")
  .set("spark.executor.cores", "4")
  .set("spark.executor.memory", "8g")
  .set("spark.executor.memoryOverhead", "1g")
val sc = new SparkContext(conf)

import org.apache.spark.SparkConf
// Optimize memory allocation for different workloads
val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("MemoryOptimizationExample")
  // Total executor memory breakdown
  .set("spark.executor.memory", "12g")
  .set("spark.executor.memoryOverhead", "2g") // Additional overhead for the YARN container
  // Unified memory management fractions (Spark 1.6+).
  // NOTE: "spark.executor.memoryFraction" / "spark.executor.storageFraction"
  // are not valid Spark configuration keys; the correct keys are below.
  .set("spark.memory.fraction", "0.8")        // Fraction of (heap - 300MB) for execution and storage
  .set("spark.memory.storageFraction", "0.5") // Portion of spark.memory.fraction immune to eviction
  // Off-heap memory (optional)
  .set("spark.executor.memory.offHeap.enabled", "true")
  .set("spark.executor.memory.offHeap.size", "4g")

import org.apache.spark.SparkConf
// Configure locality preferences for optimal data access
val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("LocalityAwareExample")
  // Locality wait times
  .set("spark.locality.wait", "3s")        // Generic locality wait
  .set("spark.locality.wait.process", "0") // Wait for process-local data
  .set("spark.locality.wait.node", "1s")   // Wait for node-local data
  .set("spark.locality.wait.rack", "2s")   // Wait for rack-local data
  // Node labeling for placement
  .set("spark.yarn.executor.nodeLabelExpression", "compute-intensive")
  .set("spark.yarn.am.nodeLabelExpression", "management")

import org.apache.spark.SparkConf
// Configure preemption handling for spot instances or preemptible queues
val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("PreemptionHandlingExample")
  // Tolerate preemption-driven executor loss
  .set("spark.yarn.executor.failuresValidityInterval", "1h")
  .set("spark.yarn.max.executor.failures", "10")       // Higher tolerance for preemptions
  .set("spark.yarn.submit.waitAppCompletion", "false") // Don't wait if using spot instances
  // Configure checkpointing for fault tolerance
  .set("spark.sql.streaming.checkpointLocation", "hdfs://cluster/checkpoints/myapp")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // Faster recovery

// Monitor resource usage and allocation effectiveness
import org.apache.spark.SparkContext

val sc = SparkContext.getOrCreate()

// Access YARN-specific information
val yarnBackend = sc.schedulerBackend match {
  case yarn: org.apache.spark.scheduler.cluster.YarnSchedulerBackend =>
    Some(yarn)
  case _ => None
}

yarnBackend.foreach { backend =>
  println(s"Application ID: ${backend.applicationId()}")
  println(s"Application Attempt: ${backend.applicationAttemptId()}")
  // For cluster mode, get additional information
  backend match {
    case clusterBackend: org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend =>
      val driverLogs = clusterBackend.getDriverLogUrls()
      val driverAttrs = clusterBackend.getDriverAttributes()
      println(s"Driver Container Logs: $driverLogs")
      println(s"Driver Container Attributes: $driverAttrs")
    case _ => // Client mode
  }
}

import org.apache.spark.SparkConf
// Configure retry and failure handling for resource allocation
val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("AllocationFailureHandling")
  // Retry configuration
  .set("spark.yarn.maxAppAttempts", "3")
  .set("spark.yarn.am.attemptFailuresValidityInterval", "1h")
  .set("spark.yarn.executor.failuresValidityInterval", "1h")
  .set("spark.yarn.max.executor.failures", "5")
  // Resource allocation timing
  .set("spark.yarn.scheduler.heartbeat.interval-ms", "3000")
  .set("spark.yarn.scheduler.initial-allocation.interval", "200ms")
  .set("spark.yarn.containerLauncherMaxThreads", "25")
  // Enable detailed GC logging
  .set("spark.yarn.am.extraJavaOptions", "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
  .set("spark.yarn.executor.extraJavaOptions", "-XX:+PrintGCDetails")

// Enable detailed logging for resource allocation debugging
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("ResourceDebuggingExample")
  // Enable debug logging
  .set("spark.yarn.am.extraJavaOptions",
    "-Dlog4j.configuration=yarn-log4j.properties " +
    "-Dyarn.app.container.log.dir=/tmp/yarn-logs " +
    "-Dyarn.app.container.log.filesize=100MB")
  // Resource request diagnostics
  .set("spark.yarn.preserve.staging.files", "true") // Keep staging files for debugging
  .set("spark.yarn.submit.file.replication", "3")   // Ensure file availability

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-yarn-2-12