CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-spark--spark-yarn-2-12

Apache Spark YARN resource manager integration module that enables Spark applications to run on YARN clusters

Pending
Overview
Eval results
Files

docs/configuration.md

Configuration System

Comprehensive configuration system for YARN-specific settings including resource allocation, security, deployment options, and application parameters. Provides both programmatic configuration through SparkConf and command-line argument parsing.

Capabilities

YARN Configuration Properties

The YARN module provides extensive configuration options through the config package object.

/**
 * YARN-specific configuration keys and defaults
 * All configuration properties are accessed through SparkConf
 *
 * This is a signature-only listing: every member is declared with its entry
 * type but without an initializer, so the property name, default value and
 * unit behind each entry are not visible here.
 * `ConfigEntry[T]` members carry a value of type `T`; `OptionalConfigEntry[T]`
 * members presumably have no default (TODO confirm against the Spark source).
 */
package object config {
  
  /** YARN application tags for resource management and monitoring */
  val APPLICATION_TAGS: ConfigEntry[Set[String]]
  
  /**
   * Application priority in YARN queue (0-10, higher = more priority)
   * NOTE(review): the 0-10 range is not established by this declaration — confirm.
   */
  val APPLICATION_PRIORITY: ConfigEntry[Int]
  
  /** YARN queue name for application submission */
  val QUEUE_NAME: ConfigEntry[String]
  
  /** Maximum application attempts allowed by YARN */
  val MAX_APP_ATTEMPTS: ConfigEntry[Int]
  
  /**
   * ApplicationMaster memory allocation
   * NOTE(review): the unit of the Long value is not visible here — confirm.
   */
  val AM_MEMORY: ConfigEntry[Long]
  
  /** ApplicationMaster CPU core allocation */
  val AM_CORES: ConfigEntry[Int]
  
  /**
   * ApplicationMaster memory overhead for YARN container
   * NOTE(review): the unit of the Long value is not visible here — confirm.
   */
  val AM_MEMORY_OVERHEAD: ConfigEntry[Long]
  
  /** Node label expression for executor placement */
  val EXECUTOR_NODE_LABEL_EXPRESSION: OptionalConfigEntry[String]
  
  /** Node label expression for ApplicationMaster placement */
  val AM_NODE_LABEL_EXPRESSION: OptionalConfigEntry[String]
  
  /** Location of Spark archive file for distribution */
  val SPARK_ARCHIVE: OptionalConfigEntry[String]
  
  /** Whether to wait for application completion */
  val WAIT_FOR_APP_COMPLETION: ConfigEntry[Boolean]
  
  /** Kerberos keytab file location for security */
  val KEYTAB: OptionalConfigEntry[String]
  
  /** Kerberos principal name for authentication */
  val PRINCIPAL: OptionalConfigEntry[String]
  
  /** Staging directory for application files */
  val STAGING_DIR: OptionalConfigEntry[String]
  
  /**
   * Executor failure validity interval
   * NOTE(review): time unit of the Long value is not visible here — confirm.
   */
  val EXECUTOR_FAILURES_VALIDITY_INTERVAL: ConfigEntry[Long]
  
  /** Maximum executor failures before application failure */
  val MAX_EXECUTOR_FAILURES: ConfigEntry[Int]
  
  /**
   * ApplicationMaster attempt failures validity interval
   * NOTE(review): time unit of the Long value is not visible here — confirm.
   */
  val AM_ATTEMPT_FAILURES_VALIDITY_INTERVAL: ConfigEntry[Long]
  
  /** Container launcher maximum threads */
  val CONTAINER_LAUNCHER_MAX_THREADS: ConfigEntry[Int]
  
  /**
   * Scheduler heartbeat interval
   * NOTE(review): time unit of the Long value is not visible here — confirm.
   */
  val SCHEDULER_HEARTBEAT_INTERVAL: ConfigEntry[Long]
  
  /**
   * Initial allocation interval
   * NOTE(review): time unit of the Long value is not visible here — confirm.
   */
  val SCHEDULER_INITIAL_ALLOCATION_INTERVAL: ConfigEntry[Long]
  
  /** Whether to preserve staging files after completion */
  val PRESERVE_STAGING_FILES: ConfigEntry[Boolean]
  
  /** File replication factor for staging files */
  val STAGING_FILE_REPLICATION: ConfigEntry[Short]
  
  /** Whether to roll application master logs */
  val AM_LOG_ROLL_ENABLE: ConfigEntry[Boolean]
  
  /**
   * Application master log roll size
   * NOTE(review): size unit of the Long value is not visible here — confirm.
   */
  val AM_LOG_ROLL_SIZE: ConfigEntry[Long]
  
  /**
   * Application master log roll interval
   * NOTE(review): time unit of the Long value is not visible here — confirm.
   */
  val AM_LOG_ROLL_INTERVAL: ConfigEntry[Long]
}

