Apache Spark YARN resource manager integration that enables Spark applications to run on Hadoop YARN clusters.

Core cluster manager integration that lets Spark run on YARN clusters through the external cluster manager SPI. This module manages the lifecycle of YARN-based Spark applications and provides the appropriate schedulers and backends for each deployment mode.
Main entry point for YARN cluster management, registered as an ExternalClusterManager service provider. Automatically activated when master = "yarn" is specified in SparkConf.
```scala
class YarnClusterManager extends ExternalClusterManager {
  def canCreate(masterURL: String): Boolean
  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
  def createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler): SchedulerBackend
  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}
```

Parameters:
- `masterURL`: Must be "yarn" for this cluster manager to be selected
- `sc`: SparkContext instance for the application
- `scheduler`: TaskScheduler instance to be initialized
- `backend`: SchedulerBackend instance to be initialized

Usage Example:
```scala
import org.apache.spark.{SparkConf, SparkContext}

// YarnClusterManager is automatically selected when master is "yarn"
val conf = new SparkConf()
  .setAppName("MyApp")
  .setMaster("yarn") // This triggers YarnClusterManager selection
val sc = new SparkContext(conf)
```

YARN-specific task schedulers provide rack awareness and optimal task placement within YARN clusters.
```scala
class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
  override def getRackForHost(hostPort: String): Option[String]
}

class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc)
```

YarnScheduler: Used in client mode; resolves the YARN rack for a given host so tasks can be placed rack-locally.
YarnClusterScheduler: Used in cluster mode, where the driver runs inside the YARN ApplicationMaster.
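To show the effect of rack resolution, here is a minimal, self-contained sketch (not Spark's implementation) of how a `getRackForHost`-style lookup lets a scheduler downgrade placement from node-local to rack-local; the rack table and locality names are invented for illustration.

```scala
// Hypothetical rack table; on a real YARN cluster this information
// comes from the cluster's configured topology mapping.
object RackAwareness {
  private val rackByHost = Map(
    "node1.example.com" -> "/rack-a",
    "node2.example.com" -> "/rack-a",
    "node3.example.com" -> "/rack-b"
  )

  // Mirrors the shape of YarnScheduler.getRackForHost: unknown hosts yield None.
  def getRackForHost(host: String): Option[String] = rackByHost.get(host)

  // Choose a locality level for running a task that prefers `preferredHost`
  // on `candidateHost`: same host > same rack > anywhere.
  // The isDefined guard prevents two unknown hosts (None == None) from
  // being treated as rack-local.
  def localityFor(preferredHost: String, candidateHost: String): String =
    if (preferredHost == candidateHost) "NODE_LOCAL"
    else if (getRackForHost(preferredHost).isDefined &&
             getRackForHost(preferredHost) == getRackForHost(candidateHost)) "RACK_LOCAL"
    else "ANY"
}
```

For example, a task preferring `node1.example.com` but offered `node2.example.com` would run rack-locally, since both hosts map to `/rack-a`.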
Usage Example:

```scala
// Schedulers are created automatically based on deploy mode
val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "client") // Uses YarnScheduler
// .set("spark.submit.deployMode", "cluster") // Uses YarnClusterScheduler
```

YARN-specific scheduler backends manage the communication between Spark and the YARN ResourceManager.
```scala
abstract class YarnSchedulerBackend(scheduler: TaskSchedulerImpl, sc: SparkContext)
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
  def bindToYarn(appId: ApplicationId, attemptId: Option[ApplicationAttemptId]): Unit
  override def start(): Unit
  override def stop(): Unit
  override def minRegisteredRatio: Double // Returns 0.8
}
```

Common Methods:
- `bindToYarn`: Associates the backend with a YARN application ID and attempt ID
- `start()`: Initializes the backend and begins resource management
- `stop()`: Cleanly shuts down the backend and releases resources
- `minRegisteredRatio`: Returns 0.8 (80%), the minimum executor registration ratio for YARN

```scala
class YarnClientSchedulerBackend(scheduler: TaskSchedulerImpl, sc: SparkContext)
  extends YarnSchedulerBackend(scheduler, sc) {
  override def start(): Unit
  def waitForApplication(): Unit
  override def stop(): Unit
}
```

Client Mode Specific:
- `start()`: Creates and submits the YARN application via `Client`
- `waitForApplication()`: Blocks until the application reaches the RUNNING state
- Selected when `spark.submit.deployMode = "client"`

Usage Example:
```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "client")
  .set("spark.yarn.queue", "default")
// YarnClientSchedulerBackend is created automatically
val sc = new SparkContext(conf)
```

```scala
class YarnClusterSchedulerBackend(scheduler: TaskSchedulerImpl, sc: SparkContext)
  extends YarnSchedulerBackend(scheduler, sc) {
  override def start(): Unit
  def getDriverLogUrls: Option[Map[String, String]]
  override def stop(): Unit
}
```

Cluster Mode Specific:
- `start()`: Binds to the existing YARN application (running inside the ApplicationMaster)
- `getDriverLogUrls`: Returns driver log URLs from YARN for monitoring
- Selected when `spark.submit.deployMode = "cluster"`

Usage Example:
```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "cluster")
  .set("spark.yarn.queue", "production")
// YarnClusterSchedulerBackend is created automatically
val sc = new SparkContext(conf)
```

The YARN cluster manager is automatically registered through Java's ServiceLoader mechanism:

```
META-INF/services/org.apache.spark.scheduler.ExternalClusterManager:
org.apache.spark.scheduler.cluster.YarnClusterManager
```

This enables automatic discovery when master = "yarn" without requiring explicit class registration.
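The discovery mechanism can be demonstrated without Spark. The sketch below uses hypothetical stand-in names (not Spark classes): it writes a provider-configuration file with the same layout the spark-yarn jar ships, then lets `java.util.ServiceLoader` find the implementation.

```scala
import java.net.URLClassLoader
import java.nio.file.Files
import java.util.ServiceLoader

// Hypothetical stand-ins for ExternalClusterManager and YarnClusterManager.
trait ClusterManager { def canCreate(masterURL: String): Boolean }
class YarnLikeManager extends ClusterManager {
  override def canCreate(masterURL: String): Boolean = masterURL == "yarn"
}

object ServiceLoaderDemo {
  // Returns the simple class name of the first provider that accepts "yarn".
  def discover(): Option[String] = {
    // Write META-INF/services/<interface FQN> naming the provider class,
    // the same registration layout the spark-yarn jar uses.
    val dir = Files.createTempDirectory("spi-demo")
    val services = Files.createDirectories(dir.resolve("META-INF/services"))
    Files.write(services.resolve(classOf[ClusterManager].getName),
                classOf[YarnLikeManager].getName.getBytes("UTF-8"))

    // A classloader that sees the temp directory makes the provider discoverable.
    val cl = new URLClassLoader(Array(dir.toUri.toURL), getClass.getClassLoader)
    val it = ServiceLoader.load(classOf[ClusterManager], cl).iterator()
    var found: Option[String] = None
    while (it.hasNext && found.isEmpty) {
      val m = it.next()
      if (m.canCreate("yarn")) found = Some(m.getClass.getSimpleName)
    }
    found
  }

  def main(args: Array[String]): Unit = println(discover())
}
```

SparkContext does the equivalent at startup: it loads every registered `ExternalClusterManager` and keeps the one whose `canCreate` accepts the configured master URL.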
| Component | Client Mode | Cluster Mode |
|---|---|---|
| TaskScheduler | YarnScheduler | YarnClusterScheduler |
| SchedulerBackend | YarnClientSchedulerBackend | YarnClusterSchedulerBackend |
| Driver Location | Client machine | YARN ApplicationMaster |
| Application Submission | Client submits to YARN | Pre-submitted by spark-submit |
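The mode-to-component mapping in the table above can be sketched as a small dispatch function. The component names come from the table; the function itself is invented for illustration and is not Spark's API.

```scala
// Hypothetical, simplified mirror of YarnClusterManager's selection logic.
object YarnComponentSelection {
  final case class Components(taskScheduler: String, schedulerBackend: String)

  def componentsFor(master: String, deployMode: String): Components = master match {
    case "yarn" => deployMode match {
      case "client"  => Components("YarnScheduler", "YarnClientSchedulerBackend")
      case "cluster" => Components("YarnClusterScheduler", "YarnClusterSchedulerBackend")
      case other     => throw new IllegalArgumentException(s"Unknown deploy mode '$other' for Yarn")
    }
    case other => throw new IllegalArgumentException(s"Not a YARN master URL: '$other'")
  }
}
```

For example, `componentsFor("yarn", "cluster")` yields the YarnClusterScheduler/YarnClusterSchedulerBackend pair, matching the cluster-mode column of the table.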
Common exceptions in cluster management:

```scala
// Unsupported deploy mode
throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")

// Backend initialization failure
throw new SparkException("Failed to initialize YARN scheduler backend")
```

Which components are created, and whether these exceptions are thrown, is determined by the `spark.submit.deployMode` and `spark.master` settings.

Install with Tessl CLI:
```shell
npx tessl i tessl/maven-org-apache-spark--spark-yarn-2-11
```