Apache Spark's integration with the Hadoop YARN cluster manager, for running Spark applications on YARN clusters.
The YARN module provides specialized scheduler implementations that integrate Spark's task scheduling with YARN's resource management. These schedulers handle resource allocation, task placement, and coordination between Spark's execution engine and YARN's cluster management.
```scala
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.hadoop.yarn.api.records.ApplicationId
```

Scheduler implementation for YARN cluster mode, where the driver runs within the YARN cluster as part of the Application Master.
```scala
/**
 * Task scheduler for YARN cluster mode.
 * Integrates with the Application Master for resource management.
 */
private[spark] class YarnClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {

  /**
   * Gets rack information for a given host.
   * @param hostPort Host and port string (e.g., "host1:8080")
   * @return Optional rack information for the host
   */
  override def getRackForHost(hostPort: String): Option[String]

  /**
   * Post-start initialization hook.
   * Notifies the Application Master that the SparkContext is ready.
   */
  override def postStartHook(): Unit

  /**
   * Stops the scheduler and notifies the Application Master.
   */
  override def stop(): Unit
}
```

Usage Example:

```scala
// Automatically created when using yarn-cluster mode
val conf = new SparkConf().setMaster("yarn-cluster")
val sc = new SparkContext(conf)
// YarnClusterScheduler is automatically instantiated
```

Scheduler implementation for YARN client mode, where the driver runs outside the YARN cluster on the client machine.
```scala
/**
 * Task scheduler for YARN client mode.
 * Handles scheduling when the driver runs outside the YARN cluster.
 */
private[spark] class YarnClientClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {

  /**
   * Gets rack information for a given host.
   * @param hostPort Host and port string (e.g., "host1:8080")
   * @return Optional rack information for the host
   */
  override def getRackForHost(hostPort: String): Option[String]
}
```

Usage Example:

```scala
// Automatically created when using yarn-client mode
val conf = new SparkConf().setMaster("yarn-client")
val sc = new SparkContext(conf)
// YarnClientClusterScheduler is automatically instantiated
```

Base class for YARN scheduler backends, providing common functionality.
```scala
/**
 * Base scheduler backend for YARN implementations.
 * Provides common functionality for cluster and client modes.
 */
private[spark] abstract class YarnSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext)
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {

  /**
   * Gets the YARN application ID.
   * @return Application ID string
   */
  def applicationId(): String

  /**
   * Gets the application attempt ID.
   * @return Application attempt ID, if one exists
   */
  def applicationAttemptId(): Option[String]
}
```

Scheduler backend that manages communication between the Spark scheduler and YARN resources in cluster mode.
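As a hedged usage sketch: the backend's `applicationId()` is surfaced through the public `SparkContext.applicationId` accessor, so user code can log the YARN application ID without touching the backend directly (the app name and master string below are illustrative, and the snippet needs a YARN cluster to actually run):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: requires a running YARN cluster.
val conf = new SparkConf().setAppName("yarn-id-demo").setMaster("yarn-cluster")
val sc = new SparkContext(conf)
// On YARN this returns an ID of the form "application_<clusterTimestamp>_<id>"
println(sc.applicationId)
sc.stop()
```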
```scala
/**
 * Scheduler backend for YARN cluster mode.
 * Manages executor lifecycle and communication.
 */
private[spark] class YarnClusterSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext)
  extends YarnSchedulerBackend(scheduler, sc) {

  /**
   * Starts the scheduler backend.
   */
  override def start(): Unit

  /**
   * Gets the YARN application ID from SparkConf.
   * @return Application ID string
   */
  override def applicationId(): String
}
```

Scheduler backend for client mode that coordinates between the external driver and YARN-managed executors.
```scala
/**
 * Scheduler backend for YARN client mode.
 * Coordinates between the external driver and YARN executors.
 */
private[spark] class YarnClientSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext)
  extends YarnSchedulerBackend(scheduler, sc) {

  /**
   * Starts the scheduler backend and submits the application to YARN.
   */
  override def start(): Unit

  /**
   * Stops the scheduler backend and cleans up resources.
   */
  override def stop(): Unit

  /**
   * Gets the YARN application ID.
   * @return Application ID string
   */
  override def applicationId(): String
}
```

Both scheduler implementations provide rack awareness for optimal task placement:
```scala
// Rack lookup using YARN's topology information
override def getRackForHost(hostPort: String): Option[String] = {
  val host = Utils.parseHostPort(hostPort)._1
  Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
}
```

The benefit is rack-aware task placement with no user-side configuration: rack topology comes directly from YARN. The schedulers also coordinate with YARN for dynamic resource management:
In cluster mode:

```scala
// 1. Scheduler initialization
val scheduler = new YarnClusterScheduler(sparkContext)

// 2. Post-start hook execution
scheduler.postStartHook()
// Calls: ApplicationMaster.sparkContextInitialized(sc)

// 3. Task scheduling and execution
// Normal Spark task scheduling operations

// 4. Shutdown
scheduler.stop()
// Calls: ApplicationMaster.sparkContextStopped(sc)
```

In client mode, where the Application Master runs in a separate process:

```scala
// 1. Scheduler initialization
val scheduler = new YarnClientClusterScheduler(sparkContext)

// 2. Application Master communication setup
// Backend establishes communication with the separate AM process

// 3. Task scheduling and execution
// Tasks executed on YARN-managed executors

// 4. Shutdown
scheduler.stop()
// Cleanup and AM notification
```

Key configuration properties affecting scheduler behavior:
```scala
val conf = new SparkConf()
  .set("spark.scheduler.mode", "FAIR")                 // FAIR or FIFO scheduling
  .set("spark.scheduler.allocation.file", "pools.xml") // Fair scheduler pools
  .set("spark.locality.wait", "3s")                    // Data locality wait time
  .set("spark.locality.wait.rack", "0")                // Rack locality wait time
  .set("spark.task.maxFailures", "3")                  // Task failure retry limit
```

Configuration properties specific to YARN scheduler integration:
```scala
val conf = new SparkConf()
  .set("spark.yarn.scheduler.heartbeat.interval-ms", "3000")        // Heartbeat interval
  .set("spark.yarn.scheduler.initial-allocation.interval", "200ms") // Initial allocation interval
  .set("spark.yarn.max.executor.failures", "6")                     // Max executor failures
  .set("spark.yarn.am.waitTime", "100s")                            // AM wait time for SparkContext
```

The schedulers optimize task placement for data locality:
The rack lookup shown above drives locality-aware placement. Task placement considers, in decreasing order of preference: process locality, node locality, rack locality, and finally arbitrary placement (Spark's PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, and ANY levels, with NO_PREF for tasks that express no preference).
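To make the preference ordering concrete, here is a minimal, self-contained sketch. The `Locality` enumeration and `bestLevel` helper below are illustrative, mirroring the ordering of Spark's internal `TaskLocality` levels rather than Spark's actual API:

```scala
// Illustrative locality levels, ordered from most to least preferred.
object Locality extends Enumeration {
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
}

// Pick the best (lowest-ordinal) level among those currently available.
def bestLevel(available: Seq[Locality.Value]): Locality.Value =
  available.minBy(_.id)

println(bestLevel(Seq(Locality.ANY, Locality.RACK_LOCAL, Locality.NODE_LOCAL)))
// NODE_LOCAL
```

The scheduler's `spark.locality.wait` settings shown earlier control how long it waits for a slot at a better level before falling back to the next one.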
The schedulers handle various executor failure scenarios:

- Automatic retry of affected tasks on the remaining executors
- Executor replacement requested through the Application Master
- Blacklisting of problematic nodes
- Application failure when too many executors fail (bounded by `spark.yarn.max.executor.failures`)

Resource constraints are handled the same way: when YARN cannot grant the full request, the application continues on the executors it has been allocated.
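The failure-accounting behavior can be sketched as follows. This is a simplified illustration of the counting behind `spark.yarn.max.executor.failures`; the class and method names are hypothetical, not Spark's internals:

```scala
// Hypothetical sketch: count executor failures and decide when to give up.
class ExecutorFailureTracker(maxFailures: Int) {
  private var failures = 0
  def recordFailure(): Unit = failures += 1
  def shouldFailApplication: Boolean = failures > maxFailures
}

val tracker = new ExecutorFailureTracker(maxFailures = 6)
(1 to 6).foreach(_ => tracker.recordFailure())
println(tracker.shouldFailApplication) // false: still within the limit
tracker.recordFailure()
println(tracker.shouldFailApplication) // true: limit exceeded, fail the app
```

In the real system this decision is made by the Application Master, which then reports the final application status to YARN.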
In client mode, the backend maintains robust communication with the Application Master running inside the cluster, so driver and AM stay in sync about executor state. Because both schedulers extend Spark's base TaskSchedulerImpl, task-set management, speculation, and FAIR/FIFO scheduling behave as on other cluster managers; the schedulers coordinate with the Spark driver for job submission and result handling and integrate with Spark's storage layer for block and shuffle placement. They also report standard metrics (executor counts, failures, allocation behavior) and surface YARN application information, such as the application ID, in Spark's web UI.
Install with Tessl CLI:

```shell
npx tessl i tessl/maven-org-apache-spark--yarn-parent-2-11
```