Configuration Usage Examples

Basic YARN Configuration

import org.apache.spark.SparkConf

// Baseline YARN configuration. Settings are grouped by concern and applied
// in the order: application, ApplicationMaster, executors.
val conf = {
  // Application settings
  val applicationSettings = Seq(
    "spark.yarn.queue" -> "production",
    "spark.yarn.tags" -> "spark,analytics,batch",
    "spark.yarn.priority" -> "5"
  )

  // ApplicationMaster settings
  val applicationMasterSettings = Seq(
    "spark.yarn.am.memory" -> "2g",
    "spark.yarn.am.cores" -> "2",
    "spark.yarn.am.memoryOverhead" -> "512m"
  )

  // Executor settings
  val executorSettings = Seq(
    "spark.executor.instances" -> "10",
    "spark.executor.memory" -> "4g",
    "spark.executor.cores" -> "2",
    "spark.executor.memoryOverhead" -> "1g"
  )

  new SparkConf()
    .setMaster("yarn")
    .setAppName("BasicYarnConfiguration")
    .setAll(applicationSettings ++ applicationMasterSettings ++ executorSettings)
}

Security Configuration

import org.apache.spark.SparkConf

// Secure YARN submission: Kerberos login plus Spark-internal authentication
// and encryption of RPC and local shuffle/spill data.
val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("SecureYarnConfiguration")
  // Kerberos authentication
  .set("spark.yarn.keytab", "/path/to/user.keytab")
  .set("spark.yarn.principal", "user@REALM.COM")
  // Security settings.
  // spark.authenticate.secret is intentionally not set: on YARN, Spark
  // generates and distributes a unique shared secret per application, and a
  // literal secret in source code would leak through version control.
  .set("spark.authenticate", "true")
  .set("spark.network.crypto.enabled", "true")
  .set("spark.io.encryption.enabled", "true")
  // Delegation token providers. The spark.yarn.security.credentials.* form of
  // these keys is deprecated in favour of spark.security.credentials.*.
  .set("spark.security.credentials.hive.enabled", "true")
  .set("spark.security.credentials.hbase.enabled", "true")

Advanced Resource Configuration

import org.apache.spark.SparkConf

// Placement, custom resources, container tuning and failure handling for a
// YARN application. Groups are applied in the order they are declared.
val conf = {
  // Node labeling and placement
  val placement = Seq(
    "spark.yarn.executor.nodeLabelExpression" -> "compute-nodes",
    "spark.yarn.am.nodeLabelExpression" -> "management-nodes"
  )

  // Custom resources (GPUs, FPGAs, etc.)
  val customResources = Seq(
    "spark.executor.resource.gpu.amount" -> "1",
    "spark.executor.resource.gpu.discoveryScript" -> "/opt/spark/gpu-discovery.sh"
  )

  // Container settings
  val containers = Seq(
    "spark.yarn.containerLauncherMaxThreads" -> "50",
    "spark.yarn.scheduler.heartbeat.interval-ms" -> "5000",
    "spark.yarn.scheduler.initial-allocation.interval" -> "100ms"
  )

  // Failure handling
  val failureHandling = Seq(
    "spark.yarn.maxAppAttempts" -> "3",
    "spark.yarn.max.executor.failures" -> "10",
    "spark.yarn.executor.failuresValidityInterval" -> "2h"
  )

  new SparkConf()
    .setMaster("yarn")
    .setAppName("AdvancedResourceConfiguration")
    .setAll(placement ++ customResources ++ containers ++ failureHandling)
}

ClientArguments

Argument parser for YARN client applications with support for various application types.

/**
 * Argument parser for YARN client
 * Handles command-line arguments for application submission
 *
 * Only the mutable result fields and their initial values are listed here;
 * the logic that fills them from `args` is not shown. Reference-typed fields
 * start as `null`, so read them only after parsing has populated them.
 * `ArrayBuffer` is presumably `scala.collection.mutable.ArrayBuffer`; its
 * import is not part of this listing.
 *
 * @param args Command line arguments array
 */
class ClientArguments(args: Array[String]) {
  
