Describes
mavenpkg:maven/org.apache.spark/spark-yarn_2.11@1.6.x

tessl/maven-org-apache-spark--spark-yarn_2-11

tessl install tessl/maven-org-apache-spark--spark-yarn_2-11@1.6.0

Apache Spark YARN integration module that enables Spark applications to run on YARN clusters, providing cluster manager functionality for distributed Spark computing workloads


Resource Management

The YARN module's resource management components handle dynamic executor allocation, container lifecycle management, and locality-aware container placement, with the goal of improving resource utilization and task locality in Spark applications.

YarnAllocator

The YarnAllocator class requests containers from the YARN ResourceManager and manages the executor lifecycle. It is declared private[yarn], so the examples in this section compile only from code inside the org.apache.spark.deploy.yarn package.

package org.apache.spark.deploy.yarn

private[yarn] class YarnAllocator(
  driverUrl: String,
  driverRef: RpcEndpointRef,
  conf: Configuration,
  sparkConf: SparkConf,
  amClient: AMRMClient[ContainerRequest],
  appAttemptId: ApplicationAttemptId,
  args: ApplicationMasterArguments,
  securityMgr: SecurityManager
) extends Logging

Core Properties

val allocatedHostToContainersMap: HashMap[String, collection.mutable.Set[ContainerId]]
val allocatedContainerToHostMap: HashMap[ContainerId, String]

protected val executorMemory: Int              // Executor memory in MB
protected val memoryOverhead: Int              // Additional memory overhead in MB
protected val executorCores: Int               // Number of cores per executor
private[yarn] val resource: Resource           // Resource capability for executors
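
The relationship between these properties can be sketched in plain Scala. This is a minimal illustration, not Spark's code: ContainerResource is a hypothetical stand-in for YARN's Resource record, and the default overhead (10% of executor memory with a 384 MB floor, overridable through spark.yarn.executor.memoryOverhead) is an assumption based on the Spark 1.6 defaults.

```scala
// Hypothetical stand-in for YARN's Resource record (memory in MB, virtual cores).
final case class ContainerResource(memoryMb: Int, vCores: Int)

// Assumed defaults: 10% of executor memory, never less than 384 MB.
val OverheadFactor = 0.10
val OverheadMinMb = 384

// Overhead is either the explicitly configured value or the computed default.
def memoryOverheadMb(executorMemoryMb: Int, configuredMb: Option[Int] = None): Int =
  configuredMb.getOrElse(math.max((OverheadFactor * executorMemoryMb).toInt, OverheadMinMb))

// The container request covers executor memory plus overhead, and the executor's cores.
def containerResource(
    executorMemoryMb: Int,
    executorCores: Int,
    configuredOverheadMb: Option[Int] = None): ContainerResource =
  ContainerResource(
    executorMemoryMb + memoryOverheadMb(executorMemoryMb, configuredOverheadMb),
    executorCores)
```

Under these assumptions a 2048 MB executor is requested as a 2432 MB container, because the 384 MB floor exceeds 10% of 2048.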

Executor Metrics

def getNumExecutorsRunning: Int

Returns the current number of running executors.

def getNumExecutorsFailed: Int

Returns the total number of failed executors since application start.

Example:

import org.apache.spark.deploy.yarn.YarnAllocator

val allocator: YarnAllocator = ??? // placeholder: obtain the initialized allocator here

val runningExecutors = allocator.getNumExecutorsRunning
val failedExecutors = allocator.getNumExecutorsFailed

println(s"Running executors: $runningExecutors")
println(s"Failed executors: $failedExecutors")

val totalExecutors = runningExecutors + failedExecutors
val successRate = if (totalExecutors > 0) {
  (runningExecutors.toDouble / totalExecutors) * 100
} else {
  0.0
}
println(f"Executor success rate: $successRate%.2f%%")

Resource Allocation

def allocateResources(): Unit

Main resource allocation method that:

  • Updates resource requests based on target executor count
  • Communicates with ResourceManager (doubles as heartbeat)
  • Handles newly allocated containers by launching executors
  • Processes completed containers and updates metrics

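This method must be called periodically: each call also serves as the ApplicationMaster's heartbeat to the ResourceManager, and newly allocated or completed containers are only picked up when it runs.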

Example:

// Typical usage in ApplicationMaster heartbeat loop
import java.util.concurrent.{Executors, TimeUnit}

val scheduledExecutor = Executors.newSingleThreadScheduledExecutor()
val heartbeatInterval = 3000 // 3 seconds

scheduledExecutor.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = {
    try {
      allocator.allocateResources()
    } catch {
      case e: Exception =>
        logError("Error during resource allocation", e)
    }
  }
}, 0, heartbeatInterval, TimeUnit.MILLISECONDS)

Resource Request Management

Update Resource Requests

def updateResourceRequests(): Unit

Updates the pending resource requests based on the current target executor count and preferred localities. Called internally by allocateResources() to synchronize container requests with YARN ResourceManager.
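
The synchronization step can be sketched as a small decision function. This is a simplified model rather than the actual implementation: the names RequestAction, RequestMore, CancelPending, NoChange and planRequests are hypothetical, and the rule shown (compare the target with pending plus running executors) is an assumption about how the gap is computed.

```scala
// Hypothetical result type describing what to do with outstanding requests.
sealed trait RequestAction
final case class RequestMore(count: Int) extends RequestAction
final case class CancelPending(count: Int) extends RequestAction
case object NoChange extends RequestAction

// Compare the target with what is already running or requested.
def planRequests(target: Int, pending: Int, running: Int): RequestAction = {
  val missing = target - pending - running
  if (missing > 0) RequestMore(missing)            // below target: ask for more containers
  else if (missing < 0 && pending > 0)
    CancelPending(math.min(-missing, pending))     // above target: withdraw unfulfilled requests
  else NoChange                                    // on target, or nothing left to cancel
}
```

Only pending requests are cancelled in this sketch; shrinking below the number of running executors is left to explicit executor termination.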

Handle Allocated Containers

def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit

Processes newly allocated containers from YARN, deciding which containers to use for launching executors and which to release based on placement strategies and resource requirements.

Parameters:

  • allocatedContainers: Sequence of containers allocated by YARN
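
One way to picture the use-or-release decision is a three-pass match: first by host, then by rack, then against any remaining request. The sketch below is a simplified model under that assumption. Allocated, Pending, MatchResult and matchContainers are hypothetical names, and resource-size checks are omitted.

```scala
// Hypothetical simplified records for an allocated container and a pending request.
final case class Allocated(id: String, host: String, rack: String)
final case class Pending(nodes: Set[String], racks: Set[String])
final case class MatchResult(toLaunch: List[Allocated], toRelease: List[Allocated])

def matchContainers(allocated: List[Allocated], pending: List[Pending]): MatchResult = {
  // One pass: hand each container to the first open request that `accepts` it.
  // Returns (containers used, containers still unused, requests still open).
  def pass(containers: List[Allocated], requests: List[Pending])(
      accepts: (Pending, Allocated) => Boolean)
      : (List[Allocated], List[Allocated], List[Pending]) =
    containers.foldLeft((List.empty[Allocated], List.empty[Allocated], requests)) {
      case ((used, unused, open), c) =>
        open.indexWhere(accepts(_, c)) match {
          case -1 => (used, unused :+ c, open)
          case i  => (used :+ c, unused, open.patch(i, Nil, 1))
        }
    }

  val (hostUsed, afterHost, open1) = pass(allocated, pending)((r, c) => r.nodes.contains(c.host))
  val (rackUsed, afterRack, open2) = pass(afterHost, open1)((r, c) => r.racks.contains(c.rack))
  val (anyUsed, leftover, _)       = pass(afterRack, open2)((_, _) => true)

  // Containers that satisfied no request are handed back to YARN.
  MatchResult(hostUsed ++ rackUsed ++ anyUsed, leftover)
}
```

Matching the most specific preference first keeps node-local containers from being consumed by requests that could have been satisfied anywhere.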

Process Completed Containers

def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit

Handles containers that have completed (successfully or with failures), updating executor status and potentially requesting replacement executors for failed ones.

Parameters:

  • completedContainers: Sequence of container status reports for completed containers
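
A completed container's exit status determines how it is treated. The sketch below is illustrative only: the numeric codes are assumed to mirror Hadoop's ContainerExitStatus constants, the ExitReason hierarchy and classify are hypothetical names, and the policy of which outcomes count as failures is an assumption, not taken from Spark's source.

```scala
// Exit codes assumed to mirror Hadoop's ContainerExitStatus constants.
val ExitSuccess = 0
val ExitPreempted = -102
val ExitKilledExceededVmem = -103
val ExitKilledExceededPmem = -104

// Hypothetical classification of why a container finished.
sealed trait ExitReason { def countsAsFailure: Boolean }
case object CompletedNormally extends ExitReason { val countsAsFailure = false }
case object PreemptedByYarn extends ExitReason { val countsAsFailure = false }
final case class MemoryLimitExceeded(kind: String) extends ExitReason {
  val countsAsFailure = true
}
final case class UnknownFailure(code: Int) extends ExitReason {
  val countsAsFailure = true
}

def classify(exitStatus: Int): ExitReason = exitStatus match {
  case ExitSuccess            => CompletedNormally
  case ExitPreempted          => PreemptedByYarn        // not the application's fault
  case ExitKilledExceededVmem => MemoryLimitExceeded("virtual")
  case ExitKilledExceededPmem => MemoryLimitExceeded("physical")
  case other                  => UnknownFailure(other)
}

// Number of completed containers that would count toward the failure total.
def failuresIn(exitStatuses: Seq[Int]): Int =
  exitStatuses.count(classify(_).countsAsFailure)
```

Treating preemption separately matters because a preempted executor says nothing about application health, whereas memory-limit kills usually point at an undersized memory overhead.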

Executor Request Management

def requestTotalExecutorsWithPreferredLocalities(
  requestedTotal: Int,
  localityAwareTasks: Int,
  hostToLocalTaskCount: Map[String, Int]
): Boolean

Requests executors with locality preferences to optimize task scheduling performance.

Parameters:

  • requestedTotal: Total number of executors desired (including running ones)
  • localityAwareTasks: Number of tasks with locality preferences
  • hostToLocalTaskCount: Map of preferred hosts to expected task counts

Returns: true if the target was updated, false if unchanged
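
The return contract can be modeled with a small state holder. This is a sketch under the assumption that locality hints are always recorded while the Boolean reflects only a change in the executor target. TargetTracker and its members are hypothetical names.

```scala
// Hypothetical model of the allocator's target state.
final class TargetTracker(initialTarget: Int) {
  private var targetNumExecutors = initialTarget
  private var numLocalityAwareTasks = 0
  private var hostToLocalTaskCounts = Map.empty[String, Int]

  // Records the latest locality hints and reports whether the target changed.
  def requestTotal(
      requestedTotal: Int,
      localityAwareTasks: Int,
      hostToLocalTaskCount: Map[String, Int]): Boolean = synchronized {
    numLocalityAwareTasks = localityAwareTasks
    hostToLocalTaskCounts = hostToLocalTaskCount
    if (requestedTotal != targetNumExecutors) {
      targetNumExecutors = requestedTotal
      true
    } else {
      false
    }
  }

  def target: Int = synchronized(targetNumExecutors)
  def localityHints: Map[String, Int] = synchronized(hostToLocalTaskCounts)
}
```

In this model a call that repeats the current target still refreshes the locality hints, so a false return does not mean the call had no effect.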

Example:

import scala.collection.mutable.Map

// Request executors with locality preferences
val hostPreferences = Map(
  "worker1.example.com" -> 8,  // 8 tasks prefer this host
  "worker2.example.com" -> 6,  // 6 tasks prefer this host  
  "worker3.example.com" -> 4   // 4 tasks prefer this host
)

val totalTasks = 18
val requestedExecutors = 6

val updated = allocator.requestTotalExecutorsWithPreferredLocalities(
  requestedTotal = requestedExecutors,
  localityAwareTasks = totalTasks,
  hostToLocalTaskCount = hostPreferences.toMap
)

if (updated) {
  println(s"Updated executor target to $requestedExecutors")
  println("Locality preferences:")
  hostPreferences.foreach { case (host, taskCount) =>
    println(s"  $host: $taskCount tasks")
  }
} else {
  println("Executor target unchanged")
}

Executor Termination

def killExecutor(executorId: String): Unit

Terminates a specific executor by releasing its container.

Parameters:

  • executorId: ID of the executor to terminate
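
Conceptually, terminating an executor means looking up the container that hosts it and releasing that container. The sketch below models this with plain collections. ExecutorRegistry, register and the release callback are hypothetical, and the Boolean result is added here only to make the outcome observable (the documented method returns Unit).

```scala
import scala.collection.mutable

// Hypothetical registry mapping executor IDs to container IDs.
// `release` stands in for handing a container back to YARN.
final class ExecutorRegistry(release: String => Unit) {
  private val executorToContainer = mutable.HashMap.empty[String, String]

  def register(executorId: String, containerId: String): Unit = synchronized {
    executorToContainer(executorId) = containerId
  }

  // Releases the executor's container if the executor is known.
  def killExecutor(executorId: String): Boolean = synchronized {
    executorToContainer.remove(executorId) match {
      case Some(containerId) =>
        release(containerId)
        true
      case None =>
        false // unknown executor: nothing to release
    }
  }

  def running: Int = synchronized(executorToContainer.size)
}
```

Removing the mapping before releasing the container makes a repeated kill of the same executor a no-op in this model.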

Example:

// Kill specific executors
// Executor IDs assigned by YarnAllocator are numeric strings
val executorsToKill = List("1", "3", "7")

executorsToKill.foreach { executorId =>
  try {
    allocator.killExecutor(executorId)
    println(s"Requested termination of executor: $executorId")
  } catch {
    case e: Exception =>
      println(s"Failed to kill executor $executorId: ${e.getMessage}")
  }
}

Pending Allocation Monitoring

def getPendingAllocate: Seq[ContainerRequest]

Returns container requests that have been submitted to YARN but not yet fulfilled.

Example:

import scala.collection.JavaConverters._ // provides asScala for the Java collections below

val pendingRequests = allocator.getPendingAllocate
println(s"Pending container requests: ${pendingRequests.size}")

pendingRequests.foreach { request =>
  val nodes = Option(request.getNodes).map(_.asScala.mkString(", ")).getOrElse("Any")
  val racks = Option(request.getRacks).map(_.asScala.mkString(", ")).getOrElse("Any")
  println(s"  Request - Nodes: $nodes, Racks: $racks, Resource: ${request.getCapability}")
}

ExecutorRunnable

The ExecutorRunnable class manages the lifecycle of individual executor containers.

package org.apache.spark.deploy.yarn

class ExecutorRunnable(
  container: Container,
  conf: Configuration,
  sparkConf: SparkConf,
  masterAddress: String,
  slaveId: String,
  hostname: String,
  executorMemory: Int,
  executorCores: Int,
  appId: String,
  securityMgr: SecurityManager
) extends Runnable with Logging

Container Launch

def run(): Unit

Main execution method that starts the NodeManager client and launches the container.

def startContainer(): java.util.Map[String, ByteBuffer]

Launches the executor container by:

  • Setting up the container launch context
  • Configuring local resources and environment
  • Starting the executor process within the container
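
The last step, starting the executor process, comes down to assembling a JVM command line for the container. The sketch below shows roughly what such a command could look like. executorLaunchCommand is a hypothetical helper, and the exact flags, the bare java binary and the argument order are assumptions rather than the command ExecutorRunnable actually builds.

```scala
// Hypothetical helper: builds an illustrative launch command for an executor JVM.
def executorLaunchCommand(
    driverUrl: String,
    executorId: String,
    hostname: String,
    executorMemoryMb: Int,
    executorCores: Int,
    appId: String): Seq[String] =
  Seq(
    "java", "-server",
    // Heap bounds come from the executor memory setting (overhead is not part of the heap)
    s"-Xms${executorMemoryMb}m", s"-Xmx${executorMemoryMb}m",
    // Main class of the executor process
    "org.apache.spark.executor.CoarseGrainedExecutorBackend",
    "--driver-url", driverUrl,
    "--executor-id", executorId,
    "--hostname", hostname,
    "--cores", executorCores.toString,
    "--app-id", appId)
```

The container is sized for executor memory plus overhead, while the heap flag carries only the executor memory, which leaves the overhead for off-heap use.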

Example:

import org.apache.spark.deploy.yarn.ExecutorRunnable
import org.apache.hadoop.yarn.api.records.Container
import java.util.concurrent.{ExecutorService, Executors}

// Shared launcher pool, created once and reused for every container
val launcherPool: ExecutorService = Executors.newCachedThreadPool()

// Launch executor in container (typically done by YarnAllocator).
// hadoopConf, sparkConf, driverUrl, applicationId and securityManager
// are assumed to be in scope.
def launchExecutor(container: Container): Unit = {
  val executorRunnable = new ExecutorRunnable(
    container = container,
    conf = hadoopConf,
    sparkConf = sparkConf,
    masterAddress = driverUrl,
    slaveId = s"executor-${container.getId}",
    hostname = container.getNodeId.getHost,
    executorMemory = 2048, // 2GB
    executorCores = 2,
    appId = applicationId,
    securityMgr = securityManager
  )

  // Run the launch on a pool thread so the caller is not blocked
  launcherPool.submit(executorRunnable)
}

LocalityPreferredContainerPlacementStrategy

This class computes node and rack preferences for new container requests so that executors land close to the data their tasks will read, which reduces data transfer.

package org.apache.spark.deploy.yarn

case class ContainerLocalityPreferences(nodes: Array[String], racks: Array[String])

class LocalityPreferredContainerPlacementStrategy(
  val sparkConf: SparkConf,
  val yarnConf: Configuration,
  val resource: Resource
)

Locality Calculation

def localityOfRequestedContainers(
  numContainer: Int,
  numLocalityAwareTasks: Int,
  hostToLocalTaskCount: Map[String, Int],
  allocatedContainers: Seq[Container],
  nodeBlacklist: Set[String]
): Array[ContainerLocalityPreferences]

Calculates optimal container placement preferences based on task locality requirements and existing container distribution.

Parameters:

  • numContainer: Number of containers to place
  • numLocalityAwareTasks: Number of tasks with locality preferences
  • hostToLocalTaskCount: Map of preferred hosts to task counts
  • allocatedContainers: Currently allocated containers
  • nodeBlacklist: Nodes to avoid for placement

Returns: Array of locality preferences for each requested container
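
To build intuition for locality-weighted placement, the sketch below spreads containers across hosts in proportion to their pending task counts, using a largest-remainder rule. This is deliberately not Spark's algorithm, which also accounts for containers that are already allocated. proportionalPlacement is a hypothetical helper that returns a container count per host instead of ContainerLocalityPreferences.

```scala
// Hypothetical helper: containers per host, proportional to local task counts.
def proportionalPlacement(
    numContainer: Int,
    hostToLocalTaskCount: Map[String, Int],
    nodeBlacklist: Set[String] = Set.empty): Map[String, Int] = {
  // Ignore blacklisted hosts and hosts without pending local tasks.
  val eligible = hostToLocalTaskCount.filter { case (host, tasks) =>
    tasks > 0 && !nodeBlacklist.contains(host)
  }
  val totalTasks = eligible.values.sum

  if (numContainer <= 0 || totalTasks == 0) Map.empty
  else {
    // Integer share of each host, rounded down.
    val base = eligible.map { case (host, tasks) =>
      host -> ((numContainer * tasks) / totalTasks)
    }
    val leftover = numContainer - base.values.sum
    // Hand the remaining containers to the hosts with the largest remainders
    // (ties broken by host name so the result is deterministic).
    val bonusHosts = eligible.toSeq
      .sortBy { case (host, tasks) => (-((numContainer * tasks) % totalTasks), host) }
      .take(leftover)
      .map(_._1)
      .toSet
    base
      .map { case (host, n) => host -> (n + (if (bonusHosts(host)) 1 else 0)) }
      .filter(_._2 > 0)
  }
}
```

For 5 containers and task counts of 12, 8, 6 and 4, the integer shares are 2, 1, 1 and 0, and the one remaining container goes to the host with 4 tasks because it has the largest remainder.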

Example:

import org.apache.spark.deploy.yarn.LocalityPreferredContainerPlacementStrategy
import org.apache.hadoop.yarn.api.records.Resource

val placementStrategy = new LocalityPreferredContainerPlacementStrategy(
  sparkConf = sparkConf,
  yarnConf = yarnConf,
  resource = Resource.newInstance(2048, 2) // 2GB RAM, 2 cores
)

// Calculate placement for 5 containers
val hostTaskMap = Map(
  "worker1.example.com" -> 12,
  "worker2.example.com" -> 8,
  "worker3.example.com" -> 6,
  "worker4.example.com" -> 4
)

val preferences = placementStrategy.localityOfRequestedContainers(
  numContainer = 5,
  numLocalityAwareTasks = 30,
  hostToLocalTaskCount = hostTaskMap,
  allocatedContainers = Seq.empty,
  nodeBlacklist = Set("blacklisted-node.example.com")
)

preferences.zipWithIndex.foreach { case (pref, index) =>
  val nodeList = if (pref.nodes.nonEmpty) pref.nodes.mkString(", ") else "Any"
  val rackList = if (pref.racks.nonEmpty) pref.racks.mkString(", ") else "Any"
  println(s"Container $index - Preferred nodes: $nodeList, Preferred racks: $rackList")
}

Dynamic Resource Scaling

Complete Resource Management Example

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.YarnAllocator
import java.util.concurrent.{ScheduledExecutorService, Executors, TimeUnit}

class DynamicResourceManager(
  allocator: YarnAllocator,
  sparkConf: SparkConf
) {

  private val scheduler: ScheduledExecutorService = 
    Executors.newSingleThreadScheduledExecutor()
  
  private var targetExecutors = sparkConf.getInt("spark.executor.instances", 2)
  private val maxExecutors = sparkConf.getInt("spark.dynamicAllocation.maxExecutors", 20)
  private val minExecutors = sparkConf.getInt("spark.dynamicAllocation.minExecutors", 1)

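  def startDynamicAllocation(): Unit = {
    // Heartbeat for resource allocation. Exceptions are caught because an
    // uncaught exception cancels all future runs of a scheduled task.
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = {
        try allocator.allocateResources()
        catch { case e: Exception => println(s"Resource allocation failed: ${e.getMessage}") }
      }
    }, 0, 3, TimeUnit.SECONDS)

    // Dynamic scaling based on workload
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = {
        try adjustExecutorCount()
        catch { case e: Exception => println(s"Executor scaling failed: ${e.getMessage}") }
      }
    }, 10, 30, TimeUnit.SECONDS) // Check every 30 seconds
  }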

  private def adjustExecutorCount(): Unit = {
    val runningExecutors = allocator.getNumExecutorsRunning
    val failedExecutors = allocator.getNumExecutorsFailed
    val pendingTasks = getPendingTaskCount() // Application-specific logic

    // Scale up if many pending tasks
    if (pendingTasks > runningExecutors * 2 && runningExecutors < maxExecutors) {
      val newTarget = math.min(maxExecutors, runningExecutors + 2)
      scaleExecutors(newTarget)
    }
    // Scale down if low utilization
    else if (pendingTasks < runningExecutors / 2 && runningExecutors > minExecutors) {
      val newTarget = math.max(minExecutors, runningExecutors - 1)
      scaleExecutors(newTarget)
    }
  }

  private def scaleExecutors(newTarget: Int): Unit = {
    // Calculate locality preferences based on current workload
    val hostPreferences = calculateHostPreferences() // Application-specific

    val updated = allocator.requestTotalExecutorsWithPreferredLocalities(
      requestedTotal = newTarget,
      localityAwareTasks = getPendingTaskCount(),
      hostToLocalTaskCount = hostPreferences
    )

    if (updated) {
      println(s"Scaled executor target to $newTarget")
      targetExecutors = newTarget
    }
  }

  private def getPendingTaskCount(): Int = {
    // Application-specific logic to count pending tasks
    // This would typically query the task scheduler
    0
  }

  private def calculateHostPreferences(): Map[String, Int] = {
    // Application-specific logic to determine data locality preferences
    // This might analyze input data distribution
    Map.empty[String, Int]
  }

  def shutdown(): Unit = {
    scheduler.shutdown()
  }
}

// Usage example
val resourceManager = new DynamicResourceManager(allocator, sparkConf)
resourceManager.startDynamicAllocation()

// Application runs...

resourceManager.shutdown()

Resource Monitoring and Metrics

import java.util.concurrent.{Executors, TimeUnit}

class ResourceMonitor(allocator: YarnAllocator) {
  
  case class ResourceMetrics(
    running: Int,
    failed: Int,
    pending: Int,
    successRate: Double,
    timestamp: Long = System.currentTimeMillis()
  )

  def collectMetrics(): ResourceMetrics = {
    val running = allocator.getNumExecutorsRunning
    val failed = allocator.getNumExecutorsFailed
    val pending = allocator.getPendingAllocate.size
    val total = running + failed
    val successRate = if (total > 0) (running.toDouble / total) * 100 else 100.0

    ResourceMetrics(running, failed, pending, successRate)
  }

  def monitorResources(intervalSeconds: Int = 10): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = {
        val metrics = collectMetrics()
        
        println(s"=== Resource Metrics (${new java.util.Date()}) ===")
        println(s"Running executors: ${metrics.running}")
        println(s"Failed executors: ${metrics.failed}")
        println(s"Pending requests: ${metrics.pending}")
        println(f"Success rate: ${metrics.successRate}%.2f%%")
        println()

        // Alert on high failure rate
        if (metrics.successRate < 80.0 && metrics.running + metrics.failed > 5) {
          println("WARNING: High executor failure rate detected!")
        }

        // Alert on resource starvation
        if (metrics.pending > 10) {
          println("WARNING: Many pending container requests - possible resource shortage")
        }
      }
    }, 0, intervalSeconds, TimeUnit.SECONDS)
  }
}

// Usage
val monitor = new ResourceMonitor(allocator)
monitor.monitorResources(intervalSeconds = 15)

Together, these components control the executor lifecycle, place containers with data locality in mind, and scale the executor count dynamically, so that applications use YARN cluster resources efficiently and recover from executor failures.