YARN integration support for Apache Spark cluster computing, enabling Spark applications to run on Hadoop YARN clusters
—
Resource allocation and management components for negotiating and monitoring YARN cluster resources for Spark executors. These components handle the complex interactions with YARN's ResourceManager to obtain, monitor, and release container resources.
Core interface for ResourceManager client operations, providing abstraction over YARN ResourceManager interactions.
/**
 * Trait defining ResourceManager client interface
 * Provides abstraction for YARN ResourceManager communication
 */
// NOTE(review): the trait body below lists responsibilities as comments only —
// no abstract members are visible in this snippet. Confirm the real member
// signatures against the version-specific implementations (YarnRMClientImpl).
trait YarnRMClient {
// ResourceManager connection and authentication
// Container resource requests and releases
// Application status monitoring and reporting
// Resource allocation callbacks and event handling
}Usage Examples:
import org.apache.spark.deploy.yarn.YarnRMClient
// YarnRMClient implementations are version-specific
// Created internally by ApplicationMaster and Client components
val rmClient: YarnRMClient = // Implementation instance
// Used for ResourceManager communication throughout the application lifecycle
Version-specific implementations of the YarnRMClient trait for different Hadoop YARN API versions.
/**
 * Implementation of YarnRMClient for specific YARN API versions
 * Available in both alpha (deprecated) and stable modules
 */
// NOTE(review): the methods below are declared without bodies inside a concrete
// class — this snippet is illustrative and would not compile as-is (the class
// would need to be abstract or the bodies supplied). Treat it as an API sketch.
class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMClient {
/**
 * Register ApplicationMaster with ResourceManager
 * @param host ApplicationMaster host
 * @param port ApplicationMaster port
 * @param trackingUrl Application tracking URL
 * @return Registration response from ResourceManager
 */
def register(host: String, port: Int, trackingUrl: String): RegisterApplicationMasterResponse
/**
 * Allocate resources from ResourceManager
 * @param progressIndicator Application progress (0.0 to 1.0)
 * @return Allocation response with assigned containers
 */
def allocate(progressIndicator: Float): AllocateResponse
/**
 * Unregister ApplicationMaster from ResourceManager
 * @param status Final application status
 * @param appMessage Final message to ResourceManager
 * @param trackingUrl Final tracking URL
 */
def unregister(status: FinalApplicationStatus, appMessage: String, trackingUrl: String): Unit
}Abstract base class providing core resource allocation logic for YARN containers.
/**
 * Abstract base class for YARN resource allocation logic
 * Manages executor container requests and lifecycle
 *
 * @param conf Hadoop configuration used for YARN client operations
 * @param sparkConf Spark configuration supplying executor sizing settings
 * @param appAttemptId identifier of this application attempt
 * @param args parsed ApplicationMaster command-line arguments
 * @param preferredNodes map from host to input splits, used for locality preferences
 * @param securityMgr Spark SecurityManager for secure cluster interaction
 */
private[yarn] abstract class YarnAllocator(
conf: Configuration,
sparkConf: SparkConf,
appAttemptId: ApplicationAttemptId,
args: ApplicationMasterArguments,
preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
securityMgr: SecurityManager
) extends Logging {
/** Abstract method to allocate containers from ResourceManager */
protected def allocateContainers(resourceRequests: JList[ResourceRequest]): Unit
/** Abstract method to release a specific container */
protected def releaseContainer(container: Container): Unit
/** Request total number of executors from cluster */
def requestTotalExecutors(requestedTotal: Int): Unit
/** Kill a specific executor by ID */
def killExecutor(executorId: String): Unit
/** Allocate resources and handle responses from ResourceManager */
def allocateResources(): Unit
/** Update resource requests for executor containers */
def updateResourceRequests(): Unit
}
/**
 * Companion object with constants and utilities
 */
// NOTE(review): no members are shown in this snippet; the actual constants live
// in the version-specific Spark source — confirm before relying on them.
object YarnAllocator {
// Internal constants for resource allocation
}Usage Examples:
import org.apache.spark.deploy.yarn.YarnAllocator
// YarnAllocator is extended by version-specific implementations
// Handles the core logic for:
// - Requesting executor containers from ResourceManager
// - Launching executor processes in allocated containers
// - Monitoring executor health and handling failures
// - Releasing containers when executors complete
Enumeration defining allocation strategies for YARN resource requests.
/**
 * Enumeration for YARN allocation types
 * Defines resource allocation strategies for container placement
 */
