tessl install tessl/maven-org-apache-spark--spark-yarn_2-11@1.6.0

Apache Spark YARN integration module that enables Spark applications to run on YARN clusters, providing cluster-manager functionality for distributed Spark computing workloads.
The Apache Spark YARN module provides comprehensive application management capabilities through the Client class and related components. This documentation covers application submission, monitoring, and lifecycle management on YARN clusters.
The Client class is the primary interface for submitting and managing Spark applications on YARN clusters. It handles the complete application lifecycle from submission to termination.
package org.apache.spark.deploy.yarn
private[spark] class Client(
val args: ClientArguments,
val hadoopConf: Configuration,
val sparkConf: SparkConf
) extends Logging {
// Alternative constructor
def this(clientArgs: ClientArguments, spConf: SparkConf) =
this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)
}

def submitApplication(): ApplicationId

Submits a Spark application to the YARN ResourceManager. This method:

- Initializes and starts the YARN client
- Requests a new application from the ResourceManager
- Verifies the cluster has enough resources to launch the ApplicationMaster
- Sets up the container launch context and application submission context
- Submits the application and returns its ApplicationId
Example:
import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.spark.SparkConf
import org.apache.hadoop.conf.Configuration
val sparkConf = new SparkConf()
.setAppName("MySparkApp")
.set("spark.executor.memory", "2g")
.set("spark.executor.cores", "2")
val args = new ClientArguments(Array(
"--class", "com.example.MyApp",
"--jar", "myapp.jar",
"--num-executors", "4"
), sparkConf)
val client = new Client(args, sparkConf)
val appId = client.submitApplication()
println(s"Submitted application: $appId")def monitorApplication(
appId: ApplicationId,
returnOnRunning: Boolean = false,
logApplicationReport: Boolean = true
): (YarnApplicationState, FinalApplicationStatus)

Monitors the application state and returns when the application reaches a terminal state, or when returnOnRunning is true and the application reaches the RUNNING state.
Parameters:
- appId: The application ID to monitor
- returnOnRunning: If true, returns when the application reaches the RUNNING state
- logApplicationReport: If true, logs application progress reports

Returns: Tuple of (YarnApplicationState, FinalApplicationStatus)
Example:
import org.apache.hadoop.yarn.api.records.YarnApplicationState

val (state, finalStatus) = client.monitorApplication(appId, returnOnRunning = true)
state match {
case YarnApplicationState.RUNNING =>
println("Application is now running")
case YarnApplicationState.FINISHED =>
println(s"Application completed with status: $finalStatus")
case YarnApplicationState.FAILED =>
println("Application failed")
case YarnApplicationState.KILLED =>
println("Application was killed")
}

def run(): Unit

Runs the complete application lifecycle: submits the application, monitors it until completion, and handles cleanup. This is a blocking operation that combines submitApplication() and monitorApplication().
Example:
import org.apache.spark.SparkException

try {
client.run()
println("Application completed successfully")
} catch {
case e: SparkException =>
println(s"Application failed: ${e.getMessage}")
}

def getApplicationReport(appId: ApplicationId): ApplicationReport

Retrieves the current application report from the YARN ResourceManager, containing status, progress, and diagnostic information.
Parameters:
- appId: The application ID to retrieve the report for

Returns: ApplicationReport containing the current state and details of the application
Example:
val report = client.getApplicationReport(appId)
println(s"Application state: ${report.getYarnApplicationState}")
println(s"Progress: ${report.getProgress * 100}%")
println(s"Tracking URL: ${report.getTrackingUrl}")def stop(): UnitStops the client and cleans up resources. This method should always be called to ensure proper resource cleanup.
def reportLauncherState(state: SparkAppHandle.State): Unit

Reports the application state to the launcher backend for external monitoring integration.
Parameters:
- state: The SparkAppHandle.State to report (SUBMITTED, RUNNING, FINISHED, FAILED, KILLED)
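Example (a brief sketch; SparkAppHandle.State comes from the org.apache.spark.launcher package, and the state value shown is illustrative):

import org.apache.spark.launcher.SparkAppHandle

// Notify an attached launcher backend that the application has been submitted
client.reportLauncherState(SparkAppHandle.State.SUBMITTED)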
The ClientArguments class parses and validates command-line arguments for YARN application submission.

package org.apache.spark.deploy.yarn

class ClientArguments(args: Array[String], sparkConf: SparkConf)

val executorMemory: Int // Executor memory in MB
val executorCores: Int // Number of executor cores
val numExecutors: Int // Number of executors to request
val appName: String // Application name
val userJar: String // Path to user JAR file
val userClass: String // Main class name
val isClusterMode: Boolean // Whether running in cluster mode
val pyFiles: String // Python files for PySpark applications
val files: String // Additional files to distribute
val archives: String // Archives to distribute
val amMemoryOverhead: Int // ApplicationMaster memory overhead
val executorMemoryOverhead: Int // Executor memory overhead

Example:
val args = new ClientArguments(Array(
"--class", "com.example.MySparkApp",
"--jar", "/path/to/app.jar",
"--executor-memory", "2g",
"--executor-cores", "2",
"--num-executors", "4",
"--name", "MyApplication"
), sparkConf)
println(s"App Name: ${args.appName}")
println(s"Executors: ${args.numExecutors}")
println(s"Executor Memory: ${args.executorMemory}MB")The Client companion object provides utility methods and constants for YARN application management.
object Client

val SPARK_JAR = "__spark__.jar" // Spark assembly JAR alias
val APP_JAR = "__app__.jar" // User JAR alias
val LOCAL_SCHEME = "local" // Local URI scheme
val SPARK_STAGING = ".sparkStaging" // Staging directory name

def main(argStrings: Array[String]): Unit

Main entry point for the YARN client. Parses arguments and runs the application submission process.
Example:
# Command line usage
spark-submit --master yarn --deploy-mode cluster \
--class com.example.MyApp myapp.jar

def populateClasspath(
args: ClientArguments,
conf: Configuration,
sparkConf: SparkConf,
env: HashMap[String, String],
extraClassPath: Option[String] = None
): Unit

Populates the classpath environment for YARN containers.
def getUserClasspath(conf: SparkConf): Array[URI]

Retrieves user-specified classpath entries from Spark configuration.
def getClusterPath(conf: SparkConf, path: String): String

Converts a gateway-local path to the equivalent path on cluster nodes, rewriting the prefix configured by spark.yarn.config.gatewayPath to the one configured by spark.yarn.config.replacementPath; the path is returned unchanged when these properties are not set.
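Example (a short sketch assuming a hypothetical layout where /opt/gateway/spark on the gateway host maps to /opt/cluster/spark on cluster nodes):

val conf = new SparkConf()
  .set("spark.yarn.config.gatewayPath", "/opt/gateway/spark")
  .set("spark.yarn.config.replacementPath", "/opt/cluster/spark")

// The gateway prefix is rewritten to the cluster prefix
val libPath = Client.getClusterPath(conf, "/opt/gateway/spark/lib/myLib.jar")
println(libPath) // /opt/cluster/spark/lib/myLib.jar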
def buildPath(components: String*): String

Builds a filesystem path from components, handling separators correctly across platforms.
Example:
import org.apache.spark.deploy.yarn.Client
import scala.collection.mutable.HashMap
// Setup classpath for containers
val env = new HashMap[String, String]()
Client.populateClasspath(args, hadoopConf, sparkConf, env)
// Get user classpath
val userClasspath = Client.getUserClasspath(sparkConf)
userClasspath.foreach(uri => println(s"Classpath entry: $uri"))
// Build paths
val stagingPath = Client.buildPath("hdfs://namenode:9000",
Client.SPARK_STAGING,
"app-123")import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.spark.SparkConf
import org.apache.hadoop.yarn.api.records.YarnApplicationState
// 1. Configure Spark application
val sparkConf = new SparkConf()
.setAppName("MySparkApplication")
.set("spark.executor.memory", "2g")
.set("spark.executor.cores", "2")
.set("spark.yarn.queue", "production")
// 2. Parse arguments
val clientArgs = Array(
"--class", "com.example.MySparkApp",
"--jar", "hdfs://namenode:9000/apps/myapp.jar",
"--num-executors", "10"
)
val args = new ClientArguments(clientArgs, sparkConf)
// 3. Create and configure client
val client = new Client(args, sparkConf)
try {
// 4. Submit application
val appId = client.submitApplication()
println(s"Application submitted: $appId")
// 5. Monitor application (optional - can use run() instead)
val (finalState, finalStatus) = client.monitorApplication(appId)
finalState match {
case YarnApplicationState.FINISHED =>
println(s"Application completed successfully: $finalStatus")
case YarnApplicationState.FAILED =>
println("Application failed")
sys.exit(1)
case YarnApplicationState.KILLED =>
println("Application was killed")
sys.exit(1)
}
} finally {
// 6. Clean up resources
client.stop()
}

Error handling during submission and monitoring:

import org.apache.spark.SparkException
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
try {
client.run()
} catch {
case e: SparkException =>
println(s"Spark application error: ${e.getMessage}")
case e: ApplicationNotFoundException =>
println(s"Application not found: ${e.getMessage}")
case e: Exception =>
println(s"Unexpected error: ${e.getMessage}")
throw e
}

The Client class integrates with Spark's configuration system and supports all standard YARN-related configuration properties:
val sparkConf = new SparkConf()
.set("spark.yarn.queue", "high-priority")
.set("spark.yarn.am.memory", "1g")
.set("spark.yarn.am.cores", "2")
.set("spark.yarn.submit.waitAppCompletion", "true")
.set("spark.yarn.maxAppAttempts", "3")
.set("spark.yarn.submit.file.replication", "3")
val client = new Client(args, sparkConf)

This API enables full programmatic control over Spark application submission and management on YARN clusters, supporting both client and cluster deployment modes with extensive configuration options.