Apache Spark YARN resource manager integration module that enables Spark applications to run on YARN clusters.

Comprehensive configuration system for YARN-specific settings, including resource allocation, security, deployment options, and application parameters. It provides both programmatic configuration through SparkConf and command-line argument parsing.

The YARN module exposes its extensive configuration options through the config package object.
/**
 * YARN-specific configuration keys and defaults.
 *
 * Each entry is a typed handle to a `spark.yarn.*` property read through
 * SparkConf. Initializer expressions (key strings, defaults, units) are
 * omitted from this listing; the key names noted below are taken from the
 * usage examples elsewhere in this document.
 */
package object config {
/** YARN application tags for resource management and monitoring (key: spark.yarn.tags) */
val APPLICATION_TAGS: ConfigEntry[Set[String]]
/** Application priority in YARN queue (key: spark.yarn.priority; 0-10, higher = more priority) */
val APPLICATION_PRIORITY: ConfigEntry[Int]
/** YARN queue name for application submission (key: spark.yarn.queue) */
val QUEUE_NAME: ConfigEntry[String]
/** Maximum application attempts allowed by YARN (key: spark.yarn.maxAppAttempts) */
val MAX_APP_ATTEMPTS: ConfigEntry[Int]
/** ApplicationMaster memory allocation (key: spark.yarn.am.memory) */
val AM_MEMORY: ConfigEntry[Long]
/** ApplicationMaster CPU core allocation (key: spark.yarn.am.cores) */
val AM_CORES: ConfigEntry[Int]
/** ApplicationMaster memory overhead for the YARN container (key: spark.yarn.am.memoryOverhead) */
val AM_MEMORY_OVERHEAD: ConfigEntry[Long]
/** Node label expression for executor placement (key: spark.yarn.executor.nodeLabelExpression) */
val EXECUTOR_NODE_LABEL_EXPRESSION: OptionalConfigEntry[String]
/** Node label expression for ApplicationMaster placement (key: spark.yarn.am.nodeLabelExpression) */
val AM_NODE_LABEL_EXPRESSION: OptionalConfigEntry[String]
/** Location of the Spark archive file for distribution (key not shown in this listing) */
val SPARK_ARCHIVE: OptionalConfigEntry[String]
/** Whether the client waits for application completion before exiting */
val WAIT_FOR_APP_COMPLETION: ConfigEntry[Boolean]
/** Kerberos keytab file location for security (key: spark.yarn.keytab) */
val KEYTAB: OptionalConfigEntry[String]
/** Kerberos principal name for authentication (key: spark.yarn.principal) */
val PRINCIPAL: OptionalConfigEntry[String]
/** Staging directory for application files */
val STAGING_DIR: OptionalConfigEntry[String]
/** Executor failure validity interval (key: spark.yarn.executor.failuresValidityInterval) */
val EXECUTOR_FAILURES_VALIDITY_INTERVAL: ConfigEntry[Long]
/** Maximum executor failures before the application fails (key: spark.yarn.max.executor.failures) */
val MAX_EXECUTOR_FAILURES: ConfigEntry[Int]
/** ApplicationMaster attempt failures validity interval */
val AM_ATTEMPT_FAILURES_VALIDITY_INTERVAL: ConfigEntry[Long]
/** Container launcher maximum threads (key: spark.yarn.containerLauncherMaxThreads) */
val CONTAINER_LAUNCHER_MAX_THREADS: ConfigEntry[Int]
/** Scheduler heartbeat interval (key: spark.yarn.scheduler.heartbeat.interval-ms) */
val SCHEDULER_HEARTBEAT_INTERVAL: ConfigEntry[Long]
/** Initial allocation interval (key: spark.yarn.scheduler.initial-allocation.interval) */
val SCHEDULER_INITIAL_ALLOCATION_INTERVAL: ConfigEntry[Long]
/** Whether to preserve staging files after application completion */
val PRESERVE_STAGING_FILES: ConfigEntry[Boolean]
/** Replication factor for staged files */
val STAGING_FILE_REPLICATION: ConfigEntry[Short]
/** Whether to roll ApplicationMaster logs */
val AM_LOG_ROLL_ENABLE: ConfigEntry[Boolean]
/** ApplicationMaster log roll size threshold */
val AM_LOG_ROLL_SIZE: ConfigEntry[Long]
/** ApplicationMaster log roll interval */
val AM_LOG_ROLL_INTERVAL: ConfigEntry[Long]
}import org.apache.spark.SparkConf
// Baseline YARN deployment: production queue, tagged app, sized AM and executors.
val basicYarnSettings = Seq(
  // Application settings
  "spark.yarn.queue" -> "production",
  "spark.yarn.tags" -> "spark,analytics,batch",
  "spark.yarn.priority" -> "5",
  // ApplicationMaster settings
  "spark.yarn.am.memory" -> "2g",
  "spark.yarn.am.cores" -> "2",
  "spark.yarn.am.memoryOverhead" -> "512m",
  // Executor settings
  "spark.executor.instances" -> "10",
  "spark.executor.memory" -> "4g",
  "spark.executor.cores" -> "2",
  "spark.executor.memoryOverhead" -> "1g"
)
// Fold the pairs onto the base conf; SparkConf.set returns `this`, so the
// result is the same configured instance the original builder chain produced.
val conf = basicYarnSettings.foldLeft(
  new SparkConf().setMaster("yarn").setAppName("BasicYarnConfiguration")
)((c, kv) => c.set(kv._1, kv._2))

