Apache Spark's integration with the Hadoop YARN cluster manager, for running Spark applications on YARN clusters.
The YARN Client provides the main entry point for submitting Spark applications to YARN clusters. It handles application packaging, resource allocation requests, and deployment preparation for both alpha and stable Hadoop versions.
```scala
import org.apache.spark.deploy.yarn._
import org.apache.spark.{Logging, SparkConf}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.client.api.YarnClient
```

Main entry point for YARN client operations that handles command-line submission of Spark applications to YARN clusters.
```scala
/**
 * Entry point for YARN client operations.
 * Handles command-line arguments and submits applications to YARN.
 */
object Client {
  def main(argStrings: Array[String]): Unit
}
```

Usage Example:
```scala
// Programmatic usage (internal to Spark)
Client.main(Array(
  "--jar", "myapp.jar",
  "--class", "com.example.MyApp",
  "--arg", "input.txt",
  "--arg", "output.txt",
  "--num-executors", "4",
  "--executor-memory", "2g"
))
```

The client is typically invoked through spark-submit rather than directly:
```bash
# Client automatically invoked by spark-submit
spark-submit --master yarn-client \
  --num-executors 4 \
  --executor-memory 2g \
  --class com.example.MyApp \
  myapp.jar input.txt output.txt
```

The actual client implementation varies by Hadoop version, with both the alpha and stable versions providing the same interface.
```scala
/**
 * YARN client implementation for Hadoop 2.2+.
 * Handles application submission through the stable YARN API.
 */
private[spark] class Client(
    val args: ClientArguments,
    val hadoopConf: Configuration,
    val sparkConf: SparkConf)
  extends ClientBase with Logging {

  def this(clientArgs: ClientArguments, spConf: SparkConf)
  def this(clientArgs: ClientArguments)

  val yarnClient: YarnClient
  val yarnConf: YarnConfiguration

  def stop(): Unit
}
```

```scala
/**
 * YARN client implementation for Hadoop 0.23 and 2.0.x.
 * Handles application submission through the alpha YARN API.
 */
private[spark] class Client(
    val args: ClientArguments,
    val hadoopConf: Configuration,
    val sparkConf: SparkConf)
  extends ClientBase with Logging {

  def this(clientArgs: ClientArguments, spConf: SparkConf)
  def this(clientArgs: ClientArguments)

  val yarnClient: YarnClient
  val yarnConf: YarnConfiguration

  def stop(): Unit
}
```

Base trait providing common functionality for YARN client implementations across different Hadoop versions.
```scala
/**
 * Base trait for YARN client implementations.
 * Provides common functionality for application submission.
 */
private[spark] trait ClientBase extends Logging {
  // Implementation details are internal to Spark;
  // the main functionality is accessed through the Client objects.
}

private[spark] object ClientBase extends Logging {
  // Constants and utility methods for client operations
}
```

Argument parsing and configuration management for YARN client operations.
```scala
/**
 * Parses and manages command-line arguments for the YARN client.
 */
private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) {
  var addJars: String
  var files: String
  var archives: String
  var userJar: String
  var userClass: String
  var userArgs: Array[String]
  var executorMemory: Int
  var executorCores: Int
  var numExecutors: Int
  var amQueue: String
  var amMemory: Int
  var amCores: Int
  var appName: String
}

object ClientArguments {
  // Utility methods for argument parsing
}
```

Key Arguments:
- `--jar` / `--user-jar`: User application JAR file
- `--class` / `--user-class`: Main class to execute
- `--arg`: Application arguments (can be specified multiple times)
- `--num-executors`: Number of executor instances
- `--executor-memory`: Memory per executor
- `--executor-cores`: CPU cores per executor
- `--queue`: YARN queue for resource allocation
- `--name`: Application name

Manages file distribution and caching for YARN applications, handling JARs, files, and archives.
```scala
/**
 * Manages the distributed cache for YARN applications.
 * Handles JAR files, additional files, and archives.
 */
private[spark] class ClientDistributedCacheManager() extends Logging {
  // Internal implementation for managing distributed files;
  // automatically handles file staging and cache management.
}
```

Most common usage is through spark-submit:
```bash
# Client mode - driver runs locally
spark-submit \
  --master yarn-client \
  --deploy-mode client \
  --num-executors 4 \
  --executor-memory 2g \
  --executor-cores 2 \
  --queue production \
  --class com.example.MySparkApp \
  myapp.jar arg1 arg2
```

```bash
# Cluster mode - driver runs in YARN
spark-submit \
  --master yarn-cluster \
  --deploy-mode cluster \
  --num-executors 8 \
  --executor-memory 4g \
  --executor-cores 2 \
  --queue production \
  --class com.example.MySparkApp \
  myapp.jar arg1 arg2
```

When integrating YARN submission into applications:
```scala
import org.apache.spark.{SparkConf, SparkContext}
// Note: ClientArguments is private[spark] and not usable from application code

// Configure for YARN client mode
val conf = new SparkConf()
  .setMaster("yarn-client")
  .setAppName("MySparkApplication")
  .set("spark.executor.instances", "4")
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")

val sc = new SparkContext(conf)
```

YARN-specific resource configuration:
```scala
val conf = new SparkConf()
  .set("spark.yarn.max.executor.failures", "6")   // Allow up to 6 executor failures
  .set("spark.yarn.am.memory", "1g")              // Application Master memory
  .set("spark.yarn.am.cores", "1")                // Application Master CPU cores
  .set("spark.yarn.queue", "production")          // YARN queue name
  .set("spark.yarn.submit.file.replication", "3") // HDFS replication for app files
```

The client handles failure scenarios such as repeated executor loss (bounded by `spark.yarn.max.executor.failures`), and it integrates with the YARN ResourceManager for container allocation and with HDFS for staging application files.
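The executor and AM memory settings above translate into YARN container requests, which cover the JVM heap plus an off-heap overhead. A minimal sketch of that sizing, assuming the Spark 1.x defaults of a 384 MB minimum overhead and a 7% overhead factor (both constants are assumptions, not taken from this page):

```scala
// Rough sketch of Spark 1.x YARN container sizing (constants are assumptions):
// the container request is the executor heap plus max(384 MB, 7% of the heap).
def containerMemoryMB(executorMemoryMB: Int): Int = {
  val overheadMB = math.max(384, (executorMemoryMB * 0.07).toInt)
  executorMemoryMB + overheadMB
}

// A 2g executor (2048 MB) would request 2048 + 384 = 2432 MB from YARN.
```

This is why a container can be rejected even when the requested `--executor-memory` fits the queue limit: YARN checks the heap-plus-overhead total, not the heap alone.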
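To tie the pieces together, the key arguments listed earlier are what `Client.main` ultimately parses into a `ClientArguments` instance. A hypothetical helper (`ArgBuilder` is not part of Spark) sketching how typical application settings map onto that flat argument array:

```scala
// Hypothetical helper (not part of Spark) showing how typical settings
// map onto the flat argument array that Client.main expects.
object ArgBuilder {
  def build(jar: String, mainClass: String, appArgs: Seq[String],
            numExecutors: Int, executorMemory: String): Array[String] = {
    val base = Array(
      "--jar", jar,                             // user application JAR
      "--class", mainClass,                     // main class to execute
      "--num-executors", numExecutors.toString, // executor instance count
      "--executor-memory", executorMemory)      // memory per executor
    // "--arg" is repeated once for each application argument
    base ++ appArgs.flatMap(a => Seq("--arg", a))
  }
}
```

In practice spark-submit performs this translation for you; building the array by hand is only relevant when studying how the client parses its input.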