YARN-specific configuration options for controlling resource allocation, security, and deployment behavior. This module provides comprehensive configuration management for all aspects of YARN integration.
Key configuration entries for YARN integration, organized by functional area.
// Application metadata and submission
val APPLICATION_TAGS: ConfigEntry[Seq[String]]
val MAX_APP_ATTEMPTS: ConfigEntry[Int]
val QUEUE_NAME: ConfigEntry[String]
val PRIORITY: ConfigEntry[Int]
// Resource allocation
val EXECUTOR_INSTANCES: ConfigEntry[Int]
val EXECUTOR_CORES: ConfigEntry[Int]
val EXECUTOR_MEMORY: ConfigEntry[String]
val EXECUTOR_MEMORY_OVERHEAD: OptionalConfigEntry[Long]
// ApplicationMaster configuration
val AM_MEMORY: ConfigEntry[String]
val AM_CORES: ConfigEntry[Int]
val AM_MEMORY_OVERHEAD: ConfigEntry[Long]
// File distribution and staging
val SPARK_ARCHIVE: OptionalConfigEntry[String]
val ARCHIVES: OptionalConfigEntry[Seq[String]]
val FILES: OptionalConfigEntry[Seq[String]]
val JARS: OptionalConfigEntry[Seq[String]]
// Classpath and dependency management
val USER_CLASS_PATH_FIRST: ConfigEntry[Boolean]
val POPULATE_HADOOP_CLASSPATH: ConfigEntry[Boolean]
val CLASSPATH_PREP_TASK: ConfigEntry[Boolean]
// Security configuration
val KERBEROS_PRINCIPAL: OptionalConfigEntry[String]
val KERBEROS_KEYTAB: OptionalConfigEntry[String]
val ACCESS_NAMENODES: ConfigEntry[Seq[String]]
val CREDENTIALS_FILE: OptionalConfigEntry[String]
// Memory and resource constraints
val MEMORY_OVERHEAD_FACTOR: ConfigEntry[Double]
val MEMORY_OVERHEAD_MIN: ConfigEntry[Long]
// Staging and cleanup
val STAGING_DIR: ConfigEntry[String]
val PRESERVE_STAGING_FILES: ConfigEntry[Boolean]
val REPLACE_STAGING_DIR: ConfigEntry[Boolean]
// Container management
val CONTAINER_LAUNCHER_MAX_THREADS: ConfigEntry[Int]
val RM_HEARTBEAT_INTERVAL: ConfigEntry[Long]
val AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL: ConfigEntry[Long]
// Container and node management
val CONTAINER_PLACEMENT_STRATEGY: ConfigEntry[String]
val NODE_LABEL_EXPRESSION: OptionalConfigEntry[String]
val ROLLED_LOG_INCLUDE_PATTERN: OptionalConfigEntry[String]
val ROLLED_LOG_EXCLUDE_PATTERN: OptionalConfigEntry[String]
// Additional internal configuration entries
val APP_JAR: OptionalConfigEntry[String]
val SECONDARY_JARS: OptionalConfigEntry[Seq[String]]
val CACHED_FILES: ConfigEntry[Seq[String]]
val CACHED_FILES_SIZES: ConfigEntry[Seq[Long]]
val CACHED_FILES_TIMESTAMPS: ConfigEntry[Seq[Long]]
val CACHED_FILES_VISIBILITIES: ConfigEntry[Seq[String]]
val CACHED_FILES_TYPES: ConfigEntry[Seq[String]]
val CACHED_CONF_ARCHIVE: OptionalConfigEntry[String]
// Security configuration
val KERBEROS_RELOGIN_PERIOD: ConfigEntry[Long]
val NAMENODES_TO_ACCESS: ConfigEntry[Seq[String]]
val FILESYSTEMS_TO_ACCESS: ConfigEntry[Seq[String]]
// Executor node placement
val EXECUTOR_NODE_LABEL_EXPRESSION: OptionalConfigEntry[String]
// YARN allocator-level blacklisting
val YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED: ConfigEntry[Boolean]

