```
tessl install tessl/maven-org-apache-spark--spark-yarn_2-11@1.6.0
```

Apache Spark YARN integration module that enables Spark applications to run on YARN clusters, providing cluster manager functionality for distributed Spark computing workloads.
The YARN scheduler backends provide the core scheduling and executor management functionality for Spark applications running on YARN clusters. These components handle communication between the driver, ApplicationMaster, and executors, with different implementations for client and cluster deployment modes.
The abstract YarnSchedulerBackend class provides common functionality shared between client and cluster modes.
```scala
package org.apache.spark.scheduler.cluster

private[spark] abstract class YarnSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext
) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)

protected var totalExpectedExecutors: Int = 0
protected val minRegisteredRatio: Double = 0.8 // default minimum registration ratio
```

```scala
def doRequestTotalExecutors(requestedTotal: Int): Boolean
```

Requests executors from the ApplicationMaster by specifying the total number desired, including executors already pending or running.
Parameters:

- `requestedTotal`: Total number of executors desired (including pending/running)

Returns: `true` if the request was submitted successfully
```scala
def doKillExecutors(executorIds: Seq[String]): Boolean
```

Requests that the ApplicationMaster kill the specified executors.
Parameters:

- `executorIds`: Sequence of executor IDs to terminate

Returns: `true` if the kill request was submitted successfully
Example:

```scala
import org.apache.spark.scheduler.cluster.YarnSchedulerBackend

// Assume `backend` is an initialized YarnSchedulerBackend instance.

// Request 10 total executors
val success = backend.doRequestTotalExecutors(10)
if (success) {
  println("Executor request submitted successfully")
}

// Kill specific executors
val executorIds = Seq("executor-1", "executor-2")
val killed = backend.doKillExecutors(executorIds)
```

```scala
def sufficientResourcesRegistered(): Boolean
```

Determines whether sufficient executors have been registered to start scheduling tasks.
Returns: `true` if registered executors meet the minimum threshold
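The check compares the number of registered executors against the expected total scaled by `minRegisteredRatio`. A minimal sketch of that logic, where `registeredExecutorCount` is a hypothetical stand-in for the count the backend tracks internally:

```scala
// Sketch only: the real backend reads these values from its own state.
def sufficientResourcesRegistered(
    registeredExecutorCount: Int,
    totalExpectedExecutors: Int,
    minRegisteredRatio: Double): Boolean =
  registeredExecutorCount >= totalExpectedExecutors * minRegisteredRatio
```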
```scala
def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint
```

Creates a YARN-specific driver endpoint that handles executor disconnections and communicates with the ApplicationMaster.
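For illustration, a subclass could hook this factory method to observe disconnections. The sketch below assumes the `DriverEndpoint` inner class and `onDisconnected` callback inherited from `CoarseGrainedSchedulerBackend`; it is a hypothetical example, not the module's actual `YarnDriverEndpoint`:

```scala
package org.apache.spark.scheduler.cluster

import org.apache.spark.SparkContext
import org.apache.spark.rpc.RpcAddress
import org.apache.spark.scheduler.TaskSchedulerImpl

// Hypothetical subclass, for illustration only: log endpoint
// disconnections before delegating to the default handling.
private[spark] class InstrumentedYarnBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext
) extends YarnSchedulerBackend(scheduler, sc) {

  override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint =
    new DriverEndpoint(rpcEnv, properties) {
      override def onDisconnected(remoteAddress: RpcAddress): Unit = {
        logInfo(s"Remote endpoint disconnected: $remoteAddress")
        super.onDisconnected(remoteAddress)
      }
    }
}
```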
The YarnClientSchedulerBackend is used for client deployment mode where the driver runs outside the YARN cluster.
```scala
package org.apache.spark.scheduler.cluster

private[spark] class YarnClientSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext
) extends YarnSchedulerBackend(scheduler, sc) with Logging

private var client: Client = null
private var appId: ApplicationId = null
private var monitorThread: MonitorThread = null
```

```scala
def start(): Unit
```

Creates a YARN client to submit an application to the ResourceManager and waits until the application is running. This method submits the application, blocks until it reaches the RUNNING state, and then starts a thread that monitors the application for unexpected termination.
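The submit-and-wait flow can be sketched roughly as follows. `Client` and `ClientArguments` are the module's own classes, but `argStrings` and the exact wiring shown here are illustrative assumptions, not the verbatim implementation:

```scala
// Illustrative only: argStrings would carry the driver host/port and other
// launch arguments assembled from the SparkConf.
val args = new ClientArguments(argStrings, conf)
client = new Client(args, conf)
appId = client.submitApplication()
// Poll the ResourceManager until the application reports RUNNING (or fails).
waitForApplication()
```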
Example:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend

val conf = new SparkConf()
  .setAppName("MyYarnApp")
  .set("spark.driver.host", "driver-host.example.com")
  .set("spark.driver.port", "7077")

val sc = new SparkContext(conf)
val taskScheduler = new TaskSchedulerImpl(sc)
val backend = new YarnClientSchedulerBackend(taskScheduler, sc)

backend.start()
// Application is now running on YARN
```

```scala
def stop(): Unit
```

Stops the scheduler backend and cleans up resources. This method stops the monitoring thread, shuts down the scheduler backend, and stops the underlying YARN client.
```scala
def applicationId(): String
```

Returns the YARN application ID as a string.
Example:

```scala
val appId = backend.applicationId()
println(s"Application ID: $appId")
```

```scala
private class MonitorThread extends Thread
```

Monitors the application state and stops the SparkContext if the YARN application exits unexpectedly.
```scala
def stopMonitor(): Unit
```

Safely stops the monitoring thread.
Example:

```scala
import org.apache.spark.SparkException

// The MonitorThread is managed internally by YarnClientSchedulerBackend.
// It automatically monitors application state and handles cleanup.
try {
  backend.start()
  // Application runs...
} catch {
  case e: SparkException =>
    println(s"Application failed: ${e.getMessage}")
} finally {
  backend.stop() // Also stops the monitor thread
}
```

The YarnClusterSchedulerBackend is used for cluster deployment mode where the driver runs within the YARN cluster.
```scala
package org.apache.spark.scheduler.cluster

private[spark] class YarnClusterSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext
) extends YarnSchedulerBackend(scheduler, sc)
```

```scala
def start(): Unit
```

Initializes the cluster mode scheduler backend and sets the initial target executor count.
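How that initial target is derived can be sketched as follows. The configuration keys are standard Spark settings, but the exact resolution order shown is an assumption for illustration:

```scala
import org.apache.spark.SparkConf

// Illustrative sketch: with dynamic allocation enabled the initial target
// comes from the dynamic allocation settings, otherwise from the static
// executor instance count.
def initialTargetExecutors(conf: SparkConf): Int =
  if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
    conf.getInt("spark.dynamicAllocation.initialExecutors",
      conf.getInt("spark.dynamicAllocation.minExecutors", 0))
  } else {
    conf.getInt("spark.executor.instances", 2)
  }
```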
```scala
def applicationId(): String
```

Returns the application ID from the Spark configuration. Logs an error if it is not set, since it is expected to be available in cluster mode.
```scala
def applicationAttemptId(): Option[String]
```

Returns the application attempt ID from the Spark configuration.
Example:

```scala
import org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend

val backend = new YarnClusterSchedulerBackend(scheduler, sc)
backend.start()

val appId = backend.applicationId()
val attemptId = backend.applicationAttemptId()
println(s"Application: $appId, Attempt: ${attemptId.getOrElse("unknown")}")
```

```scala
def getDriverLogUrls: Option[Map[String, String]]
```

Retrieves the URLs for the driver logs in the YARN NodeManager web interface.
Returns: a `Map` containing log URLs for `"stdout"` and `"stderr"`, or `None` if unavailable
Example:

```scala
backend.getDriverLogUrls match {
  case Some(logUrls) =>
    logUrls.foreach { case (logType, url) =>
      println(s"$logType logs: $url")
    }
  case None =>
    println("Driver log URLs not available")
}
```

The YarnScheduler class extends TaskSchedulerImpl with YARN-specific rack awareness functionality.
```scala
package org.apache.spark.scheduler.cluster

private[spark] class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc)
```

```scala
def getRackForHost(hostPort: String): Option[String]
```

Resolves the rack location for a given host, enabling rack-aware task scheduling.
Parameters:

- `hostPort`: Host and port string (e.g., "host.example.com:8080")

Returns: The rack location as an `Option[String]`, or `None` if unknown
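Resolution delegates to Hadoop's topology machinery. A minimal sketch using `org.apache.hadoop.yarn.util.RackResolver`; the host/port parsing shown is a simplified stand-in for Spark's own helper:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.util.RackResolver

// Sketch: resolve a host's network location (its rack) via RackResolver.
def rackForHost(hadoopConf: Configuration, hostPort: String): Option[String] = {
  val host = hostPort.split(":")(0) // stand-in for Spark's host/port parsing
  Option(RackResolver.resolve(hadoopConf, host).getNetworkLocation)
}
```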
Example:

```scala
import org.apache.spark.scheduler.cluster.YarnScheduler

val scheduler = new YarnScheduler(sc)
val rack = scheduler.getRackForHost("worker1.example.com:8080")
rack match {
  case Some(rackId) => println(s"Host is in rack: $rackId")
  case None         => println("Rack information unavailable")
}
```

The YarnClusterScheduler extends YarnScheduler with ApplicationMaster integration for cluster mode.
```scala
package org.apache.spark.scheduler.cluster

private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc)
```

```scala
def postStartHook(): Unit
```

Called after scheduler initialization to notify the ApplicationMaster that the SparkContext has been initialized.
```scala
def stop(): Unit
```

Stops the scheduler and notifies the ApplicationMaster that the SparkContext has been stopped.
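A sketch of how the two hooks could plug into the ApplicationMaster companion object; the notification method names are assumptions for illustration, not the verbatim implementation:

```scala
package org.apache.spark.scheduler.cluster

import org.apache.spark.SparkContext
import org.apache.spark.deploy.yarn.ApplicationMaster

// Illustrative sketch of the hook wiring.
private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {

  override def postStartHook(): Unit = {
    ApplicationMaster.sparkContextInitialized(sc) // unblock the AM's wait for the context
    super.postStartHook()
  }

  override def stop(): Unit = {
    super.stop()
    ApplicationMaster.sparkContextStopped(sc) // let the AM begin clean shutdown
  }
}
```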
Example:

```scala
import org.apache.spark.scheduler.cluster.YarnClusterScheduler

val scheduler = new YarnClusterScheduler(sc)
// postStartHook() is called automatically during SparkContext initialization
// stop() is called automatically during SparkContext shutdown
```

A complete client mode example:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.cluster.{YarnClientSchedulerBackend, YarnScheduler}

// Configure for client mode
val conf = new SparkConf()
  .setAppName("ClientModeApp")
  .setMaster("yarn-client")
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")

val sc = new SparkContext(conf)
val scheduler = new YarnScheduler(sc)
val backend = new YarnClientSchedulerBackend(scheduler, sc)

try {
  scheduler.initialize(backend)
  backend.start()

  // Application logic
  val rdd = sc.parallelize(1 to 1000)
  println(s"Sum: ${rdd.sum()}")
} finally {
  backend.stop()
  sc.stop()
}
```

A complete cluster mode example:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.cluster.{YarnClusterSchedulerBackend, YarnClusterScheduler}

// Configure for cluster mode
val conf = new SparkConf()
  .setAppName("ClusterModeApp")
  .setMaster("yarn-cluster")
  .set("spark.executor.memory", "4g")
  .set("spark.executor.cores", "4")

val sc = new SparkContext(conf)
val scheduler = new YarnClusterScheduler(sc)
val backend = new YarnClusterSchedulerBackend(scheduler, sc)

try {
  scheduler.initialize(backend)
  backend.start()

  // Application logic runs in the cluster
  val rdd = sc.textFile("hdfs://namenode:9000/input/data.txt")
  val wordCounts = rdd.flatMap(_.split(" "))
    .map(word => (word, 1))
    .reduceByKey(_ + _)
  wordCounts.saveAsTextFile("hdfs://namenode:9000/output/results")
} finally {
  backend.stop()
  sc.stop()
}
```

Dynamic executor allocation:

```scala
import org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend
// Request more executors dynamically; assume an already-initialized backend
val backend: YarnClientSchedulerBackend = ??? // placeholder: obtain the initialized backend

// Scale up to 20 executors
if (backend.doRequestTotalExecutors(20)) {
  println("Successfully requested 20 executors")
} else {
  println("Failed to request executors")
}

// Scale down by killing specific executors
val executorsToKill = Seq("executor-15", "executor-16", "executor-17")
if (backend.doKillExecutors(executorsToKill)) {
  println(s"Successfully requested to kill executors: ${executorsToKill.mkString(", ")}")
}

// Check if sufficient resources are available
if (backend.sufficientResourcesRegistered()) {
  println("Sufficient executors registered, can start scheduling tasks")
} else {
  println("Waiting for more executors to register...")
}
```

Error handling:

```scala
import org.apache.spark.SparkException
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
import org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend

// Declare the backend outside the try block so the finally clause can see it.
var backend: YarnClientSchedulerBackend = null
try {
  backend = new YarnClientSchedulerBackend(scheduler, sc)
  backend.start()

  // Monitor application
  val appId = backend.applicationId()
  println(s"Monitoring application: $appId")

  // Run application logic
  // ...
} catch {
  case e: SparkException =>
    println(s"Spark application failed: ${e.getMessage}")
  case e: ApplicationNotFoundException =>
    println(s"YARN application not found: ${e.getMessage}")
  case e: Exception =>
    println(s"Unexpected error: ${e.getMessage}")
    throw e
} finally {
  if (backend != null) {
    backend.stop()
  }
}
```

These scheduler backends provide the essential coordination layer between Spark and YARN, handling executor lifecycle management, resource allocation, and communication in both client and cluster deployment modes.