
tessl/maven-org-apache-spark--yarn-parent-2-11

Apache Spark's integration with Hadoop YARN cluster manager for running Spark applications on YARN clusters


Scheduler Integration

The YARN module provides specialized scheduler implementations that integrate Spark's task scheduling with YARN's resource management. These schedulers handle resource allocation, task placement, and coordination between Spark's execution engine and YARN's cluster management.

Imports

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.hadoop.yarn.api.records.ApplicationId

Capabilities

YARN Cluster Scheduler

Scheduler implementation for YARN cluster mode where the driver runs within the YARN cluster as part of the Application Master.

/**
 * Task scheduler for YARN cluster mode
 * Integrates with Application Master for resource management
 */
private[spark] class YarnClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
  
  /**
   * Gets rack information for a given host
   * @param hostPort Host and port string (e.g., "host1:8080")
   * @return Optional rack information for the host
   */
  override def getRackForHost(hostPort: String): Option[String]
  
  /**
   * Post-start initialization hook
   * Notifies Application Master that SparkContext is ready
   */
  override def postStartHook(): Unit
  
  /**
   * Stops the scheduler and notifies Application Master
   */
  override def stop(): Unit
}

Usage Example:

// Automatically created when using yarn-cluster mode
val conf = new SparkConf().setMaster("yarn-cluster")
val sc = new SparkContext(conf)
// YarnClusterScheduler is automatically instantiated

YARN Client Scheduler

Scheduler implementation for YARN client mode where the driver runs outside the YARN cluster on the client machine.

/**
 * Task scheduler for YARN client mode
 * Handles scheduling when driver runs outside YARN cluster
 */
private[spark] class YarnClientClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
  
  /**
   * Gets rack information for a given host
   * @param hostPort Host and port string (e.g., "host1:8080")  
   * @return Optional rack information for the host
   */
  override def getRackForHost(hostPort: String): Option[String]
}

Usage Example:

// Automatically created when using yarn-client mode
val conf = new SparkConf().setMaster("yarn-client")
val sc = new SparkContext(conf)
// YarnClientClusterScheduler is automatically instantiated

YARN Scheduler Backend Base

Base class for YARN scheduler backends providing common functionality.

/**
 * Base scheduler backend for YARN implementations
 * Provides common functionality for cluster and client modes
 */
private[spark] abstract class YarnSchedulerBackend(
  scheduler: TaskSchedulerImpl,
  sc: SparkContext
) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
  
  /**
   * Gets the YARN application ID
   * @return Application ID string
   */
  def applicationId(): String
  
  /**
   * Gets the application attempt ID
   * @return Application attempt ID
   */  
  def applicationAttemptId(): Option[String]
}
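
From user code, the application ID that the backend reports surfaces through the public SparkContext API (sc.applicationId, available since Spark 1.5). A minimal sketch:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: reading the YARN application ID reported by the
// scheduler backend, via the public SparkContext API.
val conf = new SparkConf().setMaster("yarn-cluster").setAppName("id-demo")
val sc = new SparkContext(conf)

// On YARN this resolves to an id of the form "application_<timestamp>_<seq>"
println(s"Running as YARN application ${sc.applicationId}")
sc.stop()
```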

YARN Cluster Scheduler Backend

Scheduler backend that manages communication between the Spark scheduler and YARN resources in cluster mode.

/**
 * Scheduler backend for YARN cluster mode
 * Manages executor lifecycle and communication
 */
private[spark] class YarnClusterSchedulerBackend(
  scheduler: TaskSchedulerImpl,
  sc: SparkContext
) extends YarnSchedulerBackend(scheduler, sc) {
  
  /**
   * Starts the scheduler backend
   */
  override def start(): Unit
  
  /**
   * Gets the YARN application ID from SparkConf
   * @return Application ID string
   */
  override def applicationId(): String
}

YARN Client Scheduler Backend

Scheduler backend for client mode that coordinates between the external driver and YARN-managed executors.

/**
 * Scheduler backend for YARN client mode
 * Coordinates between external driver and YARN executors
 */
private[spark] class YarnClientSchedulerBackend(
  scheduler: TaskSchedulerImpl,
  sc: SparkContext
) extends YarnSchedulerBackend(scheduler, sc) {
  
  /**
   * Starts the scheduler backend and submits application to YARN
   */
  override def start(): Unit
  
  /**
   * Stops the scheduler backend and cleans up resources
   */
  override def stop(): Unit
  
  /**
   * Gets the YARN application ID
   * @return Application ID string
   */
  override def applicationId(): String
}

Resource Management Integration

Rack Awareness

Both scheduler implementations provide rack awareness for optimal task placement:

// Rack lookup using YARN's topology information
override def getRackForHost(hostPort: String): Option[String] = {
  val host = Utils.parseHostPort(hostPort)._1
  Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
}

Benefits:

  • Data Locality: Place tasks close to data when possible
  • Network Efficiency: Minimize cross-rack network traffic
  • Fault Tolerance: Distribute tasks across racks for resilience
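
Rack resolution ultimately relies on Hadoop's topology configuration; without it, every host resolves to the default rack and rack-aware placement is a no-op. A sketch of the relevant Hadoop setting (the script path here is hypothetical):

```scala
import org.apache.hadoop.conf.Configuration

// Equivalent of setting net.topology.script.file.name in core-site.xml.
// "/etc/hadoop/topology.sh" is a hypothetical script that maps each
// hostname to a rack path such as "/rack1".
val hadoopConf = new Configuration()
hadoopConf.set("net.topology.script.file.name", "/etc/hadoop/topology.sh")
```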

Resource Allocation Coordination

The schedulers coordinate with YARN for dynamic resource management:

  • Executor Requests: Communicate resource needs to Application Master
  • Container Allocation: Handle YARN container assignments
  • Dynamic Scaling: Support for adding/removing executors based on workload
  • Resource Constraints: Respect YARN queue limits and cluster capacity
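
Dynamic scaling is driven by Spark's dynamic allocation settings; a sketch of a typical configuration (executor counts are illustrative, and the external shuffle service must be running on each NodeManager):

```scala
import org.apache.spark.SparkConf

// Sketch: enabling dynamic executor allocation on YARN so the backend
// can request and release containers as workload changes.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.shuffle.service.enabled", "true")
```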

Scheduler Lifecycle

Initialization Sequence

  1. Scheduler Creation: Automatically instantiated based on master URL
  2. Backend Setup: Create appropriate scheduler backend for the mode
  3. Resource Discovery: Initialize rack topology and cluster information
  4. Registration: Register with Application Master (cluster mode) or start AM (client mode)
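
The selection step can be sketched as follows. This is a simplified illustration, not the actual implementation: SparkContext.createTaskScheduler instantiates these classes reflectively so core Spark has no compile-time dependency on the YARN module.

```scala
// Illustrative sketch of scheduler selection by master URL.
def createYarnScheduler(sc: SparkContext, master: String) = master match {
  case "yarn-cluster" =>
    val scheduler = new YarnClusterScheduler(sc)
    val backend = new YarnClusterSchedulerBackend(scheduler, sc)
    scheduler.initialize(backend)
    (scheduler, backend)
  case "yarn-client" =>
    val scheduler = new YarnClientClusterScheduler(sc)
    val backend = new YarnClientSchedulerBackend(scheduler, sc)
    scheduler.initialize(backend)
    (scheduler, backend)
}
```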

Cluster Mode Lifecycle

// 1. Scheduler initialization
val scheduler = new YarnClusterScheduler(sparkContext)

// 2. Post-start hook execution
scheduler.postStartHook()
// Calls: ApplicationMaster.sparkContextInitialized(sc)

// 3. Task scheduling and execution
// Normal Spark task scheduling operations

// 4. Shutdown
scheduler.stop()
// Calls: ApplicationMaster.sparkContextStopped(sc)

Client Mode Lifecycle

// 1. Scheduler initialization  
val scheduler = new YarnClientClusterScheduler(sparkContext)

// 2. Application Master communication setup
// Backend establishes communication with separate AM process

// 3. Task scheduling and execution
// Tasks executed on YARN-managed executors

// 4. Shutdown
scheduler.stop()
// Cleanup and AM notification

Configuration Integration

Scheduler Configuration

Key configuration properties affecting scheduler behavior:

val conf = new SparkConf()
  .set("spark.scheduler.mode", "FAIR")              // FAIR or FIFO scheduling
  .set("spark.scheduler.allocation.file", "pools.xml") // Fair scheduler pools
  .set("spark.locality.wait", "3s")                // Data locality wait time
  .set("spark.locality.wait.rack", "0")            // Rack locality wait time
  .set("spark.task.maxFailures", "3")              // Task failure retry limit

YARN-Specific Configuration

Configuration properties specific to YARN scheduler integration:

val conf = new SparkConf()
  .set("spark.yarn.scheduler.heartbeat.interval-ms", "3000") // Heartbeat interval
  .set("spark.yarn.scheduler.initial-allocation.interval", "200ms") // Initial allocation
  .set("spark.yarn.max.executor.failures", "6")    // Max executor failures
  .set("spark.yarn.am.waitTime", "100s")           // AM wait time for SparkContext

Task Placement and Locality

Data Locality Optimization

The schedulers optimize task placement for data locality, preferring levels in this order:

  1. Process Local: Task runs in the same executor process as its cached data
  2. Node Local: Task runs on the same node as the data
  3. Rack Local: Task runs on the same rack as the data
  4. Any: Task can run anywhere in the cluster

// Locality preference handling
def getRackForHost(hostPort: String): Option[String] = {
  val host = Utils.parseHostPort(hostPort)._1
  // Use YARN's rack resolution
  Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
}

Resource Preference

Task placement considers:

  • Memory Requirements: Match tasks to appropriately sized executors
  • CPU Requirements: Consider CPU core availability
  • Network Topology: Minimize data movement across network
  • Load Balancing: Distribute work evenly across cluster
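
Executor sizing determines the containers that YARN allocates, so it should match the tasks' memory and CPU needs; a sketch (all values illustrative):

```scala
import org.apache.spark.SparkConf

// Sketch: sizing executors so YARN container requests match task
// requirements. spark.yarn.executor.memoryOverhead is the off-heap
// headroom (in MB) added to each container request.
val conf = new SparkConf()
  .set("spark.executor.instances", "10")             // number of executors
  .set("spark.executor.cores", "4")                  // cores per executor
  .set("spark.executor.memory", "8g")                // heap per executor
  .set("spark.yarn.executor.memoryOverhead", "1024") // MB of overhead
```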

Error Handling and Recovery

Executor Failure Handling

The schedulers handle various executor failure scenarios:

  • Automatic task retry on remaining executors
  • Executor replacement through the Application Master
  • Blacklisting of problematic nodes
  • Application failure if too many executors fail

Resource Unavailability

Handling resource constraints:

  • Queue Full: Wait for resources to become available
  • Insufficient Memory: Graceful degradation or failure
  • Network Partitions: Timeout and retry mechanisms
  • Node Failures: Task redistribution to healthy nodes
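
Queue capacity and retry behavior are configurable; a sketch ("analytics" is a hypothetical queue name — if the queue is full, the application waits in the ACCEPTED state until resources free up):

```scala
import org.apache.spark.SparkConf

// Sketch: targeting a specific YARN queue and bounding AM retries.
val conf = new SparkConf()
  .set("spark.yarn.queue", "analytics")   // hypothetical queue name
  .set("spark.yarn.maxAppAttempts", "2")  // cap Application Master attempts
```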

Application Master Communication

In client mode, robust communication with Application Master:

  • Heartbeat Monitoring: Regular health checks
  • Message Retry: Retry failed communications
  • Connection Recovery: Re-establish lost connections
  • Failover: Handle Application Master restarts

Integration with Spark Components

TaskScheduler Integration

The YARN schedulers extend Spark's base TaskSchedulerImpl:

  • Task Submission: Receive tasks from DAGScheduler
  • Resource Matching: Match tasks to available executors
  • Task Launch: Send tasks to appropriate executors
  • Result Collection: Gather task results and metrics

Driver Integration

Coordination with Spark driver:

  • Task Graph: Receive task execution plans
  • Status Updates: Report task and executor status
  • Metrics Collection: Aggregate performance metrics
  • Event Handling: Process Spark events and lifecycle changes

Storage Integration

Integration with Spark's storage layer:

  • Block Manager: Coordinate with block managers on executors
  • RDD Caching: Optimize placement for cached RDDs
  • Shuffle Management: Coordinate shuffle operations
  • Broadcast Variables: Efficient distribution of broadcast data

Monitoring and Metrics

Scheduler Metrics

Key metrics tracked by YARN schedulers:

  • Task Completion Rate: Tasks completed per second
  • Resource Utilization: CPU and memory usage across cluster
  • Locality Statistics: Data locality hit rates
  • Failure Rates: Task and executor failure frequencies

Integration with Spark UI

The schedulers integrate with Spark's web UI:

  • Executor Information: Current executor status and resources
  • Task Details: Task execution times and locality
  • Resource Usage: Memory and CPU utilization graphs
  • Error Reporting: Failed task and executor information
