Describes: mavenpkg:maven/org.apache.spark/spark-yarn_2.11@1.6.x

tessl/maven-org-apache-spark--spark-yarn_2-11

tessl install tessl/maven-org-apache-spark--spark-yarn_2-11@1.6.0

Apache Spark YARN integration module that enables Spark applications to run on YARN clusters, providing cluster manager functionality for distributed Spark computing workloads

docs/security-authentication.md

Security and Authentication

The YARN module provides comprehensive security and authentication capabilities for Spark applications running on Kerberos-enabled Hadoop clusters. This includes delegation token management, credential renewal, and secure access to HDFS and other Hadoop services.

YarnSparkHadoopUtil

The YarnSparkHadoopUtil class extends SparkHadoopUtil with YARN-specific security functionality and utilities.

package org.apache.spark.deploy.yarn

class YarnSparkHadoopUtil extends SparkHadoopUtil

Core Security Methods

YARN Mode Detection

def isYarnMode(): Boolean

Always returns true since this utility is specific to YARN environments.
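
Example (a minimal check, using only the method documented above):

val yarnUtil = YarnSparkHadoopUtil.get
// Always true here: the singleton itself is only available in YARN mode
require(yarnUtil.isYarnMode())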

Configuration Management

def newConfiguration(conf: SparkConf): Configuration

Creates a new YARN-specific Hadoop configuration with YARN settings applied.

Parameters:

  • conf: Spark configuration to base the Hadoop configuration on

Returns: YarnConfiguration instance with merged settings

Example:

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

val sparkConf = new SparkConf()
  .set("spark.yarn.queue", "production")
  .set("spark.yarn.am.memory", "2g")

val yarnUtil = YarnSparkHadoopUtil.get
val hadoopConf = yarnUtil.newConfiguration(sparkConf)

println(s"YARN queue: ${hadoopConf.get("mapreduce.job.queuename")}")

Credential Management

Current User Credentials

def getCurrentUserCredentials(): Credentials

Retrieves the current user's Hadoop credentials containing delegation tokens.

def addCurrentUserCredentials(creds: Credentials): Unit

Adds additional credentials to the current user's credential store.

Example:

val yarnUtil = YarnSparkHadoopUtil.get

// Get current credentials
val currentCreds = yarnUtil.getCurrentUserCredentials()
println(s"Current tokens: ${currentCreds.numberOfTokens()}")

// Add additional credentials
val additionalCreds = new org.apache.hadoop.security.Credentials()
// ... populate additional credentials
yarnUtil.addCurrentUserCredentials(additionalCreds)

Secret Key Management

def addSecretKeyToUserCredentials(key: String, secret: String): Unit

Adds a secret key to the user's credentials for secure communication.

def getSecretKeyFromUserCredentials(key: String): Array[Byte]

Retrieves a secret key from the user's credentials.

Example:

val yarnUtil = YarnSparkHadoopUtil.get

// Store a secret key
val secretKey = "spark.authenticate.secret"
val secretValue = "my-secure-secret-key"
yarnUtil.addSecretKeyToUserCredentials(secretKey, secretValue)

// Retrieve the secret key
val retrievedSecret = yarnUtil.getSecretKeyFromUserCredentials(secretKey)
val secretString = new String(retrievedSecret, "UTF-8")
println(s"Retrieved secret: $secretString")

Namenode Access and Token Management

Namenode Discovery

def getNameNodesToAccess(sparkConf: SparkConf): Set[Path]

Retrieves the set of HDFS namenodes that the application needs to access, based on configuration.

Parameters:

  • sparkConf: Spark configuration containing namenode settings

Returns: Set of Path objects representing namenode locations

Example:

val sparkConf = new SparkConf()
  .set("spark.yarn.access.namenodes", "hdfs://nn1:9000,hdfs://nn2:9000")

val yarnUtil = YarnSparkHadoopUtil.get
val namenodes = yarnUtil.getNameNodesToAccess(sparkConf)

namenodes.foreach { nn =>
  println(s"Will access namenode: $nn")
}

Delegation Token Acquisition

def obtainTokensForNamenodes(
  paths: Set[Path],
  conf: Configuration,
  creds: Credentials,
  renewer: Option[String] = None
): Unit

Obtains delegation tokens for the specified namenodes and adds them to the credentials.

Parameters:

  • paths: Set of namenode paths to obtain tokens for
  • conf: Hadoop configuration
  • creds: Credentials object to store the tokens in
  • renewer: Optional token renewer principal

Example:

import org.apache.hadoop.security.Credentials
import org.apache.hadoop.fs.Path

val yarnUtil = YarnSparkHadoopUtil.get
val credentials = new Credentials()
val namenodes = Set(
  new Path("hdfs://namenode1:9000"),
  new Path("hdfs://namenode2:9000")
)

yarnUtil.obtainTokensForNamenodes(
  paths = namenodes,
  conf = hadoopConf,
  creds = credentials,
  renewer = Some("spark/spark-user@EXAMPLE.COM")
)

println(s"Obtained ${credentials.numberOfTokens()} delegation tokens")

Hive Metastore Integration

def obtainTokenForHiveMetastore(conf: Configuration): Option[Token[DelegationTokenIdentifier]]

Obtains a delegation token for the Hive metastore service if configured and available.

Parameters:

  • conf: Hadoop configuration containing Hive settings

Returns: Optional delegation token for Hive metastore

Example:

val hiveToken = yarnUtil.obtainTokenForHiveMetastore(hadoopConf)

hiveToken match {
  case Some(token) =>
    println(s"Obtained Hive metastore token: ${token.getService}")
  case None =>
    println("No Hive metastore token needed or available")
}

Executor Token Renewal

def startExecutorDelegationTokenRenewer(sparkConf: SparkConf): Unit

Starts the delegation token renewal service for executors.

def stopExecutorDelegationTokenRenewer(): Unit

Stops the delegation token renewal service.

Example:

val yarnUtil = YarnSparkHadoopUtil.get

try {
  // Start token renewal for executors
  yarnUtil.startExecutorDelegationTokenRenewer(sparkConf)
  
  // Application runs with token renewal active
  // ...
  
} finally {
  // Stop token renewal
  yarnUtil.stopExecutorDelegationTokenRenewer()
}

Container Information

def getContainerId: ContainerId

Retrieves the current YARN container ID from the environment.

Returns: ContainerId of the current container

Example:

val yarnUtil = YarnSparkHadoopUtil.get
val containerId = yarnUtil.getContainerId
println(s"Running in container: $containerId")

YarnSparkHadoopUtil Companion Object

The companion object provides utility methods and constants for YARN operations.

object YarnSparkHadoopUtil

Constants

val MEMORY_OVERHEAD_FACTOR = 0.10    // Memory overhead factor (10%)
val MEMORY_OVERHEAD_MIN = 384        // Minimum memory overhead in MB
val ANY_HOST = "*"                   // Wildcard host designation
val DEFAULT_NUMBER_EXECUTORS = 2     // Default executor count
val RM_REQUEST_PRIORITY: Priority    // Resource manager request priority
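
These constants are typically combined when sizing executor containers. A minimal sketch of the max(memory × factor, minimum) overhead rule, with illustrative values:

import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

val executorMemoryMb = 4096  // requested executor heap in MB
val overheadMb = math.max(
  (YarnSparkHadoopUtil.MEMORY_OVERHEAD_FACTOR * executorMemoryMb).toInt,
  YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN)
// 4096 * 0.10 = 409 MB, above the 384 MB floor, so the container asks for 4505 MB
println(s"Container request: ${executorMemoryMb + overheadMb} MB")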

Singleton Access

def get: YarnSparkHadoopUtil

Retrieves the singleton instance of YarnSparkHadoopUtil. Throws an exception if not running in YARN mode.

Environment Management

def addPathToEnvironment(
  env: HashMap[String, String], 
  key: String, 
  value: String,
  classPathSeparator: String
): Unit

Adds a path to an environment variable, handling path separation correctly.

def setEnvFromInputString(
  env: HashMap[String, String], 
  envString: String, 
  classPathSeparator: String
): Unit

Sets multiple environment variables from a formatted input string (KEY1=VAL1,KEY2=VAL2).

Example:

import scala.collection.mutable.HashMap

val env = new HashMap[String, String]()

// Add classpath entries
YarnSparkHadoopUtil.addPathToEnvironment(
  env, "CLASSPATH", "/opt/spark/jars/*", ":"
)

// Set multiple environment variables
YarnSparkHadoopUtil.setEnvFromInputString(
  env, "JAVA_HOME=/opt/jdk8,SPARK_HOME=/opt/spark", ":"
)

env.foreach { case (key, value) =>
  println(s"$key = $value")
}

Utility Methods

def getOutOfMemoryErrorArgument: String

Returns JVM argument for handling out-of-memory errors.

def escapeForShell(arg: String): String

Escapes a string argument for safe use in shell commands.

def getApplicationAclsForYarn(securityMgr: SecurityManager): Map[ApplicationAccessType, String]

Retrieves application ACLs for YARN based on security manager configuration.

def expandEnvironment(environment: Environment): String

Expands YARN environment variables to their string representations.

def getClassPathSeparator(): String

Returns the appropriate classpath separator for the current platform.

def getInitialTargetExecutorNumber(
  conf: SparkConf, 
  numExecutors: Int = DEFAULT_NUMBER_EXECUTORS
): Int

Calculates the initial target number of executors based on configuration.
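
Example (an illustrative sketch of a few of these helpers; the argument values are arbitrary):

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

// Escape an argument before embedding it in a container launch command
println(YarnSparkHadoopUtil.escapeForShell("a value with 'quotes' and spaces"))

// Platform-appropriate classpath separator (":" on Unix-like systems)
println(YarnSparkHadoopUtil.getClassPathSeparator())

// Initial executor target derived from configuration
val conf = new SparkConf().set("spark.executor.instances", "8")
println(YarnSparkHadoopUtil.getInitialTargetExecutorNumber(conf))  // 8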

ExecutorDelegationTokenUpdater

The ExecutorDelegationTokenUpdater handles automatic renewal of delegation tokens on executor nodes.

package org.apache.spark.deploy.yarn

class ExecutorDelegationTokenUpdater(
  sparkConf: SparkConf,
  hadoopConf: Configuration
) extends Logging

Token Update Management

def updateCredentialsIfRequired(): Unit

Checks for updated delegation tokens on HDFS and refreshes executor credentials if newer tokens are available. This method:

  • Reads credentials from the configured credentials file path
  • Compares timestamps to determine if updates are needed
  • Updates the current user's credentials with new tokens
  • Schedules the next update based on token expiration time

Example:

import org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater

val sparkConf = new SparkConf()
  .set("spark.yarn.credentials.file", "hdfs://namenode:9000/.sparkStaging/app-123/credentials")

val tokenUpdater = new ExecutorDelegationTokenUpdater(sparkConf, hadoopConf)

// Manually trigger credential update (normally done automatically)
tokenUpdater.updateCredentialsIfRequired()

// Stop the updater when shutting down
tokenUpdater.stop()

def stop(): Unit

Stops the delegation token renewal thread and cleans up resources.

Automatic Renewal Process

The ExecutorDelegationTokenUpdater automatically:

  1. Monitors Token Expiration: Checks token validity at 80% of expiration time
  2. Reads Updated Tokens: Fetches new tokens from HDFS staging area
  3. Updates Credentials: Refreshes the current UGI with new tokens
  4. Schedules Next Update: Calculates next renewal time based on new token expiration

Example configuration:

val sparkConf = new SparkConf()
  .set("spark.yarn.credentials.file", "hdfs://namenode:9000/.sparkStaging/app-123/credentials")
  .set("spark.yarn.credentials.renewalTime", "3600000") // 1 hour
  .set("spark.yarn.credentials.updateTime", "2880000")  // 48 minutes (80% of 1 hour)

// Executor automatically creates and manages token updater
val sc = new SparkContext(sparkConf)
// Token renewal happens automatically in background

AMDelegationTokenRenewer

The AMDelegationTokenRenewer manages token renewal from the ApplicationMaster using Kerberos keytab authentication.

package org.apache.spark.deploy.yarn

class AMDelegationTokenRenewer(
  sparkConf: SparkConf,
  hadoopConf: Configuration
) extends Logging

Keytab-Based Renewal

def scheduleLoginFromKeytab(): Unit

Schedules periodic login from keytab and creation of new delegation tokens. This method:

  • Reads principal and keytab from Spark configuration
  • Schedules periodic renewal at 75% of token lifetime
  • Creates new tokens and writes them to HDFS for executor consumption
  • Handles immediate renewal if current tokens have expired

Example:

import org.apache.spark.deploy.yarn.AMDelegationTokenRenewer

val sparkConf = new SparkConf()
  .set("spark.yarn.principal", "spark/spark-user@EXAMPLE.COM")
  .set("spark.yarn.keytab", "/opt/spark/conf/spark.keytab")
  .set("spark.yarn.credentials.file", "hdfs://namenode:9000/.sparkStaging/app-123/credentials")
  .set("spark.yarn.credentials.file.retention.days", "7")
  .set("spark.yarn.credentials.file.retention.count", "10")

val tokenRenewer = new AMDelegationTokenRenewer(sparkConf, hadoopConf)

try {
  // Start automatic token renewal
  tokenRenewer.scheduleLoginFromKeytab()
  
  // ApplicationMaster continues running with automatic token renewal
  // New tokens are periodically written to HDFS for executors
  
} finally {
  // Stop token renewal when shutting down
  tokenRenewer.stop()
}

def stop(): Unit

Stops the delegation token renewal service and cleans up resources.

Token File Management

The AMDelegationTokenRenewer manages token files with these features:

  • Versioned Files: Creates numbered credential files (credentials-1, credentials-2, etc.)
  • Atomic Updates: Uses temporary files and atomic rename operations
  • Cleanup: Automatically removes old credential files based on retention policies
  • Fault Tolerance: Handles failures gracefully with retry mechanisms
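
For illustration only, the sketch below shows the kind of retention cleanup described above using plain Hadoop FileSystem calls. The actual AMDelegationTokenRenewer uses its own internal file naming and scheduling, so the staging path and the `credentials-` prefix here are assumptions:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical staging directory and retention count, matching the
// spark.yarn.credentials.file.retention.count setting shown earlier
val stagingDir = new Path("hdfs://namenode:9000/.sparkStaging/app-123")
val retentionCount = 10

val fs = FileSystem.get(stagingDir.toUri, new Configuration())
val credentialFiles = fs.listStatus(stagingDir)
  .filter(_.getPath.getName.startsWith("credentials-"))
  .sortBy(_.getModificationTime)  // oldest first

// Keep only the newest `retentionCount` files; delete the rest
credentialFiles.dropRight(retentionCount).foreach { status =>
  fs.delete(status.getPath, false)
}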

Complete Security Setup Example

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.yarn.{YarnSparkHadoopUtil, AMDelegationTokenRenewer}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.fs.Path

object SecureSparkYarnApp {
  def main(args: Array[String]): Unit = {
    // Configure security settings
    val sparkConf = new SparkConf()
      .setAppName("SecureSparkApp")
      .setMaster("yarn-cluster")
      // Kerberos authentication
      .set("spark.yarn.principal", "spark/spark-service@EXAMPLE.COM")
      .set("spark.yarn.keytab", "/opt/spark/conf/spark.keytab")
      // Delegation token management
      .set("spark.yarn.credentials.file", "hdfs://namenode:9000/.sparkStaging/{{APP_ID}}/credentials")
      .set("spark.yarn.credentials.file.retention.days", "7")
      .set("spark.yarn.credentials.file.retention.count", "10")
      // Secure HDFS access
      .set("spark.yarn.access.namenodes", "hdfs://namenode1:9000,hdfs://namenode2:9000")
      .set("spark.yarn.access.hadoopFileSystems", "hdfs://namenode1:9000,hdfs://namenode2:9000")
      // Executor configuration
      .set("spark.executor.memory", "4g")
      .set("spark.executor.cores", "4")
      .set("spark.executor.instances", "10")

    // Initialize security
    val yarnUtil = YarnSparkHadoopUtil.get
    val hadoopConf = yarnUtil.newConfiguration(sparkConf)
    
    // Check security status
    if (UserGroupInformation.isSecurityEnabled()) {
      println("Kerberos security is enabled")
      
      // Get delegation tokens for namenodes
      val namenodes = yarnUtil.getNameNodesToAccess(sparkConf)
      val credentials = yarnUtil.getCurrentUserCredentials()
      
      yarnUtil.obtainTokensForNamenodes(namenodes, hadoopConf, credentials)
      println(s"Obtained tokens for ${namenodes.size} namenodes")
      
      // Get Hive metastore token if needed
      yarnUtil.obtainTokenForHiveMetastore(hadoopConf) match {
        case Some(token) =>
          println("Obtained Hive metastore delegation token")
        case None =>
          println("No Hive metastore token required")
      }
    } else {
      println("Running in non-secure mode")
    }
    
    // Set up delegation token renewal (for ApplicationMaster)
    val tokenRenewer = new AMDelegationTokenRenewer(sparkConf, hadoopConf)
    
    try {
      // Start SparkContext
      val sc = new SparkContext(sparkConf)
      
      // Start token renewal if in secure mode
      if (UserGroupInformation.isSecurityEnabled()) {
        tokenRenewer.scheduleLoginFromKeytab()
        yarnUtil.startExecutorDelegationTokenRenewer(sparkConf)
        println("Started delegation token renewal services")
      }
      
      // Application logic
      val data = sc.textFile("hdfs://namenode1:9000/secure-data/input.txt")
      val wordCounts = data.flatMap(_.split("\\s+"))
                          .map((_, 1))
                          .reduceByKey(_ + _)
      
      wordCounts.saveAsTextFile("hdfs://namenode1:9000/secure-data/output")
      println("Secure Spark application completed successfully")
      
    } finally {
      // Cleanup security services
      if (UserGroupInformation.isSecurityEnabled()) {
        yarnUtil.stopExecutorDelegationTokenRenewer()
        tokenRenewer.stop()
        println("Stopped delegation token renewal services")
      }
    }
  }
}

This comprehensive security framework ensures that Spark applications can run securely on Kerberos-enabled YARN clusters with automatic credential management and token renewal for long-running applications.