Apache Spark YARN resource manager integration module that enables Spark applications to run on YARN clusters
—
Integration components that connect Spark's task scheduling system with YARN's resource management. These classes provide cluster manager implementation and scheduler backends for both client and cluster deployment modes.
Cluster manager implementation that integrates Spark with YARN. Registered as an external cluster manager for "yarn" master URLs.
/**
* Cluster manager implementation for YARN integration
* Implements ExternalClusterManager interface
*/
private[spark] class YarnClusterManager extends ExternalClusterManager {
/**
* Check if this manager can create components for the given master URL
* @param masterURL Master URL (should be "yarn")
* @return true if masterURL is "yarn"
*/
def canCreate(masterURL: String): Boolean
/**
* Create YARN task scheduler
* @param sc SparkContext
* @param masterURL Master URL
* @return TaskScheduler instance
*/
def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
/**
* Create YARN scheduler backend
* @param sc SparkContext
* @param masterURL Master URL
* @param scheduler TaskScheduler instance
* @return SchedulerBackend instance
*/
def createSchedulerBackend(
sc: SparkContext,
masterURL: String,
scheduler: TaskScheduler
): SchedulerBackend
/**
* Initialize scheduler components
* @param scheduler TaskScheduler to initialize
* @param backend SchedulerBackend to initialize
*/
def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}Usage Example:
import org.apache.spark.{SparkConf, SparkContext}
// YarnClusterManager is automatically used when master is "yarn"
val conf = new SparkConf()
.setMaster("yarn")
.setAppName("YarnIntegrationExample")
val sc = new SparkContext(conf)
// YarnClusterManager creates appropriate scheduler and backend automaticallyBase class for YARN scheduler backends that handle resource requests and executor management.
/**
* Abstract base class for YARN scheduler backends
* @param scheduler TaskScheduler implementation
* @param sc SparkContext
*/
private[spark] abstract class YarnSchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext
) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
/**
* Bind scheduler backend to YARN application
* @param appId YARN application ID
* @param attemptId Optional application attempt ID
*/
protected def bindToYarn(appId: ApplicationId, attemptId: Option[ApplicationAttemptId]): Unit
/** Get YARN application ID as string */
def applicationId(): String
/** Get application attempt ID as string */
def applicationAttemptId(): Option[String]
/** Request total executors from YARN */
def doRequestTotalExecutors(): Future[Boolean]
/**
* Kill specific executors
* @param executorIds Sequence of executor IDs to kill
* @return Future indicating success/failure
*/
def doKillExecutors(executorIds: Seq[String]): Future[Boolean]
/** Get shuffle service merger locations */
def getShufflePushMergerLocations(): Seq[BlockManagerId]
}Scheduler backend for yarn-client mode where driver runs locally and executors run on YARN.
/**
* Scheduler backend for yarn-client mode
* @param scheduler TaskScheduler implementation
* @param sc SparkContext
*/
private[spark] class YarnClientSchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext
) extends YarnSchedulerBackend(scheduler, sc) {
/** Start client mode scheduler backend */
def start(): Unit
/**
* Stop scheduler backend
* @param exitCode Exit code for cleanup
*/
def stop(exitCode: Int): Unit
}Usage Example:
import org.apache.spark.{SparkConf, SparkContext}
// Client mode - driver runs locally, executors on YARN
val conf = new SparkConf()
.setMaster("yarn")
.setDeployMode("client") // or spark.submit.deployMode=client
.setAppName("ClientModeApp")
val sc = new SparkContext(conf) // Uses YarnClientSchedulerBackend automaticallyScheduler backend for yarn-cluster mode where both driver and executors run on YARN.
/**
* Scheduler backend for yarn-cluster mode
* @param scheduler TaskScheduler implementation
* @param sc SparkContext
*/
private[spark] class YarnClusterSchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext
) extends YarnSchedulerBackend(scheduler, sc) {
/** Start cluster mode scheduler backend */
def start(): Unit
/**
* Stop scheduler backend
* @param exitCode Exit code for cleanup
*/
def stop(exitCode: Int): Unit
/** Get driver container log URLs */
def getDriverLogUrls(): Option[Map[String, String]]
/** Get driver container attributes */
def getDriverAttributes(): Option[Map[String, String]]
}Usage Example:
import org.apache.spark.{SparkConf, SparkContext}
// Cluster mode - both driver and executors on YARN
val conf = new SparkConf()
.setMaster("yarn")
.setDeployMode("cluster") // or spark.submit.deployMode=cluster
.setAppName("ClusterModeApp")
val sc = new SparkContext(conf) // Uses YarnClusterSchedulerBackend automaticallyTask scheduler with YARN-specific enhancements for rack awareness and locality optimization.
/**
* Task scheduler with YARN-specific enhancements
* @param sc SparkContext
*/
class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl {
/**
* Get rack information for hosts
* @param hostPorts Sequence of host:port strings
* @return Sequence of optional rack names
*/
def getRacksForHosts(hostPorts: Seq[String]): Seq[Option[String]]
}Task scheduler specifically for yarn-cluster mode with additional initialization hooks.
/**
* Task scheduler for yarn-cluster mode
* @param sc SparkContext
*/
class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler {
/** Post-start initialization hook */
def postStartHook(): Unit
}The YARN integration automatically selects the appropriate scheduler backend based on deployment mode:
import org.apache.spark.{SparkConf, SparkContext}
// Client mode (default)
val clientConf = new SparkConf()
.setMaster("yarn")
.setAppName("AutoClientMode")
// Creates: YarnScheduler + YarnClientSchedulerBackend
// Cluster mode
val clusterConf = new SparkConf()
.setMaster("yarn")
.setDeployMode("cluster")
.setAppName("AutoClusterMode")
// Creates: YarnClusterScheduler + YarnClusterSchedulerBackendimport org.apache.spark.SparkConf
val conf = new SparkConf()
.setMaster("yarn")
.setAppName("CustomSchedulerConfig")
// Scheduler-specific configurations
.set("spark.scheduler.mode", "FAIR")
.set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")
.set("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s")
.set("spark.scheduler.minRegisteredResourcesRatio", "0.8")// Configure executor resources
val conf = new SparkConf()
.setMaster("yarn")
.set("spark.executor.instances", "10")
.set("spark.executor.cores", "2")
.set("spark.executor.memory", "4g")
.set("spark.executor.memoryOverhead", "512m")
// Dynamic allocation
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.minExecutors", "2")
.set("spark.dynamicAllocation.maxExecutors", "20")
.set("spark.dynamicAllocation.initialExecutors", "5")import org.apache.spark.scheduler.cluster.YarnScheduler
// YarnScheduler automatically provides rack awareness
val scheduler = new YarnScheduler(sparkContext)
// Get rack information for data locality optimization
val hosts = Seq("worker1:7337", "worker2:7337", "worker3:7337")
val racks = scheduler.getRacksForHosts(hosts)
// Returns rack information if available from YARN
// Configure locality preferences
val conf = new SparkConf()
.set("spark.locality.wait", "3s")
.set("spark.locality.wait.node", "0")
.set("spark.locality.wait.rack", "0")// The lifecycle is typically managed automatically, but understanding the flow:
// 1. YarnClusterManager.createSchedulerBackend() creates appropriate backend
// 2. Backend.start() initializes communication with YARN
// 3. Backend.bindToYarn() connects to YARN application
// 4. Backend handles executor requests via doRequestTotalExecutors()
// 5. Backend.stop() cleans up resources
// For custom integration:
class CustomYarnSchedulerBackend(scheduler: TaskSchedulerImpl, sc: SparkContext)
extends YarnSchedulerBackend(scheduler, sc) {
override def start(): Unit = {
super.start()
// Custom initialization logic
}
override def doRequestTotalExecutors(): Future[Boolean] = {
// Custom executor request logic
super.doRequestTotalExecutors()
}
}// Monitor scheduler backend status
val backend = sparkContext.schedulerBackend.asInstanceOf[YarnSchedulerBackend]
// Get YARN application information
val appId = backend.applicationId()
val attemptId = backend.applicationAttemptId()
println(s"Application ID: $appId")
println(s"Attempt ID: $attemptId")
// For cluster mode, get driver information
backend match {
case clusterBackend: YarnClusterSchedulerBackend =>
val driverLogs = clusterBackend.getDriverLogUrls()
val driverAttrs = clusterBackend.getDriverAttributes()
println(s"Driver logs: $driverLogs")
println(s"Driver attributes: $driverAttrs")
case _ => // Client mode
}import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
// Request additional executors
val backend = sparkContext.schedulerBackend.asInstanceOf[YarnSchedulerBackend]
val requestFuture: Future[Boolean] = backend.doRequestTotalExecutors()
requestFuture.onComplete {
case Success(true) => println("Executor request successful")
case Success(false) => println("Executor request failed")
case Failure(exception) => println(s"Request failed with exception: $exception")
}
// Kill specific executors
val executorsToKill = Seq("executor-1", "executor-2")
val killFuture: Future[Boolean] = backend.doKillExecutors(executorsToKill)Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-yarn-2-12