Apache Spark's integration with Hadoop YARN cluster manager for running Spark applications on YARN clusters
—
The YARN module provides specialized Hadoop utilities that extend Spark's base Hadoop integration with YARN-specific functionality. These utilities handle security, configuration, environment management, and various operational tasks required for running Spark on YARN clusters.
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.yarn.api.records.ApplicationAccessType
import scala.collection.mutable.HashMap

Main utility class that extends Spark's base Hadoop utilities with YARN-specific functionality.
/**
* YARN-specific Hadoop utilities for Spark
* Extends base SparkHadoopUtil with YARN functionality
*/
class YarnSparkHadoopUtil extends SparkHadoopUtil {
/**
* Transfer credentials between user group information objects
* @param source Source user group information
* @param dest Destination user group information
*/
override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation): Unit
/**
* Returns true indicating YARN mode is enabled
* @return Always true for YARN mode
*/
override def isYarnMode(): Boolean
/**
* Creates a new Hadoop configuration with YARN-specific settings
* @param conf Spark configuration
* @return YarnConfiguration instance
*/
override def newConfiguration(conf: SparkConf): Configuration
/**
* Adds user credentials to job configuration for secure clusters
* @param conf Job configuration to modify
*/
override def addCredentials(conf: JobConf): Unit
/**
* Gets current user's security credentials
* @return Current user's credentials
*/
override def getCurrentUserCredentials(): Credentials
/**
* Adds credentials to current user's credential store
* @param creds Credentials to add
*/
override def addCurrentUserCredentials(creds: Credentials): Unit
/**
* Adds a secret key to current user's credentials
* @param key Secret key name
* @param secret Secret value as string
*/
override def addSecretKeyToUserCredentials(key: String, secret: String): Unit
/**
* Retrieves a secret key from current user's credentials
* @param key Secret key name
* @return Secret value as byte array, or null if not found
*/
override def getSecretKeyFromUserCredentials(key: String): Array[Byte]
}

Usage Example:
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
// Create YARN-specific Hadoop utilities
val yarnUtils = new YarnSparkHadoopUtil()
// Check if running in YARN mode
val isYarn = yarnUtils.isYarnMode() // Always true
// Create YARN configuration
val sparkConf = new SparkConf()
val yarnConf = yarnUtils.newConfiguration(sparkConf)
// Manage credentials for secure clusters
val currentCreds = yarnUtils.getCurrentUserCredentials()
yarnUtils.addSecretKeyToUserCredentials("myapp.secret", "secretValue")

Companion object providing constants, utility methods, and shared functionality for YARN operations.
/**
* Companion object with YARN utility constants and methods
*/
object YarnSparkHadoopUtil {
// Memory overhead configuration
val MEMORY_OVERHEAD_FACTOR: Double = 0.07 // 7% memory overhead
val MEMORY_OVERHEAD_MIN: Int = 384 // Minimum 384MB overhead
// Host and resource constants
val ANY_HOST: String = "*" // Wildcard for any host
val DEFAULT_NUMBER_EXECUTORS: Int = 2 // Default executor count
val RM_REQUEST_PRIORITY: Int = 1 // ResourceManager request priority
/**
* Adds a path variable to environment map
* Appends to existing value if key already exists
* @param env Environment map to modify
* @param key Environment variable name
* @param value Path value to add
*/
def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit
/**
* Sets environment variables from input string
* Input format: "KEY1=VAL1,KEY2=VAL2,KEY3=VAL3"
* @param env Environment map to modify
* @param inputString Comma-separated key=value pairs
*/
def setEnvFromInputString(env: HashMap[String, String], inputString: String): Unit
/**
* Escapes argument for shell execution in YARN
* Handles special characters for bash execution
* @param arg Argument to escape
* @return Shell-escaped argument
*/
def escapeForShell(arg: String): String
/**
* Looks up rack information for a host
* Uses cached rack topology information
* @param conf Hadoop configuration
* @param host Hostname to look up
* @return Rack name for the host
*/
def lookupRack(conf: Configuration, host: String): String
/**
* Populates rack information cache for a hostname
* Resolves rack topology and caches results
* @param conf Hadoop configuration
* @param hostname Hostname to resolve
*/
def populateRackInfo(conf: Configuration, hostname: String): Unit
/**
* Gets application ACLs formatted for YARN
* Converts Spark security manager ACLs to YARN format
* @param securityMgr Spark security manager
* @return Map of YARN application access types to ACL strings
*/
def getApplicationAclsForYarn(securityMgr: SecurityManager): Map[ApplicationAccessType, String]
}