import org.apache.spark.SparkConf
// Kerberized/secured YARN deployment: keytab login plus wire-level security.
val secureYarnSettings = Seq(
  // Kerberos authentication
  "spark.yarn.keytab" -> "/path/to/user.keytab",
  "spark.yarn.principal" -> "user@REALM.COM",
  // Security settings
  "spark.authenticate" -> "true",
  "spark.authenticate.secret" -> "shared-secret",
  "spark.network.crypto.enabled" -> "true",
  "spark.io.encryption.enabled" -> "true",
  // YARN security integration
  "spark.yarn.security.credentials.hive.enabled" -> "true",
  "spark.yarn.security.credentials.hbase.enabled" -> "true"
)
val conf = secureYarnSettings.foldLeft(
  new SparkConf().setMaster("yarn").setAppName("SecureYarnConfiguration")
)((c, kv) => c.set(kv._1, kv._2))

import org.apache.spark.SparkConf
// Advanced resource placement: node labels, GPUs, container tuning, failure policy.
val advancedYarnSettings = Seq(
  // Node labeling and placement
  "spark.yarn.executor.nodeLabelExpression" -> "compute-nodes",
  "spark.yarn.am.nodeLabelExpression" -> "management-nodes",
  // Custom resources (GPUs, FPGAs, etc.)
  "spark.executor.resource.gpu.amount" -> "1",
  "spark.executor.resource.gpu.discoveryScript" -> "/opt/spark/gpu-discovery.sh",
  // Container settings
  "spark.yarn.containerLauncherMaxThreads" -> "50",
  "spark.yarn.scheduler.heartbeat.interval-ms" -> "5000",
  "spark.yarn.scheduler.initial-allocation.interval" -> "100ms",
  // Failure handling
  "spark.yarn.maxAppAttempts" -> "3",
  "spark.yarn.max.executor.failures" -> "10",
  "spark.yarn.executor.failuresValidityInterval" -> "2h"
)
val conf = advancedYarnSettings.foldLeft(
  new SparkConf().setMaster("yarn").setAppName("AdvancedResourceConfiguration")
)((c, kv) => c.set(kv._1, kv._2))

Argument parser for YARN client applications with support for various application types.
/**
 * Argument holder for the YARN client.
 *
 * Captures values of spark-submit style command-line flags for application
 * submission (--jar, --class, --name, --files, --py-files, --archives,
 * --verbose; tokens after "--" become user arguments — see the usage
 * example below). NOTE(review): the parsing logic that consumes `args`
 * and populates these fields is not shown in this listing — confirm
 * against the full class.
 * @param args Command line arguments array
 */
class ClientArguments(args: Array[String]) {
/** User application JAR file path (--jar); null until set */
var userJar: String = null
/** Main class to execute (--class); null until set */
var userClass: String = null
/** Primary Python file for PySpark applications; null until set */
var primaryPyFile: String = null
/** Primary R file for SparkR applications; null until set */
var primaryRFile: String = null
/** User application arguments (everything after the "--" separator) */
var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
/** Enable verbose logging (--verbose) */
var verbose: Boolean = false
/** Application name (--name); defaults to "Spark" */
var name: String = "Spark"
/** Spark properties file; null until set */
var propertiesFile: String = null
/** Additional Python files, comma-separated (--py-files) */
var pyFiles: String = null
/** Additional files to distribute, comma-separated (--files) */
var files: String = null
/** Additional archives to distribute (--archives) */
var archives: String = null
}Usage Example:
import org.apache.spark.deploy.yarn.ClientArguments