// Application identification and metadata
val sparkConf = new SparkConf()
.setAppName("MySparkApplication")
.set("spark.yarn.tags", "batch,analytics,production") // APPLICATION_TAGS
.set("spark.yarn.maxAppAttempts", "3") // MAX_APP_ATTEMPTS
.set("spark.yarn.queue", "production") // QUEUE_NAME
.set("spark.yarn.priority", "10") // PRIORITY

Configuration Details:
- spark.yarn.tags: comma-separated tags attached to the YARN application, e.g. "etl,daily,finance"
- spark.yarn.maxAppAttempts: maximum number of ApplicationMaster attempts
- spark.yarn.queue: name of the YARN scheduler queue to submit to
- spark.yarn.priority: application priority within the queue
// Client mode configuration
val clientConf = new SparkConf()
.setMaster("yarn")
.set("spark.submit.deployMode", "client")
.set("spark.yarn.am.waitTime", "100s") // Wait time for AM startup
// Cluster mode configuration
val clusterConf = new SparkConf()
.setMaster("yarn")
.set("spark.submit.deployMode", "cluster")
.set("spark.yarn.submit.waitAppCompletion", "true") // Wait for completion

val sparkConf = new SparkConf()
// Executor resource allocation
.set("spark.executor.instances", "20") // EXECUTOR_INSTANCES
.set("spark.executor.cores", "4") // EXECUTOR_CORES
.set("spark.executor.memory", "8g") // EXECUTOR_MEMORY
.set("spark.yarn.executor.memoryOverhead", "1g") // EXECUTOR_MEMORY_OVERHEAD
// Dynamic allocation (alternative to static instances)
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.minExecutors", "5")
.set("spark.dynamicAllocation.maxExecutors", "100")
.set("spark.dynamicAllocation.initialExecutors", "20")

Memory Configuration:
- spark.executor.memory: JVM heap memory per executor
- spark.yarn.executor.memoryOverhead: off-heap memory (native libraries, JVM overhead)

Core Configuration:
- spark.executor.cores: CPU cores per executor container

val sparkConf = new SparkConf()
.set("spark.yarn.am.memory", "2g") // AM_MEMORY
.set("spark.yarn.am.cores", "2") // AM_CORES
.set("spark.yarn.am.memoryOverhead", "384m") // AM_MEMORY_OVERHEAD
.set("spark.yarn.am.nodeLabelExpression", "compute") // NODE_LABEL_EXPRESSION

ApplicationMaster Resources:
val sparkConf = new SparkConf()
.set("spark.yarn.containerLauncherMaxThreads", "25")
.set("spark.yarn.executor.failuresValidityInterval", "1h")
.set("spark.yarn.max.executor.failures", "3")
.set("spark.yarn.executor.launch.blacklist.enabled", "true")

val sparkConf = new SparkConf()
// Spark distribution archive
.set("spark.yarn.archive", "hdfs://namenode:8020/spark-libs/spark-2.4.8.tgz") // SPARK_ARCHIVE
// Additional files and archives
.set("spark.yarn.dist.files", "config.properties,data.txt") // FILES
.set("spark.yarn.dist.archives", "libs.tar.gz,resources.zip") // ARCHIVES
.set("spark.yarn.dist.jars", "external-lib.jar,utils.jar") // JARS

File Distribution Types:
- spark.yarn.archive: archive containing the Spark runtime jars
- spark.yarn.dist.files: plain files copied into each container's working directory
- spark.yarn.dist.archives: archives extracted into each container's working directory
- spark.yarn.dist.jars: additional jars distributed with the application; prefer spark.jars for automatic classpath inclusion

val sparkConf = new SparkConf()
.set("spark.yarn.user.classpath.first", "true") // USER_CLASS_PATH_FIRST
.set("spark.yarn.populateHadoopClasspath", "true") // POPULATE_HADOOP_CLASSPATH
.set("spark.yarn.classpath.prepTask", "true") // CLASSPATH_PREP_TASK

