tessl install tessl/maven-org-apache-spark--spark-yarn_2-11@1.6.0

Apache Spark YARN integration module that enables Spark applications to run on YARN clusters, providing cluster manager functionality for distributed Spark computing workloads.
The YARN module's resource management components handle dynamic executor allocation, container lifecycle management, and intelligent container placement to optimize resource utilization and task locality in Spark applications.
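As an illustration (the values below are arbitrary), these are the kinds of Spark settings the YARN resource management components read when sizing containers and scaling executors:

import org.apache.spark.SparkConf

// Illustrative configuration; adjust the values for your cluster
val sparkConf = new SparkConf()
  .set("spark.executor.memory", "2g")                 // heap requested per executor
  .set("spark.yarn.executor.memoryOverhead", "384")   // extra MB added to each container request
  .set("spark.executor.cores", "2")                   // vcores per executor container
  .set("spark.dynamicAllocation.enabled", "true")     // allow the executor target to grow and shrink
  .set("spark.shuffle.service.enabled", "true")       // required when dynamic allocation is enabled
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "20")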
The YarnAllocator class is the central component responsible for requesting containers from the YARN ResourceManager and managing executor lifecycle.
package org.apache.spark.deploy.yarn
private[yarn] class YarnAllocator(
driverUrl: String,
driverRef: RpcEndpointRef,
conf: Configuration,
sparkConf: SparkConf,
amClient: AMRMClient[ContainerRequest],
appAttemptId: ApplicationAttemptId,
args: ApplicationMasterArguments,
securityMgr: SecurityManager
) extends Logging

val allocatedHostToContainersMap: HashMap[String, collection.mutable.Set[ContainerId]]
val allocatedContainerToHostMap: HashMap[ContainerId, String]
protected val executorMemory: Int // Executor memory in MB
protected val memoryOverhead: Int // Additional memory overhead
protected val executorCores: Int // Number of cores per executor
private[yarn] val resource: Resource // Resource capability for executors

def getNumExecutorsRunning: Int
Returns the current number of running executors.
def getNumExecutorsFailed: Int
Returns the total number of failed executors since application start.
Example:
import org.apache.spark.deploy.yarn.YarnAllocator
val allocator: YarnAllocator = ??? // allocator initialized by the ApplicationMaster
val runningExecutors = allocator.getNumExecutorsRunning
val failedExecutors = allocator.getNumExecutorsFailed
println(s"Running executors: $runningExecutors")
println(s"Failed executors: $failedExecutors")
val totalExecutors = runningExecutors + failedExecutors
val successRate = if (totalExecutors > 0) {
(runningExecutors.toDouble / totalExecutors) * 100
} else {
0.0
}
println(f"Executor success rate: $successRate%.2f%%")def allocateResources(): UnitMain resource allocation method that:
This method must be called periodically to maintain resource allocation.
Example:
// Typical usage in ApplicationMaster heartbeat loop
import java.util.concurrent.{Executors, TimeUnit}
val scheduledExecutor = Executors.newSingleThreadScheduledExecutor()
val heartbeatInterval = 3000 // 3 seconds
scheduledExecutor.scheduleAtFixedRate(new Runnable {
override def run(): Unit = {
try {
allocator.allocateResources()
} catch {
case e: Exception =>
logError("Error during resource allocation", e)
}
}
}, 0, heartbeatInterval, TimeUnit.MILLISECONDS)

def updateResourceRequests(): Unit
Updates the pending resource requests based on the current target executor count and preferred localities. Called internally by allocateResources() to synchronize container requests with the YARN ResourceManager.
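To picture what this synchronization involves, the hypothetical helper below sketches the same idea against the AMRMClient API. It is a simplification, not Spark's actual implementation; syncContainerRequests and its parameters are illustrative.

import scala.collection.JavaConverters._
import org.apache.hadoop.yarn.api.records.{Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

// Hypothetical sketch: bring the number of outstanding container requests
// in line with the current executor target.
def syncContainerRequests(
    amClient: AMRMClient[ContainerRequest],
    resource: Resource,
    targetExecutors: Int,
    runningExecutors: Int): Unit = {
  val priority = Priority.newInstance(1)
  // Requests already submitted for this priority/capability but not yet fulfilled
  val outstanding = amClient
    .getMatchingRequests(priority, "*", resource) // "*" is ResourceRequest.ANY
    .asScala
    .flatMap(_.asScala)
    .toSeq
  val missing = targetExecutors - runningExecutors - outstanding.size

  if (missing > 0) {
    // Ask YARN for the additional containers we still need
    (1 to missing).foreach { _ =>
      amClient.addContainerRequest(new ContainerRequest(resource, null, null, priority))
    }
  } else if (missing < 0) {
    // Cancel surplus outstanding requests
    outstanding.take(-missing).foreach(req => amClient.removeContainerRequest(req))
  }
}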
def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit
Processes newly allocated containers from YARN, deciding which containers to use for launching executors and which to release based on placement strategies and resource requirements.
Parameters:
allocatedContainers: Sequence of containers allocated by YARN

def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit
Handles containers that have completed (successfully or with failures), updating executor status and potentially requesting replacement executors for failed ones.
Parameters:
completedContainers: Sequence of container status reports for completed containers

def requestTotalExecutorsWithPreferredLocalities(
requestedTotal: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int]
): Boolean
Requests executors with locality preferences to optimize task scheduling performance.
Parameters:
requestedTotal: Total number of executors desired (including running ones)
localityAwareTasks: Number of tasks with locality preferences
hostToLocalTaskCount: Map of preferred hosts to expected task counts
Returns: true if the target was updated, false if unchanged
Example:
import scala.collection.mutable.Map
// Request executors with locality preferences
val hostPreferences = Map(
"worker1.example.com" -> 8, // 8 tasks prefer this host
"worker2.example.com" -> 6, // 6 tasks prefer this host
"worker3.example.com" -> 4 // 4 tasks prefer this host
)
val totalTasks = 18
val requestedExecutors = 6
val updated = allocator.requestTotalExecutorsWithPreferredLocalities(
requestedTotal = requestedExecutors,
localityAwareTasks = totalTasks,
hostToLocalTaskCount = hostPreferences.toMap
)
if (updated) {
println(s"Updated executor target to $requestedExecutors")
println("Locality preferences:")
hostPreferences.foreach { case (host, taskCount) =>
println(s" $host: $taskCount tasks")
}
} else {
println("Executor target unchanged")
}

def killExecutor(executorId: String): Unit
Terminates a specific executor by releasing its container.
Parameters:
executorId: ID of the executor to terminate

Example:
// Kill specific executors
val executorsToKill = List("executor-1", "executor-3", "executor-7")
executorsToKill.foreach { executorId =>
try {
allocator.killExecutor(executorId)
println(s"Requested termination of executor: $executorId")
} catch {
case e: Exception =>
println(s"Failed to kill executor $executorId: ${e.getMessage}")
}
}

def getPendingAllocate: Seq[ContainerRequest]
Returns container requests that have been submitted to YARN but not yet fulfilled.
Example:
import scala.collection.JavaConverters._

val pendingRequests = allocator.getPendingAllocate
println(s"Pending container requests: ${pendingRequests.size}")
pendingRequests.foreach { request =>
val nodes = Option(request.getNodes).map(_.asScala.mkString(", ")).getOrElse("Any")
val racks = Option(request.getRacks).map(_.asScala.mkString(", ")).getOrElse("Any")
println(s" Request - Nodes: $nodes, Racks: $racks, Resource: ${request.getCapability}")
}

The ExecutorRunnable class manages the lifecycle of individual executor containers.
package org.apache.spark.deploy.yarn
class ExecutorRunnable(
container: Container,
conf: Configuration,
sparkConf: SparkConf,
masterAddress: String,
slaveId: String,
hostname: String,
executorMemory: Int,
executorCores: Int,
appId: String,
securityMgr: SecurityManager
) extends Runnable with Logging

def run(): Unit
Main execution method that starts the NodeManager client and launches the container.
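Conceptually, run() sets up a NodeManager client and then delegates to startContainer(). The sketch below is a hypothetical simplification, not Spark's actual code; hadoopConf, container, and launchContext are assumed to exist in scope.

import org.apache.hadoop.yarn.client.api.NMClient

// Simplified view of run(): create, initialize, and start an NMClient,
// then submit the prepared ContainerLaunchContext for this container.
val nmClient = NMClient.createNMClient()
nmClient.init(hadoopConf)   // hadoopConf: org.apache.hadoop.conf.Configuration (assumed)
nmClient.start()
nmClient.startContainer(container, launchContext) // both assumed prepared elsewhere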
def startContainer(): java.util.Map[String, ByteBuffer]
Launches the executor container by preparing the ContainerLaunchContext (environment, local resources, and the executor launch command) and submitting it to the NodeManager.
Example:
import org.apache.spark.deploy.yarn.ExecutorRunnable
import org.apache.hadoop.yarn.api.records.Container
import java.util.concurrent.{ExecutorService, Executors}
// Launch executor in container (typically done by YarnAllocator)
def launchExecutor(container: Container): Unit = {
val executorRunnable = new ExecutorRunnable(
container = container,
conf = hadoopConf,
sparkConf = sparkConf,
masterAddress = driverUrl,
slaveId = s"executor-${container.getId}",
hostname = container.getNodeId.getHost,
executorMemory = 2048, // 2GB
executorCores = 2,
appId = applicationId,
securityMgr = securityManager
)
// Launch in separate thread
val launcherPool: ExecutorService = Executors.newCachedThreadPool()
launcherPool.submit(executorRunnable)
}

The LocalityPreferredContainerPlacementStrategy class implements intelligent container placement to maximize task locality and minimize data transfer.
package org.apache.spark.deploy.yarn
case class ContainerLocalityPreferences(nodes: Array[String], racks: Array[String])
class LocalityPreferredContainerPlacementStrategy(
val sparkConf: SparkConf,
val yarnConf: Configuration,
val resource: Resource
)

def localityOfRequestedContainers(
numContainer: Int,
numLocalityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int],
allocatedContainers: Seq[Container],
nodeBlacklist: Set[String]
): Array[ContainerLocalityPreferences]
Calculates optimal container placement preferences based on task locality requirements and existing container distribution.
Parameters:
numContainer: Number of containers to place
numLocalityAwareTasks: Number of tasks with locality preferences
hostToLocalTaskCount: Map of preferred hosts to task counts
allocatedContainers: Currently allocated containers
nodeBlacklist: Nodes to avoid for placement
Returns: Array of locality preferences for each requested container
Example:
import org.apache.spark.deploy.yarn.LocalityPreferredContainerPlacementStrategy
import org.apache.hadoop.yarn.api.records.Resource
val placementStrategy = new LocalityPreferredContainerPlacementStrategy(
sparkConf = sparkConf,
yarnConf = yarnConf,
resource = Resource.newInstance(2048, 2) // 2GB RAM, 2 cores
)
// Calculate placement for 5 containers
val hostTaskMap = Map(
"worker1.example.com" -> 12,
"worker2.example.com" -> 8,
"worker3.example.com" -> 6,
"worker4.example.com" -> 4
)
val preferences = placementStrategy.localityOfRequestedContainers(
numContainer = 5,
numLocalityAwareTasks = 30,
hostToLocalTaskCount = hostTaskMap,
allocatedContainers = Seq.empty,
nodeBlacklist = Set("blacklisted-node.example.com")
)
preferences.zipWithIndex.foreach { case (pref, index) =>
val nodeList = if (pref.nodes.nonEmpty) pref.nodes.mkString(", ") else "Any"
val rackList = if (pref.racks.nonEmpty) pref.racks.mkString(", ") else "Any"
println(s"Container $index - Preferred nodes: $nodeList, Preferred racks: $rackList")
}

The example below combines these components into a simple dynamic allocation workflow built around the allocator:

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.YarnAllocator
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
class DynamicResourceManager(
allocator: YarnAllocator,
sparkConf: SparkConf
) {
private val scheduler: ScheduledExecutorService =
Executors.newSingleThreadScheduledExecutor()
private var targetExecutors = sparkConf.getInt("spark.executor.instances", 2)
private val maxExecutors = sparkConf.getInt("spark.dynamicAllocation.maxExecutors", 20)
private val minExecutors = sparkConf.getInt("spark.dynamicAllocation.minExecutors", 1)
def startDynamicAllocation(): Unit = {
// Heartbeat for resource allocation
scheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = {
allocator.allocateResources()
}
}, 0, 3, TimeUnit.SECONDS)
// Dynamic scaling based on workload
scheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = {
adjustExecutorCount()
}
}, 10, 30, TimeUnit.SECONDS) // Check every 30 seconds
}
private def adjustExecutorCount(): Unit = {
val runningExecutors = allocator.getNumExecutorsRunning
val failedExecutors = allocator.getNumExecutorsFailed
val pendingTasks = getPendingTaskCount() // Application-specific logic
// Scale up if many pending tasks
if (pendingTasks > runningExecutors * 2 && runningExecutors < maxExecutors) {
val newTarget = math.min(maxExecutors, runningExecutors + 2)
scaleExecutors(newTarget)
}
// Scale down if low utilization
else if (pendingTasks < runningExecutors / 2 && runningExecutors > minExecutors) {
val newTarget = math.max(minExecutors, runningExecutors - 1)
scaleExecutors(newTarget)
}
}
private def scaleExecutors(newTarget: Int): Unit = {
// Calculate locality preferences based on current workload
val hostPreferences = calculateHostPreferences() // Application-specific
val updated = allocator.requestTotalExecutorsWithPreferredLocalities(
requestedTotal = newTarget,
localityAwareTasks = getPendingTaskCount(),
hostToLocalTaskCount = hostPreferences
)
if (updated) {
println(s"Scaled executor target to $newTarget")
targetExecutors = newTarget
}
}
private def getPendingTaskCount(): Int = {
// Application-specific logic to count pending tasks
// This would typically query the task scheduler
0
}
private def calculateHostPreferences(): Map[String, Int] = {
// Application-specific logic to determine data locality preferences
// This might analyze input data distribution
Map.empty[String, Int]
}
def shutdown(): Unit = {
scheduler.shutdown()
}
}
// Usage example
val resourceManager = new DynamicResourceManager(allocator, sparkConf)
resourceManager.startDynamicAllocation()
// Application runs...
resourceManager.shutdown()

A small monitoring utility can periodically report allocator metrics:

import java.util.concurrent.{Executors, TimeUnit}
class ResourceMonitor(allocator: YarnAllocator) {
case class ResourceMetrics(
running: Int,
failed: Int,
pending: Int,
successRate: Double,
timestamp: Long = System.currentTimeMillis()
)
def collectMetrics(): ResourceMetrics = {
val running = allocator.getNumExecutorsRunning
val failed = allocator.getNumExecutorsFailed
val pending = allocator.getPendingAllocate.size
val total = running + failed
val successRate = if (total > 0) (running.toDouble / total) * 100 else 100.0
ResourceMetrics(running, failed, pending, successRate)
}
def monitorResources(intervalSeconds: Int = 10): Unit = {
val scheduler = Executors.newSingleThreadScheduledExecutor()
scheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = {
val metrics = collectMetrics()
println(s"=== Resource Metrics (${new java.util.Date()}) ===")
println(s"Running executors: ${metrics.running}")
println(s"Failed executors: ${metrics.failed}")
println(s"Pending requests: ${metrics.pending}")
println(f"Success rate: ${metrics.successRate}%.2f%%")
println()
// Alert on high failure rate
if (metrics.successRate < 80.0 && metrics.running + metrics.failed > 5) {
println("WARNING: High executor failure rate detected!")
}
// Alert on resource starvation
if (metrics.pending > 10) {
println("WARNING: Many pending container requests - possible resource shortage")
}
}
}, 0, intervalSeconds, TimeUnit.SECONDS)
}
}
// Usage
val monitor = new ResourceMonitor(allocator)
monitor.monitorResources(intervalSeconds = 15)

The resource management components provide comprehensive control over executor lifecycle, intelligent placement strategies, and dynamic scaling, enabling efficient utilization of YARN cluster resources while optimizing for data locality and fault tolerance.