Core cluster manager integration that enables Spark to run on YARN clusters through the external cluster manager SPI. This module handles the lifecycle of YARN-based Spark applications and provides appropriate schedulers and backends for different deployment modes.
Main entry point for YARN cluster management, registered as an ExternalClusterManager service provider. Automatically activated when master = "yarn" is specified in SparkConf.
```scala
class YarnClusterManager extends ExternalClusterManager {
  def canCreate(masterURL: String): Boolean
  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
  def createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler): SchedulerBackend
  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}
```

Parameters:
- `masterURL`: must be "yarn" for this cluster manager to be selected
- `sc`: the SparkContext instance for the application
- `scheduler`: the TaskScheduler instance to be initialized
- `backend`: the SchedulerBackend instance to be initialized

Usage Example:
```scala
import org.apache.spark.{SparkConf, SparkContext}

// YarnClusterManager is automatically selected when master is "yarn"
val conf = new SparkConf()
  .setAppName("MyApp")
  .setMaster("yarn") // This triggers YarnClusterManager selection
val sc = new SparkContext(conf)
```

YARN-specific task schedulers provide rack awareness and optimal task placement within YARN clusters.
```scala
class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
  override def getRackForHost(hostPort: String): Option[String]
}

class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc)
```

- YarnScheduler: overrides `getRackForHost` to resolve the rack for a host, enabling rack-aware task placement; used in client mode.
- YarnClusterScheduler: extends YarnScheduler for cluster mode, where the driver runs inside the YARN ApplicationMaster.
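To illustrate the kind of lookup `getRackForHost` performs, here is a minimal, self-contained sketch. The `rackByHost` map is a stand-in for real topology resolution (in Spark this is delegated to Hadoop's rack-resolution machinery, not a static table):

```scala
// Toy stand-in for YARN topology resolution. The static map below is purely
// illustrative; a real scheduler resolves racks via the cluster's topology
// script or Hadoop's RackResolver.
object RackAwareLookup {
  private val rackByHost: Map[String, String] = Map(
    "node1.example.com" -> "/rack-a",
    "node2.example.com" -> "/rack-b"
  )

  // Mirrors the Option[String] contract of getRackForHost:
  // None means the host's rack is unknown, so only host-level locality applies.
  def getRackForHost(hostPort: String): Option[String] = {
    val host = hostPort.split(":").head // strip an optional port suffix
    rackByHost.get(host)
  }
}
```

With this sketch, `RackAwareLookup.getRackForHost("node1.example.com:7337")` yields `Some("/rack-a")`, while an unknown host yields `None`, degrading gracefully to host-level locality.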
Usage Example:
```scala
// Schedulers are created automatically based on deploy mode
val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "client") // Uses YarnScheduler
// .set("spark.submit.deployMode", "cluster") // Uses YarnClusterScheduler
```

YARN-specific scheduler backends manage the communication between Spark and the YARN ResourceManager.
```scala
abstract class YarnSchedulerBackend(scheduler: TaskSchedulerImpl, sc: SparkContext)
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
  def bindToYarn(appId: ApplicationId, attemptId: Option[ApplicationAttemptId]): Unit
  override def start(): Unit
  override def stop(): Unit
  override def minRegisteredRatio: Double // Returns 0.8
}
```

Common Methods:
- `bindToYarn`: associates the backend with the YARN application and attempt IDs
- `start()`: initializes the backend and begins resource management
- `stop()`: cleanly shuts down the backend and releases resources
- `minRegisteredRatio`: returns 0.8 (80%), the minimum executor registration ratio for YARN

```scala
class YarnClientSchedulerBackend(scheduler: TaskSchedulerImpl, sc: SparkContext)
  extends YarnSchedulerBackend(scheduler, sc) {
  override def start(): Unit
  def waitForApplication(): Unit
  override def stop(): Unit
}
```

Client Mode Specific:
- `start()`: creates and submits the YARN application via `Client`
- `waitForApplication()`: blocks until the application reaches the RUNNING state
- Selected when `spark.submit.deployMode = "client"`

Usage Example:
```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "client")
  .set("spark.yarn.queue", "default")
// YarnClientSchedulerBackend is created automatically
val sc = new SparkContext(conf)
```

```scala
class YarnClusterSchedulerBackend(scheduler: TaskSchedulerImpl, sc: SparkContext)
  extends YarnSchedulerBackend(scheduler, sc) {
  override def start(): Unit
  def getDriverLogUrls: Option[Map[String, String]]
  override def stop(): Unit
}
```

Cluster Mode Specific:
- `start()`: binds to the existing YARN application (running inside the ApplicationMaster)
- `getDriverLogUrls`: returns driver log URLs from YARN for monitoring
- Selected when `spark.submit.deployMode = "cluster"`

Usage Example:
```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "cluster")
  .set("spark.yarn.queue", "production")
// YarnClusterSchedulerBackend is created automatically
val sc = new SparkContext(conf)
```

The YARN cluster manager is automatically registered through Java's ServiceLoader mechanism:

```
META-INF/services/org.apache.spark.scheduler.ExternalClusterManager:
org.apache.spark.scheduler.cluster.YarnClusterManager
```

This enables automatic discovery when master = "yarn" without requiring explicit class registration.
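The discovery flow can be sketched without a real classpath scan. In the sketch below, the trait is a simplified stand-in for Spark's `ExternalClusterManager` SPI, and a hardcoded list stands in for what `java.util.ServiceLoader` would return after reading the `META-INF/services` entry:

```scala
// Simplified stand-in for Spark's ExternalClusterManager SPI.
trait ClusterManagerSpi {
  def canCreate(masterURL: String): Boolean
}

// Stand-in for the YARN implementation: claims only master = "yarn".
object YarnManagerStub extends ClusterManagerSpi {
  override def canCreate(masterURL: String): Boolean = masterURL == "yarn"
}

object ClusterManagerRegistry {
  // In Spark this list comes from java.util.ServiceLoader scanning
  // META-INF/services; here it is hardcoded for illustration.
  private val loaded: Seq[ClusterManagerSpi] = Seq(YarnManagerStub)

  // SparkContext picks the unique manager that claims the master URL.
  def select(masterURL: String): Option[ClusterManagerSpi] =
    loaded.filter(_.canCreate(masterURL)) match {
      case Seq(single) => Some(single)
      case _           => None // none found, or ambiguous
    }
}
```

Requiring exactly one matching manager keeps selection unambiguous: built-in masters like `local[*]` fall through to Spark's internal handling because no external manager claims them.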
| Component | Client Mode | Cluster Mode |
|---|---|---|
| TaskScheduler | YarnScheduler | YarnClusterScheduler |
| SchedulerBackend | YarnClientSchedulerBackend | YarnClusterSchedulerBackend |
| Driver Location | Client machine | YARN ApplicationMaster |
| Application Submission | Client submits to YARN | Pre-submitted by spark-submit |
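The table above can be read as a dispatch on `spark.submit.deployMode`. A minimal sketch of that selection follows; it returns class names as strings purely for illustration (the real `YarnClusterManager` instantiates the scheduler and backend objects directly):

```scala
// Maps deploy mode to the (TaskScheduler, SchedulerBackend) pair from the
// comparison table. Strings are used here for illustration only.
object YarnComponentDispatch {
  def componentsFor(deployMode: String): (String, String) = deployMode match {
    case "client"  => ("YarnScheduler", "YarnClientSchedulerBackend")
    case "cluster" => ("YarnClusterScheduler", "YarnClusterSchedulerBackend")
    case other =>
      // Mirrors the SparkException thrown for unsupported modes.
      throw new IllegalArgumentException(s"Unknown deploy mode '$other' for Yarn")
  }
}
```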
Common exceptions in cluster management:

```scala
// Unsupported deploy mode
throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")

// Backend initialization failure
throw new SparkException("Failed to initialize YARN scheduler backend")
```

Which scheduler and backend are created is controlled by the `spark.master` and `spark.submit.deployMode` settings.
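The `minRegisteredRatio` of 0.8 described earlier governs when the backend considers enough executors registered to begin scheduling tasks. The object and method names below are hypothetical; this is only a sketch of the ratio check, not Spark's implementation:

```scala
// Sketch of the readiness check implied by minRegisteredRatio = 0.8:
// the backend waits until at least 80% of the requested executors have
// registered (or a timeout elapses) before offering resources.
object ExecutorRegistrationGate {
  val minRegisteredRatio: Double = 0.8

  def sufficientResourcesRegistered(registered: Int, target: Int): Boolean =
    target <= 0 || registered.toDouble / target >= minRegisteredRatio
}
```

Waiting for a registration quorum avoids scheduling the first wave of tasks onto a handful of early executors, which would produce poor locality and skewed initial partitions.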