tessl install tessl/maven-org-apache-spark--spark-yarn_2-11@1.6.0

Apache Spark YARN integration module that enables Spark applications to run on YARN clusters, providing cluster manager functionality for distributed Spark computing workloads.
The YARN module provides extensive configuration options and utility classes to customize deployment behavior, manage distributed resources, and optimize performance for various cluster environments and workload requirements.
These properties control basic YARN deployment behavior and resource allocation.
// Application Master Configuration
"spark.yarn.am.memory" // AM memory allocation (default: 512m)
"spark.yarn.am.cores" // AM CPU cores (default: 1)
"spark.yarn.am.memoryOverhead" // AM memory overhead (default: calculated)
"spark.yarn.am.waitTime" // Max wait time for AM start (default: 100s)
// Queue and Resource Management
"spark.yarn.queue" // YARN queue name (default: "default")
"spark.yarn.maxAppAttempts" // Max application attempts (default: unlimited)
"spark.yarn.submit.waitAppCompletion" // Wait for completion (default: true)Example:
import org.apache.spark.SparkConf
val conf = new SparkConf()
.setAppName("ProductionSparkJob")
// Configure ApplicationMaster resources
.set("spark.yarn.am.memory", "2g")
.set("spark.yarn.am.cores", "2")
.set("spark.yarn.am.memoryOverhead", "512m")
// Queue and submission settings
.set("spark.yarn.queue", "production")
.set("spark.yarn.maxAppAttempts", "3")
.set("spark.yarn.submit.waitAppCompletion", "true")Properties for configuring executor resources and behavior.
// Executor Resources
"spark.executor.memory" // Executor memory (default: 1g)
"spark.executor.cores" // Executor CPU cores (default: 1)
"spark.executor.instances" // Number of executors (default: 2)
"spark.yarn.executor.memoryOverhead" // Executor memory overhead
// Memory Management
"spark.yarn.executor.memoryFraction" // Heap memory fraction for execution
"spark.yarn.driver.memoryOverhead" // Driver memory overheadExample:
val conf = new SparkConf()
// Executor resource allocation
.set("spark.executor.memory", "8g")
.set("spark.executor.cores", "4")
.set("spark.executor.instances", "20")
.set("spark.yarn.executor.memoryOverhead", "1g")
// Dynamic allocation
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.minExecutors", "2")
.set("spark.dynamicAllocation.maxExecutors", "50")Configuration for secure cluster deployment with Kerberos.
// Kerberos Authentication
"spark.yarn.principal" // Kerberos principal name
"spark.yarn.keytab" // Path to keytab file
"spark.yarn.credentials.file" // Delegation token file path
// Secure Access Configuration
"spark.yarn.access.namenodes" // HDFS namenodes to access
"spark.yarn.access.hadoopFileSystems" // Hadoop filesystems to accessExample:
val secureConf = new SparkConf()
// Kerberos authentication
.set("spark.yarn.principal", "spark/spark-service@EXAMPLE.COM")
.set("spark.yarn.keytab", "/opt/spark/conf/spark.keytab")
// Secure HDFS access
.set("spark.yarn.access.namenodes",
"hdfs://namenode1:9000,hdfs://namenode2:9000")
.set("spark.yarn.access.hadoopFileSystems",
"hdfs://namenode1:9000,hdfs://namenode2:9000")
// Delegation token management
.set("spark.yarn.credentials.file.retention.days", "7")
.set("spark.yarn.credentials.file.retention.count", "10")Configuration for distributing application files and managing staging areas.
// File Distribution
"spark.yarn.jars" // Spark JAR files for containers
"spark.yarn.archive" // Archive with Spark dependencies
"spark.yarn.dist.files" // Files to distribute to working directories
"spark.yarn.dist.archives" // Archives to extract in working directories
// Staging Configuration
"spark.yarn.submit.file.replication" // HDFS replication factor (default: 3)
"spark.yarn.preserve.staging.files" // Keep staging files (default: false)
"spark.yarn.stagingDir" // Staging directory locationExample:
val conf = new SparkConf()
// Distribute application files
.set("spark.yarn.jars", "hdfs://namenode:9000/spark/jars/*")
.set("spark.yarn.dist.files", "config.properties,log4j.properties")
.set("spark.yarn.dist.archives", "python-libs.tar.gz,data-files.zip")
// Staging configuration
.set("spark.yarn.submit.file.replication", "3")
.set("spark.yarn.preserve.staging.files", "false")
.set("spark.yarn.stagingDir", "/tmp/spark-staging")Settings for container behavior and environment setup.
// Container Configuration
"spark.yarn.containerLauncherMaxThreads" // Max container launcher threads
"spark.yarn.executor.nodeLabelExpression" // Node label expression
"spark.yarn.tags" // Application tags for YARN
// Environment Variables
"spark.yarn.appMasterEnv.[VAR]" // AM environment variables
"spark.yarn.executorEnv.[VAR]" // Executor environment variablesExample:
val conf = new SparkConf()
// Container configuration
.set("spark.yarn.containerLauncherMaxThreads", "50")
.set("spark.yarn.executor.nodeLabelExpression", "compute-nodes")
.set("spark.yarn.tags", "production,analytics,spark-2.4")
// Environment variables
.set("spark.yarn.appMasterEnv.JAVA_HOME", "/opt/jdk8")
.set("spark.yarn.executorEnv.JAVA_HOME", "/opt/jdk8")
.set("spark.yarn.executorEnv.SPARK_LOCAL_DIRS", "/tmp/spark-local")Configuration for dynamic allocation and resource scheduling.
// Scheduler Configuration
"spark.yarn.scheduler.heartbeat.interval-ms" // Scheduler heartbeat interval
"spark.yarn.scheduler.initial-allocation.interval" // Initial allocation interval
"spark.yarn.max.executor.failures" // Max executor failures before failing app
// Resource Allocation
"spark.yarn.executor.failuresValidityInterval" // Executor failure validity window
"spark.yarn.nodeBlacklist.enabled" // Enable node blacklistingExample:
val conf = new SparkConf()
// Scheduler tuning
.set("spark.yarn.scheduler.heartbeat.interval-ms", "5000")
.set("spark.yarn.scheduler.initial-allocation.interval", "500ms")
// Failure handling
.set("spark.yarn.max.executor.failures", "10")
.set("spark.yarn.executor.failuresValidityInterval", "1800000") // 30 minutes
.set("spark.yarn.nodeBlacklist.enabled", "true")The ClientDistributedCacheManager manages Hadoop distributed cache setup for YARN applications.
package org.apache.spark.deploy.yarn
class ClientDistributedCacheManager() extends Logging

