YARN-specific utilities for building container launch commands and managing the Spark distribution on the cluster. These utilities handle the low-level details of container command construction and environment setup.
Utility object providing YARN-specific Hadoop integration and environment management.
object YarnSparkHadoopUtil {
def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit
val MEMORY_OVERHEAD_FACTOR: Double
val MEMORY_OVERHEAD_MIN: Long
val RM_REQUEST_PRIORITY: Priority
}

Environment Management:
addPathToEnvironment(env, key, value): Unit
Appends value to the path-style variable key in env, creating the entry if it does not exist yet.
Usage Example:
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import scala.collection.mutable.HashMap
val env = new HashMap[String, String]()
// Add to PATH environment variable
YarnSparkHadoopUtil.addPathToEnvironment(env, "PATH", "/usr/local/bin")
YarnSparkHadoopUtil.addPathToEnvironment(env, "PATH", "/opt/spark/bin")
// Add to LD_LIBRARY_PATH
YarnSparkHadoopUtil.addPathToEnvironment(env, "LD_LIBRARY_PATH", "/usr/local/lib")
println(env("PATH")) // /usr/local/bin:/opt/spark/bin:$PATHConstants:
Constants:
MEMORY_OVERHEAD_FACTOR: Double (0.10, the fraction of executor memory added as container overhead)
MEMORY_OVERHEAD_MIN: Long (384, the minimum overhead in MiB)
RM_REQUEST_PRIORITY: Priority (priority 1, attached to container requests sent to the ResourceManager)
Memory Calculation Example:
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
def calculateContainerMemory(executorMemoryMB: Long): Long = {
val overheadMB = math.max(
(executorMemoryMB * YarnSparkHadoopUtil.MEMORY_OVERHEAD_FACTOR).toLong,
YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN // minimum overhead, already expressed in MiB
)
executorMemoryMB + overheadMB
}
// Example calculations
println(calculateContainerMemory(1024)) // 1408MB (1024 + 384 min overhead)
println(calculateContainerMemory(8192)) // 9011MB (8192 + 819 calculated overhead)
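RM_REQUEST_PRIORITY is the priority Spark attaches to the container requests it sends to the ResourceManager. A minimal sketch of a request built with it (the memory and core values are illustrative):

import org.apache.hadoop.yarn.api.records.Resource
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
// Ask for a container sized for a 1024 MB executor plus the 384 MB minimum overhead
val capability = Resource.newInstance(1408, 1) // memory in MB, virtual cores
val request = new ContainerRequest(capability, null, null, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)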
Utilities for building and formatting container launch commands on YARN.
object YarnCommandBuilderUtils {
def quoteForBatchScript(arg: String): String
def findJarsDir(sparkHome: String): String
}

Command Building:
quoteForBatchScript(arg: String): String
Quotes an argument so it survives being passed through a Windows batch script.
findJarsDir(sparkHome: String): String
Locates the jars directory of the Spark distribution under sparkHome.
Usage Examples:
Argument Quoting:
import org.apache.spark.launcher.YarnCommandBuilderUtils
// Quote arguments containing spaces or characters that are special to Windows batch scripts
val arg1 = YarnCommandBuilderUtils.quoteForBatchScript("my app name")
val arg2 = YarnCommandBuilderUtils.quoteForBatchScript("--conf spark.sql.warehouse.dir=/path/with spaces")
val arg3 = YarnCommandBuilderUtils.quoteForBatchScript("value_with_$_symbol")
println(arg1) // "my app name" (wrapped in double quotes)
println(arg2) // "--conf spark.sql.warehouse.dir=/path/with spaces"
println(arg3) // value_with_$_symbol (no batch-special characters, returned unchanged)

JAR Directory Discovery:
import java.io.File
import org.apache.spark.launcher.YarnCommandBuilderUtils
// Find JAR directory in Spark installation
val sparkHome = "/opt/spark"
val jarsDir = YarnCommandBuilderUtils.findJarsDir(sparkHome)
println(jarsDir) // /opt/spark/jars (for a standard distribution)
// Use in classpath construction
val sparkJars = new File(jarsDir).listFiles()
.filter(_.getName.endsWith(".jar"))
.map(_.getAbsolutePath)
.mkString(":")

// Example of environment preparation for executor containers
def prepareExecutorEnvironment(
sparkHome: String,
executorMemory: String,
additionalPaths: Seq[String]): HashMap[String, String] = {
val env = new HashMap[String, String]()
// Set SPARK_HOME
env("SPARK_HOME") = sparkHome
// Add Spark bins to PATH
YarnSparkHadoopUtil.addPathToEnvironment(env, "PATH", s"$sparkHome/bin")
YarnSparkHadoopUtil.addPathToEnvironment(env, "PATH", s"$sparkHome/sbin")
// Add additional paths
additionalPaths.foreach { path =>
YarnSparkHadoopUtil.addPathToEnvironment(env, "PATH", path)
}
// Set Java library path
val jarsDir = YarnCommandBuilderUtils.findJarsDir(sparkHome)
YarnSparkHadoopUtil.addPathToEnvironment(env, "LD_LIBRARY_PATH", s"$jarsDir/../lib")
env
}
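A quick, hypothetical call to the helper above (findJarsDir assumes /opt/spark contains a standard Spark distribution):

val executorEnv = prepareExecutorEnvironment("/opt/spark", "4g", Seq("/usr/local/bin"))
println(executorEnv("SPARK_HOME")) // /opt/spark
println(executorEnv("PATH"))       // /opt/spark/bin:/opt/spark/sbin:/usr/local/bin (separator may vary by platform)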
// Example of building executor launch command
def buildExecutorCommand(
sparkHome: String,
executorMemory: String,
executorCores: Int,
userArgs: Seq[String]): Seq[String] = {
val jarsDir = YarnCommandBuilderUtils.findJarsDir(sparkHome)
val sparkJars = s"$jarsDir/*"
val baseCommand = Seq(
"java",
s"-Xmx$executorMemory",
"-cp", sparkJars,
"org.apache.spark.executor.YarnCoarseGrainedExecutorBackend"
)
// Quote user arguments that might contain spaces
val quotedUserArgs = userArgs.map(YarnCommandBuilderUtils.quoteForBatchScript)
baseCommand ++ quotedUserArgs
}

// Example of handling a failed jars directory lookup
import org.apache.spark.SparkException

try {
val jarsDir = YarnCommandBuilderUtils.findJarsDir(sparkHome)
// Use jarsDir...
} catch {
case e: IllegalStateException =>
throw new SparkException(s"Unable to find Spark jars directory in: $sparkHome", e)
case e: SecurityException =>
throw new SparkException(s"Access denied reading Spark installation: $sparkHome", e)
}

// Example of sanity-checking a prepared container environment
def validateEnvironment(env: HashMap[String, String]): Unit = {
// Check for conflicting PATH entries
val pathValue = env.getOrElse("PATH", "")
if (pathValue.contains("::")) {
logWarning("Empty path component detected in PATH environment variable")
}
// Validate SPARK_HOME consistency
val sparkHome = env.get("SPARK_HOME")
// Warn if SPARK_HOME is set but its bin directory is not on the PATH
val pathContainsSparkBin = sparkHome.exists(home => pathValue.contains(s"$home/bin"))
if (sparkHome.isDefined && !pathContainsSparkBin) {
logWarning("SPARK_HOME set but Spark binaries may not be in PATH")
}
}
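A hypothetical check of an environment built earlier (validateEnvironment assumes a logWarning method, such as the one from Spark's Logging trait, is in scope):

val checkEnv = new HashMap[String, String]()
checkEnv("SPARK_HOME") = "/opt/spark"
checkEnv("PATH") = "/usr/bin::/usr/local/bin" // empty component, Spark bin directory missing
validateEnvironment(checkEnv) // triggers both warnings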
// Illustration of platform-dependent quoting (quoteForBatchScript itself targets Windows batch scripts)
def quoteForPlatform(arg: String): String = {
val isWindows = System.getProperty("os.name").toLowerCase.contains("windows")
if (isWindows) {
// Windows batch script quoting
if (arg.contains(" ") || arg.contains("&") || arg.contains("|")) {
s""""$arg""""
} else {
arg
}
} else {
// Unix shell quoting
if (arg.contains(" ") || arg.contains("$") || arg.contains("'")) {
s"'${arg.replace("'", "'\\''")}'"
} else {
arg
}
}
}
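On a Unix host the helper above behaves as follows (sample arguments only):

println(quoteForPlatform("spark app"))      // 'spark app'
println(quoteForPlatform("dir=$WAREHOUSE")) // 'dir=$WAREHOUSE' (single quotes prevent shell expansion)
println(quoteForPlatform("plain-arg"))      // plain-arg (returned unchanged)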
// Memory overhead calculation considering container limits
def calculateOptimalOverhead(
executorMemoryMB: Long,
nodeMemoryMB: Long,
executorsPerNode: Int): Long = {
val standardOverhead = math.max(
(executorMemoryMB * YarnSparkHadoopUtil.MEMORY_OVERHEAD_FACTOR).toLong,
YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN // minimum overhead, already expressed in MiB
)
val totalRequestedMemory = (executorMemoryMB + standardOverhead) * executorsPerNode
if (totalRequestedMemory > nodeMemoryMB * 0.9) {
// Reduce overhead if total memory exceeds 90% of node capacity
val maxOverhead = (nodeMemoryMB * 0.9 / executorsPerNode - executorMemoryMB).toLong
math.max(maxOverhead, YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN)
} else {
standardOverhead
}
}
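Two illustrative calls, using hypothetical node sizes:

// Plenty of headroom: the standard 10% overhead (or the 384 MiB minimum) applies
println(calculateOptimalOverhead(4096, 65536, 8))  // 409
// Tight fit: overhead is trimmed so ten executors stay within 90% of the node
println(calculateOptimalOverhead(8192, 98304, 10)) // 655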