  /** User application JAR file path */
  var userJar: String = null
  
  /** Main class to execute */
  var userClass: String = null
  
  /** Primary Python file for PySpark applications */
  var primaryPyFile: String = null
  
  /** Primary R file for SparkR applications */
  var primaryRFile: String = null
  
  /** User application arguments; starts as an empty buffer */
  var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
  
  /** Enable verbose logging; off by default */
  var verbose: Boolean = false
  
  /** Application name; defaults to "Spark" */
  var name: String = "Spark"
  
  /** Spark properties file */
  var propertiesFile: String = null
  
  /**
   * Additional Python files
   * NOTE(review): held as a single String — presumably a comma-separated list; confirm.
   */
  var pyFiles: String = null
  
  /**
   * Additional files to distribute
   * NOTE(review): held as a single String — presumably a comma-separated list; confirm.
   */
  var files: String = null
  
  /**
   * Additional archives to distribute
   * NOTE(review): held as a single String — presumably a comma-separated list; confirm.
   */
  var archives: String = null
}

Usage Example:

import org.apache.spark.deploy.yarn.ClientArguments

// Parse command line arguments
// NOTE(review): the set of flags ClientArguments accepts is not visible in
// this file. Confirm that every flag below — in particular "--verbose" and the
// bare "--" separator for application arguments — is recognized by the parser
// of the Spark version in use, and that the class is accessible from user code.
val args = Array(
  "--jar", "/path/to/myapp.jar",
  "--class", "com.example.MyMainClass",
  "--name", "MySparkApplication",
  "--files", "config.properties,data.txt",
  "--py-files", "utils.py,helpers.py",
  "--archives", "data.zip#data",
  "--verbose",
  "--", "app-arg1", "app-arg2"  // Application arguments after --
)

// Construct the parser over the argument array, then print a few of its fields.
val clientArgs = new ClientArguments(args)
println(s"JAR: ${clientArgs.userJar}")
println(s"Main class: ${clientArgs.userClass}")
println(s"App args: ${clientArgs.userArgs}")

ApplicationMasterArguments

Argument parser for ApplicationMaster with comprehensive application metadata.

/**
 * Argument parser for ApplicationMaster
 * Handles ApplicationMaster startup arguments and configuration
 *
 * Only the mutable result fields and their initial values are listed here;
 * the logic that fills them from `args` is not shown. String fields without
 * an explicit default start as `null`.
 *
 * @param args Command line arguments array
 */
class ApplicationMasterArguments(args: Array[String]) {
  
  /** User application JAR file path */
  var userJar: String = null
  
  /** Main class name to execute */
  var userClass: String = null
  
  /** Primary Python file for PySpark */
  var primaryPyFile: String = null
  
  /** Primary R file for SparkR */
  var primaryRFile: String = null
  
  /** User application arguments sequence; empty (`Nil`) by default */
  var userArgs: Seq[String] = Nil
  
  /** Spark properties file path */
  var propertiesFile: String = null
  
  /**
   * Distributed cache configuration
   * NOTE(review): presumably a path to a configuration file — confirm.
   */
  var distCacheConf: String = null
  
  /** Additional Python files */
  var pyFiles: String = null
  
  /** Additional files to distribute */
  var files: String = null
  
  /** Additional archives to distribute */
  var archives: String = null
  
  /** Executor memory setting; defaults to "1g" */
  var executorMemory: String = "1g"
  
  /** Executor cores setting; defaults to 1 */
  var executorCores: Int = 1
  
  /** Number of executors; defaults to 2 */
  var numExecutors: Int = 2
}

Usage Example:

import org.apache.spark.deploy.yarn.ApplicationMasterArguments

// ApplicationMaster arguments (typically set by YARN)
// NOTE(review): the set of flags ApplicationMasterArguments accepts is not
// visible in this file. Confirm that each flag below — in particular the
// "--executor-*" / "--num-executors" flags and the bare "--" separator — is
// recognized by the parser of the Spark version in use.
val amArgs = Array(
  "--jar", "hdfs://cluster/apps/myapp.jar",
  "--class", "com.example.MyMainClass",  
  "--properties-file", "__spark_conf__/spark-defaults.conf",
  "--dist-cache-conf", "cache-config.txt",
  "--executor-memory", "4g",
  "--executor-cores", "2",
  "--num-executors", "10",
  "--", "user-arg1", "user-arg2"
)