def addResource(
fs: FileSystem,
conf: Configuration,
destPath: Path,
localResources: HashMap[String, LocalResource],
resourceType: LocalResourceType,
link: String,
statCache: Map[URI, FileStatus],
appMasterOnly: Boolean = false
): Unit

Adds a resource to the distributed cache for container access.
Parameters:
fs: Hadoop FileSystem instance
conf: Hadoop configuration
destPath: Path to the resource in the distributed filesystem
localResources: Map to store the LocalResource entries
resourceType: Type of resource (FILE or ARCHIVE)
link: Symbolic link name for the resource
statCache: Cache for file status information
appMasterOnly: Whether the resource is only for the ApplicationMaster

Example:
import org.apache.spark.deploy.yarn.ClientDistributedCacheManager
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.yarn.api.records.LocalResourceType
import scala.collection.mutable.HashMap
val cacheManager = new ClientDistributedCacheManager()
val fs = FileSystem.get(hadoopConf)
val localResources = new HashMap[String, LocalResource]()
val statCache = new HashMap[URI, FileStatus]()
// Add application JAR to distributed cache
cacheManager.addResource(
fs = fs,
conf = hadoopConf,
destPath = new Path("hdfs://namenode:9000/apps/myapp.jar"),
localResources = localResources,
resourceType = LocalResourceType.FILE,
link = "app.jar",
statCache = statCache.toMap,
appMasterOnly = false
)
// Add configuration files
cacheManager.addResource(
fs = fs,
conf = hadoopConf,
destPath = new Path("hdfs://namenode:9000/config/app.properties"),
localResources = localResources,
resourceType = LocalResourceType.FILE,
link = "app.properties",
statCache = statCache.toMap,
appMasterOnly = true // Only needed by AM
)
println(s"Added ${localResources.size} resources to distributed cache")def setDistFilesEnv(env: HashMap[String, String]): UnitSets environment variables for distributed cache files.
def setDistArchivesEnv(env: HashMap[String, String]): Unit