// NOTE(review): scala.Enumeration is a legacy construct (Scala 3 prefers `enum`
// or a sealed ADT); kept here because this mirrors the Spark 1.x-era API.
object AllocationType extends Enumeration {
type AllocationType = Value
/** Host-specific allocation - request container on specific host */
val HOST = Value
/** Rack-specific allocation - request container within specific rack */
val RACK = Value
/** Any allocation - allow ResourceManager to place container anywhere */
val ANY = Value
}Version-specific resource allocation implementations that extend the base YarnAllocator functionality.
/**
 * Version-specific resource allocation implementation
 * Available in both alpha and stable API versions
 */
// NOTE(review): extends YarnAllocator without supplying the constructor
// arguments declared on the base class above, and implements no members —
// this snippet is an outline, not compilable source.
private[yarn] class YarnAllocationHandler extends YarnAllocator {
// Concrete implementation of resource allocation logic
// YARN API version-specific container operations
// Executor container launch and monitoring
// Resource optimization and failure recovery
}Enumeration defining different types of YARN resource allocations.
/**
 * Enumeration for YARN allocation types
 * Defines different categories of resource requests
 */
// NOTE(review): duplicate definition — an `object AllocationType` with concrete
// values (HOST/RACK/ANY) already appears earlier in this document. Two objects
// with the same name cannot coexist in one scope; this is a docs artifact.
object AllocationType extends Enumeration {
// Different allocation type values for resource categorization
// Used for tracking and managing different types of resource requests
}Usage Examples:
import org.apache.spark.deploy.yarn.AllocationType
// Used internally for categorizing resource allocation requests
val allocationType = AllocationType.someValue
// Helps track and manage different types of container requests
// Typical resource allocation workflow
// NOTE(review): illustrative redefinition of YarnAllocator showing the
// allocation workflow in order; signatures here differ from the declaration
// earlier in this document and are not part of the real API.
abstract class YarnAllocator {
// 1. Calculate resource requirements based on Spark configuration
protected def calculateResourceRequirements(): ContainerRequest
// 2. Submit resource requests to ResourceManager
protected def requestContainers(requests: List[ContainerRequest]): Unit
// 3. Handle container allocation responses from ResourceManager
protected def onContainersAllocated(containers: List[Container]): Unit
// 4. Launch executor processes in allocated containers
protected def launchExecutors(containers: List[Container]): Unit
// 5. Monitor executor health and handle failures
protected def monitorExecutors(): Unit
// 6. Release containers when executors complete or fail
protected def releaseContainers(containers: List[Container]): Unit
}// Resource calculation based on Spark configuration
// NOTE(review): example snippet — `sparkConf` is not in scope here, and the
// method declares a ContainerRequest return type but ends in comments with no
// returned expression; treat as pseudo-code showing which settings are read.
class YarnAllocator {
/** Builds a YARN container request sized from Spark configuration defaults. */
private def buildContainerRequest(): ContainerRequest = {
// Reads executor sizing with defaults: 1024m memory, 1 core, priority 0.
val executorMemory = sparkConf.get("spark.executor.memory", "1024m")
val executorCores = sparkConf.getInt("spark.executor.cores", 1)
val priority = sparkConf.getInt("spark.yarn.priority", 0)
// Build YARN ContainerRequest with calculated resources
// Include locality preferences and resource constraints
// Set appropriate priority and capability requirements
}
}// Container launch process for executor deployment
// NOTE(review): example snippet — `nodeManager`, `createExecutorLaunchContext`,
// `setupExecutorEnvironment`, `configureExecutorJVM` and
// `trackExecutorContainer` are not defined anywhere in this document.
class YarnAllocationHandler {
/** Illustrates the five steps of launching an executor in a granted container. */
private def launchExecutorContainer(container: Container): Unit = {
// 1. Prepare executor launch context
val launchContext = createExecutorLaunchContext()
// 2. Set up environment variables and classpath
setupExecutorEnvironment(launchContext)
// 3. Configure executor JVM parameters
configureExecutorJVM(launchContext)
// 4. Launch container with NodeManager
nodeManager.startContainer(container, launchContext)
// 5. Track container for monitoring
trackExecutorContainer(container.getId)
}
}// Continuous monitoring of executor containers
// NOTE(review): illustrative outline only — the method body is comments,
// so this would not compile; it documents monitoring responsibilities.
abstract class YarnAllocator {
/** Periodic health check over tracked executor containers. */
private def monitorContainerHealth(): Unit = {
// Monitor executor heartbeats and health status
// Detect container failures and exits
// Handle node failures and blacklisting
// Report container status to ApplicationMaster
}
}// Optimizing container allocation for data locality
// NOTE(review): pseudo-code — declares a ContainerRequest return type but the
// body is comments only; shows the node > rack > any fallback ordering.
class YarnAllocator {
/**
 * Builds a locality-aware container request.
 * @param preferredNodes hosts to try first (highest priority)
 * @param preferredRacks racks to fall back to (medium priority)
 */
private def buildLocalityRequest(
preferredNodes: List[String],
preferredRacks: List[String]
): ContainerRequest = {
// Request containers on preferred nodes (highest priority)
// Fall back to preferred racks (medium priority)
// Accept any available nodes (lowest priority)
// Balance locality with resource availability
}
}// Dynamic executor allocation integration
// NOTE(review): outline of the dynamic-allocation hooks; bodies are comments
// only. Confirm actual method names against the Spark version in use.
abstract class YarnAllocator {
/** Scales the executor count up in response to workload demand. */
def requestAdditionalExecutors(numExecutors: Int): Unit = {
// Support for Spark's dynamic allocation feature
// Scale executor count based on workload demands
// Integrate with cluster resource availability
}
/** Scales the executor count down, releasing the named executors' containers. */
def removeExecutors(executorIds: Set[String]): Unit = {
// Gracefully shutdown and release executor containers
// Ensure running tasks complete before container release
// Update resource tracking and allocation state
}
}// Key configuration properties affecting resource management
spark.executor.memory=2g // Executor container memory
spark.executor.cores=2 // Executor CPU cores
spark.executor.memoryFraction=0.8 // Memory allocation within container
spark.executor.memoryOffHeap.enabled=true // Off-heap memory usage
// YARN-specific resource configuration
spark.yarn.executor.memoryOverhead=384 // Additional memory overhead
spark.yarn.executor.nodeLabelExpression // Node label constraints
spark.yarn.priority=0 // Application priority
spark.yarn.queue=production // YARN queue assignment
// Container-level configuration options
spark.yarn.containerLauncherMaxThreads=25 // Parallel container launch limit
spark.yarn.executor.failuresValidityInterval=1h // Failure tracking window
spark.yarn.max.executor.failures=3 // Max executor failures per node
spark.yarn.nodemanager.resource.memory-mb // NodeManager memory limits
// Robust error handling for container failures
// NOTE(review): outline of the failure-handling responsibilities; bodies are
// comments only and `ContainerId` comes from the YARN client API.
class YarnAllocator {
/**
 * Reacts to a container exiting abnormally.
 * @param containerId the failed container
 * @param exitStatus YARN exit status reported for the container
 */
private def handleContainerFailure(containerId: ContainerId, exitStatus: Int): Unit = {
// Log container failure details
// Determine if failure should trigger node blacklisting
// Request replacement container if within failure limits
// Report failure to Spark scheduler for task rescheduling
}
/**
 * Excludes a node from future allocations.
 * @param nodeId node to blacklist
 * @param reason human-readable cause, for logging
 */
private def blacklistNode(nodeId: String, reason: String): Unit = {
// Add node to blacklist to avoid future allocations
// Implement exponential backoff for blacklist duration
// Provide mechanisms for node rehabilitation
}
}// Handling ResourceManager failures and reconnection
// NOTE(review): illustrative only — a `private def` with no body inside a
// trait is not valid Scala, and this trait name duplicates the one defined
// earlier in this document.
trait YarnRMClient {
/** Recovery path when the ResourceManager connection is lost. */
private def handleRMFailure(): Unit = {
// Detect ResourceManager connection loss
// Implement reconnection with exponential backoff
// Recover application state after reconnection
// Resubmit pending resource requests
}
}// Integration with Spark metrics system
// NOTE(review): the `metrics` receiver used below is not defined in this
// document — presumably a Spark MetricsSystem/Codahale registry; verify
// against the actual integration before citing these metric names.
abstract class YarnAllocator {
// Track container allocation success rates
private val containerAllocationRate = metrics.meter("containerAllocationRate")
// Monitor resource request fulfillment times
private val resourceRequestLatency = metrics.histogram("resourceRequestLatency")
// Track executor launch success and failure rates
private val executorLaunchSuccess = metrics.counter("executorLaunchSuccess")
private val executorLaunchFailure = metrics.counter("executorLaunchFailure")
}The resource management components provide detailed metrics for:
These metrics enable optimization of resource allocation strategies and troubleshooting of cluster resource issues.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--yarn-parent-2-10