Describes pkg:maven/org.apache.spark/spark-yarn_2.11@1.6.x

tessl/maven-org-apache-spark--spark-yarn_2-11

tessl install tessl/maven-org-apache-spark--spark-yarn_2-11@1.6.0

Apache Spark YARN integration module that enables Spark applications to run on YARN clusters, providing cluster manager functionality for distributed Spark computing workloads

docs/configuration-utilities.md

Configuration and Utilities

The YARN module provides extensive configuration options and utility classes to customize deployment behavior, manage distributed resources, and optimize performance for various cluster environments and workload requirements.

Configuration Properties

Core YARN Settings

These properties control basic YARN deployment behavior and resource allocation.

// Application Master Configuration
"spark.yarn.am.memory"              // AM memory allocation (default: 512m)
"spark.yarn.am.cores"               // AM CPU cores (default: 1)  
"spark.yarn.am.memoryOverhead"      // AM memory overhead (default: calculated)
"spark.yarn.am.waitTime"            // Max wait time for AM start (default: 100s)

// Queue and Resource Management
"spark.yarn.queue"                  // YARN queue name (default: "default")
"spark.yarn.maxAppAttempts"         // Max application attempts (default: YARN's yarn.resourcemanager.am.max-attempts)
"spark.yarn.submit.waitAppCompletion" // Wait for completion (default: true)

Example:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("ProductionSparkJob")
  // Configure ApplicationMaster resources
  .set("spark.yarn.am.memory", "2g")
  .set("spark.yarn.am.cores", "2")
  .set("spark.yarn.am.memoryOverhead", "512m")
  // Queue and submission settings
  .set("spark.yarn.queue", "production")
  .set("spark.yarn.maxAppAttempts", "3")
  .set("spark.yarn.submit.waitAppCompletion", "true")

Executor Configuration

Properties for configuring executor resources and behavior.

// Executor Resources
"spark.executor.memory"             // Executor memory (default: 1g)
"spark.executor.cores"              // Executor CPU cores (default: 1)
"spark.executor.instances"          // Number of executors (default: 2)
"spark.yarn.executor.memoryOverhead" // Executor memory overhead

// Memory Management
"spark.yarn.driver.memoryOverhead"   // Driver memory overhead (cluster mode)
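When the overhead properties are left unset, Spark 1.6 derives them from the requested container memory as the larger of 384 MB and 10% of that memory. A small sketch of that calculation (constants mirror `MEMORY_OVERHEAD_MIN` and `MEMORY_OVERHEAD_FACTOR` in `YarnSparkHadoopUtil`; the helper name is illustrative):

```scala
// Default container memory overhead in Spark 1.6:
// max(384 MB, 10% of the requested container memory)
val overheadMinMB = 384
val overheadFactor = 0.10

def defaultMemoryOverheadMB(requestedMB: Int): Int =
  math.max(overheadMinMB, (requestedMB * overheadFactor).toInt)
```

For example, an 8 GB executor gets roughly 819 MB of overhead by default, which is why large heaps often warrant setting the overhead explicitly.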

Example:

val conf = new SparkConf()
  // Executor resource allocation
  .set("spark.executor.memory", "8g")
  .set("spark.executor.cores", "4") 
  .set("spark.executor.instances", "20")
  .set("spark.yarn.executor.memoryOverhead", "1g")
  // Dynamic allocation
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "50")
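Note that dynamic allocation on YARN also requires the external shuffle service, since executors can be removed while their shuffle output is still needed; the NodeManagers must run Spark's auxiliary shuffle service. A minimal companion fragment:

```scala
import org.apache.spark.SparkConf

val dynConf = new SparkConf()
  // Required on YARN when dynamic allocation is enabled: removed executors'
  // shuffle files are then served by the NodeManager's shuffle service.
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.enabled", "true")
```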

Security and Authentication

Configuration for secure cluster deployment with Kerberos.

// Kerberos Authentication
"spark.yarn.principal"              // Kerberos principal name
"spark.yarn.keytab"                 // Path to keytab file
"spark.yarn.credentials.file"       // Delegation token file path

// Secure Access Configuration  
"spark.yarn.access.namenodes"       // HDFS namenodes to obtain delegation tokens for
"spark.yarn.access.hadoopFileSystems" // Newer alias for the same setting (Spark 2.2+)

Example:

val secureConf = new SparkConf()
  // Kerberos authentication
  .set("spark.yarn.principal", "spark/spark-service@EXAMPLE.COM")
  .set("spark.yarn.keytab", "/opt/spark/conf/spark.keytab")
  // Secure HDFS access (delegation tokens are obtained for each listed namenode)
  .set("spark.yarn.access.namenodes", 
       "hdfs://namenode1:9000,hdfs://namenode2:9000")
  // Delegation token management
  .set("spark.yarn.credentials.file.retention.days", "7")
  .set("spark.yarn.credentials.file.retention.count", "10")

File Distribution and Staging

Configuration for distributing application files and managing staging areas.

// File Distribution
"spark.yarn.jars"                   // Spark JAR files for containers
"spark.yarn.archive"                // Archive with Spark dependencies
"spark.yarn.dist.files"             // Files to distribute to working directories
"spark.yarn.dist.archives"          // Archives to extract in working directories

// Staging Configuration
"spark.yarn.submit.file.replication" // HDFS replication for staged files (default: filesystem default)
"spark.yarn.preserve.staging.files"  // Keep staging files (default: false)
"spark.yarn.stagingDir"             // Staging directory location

Example:

val conf = new SparkConf()
  // Distribute application files
  .set("spark.yarn.jars", "hdfs://namenode:9000/spark/jars/*")
  .set("spark.yarn.dist.files", "config.properties,log4j.properties")
  .set("spark.yarn.dist.archives", "python-libs.tar.gz,data-files.zip")
  // Staging configuration  
  .set("spark.yarn.submit.file.replication", "3")
  .set("spark.yarn.preserve.staging.files", "false")
  .set("spark.yarn.stagingDir", "/tmp/spark-staging")

Container and Environment Configuration

Settings for container behavior and environment setup.

// Container Configuration
"spark.yarn.containerLauncherMaxThreads" // Max container launcher threads
"spark.yarn.executor.nodeLabelExpression" // Node label expression
"spark.yarn.tags"                   // Application tags for YARN

// Environment Variables
"spark.yarn.appMasterEnv.[VAR]"     // AM environment variables
"spark.yarn.executorEnv.[VAR]"      // Executor environment variables

Example:

val conf = new SparkConf()
  // Container configuration
  .set("spark.yarn.containerLauncherMaxThreads", "50")
  .set("spark.yarn.executor.nodeLabelExpression", "compute-nodes")
  .set("spark.yarn.tags", "production,analytics,spark-1.6")
  // Environment variables
  .set("spark.yarn.appMasterEnv.JAVA_HOME", "/opt/jdk8")
  .set("spark.yarn.executorEnv.JAVA_HOME", "/opt/jdk8")
  .set("spark.yarn.executorEnv.SPARK_LOCAL_DIRS", "/tmp/spark-local")

Resource Management and Scheduling

Configuration for dynamic allocation and resource scheduling.

// Scheduler Configuration
"spark.yarn.scheduler.heartbeat.interval-ms" // Scheduler heartbeat interval
"spark.yarn.scheduler.initial-allocation.interval" // Initial allocation interval
"spark.yarn.max.executor.failures"   // Max executor failures before failing app

// Resource Allocation
"spark.yarn.executor.failuresValidityInterval" // Window in which executor failures count toward the limit

Example:

val conf = new SparkConf()
  // Scheduler tuning
  .set("spark.yarn.scheduler.heartbeat.interval-ms", "5000")
  .set("spark.yarn.scheduler.initial-allocation.interval", "500ms")
  // Failure handling
  .set("spark.yarn.max.executor.failures", "10")
  .set("spark.yarn.executor.failuresValidityInterval", "30m")

ClientDistributedCacheManager

The ClientDistributedCacheManager manages Hadoop distributed cache setup for YARN applications.

package org.apache.spark.deploy.yarn

class ClientDistributedCacheManager() extends Logging

Resource Management

def addResource(
  fs: FileSystem,
  conf: Configuration,
  destPath: Path,
  localResources: HashMap[String, LocalResource],
  resourceType: LocalResourceType,
  link: String,
  statCache: Map[URI, FileStatus],
  appMasterOnly: Boolean = false
): Unit

Adds a resource to the distributed cache for container access.

Parameters:

  • fs: Hadoop FileSystem instance
  • conf: Hadoop configuration
  • destPath: Path to the resource in the distributed filesystem
  • localResources: Map to store the LocalResource entries
  • resourceType: Type of resource (FILE or ARCHIVE)
  • link: Symbolic link name for the resource
  • statCache: Mutable map caching FileStatus lookups
  • appMasterOnly: Whether resource is only for ApplicationMaster

Example:

import org.apache.spark.deploy.yarn.ClientDistributedCacheManager
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.yarn.api.records.{LocalResource, LocalResourceType}
import java.net.URI
import scala.collection.mutable.HashMap

val hadoopConf = new Configuration()
val cacheManager = new ClientDistributedCacheManager()
val fs = FileSystem.get(hadoopConf)
val localResources = new HashMap[String, LocalResource]()
val statCache = new HashMap[URI, FileStatus]()

// Add application JAR to distributed cache
cacheManager.addResource(
  fs = fs,
  conf = hadoopConf,
  destPath = new Path("hdfs://namenode:9000/apps/myapp.jar"),
  localResources = localResources,
  resourceType = LocalResourceType.FILE,
  link = "app.jar",
  statCache = statCache,
  appMasterOnly = false
)

// Add configuration files
cacheManager.addResource(
  fs = fs,
  conf = hadoopConf, 
  destPath = new Path("hdfs://namenode:9000/config/app.properties"),
  localResources = localResources,
  resourceType = LocalResourceType.FILE,
  link = "app.properties",
  statCache = statCache,
  appMasterOnly = true  // Only needed by AM
)

println(s"Added ${localResources.size} resources to distributed cache")

Environment Configuration

def setDistFilesEnv(env: HashMap[String, String]): Unit

Sets environment variables for distributed cache files.

def setDistArchivesEnv(env: HashMap[String, String]): Unit

Sets environment variables for distributed cache archives.

Example:

val env = new HashMap[String, String]()

// Set distributed cache environment
cacheManager.setDistFilesEnv(env)
cacheManager.setDistArchivesEnv(env)

// Display cache environment variables
env.foreach { case (key, value) =>
  if (key.startsWith("SPARK_YARN_CACHE")) {
    println(s"$key = $value")
  }
}

Resource Visibility

def getVisibility(
  conf: Configuration, 
  uri: URI, 
  statCache: Map[URI, FileStatus]
): LocalResourceVisibility

Determines the visibility level of a resource (PUBLIC, PRIVATE, or APPLICATION).

def isPublic(
  conf: Configuration,
  uri: URI, 
  statCache: Map[URI, FileStatus]
): Boolean

Checks if a resource is publicly accessible to all users.

Example:

import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
import java.net.URI

val resourceUri = new URI("hdfs://namenode:9000/public/shared-lib.jar")
val visibility = cacheManager.getVisibility(hadoopConf, resourceUri, statCache)

visibility match {
  case LocalResourceVisibility.PUBLIC =>
    println("Resource is publicly accessible")
  case LocalResourceVisibility.PRIVATE =>
    println("Resource is private to user")
  case LocalResourceVisibility.APPLICATION =>
    println("Resource is application-scoped")
}

val isPublic = cacheManager.isPublic(hadoopConf, resourceUri, statCache)
println(s"Resource is public: $isPublic")

YarnCommandBuilderUtils

Utility object for building YARN-specific command lines and JVM options.

package org.apache.spark.launcher

object YarnCommandBuilderUtils

Command Line Processing

def quoteForBatchScript(arg: String): String

Quotes command line arguments for safe execution in batch scripts, handling special characters and spaces.

Parameters:

  • arg: Command line argument to quote

Returns: Properly quoted argument string

Example:

import org.apache.spark.launcher.YarnCommandBuilderUtils

val args = List(
  "spark-submit",
  "--class", "com.example.MyApp", 
  "/path with spaces/myapp.jar",
  "--conf", "spark.sql.adaptive.enabled=true"
)

val quotedArgs = args.map(YarnCommandBuilderUtils.quoteForBatchScript)
quotedArgs.foreach(println)

// Output (only arguments containing whitespace or batch-special
// characters such as '=' are wrapped in double quotes):
// spark-submit
// --class
// com.example.MyApp
// "/path with spaces/myapp.jar"
// --conf
// "spark.sql.adaptive.enabled=true"

JVM Option Management

def addPermGenSizeOpt(args: ListBuffer[String]): Unit

Adds PermGen size JVM options for older Java versions (Java 7 and earlier) when needed.

Parameters:

  • args: Mutable list of JVM arguments to modify

Example:

import scala.collection.mutable.ListBuffer

val jvmArgs = ListBuffer[String](
  "-Xmx4g",
  "-XX:+UseG1GC"
)

// Add PermGen option if running on older Java version
YarnCommandBuilderUtils.addPermGenSizeOpt(jvmArgs)

println("JVM Arguments:")
jvmArgs.foreach(arg => println(s"  $arg"))

// On Java 7, might add: -XX:MaxPermSize=256m
// On Java 8+, no PermGen option needed
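Internally, the flag is added only when the JVM's specification version is below 1.8, since PermGen was removed in Java 8. A standalone sketch of that check (the helper name is hypothetical; Spark's actual logic lives in the launcher's `CommandBuilderUtils`):

```scala
// Hypothetical helper mirroring the version check, not Spark's exact code.
// Java specification versions are "1.6", "1.7", "1.8", then "9", "11", ...
def needsPermGenOpt(specVersion: String): Boolean =
  specVersion.startsWith("1.") && specVersion.substring(2).takeWhile(_.isDigit).toInt < 8
```

Under this check, `"1.7"` would trigger the `-XX:MaxPermSize` option while `"1.8"` and later versions would not.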

Complete Configuration Examples

Production Cluster Configuration

import org.apache.spark.SparkConf

def createProductionConfig(): SparkConf = {
  new SparkConf()
    .setAppName("ProductionETLJob")
    .setMaster("yarn-cluster")
    
    // Resource allocation
    .set("spark.executor.memory", "16g")
    .set("spark.executor.cores", "5")
    .set("spark.executor.instances", "50")
    .set("spark.yarn.executor.memoryOverhead", "3g")
    
    // ApplicationMaster configuration
    .set("spark.yarn.am.memory", "4g")
    .set("spark.yarn.am.cores", "2")
    .set("spark.yarn.am.memoryOverhead", "1g")
    
    // Queue and resource management
    .set("spark.yarn.queue", "production")
    .set("spark.yarn.maxAppAttempts", "2")
    .set("spark.yarn.submit.waitAppCompletion", "true")
    
    // Dynamic allocation for cost optimization
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.dynamicAllocation.minExecutors", "10")
    .set("spark.dynamicAllocation.maxExecutors", "100")
    .set("spark.dynamicAllocation.initialExecutors", "20")
    .set("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
    .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
    
    // Failure handling
    .set("spark.yarn.max.executor.failures", "20")
    .set("spark.yarn.executor.failuresValidityInterval", "30m")
    
    // File staging and distribution
    .set("spark.yarn.submit.file.replication", "3")
    .set("spark.yarn.preserve.staging.files", "false")
    .set("spark.yarn.jars", "hdfs://namenode:9000/spark/production/jars/*")
    
    // Environment variables
    .set("spark.yarn.appMasterEnv.JAVA_HOME", "/opt/jdk8")
    .set("spark.yarn.executorEnv.JAVA_HOME", "/opt/jdk8")
    .set("spark.yarn.executorEnv.HADOOP_CONF_DIR", "/opt/hadoop/conf")
    
    // Performance tuning
    .set("spark.yarn.scheduler.heartbeat.interval-ms", "3000")
    .set("spark.yarn.containerLauncherMaxThreads", "100")
    
    // Tagging for monitoring
    .set("spark.yarn.tags", "production,etl,daily-batch")
}

Secure Cluster Configuration

def createSecureConfig(): SparkConf = {
  new SparkConf()
    .setAppName("SecureAnalyticsJob")
    .setMaster("yarn-cluster")
    
    // Basic resources
    .set("spark.executor.memory", "8g")
    .set("spark.executor.cores", "3")
    .set("spark.executor.instances", "25")
    
    // Kerberos authentication
    .set("spark.yarn.principal", "spark/analytics@COMPANY.COM")
    .set("spark.yarn.keytab", "/opt/spark/conf/spark.keytab")
    
    // Secure HDFS access
    .set("spark.yarn.access.namenodes", 
         "hdfs://secure-nn1:9000,hdfs://secure-nn2:9000")
    
    // Delegation token management
    .set("spark.yarn.credentials.file.retention.days", "5")
    .set("spark.yarn.credentials.file.retention.count", "8")
    
    // Secure file distribution
    .set("spark.yarn.dist.files", "krb5.conf,secure-config.properties")
    .set("spark.yarn.jars", "hdfs://secure-nn1:9000/secure-spark/jars/*")
    
    // Security-specific settings
    .set("spark.authenticate", "true")
    .set("spark.network.crypto.enabled", "true")
    .set("spark.io.encryption.enabled", "true")
    
    // Environment for security
    .set("spark.yarn.appMasterEnv.KRB5_CONFIG", "/opt/spark/conf/krb5.conf")
    .set("spark.yarn.executorEnv.KRB5_CONFIG", "/opt/spark/conf/krb5.conf")
    
    // Queue for secure workloads
    .set("spark.yarn.queue", "secure-analytics")
}

Development/Testing Configuration

def createDevConfig(): SparkConf = {
  new SparkConf()
    .setAppName("DevelopmentSparkJob")
    .setMaster("yarn-client") // Client mode for development
    
    // Smaller resource allocation for development
    .set("spark.executor.memory", "2g")
    .set("spark.executor.cores", "2")
    .set("spark.executor.instances", "4")
    .set("spark.yarn.executor.memoryOverhead", "512m")
    
    // ApplicationMaster for client mode
    .set("spark.yarn.am.memory", "1g")
    .set("spark.yarn.am.cores", "1")
    
    // Development queue
    .set("spark.yarn.queue", "development")
    .set("spark.yarn.maxAppAttempts", "1")
    
    // Keep staging files for debugging
    .set("spark.yarn.preserve.staging.files", "true")
    .set("spark.yarn.submit.file.replication", "1")
    
    // Faster feedback for development
    .set("spark.yarn.scheduler.heartbeat.interval-ms", "1000")
    .set("spark.yarn.scheduler.initial-allocation.interval", "100ms")
    
    // Development-specific environment
    .set("spark.yarn.executorEnv.SPARK_LOG_LEVEL", "DEBUG")
    .set("spark.yarn.appMasterEnv.SPARK_LOG_LEVEL", "DEBUG")
    
    // Tagging for development tracking
    .set("spark.yarn.tags", "development,testing,feature-branch")
}

Distributed Cache Usage Example

import org.apache.spark.deploy.yarn.ClientDistributedCacheManager
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.yarn.api.records.{LocalResource, LocalResourceType}
import java.net.URI
import scala.collection.mutable.HashMap

def setupDistributedCache(hadoopConf: Configuration): HashMap[String, LocalResource] = {
  val cacheManager = new ClientDistributedCacheManager()
  val fs = FileSystem.get(hadoopConf)
  val localResources = new HashMap[String, LocalResource]()
  val statCache = new HashMap[URI, FileStatus]()
  
  // Add application dependencies
  val dependencies = List(
    ("hdfs://namenode:9000/libs/spark-sql-kafka.jar", "spark-sql-kafka.jar", false),
    ("hdfs://namenode:9000/libs/mysql-connector.jar", "mysql-connector.jar", false),
    ("hdfs://namenode:9000/config/log4j.properties", "log4j.properties", true),
    ("hdfs://namenode:9000/data/reference-data.tar.gz", "reference-data.tar.gz", false)
  )
  
  dependencies.foreach { case (remotePath, linkName, amOnly) =>
    val resourceType = if (remotePath.endsWith(".tar.gz")) {
      LocalResourceType.ARCHIVE
    } else {
      LocalResourceType.FILE
    }
    
    cacheManager.addResource(
      fs = fs,
      conf = hadoopConf,
      destPath = new Path(remotePath),
      localResources = localResources,
      resourceType = resourceType,
      link = linkName,
      statCache = statCache,
      appMasterOnly = amOnly
    )
  }
  
  // Set up environment variables for distributed cache
  val env = new HashMap[String, String]()
  cacheManager.setDistFilesEnv(env)
  cacheManager.setDistArchivesEnv(env)
  
  // Log cache setup
  println(s"Configured ${localResources.size} distributed cache resources")
  env.foreach { case (key, value) =>
    if (key.startsWith("SPARK_YARN_CACHE")) {
      println(s"Cache env: $key = $value")
    }
  }
  
  localResources
}

These configuration utilities provide comprehensive control over YARN deployment behavior, enabling optimization for different environments, security requirements, and performance characteristics.