tessl/maven-org-apache-spark--yarn-parent-2-11

Apache Spark's integration with Hadoop YARN cluster manager for running Spark applications on YARN clusters

Hadoop Utilities

The YARN module provides Hadoop utilities that extend Spark's base Hadoop integration with YARN-specific functionality. They cover credential handling for secure clusters, Hadoop and YARN configuration creation, container environment setup, shell argument escaping, rack lookup, and conversion of Spark ACLs to YARN application ACLs.

Imports

import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.yarn.api.records.ApplicationAccessType
import scala.collection.mutable.HashMap

Capabilities

YarnSparkHadoopUtil Class

Main utility class that extends Spark's base Hadoop utilities with YARN-specific functionality.

/**
 * YARN-specific Hadoop utilities for Spark
 * Extends base SparkHadoopUtil with YARN functionality
 */
class YarnSparkHadoopUtil extends SparkHadoopUtil {
  
  /**
   * Transfer credentials between user group information objects
   * @param source Source user group information
   * @param dest Destination user group information
   */
  override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation): Unit
  
  /**
   * Returns true indicating YARN mode is enabled
   * @return Always true for YARN mode
   */
  override def isYarnMode(): Boolean
  
  /**
   * Creates a new Hadoop configuration with YARN-specific settings
   * @param conf Spark configuration
   * @return YarnConfiguration instance
   */
  override def newConfiguration(conf: SparkConf): Configuration
  
  /**
   * Adds user credentials to job configuration for secure clusters
   * @param conf Job configuration to modify
   */
  override def addCredentials(conf: JobConf): Unit
  
  /**
   * Gets current user's security credentials
   * @return Current user's credentials
   */
  override def getCurrentUserCredentials(): Credentials
  
  /**
   * Adds credentials to current user's credential store
   * @param creds Credentials to add
   */
  override def addCurrentUserCredentials(creds: Credentials): Unit
  
  /**
   * Adds a secret key to current user's credentials
   * @param key Secret key name
   * @param secret Secret value as string
   */
  override def addSecretKeyToUserCredentials(key: String, secret: String): Unit
  
  /**
   * Retrieves a secret key from current user's credentials
   * @param key Secret key name
   * @return Secret value as byte array, or null if not found
   */
  override def getSecretKeyFromUserCredentials(key: String): Array[Byte]
}

Usage Example:

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

// Create YARN-specific Hadoop utilities
val yarnUtils = new YarnSparkHadoopUtil()

// Check if running in YARN mode
val isYarn = yarnUtils.isYarnMode() // Always true

// Create YARN configuration
val sparkConf = new SparkConf()
val yarnConf = yarnUtils.newConfiguration(sparkConf)

// Manage credentials for secure clusters
val currentCreds = yarnUtils.getCurrentUserCredentials()
yarnUtils.addSecretKeyToUserCredentials("myapp.secret", "secretValue")

YarnSparkHadoopUtil Object

Companion object providing constants, utility methods, and shared functionality for YARN operations.

/**
 * Companion object with YARN utility constants and methods
 */
object YarnSparkHadoopUtil {
  
  // Memory overhead configuration
  val MEMORY_OVERHEAD_FACTOR: Double = 0.07  // 7% memory overhead
  val MEMORY_OVERHEAD_MIN: Int = 384         // Minimum 384MB overhead
  
  // Host and resource constants
  val ANY_HOST: String = "*"                 // Wildcard for any host
  val DEFAULT_NUMBER_EXECUTORS: Int = 2      // Default executor count
  val RM_REQUEST_PRIORITY: Int = 1           // ResourceManager request priority
  
  /**
   * Adds a path variable to environment map
   * Appends to existing value if key already exists
   * @param env Environment map to modify
   * @param key Environment variable name
   * @param value Path value to add
   */
  def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit
  
  /**
   * Sets environment variables from input string
   * Input format: "KEY1=VAL1,KEY2=VAL2,KEY3=VAL3"
   * @param env Environment map to modify
   * @param inputString Comma-separated key=value pairs
   */
  def setEnvFromInputString(env: HashMap[String, String], inputString: String): Unit
  
  /**
   * Escapes argument for shell execution in YARN
   * Handles special characters for bash execution
   * @param arg Argument to escape
   * @return Shell-escaped argument
   */
  def escapeForShell(arg: String): String
  
  /**
   * Looks up rack information for a host
   * Uses cached rack topology information
   * @param conf Hadoop configuration
   * @param host Hostname to look up
   * @return Rack name for the host
   */
  def lookupRack(conf: Configuration, host: String): String
  
  /**
   * Populates rack information cache for a hostname
   * Resolves rack topology and caches results
   * @param conf Hadoop configuration
   * @param hostname Hostname to resolve
   */
  def populateRackInfo(conf: Configuration, hostname: String): Unit
  
  /**
   * Gets application ACLs formatted for YARN
   * Converts Spark security manager ACLs to YARN format
   * @param securityMgr Spark security manager
   * @return Map of YARN application access types to ACL strings
   */
  def getApplicationAclsForYarn(securityMgr: SecurityManager): Map[ApplicationAccessType, String]
}

Environment Management

Path Variable Handling

Managing environment variables and paths for YARN containers:

import scala.collection.mutable.HashMap

val env = HashMap[String, String]()

// Add path to existing environment variable
YarnSparkHadoopUtil.addPathToEnvironment(env, "CLASSPATH", "/path/to/jar")
YarnSparkHadoopUtil.addPathToEnvironment(env, "CLASSPATH", "/another/path")
// Result: env("CLASSPATH") holds both paths joined by the classpath separator,
// for example "/path/to/jar:/another/path" when the separator is ":"

// Set multiple environment variables from string
YarnSparkHadoopUtil.setEnvFromInputString(env, "JAVA_HOME=/usr/lib/jvm/java-8,SPARK_HOME=/opt/spark")
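
The append-or-create behavior of these two helpers can be tried without a cluster. The following is a minimal sketch, not Spark's implementation: `EnvSketch`, `addPath`, and `setFromString` are hypothetical names, and the separator is assumed to be `java.io.File.pathSeparator` (the real helper may use a different separator depending on the platform and Hadoop version).

```scala
import java.io.File
import scala.collection.mutable.HashMap

// Hypothetical stand-in for the environment helpers; not Spark's own code.
object EnvSketch {
  // Assumption: entries are joined with the platform path separator.
  val separator: String = File.pathSeparator

  // Append `value` to env(key) if the key exists, otherwise create the entry.
  def addPath(env: HashMap[String, String], key: String, value: String): Unit = {
    val updated = env.get(key).map(_ + separator + value).getOrElse(value)
    env.put(key, updated)
  }

  // Parse "K1=V1,K2=V2" and add each pair via addPath.
  // Entries without '=' or with an empty key are skipped.
  def setFromString(env: HashMap[String, String], input: String): Unit = {
    if (input != null && input.nonEmpty) {
      input.split(",").foreach { pair =>
        val idx = pair.indexOf('=')
        if (idx > 0) {
          addPath(env, pair.substring(0, idx).trim, pair.substring(idx + 1).trim)
        }
      }
    }
  }
}
```

Because the input is split on commas, a value that itself contains a comma cannot be expressed in this format; that limitation applies to this sketch and is worth checking against the real helper before relying on it.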

Shell Argument Escaping

Properly escape arguments for YARN's bash command execution:

val unsafeArg = "my file with spaces and $special chars"
val safeArg = YarnSparkHadoopUtil.escapeForShell(unsafeArg)
// Result: 'my file with spaces and \$special chars'

// Use in YARN container commands (`classpath` is assumed to be a String defined elsewhere)
val command = s"java -cp ${YarnSparkHadoopUtil.escapeForShell(classpath)} MyClass"
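
To make the quoting rule concrete, here is a minimal sketch that reproduces the result shown above for POSIX shells. It is an illustration under assumptions, not the library's code: `ShellEscapeSketch` and `escape` are hypothetical names, and the choice to escape `$` and `"` inside single quotes assumes the final command is itself embedded in a double-quoted `bash -c "..."` invocation.

```scala
// Hypothetical re-creation of the quoting rule illustrated above (POSIX shells only).
object ShellEscapeSketch {
  def escape(arg: String): String = {
    if (arg == null) {
      null
    } else {
      val escaped = new StringBuilder("'")
      arg.foreach {
        case '$'  => escaped.append("\\$")    // stop an outer double-quoted shell from expanding it
        case '"'  => escaped.append("\\\"")   // keep an outer double-quoted string intact
        case '\'' => escaped.append("'\\''")  // close quote, literal quote, reopen quote
        case c    => escaped.append(c)
      }
      escaped.append("'").toString
    }
  }
}
```

For example, `ShellEscapeSketch.escape("it's")` yields `'it'\''s'`, which the shell reads back as the original three-character word plus apostrophe.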

Security and Credentials

Credential Management

Handle security credentials for secure Hadoop clusters:

import org.apache.hadoop.security.{Credentials, UserGroupInformation}

val yarnUtils = new YarnSparkHadoopUtil()

// Get current user credentials
val currentCreds = yarnUtils.getCurrentUserCredentials()

// Add secret keys for application security
yarnUtils.addSecretKeyToUserCredentials("spark.authenticate.secret", "mySecretKey")
yarnUtils.addSecretKeyToUserCredentials("app.custom.token", "applicationToken")

// Retrieve secret keys
val authSecret = yarnUtils.getSecretKeyFromUserCredentials("spark.authenticate.secret")
val appToken = yarnUtils.getSecretKeyFromUserCredentials("app.custom.token")

// Transfer credentials between users
val sourceUGI = UserGroupInformation.getCurrentUser()
val destUGI = UserGroupInformation.createProxyUser("appuser", sourceUGI)
yarnUtils.transferCredentials(sourceUGI, destUGI)
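
Since secrets are written as strings but read back as `Array[Byte]` (or `null` when absent), callers usually need a decoding step and a null check. The sketch below models that round trip with an in-memory map. `SecretStoreSketch` and its methods are hypothetical, and UTF-8 is an assumed encoding rather than something this page states.

```scala
import java.nio.charset.StandardCharsets
import scala.collection.mutable

// Hypothetical in-memory stand-in for a credential store; not Hadoop's Credentials class.
object SecretStoreSketch {
  private val secrets = mutable.Map[String, Array[Byte]]()

  // Store the secret as UTF-8 bytes (assumed encoding).
  def addSecret(key: String, secret: String): Unit =
    secrets.put(key, secret.getBytes(StandardCharsets.UTF_8))

  // Mirror the documented contract: byte array, or null when the key is absent.
  def getSecret(key: String): Array[Byte] = secrets.getOrElse(key, null)

  // Null-safe wrapper that decodes the bytes back into a String.
  def getSecretString(key: String): Option[String] =
    Option(getSecret(key)).map(bytes => new String(bytes, StandardCharsets.UTF_8))
}
```

Wrapping the raw lookup in `Option` keeps the `null` case out of calling code, which is the main design point of the wrapper.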

Kerberos Integration

Propagate the current user's credentials to a job configuration on a secure (Kerberos-enabled) cluster:

import org.apache.hadoop.mapred.JobConf
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

val jobConf = new JobConf()
val yarnUtils = new YarnSparkHadoopUtil()

// Merge the current user's Hadoop credentials into the job configuration
yarnUtils.addCredentials(jobConf)

// The merged credentials typically include:
// - HDFS delegation tokens
// - Other service tokens (HBase, Hive, etc.)
// - Secret keys added to the user's credentials

Application ACLs

Convert Spark security settings to YARN application ACLs:

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.hadoop.yarn.api.records.ApplicationAccessType

val sparkConf = new SparkConf()
  .set("spark.ui.view.acls", "user1,user2")
  .set("spark.modify.acls", "admin1,admin2")

val securityMgr = new SecurityManager(sparkConf)
val yarnAcls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)

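// Result map (each ACL string may also include the submitting user and any admin ACLs;
// the order of users within a string is not guaranteed):
// ApplicationAccessType.VIEW_APP -> "user1,user2"
// ApplicationAccessType.MODIFY_APP -> "admin1,admin2"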

Network Topology and Rack Awareness

Rack Information Management

Efficient rack topology resolution and caching:

import org.apache.hadoop.conf.Configuration

val hadoopConf = new Configuration()

// Look up rack for a host (with caching)
val rack1 = YarnSparkHadoopUtil.lookupRack(hadoopConf, "node1.example.com")
val rack2 = YarnSparkHadoopUtil.lookupRack(hadoopConf, "node2.example.com")

// Explicitly populate rack information cache
YarnSparkHadoopUtil.populateRackInfo(hadoopConf, "node3.example.com")

// Benefits:
// - Cached lookups for performance
// - Data locality optimization
// - Cross-rack communication minimization
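
The caching behavior described above can be sketched independently of Hadoop. In this illustration, `RackCacheSketch` is a hypothetical class and `resolve` stands in for whatever topology resolver the cluster provides; the call counter exists only to make the caching observable.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical cache wrapper; `resolve` stands in for the real topology resolver.
class RackCacheSketch(resolve: String => String) {
  private val hostToRack = new ConcurrentHashMap[String, String]()
  val resolverCalls = new AtomicInteger(0)

  // Resolve and cache the rack for a host unless it is already cached.
  def populate(host: String): Unit = {
    if (!hostToRack.containsKey(host)) {
      resolverCalls.incrementAndGet()
      val rack = resolve(host)
      if (rack != null) hostToRack.put(host, rack)
    }
  }

  // Return the cached rack, resolving on first use; null if the host is unknown.
  def lookup(host: String): String = {
    populate(host)
    hostToRack.get(host)
  }
}
```

In this sketch unresolved hosts are not cached, so they are retried on every lookup. Whether the real utility behaves the same way is not stated here and should be verified.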

Topology-Aware Scheduling

Integration with Spark's scheduler for optimal task placement:

// Used internally by schedulers for rack-aware task placement
class YarnClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
  override def getRackForHost(hostPort: String): Option[String] = {
    val host = Utils.parseHostPort(hostPort)._1
    Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
  }
}

Resource Configuration

Memory Overhead Calculation

Automatic memory overhead calculation for YARN containers:

// Constants for memory overhead
val overheadFactor = YarnSparkHadoopUtil.MEMORY_OVERHEAD_FACTOR  // 0.07 (7%)
val minOverhead = YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN        // 384 MB

// Calculate memory overhead for executor
def calculateMemoryOverhead(executorMemoryMB: Int): Int = {
  val calculatedOverhead = (executorMemoryMB * overheadFactor).toInt
  math.max(calculatedOverhead, minOverhead)
}

// Example: 2GB executor memory
val executorMem = 2048  // 2GB
val overhead = calculateMemoryOverhead(executorMem)  // max(143, 384) = 384 MB
val totalMemory = executorMem + overhead  // 2432 MB requested from YARN
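
With a factor of 0.07 and a floor of 384 MB, the floor applies until the executor size passes roughly 384 / 0.07 ≈ 5486 MB; above that the proportional term takes over. The sketch below evaluates the same formula for a few sizes. `OverheadSketch` is a hypothetical helper that hard-codes the two constants documented above rather than reading them from Spark.

```scala
// Hypothetical helper using the constants documented above.
object OverheadSketch {
  val factor: Double = 0.07 // documented MEMORY_OVERHEAD_FACTOR
  val minMB: Int = 384      // documented MEMORY_OVERHEAD_MIN

  // Overhead is the larger of the proportional term and the fixed floor.
  def overheadMB(executorMemoryMB: Int): Int =
    math.max((executorMemoryMB * factor).toInt, minMB)

  // Total container size requested for a given executor heap size.
  def containerMB(executorMemoryMB: Int): Int =
    executorMemoryMB + overheadMB(executorMemoryMB)
}
```

For instance, a 1024 MB or 2048 MB executor gets the 384 MB floor, while an 8192 MB executor gets 573 MB of overhead, for a total request of 8765 MB.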

Resource Request Configuration

Default values and constants for resource requests:

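import org.apache.hadoop.yarn.api.records.{Priority, ResourceRequest}
import org.apache.hadoop.yarn.util.Records
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

// Default configuration values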
val defaultExecutors = YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS  // 2
val requestPriority = YarnSparkHadoopUtil.RM_REQUEST_PRIORITY        // 1
val anyHost = YarnSparkHadoopUtil.ANY_HOST                          // "*"

// Use in resource requests to YARN ResourceManager
val resourceRequest = Records.newRecord(classOf[ResourceRequest])
resourceRequest.setPriority(Priority.newInstance(requestPriority))
resourceRequest.setResourceName(anyHost)  // No locality preference

Configuration Integration

YARN Configuration Creation

Create properly configured YarnConfiguration instances:

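import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

val sparkConf = new SparkConf()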
  .set("spark.yarn.queue", "production")
  .set("spark.yarn.am.memory", "1g")

val yarnUtils = new YarnSparkHadoopUtil()
val yarnConf = yarnUtils.newConfiguration(sparkConf).asInstanceOf[YarnConfiguration]

// The resulting configuration combines:
// - Hadoop configuration loaded from the classpath
// - YARN defaults and site settings (yarn-default.xml, yarn-site.xml)
// - Hadoop properties passed through SparkConf (for example, spark.hadoop.* keys)

Mode Detection

Reliable detection of YARN execution mode:

val yarnUtils = new YarnSparkHadoopUtil()

// Always returns true in YARN module
val isYarnMode = yarnUtils.isYarnMode()

// Used by Spark core to enable YARN-specific behavior:
// - Different security handling
// - YARN-specific UI integration  
// - Special configuration validation
// - YARN-aware error messages

Error Handling and Diagnostics

Robust Network Resolution

Reliable hostname and rack resolution with error handling:

// Handles various failure scenarios:
// - DNS resolution failures
// - Network connectivity issues
// - Rack resolver configuration problems  
// - Invalid hostname formats

try {
  YarnSparkHadoopUtil.populateRackInfo(hadoopConf, hostname)
  val rack = YarnSparkHadoopUtil.lookupRack(hadoopConf, hostname)
} catch {
  case e: Exception =>
    // Graceful degradation - continue without rack information
    logWarning(s"Failed to resolve rack for $hostname", e)
}
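
The same graceful-degradation pattern can be written with `scala.util.Try`, which turns both exceptions and `null` results into `None`. This is a sketch under assumptions: `SafeRackLookupSketch` and `rackFor` are hypothetical, and `resolve` stands in for the real lookup call.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical wrapper; `resolve` may throw or return null.
object SafeRackLookupSketch {
  def rackFor(host: String)(resolve: String => String): Option[String] =
    Try(resolve(host)) match {
      case Success(rack) => Option(rack) // a null rack becomes None
      case Failure(e) =>
        // Degrade gracefully: report the problem and continue without rack information.
        System.err.println(s"Failed to resolve rack for $host: ${e.getMessage}")
        None
    }
}
```

Returning `Option[String]` lets the scheduler treat "no rack known" uniformly, whether the cause was a failed lookup or an unmapped host.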

Security Error Handling

Comprehensive error handling for security operations:

import java.io.IOException

// `yarnUtils`, `key`, and `secret` are assumed to be defined elsewhere
try {
  yarnUtils.addSecretKeyToUserCredentials(key, secret)
} catch {
  case e: SecurityException =>
    logError("Failed to add secret key - insufficient permissions", e)
  case e: IOException =>
    logError("Failed to add secret key - credential store error", e)
}

Integration with Spark Components

SparkContext Integration

Automatic integration with SparkContext initialization:

// Automatic registration as Hadoop util implementation
SparkHadoopUtil.get  // Returns YarnSparkHadoopUtil instance in YARN mode

// Used throughout Spark for:
// - Configuration management
// - Security operations  
// - File system access
// - Credential handling
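
One way to picture how a shared accessor can hand back a YARN-specific implementation is a selector keyed on a mode flag. The sketch below uses stand-in classes only. The flag name `SPARK_YARN_MODE` and the selection logic are assumptions made for illustration, not a description of Spark's actual lookup code.

```scala
// Hypothetical stand-ins; the real classes live in Spark and are not used here.
class BaseUtilSketch { def isYarnMode: Boolean = false }
class YarnUtilSketch extends BaseUtilSketch { override def isYarnMode: Boolean = true }

object UtilSelectorSketch {
  // Assumption: a boolean-valued flag named SPARK_YARN_MODE selects the implementation.
  def select(props: Map[String, String]): BaseUtilSketch = {
    val yarnMode = props.get("SPARK_YARN_MODE").exists(_.equalsIgnoreCase("true"))
    if (yarnMode) new YarnUtilSketch else new BaseUtilSketch
  }
}
```

Callers only see the base type, so YARN-specific behavior is picked up through overridden methods rather than explicit mode checks.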

FileSystem Integration

Enhanced file system integration for YARN environments:

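import org.apache.hadoop.fs.FileSystem

// `yarnUtils` and `sparkConf` are assumed to be defined as in the earlier examples
val yarnConf = yarnUtils.newConfiguration(sparkConf)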
val fs = FileSystem.get(yarnConf)

// FileSystem configured with:
// - YARN-specific authentication
// - Proper delegation tokens
// - Security credentials
// - Optimal configuration for YARN clusters

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--yarn-parent-2-11
