tessl/maven-org-apache-spark--spark-yarn-2-12

Apache Spark YARN resource manager integration module that enables Spark applications to run on YARN clusters

Resource Management

This module contains the components that allocate and manage YARN containers for Spark executors. These classes handle resource allocation strategies, placement policies, container management, and communication with the YARN ResourceManager and NodeManagers.

Capabilities

YarnAllocator

Core component that manages resource allocation and executor lifecycle on YARN. Handles container requests, allocation monitoring, and executor management.

/**
 * Manages resource allocation and executor lifecycle on YARN
 * Handles container requests, allocation, and executor management
 */
class YarnAllocator {
  
  /** Allocate containers from YARN ResourceManager */
  def allocateResources(): Unit
  
  /**
   * Request executors with locality preferences
   * @return true if request was successful
   */
  def requestTotalExecutorsWithPreferredLocalities(): Boolean
  
  /**
   * Kill specific executor
   * @param executorId ID of executor to kill
   */
  def killExecutor(executorId: String): Unit
  
  /** Get number of failed executors */
  def getNumExecutorsFailed: Int
  
  /** Stop allocator and cleanup resources */
  def stop(): Unit
}

Usage Pattern:

// YarnAllocator is typically used internally by scheduler backends
// but understanding its role is important for configuration

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("ResourceAllocationExample")
  // Configure executor resources
  .set("spark.executor.instances", "10")
  .set("spark.executor.cores", "2")
  .set("spark.executor.memory", "4g")
  .set("spark.executor.memoryOverhead", "512m")
  // Configure allocation behavior
  .set("spark.yarn.executor.failuresValidityInterval", "1h")
  .set("spark.yarn.max.executor.failures", "3")

// YarnAllocator will use these settings for resource allocation
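
To make the link between these settings and the size of a container request concrete, the arithmetic can be sketched as below. This is a minimal illustration, not Spark's code: `ContainerSizing` and its methods are hypothetical names. The 384 MiB floor and 10% factor are Spark's documented defaults for `spark.executor.memoryOverhead`. The 1024 MiB rounding increment is an assumption standing in for the cluster's scheduler setting, and the sketch ignores off-heap and PySpark memory.

```scala
object ContainerSizing {
  // Spark's documented default overhead: max(384 MiB, 10% of executor memory).
  val MinOverheadMiB = 384L
  val OverheadFactor = 0.10

  /** Overhead used when spark.executor.memoryOverhead is not set explicitly. */
  def defaultOverheadMiB(executorMemoryMiB: Long): Long =
    math.max(MinOverheadMiB, (executorMemoryMiB * OverheadFactor).toLong)

  /** Memory requested per container: heap plus explicit or default overhead. */
  def requestedMiB(executorMemoryMiB: Long, overheadMiB: Option[Long]): Long =
    executorMemoryMiB + overheadMiB.getOrElse(defaultOverheadMiB(executorMemoryMiB))

  /** Round up to a multiple of the scheduler increment (assumed 1024 MiB here). */
  def roundedMiB(requested: Long, incrementMiB: Long = 1024L): Long =
    ((requested + incrementMiB - 1) / incrementMiB) * incrementMiB
}
```

With the values above, `ContainerSizing.requestedMiB(4096, Some(512))` gives 4608 MiB, which a scheduler using a 1024 MiB increment would round up to 5120 MiB.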

YarnRMClient

Client for communicating with YARN ResourceManager. Handles ApplicationMaster registration and resource allocator creation.

/**
 * Client for communicating with YARN ResourceManager
 * Handles AM registration and resource allocation setup
 */
class YarnRMClient {
  
  /** Register ApplicationMaster with ResourceManager */
  def register(): Unit
  
  /** Create resource allocator instance */
  def createAllocator(): YarnAllocator
  
  /**
   * Unregister from ResourceManager
   * @param status Final application status
   * @param diagnostics Diagnostic message
   */
  def unregister(status: FinalApplicationStatus, diagnostics: String): Unit
}

ResourceRequestHelper

Helper object for creating YARN resource requests with proper resource profiles and constraints.

/**
 * Helper for creating YARN resource requests
 * Handles resource profiles, constraints, and locality preferences
 */
object ResourceRequestHelper {
  
  /**
   * Create resource request for executors
   * @param resource Resource requirements
   * @param nodes Preferred node list
   * @param racks Preferred rack list  
   * @param numContainers Number of containers requested
   * @return YARN ResourceRequest
   */
  def createResourceRequest(
    resource: Resource,
    nodes: List[String],
    racks: List[String],
    numContainers: Int
  ): ResourceRequest
  
  /**
   * Build resource profile for container
   * @param cores Number of CPU cores
   * @param memory Memory in MB
   * @param customResources Additional resource requirements
   * @return YARN resource profile
   */
  def buildResourceProfile(
    cores: Int,
    memory: Int,
    customResources: Map[String, Long] = Map.empty
  ): Resource
}

Usage Example:

import org.apache.spark.deploy.yarn.ResourceRequestHelper
import org.apache.hadoop.yarn.api.records.Resource

// Create resource profile for executors
val executorResource = ResourceRequestHelper.buildResourceProfile(
  cores = 2,
  memory = 4096,  // 4GB in MB
  customResources = Map(
    "yarn.io/gpu" -> 1L,  // Request 1 GPU
    "example.com/fpga" -> 2L  // Request 2 FPGAs
  )
)

// Create resource request with locality preferences
val resourceRequest = ResourceRequestHelper.createResourceRequest(
  resource = executorResource,
  nodes = List("worker1.example.com", "worker2.example.com"),
  racks = List("/rack1", "/rack2"),
  numContainers = 5
)

Container Placement Strategies

LocalityPreferredContainerPlacementStrategy

Container placement strategy that optimizes for data locality and resource utilization.

/**
 * Container placement strategy with locality preferences
 * Optimizes container placement based on data locality and cluster topology
 */
class LocalityPreferredContainerPlacementStrategy {
  
  /**
   * Determine optimal container placement
   * @param availableNodes Available cluster nodes
   * @param requestedContainers Number of containers to place
   * @param localityPreferences Data locality preferences
   * @return Placement recommendations
   */
  def getContainerPlacement(
    availableNodes: Seq[String],
    requestedContainers: Int,
    localityPreferences: Map[String, Seq[String]]
  ): Map[String, Int]
}
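
The idea of locality-driven placement can be illustrated with a simplified stand-in: split the requested containers across hosts in proportion to how many pending tasks prefer each host. This is not Spark's actual algorithm, and `ProportionalPlacement` and its parameters are hypothetical names chosen for the sketch.

```scala
object ProportionalPlacement {
  /**
   * Split `containers` across hosts in proportion to pending-task counts,
   * using the largest-remainder method so the shares sum to `containers`.
   * Task counts are assumed to be non-negative.
   */
  def place(containers: Int, pendingTasksByHost: Map[String, Int]): Map[String, Int] = {
    val total = pendingTasksByHost.values.sum
    if (containers <= 0 || total <= 0) Map.empty
    else {
      // (host, floor of the ideal share, remainder), sorted by host for determinism.
      val rows = pendingTasksByHost.toSeq.sortBy(_._1).map { case (host, tasks) =>
        val scaled = containers.toLong * tasks
        (host, (scaled / total).toInt, scaled % total)
      }
      val leftover = containers - rows.map(_._2).sum
      // Hosts with the largest remainders receive one extra container each.
      val bonusHosts = rows.sortBy { case (_, _, rem) => -rem }.take(leftover).map(_._1).toSet
      rows
        .map { case (host, base, _) => host -> (base + (if (bonusHosts(host)) 1 else 0)) }
        .filter(_._2 > 0)
        .toMap
    }
  }
}
```

For example, `ProportionalPlacement.place(5, Map("a" -> 6, "b" -> 3, "c" -> 1))` assigns three containers to `a` and two to `b`.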

YarnAllocatorNodeHealthTracker

Tracks node health and executor failure patterns to optimize resource allocation decisions.

/**
 * Tracks node health and executor failure patterns
 * Used by YarnAllocator for intelligent resource allocation
 */
class YarnAllocatorNodeHealthTracker {
  
  /**
   * Record executor failure on a node
   * @param nodeId Node identifier
   * @param executorId Failed executor ID
   */
  def recordFailure(nodeId: String, executorId: String): Unit
  
  /**
   * Check if node should be avoided for new allocations
   * @param nodeId Node to check
   * @return true if node should be avoided
   */
  def shouldAvoidNode(nodeId: String): Boolean
  
  /**
   * Get healthy nodes for allocation
   * @param candidateNodes Candidate nodes to filter
   * @return Filtered list of healthy nodes
   */
  def getHealthyNodes(candidateNodes: Seq[String]): Seq[String]
}
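
One way to picture failure-based node avoidance is a sliding window: a node is skipped once it has accumulated a threshold of failures within a recent interval, and older failures age out. The sketch below assumes that model. `NodeFailureWindow`, `maxFailures`, and `windowMs` are hypothetical and do not reflect the internals of `YarnAllocatorNodeHealthTracker`. It is the same windowing idea that `spark.yarn.executor.failuresValidityInterval` applies to application-wide executor failures.

```scala
import scala.collection.mutable

/** Hypothetical tracker: avoid a node after `maxFailures` failures within `windowMs`. */
class NodeFailureWindow(maxFailures: Int, windowMs: Long) {
  // Failure timestamps (ms) per node.
  private val failures = mutable.Map.empty[String, Vector[Long]]

  def recordFailure(node: String, nowMs: Long): Unit =
    failures(node) = failures.getOrElse(node, Vector.empty) :+ nowMs

  // Count only failures that are still inside the window.
  private def recentFailures(node: String, nowMs: Long): Int =
    failures.getOrElse(node, Vector.empty).count(t => nowMs - t < windowMs)

  def shouldAvoid(node: String, nowMs: Long): Boolean =
    recentFailures(node, nowMs) >= maxFailures

  def healthy(candidates: Seq[String], nowMs: Long): Seq[String] =
    candidates.filterNot(node => shouldAvoid(node, nowMs))
}
```

Passing the current time explicitly keeps the sketch deterministic and easy to test.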

Advanced Resource Management

Dynamic Allocation Integration

import org.apache.spark.SparkConf

// Configure dynamic allocation with YARN
val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("DynamicAllocationExample")
  // Enable dynamic allocation
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "50")
  .set("spark.dynamicAllocation.initialExecutors", "10")
  // Configure allocation behavior
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
  .set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "300s")
  .set("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
  .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "5s")
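  // Configure with external shuffle service
  .set("spark.shuffle.service.enabled", "true")
  // Note: spark.yarn.shuffle.stopOnFailure is read by the shuffle service running in the
  // NodeManager (yarn-site.xml); setting it in SparkConf does not change NodeManager behavior
  .set("spark.yarn.shuffle.stopOnFailure", "false")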
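
The backlog timeouts control how quickly the executor target grows. Spark's documentation describes the ramp-up as exponential: one executor is added in the first round, then 2, 4, 8, and so on. The sketch below simulates only that doubling and the `maxExecutors` cap. `RampUp` is a hypothetical name, and the real policy also limits requests to what the pending tasks need and resets the increment once the backlog clears.

```scala
object RampUp {
  /**
   * Executor targets after each request round, starting from `initial`.
   * Each round adds 1, 2, 4, ... executors, capped at `max`.
   */
  def targets(initial: Int, max: Int, rounds: Int): List[Int] = {
    var current = initial
    var toAdd = 1
    val out = List.newBuilder[Int]
    for (_ <- 1 to rounds) {
      current = math.min(max, current + toAdd)
      toAdd *= 2
      out += current
    }
    out.result()
  }
}
```

With the settings above, the first round would fire after the 1s backlog timeout and later rounds every 5s while the backlog persists; `RampUp.targets(10, 50, 6)` yields 11, 13, 17, 25, 41, 50.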

Custom Resource Types

import org.apache.spark.SparkConf

// Configure custom resource types (GPUs, FPGAs, etc.)
val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("CustomResourceExample")
  // Configure GPU resources
  .set("spark.executor.resource.gpu.amount", "1")
  .set("spark.executor.resource.gpu.discoveryScript", "/opt/spark/scripts/gpu_discovery.sh")
  .set("spark.executor.resource.gpu.vendor", "nvidia.com")
  // Configure custom FPGA resources
  .set("spark.executor.resource.fpga.amount", "2")
  .set("spark.executor.resource.fpga.discoveryScript", "/opt/spark/scripts/fpga_discovery.sh")
  .set("spark.executor.resource.fpga.vendor", "xilinx.com")
  // Task-level resource requests
  .set("spark.task.resource.gpu.amount", "0.5")  // Tasks can share GPUs
  .set("spark.task.resource.fpga.amount", "1")   // Tasks use dedicated FPGAs
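
The executor-level and task-level amounts together bound how many tasks can run concurrently on one executor. The sketch below works through that arithmetic under stated assumptions: `TaskSlots` is a hypothetical helper, fractional task amounts are treated as "this many tasks share one unit", and the executor is assumed to have 4 cores with 1 CPU per task (the configuration above does not set cores).

```scala
object TaskSlots {
  /** Tasks that the executor's units of one resource type can serve at once. */
  def slots(executorAmount: Int, taskAmount: Double): Int =
    if (taskAmount < 1.0) executorAmount * math.floor(1.0 / taskAmount).toInt
    else (executorAmount / taskAmount).toInt

  /** Concurrency is bounded by the scarcest resource, CPU cores included. */
  def concurrentTasks(cores: Int, cpusPerTask: Int, resources: Seq[(Int, Double)]): Int = {
    val cpuSlots = cores / cpusPerTask
    val resourceSlots = resources.map { case (exec, task) => slots(exec, task) }
    (cpuSlots +: resourceSlots).min
  }
}
```

For the GPU and FPGA amounts above, `TaskSlots.concurrentTasks(4, 1, Seq((1, 0.5), (2, 1.0)))` evaluates to 2: both the single shared GPU and the two dedicated FPGAs allow two tasks, so two of the four assumed cores stay idle.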

Resource Allocation Patterns

Static Allocation

import org.apache.spark.{SparkConf, SparkContext}

// Static resource allocation - fixed number of executors
val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("StaticAllocationExample")
  .set("spark.dynamicAllocation.enabled", "false")
  .set("spark.executor.instances", "20")
  .set("spark.executor.cores", "4")
  .set("spark.executor.memory", "8g")
  .set("spark.executor.memoryOverhead", "1g")

val sc = new SparkContext(conf)

Memory Optimization

import org.apache.spark.SparkConf

// Optimize memory allocation for different workloads
val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("MemoryOptimizationExample")
  // Total executor memory breakdown
  .set("spark.executor.memory", "12g")
  .set("spark.executor.memoryOverhead", "2g")  // Additional overhead for YARN container
  // Unified memory settings
  .set("spark.memory.fraction", "0.8")  // Share of (heap - 300MB) used for execution and storage
  .set("spark.memory.storageFraction", "0.5")  // Share of that region protected for cached data
  // Off-heap memory (optional)
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "4g")
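
To see what the fractions mean in absolute terms, Spark's unified memory model sizes the shared execution and storage region as (heap minus 300 MiB reserved) times `spark.memory.fraction`, and `spark.memory.storageFraction` marks the part of that region where cached blocks are protected from eviction. A minimal sketch of the arithmetic, with `UnifiedMemory` as a hypothetical helper name:

```scala
object UnifiedMemory {
  val ReservedMiB = 300L // fixed reservation in Spark's unified memory model

  /** Returns (unified region, protected storage share), both in MiB. */
  def regions(heapMiB: Long, memoryFraction: Double, storageFraction: Double): (Long, Long) = {
    val unified = ((heapMiB - ReservedMiB) * memoryFraction).toLong
    val storage = (unified * storageFraction).toLong
    (unified, storage)
  }
}
```

For a 12g heap with fractions 0.8 and 0.5, `UnifiedMemory.regions(12288, 0.8, 0.5)` gives roughly 9590 MiB for the unified region, of which about 4795 MiB is protected storage.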

Locality-Aware Allocation

import org.apache.spark.SparkConf

// Configure locality preferences for optimal data access
val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("LocalityAwareExample")
  // Locality wait times
  .set("spark.locality.wait", "3s")          // Generic locality wait
  .set("spark.locality.wait.process", "0")   // Wait for process-local data
  .set("spark.locality.wait.node", "1s")     // Wait for node-local data  
  .set("spark.locality.wait.rack", "2s")     // Wait for rack-local data
  // Node labeling for placement
  .set("spark.yarn.executor.nodeLabelExpression", "compute-intensive")
  .set("spark.yarn.am.nodeLabelExpression", "management")
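
The locality waits act as a fallback ladder: the scheduler holds a task for its preferred level and relaxes to the next level once that level's wait has elapsed without a launch. The sketch below is a simplified model of that ladder using the waits configured above. `LocalityFallback` is a hypothetical name, and the real scheduler also skips levels with no pending tasks and resets its timer when a task launches.

```scala
import scala.annotation.tailrec

object LocalityFallback {
  // Levels in the order they are relaxed, with the configured waits in milliseconds.
  val levels: List[(String, Long)] = List(
    "PROCESS_LOCAL" -> 0L,
    "NODE_LOCAL"    -> 1000L,
    "RACK_LOCAL"    -> 2000L
  )

  /** Most relaxed level allowed after `elapsedMs` without launching a task. */
  @tailrec
  def allowedLevel(elapsedMs: Long, remaining: List[(String, Long)] = levels): String =
    remaining match {
      case (level, waitMs) :: rest =>
        if (elapsedMs < waitMs) level else allowedLevel(elapsedMs - waitMs, rest)
      case Nil => "ANY"
    }
}
```

Because the process-local wait is 0, this model never holds a task back for a process-local slot: node-local placement is allowed immediately, rack-local after 1s, and any node after 3s in total.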

Container Lifecycle Management

Preemption Handling

import org.apache.spark.SparkConf

// Configure preemption handling for spot instances or preemptible queues
val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("PreemptionHandlingExample")
  // Tolerate executor losses; failures older than the validity interval are not counted
  .set("spark.yarn.executor.failuresValidityInterval", "1h")
  .set("spark.yarn.max.executor.failures", "10")  // Higher tolerance for executor loss
  .set("spark.yarn.submit.waitAppCompletion", "false")  // Cluster mode: client exits after submission
  // Configure checkpointing for fault tolerance
  .set("spark.sql.streaming.checkpointLocation", "hdfs://cluster/checkpoints/myapp")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  // Faster serialization

Container Resource Monitoring

// Monitor resource usage and allocation effectiveness
import org.apache.spark.SparkContext

val sc = SparkContext.getOrCreate()

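// Access YARN-specific information
// Note: schedulerBackend and the YARN backend classes are private[spark],
// so this compiles only from code placed in the org.apache.spark package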
val yarnBackend = sc.schedulerBackend match {
  case yarn: org.apache.spark.scheduler.cluster.YarnSchedulerBackend =>
    Some(yarn)
  case _ => None
}

yarnBackend.foreach { backend =>
  println(s"Application ID: ${backend.applicationId()}")
  println(s"Application Attempt: ${backend.applicationAttemptId()}")
  
  // For cluster mode, get additional information
  backend match {
    case clusterBackend: org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend =>
      // Both accessors are parameterless and return Option[Map[String, String]]
      val driverLogs = clusterBackend.getDriverLogUrls
      val driverAttrs = clusterBackend.getDriverAttributes
      println(s"Driver Container Logs: $driverLogs")
      println(s"Driver Container Attributes: $driverAttrs")
    case _ => // Client mode
  }
}

Error Handling and Diagnostics

Resource Allocation Failures

import org.apache.spark.SparkConf

// Configure retry and failure handling for resource allocation
val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("AllocationFailureHandling")
  // Retry configuration
  .set("spark.yarn.maxAppAttempts", "3")
  .set("spark.yarn.am.attemptFailuresValidityInterval", "1h")
  .set("spark.yarn.executor.failuresValidityInterval", "1h")
  .set("spark.yarn.max.executor.failures", "5")
  // Resource allocation timeouts
  .set("spark.yarn.scheduler.heartbeat.interval-ms", "3000")
  .set("spark.yarn.scheduler.initial-allocation.interval", "200ms")
  .set("spark.yarn.containerLauncherMaxThreads", "25")
  // Enable GC logging (JDK 8 flags; spark.yarn.am.extraJavaOptions applies in client mode)
  .set("spark.yarn.am.extraJavaOptions", "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
  .set("spark.executor.extraJavaOptions", "-XX:+PrintGCDetails")
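
The two heartbeat settings interact: while container requests are pending, the ApplicationMaster heartbeats eagerly starting at `spark.yarn.scheduler.initial-allocation.interval`, and Spark's documentation states that this interval doubles on successive eager heartbeats until it reaches `spark.yarn.scheduler.heartbeat.interval-ms`. A minimal sketch of that schedule, with `AllocationHeartbeat` as a hypothetical name:

```scala
object AllocationHeartbeat {
  /** Successive eager heartbeat intervals (ms): doubling from `initialMs`, capped at `maxMs`. */
  def intervals(initialMs: Long, maxMs: Long, count: Int): List[Long] =
    List.iterate(initialMs, count)(prev => math.min(maxMs, prev * 2))
}
```

With the values above, `AllocationHeartbeat.intervals(200, 3000, 6)` produces 200, 400, 800, 1600, 3000, 3000.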

Resource Request Debugging

// Enable detailed logging for resource allocation debugging
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("ResourceDebuggingExample")
  // Enable debug logging
  .set("spark.yarn.am.extraJavaOptions", 
    "-Dlog4j.configuration=yarn-log4j.properties " +
    "-Dyarn.app.container.log.dir=/tmp/yarn-logs " +
    "-Dyarn.app.container.log.filesize=100MB")
  // Resource request diagnostics
  .set("spark.yarn.preserve.staging.files", "true")  // Keep staging files for debugging
  .set("spark.yarn.submit.file.replication", "3")  // Ensure file availability