// Assemble a spark-submit style argv; tokens after "--" are forwarded
// to the user application unchanged.
val argv = Array(
  "--jar", "/path/to/myapp.jar",
  "--class", "com.example.MyMainClass",
  "--name", "MySparkApplication",
  "--files", "config.properties,data.txt",
  "--py-files", "utils.py,helpers.py",
  "--archives", "data.zip#data",
  "--verbose",
  "--", "app-arg1", "app-arg2"
)
// Parse and inspect the result.
val clientArgs = new ClientArguments(argv)
println(s"JAR: ${clientArgs.userJar}")
println(s"Main class: ${clientArgs.userClass}")
println(s"App args: ${clientArgs.userArgs}")

Argument parser for ApplicationMaster with comprehensive application metadata.
/**
 * Argument holder for the ApplicationMaster.
 *
 * Captures ApplicationMaster startup flags (--jar, --class,
 * --properties-file, --dist-cache-conf, --executor-memory,
 * --executor-cores, --num-executors; tokens after "--" become user
 * arguments — see the usage example below). NOTE(review): the parsing
 * logic that consumes `args` is not shown in this listing — confirm
 * against the full class.
 * @param args Command line arguments array
 */
class ApplicationMasterArguments(args: Array[String]) {
/** User application JAR file path (--jar); null until set */
var userJar: String = null
/** Main class name to execute (--class); null until set */
var userClass: String = null
/** Primary Python file for PySpark; null until set */
var primaryPyFile: String = null
/** Primary R file for SparkR; null until set */
var primaryRFile: String = null
/** User application arguments (everything after the "--" separator) */
var userArgs: Seq[String] = Nil
/** Spark properties file path (--properties-file) */
var propertiesFile: String = null
/** Distributed cache configuration file (--dist-cache-conf) */
var distCacheConf: String = null
/** Additional Python files */
var pyFiles: String = null
/** Additional files to distribute */
var files: String = null
/** Additional archives to distribute */
var archives: String = null
/** Executor memory setting (--executor-memory); defaults to "1g" */
var executorMemory: String = "1g"
/** Executor cores setting (--executor-cores); defaults to 1 */
var executorCores: Int = 1
/** Number of executors (--num-executors); defaults to 2 */
var numExecutors: Int = 2
}Usage Example:
import org.apache.spark.deploy.yarn.ApplicationMasterArguments

// ApplicationMaster argv (normally constructed by YARN itself, not user code).
val amArgv = Array(
  "--jar", "hdfs://cluster/apps/myapp.jar",
  "--class", "com.example.MyMainClass",
  "--properties-file", "__spark_conf__/spark-defaults.conf",
  "--dist-cache-conf", "cache-config.txt",
  "--executor-memory", "4g",
  "--executor-cores", "2",
  "--num-executors", "10",
  "--", "user-arg1", "user-arg2"
)
// Parse and inspect the resulting executor sizing.
val amArguments = new ApplicationMasterArguments(amArgv)
println(s"Executor memory: ${amArguments.executorMemory}")
println(s"Executor cores: ${amArguments.executorCores}")
println(s"Number of executors: ${amArguments.numExecutors}")

import org.apache.spark.SparkConf
/**
 * Builds a YARN SparkConf for the given deployment environment.
 *
 * @param environment one of "development", "staging", "production"
 * @return a SparkConf with master "yarn", app name "MyApp-<environment>",
 *         and per-environment queue/sizing overrides applied
 * @throws IllegalArgumentException for any other environment name
 */
def createYarnConfig(environment: String): SparkConf = {
  val base = new SparkConf()
    .setMaster("yarn")
    .setAppName(s"MyApp-$environment")
  // Per-environment overrides layered on top of the shared base.
  val overrides: Seq[(String, String)] = environment match {
    case "development" =>
      Seq(
        "spark.yarn.queue" -> "dev",
        "spark.executor.instances" -> "2",
        "spark.executor.memory" -> "2g",
        "spark.yarn.am.memory" -> "1g")
    case "staging" =>
      Seq(
        "spark.yarn.queue" -> "staging",
        "spark.executor.instances" -> "5",
        "spark.executor.memory" -> "4g",
        "spark.yarn.am.memory" -> "2g",
        "spark.yarn.max.executor.failures" -> "3")
    case "production" =>
      Seq(
        "spark.yarn.queue" -> "production",
        "spark.executor.instances" -> "20",
        "spark.executor.memory" -> "8g",
        "spark.yarn.am.memory" -> "4g",
        "spark.yarn.max.executor.failures" -> "10",
        "spark.yarn.keytab" -> "/etc/security/keytabs/spark.keytab",
        "spark.yarn.principal" -> "spark-user@PRODUCTION.COM")
    case _ =>
      throw new IllegalArgumentException(s"Unknown environment: $environment")
  }
  // SparkConf.set returns `this`, so folding yields the same mutated conf.
  overrides.foldLeft(base)((c, kv) => c.set(kv._1, kv._2))
}

