tessl install tessl/maven-org-apache-spark--spark-yarn_2-11@1.6.0

Apache Spark YARN integration module that enables Spark applications to run on YARN clusters, providing cluster-manager functionality for distributed Spark computing workloads.
The YARN module provides comprehensive security and authentication capabilities for Spark applications running on Kerberos-enabled Hadoop clusters. This includes delegation token management, credential renewal, and secure access to HDFS and other Hadoop services.
The YarnSparkHadoopUtil class extends SparkHadoopUtil with YARN-specific security functionality and utilities.
package org.apache.spark.deploy.yarn
class YarnSparkHadoopUtil extends SparkHadoopUtil

def isYarnMode(): Boolean

Always returns true, since this utility is specific to YARN environments.
def newConfiguration(conf: SparkConf): Configuration

Creates a new YARN-specific Hadoop configuration with YARN settings applied.
Parameters:
conf: Spark configuration to base the Hadoop configuration on

Returns: YarnConfiguration instance with merged settings
Example:
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
val sparkConf = new SparkConf()
.set("spark.yarn.queue", "production")
.set("spark.yarn.am.memory", "2g")
val yarnUtil = YarnSparkHadoopUtil.get
val hadoopConf = yarnUtil.newConfiguration(sparkConf)
println(s"YARN queue: ${hadoopConf.get("mapreduce.job.queuename")}")def getCurrentUserCredentials(): CredentialsRetrieves the current user's Hadoop credentials containing delegation tokens.
def addCurrentUserCredentials(creds: Credentials): Unit

Adds additional credentials to the current user's credential store.
Example:
val yarnUtil = YarnSparkHadoopUtil.get
// Get current credentials
val currentCreds = yarnUtil.getCurrentUserCredentials()
println(s"Current tokens: ${currentCreds.numberOfTokens()}")
// Add additional credentials
val additionalCreds = new org.apache.hadoop.security.Credentials()
// ... populate additional credentials
yarnUtil.addCurrentUserCredentials(additionalCreds)

def addSecretKeyToUserCredentials(key: String, secret: String): Unit

Adds a secret key to the user's credentials for secure communication.
def getSecretKeyFromUserCredentials(key: String): Array[Byte]

Retrieves a secret key from the user's credentials.
Example:
val yarnUtil = YarnSparkHadoopUtil.get
// Store a secret key
val secretKey = "spark.authenticate.secret"
val secretValue = "my-secure-secret-key"
yarnUtil.addSecretKeyToUserCredentials(secretKey, secretValue)
// Retrieve the secret key
val retrievedSecret = yarnUtil.getSecretKeyFromUserCredentials(secretKey)
val secretString = new String(retrievedSecret, "UTF-8")
println(s"Retrieved secret: $secretString")def getNameNodesToAccess(sparkConf: SparkConf): Set[Path]Retrieves the set of HDFS namenodes that the application needs to access, based on configuration.
Parameters:
sparkConf: Spark configuration containing namenode settings

Returns: Set of Path objects representing namenode locations
Example:
val sparkConf = new SparkConf()
.set("spark.yarn.access.namenodes", "hdfs://nn1:9000,hdfs://nn2:9000")
val yarnUtil = YarnSparkHadoopUtil.get
val namenodes = yarnUtil.getNameNodesToAccess(sparkConf)
namenodes.foreach { nn =>
println(s"Will access namenode: $nn")
}

def obtainTokensForNamenodes(
paths: Set[Path],
conf: Configuration,
creds: Credentials,
renewer: Option[String] = None
): Unit

Obtains delegation tokens for the specified namenodes and adds them to the credentials.
Parameters:
paths: Set of namenode paths to obtain tokens for
conf: Hadoop configuration
creds: Credentials object to store the tokens in
renewer: Optional token renewer principal

Example:
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.fs.Path
val yarnUtil = YarnSparkHadoopUtil.get
val hadoopConf = yarnUtil.newConfiguration(new org.apache.spark.SparkConf())
val credentials = new Credentials()
val namenodes = Set(
new Path("hdfs://namenode1:9000"),
new Path("hdfs://namenode2:9000")
)
yarnUtil.obtainTokensForNamenodes(
paths = namenodes,
conf = hadoopConf,
creds = credentials,
renewer = Some("spark/spark-user@EXAMPLE.COM")
)
println(s"Obtained ${credentials.numberOfTokens()} delegation tokens")def obtainTokenForHiveMetastore(conf: Configuration): Option[Token[DelegationTokenIdentifier]]Obtains a delegation token for the Hive metastore service if configured and available.
Parameters:
conf: Hadoop configuration containing Hive settings

Returns: Optional delegation token for the Hive metastore
Example:
val yarnUtil = YarnSparkHadoopUtil.get
val hadoopConf = yarnUtil.newConfiguration(new org.apache.spark.SparkConf())
val hiveToken = yarnUtil.obtainTokenForHiveMetastore(hadoopConf)
hiveToken match {
case Some(token) =>
println(s"Obtained Hive metastore token: ${token.getService}")
case None =>
println("No Hive metastore token needed or available")
}

def startExecutorDelegationTokenRenewer(sparkConf: SparkConf): Unit

Starts the delegation token renewal service for executors.
def stopExecutorDelegationTokenRenewer(): Unit

Stops the delegation token renewal service.
Example:
val sparkConf = new org.apache.spark.SparkConf()
val yarnUtil = YarnSparkHadoopUtil.get
try {
// Start token renewal for executors
yarnUtil.startExecutorDelegationTokenRenewer(sparkConf)
// Application runs with token renewal active
// ...
} finally {
// Stop token renewal
yarnUtil.stopExecutorDelegationTokenRenewer()
}

def getContainerId: ContainerId

Retrieves the current YARN container ID from the environment.
Returns: ContainerId of the current container
Example:
val yarnUtil = YarnSparkHadoopUtil.get
val containerId = yarnUtil.getContainerId
println(s"Running in container: $containerId")The companion object provides utility methods and constants for YARN operations.
object YarnSparkHadoopUtil

val MEMORY_OVERHEAD_FACTOR = 0.10 // Memory overhead factor (10%)
val MEMORY_OVERHEAD_MIN = 384 // Minimum memory overhead in MB
val ANY_HOST = "*" // Wildcard host designation
val DEFAULT_NUMBER_EXECUTORS = 2 // Default executor count
val RM_REQUEST_PRIORITY: Priority // Resource manager request priority

def get: YarnSparkHadoopUtil

Retrieves the singleton instance of YarnSparkHadoopUtil. Throws an exception if not in YARN mode.
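To illustrate how the overhead constants above fit together, here is a minimal sketch of the memory-overhead formula Spark 1.6 applies to YARN containers (estimatedMemoryOverheadMB is a hypothetical helper written for this example, not part of the module's API):

import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

// Overhead is the larger of 10% of the requested memory and the 384 MB floor
def estimatedMemoryOverheadMB(executorMemoryMB: Int): Int =
  math.max(
    (YarnSparkHadoopUtil.MEMORY_OVERHEAD_FACTOR * executorMemoryMB).toInt,
    YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN)

println(estimatedMemoryOverheadMB(4096)) // 409 MB overhead for a 4 GB executor
println(estimatedMemoryOverheadMB(1024)) // clamped to the 384 MB minimum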
def addPathToEnvironment(
env: HashMap[String, String],
key: String,
value: String,
classPathSeparator: String
): Unit

Adds a path to an environment variable, handling path separation correctly.
def setEnvFromInputString(
env: HashMap[String, String],
envString: String,
classPathSeparator: String
): Unit

Sets multiple environment variables from a formatted input string (KEY1=VAL1,KEY2=VAL2).
Example:
import scala.collection.mutable.HashMap
val env = new HashMap[String, String]()
// Add classpath entries
YarnSparkHadoopUtil.addPathToEnvironment(
env, "CLASSPATH", "/opt/spark/jars/*", ":"
)
// Set multiple environment variables
YarnSparkHadoopUtil.setEnvFromInputString(
env, "JAVA_HOME=/opt/jdk8,SPARK_HOME=/opt/spark", ":"
)
env.foreach { case (key, value) =>
println(s"$key = $value")
}

def getOutOfMemoryErrorArgument: String

Returns the JVM argument used for handling out-of-memory errors.
def escapeForShell(arg: String): String

Escapes a string argument for safe use in shell commands.
def getApplicationAclsForYarn(securityMgr: SecurityManager): Map[ApplicationAccessType, String]

Retrieves application ACLs for YARN based on the security manager configuration.
def expandEnvironment(environment: Environment): String

Expands YARN environment variables to their string representations.
def getClassPathSeparator(): String

Returns the appropriate classpath separator for the current platform.
def getInitialTargetExecutorNumber(
conf: SparkConf,
numExecutors: Int = DEFAULT_NUMBER_EXECUTORS
): Int

Calculates the initial target number of executors based on configuration.
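These helpers are not illustrated elsewhere in this document, so here is a brief usage sketch (the configuration key and printed values are illustrative assumptions, not guaranteed output):

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

val sparkConf = new SparkConf()
  .set("spark.executor.instances", "8")

// Escape an argument before embedding it in a container launch command
val escaped = YarnSparkHadoopUtil.escapeForShell("""--name "my spark app" --verbose""")
println(s"Escaped argument: $escaped")

// Resolve the initial executor count from configuration (falls back to DEFAULT_NUMBER_EXECUTORS)
val initialExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
println(s"Initial target executors: $initialExecutors")

// Platform-specific classpath separator (":" on Unix-like systems)
println(s"Classpath separator: ${YarnSparkHadoopUtil.getClassPathSeparator()}")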
The ExecutorDelegationTokenUpdater handles automatic renewal of delegation tokens on executor nodes.
package org.apache.spark.deploy.yarn
class ExecutorDelegationTokenUpdater(
sparkConf: SparkConf,
hadoopConf: Configuration
) extends Logging

def updateCredentialsIfRequired(): Unit

Checks for updated delegation tokens on HDFS and refreshes executor credentials if newer tokens are available.
Example:
import org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater
val sparkConf = new SparkConf()
.set("spark.yarn.credentials.file", "hdfs://namenode:9000/.sparkStaging/app-123/credentials")
val hadoopConf = new org.apache.hadoop.conf.Configuration()
val tokenUpdater = new ExecutorDelegationTokenUpdater(sparkConf, hadoopConf)
// Manually trigger credential update (normally done automatically)
tokenUpdater.updateCredentialsIfRequired()
// Stop the updater when shutting down
tokenUpdater.stop()

def stop(): Unit

Stops the delegation token renewal thread and cleans up resources.
The ExecutorDelegationTokenUpdater automatically polls the credentials file named by spark.yarn.credentials.file, loads any newly written tokens into the current user's credentials, and schedules the next check based on the configured update interval.
Example configuration:
val sparkConf = new SparkConf()
.set("spark.yarn.credentials.file", "hdfs://namenode:9000/.sparkStaging/app-123/credentials")
.set("spark.yarn.credentials.renewalTime", "3600000") // 1 hour
.set("spark.yarn.credentials.updateTime", "2880000") // 48 minutes (80% of 1 hour)
// Executor automatically creates and manages token updater
val sc = new SparkContext(sparkConf)
// Token renewal happens automatically in the background

The AMDelegationTokenRenewer manages token renewal from the ApplicationMaster using Kerberos keytab authentication.
package org.apache.spark.deploy.yarn
class AMDelegationTokenRenewer(
sparkConf: SparkConf,
hadoopConf: Configuration
) extends Logging

def scheduleLoginFromKeytab(): Unit

Schedules periodic re-login from the configured keytab and creation of new delegation tokens. Fresh tokens are written to the credentials file on HDFS so that executors can pick them up, and the next renewal is scheduled before the current tokens expire.
Example:
import org.apache.spark.deploy.yarn.AMDelegationTokenRenewer
val sparkConf = new SparkConf()
.set("spark.yarn.principal", "spark/spark-user@EXAMPLE.COM")
.set("spark.yarn.keytab", "/opt/spark/conf/spark.keytab")
.set("spark.yarn.credentials.file", "hdfs://namenode:9000/.sparkStaging/app-123/credentials")
.set("spark.yarn.credentials.file.retention.days", "7")
.set("spark.yarn.credentials.file.retention.count", "10")
val hadoopConf = new org.apache.hadoop.conf.Configuration()
val tokenRenewer = new AMDelegationTokenRenewer(sparkConf, hadoopConf)
try {
// Start automatic token renewal
tokenRenewer.scheduleLoginFromKeytab()
// ApplicationMaster continues running with automatic token renewal
// New tokens are periodically written to HDFS for executors
} finally {
// Stop token renewal when shutting down
tokenRenewer.stop()
}

def stop(): Unit

Stops the delegation token renewal service and cleans up resources.
The AMDelegationTokenRenewer writes each new set of tokens to a fresh credentials file on HDFS and cleans up older files according to spark.yarn.credentials.file.retention.days and spark.yarn.credentials.file.retention.count. The complete example below ties these pieces together in a secure Spark-on-YARN application.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.yarn.{YarnSparkHadoopUtil, AMDelegationTokenRenewer}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.fs.Path
object SecureSparkYarnApp {
def main(args: Array[String]): Unit = {
// Configure security settings
val sparkConf = new SparkConf()
.setAppName("SecureSparkApp")
.setMaster("yarn-cluster")
// Kerberos authentication
.set("spark.yarn.principal", "spark/spark-service@EXAMPLE.COM")
.set("spark.yarn.keytab", "/opt/spark/conf/spark.keytab")
// Delegation token management
.set("spark.yarn.credentials.file", "hdfs://namenode:9000/.sparkStaging/{{APP_ID}}/credentials")
.set("spark.yarn.credentials.file.retention.days", "7")
.set("spark.yarn.credentials.file.retention.count", "10")
// Secure HDFS access
.set("spark.yarn.access.namenodes", "hdfs://namenode1:9000,hdfs://namenode2:9000")
.set("spark.yarn.access.hadoopFileSystems", "hdfs://namenode1:9000,hdfs://namenode2:9000")
// Executor configuration
.set("spark.executor.memory", "4g")
.set("spark.executor.cores", "4")
.set("spark.executor.instances", "10")
// Initialize security
val yarnUtil = YarnSparkHadoopUtil.get
val hadoopConf = yarnUtil.newConfiguration(sparkConf)
// Check security status
if (UserGroupInformation.isSecurityEnabled()) {
println("Kerberos security is enabled")
// Get delegation tokens for namenodes
val namenodes = yarnUtil.getNameNodesToAccess(sparkConf)
val credentials = yarnUtil.getCurrentUserCredentials()
yarnUtil.obtainTokensForNamenodes(namenodes, hadoopConf, credentials)
println(s"Obtained tokens for ${namenodes.size} namenodes")
// Get Hive metastore token if needed
yarnUtil.obtainTokenForHiveMetastore(hadoopConf) match {
case Some(token) =>
println("Obtained Hive metastore delegation token")
case None =>
println("No Hive metastore token required")
}
} else {
println("Running in non-secure mode")
}
// Set up delegation token renewal (for ApplicationMaster)
val tokenRenewer = new AMDelegationTokenRenewer(sparkConf, hadoopConf)
try {
// Start SparkContext
val sc = new SparkContext(sparkConf)
// Start token renewal if in secure mode
if (UserGroupInformation.isSecurityEnabled()) {
tokenRenewer.scheduleLoginFromKeytab()
yarnUtil.startExecutorDelegationTokenRenewer(sparkConf)
println("Started delegation token renewal services")
}
// Application logic
val data = sc.textFile("hdfs://namenode1:9000/secure-data/input.txt")
val wordCounts = data.flatMap(_.split("\\s+"))
.map((_, 1))
.reduceByKey(_ + _)
wordCounts.saveAsTextFile("hdfs://namenode1:9000/secure-data/output")
println("Secure Spark application completed successfully")
} finally {
// Cleanup security services
if (UserGroupInformation.isSecurityEnabled()) {
yarnUtil.stopExecutorDelegationTokenRenewer()
tokenRenewer.stop()
println("Stopped delegation token renewal services")
}
}
}
}

This comprehensive security framework ensures that Spark applications can run securely on Kerberos-enabled YARN clusters, with automatic credential management and token renewal for long-running applications.