Sets environment variables for distributed cache archives.
Example:
val env = new HashMap[String, String]()
// Set distributed cache environment
cacheManager.setDistFilesEnv(env)
cacheManager.setDistArchivesEnv(env)
// Display cache environment variables
env.foreach { case (key, value) =>
if (key.startsWith("SPARK_YARN_CACHE")) {
println(s"$key = $value")
}
}

def getVisibility(
conf: Configuration,
uri: URI,
statCache: Map[URI, FileStatus]
): LocalResourceVisibility

Determines the visibility level of a resource (PUBLIC, PRIVATE, or APPLICATION).
def isPublic(
conf: Configuration,
uri: URI,
statCache: Map[URI, FileStatus]
): Boolean

Checks if a resource is publicly accessible to all users.
Example:
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
import java.net.URI
val resourceUri = new URI("hdfs://namenode:9000/public/shared-lib.jar")
val visibility = cacheManager.getVisibility(hadoopConf, resourceUri, statCache.toMap)
visibility match {
case LocalResourceVisibility.PUBLIC =>
println("Resource is publicly accessible")
case LocalResourceVisibility.PRIVATE =>
println("Resource is private to user")
case LocalResourceVisibility.APPLICATION =>
println("Resource is application-scoped")
}
val isPublic = cacheManager.isPublic(hadoopConf, resourceUri, statCache.toMap)
println(s"Resource is public: $isPublic")Utility object for building YARN-specific command lines and JVM options.
package org.apache.spark.launcher
object YarnCommandBuilderUtils

def quoteForBatchScript(arg: String): String

Quotes command line arguments for safe execution in batch scripts, handling special characters and spaces.
Parameters:
arg: Command line argument to quote

Returns: Properly quoted argument string
Example:
import org.apache.spark.launcher.YarnCommandBuilderUtils
val args = List(
"spark-submit",
"--class", "com.example.MyApp",
"/path with spaces/myapp.jar",
"--conf", "spark.sql.adaptive.enabled=true"
)
val quotedArgs = args.map(YarnCommandBuilderUtils.quoteForBatchScript)
quotedArgs.foreach(println)
// Output:
// spark-submit
// "--class"
// "com.example.MyApp"
// "/path with spaces/myapp.jar"
// "--conf"
// "spark.sql.adaptive.enabled=true"def addPermGenSizeOpt(args: ListBuffer[String]): UnitAdds PermGen size JVM options for older Java versions (Java 7 and earlier) when needed.
Parameters:
args: Mutable list of JVM arguments to modify

Example:
import scala.collection.mutable.ListBuffer
val jvmArgs = ListBuffer[String](
"-Xmx4g",
"-XX:+UseG1GC"
)
// Add PermGen option if running on older Java version
YarnCommandBuilderUtils.addPermGenSizeOpt(jvmArgs)
println("JVM Arguments:")
jvmArgs.foreach(arg => println(s" $arg"))
// On Java 7, might add: -XX:MaxPermSize=256m
// On Java 8+, no PermGen option needed

import org.apache.spark.SparkConf
/** Builds a SparkConf tuned for a production ETL workload on YARN.
  *
  * Settings are declared as data (key -> value pairs) and folded onto the
  * base configuration. SparkConf.set is a plain key/value insert on distinct
  * keys, so this is behaviorally identical to one long .set(...) chain.
  */
