tessl/maven-org-apache-spark--yarn-parent-2-11

Apache Spark's integration with the Hadoop YARN cluster manager, for running Spark applications on YARN clusters

To install, run:

npx @tessl/cli install tessl/maven-org-apache-spark--yarn-parent-2-11@1.2.0

Apache Spark YARN Integration

Apache Spark's YARN integration module enables Spark applications to run on Hadoop YARN (Yet Another Resource Negotiator) clusters alongside other distributed computing frameworks. This module provides an Application Master implementation that manages Spark driver and executor processes within YARN containers, handles resource allocation and deallocation through YARN's ResourceManager, and supports both client and cluster deployment modes.

Package Information

  • Package Name: yarn-parent_2.11
  • Package Type: maven
  • Language: Scala
  • Group ID: org.apache.spark
  • Version: 1.2.2
  • Installation: Include in your Maven/SBT project dependencies (see the sketch below)
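
For SBT users, the coordinates above translate into a single dependency line. This is a sketch using exactly the coordinates listed in this document; verify the artifact name and version against your cluster's Spark build:

// Sketch using the coordinates listed above; the artifact name already
// carries the Scala suffix, so plain % (not %%) is used.
libraryDependencies += "org.apache.spark" % "yarn-parent_2.11" % "1.2.2"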

Core Imports

import org.apache.spark.deploy.yarn._
import org.apache.spark.scheduler.cluster._

For basic usage:

import org.apache.spark.{SparkConf, SparkContext}

Basic Usage

The YARN integration is typically used by setting the Spark master URL and submitting applications through the YARN client:

import org.apache.spark.{SparkConf, SparkContext}

// Set master to YARN mode
val sparkConf = new SparkConf()
  .setMaster("yarn-client")  // or "yarn-cluster"
  .setAppName("MySparkApp")

val sparkContext = new SparkContext(sparkConf)

For command-line submission:

# Client mode - driver runs on client machine
spark-submit --master yarn-client --num-executors 4 myapp.jar

# Cluster mode - driver runs in YARN container
spark-submit --master yarn-cluster --num-executors 4 myapp.jar

Architecture

Apache Spark YARN integration is built around several key components:

  • Application Master: Coordinates with YARN ResourceManager and manages executor allocation
  • YARN Client: Submits applications to YARN and handles deployment preparation
  • Scheduler Backends: Interface between Spark's scheduler and YARN's resource management
  • Resource Allocation: Dynamic executor allocation based on workload demands (see the sketch after this list)
  • Security Integration: Kerberos authentication and delegation token management
  • Deployment Modes: Support for both client and cluster deployment patterns
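
As a concrete example of the resource-allocation component, dynamic executor allocation is driven entirely by configuration. A minimal sketch, assuming the standard spark.dynamicAllocation.* property names (not listed in this document) and a YARN external shuffle service running on each NodeManager:

import org.apache.spark.SparkConf

// Minimal sketch: let Spark grow and shrink the executor set with demand.
// Assumes the external shuffle service is enabled on the NodeManagers.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.shuffle.service.enabled", "true")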

Capabilities

YARN Client Operations

Core functionality for submitting Spark applications to YARN clusters, handling resource allocation requests, and managing application lifecycle.

// Main entry points for YARN submission
object Client {
  def main(argStrings: Array[String]): Unit
}

object ApplicationMaster {
  def main(args: Array[String]): Unit
}

Full reference: docs/client.md

Application Master Management

Application Master implementation that manages Spark applications within YARN containers, including driver execution and executor coordination.

private[spark] class ApplicationMaster(
  args: ApplicationMasterArguments,
  client: YarnRMClient
) extends Logging {
  def run(): Int
}

object ApplicationMaster {
  private[spark] def sparkContextInitialized(sc: SparkContext): Unit
  private[spark] def sparkContextStopped(sc: SparkContext): Unit
}

Full reference: docs/application-master.md

Scheduler Integration

Scheduler implementations that integrate Spark's task scheduling with YARN's resource management, supporting both client and cluster deployment modes.

private[spark] abstract class YarnSchedulerBackend(
  scheduler: TaskSchedulerImpl, 
  sc: SparkContext
) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)

private[spark] class YarnClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
  override def getRackForHost(hostPort: String): Option[String]
  override def postStartHook(): Unit
  override def stop(): Unit
}

private[spark] class YarnClientClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc)

private[spark] class YarnClientSchedulerBackend(
  scheduler: TaskSchedulerImpl,
  sc: SparkContext
) extends YarnSchedulerBackend(scheduler, sc) {
  override def start(): Unit
  override def stop(): Unit
  override def applicationId(): String
}

private[spark] class YarnClusterSchedulerBackend(
  scheduler: TaskSchedulerImpl,
  sc: SparkContext
) extends YarnSchedulerBackend(scheduler, sc) {
  override def start(): Unit
  override def applicationId(): String
}
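
SparkContext wires these classes together based on the master URL; user code never instantiates them directly (they are private[spark]). A simplified, hypothetical sketch of the selection logic, mirroring but not reproducing SparkContext's actual scheduler-creation dispatch:

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster._

// Hypothetical sketch of the master-URL dispatch; the real logic lives in
// SparkContext's scheduler creation and differs in detail.
def createYarnScheduler(sc: SparkContext, master: String): TaskSchedulerImpl = master match {
  case "yarn-client" =>
    val scheduler = new YarnClientClusterScheduler(sc)
    scheduler.initialize(new YarnClientSchedulerBackend(scheduler, sc))
    scheduler
  case "yarn-cluster" =>
    val scheduler = new YarnClusterScheduler(sc)
    scheduler.initialize(new YarnClusterSchedulerBackend(scheduler, sc))
    scheduler
}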

Full reference: docs/schedulers.md

Hadoop Utilities

YARN-specific utilities for Hadoop integration, security, and environment management that extend Spark's base Hadoop utilities.

class YarnSparkHadoopUtil extends SparkHadoopUtil {
  override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation): Unit
  override def isYarnMode(): Boolean
  override def newConfiguration(conf: SparkConf): Configuration
  override def addCredentials(conf: JobConf): Unit
  override def getCurrentUserCredentials(): Credentials
  override def addCurrentUserCredentials(creds: Credentials): Unit
  override def addSecretKeyToUserCredentials(key: String, secret: String): Unit
  override def getSecretKeyFromUserCredentials(key: String): Array[Byte]
}

object YarnSparkHadoopUtil {
  def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit
  def setEnvFromInputString(env: HashMap[String, String], inputString: String): Unit
  def escapeForShell(arg: String): String
  def lookupRack(conf: Configuration, host: String): String
  def populateRackInfo(conf: Configuration, hostname: String): Unit
  def getApplicationAclsForYarn(securityMgr: SecurityManager): Map[ApplicationAccessType, String]
}
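
The companion-object helpers can be exercised directly; a small sketch using only the signatures listed above:

import scala.collection.mutable.HashMap
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

// Build up a container launch environment and a shell-safe argument.
val env = new HashMap[String, String]()
YarnSparkHadoopUtil.addPathToEnvironment(env, "CLASSPATH", "/etc/hadoop/conf")
YarnSparkHadoopUtil.setEnvFromInputString(env, "SPARK_YARN_MODE=true")
val safeArg = YarnSparkHadoopUtil.escapeForShell("""an arg with "quotes"""")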

Full reference: docs/hadoop-utils.md

Types

// Core YARN types
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ApplicationAttemptId, ApplicationId, LocalResource}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.Utils

// YARN-specific classes
private[spark] class ApplicationMasterArguments(args: Array[String]) {
  var userClass: String = null
  var userJar: String = null
  var userArgs: Array[String] = Array.empty
  var numExecutors: Int = 2
  var executorMemory: Int = 1024
  var executorCores: Int = 1
  var amMemory: Int = 512
  var amCores: Int = 1
}

private[spark] trait YarnRMClient {
  def getAttemptId(): ApplicationAttemptId
  def getMaxRegAttempts(conf: YarnConfiguration, sparkConf: SparkConf): Int
  def register(
    driverUrl: String,
    driverRef: RpcEndpointRef,
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    uiAddress: String,
    uiHistoryAddress: String,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource]
  ): YarnAllocator
  def shutdown(): Unit
}

private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) {
  var addJars: String = null
  var files: String = null  
  var archives: String = null
  var userJar: String = null
  var userClass: String = null
  var userArgs: Array[String] = Array.empty
  var executorMemory: Int = 1024
  var executorCores: Int = 1
  var numExecutors: Int = 2
  var amQueue: String = "default"
  var amMemory: Int = 512
  var amCores: Int = 1
  var appName: String = "Spark"
  val amMemoryOverhead: Int = 384
  val executorMemoryOverhead: Int = 384
}

// Collection types for utilities
import scala.collection.mutable.HashMap
import scala.collection.Map
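
The memory defaults in ClientArguments above determine the container sizes requested from YARN: each request is the base memory plus its fixed overhead. A worked example using the listed defaults:

// Container sizes implied by the ClientArguments defaults listed above.
val amContainerMB = 512 + 384        // amMemory + amMemoryOverhead = 896 MB
val executorContainerMB = 1024 + 384 // executorMemory + executorMemoryOverhead = 1408 MB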

Configuration

Key configuration properties for YARN integration:

  • spark.yarn.max.executor.failures - Maximum number of executor failures before failing the application
  • spark.yarn.max.worker.failures - (Deprecated) Alias of spark.yarn.max.executor.failures, kept for backward compatibility
  • spark.yarn.app.id - Application ID set by the Application Master
  • spark.ui.port - Set to "0" for ephemeral port allocation in YARN mode
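
The user-settable properties above go through SparkConf (or spark-submit --conf) as usual; spark.yarn.app.id and spark.ui.port are managed by the Application Master and are not normally set by hand. A brief sketch:

import org.apache.spark.SparkConf

// Tolerate up to 8 executor failures before the application is failed.
val conf = new SparkConf()
  .setMaster("yarn-client")
  .set("spark.yarn.max.executor.failures", "8")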

Deployment Modes

YARN Client Mode (yarn-client)

  • Driver runs on the client machine outside YARN cluster
  • Direct communication between driver and executors
  • Interactive applications and development use cases

YARN Cluster Mode (yarn-cluster)

  • Driver runs inside YARN cluster as part of Application Master
  • Better for production batch jobs
  • Automatic cleanup and resource management

Version Support

The YARN module supports multiple Hadoop versions through separate implementations:

  • Alpha: Hadoop 0.23 and 2.0.x (deprecated in Spark 1.3+)
  • Stable: Hadoop 2.2+ (recommended)

Both implementations provide the same API surface but use different versions of the underlying Hadoop YARN API.
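
The implementation is chosen at build time via Maven profiles. The sketch below assumes the -Pyarn and -Pyarn-alpha profile names used by the Spark 1.2 Maven build; check the build documentation for the exact flags matching your Hadoop version:

# Stable YARN implementation (Hadoop 2.2+)
mvn -Pyarn -Dhadoop.version=2.4.0 -DskipTests clean package

# Alpha YARN implementation (Hadoop 0.23.x / 2.0.x)
mvn -Pyarn-alpha -Dhadoop.version=0.23.10 -DskipTests clean package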