import org.apache.spark.SparkConf
/**
 * Sanity-checks a SparkConf intended for a YARN deployment.
 *
 * Verifies the master setting, AM/executor memory minimums, core ranges,
 * and (when authentication is enabled) the presence of keytab/principal.
 *
 * @param conf the configuration to validate
 * @throws IllegalArgumentException (via require) on the first violated rule
 */
def validateYarnConfiguration(conf: SparkConf): Unit = {
  // Validate required settings
  require(conf.get("spark.master", "").startsWith("yarn"),
    "Master must be 'yarn' for YARN deployment")
  // Validate memory settings. parseMemory returns BYTES, so the thresholds
  // must also be expressed in bytes. (Bug fix: the original compared the
  // byte count against raw MB numbers 512/1024, so e.g. am.memory = "1k"
  // passed the "at least 512MB" check.)
  val amMemory = conf.get("spark.yarn.am.memory", "512m")
  val executorMemory = conf.get("spark.executor.memory", "1g")
  require(parseMemory(amMemory) >= 512L * 1024 * 1024,
    "ApplicationMaster memory must be at least 512MB")
  require(parseMemory(executorMemory) >= 1024L * 1024 * 1024,
    "Executor memory must be at least 1GB")
  // Validate core settings
  val amCores = conf.getInt("spark.yarn.am.cores", 1)
  val executorCores = conf.getInt("spark.executor.cores", 1)
  require(amCores >= 1 && amCores <= 8,
    "ApplicationMaster cores must be between 1 and 8")
  require(executorCores >= 1 && executorCores <= 32,
    "Executor cores must be between 1 and 32")
  // Validate security settings if enabled.
  // NOTE(review): spark.authenticate enables shared-secret RPC auth, not
  // Kerberos; requiring keytab/principal here may be stricter than intended
  // — confirm the desired policy.
  if (conf.getBoolean("spark.authenticate", false)) {
    require(conf.contains("spark.yarn.keytab") && conf.contains("spark.yarn.principal"),
      "Kerberos authentication requires both keytab and principal")
  }
}
/**
 * Parses a memory size string such as "512m", "2g", "1024" into bytes.
 *
 * Accepted forms (case-insensitive, surrounding whitespace ignored): a run
 * of digits followed by an optional unit t/g/m/k and an optional trailing
 * "b" (e.g. "4g", "4GB", "512m", "1024"). A bare number is interpreted as
 * bytes, matching the original behavior.
 *
 * @param memoryStr the memory size string to parse
 * @return the size in bytes
 * @throws IllegalArgumentException if the string does not match the format
 */
def parseMemory(memoryStr: String): Long = {
  // Regex extractor performs an entire-string match: group 1 = digits,
  // group 2 = optional unit letter. The optional "b" suffix, "t" unit, and
  // whitespace trim generalize the original (which rejected "4gb", " 4g ").
  val pattern = """(\d+)\s*([tgmk]?)b?""".r
  memoryStr.trim.toLowerCase match {
    case pattern(amount, unit) =>
      val multiplier = unit match {
        case "t" => 1024L * 1024 * 1024 * 1024
        case "g" => 1024L * 1024 * 1024
        case "m" => 1024L * 1024
        case "k" => 1024L
        case "" => 1L
      }
      amount.toLong * multiplier
    case _ => throw new IllegalArgumentException(s"Invalid memory format: $memoryStr")
  }
}
import org.apache.spark.SparkContext
// Adjusts dynamic-allocation bounds and scales the executor count to fit.
// NOTE(review): SparkConf values set after the SparkContext has started are
// generally not re-read by Spark — confirm these .set calls have any effect.
// NOTE(review): `sc.executorIds`, `requestTotalExecutors`, and `killExecutors`
// look like developer/internal APIs — verify they exist on this Spark version.
def updateDynamicAllocation(sc: SparkContext,
minExecutors: Int,
maxExecutors: Int): Unit = {
// Record the new dynamic allocation bounds on the conf.
sc.conf.set("spark.dynamicAllocation.minExecutors", minExecutors.toString)
sc.conf.set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString)
// Clamp the current executor count into [minExecutors, maxExecutors].
val targetExecutors = Math.max(minExecutors,
Math.min(maxExecutors, sc.executorIds.size))
if (targetExecutors > sc.executorIds.size) {
// Scale up: ask the cluster manager for the new total.
sc.requestTotalExecutors(targetExecutors, 0, Map.empty)
} else if (targetExecutors < sc.executorIds.size) {
// Scale down: pick the first N surplus executors and kill them.
val executorsToRemove = sc.executorIds.take(sc.executorIds.size - targetExecutors)
sc.killExecutors(executorsToRemove.toSeq)
}
}import org.apache.spark.SparkConf
/** Ready-made SparkConf templates for common YARN workload profiles. */
object YarnConfigTemplates {
  /** Shared scaffold: YARN master, the app name, and the target queue. */
  private def yarnBase(appName: String, queue: String): SparkConf =
    new SparkConf()
      .setMaster("yarn")
      .setAppName(appName)
      .set("spark.yarn.queue", queue)

