Apache Spark's integration with the Hadoop YARN cluster manager, for running Spark applications on YARN clusters.
```bash
npx @tessl/cli install tessl/maven-org-apache-spark--yarn-parent-2-11@1.2.0
```

Apache Spark's YARN integration module enables Spark applications to run on Hadoop YARN (Yet Another Resource Negotiator) clusters alongside other distributed computing frameworks. This module provides an Application Master implementation that manages Spark driver and executor processes within YARN containers, handles resource allocation and deallocation through YARN's ResourceManager, and supports both client and cluster deployment modes.
```scala
import org.apache.spark.deploy.yarn._
import org.apache.spark.scheduler.cluster._
```

For basic usage:

```scala
import org.apache.spark.{SparkConf, SparkContext}
```

The YARN integration is typically used by setting the Spark master URL and submitting applications through the YARN client:
```scala
import org.apache.spark.{SparkConf, SparkContext}

// Set master to YARN mode
val sparkConf = new SparkConf()
  .setMaster("yarn-client") // or "yarn-cluster"
  .setAppName("MySparkApp")

val sparkContext = new SparkContext(sparkConf)
```

For command-line submission:
```bash
# Client mode - driver runs on the client machine
spark-submit --master yarn-client --num-executors 4 myapp.jar

# Cluster mode - driver runs in a YARN container
spark-submit --master yarn-cluster --num-executors 4 myapp.jar
```

The Apache Spark YARN integration is built around several key components:
Core functionality for submitting Spark applications to YARN clusters, handling resource allocation requests, and managing the application lifecycle.
```scala
// Main entry points for YARN submission
object Client {
  def main(argStrings: Array[String]): Unit
}

object ApplicationMaster {
  def main(args: Array[String]): Unit
}
```
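The Client entry point can also be driven programmatically. A minimal sketch, with the caveat that spark-submit normally assembles this argument list; the flag names used here (`--jar`, `--class`, `--num-executors`) are assumptions based on the `ClientArguments` fields listed under the types below:

```scala
import org.apache.spark.deploy.yarn.Client

object DirectSubmit {
  def main(args: Array[String]): Unit = {
    // Hand-built argument list; flag names are assumed from ClientArguments.
    Client.main(Array(
      "--jar", "myapp.jar",           // application jar to ship to YARN
      "--class", "com.example.MyApp", // user main class run by the driver
      "--num-executors", "4"          // initial executor count
    ))
  }
}
```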
Application Master implementation that manages Spark applications within YARN containers, including driver execution and executor coordination.
```scala
private[spark] class ApplicationMaster(
    args: ApplicationMasterArguments,
    client: YarnRMClient
) extends Logging {
  def run(): Int
}

object ApplicationMaster {
  private[spark] def sparkContextInitialized(sc: SparkContext): Unit
  private[spark] def sparkContextStopped(sc: SparkContext): Unit
}
```
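In cluster mode, the ApplicationMaster runs the user class inside a YARN container and uses these hooks to track the driver: creating a SparkContext reports readiness through `sparkContextInitialized`, and stopping it fires `sparkContextStopped`. A minimal user application illustrating that lifecycle:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ClusterModeApp {
  def main(args: Array[String]): Unit = {
    // Creating the SparkContext notifies the ApplicationMaster
    // (via ApplicationMaster.sparkContextInitialized) that the driver is up.
    val sc = new SparkContext(new SparkConf().setAppName("ClusterModeApp"))
    try {
      val sum = sc.parallelize(1 to 100).reduce(_ + _) // trivial job
      println(s"sum = $sum")
    } finally {
      // Stopping it triggers ApplicationMaster.sparkContextStopped so the AM
      // can unregister from the ResourceManager and shut down cleanly.
      sc.stop()
    }
  }
}
```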
Scheduler implementations that integrate Spark's task scheduling with YARN's resource management, supporting both client and cluster deployment modes.
```scala
private[spark] abstract class YarnSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext
) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)

private[spark] class YarnClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
  override def getRackForHost(hostPort: String): Option[String]
  override def postStartHook(): Unit
  override def stop(): Unit
}

private[spark] class YarnClientClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc)

private[spark] class YarnClientSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext
) extends YarnSchedulerBackend(scheduler, sc) {
  override def start(): Unit
  override def stop(): Unit
  override def applicationId(): String
}

private[spark] class YarnClusterSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext
) extends YarnSchedulerBackend(scheduler, sc) {
  override def start(): Unit
  override def applicationId(): String
}
```
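To see how these pieces connect, here is a simplified sketch of the scheduler selection SparkContext performs for YARN master URLs. Since these classes are `private[spark]`, this mirrors internal wiring rather than code a user application would compile, and it is not the exact source:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster._

// Simplified sketch of the YARN branch of SparkContext.createTaskScheduler:
// each master URL pairs a TaskSchedulerImpl subclass with its backend.
object YarnSchedulerWiring {
  def create(sc: SparkContext, master: String): TaskSchedulerImpl = master match {
    case "yarn-client" =>
      // Driver stays in the client JVM; the backend coordinates with a remote AM.
      val scheduler = new YarnClientClusterScheduler(sc)
      scheduler.initialize(new YarnClientSchedulerBackend(scheduler, sc))
      scheduler
    case "yarn-cluster" =>
      // Driver runs inside the AM container alongside the scheduler.
      val scheduler = new YarnClusterScheduler(sc)
      scheduler.initialize(new YarnClusterSchedulerBackend(scheduler, sc))
      scheduler
  }
}
```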
YARN-specific utilities for Hadoop integration, security, and environment management that extend Spark's base Hadoop utilities.
```scala
class YarnSparkHadoopUtil extends SparkHadoopUtil {
  override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation): Unit
  override def isYarnMode(): Boolean
  override def newConfiguration(conf: SparkConf): Configuration
  override def addCredentials(conf: JobConf): Unit
  override def getCurrentUserCredentials(): Credentials
  override def addCurrentUserCredentials(creds: Credentials): Unit
  override def addSecretKeyToUserCredentials(key: String, secret: String): Unit
  override def getSecretKeyFromUserCredentials(key: String): Array[Byte]
}

object YarnSparkHadoopUtil {
  def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit
  def setEnvFromInputString(env: HashMap[String, String], inputString: String): Unit
  def escapeForShell(arg: String): String
  def lookupRack(conf: Configuration, host: String): String
  def populateRackInfo(conf: Configuration, hostname: String): Unit
  def getApplicationAclsForYarn(securityMgr: SecurityManager): Map[ApplicationAccessType, String]
}
```
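A brief sketch of the environment and shell helpers in practice; the paths are illustrative, and the KEY=VALUE input format accepted by `setEnvFromInputString` is an assumption:

```scala
import scala.collection.mutable.HashMap
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

val env = new HashMap[String, String]()

// Append an entry to CLASSPATH using the cluster's path separator.
YarnSparkHadoopUtil.addPathToEnvironment(env, "CLASSPATH", "/opt/spark/conf")

// Populate env from a KEY=VALUE string (format assumed for illustration).
YarnSparkHadoopUtil.setEnvFromInputString(env, "SPARK_LOG_DIR=/var/log/spark")

// Quote an argument so it survives the container launch shell.
val quoted = YarnSparkHadoopUtil.escapeForShell("""--conf "spark.app.name=My App"""")

// Derive the YARN view/modify ACLs from Spark's SecurityManager.
val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(new SecurityManager(new SparkConf()))
```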
```scala
// Core YARN types
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ApplicationAttemptId, ApplicationId, LocalResource}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.Utils

// Collection types for utilities
import scala.collection.Map
import scala.collection.mutable.HashMap

// YARN-specific classes
private[spark] class ApplicationMasterArguments(args: Array[String]) {
  var userClass: String = null
  var userJar: String = null
  var userArgs: Array[String] = Array.empty
  var numExecutors: Int = 2
  var executorMemory: Int = 1024
  var executorCores: Int = 1
  var amMemory: Int = 512
  var amCores: Int = 1
}

private[spark] trait YarnRMClient {
  def getAttemptId(): ApplicationAttemptId
  def getMaxRegAttempts(conf: YarnConfiguration, sparkConf: SparkConf): Int
  def register(
      driverUrl: String,
      driverRef: RpcEndpointRef,
      conf: YarnConfiguration,
      sparkConf: SparkConf,
      uiAddress: String,
      uiHistoryAddress: String,
      securityMgr: SecurityManager,
      localResources: Map[String, LocalResource]): YarnAllocator
  def shutdown(): Unit
}

private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) {
  var addJars: String = null
  var files: String = null
  var archives: String = null
  var userJar: String = null
  var userClass: String = null
  var userArgs: Array[String] = Array.empty
  var executorMemory: Int = 1024
  var executorCores: Int = 1
  var numExecutors: Int = 2
  var amQueue: String = "default"
  var amMemory: Int = 512
  var amCores: Int = 1
  var appName: String = "Spark"
  val amMemoryOverhead: Int = 384
  val executorMemoryOverhead: Int = 384
}
```

Key configuration properties for YARN integration:
- `spark.yarn.max.executor.failures` - Maximum number of executor failures before failing the application
- `spark.yarn.max.worker.failures` - (Deprecated) Same as above, for backward compatibility
- `spark.yarn.app.id` - Application ID set by the Application Master
- `spark.ui.port` - Set to "0" for ephemeral port allocation in YARN mode

Supported deployment modes:

- Client mode (`yarn-client`) - the driver runs on the client machine while executors run in YARN containers
- Cluster mode (`yarn-cluster`) - the driver runs inside a YARN container managed by the Application Master
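For example, a sketch of setting `spark.yarn.max.executor.failures` from the configuration list above when constructing the SparkConf (the threshold of 8 is illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("yarn-client")
  .setAppName("ResilientApp")
  // Fail the application after 8 executor failures (illustrative value).
  .set("spark.yarn.max.executor.failures", "8")

val sc = new SparkContext(conf)
```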
The YARN module supports multiple Hadoop versions through two separate implementations. Both provide the same API surface but use different versions of the underlying Hadoop YARN API.