// Construct the parser over the argument array, then print the executor sizing fields.
val amArguments = new ApplicationMasterArguments(amArgs)
println(s"Executor memory: ${amArguments.executorMemory}")
println(s"Executor cores: ${amArguments.executorCores}")
println(s"Number of executors: ${amArguments.numExecutors}")

Configuration Patterns

Environment-Specific Configuration

import org.apache.spark.SparkConf

/**
 * Builds the YARN configuration for a named deployment environment.
 *
 * @param environment one of "development", "staging" or "production"
 * @return a SparkConf with master "yarn", app name "MyApp-<environment>" and
 *         the environment's queue and sizing settings applied
 * @throws IllegalArgumentException if the environment name is not recognized
 */
def createYarnConfig(environment: String): SparkConf = {
  val conf = new SparkConf()
    .setMaster("yarn")
    .setAppName(s"MyApp-$environment")

  // Pick the environment-specific settings first; they are applied in one place below.
  val overrides: Seq[(String, String)] = environment match {
    case "development" =>
      Seq(
        "spark.yarn.queue" -> "dev",
        "spark.executor.instances" -> "2",
        "spark.executor.memory" -> "2g",
        "spark.yarn.am.memory" -> "1g"
      )

    case "staging" =>
      Seq(
        "spark.yarn.queue" -> "staging",
        "spark.executor.instances" -> "5",
        "spark.executor.memory" -> "4g",
        "spark.yarn.am.memory" -> "2g",
        "spark.yarn.max.executor.failures" -> "3"
      )

    case "production" =>
      Seq(
        "spark.yarn.queue" -> "production",
        "spark.executor.instances" -> "20",
        "spark.executor.memory" -> "8g",
        "spark.yarn.am.memory" -> "4g",
        "spark.yarn.max.executor.failures" -> "10",
        "spark.yarn.keytab" -> "/etc/security/keytabs/spark.keytab",
        "spark.yarn.principal" -> "spark-user@PRODUCTION.COM"
      )

    case _ =>
      throw new IllegalArgumentException(s"Unknown environment: $environment")
  }

  conf.setAll(overrides)
}

Configuration Validation

import org.apache.spark.SparkConf

/**
 * Checks a SparkConf for settings required by a YARN deployment.
 *
 * Verifies the master, minimum ApplicationMaster and executor memory, core
 * counts, and (when `spark.authenticate` is on) the presence of keytab and
 * principal.
 *
 * @param conf configuration to validate
 * @throws IllegalArgumentException if any check fails or a memory value
 *                                  cannot be parsed
 */
def validateYarnConfiguration(conf: SparkConf): Unit = {
  // parseMemory returns a size in BYTES, so the minimums must be in bytes too.
  // Comparing against the bare numbers 512 / 1024 would accept almost any value.
  val bytesPerMiB = 1024L * 1024
  val minAmMemoryBytes = 512L * bytesPerMiB
  val minExecutorMemoryBytes = 1024L * bytesPerMiB

  // Validate required settings
  require(conf.get("spark.master", "").startsWith("yarn"), 
    "Master must be 'yarn' for YARN deployment")
  
  // Validate memory settings
  // NOTE(review): a value without a unit suffix is treated as bytes by
  // parseMemory; confirm this matches how Spark interprets these properties.
  val amMemory = conf.get("spark.yarn.am.memory", "512m")
  val executorMemory = conf.get("spark.executor.memory", "1g")
  
  require(parseMemory(amMemory) >= minAmMemoryBytes, 
    "ApplicationMaster memory must be at least 512MB")
  require(parseMemory(executorMemory) >= minExecutorMemoryBytes,
    "Executor memory must be at least 1GB")
  
  // Validate core settings
  val amCores = conf.getInt("spark.yarn.am.cores", 1)
  val executorCores = conf.getInt("spark.executor.cores", 1)
  
  require(amCores >= 1 && amCores <= 8,
    "ApplicationMaster cores must be between 1 and 8")
  require(executorCores >= 1 && executorCores <= 32,
    "Executor cores must be between 1 and 32")
  
  // Validate security settings if enabled
  // NOTE(review): spark.authenticate is used here as the trigger for the
  // Kerberos keytab/principal check — confirm that coupling is intended.
  if (conf.getBoolean("spark.authenticate", false)) {
    require(conf.contains("spark.yarn.keytab") && conf.contains("spark.yarn.principal"),
      "Kerberos authentication requires both keytab and principal")
  }
}

