Command Building Utilities

YARN-specific utilities for building container launch commands and working with the Spark distribution layout. These utilities handle the low-level details of container command construction and environment setup.

Capabilities

YarnSparkHadoopUtil

Utility object providing YARN-specific Hadoop integration and environment management.

object YarnSparkHadoopUtil {
  def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit
  val MEMORY_OVERHEAD_FACTOR: Double
  val MEMORY_OVERHEAD_MIN: Long
  val RM_REQUEST_PRIORITY: Priority
}

Environment Management:

addPathToEnvironment(env, key, value): Unit

  • Appends a path value to an environment variable, creating the variable if it is not yet set
  • Joins entries with a cross-platform separator so the same launch command works on Linux and Windows nodes
  • Used for setting up executor environment variables

Usage Example:

import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import scala.collection.mutable.HashMap

val env = new HashMap[String, String]()

// Add to PATH environment variable
YarnSparkHadoopUtil.addPathToEnvironment(env, "PATH", "/usr/local/bin")
YarnSparkHadoopUtil.addPathToEnvironment(env, "PATH", "/opt/spark/bin")

// Add to LD_LIBRARY_PATH
YarnSparkHadoopUtil.addPathToEnvironment(env, "LD_LIBRARY_PATH", "/usr/local/lib")

println(env("PATH"))  // /usr/local/bin<CPS>/opt/spark/bin
// YARN expands the cross-platform <CPS> separator to ':' or ';' at container launch

Constants:

MEMORY_OVERHEAD_FACTOR: Double

  • Default memory overhead factor for container memory calculation
  • 0.10, i.e. 10% of executor memory
  • Used when an explicit overhead is not configured

MEMORY_OVERHEAD_MIN: Long

  • Minimum memory overhead, expressed in MiB
  • 384 MiB
  • Ensures adequate overhead for small executor containers

RM_REQUEST_PRIORITY: Priority

  • Standard priority attached to every ResourceManager container request
  • A single fixed level, since Spark does not distinguish between request types
  • Used by YarnAllocator for container allocation (see the sketch below)
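
To see where this constant fits, here is a minimal sketch of a container request built with Hadoop's AMRMClient API. The resource sizes are hypothetical, and YarnAllocator's real request path involves more bookkeeping (locality preferences, resource profiles) than shown here.

import org.apache.hadoop.yarn.api.records.{Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient

// Hypothetical sizes: 1408 MiB of memory and 2 vcores per container
val capability = Resource.newInstance(1408, 2)

// A single fixed priority level, mirroring RM_REQUEST_PRIORITY
val priority = Priority.newInstance(1)

// nodes = null, racks = null: let the scheduler place the container anywhere
val request = new AMRMClient.ContainerRequest(capability, null, null, priority)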

Memory Calculation Example:

import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

def calculateContainerMemory(executorMemoryMB: Long): Long = {
  val overheadMB = math.max(
    (executorMemoryMB * YarnSparkHadoopUtil.MEMORY_OVERHEAD_FACTOR).toLong,
    YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN  // already expressed in MiB
  )
  executorMemoryMB + overheadMB
}

// Example calculations
println(calculateContainerMemory(1024))  // 1408MB (1024 + 384 min overhead)
println(calculateContainerMemory(8192))  // 9011MB (8192 + 819 calculated overhead)

YarnCommandBuilderUtils

Utilities for building and formatting container launch commands on YARN.

object YarnCommandBuilderUtils {
  def quoteForBatchScript(arg: String): String
  def findJarsDir(sparkHome: String): String
}

Command Building:

quoteForBatchScript(arg: String): String

  • Quotes an argument so it survives Windows batch script parsing
  • Wraps arguments containing whitespace or batch metacharacters in double quotes; other arguments pass through unchanged
  • Unix command lines are escaped separately (via YarnSparkHadoopUtil.escapeForShell) rather than by this method

findJarsDir(sparkHome: String): String

  • Locates the jars directory within a Spark installation
  • Handles different Spark distribution layouts
  • Returns path to directory containing Spark JAR files

Usage Examples:

Argument Quoting:

import org.apache.spark.launcher.YarnCommandBuilderUtils

// Quote arguments with spaces or batch metacharacters
val arg1 = YarnCommandBuilderUtils.quoteForBatchScript("my app name")
val arg2 = YarnCommandBuilderUtils.quoteForBatchScript("--conf spark.sql.warehouse.dir=/path/with spaces")
val arg3 = YarnCommandBuilderUtils.quoteForBatchScript("value_with_$_symbol")

println(arg1)  // "my app name" (quoted: contains whitespace)
println(arg2)  // "--conf spark.sql.warehouse.dir=/path/with spaces" (quoted: whitespace and '=')
println(arg3)  // value_with_$_symbol (unchanged: '$' needs no batch quoting)

JAR Directory Discovery:

import org.apache.spark.launcher.YarnCommandBuilderUtils

// Find JAR directory in Spark installation
val sparkHome = "/opt/spark"
val jarsDir = YarnCommandBuilderUtils.findJarsDir(sparkHome)

println(jarsDir)  // /opt/spark/jars (for standard distribution)

// Use in classpath construction
import java.io.File

val sparkJars = new File(jarsDir).listFiles()
  .filter(_.getName.endsWith(".jar"))
  .map(_.getAbsolutePath)
  .mkString(File.pathSeparator)  // ':' on Unix, ';' on Windows

Integration with Container Launch

Environment Setup

import scala.collection.mutable.HashMap
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.spark.launcher.YarnCommandBuilderUtils

// Example of environment preparation for executor containers
def prepareExecutorEnvironment(
    sparkHome: String,
    additionalPaths: Seq[String]): HashMap[String, String] = {

  val env = new HashMap[String, String]()

  // Set SPARK_HOME
  env("SPARK_HOME") = sparkHome

  // Add Spark binaries to PATH
  YarnSparkHadoopUtil.addPathToEnvironment(env, "PATH", s"$sparkHome/bin")
  YarnSparkHadoopUtil.addPathToEnvironment(env, "PATH", s"$sparkHome/sbin")

  // Add additional paths
  additionalPaths.foreach { path =>
    YarnSparkHadoopUtil.addPathToEnvironment(env, "PATH", path)
  }

  // Point LD_LIBRARY_PATH at the native libraries next to the jars directory
  val jarsDir = YarnCommandBuilderUtils.findJarsDir(sparkHome)
  YarnSparkHadoopUtil.addPathToEnvironment(env, "LD_LIBRARY_PATH", s"$jarsDir/../lib")

  env
}
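
A minimal invocation, with hypothetical paths, might look like this; in the real launch path the resulting map is handed to the container's launch context (see the sketch after the next section):

// Hypothetical installation and extra path
val env = prepareExecutorEnvironment("/opt/spark", Seq("/usr/local/bin"))
env.foreach { case (k, v) => println(s"$k=$v") }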

Command Construction

import org.apache.spark.launcher.YarnCommandBuilderUtils

// Example of building an executor launch command
def buildExecutorCommand(
    sparkHome: String,
    executorMemory: String,
    userArgs: Seq[String]): Seq[String] = {

  val jarsDir = YarnCommandBuilderUtils.findJarsDir(sparkHome)
  val sparkJars = s"$jarsDir/*"

  val baseCommand = Seq(
    "java",
    s"-Xmx$executorMemory",
    "-cp", sparkJars,
    "org.apache.spark.executor.YarnCoarseGrainedExecutorBackend"
  )

  // Quote user arguments that may contain spaces: YARN writes the command
  // list into a launch script, so unquoted spaces would split arguments
  val quotedUserArgs = userArgs.map(YarnCommandBuilderUtils.quoteForBatchScript)

  baseCommand ++ quotedUserArgs
}
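
The command list ultimately lands in the container's launch context. A minimal sketch of that hand-off, using Hadoop's ContainerLaunchContext API with hypothetical values (real launches also set local resources, tokens, and service data):

import scala.collection.JavaConverters._
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext

// Reuses env from prepareExecutorEnvironment above
val command = buildExecutorCommand("/opt/spark", "4g", Seq("--app-id", "application_0001"))

val ctx = ContainerLaunchContext.newInstance(
  null,           // local resources (omitted in this sketch)
  env.asJava,     // environment map
  command.asJava, // launch command
  null,           // service data
  null,           // tokens
  null)           // ACLs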

Error Handling

Path Resolution Errors

import org.apache.spark.SparkException

try {
  val jarsDir = YarnCommandBuilderUtils.findJarsDir(sparkHome)
  // Use jarsDir...
} catch {
  case e @ (_: IllegalArgumentException | _: IllegalStateException) =>
    throw new SparkException(s"Unable to find Spark jars directory in: $sparkHome", e)
  case e: SecurityException =>
    throw new SparkException(s"Access denied reading Spark installation: $sparkHome", e)
}

Environment Variable Conflicts

import scala.collection.mutable.HashMap

// Assumes the enclosing class mixes in org.apache.spark.internal.Logging
// so that logWarning is available
def validateEnvironment(env: HashMap[String, String]): Unit = {
  // Check for empty PATH components
  val pathValue = env.getOrElse("PATH", "")
  if (pathValue.contains("::")) {
    logWarning("Empty path component detected in PATH environment variable")
  }

  // Validate SPARK_HOME consistency: warn if SPARK_HOME is set but its
  // bin/sbin directories were never added to PATH
  env.get("SPARK_HOME").foreach { sparkHome =>
    val hasSparkBin =
      pathValue.contains(s"$sparkHome/bin") || pathValue.contains(s"$sparkHome/sbin")
    if (!hasSparkBin) {
      logWarning("SPARK_HOME set but Spark binaries may not be in PATH")
    }
  }
}

Platform Considerations

Windows Compatibility

// Illustrative sketch of platform-aware quoting. The actual
// YarnCommandBuilderUtils.quoteForBatchScript covers only the Windows batch
// case; Unix command lines go through shell escaping instead.
def quoteForPlatform(arg: String): String = {
  val isWindows = System.getProperty("os.name").toLowerCase.contains("windows")
  
  if (isWindows) {
    // Windows batch script quoting
    if (arg.contains(" ") || arg.contains("&") || arg.contains("|")) {
      s""""$arg""""
    } else {
      arg
    }
  } else {
    // Unix shell quoting
    if (arg.contains(" ") || arg.contains("$") || arg.contains("'")) {
      s"'${arg.replace("'", "'\\''")}'"
    } else {
      arg
    }
  }
}

Container Resource Constraints

import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

// Memory overhead calculation considering per-node container limits
def calculateOptimalOverhead(
    executorMemoryMB: Long,
    nodeMemoryMB: Long,
    executorsPerNode: Int): Long = {
  
  val standardOverhead = math.max(
    (executorMemoryMB * YarnSparkHadoopUtil.MEMORY_OVERHEAD_FACTOR).toLong,
    YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN  // already expressed in MiB
  )
  
  val totalRequestedMemory = (executorMemoryMB + standardOverhead) * executorsPerNode
  
  if (totalRequestedMemory > nodeMemoryMB * 0.9) {
    // Reduce overhead if total memory exceeds 90% of node capacity
    val maxOverhead = (nodeMemoryMB * 0.9 / executorsPerNode - executorMemoryMB).toLong
    math.max(maxOverhead, YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN)
  } else {
    standardOverhead
  }
}
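
A worked example with hypothetical cluster numbers shows the minimum-overhead floor taking over when a node is densely packed:

// 8 GiB executors, 64 GiB nodes, 7 executors per node (hypothetical)
// standard overhead: max(8192 * 0.10, 384) = 819 MiB
// total request: (8192 + 819) * 7 = 63077 MiB > 65536 * 0.9 = 58982 MiB
// capped overhead: 58982 / 7 - 8192 = 234 MiB, floored to the 384 MiB minimum
println(calculateOptimalOverhead(8192, 65536, 7))  // 384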