tessl/maven-org-apache-spark--spark-yarn_2-11

tessl install tessl/maven-org-apache-spark--spark-yarn_2-11@1.6.0

Apache Spark YARN integration module that enables Spark applications to run on YARN clusters, providing cluster manager functionality for distributed Spark computing workloads

docs/application-master.md

Application Master

The ApplicationMaster is the central coordinator for Spark applications running on YARN. It manages executor allocation, handles communication between the driver and YARN ResourceManager, and coordinates the application lifecycle in both client and cluster deployment modes.

ApplicationMaster Class

The ApplicationMaster class serves as the main entry point and coordination layer for Spark applications on YARN.

package org.apache.spark.deploy.yarn

class ApplicationMaster(
  args: ApplicationMasterArguments,
  client: YarnRMClient
) extends Logging

Core Methods

Application Execution

def run(): Int

Main execution method that coordinates the entire ApplicationMaster lifecycle. This method:

  • Initializes the YARN application context
  • Sets up security and delegation token renewal
  • Creates and manages the allocator for executor containers
  • Handles both client and cluster mode operations
  • Returns exit code (0 for success, non-zero for failure)

Example:

import org.apache.spark.deploy.yarn.{ApplicationMaster, ApplicationMasterArguments, YarnRMClient}

val args = new ApplicationMasterArguments(Array(
  "--class", "com.example.MyApp",
  "--jar", "/path/to/app.jar",
  "--executor-memory", "2g",
  "--executor-cores", "2"
))

val client = new YarnRMClient(args)
val am = new ApplicationMaster(args, client)

val exitCode = am.run()
System.exit(exitCode)

Application Termination

def finish(status: FinalApplicationStatus, code: Int, msg: String = null): Unit

Terminates the ApplicationMaster with the specified status and exit code.

Parameters:

  • status: Final application status (SUCCEEDED, FAILED, KILLED)
  • code: Exit code to return
  • msg: Optional message describing the termination reason

def unregister(status: FinalApplicationStatus, diagnostics: String = null): Unit

Unregisters the ApplicationMaster from the YARN ResourceManager.

Parameters:

  • status: Final application status
  • diagnostics: Optional diagnostic message

Example:

try {
  // Application logic here
  am.finish(FinalApplicationStatus.SUCCEEDED, 0, "Application completed successfully")
} catch {
  case e: Exception =>
    am.finish(FinalApplicationStatus.FAILED, 1, s"Application failed: ${e.getMessage}")
}

ApplicationMaster Companion Object

The companion object provides the main entry point and utility methods for ApplicationMaster operations.

object ApplicationMaster

Constants

val EXIT_SUCCESS = 0
val EXIT_UNCAUGHT_EXCEPTION = 10
val EXIT_MAX_EXECUTOR_FAILURES = 11
val EXIT_REPORTER_FAILURE = 12
val EXIT_SC_NOT_INITED = 13
val EXIT_SECURITY = 14
val EXIT_EXCEPTION_USER_CLASS = 15
val EXIT_EARLY = 16
val EXIT_SIGNAL = 17
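
These codes can be translated into readable labels when logging or monitoring an exit status. A minimal sketch (the exitCodeName helper is illustrative, not part of the module's API):

import org.apache.spark.deploy.yarn.ApplicationMaster._

// Illustrative helper: map an ApplicationMaster exit code to a readable label.
def exitCodeName(code: Int): String = code match {
  case EXIT_SUCCESS               => "SUCCESS"
  case EXIT_UNCAUGHT_EXCEPTION    => "UNCAUGHT_EXCEPTION"
  case EXIT_MAX_EXECUTOR_FAILURES => "MAX_EXECUTOR_FAILURES"
  case EXIT_REPORTER_FAILURE      => "REPORTER_FAILURE"
  case EXIT_SC_NOT_INITED         => "SC_NOT_INITED"
  case EXIT_SECURITY              => "SECURITY"
  case EXIT_EXCEPTION_USER_CLASS  => "EXCEPTION_USER_CLASS"
  case EXIT_EARLY                 => "EARLY"
  case EXIT_SIGNAL                => "SIGNAL"
  case other                      => s"UNKNOWN($other)"
}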

Main Entry Point

def main(args: Array[String]): Unit

Main entry point for the ApplicationMaster process. This method:

  • Parses command line arguments
  • Creates YarnRMClient and ApplicationMaster instances
  • Executes the ApplicationMaster run loop
  • Handles cleanup and exit code management

Example command line:

# Invoked by YARN when launching the ApplicationMaster container
java org.apache.spark.deploy.yarn.ApplicationMaster \
  --class com.example.MyApp \
  --jar hdfs://namenode:9000/apps/myapp.jar \
  --executor-memory 2g \
  --executor-cores 2 \
  --properties-file /opt/spark/conf/spark.properties

SparkContext Integration

def sparkContextInitialized(sc: SparkContext): Unit

Called when SparkContext is successfully initialized in cluster mode.

def sparkContextStopped(sc: SparkContext): Boolean

Called when SparkContext is stopped. Returns true if the ApplicationMaster should exit.

Example:

// These methods are called automatically by the Spark runtime;
// user code typically does not call them directly.

import org.apache.spark.SparkContext

object MySparkApp {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext()
    // ApplicationMaster.sparkContextInitialized(sc) called automatically

    try {
      // Application logic
      val rdd = sc.parallelize(1 to 100)
      println(rdd.sum())
    } finally {
      sc.stop()
      // ApplicationMaster.sparkContextStopped(sc) called automatically
    }
  }
}

ApplicationMasterArguments

The ApplicationMasterArguments class parses and validates command-line arguments for the ApplicationMaster.

package org.apache.spark.deploy.yarn

class ApplicationMasterArguments(val args: Array[String])

Properties

var userJar: String              // Path to user application JAR
var userClass: String            // Main class name for cluster mode
var primaryPyFile: String        // Main Python file for PySpark apps
var primaryRFile: String         // Main R file for SparkR apps  
var userArgs: Seq[String]        // Arguments to pass to user application
var executorMemory: Int          // Executor memory in MB (default: 1024)
var executorCores: Int           // Number of executor cores (default: 1)
var propertiesFile: String       // Path to Spark properties file

Example:

val args = new ApplicationMasterArguments(Array(
  "--class", "com.example.ProcessingApp",
  "--jar", "hdfs://namenode:9000/apps/processing.jar",
  "--executor-memory", "4g", 
  "--executor-cores", "4",
  "--args", "input.txt",
  "--args", "output.txt",
  "--properties-file", "/opt/spark/conf/production.properties"
))

println(s"Main class: ${args.userClass}")
println(s"Executor memory: ${args.executorMemory}MB")
println(s"Executor cores: ${args.executorCores}")
println(s"User args: ${args.userArgs.mkString(", ")}")

Usage Information

def printUsageAndExit(exitCode: Int, unknownParam: Any = null): Unit

Prints usage information and exits with the specified code.
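
Argument parsing calls this when it encounters an unrecognized flag, and it can also be invoked directly. A minimal sketch (the flag shown is hypothetical):

// Illustrative: report an unknown flag, print usage, and exit with code 1
args.printUsageAndExit(1, "--unknown-flag")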

Deployment Modes

The ApplicationMaster operates differently depending on the deployment mode:

Cluster Mode

In cluster mode, the ApplicationMaster runs the driver program within the YARN cluster.

// Properties set automatically in cluster mode
System.setProperty("spark.master", "yarn-cluster")
System.setProperty("spark.ui.port", "0")  // Use ephemeral port
System.setProperty("spark.yarn.app.id", appId.toString)
System.setProperty("spark.yarn.app.attemptId", attemptId.toString)

// ApplicationMaster manages both driver and executors
val sparkConf = new SparkConf()
val sc = new SparkContext(sparkConf)
// Run the user application within the AM (runUserClass is illustrative;
// the actual implementation starts the user class in a separate thread)
runUserClass(args.userClass, args.userArgs)

Client Mode

In client mode, the ApplicationMaster only manages executor allocation while the driver runs externally.

// ApplicationMaster acts as executor manager only
// Driver runs separately on client machine
val allocator = new YarnAllocator(
  driverUrl = args.driverUrl,
  driverRef = driverRef,
  conf = yarnConf,
  sparkConf = sparkConf,
  amClient = amClient,
  appAttemptId = attemptId,
  args = args,
  securityMgr = securityMgr
)

// Simplified heartbeat loop to manage executors
while (!finished) {
  allocator.allocateResources()
  Thread.sleep(heartbeatInterval)  // the real implementation waits on a reporter thread
}

Resource Management Integration

The ApplicationMaster integrates closely with YarnAllocator for container management:

import org.apache.spark.deploy.yarn.YarnAllocator

// ApplicationMaster creates and manages YarnAllocator
private var allocator: YarnAllocator = _

def createAllocator(driverRef: RpcEndpointRef): Unit = {
  allocator = new YarnAllocator(
    driverUrl = driverUrl,
    driverRef = driverRef, 
    conf = yarnConf,
    sparkConf = sparkConf,
    amClient = client.amClient,
    appAttemptId = client.getAttemptId(),
    args = args,
    securityMgr = securityMgr
  )
}

// Heartbeat loop for resource allocation
def allocateExecutors(): Unit = {
  allocatorLock.synchronized {
    if (allocator != null) {
      allocator.allocateResources()
    }
  }
}

Security and Token Management

The ApplicationMaster handles security credentials and delegation token renewal:

import org.apache.spark.deploy.yarn.AMDelegationTokenRenewer

// Set up delegation token renewal for long-running applications
if (sparkConf.contains("spark.yarn.credentials.file")) {
  val tokenRenewer = new AMDelegationTokenRenewer(sparkConf, yarnConf)
  tokenRenewer.scheduleLoginFromKeytab()
}

// Security manager for authentication
val securityMgr = new SecurityManager(sparkConf)

Error Handling and Cleanup

The ApplicationMaster provides comprehensive error handling and cleanup:

import org.apache.commons.lang3.SystemUtils
import sun.misc.{Signal, SignalHandler}

// Signal handlers for graceful shutdown
if (SystemUtils.IS_OS_UNIX) {
  Seq("TERM", "INT", "HUP").foreach { sig =>
    Signal.handle(new Signal(sig), new SignalHandler {
      override def handle(sig: Signal): Unit = {
        finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_SIGNAL)
      }
    })
  }
}

import org.apache.spark.util.ShutdownHookManager

// Shutdown hook for cleanup (finished, unregistered, finalStatus, finalMsg,
// and fs are internal ApplicationMaster state)
val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
ShutdownHookManager.addShutdownHook(priority) { () =>
  if (!finished) {
    finish(finalStatus, ApplicationMaster.EXIT_SUCCESS, 
           "Shutdown hook called before final status was reported.")
  }
  if (!unregistered) {
    unregister(finalStatus, finalMsg)
    cleanupStagingDir(fs)
  }
}

Complete Usage Example

import org.apache.spark.deploy.yarn.{ApplicationMaster, ApplicationMasterArguments, YarnRMClient}
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus

object MyApplicationMaster {
  def main(args: Array[String]): Unit = {
    val amArgs = new ApplicationMasterArguments(args)
    val client = new YarnRMClient(amArgs)
    val am = new ApplicationMaster(amArgs, client)
    
    try {
      val exitCode = am.run()
      
      exitCode match {
        case ApplicationMaster.EXIT_SUCCESS =>
          println("ApplicationMaster completed successfully")
        case ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES =>
          println("ApplicationMaster failed due to executor failures")
        case ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION =>
          println("ApplicationMaster failed with uncaught exception")
        case _ =>
          println(s"ApplicationMaster exited with code: $exitCode")
      }
      
      System.exit(exitCode)
      
    } catch {
      case e: Exception =>
        am.finish(FinalApplicationStatus.FAILED, 
                 ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION, 
                 s"Uncaught exception: ${e.getMessage}")
        System.exit(ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION)
    }
  }
}

The ApplicationMaster provides the core coordination and management functionality that enables Spark applications to run effectively on YARN clusters, handling resource allocation, security, and application lifecycle management in both deployment modes.