def createProductionConfig(): SparkConf = {
  val base = new SparkConf()
    .setAppName("ProductionETLJob")
    .setMaster("yarn-cluster")
  val settings = Seq(
    // Resource allocation
    "spark.executor.memory" -> "16g",
    "spark.executor.cores" -> "5",
    "spark.executor.instances" -> "50",
    "spark.yarn.executor.memoryOverhead" -> "3g",
    // ApplicationMaster configuration
    "spark.yarn.am.memory" -> "4g",
    "spark.yarn.am.cores" -> "2",
    "spark.yarn.am.memoryOverhead" -> "1g",
    // Queue and resource management
    "spark.yarn.queue" -> "production",
    "spark.yarn.maxAppAttempts" -> "2",
    "spark.yarn.submit.waitAppCompletion" -> "true",
    // Dynamic allocation for cost optimization
    "spark.dynamicAllocation.enabled" -> "true",
    "spark.dynamicAllocation.minExecutors" -> "10",
    "spark.dynamicAllocation.maxExecutors" -> "100",
    "spark.dynamicAllocation.initialExecutors" -> "20",
    "spark.dynamicAllocation.schedulerBacklogTimeout" -> "1s",
    "spark.dynamicAllocation.executorIdleTimeout" -> "60s",
    // Failure handling
    "spark.yarn.max.executor.failures" -> "20",
    "spark.yarn.executor.failuresValidityInterval" -> "1800000",
    "spark.yarn.nodeBlacklist.enabled" -> "true",
    // File staging and distribution
    "spark.yarn.submit.file.replication" -> "3",
    "spark.yarn.preserve.staging.files" -> "false",
    "spark.yarn.jars" -> "hdfs://namenode:9000/spark/production/jars/*",
    // Environment variables
    "spark.yarn.appMasterEnv.JAVA_HOME" -> "/opt/jdk8",
    "spark.yarn.executorEnv.JAVA_HOME" -> "/opt/jdk8",
    "spark.yarn.executorEnv.HADOOP_CONF_DIR" -> "/opt/hadoop/conf",
    // Performance tuning
    "spark.yarn.scheduler.heartbeat.interval-ms" -> "3000",
    "spark.yarn.containerLauncherMaxThreads" -> "100",
    // Tagging for monitoring
    "spark.yarn.tags" -> "production,etl,daily-batch"
  )
  settings.foldLeft(base) { case (conf, (key, value)) => conf.set(key, value) }
}

def createSecureConfig(): SparkConf = {
new SparkConf()
.setAppName("SecureAnalyticsJob")
.setMaster("yarn-cluster")
// Basic resources
.set("spark.executor.memory", "8g")
.set("spark.executor.cores", "3")
.set("spark.executor.instances", "25")
// Kerberos authentication
.set("spark.yarn.principal", "spark/analytics@COMPANY.COM")
.set("spark.yarn.keytab", "/opt/spark/conf/spark.keytab")
// Secure HDFS access
.set("spark.yarn.access.namenodes",
"hdfs://secure-nn1:9000,hdfs://secure-nn2:9000")
.set("spark.yarn.access.hadoopFileSystems",
"hdfs://secure-nn1:9000,hdfs://secure-nn2:9000")
// Delegation token management
.set("spark.yarn.credentials.file.retention.days", "5")
.set("spark.yarn.credentials.file.retention.count", "8")
// Secure file distribution
.set("spark.yarn.dist.files", "krb5.conf,secure-config.properties")
.set("spark.yarn.jars", "hdfs://secure-nn1:9000/secure-spark/jars/*")
// Security-specific settings
.set("spark.authenticate", "true")
.set("spark.network.crypto.enabled", "true")
.set("spark.io.encryption.enabled", "true")
// Environment for security
.set("spark.yarn.appMasterEnv.KRB5_CONFIG", "/opt/spark/conf/krb5.conf")
.set("spark.yarn.executorEnv.KRB5_CONFIG", "/opt/spark/conf/krb5.conf")
// Queue for secure workloads
.set("spark.yarn.queue", "secure-analytics")
}def createDevConfig(): SparkConf = {
// Development profile: small footprint, single attempt, fast scheduler
// feedback, verbose logging. Declared as data and folded onto the base conf;
// identical in effect to chaining .set(...) calls.
val devSettings = List(
  // Smaller resource allocation for development
  "spark.executor.memory" -> "2g",
  "spark.executor.cores" -> "2",
  "spark.executor.instances" -> "4",
  "spark.yarn.executor.memoryOverhead" -> "512m",
  // ApplicationMaster for client mode
  "spark.yarn.am.memory" -> "1g",
  "spark.yarn.am.cores" -> "1",
  // Development queue, fail fast on errors
  "spark.yarn.queue" -> "development",
  "spark.yarn.maxAppAttempts" -> "1",
  // Keep staging files around for post-mortem debugging
  "spark.yarn.preserve.staging.files" -> "true",
  "spark.yarn.submit.file.replication" -> "1",
  // Tighter scheduler intervals for faster feedback
  "spark.yarn.scheduler.heartbeat.interval-ms" -> "1000",
  "spark.yarn.scheduler.initial-allocation.interval" -> "100ms",
  // Verbose logging in both AM and executors
  "spark.yarn.executorEnv.SPARK_LOG_LEVEL" -> "DEBUG",
  "spark.yarn.appMasterEnv.SPARK_LOG_LEVEL" -> "DEBUG",
  // Tagging for development tracking
  "spark.yarn.tags" -> "development,testing,feature-branch"
)
val base = new SparkConf()
  .setAppName("DevelopmentSparkJob")
  .setMaster("yarn-client") // Client mode keeps the driver local for interactive debugging
devSettings.foldLeft(base) { case (conf, (key, value)) => conf.set(key, value) }
}

