Apache Spark YARN integration module that enables Spark applications to run on YARN clusters, providing cluster-manager functionality for distributed Spark workloads.
—
The YARN module provides comprehensive security and authentication capabilities for Spark applications running on Kerberos-enabled Hadoop clusters. This includes delegation token management, credential renewal, and secure access to HDFS and other Hadoop services.
The YarnSparkHadoopUtil class extends SparkHadoopUtil with YARN-specific security functionality and utilities.
package org.apache.spark.deploy.yarn
class YarnSparkHadoopUtil extends SparkHadoopUtil

def isYarnMode(): Boolean

Always returns true since this utility is specific to YARN environments.
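A minimal sketch of branching on the cluster manager through the base SparkHadoopUtil, which resolves to this YARN subclass when running on YARN:

import org.apache.spark.deploy.SparkHadoopUtil
// SparkHadoopUtil.get returns the YARN-specific subclass on YARN
if (SparkHadoopUtil.get.isYarnMode()) {
  println("Running on YARN")
} else {
  println("Running on another cluster manager")
}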
def newConfiguration(conf: SparkConf): Configuration

Creates a new YARN-specific Hadoop configuration with YARN settings applied.
Parameters:
conf: Spark configuration to base the Hadoop configuration on

Returns: YarnConfiguration instance with merged settings
Example:
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
val sparkConf = new SparkConf()
.set("spark.yarn.queue", "production")
.set("spark.yarn.am.memory", "2g")
val yarnUtil = YarnSparkHadoopUtil.get
val hadoopConf = yarnUtil.newConfiguration(sparkConf)
println(s"YARN queue: ${hadoopConf.get("mapreduce.job.queuename")}")

def getCurrentUserCredentials(): Credentials

Retrieves the current user's Hadoop credentials containing delegation tokens.
def addCurrentUserCredentials(creds: Credentials): Unit

Adds additional credentials to the current user's credential store.
Example:
val yarnUtil = YarnSparkHadoopUtil.get
// Get current credentials
val currentCreds = yarnUtil.getCurrentUserCredentials()
println(s"Current tokens: ${currentCreds.numberOfTokens()}")
// Add additional credentials
val additionalCreds = new org.apache.hadoop.security.Credentials()
// ... populate additional credentials
yarnUtil.addCurrentUserCredentials(additionalCreds)

def addSecretKeyToUserCredentials(key: String, secret: String): Unit

Adds a secret key to the user's credentials for secure communication.
def getSecretKeyFromUserCredentials(key: String): Array[Byte]

Retrieves a secret key from the user's credentials.
Example:
val yarnUtil = YarnSparkHadoopUtil.get
// Store a secret key
val secretKey = "spark.authenticate.secret"
val secretValue = "my-secure-secret-key"
yarnUtil.addSecretKeyToUserCredentials(secretKey, secretValue)
// Retrieve the secret key
val retrievedSecret = yarnUtil.getSecretKeyFromUserCredentials(secretKey)
val secretString = new String(retrievedSecret, "UTF-8")
println(s"Retrieved secret: $secretString")

def getNameNodesToAccess(sparkConf: SparkConf): Set[Path]

Retrieves the set of HDFS namenodes that the application needs to access, based on configuration.
Parameters:
sparkConf: Spark configuration containing namenode settings

Returns: Set of Path objects representing namenode locations
Example:
val sparkConf = new SparkConf()
.set("spark.yarn.access.namenodes", "hdfs://nn1:9000,hdfs://nn2:9000")
val yarnUtil = YarnSparkHadoopUtil.get
val namenodes = yarnUtil.getNameNodesToAccess(sparkConf)
namenodes.foreach { nn =>
println(s"Will access namenode: $nn")
}

def obtainTokensForNamenodes(
paths: Set[Path],
conf: Configuration,
creds: Credentials,
renewer: Option[String] = None
): Unit

Obtains delegation tokens for the specified namenodes and adds them to the credentials.
Parameters:
paths: Set of namenode paths to obtain tokens for
conf: Hadoop configuration
creds: Credentials object to store the tokens in
renewer: Optional token renewer principal

Example:
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.fs.Path
val yarnUtil = YarnSparkHadoopUtil.get
val hadoopConf = yarnUtil.newConfiguration(new org.apache.spark.SparkConf())
val credentials = new Credentials()
val namenodes = Set(
new Path("hdfs://namenode1:9000"),
new Path("hdfs://namenode2:9000")
)
yarnUtil.obtainTokensForNamenodes(
paths = namenodes,
conf = hadoopConf,
creds = credentials,
renewer = Some("spark/spark-user@EXAMPLE.COM")
)
println(s"Obtained ${credentials.numberOfTokens()} delegation tokens")

def obtainTokenForHiveMetastore(conf: Configuration): Option[Token[DelegationTokenIdentifier]]

Obtains a delegation token for the Hive metastore service if configured and available.
Parameters:
conf: Hadoop configuration containing Hive settings

Returns: Optional delegation token for Hive metastore
Example:
val hiveToken = yarnUtil.obtainTokenForHiveMetastore(hadoopConf)
hiveToken match {
case Some(token) =>
println(s"Obtained Hive metastore token: ${token.getService}")
case None =>
println("No Hive metastore token needed or available")
}

def startExecutorDelegationTokenRenewer(sparkConf: SparkConf): Unit

Starts the delegation token renewal service for executors.
def stopExecutorDelegationTokenRenewer(): Unit

Stops the delegation token renewal service.
Example:
val yarnUtil = YarnSparkHadoopUtil.get
try {
// Start token renewal for executors
yarnUtil.startExecutorDelegationTokenRenewer(sparkConf)
// Application runs with token renewal active
// ...
} finally {
// Stop token renewal
yarnUtil.stopExecutorDelegationTokenRenewer()
}

def getContainerId: ContainerId

Retrieves the current YARN container ID from the environment.
Returns: ContainerId of the current container
Example:
val yarnUtil = YarnSparkHadoopUtil.get
val containerId = yarnUtil.getContainerId
println(s"Running in container: $containerId")

The companion object provides utility methods and constants for YARN operations.
object YarnSparkHadoopUtil

val MEMORY_OVERHEAD_FACTOR = 0.10 // Memory overhead factor (10%)
val MEMORY_OVERHEAD_MIN = 384 // Minimum memory overhead in MB
val ANY_HOST = "*" // Wildcard host designation
val DEFAULT_NUMBER_EXECUTORS = 2 // Default executor count
val RM_REQUEST_PRIORITY: Priority // Resource manager request priority

def get: YarnSparkHadoopUtil

Retrieves the singleton instance of YarnSparkHadoopUtil. Throws an exception if not running in YARN mode.
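As an illustration, the memory overhead constants above are conventionally combined along these lines when sizing containers (a sketch of the usual formula, not a verbatim excerpt from Spark's internals):

// Overhead = max(10% of executor memory, 384 MB minimum)
val executorMemoryMB = 4096
val overheadMB = math.max(
  (YarnSparkHadoopUtil.MEMORY_OVERHEAD_FACTOR * executorMemoryMB).toInt,
  YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN
)
println(s"Requested container size: ${executorMemoryMB + overheadMB} MB") // 4505 MB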
def addPathToEnvironment(
env: HashMap[String, String],
key: String,
value: String,
classPathSeparator: String
): Unit

Adds a path to an environment variable, handling path separation correctly.
def setEnvFromInputString(
env: HashMap[String, String],
envString: String,
classPathSeparator: String
): Unit

Sets multiple environment variables from a formatted input string (KEY1=VAL1,KEY2=VAL2).
Example:
import scala.collection.mutable.HashMap
val env = new HashMap[String, String]()
// Add classpath entries
YarnSparkHadoopUtil.addPathToEnvironment(
env, "CLASSPATH", "/opt/spark/jars/*", ":"
)
// Set multiple environment variables
YarnSparkHadoopUtil.setEnvFromInputString(
env, "JAVA_HOME=/opt/jdk8,SPARK_HOME=/opt/spark", ":"
)
env.foreach { case (key, value) =>
println(s"$key = $value")
}

def getOutOfMemoryErrorArgument: String

Returns the JVM argument for handling out-of-memory errors.
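A sketch of how this argument might be appended when assembling an executor launch command; the javaOpts buffer here is illustrative, not Spark's actual command builder:

val javaOpts = scala.collection.mutable.ListBuffer[String]()
javaOpts += "-Xmx4096m"
// Have the JVM terminate on OOM so YARN can detect and restart the container
javaOpts += YarnSparkHadoopUtil.getOutOfMemoryErrorArgument
println(javaOpts.mkString(" "))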
def escapeForShell(arg: String): String

Escapes a string argument for safe use in shell commands.
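For example, a user-supplied argument containing quotes and shell metacharacters can be neutralized before being embedded in a launch command (the argument value is hypothetical):

val userArg = """--name "My App" $(hostname)"""
val safeArg = YarnSparkHadoopUtil.escapeForShell(userArg)
// safeArg can be placed in a container launch script without the shell
// interpreting the quotes or the command substitution
println(safeArg)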
def getApplicationAclsForYarn(securityMgr: SecurityManager): Map[ApplicationAccessType, String]

Retrieves application ACLs for YARN based on security manager configuration.
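A sketch of inspecting the resulting ACL map; securityMgr is assumed to be an existing SecurityManager instance, which Spark normally constructs internally from the application's ACL settings (spark.acls.enable, spark.ui.view.acls, spark.modify.acls):

val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)
acls.foreach { case (accessType, users) =>
  // accessType is a YARN ApplicationAccessType (e.g. VIEW_APP, MODIFY_APP)
  println(s"$accessType -> $users")
}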
def expandEnvironment(environment: Environment): String

Expands YARN environment variables to their string representations.
def getClassPathSeparator(): String

Returns the appropriate classpath separator for the current platform.
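These two helpers are often used together when building container classpaths. A minimal sketch, assuming Environment is YARN's ApplicationConstants.Environment and the spark.jar entry is illustrative:

import org.apache.hadoop.yarn.api.ApplicationConstants.Environment

// Expand to a placeholder that YARN substitutes at container launch time
val pwd = YarnSparkHadoopUtil.expandEnvironment(Environment.PWD)
val sep = YarnSparkHadoopUtil.getClassPathSeparator()
val classPath = Seq(pwd, pwd + "/spark.jar").mkString(sep)
println(classPath) // e.g. {{PWD}}:{{PWD}}/spark.jar on Unix-like platforms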
def getInitialTargetExecutorNumber(
conf: SparkConf,
numExecutors: Int = DEFAULT_NUMBER_EXECUTORS
): Int

Calculates the initial target number of executors based on configuration.
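A sketch of how the result varies with configuration, assuming dynamic-allocation settings take precedence over a fixed spark.executor.instances and the default applies when neither is set:

import org.apache.spark.SparkConf

val fixedConf = new SparkConf().set("spark.executor.instances", "8")
println(YarnSparkHadoopUtil.getInitialTargetExecutorNumber(fixedConf)) // 8

val dynamicConf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.initialExecutors", "5")
println(YarnSparkHadoopUtil.getInitialTargetExecutorNumber(dynamicConf)) // 5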
The ExecutorDelegationTokenUpdater handles automatic renewal of delegation tokens on executor nodes.
package org.apache.spark.deploy.yarn
class ExecutorDelegationTokenUpdater(
sparkConf: SparkConf,
hadoopConf: Configuration
) extends Logging

def updateCredentialsIfRequired(): Unit

Checks HDFS for updated delegation tokens and refreshes the executor's credentials when newer tokens are available.
Example:
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater
val sparkConf = new SparkConf()
.set("spark.yarn.credentials.file", "hdfs://namenode:9000/.sparkStaging/app-123/credentials")
val hadoopConf = new Configuration()
val tokenUpdater = new ExecutorDelegationTokenUpdater(sparkConf, hadoopConf)
// Manually trigger credential update (normally done automatically)
tokenUpdater.updateCredentialsIfRequired()
// Stop the updater when shutting down
tokenUpdater.stop()

def stop(): Unit

Stops the delegation token renewal thread and cleans up resources.
The ExecutorDelegationTokenUpdater automatically polls the configured credentials file on HDFS and reloads newer tokens into the current user's credentials as they appear.
Example configuration:
val sparkConf = new SparkConf()
.set("spark.yarn.credentials.file", "hdfs://namenode:9000/.sparkStaging/app-123/credentials")
.set("spark.yarn.credentials.renewalTime", "3600000") // 1 hour
.set("spark.yarn.credentials.updateTime", "2880000") // 48 minutes (80% of 1 hour)
// Executor automatically creates and manages token updater
val sc = new SparkContext(sparkConf)
// Token renewal happens automatically in the background

The AMDelegationTokenRenewer manages token renewal from the ApplicationMaster using Kerberos keytab authentication.
package org.apache.spark.deploy.yarn
class AMDelegationTokenRenewer(
sparkConf: SparkConf,
hadoopConf: Configuration
) extends Logging

def scheduleLoginFromKeytab(): Unit

Schedules periodic logins from the keytab and the creation of new delegation tokens, which are written to HDFS for executors to pick up.
Example:
import org.apache.spark.deploy.yarn.AMDelegationTokenRenewer
val sparkConf = new SparkConf()
.set("spark.yarn.principal", "spark/spark-user@EXAMPLE.COM")
.set("spark.yarn.keytab", "/opt/spark/conf/spark.keytab")
.set("spark.yarn.credentials.file", "hdfs://namenode:9000/.sparkStaging/app-123/credentials")
.set("spark.yarn.credentials.file.retention.days", "7")
.set("spark.yarn.credentials.file.retention.count", "10")
val tokenRenewer = new AMDelegationTokenRenewer(sparkConf, hadoopConf)
try {
// Start automatic token renewal
tokenRenewer.scheduleLoginFromKeytab()
// ApplicationMaster continues running with automatic token renewal
// New tokens are periodically written to HDFS for executors
} finally {
// Stop token renewal when shutting down
tokenRenewer.stop()
}

def stop(): Unit

Stops the delegation token renewal service and cleans up resources.
The AMDelegationTokenRenewer writes new credentials files to the configured HDFS location and prunes old ones according to the retention settings (spark.yarn.credentials.file.retention.days and spark.yarn.credentials.file.retention.count). The complete example below ties the security pieces together:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.yarn.{YarnSparkHadoopUtil, AMDelegationTokenRenewer}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.fs.Path
object SecureSparkYarnApp {
def main(args: Array[String]): Unit = {
// Configure security settings
val sparkConf = new SparkConf()
.setAppName("SecureSparkApp")
.setMaster("yarn-cluster")
// Kerberos authentication
.set("spark.yarn.principal", "spark/spark-service@EXAMPLE.COM")
.set("spark.yarn.keytab", "/opt/spark/conf/spark.keytab")
// Delegation token management
.set("spark.yarn.credentials.file", "hdfs://namenode:9000/.sparkStaging/{{APP_ID}}/credentials")
.set("spark.yarn.credentials.file.retention.days", "7")
.set("spark.yarn.credentials.file.retention.count", "10")
// Secure HDFS access
.set("spark.yarn.access.namenodes", "hdfs://namenode1:9000,hdfs://namenode2:9000")
.set("spark.yarn.access.hadoopFileSystems", "hdfs://namenode1:9000,hdfs://namenode2:9000")
// Executor configuration
.set("spark.executor.memory", "4g")
.set("spark.executor.cores", "4")
.set("spark.executor.instances", "10")
// Initialize security
val yarnUtil = YarnSparkHadoopUtil.get
val hadoopConf = yarnUtil.newConfiguration(sparkConf)
// Check security status
if (UserGroupInformation.isSecurityEnabled()) {
println("Kerberos security is enabled")
// Get delegation tokens for namenodes
val namenodes = yarnUtil.getNameNodesToAccess(sparkConf)
val credentials = yarnUtil.getCurrentUserCredentials()
yarnUtil.obtainTokensForNamenodes(namenodes, hadoopConf, credentials)
println(s"Obtained tokens for ${namenodes.size} namenodes")
// Get Hive metastore token if needed
yarnUtil.obtainTokenForHiveMetastore(hadoopConf) match {
case Some(token) =>
println("Obtained Hive metastore delegation token")
case None =>
println("No Hive metastore token required")
}
} else {
println("Running in non-secure mode")
}
// Set up delegation token renewal (for ApplicationMaster)
val tokenRenewer = new AMDelegationTokenRenewer(sparkConf, hadoopConf)
try {
// Start SparkContext
val sc = new SparkContext(sparkConf)
// Start token renewal if in secure mode
if (UserGroupInformation.isSecurityEnabled()) {
tokenRenewer.scheduleLoginFromKeytab()
yarnUtil.startExecutorDelegationTokenRenewer(sparkConf)
println("Started delegation token renewal services")
}
// Application logic
val data = sc.textFile("hdfs://namenode1:9000/secure-data/input.txt")
val wordCounts = data.flatMap(_.split("\\s+"))
.map((_, 1))
.reduceByKey(_ + _)
wordCounts.saveAsTextFile("hdfs://namenode1:9000/secure-data/output")
println("Secure Spark application completed successfully")
} finally {
// Cleanup security services
if (UserGroupInformation.isSecurityEnabled()) {
yarnUtil.stopExecutorDelegationTokenRenewer()
tokenRenewer.stop()
println("Stopped delegation token renewal services")
}
}
}
}

This comprehensive security framework ensures that Spark applications can run securely on Kerberos-enabled YARN clusters with automatic credential management and token renewal for long-running applications.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-yarn_2-11