Describes: mavenpkg:maven/org.apache.spark/spark-yarn_2.11@1.6.x

tessl/maven-org-apache-spark--spark-yarn_2-11

tessl install tessl/maven-org-apache-spark--spark-yarn_2-11@1.6.0

Apache Spark YARN integration module that enables Spark applications to run on YARN clusters, providing cluster manager functionality for distributed Spark computing workloads

docs/application-management.md

Application Management

The Apache Spark YARN module provides comprehensive application management capabilities through the Client class and related components. This documentation covers application submission, monitoring, and lifecycle management on YARN clusters.

Client Class

The Client class is the primary interface for submitting and managing Spark applications on YARN clusters. It handles the complete application lifecycle from submission to termination.

package org.apache.spark.deploy.yarn

private[spark] class Client(
  val args: ClientArguments,
  val hadoopConf: Configuration,
  val sparkConf: SparkConf
) extends Logging {

  // Alternative constructor
  def this(clientArgs: ClientArguments, spConf: SparkConf) = 
    this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)
}
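
For example, a minimal sketch of both construction paths (the main class and JAR path below are placeholders):

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.{Client, ClientArguments}

val sparkConf = new SparkConf().setAppName("MySparkApp")
val args = new ClientArguments(Array(
  "--class", "com.example.MyApp",   // placeholder main class
  "--jar", "myapp.jar"              // placeholder application JAR
), sparkConf)

// Two-argument constructor: derives a Hadoop Configuration from the SparkConf
val client = new Client(args, sparkConf)

// Three-argument constructor: pass an explicit Hadoop/YARN Configuration
// (assumes yarn-site.xml and core-site.xml are on the classpath)
val hadoopConf = new Configuration()
val clientWithExplicitConf = new Client(args, hadoopConf, sparkConf)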

Core Methods

Application Submission

def submitApplication(): ApplicationId

Submits a Spark application to the YARN ResourceManager. This method:

  • Creates the application submission context
  • Uploads necessary files to HDFS staging directory
  • Configures the ApplicationMaster container
  • Returns the assigned ApplicationId for monitoring

Example:

import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.spark.SparkConf
import org.apache.hadoop.conf.Configuration

val sparkConf = new SparkConf()
  .setAppName("MySparkApp")
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")

val args = new ClientArguments(Array(
  "--class", "com.example.MyApp",
  "--jar", "myapp.jar",
  "--num-executors", "4"
), sparkConf)

val client = new Client(args, sparkConf)
val appId = client.submitApplication()
println(s"Submitted application: $appId")

Application Monitoring

def monitorApplication(
  appId: ApplicationId, 
  returnOnRunning: Boolean = false, 
  logApplicationReport: Boolean = true
): (YarnApplicationState, FinalApplicationStatus)

Monitors the application state and returns when the application reaches a terminal state or when returnOnRunning is true and the application is running.

Parameters:

  • appId: The application ID to monitor
  • returnOnRunning: If true, returns when application reaches RUNNING state
  • logApplicationReport: If true, logs application progress reports

Returns: Tuple of (YarnApplicationState, FinalApplicationStatus)

Example:

import org.apache.hadoop.yarn.api.records.YarnApplicationState

val (state, finalStatus) = client.monitorApplication(appId, returnOnRunning = true)
state match {
  case YarnApplicationState.RUNNING => 
    println("Application is now running")
  case YarnApplicationState.FINISHED =>
    println(s"Application completed with status: $finalStatus")
  case YarnApplicationState.FAILED =>
    println("Application failed")
  case YarnApplicationState.KILLED =>
    println("Application was killed")
}

Application Execution

def run(): Unit

Runs the complete application lifecycle: submits the application, monitors it until completion, and handles cleanup. This is a blocking operation that combines submitApplication() and monitorApplication().

Example:

try {
  client.run()
  println("Application completed successfully")
} catch {
  case e: SparkException =>
    println(s"Application failed: ${e.getMessage}")
}

Application Report Retrieval

def getApplicationReport(appId: ApplicationId): ApplicationReport

Retrieves the current application report from YARN ResourceManager, containing status, progress, and diagnostic information.

Parameters:

  • appId: The application ID to retrieve the report for

Returns: ApplicationReport containing the current state and details of the application

Example:

val report = client.getApplicationReport(appId)
println(s"Application state: ${report.getYarnApplicationState}")
println(s"Progress: ${report.getProgress * 100}%")
println(s"Tracking URL: ${report.getTrackingUrl}")

Resource Cleanup

def stop(): Unit

Stops the client and cleans up resources. This method should always be called to ensure proper resource cleanup.
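
Example (a minimal sketch, assuming a client and appId created as shown above):

try {
  // Block until the application reaches a terminal state
  client.monitorApplication(appId)
} finally {
  // Always release launcher and YARN client resources
  client.stop()
}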

Launcher Integration

def reportLauncherState(state: SparkAppHandle.State): Unit

Reports the application state to the launcher backend for external monitoring integration.

Parameters:

  • state: The SparkAppHandle.State to report (SUBMITTED, RUNNING, FINISHED, FAILED, KILLED)
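
Example (a sketch; assumes the client is driven through Spark's launcher infrastructure, with SparkAppHandle.State coming from the org.apache.spark.launcher package):

import org.apache.spark.launcher.SparkAppHandle

// Propagate lifecycle transitions to any attached launcher handle
client.reportLauncherState(SparkAppHandle.State.SUBMITTED)
// ... later, once monitoring observes a RUNNING report ...
client.reportLauncherState(SparkAppHandle.State.RUNNING)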

ClientArguments

The ClientArguments class parses and validates command-line arguments for YARN application submission.

package org.apache.spark.deploy.yarn

class ClientArguments(args: Array[String], sparkConf: SparkConf)

Properties

val executorMemory: Int          // Executor memory in MB
val executorCores: Int           // Number of executor cores  
val numExecutors: Int            // Number of executors to request
val appName: String              // Application name
val userJar: String              // Path to user JAR file
val userClass: String            // Main class name
val isClusterMode: Boolean       // Whether running in cluster mode
val pyFiles: String              // Python files for PySpark applications
val files: String                // Additional files to distribute
val archives: String             // Archives to distribute
val amMemoryOverhead: Int        // ApplicationMaster memory overhead
val executorMemoryOverhead: Int  // Executor memory overhead

Example:

val args = new ClientArguments(Array(
  "--class", "com.example.MySparkApp",
  "--jar", "/path/to/app.jar", 
  "--executor-memory", "2g",
  "--executor-cores", "2",
  "--num-executors", "4",
  "--name", "MyApplication"
), sparkConf)

println(s"App Name: ${args.appName}")
println(s"Executors: ${args.numExecutors}")
println(s"Executor Memory: ${args.executorMemory}MB")

Client Companion Object

The Client companion object provides utility methods and constants for YARN application management.

object Client

Constants

val SPARK_JAR = "__spark__.jar"      // Spark assembly JAR alias
val APP_JAR = "__app__.jar"          // User JAR alias  
val LOCAL_SCHEME = "local"           // Local URI scheme
val SPARK_STAGING = ".sparkStaging"  // Staging directory name
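
Example (a sketch of how these constants are typically used; the assembly JAR location below is an assumption for illustration):

// Reference an assembly already installed on every node via the "local" scheme,
// so it is not re-uploaded to the .sparkStaging directory on submission
sparkConf.set("spark.yarn.jar",
  s"${Client.LOCAL_SCHEME}:/opt/spark/lib/spark-assembly.jar")

// Staging files for an application live under <user home>/.sparkStaging/<app id>
val stagingSubDir = s"${Client.SPARK_STAGING}/application_1450000000000_0001"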

Main Entry Point

def main(argStrings: Array[String]): Unit

Main entry point for the YARN client. Parses arguments and runs the application submission process.

Example:

# Command line usage
spark-submit --master yarn --deploy-mode cluster \
  --class com.example.MyApp myapp.jar

Classpath Management

def populateClasspath(
  args: ClientArguments,
  conf: Configuration, 
  sparkConf: SparkConf,
  env: HashMap[String, String],
  extraClassPath: Option[String] = None
): Unit

Populates the classpath environment for YARN containers.

def getUserClasspath(conf: SparkConf): Array[URI]

Retrieves user-specified classpath entries from Spark configuration.

def getClusterPath(conf: SparkConf, path: String): String

Maps a path that is valid on the gateway (submission) host to the equivalent path on cluster nodes, based on the spark.yarn.config.gatewayPath and spark.yarn.config.replacementPath settings.
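
Example (a sketch assuming Spark is installed under different prefixes on the gateway host and on cluster nodes):

val conf = new SparkConf()
  .set("spark.yarn.config.gatewayPath", "/opt/spark-gateway")      // prefix on the gateway host
  .set("spark.yarn.config.replacementPath", "/opt/spark-cluster")  // corresponding prefix on cluster nodes

// Rewrites the gateway prefix so the path resolves on cluster nodes
val clusterPath = Client.getClusterPath(conf, "/opt/spark-gateway/lib/extra.jar")
// clusterPath == "/opt/spark-cluster/lib/extra.jar"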

Path Utilities

def buildPath(components: String*): String

Builds a filesystem path by joining the given components with the Hadoop Path separator.

Example:

import scala.collection.mutable.HashMap
import org.apache.spark.deploy.yarn.Client

// Set up the container classpath environment
val env = new HashMap[String, String]()
Client.populateClasspath(args, hadoopConf, sparkConf, env)

// Get user classpath
val userClasspath = Client.getUserClasspath(sparkConf)
userClasspath.foreach(uri => println(s"Classpath entry: $uri"))

// Build paths
val stagingPath = Client.buildPath("hdfs://namenode:9000", 
                                   Client.SPARK_STAGING, 
                                   "app-123")

Application Lifecycle Management

Typical Usage Pattern

import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.spark.SparkConf
import org.apache.hadoop.yarn.api.records.YarnApplicationState

// 1. Configure Spark application
val sparkConf = new SparkConf()
  .setAppName("MySparkApplication")
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")
  .set("spark.yarn.queue", "production")

// 2. Parse arguments
val clientArgs = Array(
  "--class", "com.example.MySparkApp",
  "--jar", "hdfs://namenode:9000/apps/myapp.jar",
  "--num-executors", "10"
)
val args = new ClientArguments(clientArgs, sparkConf)

// 3. Create and configure client
val client = new Client(args, sparkConf)

try {
  // 4. Submit application
  val appId = client.submitApplication()
  println(s"Application submitted: $appId")
  
  // 5. Monitor application (optional - can use run() instead)
  val (finalState, finalStatus) = client.monitorApplication(appId)
  
  finalState match {
    case YarnApplicationState.FINISHED =>
      println(s"Application completed successfully: $finalStatus")
    case YarnApplicationState.FAILED =>
      println("Application failed")
      sys.exit(1)
    case YarnApplicationState.KILLED =>
      println("Application was killed")
      sys.exit(1)
  }
  
} finally {
  // 6. Clean up resources
  client.stop()
}

Error Handling

import org.apache.spark.SparkException
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException

try {
  client.run()
} catch {
  case e: SparkException =>
    println(s"Spark application error: ${e.getMessage}")
  case e: ApplicationNotFoundException =>
    println(s"Application not found: ${e.getMessage}")
  case e: Exception =>
    println(s"Unexpected error: ${e.getMessage}")
    throw e
}

Configuration Integration

The Client class integrates with Spark's configuration system and honors the standard spark.yarn.* configuration properties, for example:

val sparkConf = new SparkConf()
  .set("spark.yarn.queue", "high-priority")
  .set("spark.yarn.am.memory", "1g") 
  .set("spark.yarn.am.cores", "2")
  .set("spark.yarn.submit.waitAppCompletion", "true")
  .set("spark.yarn.maxAppAttempts", "3")
  .set("spark.yarn.submit.file.replication", "3")

val client = new Client(args, sparkConf)

This comprehensive API enables full programmatic control over Spark application submission and management on YARN clusters, supporting both client and cluster deployment modes with extensive configuration options.