  /** Template for batch processing workloads */
  def batchProcessingConfig(appName: String): SparkConf =
    yarnBase(appName, "batch")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.sql.adaptive.enabled", "true")
      .set("spark.sql.adaptive.coalescePartitions.enabled", "true")
      .set("spark.executor.instances", "10")
      .set("spark.executor.memory", "6g")
      .set("spark.executor.cores", "3")
      .set("spark.executor.memoryOverhead", "1g")
      .set("spark.yarn.am.memory", "2g")
      .set("spark.yarn.am.cores", "2")

  /** Template for streaming workloads */
  def streamingConfig(appName: String): SparkConf =
    yarnBase(appName, "streaming")
      .set("spark.streaming.backpressure.enabled", "true")
      .set("spark.streaming.kafka.maxRatePerPartition", "1000")
      .set("spark.streaming.dynamicAllocation.enabled", "true")
      .set("spark.executor.instances", "5")
      .set("spark.executor.memory", "4g")
      .set("spark.executor.cores", "2")
      .set("spark.yarn.am.memory", "1g")

  /** Template for machine learning workloads */
  def mlConfig(appName: String): SparkConf =
    yarnBase(appName, "ml")
      .set("spark.executor.instances", "20")
      .set("spark.executor.memory", "8g")
      .set("spark.executor.cores", "4")
      .set("spark.executor.memoryOverhead", "2g")
      .set("spark.executor.resource.gpu.amount", "1")
      .set("spark.task.resource.gpu.amount", "0.25")
      .set("spark.yarn.am.memory", "4g")
      .set("spark.yarn.am.cores", "2")
}

import org.apache.spark.SparkConf
/**
 * Derives a YARN SparkConf sized for the cluster described by the inputs.
 *
 * @param totalDataSize  total input size in bytes; drives partition sizing
 * @param availableNodes number of worker nodes (must be >= 1)
 * @param coresPerNode   CPU cores available per node (must be >= 1)
 * @param memoryPerNode  memory per node — presumably in MB, since the result
 *                       is rendered as "${...}m"; TODO confirm with callers
 * @throws IllegalArgumentException (via require) on non-positive inputs
 */
def calculateOptimalResources(
  totalDataSize: Long,
  availableNodes: Int,
  coresPerNode: Int,
  memoryPerNode: Long
): SparkConf = {
  // Guard against zero/negative inputs that previously caused a division by
  // zero (totalExecutors * executorCores == 0 in the partition-size formula)
  // or nonsensical zero-sized allocations.
  require(availableNodes >= 1, "availableNodes must be at least 1")
  require(coresPerNode >= 1, "coresPerNode must be at least 1")
  require(memoryPerNode >= 1, "memoryPerNode must be at least 1")
  require(totalDataSize >= 0, "totalDataSize must not be negative")
  // Calculate optimal executor count and sizing
  val optimalExecutorsPerNode = Math.max(1, coresPerNode / 4) // target ~4 cores per executor
  val totalExecutors = availableNodes * optimalExecutorsPerNode
  val executorCores = Math.min(5, coresPerNode / optimalExecutorsPerNode) // cap at 5 cores
  val executorMemory = (memoryPerNode * 0.8 / optimalExecutorsPerNode).toLong // 80% of node memory
  val memoryOverhead = Math.max(384, (executorMemory * 0.1).toLong) // 10% overhead, min 384MB
  new SparkConf()
    .setMaster("yarn")
    .set("spark.executor.instances", totalExecutors.toString)
    .set("spark.executor.cores", executorCores.toString)
    .set("spark.executor.memory", s"${executorMemory}m")
    .set("spark.executor.memoryOverhead", s"${memoryOverhead}m")
    .set("spark.yarn.am.memory", "2g")
    .set("spark.yarn.am.cores", "2")
    // Additional optimizations based on data size
    .set("spark.sql.files.maxPartitionBytes",
      Math.min(134217728, totalDataSize / (totalExecutors * executorCores)).toString) // 128MB max
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-yarn-2-12