Classpath Configuration:
- spark.yarn.user.classpath.first: place user jars ahead of Spark and Hadoop jars
- spark.yarn.populateHadoopClasspath: add the cluster's Hadoop classpath to containers
val sparkConf = new SparkConf()
.set("spark.yarn.principal", "spark/_HOST@REALM") // KERBEROS_PRINCIPAL
.set("spark.yarn.keytab", "/etc/security/keytabs/spark.keytab") // KERBEROS_KEYTAB
.set("spark.yarn.access.hadoopFileSystems",
"hdfs://namenode1:8020,hdfs://namenode2:8020") // ACCESS_NAMENODES

Kerberos Configuration:
- spark.yarn.principal: Kerberos principal to log in as; the _HOST token is substituted with the local hostname (e.g. spark/hostname.domain.com@REALM)
- spark.yarn.keytab: path to the keytab file for the principal
- spark.yarn.access.hadoopFileSystems: additional secured filesystems for which delegation tokens are obtained
val sparkConf = new SparkConf()
.set("spark.yarn.credentials.file", "/tmp/hadoop-tokens") // CREDENTIALS_FILE
.set("spark.yarn.credentials.renewalTime", "24h")
.set("spark.yarn.credentials.updateTime", "1h")
// Service-specific credential providers
.set("spark.yarn.security.credentials.hive.enabled", "true")
.set("spark.yarn.security.credentials.hbase.enabled", "true")
.set("spark.yarn.security.credentials.hadoopfs.enabled", "true")

val sparkConf = new SparkConf()
// Container launch optimization
.set("spark.yarn.containerLauncherMaxThreads", "50")
.set("spark.yarn.launchContainer.timeout", "120s")
// Memory tuning
.set("spark.yarn.executor.memoryOverheadFactor", "0.15") // 15% overhead
.set("spark.yarn.am.memoryOverhead", "512m")
// Network optimization
.set("spark.yarn.network.timeout", "300s")
.set("spark.yarn.nodemanager.address", "0.0.0.0:8034")

val sparkConf = new SparkConf()
.set("spark.yarn.submit.file.replication", "3") // HDFS replication for logs
.set("spark.yarn.preserve.staging.files", "false") // Cleanup staging files
.set("spark.yarn.rolling.strategy", "time") // Log rolling strategy
.set("spark.yarn.rolling.time.interval", "daily") // Rolling interval

val sparkConf = new SparkConf()
// Resource limits
.set("spark.yarn.executor.resource.memory", "8g")
.set("spark.yarn.executor.resource.cpu", "4")
// YARN node labels for placement
.set("spark.yarn.executor.nodeLabelExpression", "compute")
.set("spark.yarn.am.nodeLabelExpression", "management")

/**
 * Validates basic YARN-related settings in the given SparkConf.
 *
 * Checks that the master is "yarn", the deploy mode is "client" or "cluster",
 * the executor memory meets the 384MB lower bound, and that any explicitly
 * configured queue name is non-empty.
 *
 * @param conf the SparkConf to validate
 * @throws IllegalArgumentException if any check fails (via require)
 */
def validateYarnConfiguration(conf: SparkConf): Unit = {
  // Master must be YARN for these settings to apply at all.
  require(conf.get("spark.master") == "yarn", "Master must be 'yarn' for YARN mode")

  // Deploy mode defaults to client when unset.
  val deployMode = conf.get("spark.submit.deployMode", "client")
  require(deployMode == "client" || deployMode == "cluster",
    s"Invalid deploy mode: $deployMode")

  // 384MB is the conventional lower bound for the executor heap. Use a Long
  // literal so the constant cannot silently overflow Int arithmetic if the
  // bound is ever raised.
  val executorMemory = conf.getSizeAsBytes("spark.executor.memory", "1g")
  require(executorMemory >= 384L * 1024 * 1024, "Executor memory must be at least 384MB")

  // An explicitly configured queue name must not be blank.
  conf.getOption("spark.yarn.queue").foreach { queue =>
    require(queue.nonEmpty, "YARN queue name cannot be empty")
  }
}

/**
 * Detects conflicting or inconsistent YARN configuration combinations.
 *
 * Warns when both dynamic allocation and a static executor count are set
 * (dynamic allocation wins), and fails fast when only one of
 * principal/keytab is provided, since Kerberos login needs both.
 *
 * @param conf the SparkConf to inspect
 * @throws IllegalArgumentException if principal and keytab are not set together
 */
