Describes: mavenpkg:maven/org.apache.spark/spark-yarn_2.11@1.6.x

tessl/maven-org-apache-spark--spark-yarn_2-11

tessl install tessl/maven-org-apache-spark--spark-yarn_2-11@1.6.0

Apache Spark YARN integration module that enables Spark applications to run on YARN clusters, providing cluster manager functionality for distributed Spark computing workloads

docs/scheduler-backends.md

Scheduler Backends

The YARN scheduler backends provide the core scheduling and executor management functionality for Spark applications running on YARN clusters. These components handle communication between the driver, ApplicationMaster, and executors, with different implementations for client and cluster deployment modes.
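
The deploy mode is chosen at application submission time. A minimal sketch of the two master URLs used in Spark 1.6 (the application name here is a placeholder):

import org.apache.spark.SparkConf

// Client mode: the driver runs outside the cluster, on the submitting machine.
val clientModeConf = new SparkConf().setAppName("ExampleApp").setMaster("yarn-client")

// Cluster mode: the driver runs inside the YARN cluster, in the ApplicationMaster.
val clusterModeConf = new SparkConf().setAppName("ExampleApp").setMaster("yarn-cluster")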

YarnSchedulerBackend (Base Class)

The abstract YarnSchedulerBackend class provides common functionality shared between client and cluster modes.

package org.apache.spark.scheduler.cluster

private[spark] abstract class YarnSchedulerBackend(
  scheduler: TaskSchedulerImpl,
  sc: SparkContext
) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)

Core Properties

protected var totalExpectedExecutors: Int = 0
protected val minRegisteredRatio: Double = 0.8  // Default minimum registration ratio
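
The 0.8 default applies when spark.scheduler.minRegisteredResourcesRatio is not set explicitly. A hedged sketch of how this threshold is typically tuned through standard Spark configuration:

import org.apache.spark.SparkConf

// Sketch only: tune the registration threshold via standard configuration keys.
val conf = new SparkConf()
  .setAppName("TunedRegistrationApp")  // placeholder app name
  // Wait until 90% of requested executors have registered before scheduling tasks...
  .set("spark.scheduler.minRegisteredResourcesRatio", "0.9")
  // ...but never wait longer than 60 seconds for them.
  .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "60s")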

Executor Management

def doRequestTotalExecutors(requestedTotal: Int): Boolean

Requests executors from the ApplicationMaster by specifying the total number desired, including executors already pending or running.

Parameters:

  • requestedTotal: Total number of executors desired (including pending/running)

Returns: true if the request was submitted successfully

def doKillExecutors(executorIds: Seq[String]): Boolean

Requests that the ApplicationMaster kill the specified executors.

Parameters:

  • executorIds: Sequence of executor IDs to terminate

Returns: true if the kill request was submitted successfully

Example:

import org.apache.spark.scheduler.cluster.YarnSchedulerBackend

// backend: an already-initialized YarnSchedulerBackend subclass (client or cluster mode)

// Request 10 total executors (including those already pending or running)
val success = backend.doRequestTotalExecutors(10)
if (success) {
  println("Executor request submitted successfully")
}

// Kill specific executors
val executorIds = Seq("executor-1", "executor-2") 
val killed = backend.doKillExecutors(executorIds)

Resource Monitoring

def sufficientResourcesRegistered(): Boolean

Determines whether sufficient executors have been registered to start scheduling tasks.

Returns: true if registered executors meet the minimum threshold
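
Conceptually, the check compares the number of registered executors against totalExpectedExecutors scaled by minRegisteredRatio. A simplified sketch of that idea (not the exact implementation; the helper name is hypothetical):

// Simplified sketch of the threshold check used to decide when scheduling may begin.
def sufficientResourcesRegisteredSketch(registered: Int,
                                        expected: Int,
                                        minRatio: Double): Boolean =
  registered >= expected * minRatio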

def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint

Creates a YARN-specific driver endpoint that handles executor disconnections and communicates with the ApplicationMaster.

YarnClientSchedulerBackend

The YarnClientSchedulerBackend is used for client deployment mode where the driver runs outside the YARN cluster.

package org.apache.spark.scheduler.cluster

private[spark] class YarnClientSchedulerBackend(
  scheduler: TaskSchedulerImpl,
  sc: SparkContext
) extends YarnSchedulerBackend(scheduler, sc) with Logging

Properties

private var client: Client = null
private var appId: ApplicationId = null
private var monitorThread: MonitorThread = null

Lifecycle Management

def start(): Unit

Creates a YARN client to submit an application to the ResourceManager and waits until the application is running. This method:

  • Configures the driver host and port
  • Creates and submits the YARN application
  • Starts credential renewal if security is enabled
  • Launches monitoring thread for application state

Example:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend

val conf = new SparkConf()
  .setAppName("MyYarnApp")
  .set("spark.driver.host", "driver-host.example.com")
  .set("spark.driver.port", "7077")

val sc = new SparkContext(conf)
val taskScheduler = new TaskSchedulerImpl(sc)
val backend = new YarnClientSchedulerBackend(taskScheduler, sc)

backend.start()
// Application is now running on YARN

def stop(): Unit

Stops the scheduler backend and cleans up resources. This method:

  • Stops the application monitoring thread
  • Reports final state to the launcher
  • Stops credential renewal
  • Cleans up the YARN client

Application Information

def applicationId(): String

Returns the YARN application ID as a string.

Example:

val appId = backend.applicationId()
println(s"Application ID: $appId")

Internal Components

MonitorThread

private class MonitorThread extends Thread

Monitors the application state and stops the SparkContext if the YARN application exits unexpectedly.

def stopMonitor(): Unit

Safely stops the monitoring thread.

Example:

import org.apache.spark.SparkException

// The MonitorThread is managed internally by YarnClientSchedulerBackend;
// it automatically monitors application state and handles cleanup.
try {
  backend.start()
  // Application runs...
} catch {
  case e: SparkException =>
    println(s"Application failed: ${e.getMessage}")
} finally {
  backend.stop()  // Automatically stops monitor thread
}

YarnClusterSchedulerBackend

The YarnClusterSchedulerBackend is used for cluster deployment mode where the driver runs within the YARN cluster.

package org.apache.spark.scheduler.cluster

private[spark] class YarnClusterSchedulerBackend(
  scheduler: TaskSchedulerImpl,
  sc: SparkContext
) extends YarnSchedulerBackend(scheduler, sc)

Lifecycle Management

def start(): Unit

Initializes the cluster mode scheduler backend and sets the initial target executor count.
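
The initial target executor count is normally driven by standard executor settings. A hedged example of the relevant configuration (the exact resolution logic lives in Spark's YARN utilities and is not shown here):

import org.apache.spark.SparkConf

// Sketch only: settings that typically determine the initial executor target.
// Static allocation: a fixed number of executors.
val staticConf = new SparkConf()
  .set("spark.executor.instances", "10")

// Dynamic allocation: the initial target comes from these bounds instead.
val dynamicConf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.initialExecutors", "5")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "50")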

Application Information

def applicationId(): String

Returns the application ID from the Spark configuration. Logs an error if the ID is not set, since it is expected to be available in cluster mode.

def applicationAttemptId(): Option[String]

Returns the application attempt ID from the Spark configuration.

Example:

import org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend

val backend = new YarnClusterSchedulerBackend(scheduler, sc)
backend.start()

val appId = backend.applicationId()
val attemptId = backend.applicationAttemptId()

println(s"Application: $appId, Attempt: ${attemptId.getOrElse("unknown")}")

Driver Log Integration

def getDriverLogUrls: Option[Map[String, String]]

Retrieves the URLs for the driver logs in the YARN NodeManager web interface.

Returns: Map containing log URLs for "stdout" and "stderr", or None if unavailable

Example:

backend.getDriverLogUrls match {
  case Some(logUrls) =>
    logUrls.foreach { case (logType, url) =>
      println(s"$logType logs: $url")
    }
  case None =>
    println("Driver log URLs not available")
}

YarnScheduler

The YarnScheduler class extends TaskSchedulerImpl with YARN-specific rack awareness functionality.

package org.apache.spark.scheduler.cluster

private[spark] class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc)

Rack Awareness

def getRackForHost(hostPort: String): Option[String]

Resolves the rack location for a given host, enabling rack-aware task scheduling.

Parameters:

  • hostPort: Host and port string (e.g., "host.example.com:8080")

Returns: The rack location as an Option[String], or None if unknown

Example:

import org.apache.spark.scheduler.cluster.YarnScheduler

val scheduler = new YarnScheduler(sc)
val rack = scheduler.getRackForHost("worker1.example.com:8080")

rack match {
  case Some(rackId) => println(s"Host is in rack: $rackId")
  case None => println("Rack information unavailable")
}
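
Under the hood, rack resolution on YARN typically delegates to Hadoop's RackResolver against the cluster's configured topology script. A hedged sketch of that mechanism (simplified relative to the actual scheduler code):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.util.RackResolver

// Sketch only: resolve a host's rack the way Hadoop's topology plugin does.
// The scheduler works with host:port strings, so the port is stripped first.
def resolveRack(hadoopConf: Configuration, hostPort: String): Option[String] = {
  val host = hostPort.split(":")(0)
  Option(RackResolver.resolve(hadoopConf, host).getNetworkLocation)
}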

YarnClusterScheduler

The YarnClusterScheduler extends YarnScheduler with ApplicationMaster integration for cluster mode.

package org.apache.spark.scheduler.cluster

private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc)

ApplicationMaster Integration

def postStartHook(): Unit

Called after scheduler initialization to notify the ApplicationMaster that the SparkContext has been initialized.

def stop(): Unit

Stops the scheduler and notifies the ApplicationMaster that the SparkContext has been stopped.

Example:

import org.apache.spark.scheduler.cluster.YarnClusterScheduler

val scheduler = new YarnClusterScheduler(sc)
// postStartHook() called automatically during SparkContext initialization
// stop() called automatically during SparkContext shutdown

Usage Patterns

Client Mode Setup

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.{YarnClientSchedulerBackend, YarnScheduler}

// Configure for client mode
val conf = new SparkConf()
  .setAppName("ClientModeApp")
  .setMaster("yarn-client")
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")

val sc = new SparkContext(conf)
val scheduler = new YarnScheduler(sc)
val backend = new YarnClientSchedulerBackend(scheduler, sc)

try {
  scheduler.initialize(backend)
  backend.start()
  
  // Application logic
  val rdd = sc.parallelize(1 to 1000)
  println(s"Sum: ${rdd.sum()}")
  
} finally {
  backend.stop()
  sc.stop()
}

Cluster Mode Setup

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.cluster.{YarnClusterSchedulerBackend, YarnClusterScheduler}

// Configure for cluster mode
val conf = new SparkConf()
  .setAppName("ClusterModeApp")
  .setMaster("yarn-cluster")
  .set("spark.executor.memory", "4g")
  .set("spark.executor.cores", "4")

val sc = new SparkContext(conf)
val scheduler = new YarnClusterScheduler(sc)
val backend = new YarnClusterSchedulerBackend(scheduler, sc)

try {
  scheduler.initialize(backend)
  backend.start()
  
  // Application logic runs in cluster
  val rdd = sc.textFile("hdfs://namenode:9000/input/data.txt")
  val wordCounts = rdd.flatMap(_.split(" "))
                     .map(word => (word, 1))
                     .reduceByKey(_ + _)
  wordCounts.saveAsTextFile("hdfs://namenode:9000/output/results")
  
} finally {
  backend.stop()
  sc.stop()
}

Dynamic Executor Management

import org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend

// Request more executors dynamically
// Assume `backend` is an already-initialized YarnClientSchedulerBackend
// (see "Client Mode Setup" above for how it is created and started)

// Scale up to 20 executors
if (backend.doRequestTotalExecutors(20)) {
  println("Successfully requested 20 executors")
} else {
  println("Failed to request executors")
}

// Scale down by killing specific executors
val executorsToKill = Seq("executor-15", "executor-16", "executor-17")
if (backend.doKillExecutors(executorsToKill)) {
  println(s"Successfully requested to kill executors: ${executorsToKill.mkString(", ")}")
}

// Check if sufficient resources are available
if (backend.sufficientResourcesRegistered()) {
  println("Sufficient executors registered, can start scheduling tasks")
} else {
  println("Waiting for more executors to register...")
}

Error Handling and Monitoring

import org.apache.spark.SparkException
import org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException

var backend: YarnClientSchedulerBackend = null

try {
  backend = new YarnClientSchedulerBackend(scheduler, sc)
  backend.start()
  
  // Monitor application
  val appId = backend.applicationId()
  println(s"Monitoring application: $appId")
  
  // Run application logic
  // ...
  
} catch {
  case e: SparkException =>
    println(s"Spark application failed: ${e.getMessage}")
  case e: ApplicationNotFoundException =>
    println(s"YARN application not found: ${e.getMessage}")
  case e: Exception =>
    println(s"Unexpected error: ${e.getMessage}")
    throw e
} finally {
  if (backend != null) {
    backend.stop()
  }
}

These scheduler backends provide the essential coordination layer between Spark and YARN, handling executor lifecycle management, resource allocation, and communication in both client and cluster deployment modes.