import org.apache.spark.deploy.yarn.ClientDistributedCacheManager
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.yarn.api.records.LocalResourceType
import scala.collection.mutable.HashMap
/** Registers application dependencies in the YARN distributed cache and
  * exports the matching SPARK_YARN_CACHE_* environment variables.
  *
  * Returns the populated LocalResource map keyed by container link name.
  * NOTE(review): relies on `hadoopConf` being available in enclosing scope —
  * confirm where this snippet is embedded.
  */
def setupDistributedCache(): HashMap[String, LocalResource] = {
  val manager = new ClientDistributedCacheManager()
  val fileSystem = FileSystem.get(hadoopConf)
  val resources = new HashMap[String, LocalResource]()
  val fileStatusCache = new HashMap[URI, FileStatus]()
  // (remote HDFS path, container link name, visible to the AM only?)
  val dependencies = List(
    ("hdfs://namenode:9000/libs/spark-sql-kafka.jar", "spark-sql-kafka.jar", false),
    ("hdfs://namenode:9000/libs/mysql-connector.jar", "mysql-connector.jar", false),
    ("hdfs://namenode:9000/config/log4j.properties", "log4j.properties", true),
    ("hdfs://namenode:9000/data/reference-data.tar.gz", "reference-data.tar.gz", false)
  )
  for ((remotePath, linkName, amOnly) <- dependencies) {
    // Archives are extracted inside the container; everything else ships as a plain file.
    val kind =
      if (remotePath.endsWith(".tar.gz")) LocalResourceType.ARCHIVE
      else LocalResourceType.FILE
    manager.addResource(
      fs = fileSystem,
      conf = hadoopConf,
      destPath = new Path(remotePath),
      localResources = resources,
      resourceType = kind,
      link = linkName,
      statCache = fileStatusCache.toMap,
      appMasterOnly = amOnly
    )
  }
  // Expose distributed-cache metadata to containers through the environment.
  val env = new HashMap[String, String]()
  manager.setDistFilesEnv(env)
  manager.setDistArchivesEnv(env)
  println(s"Configured ${resources.size} distributed cache resources")
  for ((key, value) <- env if key.startsWith("SPARK_YARN_CACHE"))
    println(s"Cache env: $key = $value")
  resources
}

These configuration utilities provide comprehensive control over YARN deployment behavior, enabling optimization for different environments, security requirements, and performance characteristics.