Managing environment variables and paths for YARN containers:
import scala.collection.mutable.HashMap
val env = HashMap[String, String]()
// Add path to existing environment variable
YarnSparkHadoopUtil.addPathToEnvironment(env, "CLASSPATH", "/path/to/jar")
YarnSparkHadoopUtil.addPathToEnvironment(env, "CLASSPATH", "/another/path")
// Result: env("CLASSPATH") = "/path/to/jar:/another/path"
// Set multiple environment variables from string
YarnSparkHadoopUtil.setEnvFromInputString(env, "JAVA_HOME=/usr/lib/jvm/java-8,SPARK_HOME=/opt/spark")

Properly escape arguments for YARN's bash command execution:
val unsafeArg = "my file with spaces and $special chars"
val safeArg = YarnSparkHadoopUtil.escapeForShell(unsafeArg)
// Result: 'my file with spaces and \$special chars'
// Use in YARN container commands
val command = s"java -cp ${YarnSparkHadoopUtil.escapeForShell(classpath)} MyClass"

Handle security credentials for secure Hadoop clusters:
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
val yarnUtils = new YarnSparkHadoopUtil()
// Get current user credentials
val currentCreds = yarnUtils.getCurrentUserCredentials()
// Add secret keys for application security
yarnUtils.addSecretKeyToUserCredentials("spark.authenticate.secret", "mySecretKey")
yarnUtils.addSecretKeyToUserCredentials("app.custom.token", "applicationToken")
// Retrieve secret keys
val authSecret = yarnUtils.getSecretKeyFromUserCredentials("spark.authenticate.secret")
val appToken = yarnUtils.getSecretKeyFromUserCredentials("app.custom.token")
// Transfer credentials between users
val sourceUGI = UserGroupInformation.getCurrentUser()
val destUGI = UserGroupInformation.createProxyUser("appuser", sourceUGI)
yarnUtils.transferCredentials(sourceUGI, destUGI)

Integration with Kerberos authentication:
import org.apache.hadoop.mapred.JobConf
val jobConf = new JobConf()
val yarnUtils = new YarnSparkHadoopUtil()
// Add current user's Kerberos credentials to job configuration
yarnUtils.addCredentials(jobConf)
// Credentials include:
// - Kerberos tickets
// - HDFS delegation tokens
// - Other service tokens (HBase, Hive, etc.)

Convert Spark security settings to YARN application ACLs:
import org.apache.spark.SecurityManager
import org.apache.hadoop.yarn.api.records.ApplicationAccessType
val sparkConf = new SparkConf()
.set("spark.ui.view.acls", "user1,user2")
.set("spark.modify.acls", "admin1,admin2")
val securityMgr = new SecurityManager(sparkConf)
val yarnAcls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)
// Result map:
// ApplicationAccessType.VIEW_APP -> "user1,user2"
// ApplicationAccessType.MODIFY_APP -> "admin1,admin2"

Efficient rack topology resolution and caching:
import org.apache.hadoop.conf.Configuration
val hadoopConf = new Configuration()
// Look up rack for a host (with caching)
val rack1 = YarnSparkHadoopUtil.lookupRack(hadoopConf, "node1.example.com")
val rack2 = YarnSparkHadoopUtil.lookupRack(hadoopConf, "node2.example.com")
// Explicitly populate rack information cache
YarnSparkHadoopUtil.populateRackInfo(hadoopConf, "node3.example.com")
// Benefits:
// - Cached lookups for performance
// - Data locality optimization
// - Cross-rack communication minimization