/**
 * Parses a memory size string into a number of bytes.
 *
 * Accepted form: a non-negative integer followed by an optional unit
 * (`k`, `m`, `g`, `t`, each optionally followed by `b`, or a bare `b`).
 * Case and surrounding whitespace are ignored. Units are binary
 * (1k = 1024 bytes). A value without a unit is taken as bytes.
 *
 * @param memoryStr size string such as "512m", "4GB" or "1024"
 * @return the size in bytes
 * @throws IllegalArgumentException if the string is malformed or the size
 *                                  does not fit in a Long
 */
def parseMemory(memoryStr: String): Long = {
  val pattern = """(\d+)\s*([kmgt]?)b?""".r
  // Locale.ROOT keeps lower-casing independent of the JVM's default locale.
  memoryStr.trim.toLowerCase(java.util.Locale.ROOT) match {
    case pattern(amount, unit) =>
      val multiplier = unit match {
        case "t" => 1L << 40
        case "g" => 1L << 30
        case "m" => 1L << 20
        case "k" => 1L << 10
        case _ => 1L
      }
      // multiplyExact turns a silent Long overflow into an error; an over-long
      // digit run fails in toLong. Both are reported as the same exception type.
      try Math.multiplyExact(amount.toLong, multiplier)
      catch {
        case _: ArithmeticException | _: NumberFormatException =>
          throw new IllegalArgumentException(s"Memory size out of range: $memoryStr")
      }
    case _ => throw new IllegalArgumentException(s"Invalid memory format: $memoryStr")
  }
}

Dynamic Configuration Updates

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
 * Applies new dynamic-allocation bounds and nudges the running executor count
 * into the range [minExecutors, maxExecutors].
 *
 * If fewer than `minExecutors` are running, the total is raised to
 * `minExecutors`; if more than `maxExecutors` are running, the surplus is
 * killed. Within the range nothing is requested or killed.
 *
 * NOTE(review): `sc.conf` and `sc.executorIds` are used as given in the
 * original example — confirm both are accessible from user code in the Spark
 * version in use.
 *
 * @param sc           active SparkContext
 * @param minExecutors lower bound; must be non-negative
 * @param maxExecutors upper bound; must be at least `minExecutors`
 * @throws IllegalArgumentException if the bounds are negative or inverted
 */
def updateDynamicAllocation(sc: SparkContext, 
                          minExecutors: Int, 
                          maxExecutors: Int): Unit = {
  require(minExecutors >= 0, s"minExecutors must be non-negative: $minExecutors")
  require(minExecutors <= maxExecutors,
    s"minExecutors ($minExecutors) must not exceed maxExecutors ($maxExecutors)")

  // Update dynamic allocation settings
  sc.conf.set("spark.dynamicAllocation.minExecutors", minExecutors.toString)
  sc.conf.set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString)

  // Read the executor set once: it can change between reads, and the target
  // and the number of executors to remove must come from the same snapshot.
  val currentIds = sc.executorIds
  val currentCount = currentIds.size

  // Request executor changes
  val targetExecutors = math.max(minExecutors, math.min(maxExecutors, currentCount))

  if (targetExecutors > currentCount) {
    sc.requestTotalExecutors(targetExecutors, 0, Map.empty)
  } else if (targetExecutors < currentCount) {
    val executorsToRemove = currentIds.take(currentCount - targetExecutors)
    sc.killExecutors(executorsToRemove.toSeq)
  }
}

Configuration Templates

import org.apache.spark.SparkConf

/** Ready-made SparkConf templates for common YARN workload types. */
object YarnConfigTemplates {

  /**
   * Starts a YARN-mode configuration for `appName` on `queue`, then applies
   * `settings` in the order given.
   */
  private def template(appName: String, queue: String)(settings: (String, String)*): SparkConf = {
    val base = new SparkConf()
      .setMaster("yarn")
      .setAppName(appName)
      .set("spark.yarn.queue", queue)
    settings.foldLeft(base) { case (conf, (key, value)) => conf.set(key, value) }
  }

  /** Batch processing: Kryo serialization, adaptive query execution, 10 executors of 6g / 3 cores. */
  def batchProcessingConfig(appName: String): SparkConf =
    template(appName, queue = "batch")(
      "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
      "spark.sql.adaptive.enabled" -> "true",
      "spark.sql.adaptive.coalescePartitions.enabled" -> "true",
      "spark.executor.instances" -> "10",
      "spark.executor.memory" -> "6g",
      "spark.executor.cores" -> "3",
      "spark.executor.memoryOverhead" -> "1g",
      "spark.yarn.am.memory" -> "2g",
      "spark.yarn.am.cores" -> "2"
    )