def checkConfigurationConflicts(conf: SparkConf): Unit = {
  // Dynamic allocation vs static instances: setting both is legal but misleading.
  val dynamicEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false)
  val staticInstances = conf.getOption("spark.executor.instances")
  if (dynamicEnabled && staticInstances.isDefined) {
    logWarning("Both dynamic allocation and static executor instances configured. " +
      "Dynamic allocation will take precedence.")
  }

  // Kerberos principal and keytab are only meaningful as a pair.
  val principal = conf.getOption("spark.yarn.principal")
  val keytab = conf.getOption("spark.yarn.keytab")
  if (principal.isDefined != keytab.isDefined) {
    throw new IllegalArgumentException(
      "Both spark.yarn.principal and spark.yarn.keytab must be specified together")
  }
}

val devConf = new SparkConf()
.setMaster("yarn")
.set("spark.submit.deployMode", "client")
.set("spark.yarn.queue", "dev")
.set("spark.executor.instances", "2")
.set("spark.executor.memory", "2g")
.set("spark.executor.cores", "2")
.set("spark.yarn.preserve.staging.files", "true") // Keep files for debugging

val prodConf = new SparkConf()
.setMaster("yarn")
.set("spark.submit.deployMode", "cluster")
.set("spark.yarn.queue", "production")
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.minExecutors", "10")
.set("spark.dynamicAllocation.maxExecutors", "200")
.set("spark.yarn.maxAppAttempts", "3")
.set("spark.yarn.archive", "hdfs://namenode:8020/spark-libs/spark-2.4.8.tgz")
.set("spark.yarn.principal", "spark-prod/_HOST@PROD.REALM")
.set("spark.yarn.keytab", "/etc/security/keytabs/spark-prod.keytab")

val testConf = new SparkConf()
.setMaster("yarn")
.set("spark.submit.deployMode", "client")
.set("spark.yarn.queue", "test")
.set("spark.executor.instances", "1")
.set("spark.executor.memory", "1g")
.set("spark.executor.cores", "1")
.set("spark.sql.adaptive.enabled", "false") // Disable adaptive query execution
.set("spark.sql.adaptive.coalescePartitions.enabled", "false")

// Memory sizing formula
val nodeMemory = 64 * 1024                    // total node memory in MB (64GB)
val systemReserved = 8 * 1024                 // MB held back for the OS and services (8GB)
val yarnMemory = nodeMemory - systemReserved  // MB that YARN may allocate on this node
val executorMemory = 6 * 1024                 // executor JVM heap in MB (6GB)
// Container overhead: 10% of the heap, floored at YARN's 384MB minimum.
val executorOverhead = (executorMemory * 0.1).max(384.0).toInt
val totalExecutorMemory = executorMemory + executorOverhead // full container footprint in MB
val executorsPerNode = yarnMemory / totalExecutorMemory // ~8 executors per node

val secureConf = new SparkConf()
// Use service keytabs, not user tickets
.set("spark.yarn.principal", "spark-service/_HOST@REALM")
.set("spark.yarn.keytab", "/etc/security/keytabs/spark.headless.keytab")
// Enable delegation token renewal
.set("spark.yarn.credentials.renewalTime", "12h")
// Limit application attempts in secure mode
.set("spark.yarn.maxAppAttempts", "1")
// Enable secure communication
.set("spark.authenticate", "true")
.set("spark.network.crypto.enabled", "true")

val optimizedConf = new SparkConf()
// Use Spark archive for faster container startup
.set("spark.yarn.archive", "hdfs://namenode:8020/spark-libs/spark-2.4.8.tgz")
// Optimize container launching
.set("spark.yarn.containerLauncherMaxThreads", "25")
// Enable dynamic allocation for variable workloads
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.minExecutors", "5")
.set("spark.dynamicAllocation.maxExecutors", "500")
.set("spark.dynamicAllocation.targetUtilization", "0.8")
// Optimize shuffle and serialization
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.sql.adaptive.coalescePartitions.enabled", "true")