Integration with Spark's scheduler for optimal task placement:
// Used internally by schedulers for rack-aware task placement
class YarnClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
override def getRackForHost(hostPort: String): Option[String] = {
val host = Utils.parseHostPort(hostPort)._1
Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
}
}

Automatic memory overhead calculation for YARN containers:
// Constants for memory overhead
val overheadFactor = YarnSparkHadoopUtil.MEMORY_OVERHEAD_FACTOR // 0.07 (7%)
val minOverhead = YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN // 384 MB
// Calculate memory overhead for executor
def calculateMemoryOverhead(executorMemoryMB: Int): Int = {
val calculatedOverhead = (executorMemoryMB * overheadFactor).toInt
math.max(calculatedOverhead, minOverhead)
}
// Example: 2GB executor memory
val executorMem = 2048 // 2GB
val overhead = calculateMemoryOverhead(executorMem) // max(143, 384) = 384 MB
val totalMemory = executorMem + overhead // 2432 MB requested from YARN

Default values and constants for resource requests:
// Default configuration values
val defaultExecutors = YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS // 2
val requestPriority = YarnSparkHadoopUtil.RM_REQUEST_PRIORITY // 1
val anyHost = YarnSparkHadoopUtil.ANY_HOST // "*"
// Use in resource requests to YARN ResourceManager
val resourceRequest = Records.newRecord(classOf[ResourceRequest])
resourceRequest.setPriority(Priority.newInstance(requestPriority))
resourceRequest.setResourceName(anyHost) // No locality preference

Create properly configured YarnConfiguration instances:
val sparkConf = new SparkConf()
.set("spark.yarn.queue", "production")
.set("spark.yarn.am.memory", "1g")
val yarnUtils = new YarnSparkHadoopUtil()
val yarnConf = yarnUtils.newConfiguration(sparkConf).asInstanceOf[YarnConfiguration]
// YarnConfiguration includes:
// - Spark configuration properties
// - Hadoop configuration from classpath
// - YARN-specific configuration
// - Security settings and credentials

Reliable detection of YARN execution mode:
val yarnUtils = new YarnSparkHadoopUtil()
// Always returns true in YARN module
val isYarnMode = yarnUtils.isYarnMode()
// Used by Spark core to enable YARN-specific behavior:
// - Different security handling
// - YARN-specific UI integration
// - Special configuration validation
// - YARN-aware error messages

Reliable hostname and rack resolution with error handling:
// Handles various failure scenarios:
// - DNS resolution failures
// - Network connectivity issues
// - Rack resolver configuration problems
// - Invalid hostname formats
try {
YarnSparkHadoopUtil.populateRackInfo(hadoopConf, hostname)
val rack = YarnSparkHadoopUtil.lookupRack(hadoopConf, hostname)
} catch {
case e: Exception =>
// Graceful degradation - continue without rack information
logWarning(s"Failed to resolve rack for $hostname", e)
}

Comprehensive error handling for security operations:
try {
yarnUtils.addSecretKeyToUserCredentials(key, secret)
} catch {
case e: SecurityException =>
logError("Failed to add secret key - insufficient permissions", e)
case e: IOException =>
logError("Failed to add secret key - credential store error", e)
}

Automatic integration with SparkContext initialization:
// Automatic registration as Hadoop util implementation
SparkHadoopUtil.get // Returns YarnSparkHadoopUtil instance in YARN mode
// Used throughout Spark for:
// - Configuration management
// - Security operations
// - File system access
// - Credential handling

Enhanced file system integration for YARN environments:
val yarnConf = yarnUtils.newConfiguration(sparkConf)
val fs = FileSystem.get(yarnConf)
// FileSystem configured with:
// - YARN-specific authentication
// - Proper delegation tokens
// - Security credentials
// - Optimal configuration for YARN clusters

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--yarn-parent-2-11