  /** Streaming: backpressure and a per-partition Kafka rate limit, 5 executors of 4g / 2 cores. */
  def streamingConfig(appName: String): SparkConf =
    template(appName, queue = "streaming")(
      "spark.streaming.backpressure.enabled" -> "true",
      "spark.streaming.kafka.maxRatePerPartition" -> "1000",
      "spark.streaming.dynamicAllocation.enabled" -> "true",
      "spark.executor.instances" -> "5",
      "spark.executor.memory" -> "4g",
      "spark.executor.cores" -> "2",
      "spark.yarn.am.memory" -> "1g"
    )

  /** Machine learning: 20 executors of 8g / 4 cores, one GPU per executor shared by four tasks. */
  def mlConfig(appName: String): SparkConf =
    template(appName, queue = "ml")(
      "spark.executor.instances" -> "20",
      "spark.executor.memory" -> "8g",
      "spark.executor.cores" -> "4",
      "spark.executor.memoryOverhead" -> "2g",
      "spark.executor.resource.gpu.amount" -> "1",
      "spark.task.resource.gpu.amount" -> "0.25",
      "spark.yarn.am.memory" -> "4g",
      "spark.yarn.am.cores" -> "2"
    )
}

Configuration Best Practices

Resource Sizing Guidelines

import org.apache.spark.SparkConf

/**
 * Derives executor count, executor sizing and a partition size from cluster
 * dimensions using simple rules of thumb.
 *
 * @param totalDataSize  input size in bytes; must be non-negative
 * @param availableNodes number of worker nodes; must be positive
 * @param coresPerNode   cores available on each node; must be positive
 * @param memoryPerNode  memory per node; must be positive. The result is
 *                       written with an "m" suffix, so the value is treated
 *                       as megabytes
 * @return a SparkConf with master "yarn" and the derived settings
 * @throws IllegalArgumentException if any argument is out of range
 */
def calculateOptimalResources(
  totalDataSize: Long,
  availableNodes: Int,
  coresPerNode: Int,
  memoryPerNode: Long
): SparkConf = {
  // Reject inputs that would otherwise surface as a division by zero or as
  // nonsensical (zero or negative) resource settings.
  require(totalDataSize >= 0, s"totalDataSize must be non-negative: $totalDataSize")
  require(availableNodes > 0, s"availableNodes must be positive: $availableNodes")
  require(coresPerNode > 0, s"coresPerNode must be positive: $coresPerNode")
  require(memoryPerNode > 0, s"memoryPerNode must be positive: $memoryPerNode")

  val maxPartitionBytesCap = 128L * 1024 * 1024  // 128MB max
  // Floor chosen for this example so small inputs never yield a zero-byte partition size.
  val minPartitionBytes = 1L * 1024 * 1024

  // Calculate optimal executor count and sizing
  val optimalExecutorsPerNode = math.max(1, coresPerNode / 4)  // 4 cores per executor
  val totalExecutors = availableNodes * optimalExecutorsPerNode
  val executorCores = math.min(5, coresPerNode / optimalExecutorsPerNode)  // Max 5 cores
  val executorMemory = (memoryPerNode * 0.8 / optimalExecutorsPerNode).toLong  // 80% of node memory
  val memoryOverhead = math.max(384L, (executorMemory * 0.1).toLong)  // 10% overhead, min 384MB

  // Widen before multiplying so large clusters cannot overflow Int.
  // Both factors are at least 1 here, so the divisor is never zero.
  val totalTaskSlots = totalExecutors.toLong * executorCores
  val maxPartitionBytes =
    math.max(minPartitionBytes, math.min(maxPartitionBytesCap, totalDataSize / totalTaskSlots))

  new SparkConf()
    .setMaster("yarn")
    .set("spark.executor.instances", totalExecutors.toString)
    .set("spark.executor.cores", executorCores.toString)
    .set("spark.executor.memory", s"${executorMemory}m")
    .set("spark.executor.memoryOverhead", s"${memoryOverhead}m")
    .set("spark.yarn.am.memory", "2g")
    .set("spark.yarn.am.cores", "2")
    // Additional optimizations based on data size
    .set("spark.sql.files.maxPartitionBytes", maxPartitionBytes.toString)
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-yarn-2-12

docs

application-management.md

configuration.md

index.md

resource-management.md

scheduler-integration.md

tile.json