tessl install tessl/maven-org-apache-spark--spark-yarn_2-11@1.6.0
Apache Spark YARN integration module that enables Spark applications to run on YARN clusters, providing cluster manager functionality for distributed Spark computing workloads.
The ApplicationMaster is the central coordinator for Spark applications running on YARN. It manages executor allocation, handles communication between the driver and YARN ResourceManager, and coordinates the application lifecycle in both client and cluster deployment modes.
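Executor allocation is driven by a periodic heartbeat. On each iteration the ApplicationMaster asks its allocator for containers, then sleeps on a lock until the next interval or until shutdown. The following self-contained Scala sketch shows that loop pattern. `HeartbeatLoop`, `HeartbeatDemo`, and the `allocate` callback (a stand-in for `YarnAllocator.allocateResources()`) are hypothetical names, and the 10 ms interval is arbitrary.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical sketch of an AM-style heartbeat loop; `allocate` stands in for
// YarnAllocator.allocateResources() and is not Spark's implementation.
final class HeartbeatLoop(intervalMs: Long, allocate: () => Unit) {
  private val lock = new Object
  @volatile private var finished = false

  // Calls `allocate` once per interval until stop() is called.
  def run(): Unit = {
    while (!finished) {
      allocate()
      lock.synchronized {
        // Object.wait must be called while holding the monitor.
        // Re-checking `finished` under the lock avoids missing a stop() notification.
        if (!finished) lock.wait(intervalMs)
      }
    }
  }

  // Marks the loop finished and wakes it immediately instead of waiting out the interval.
  def stop(): Unit = lock.synchronized {
    finished = true
    lock.notifyAll()
  }
}

object HeartbeatDemo {
  // Runs the loop on a background thread for roughly `millis` ms and
  // returns how many heartbeats were performed.
  def runFor(millis: Long): Int = {
    val calls = new AtomicInteger(0)
    val loop = new HeartbeatLoop(10L, () => { calls.incrementAndGet(); () })
    val worker = new Thread(new Runnable { def run(): Unit = loop.run() })
    worker.start()
    Thread.sleep(millis)
    loop.stop()
    worker.join()
    calls.get()
  }
}
```

Waiting on a dedicated lock, with `notifyAll` in `stop()`, lets shutdown interrupt the sleep rather than delaying it by up to one full heartbeat interval.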
The ApplicationMaster class serves as the main entry point and coordination layer for Spark applications on YARN.
package org.apache.spark.deploy.yarn
class ApplicationMaster(
args: ApplicationMasterArguments,
client: YarnRMClient
) extends Logging
def run(): Int
Main execution method that coordinates the entire ApplicationMaster lifecycle and returns the process exit code.
Example:
// ApplicationMaster is private[spark], so this compiles only inside the org.apache.spark package
import org.apache.spark.deploy.yarn.{ApplicationMaster, ApplicationMasterArguments, YarnRMClient}
val args = new ApplicationMasterArguments(Array(
"--class", "com.example.MyApp",
"--jar", "/path/to/app.jar",
"--executor-memory", "2g",
"--executor-cores", "2"
))
val client = new YarnRMClient(args)
val am = new ApplicationMaster(args, client)
val exitCode = am.run()
System.exit(exitCode)
def finish(status: FinalApplicationStatus, code: Int, msg: String = null): Unit
Terminates the ApplicationMaster with the specified status and exit code.
Parameters:
status: Final application status (SUCCEEDED, FAILED, KILLED)
code: Exit code to return
msg: Optional message describing the termination reason
def unregister(status: FinalApplicationStatus, diagnostics: String = null): Unit
Unregisters the ApplicationMaster from the YARN ResourceManager.
Parameters:
status: Final application status
diagnostics: Optional diagnostic message
Example:
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
try {
// Application logic here
am.finish(FinalApplicationStatus.SUCCEEDED, 0, "Application completed successfully")
} catch {
case e: Exception =>
am.finish(FinalApplicationStatus.FAILED, 1, s"Application failed: ${e.getMessage}")
}
The companion object provides the main entry point and utility methods for ApplicationMaster operations.
object ApplicationMaster
val EXIT_SUCCESS = 0
val EXIT_UNCAUGHT_EXCEPTION = 10
val EXIT_MAX_EXECUTOR_FAILURES = 11
val EXIT_REPORTER_FAILURE = 12
val EXIT_SC_NOT_INITED = 13
val EXIT_SECURITY = 14
val EXIT_EXCEPTION_USER_CLASS = 15
val EXIT_EARLY = 16
val EXIT_SIGNAL = 17
def main(args: Array[String]): Unit
Main entry point for the ApplicationMaster process.
Example command line:
# Invoked by YARN when launching the ApplicationMaster container
java org.apache.spark.deploy.yarn.ApplicationMaster \
--class com.example.MyApp \
--jar hdfs://namenode:9000/apps/myapp.jar \
--executor-memory 2g \
--executor-cores 2 \
--properties-file /opt/spark/conf/spark.properties
def sparkContextInitialized(sc: SparkContext): Unit
Called when SparkContext is successfully initialized in cluster mode.
def sparkContextStopped(sc: SparkContext): Boolean
Called when SparkContext is stopped. Returns true if the ApplicationMaster should exit.
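In cluster mode these two callbacks form a handshake: the user's main thread registers its context, while the ApplicationMaster waits for it (the EXIT_SC_NOT_INITED code covers the case where it never arrives). The following self-contained sketch models that handshake with a generic type `C` standing in for `SparkContext`. `ContextHandshake`, `awaitInitialized`, `HandshakeDemo`, and the timeout parameter are hypothetical names, not Spark API.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical sketch: the "user" thread calls initialized(ctx), the "AM" thread
// blocks in awaitInitialized until a context appears or the timeout expires.
final class ContextHandshake[C <: AnyRef] {
  private val ref = new AtomicReference[C]()

  // Registers the context and wakes any waiting thread.
  def initialized(ctx: C): Unit = ref.synchronized {
    ref.set(ctx)
    ref.notifyAll()
  }

  // In this sketch, returns true only if `ctx` was the registered context and was cleared.
  def stopped(ctx: C): Boolean = ref.compareAndSet(ctx, null.asInstanceOf[C])

  // Returns the registered context, or None if none arrived within `timeoutMs`.
  def awaitInitialized(timeoutMs: Long): Option[C] = ref.synchronized {
    val deadline = System.currentTimeMillis() + timeoutMs
    var remaining = timeoutMs
    // Loop guards against spurious wakeups; remaining > 0 avoids wait(0), which blocks forever.
    while (ref.get() == null && remaining > 0) {
      ref.wait(remaining)
      remaining = deadline - System.currentTimeMillis()
    }
    Option(ref.get())
  }
}

object HandshakeDemo {
  // Simulates a user main thread that creates its "context" after `delayMs`.
  def simulate(delayMs: Long, timeoutMs: Long): Option[String] = {
    val hs = new ContextHandshake[String]
    val user = new Thread(new Runnable {
      def run(): Unit = { Thread.sleep(delayMs); hs.initialized("sc-1") }
    })
    user.start()
    val result = hs.awaitInitialized(timeoutMs)
    user.join()
    result
  }
}
```

A bounded wait is what lets an ApplicationMaster fail with a clear exit code instead of hanging when the user program never creates a context.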
Example:
// These methods are called automatically by the Spark runtime;
// user code typically does not call them directly
import org.apache.spark.SparkContext
object MySparkApp {
def main(args: Array[String]): Unit = {
val sc = new SparkContext()
// ApplicationMaster.sparkContextInitialized(sc) called automatically
try {
// Application logic
val rdd = sc.parallelize(1 to 100)
println(rdd.sum())
} finally {
sc.stop()
// ApplicationMaster.sparkContextStopped(sc) called automatically
}
}
}
The ApplicationMasterArguments class parses and validates command-line arguments for the ApplicationMaster.
package org.apache.spark.deploy.yarn
class ApplicationMasterArguments(val args: Array[String])
var userJar: String // Path to user application JAR
var userClass: String // Main class name for cluster mode
var primaryPyFile: String // Main Python file for PySpark apps
var primaryRFile: String // Main R file for SparkR apps
var userArgs: Seq[String] // Arguments to pass to user application
var executorMemory: Int // Executor memory in MB (default: 1024)
var executorCores: Int // Number of executor cores (default: 1)
var propertiesFile: String // Path to Spark properties file
Example:
val args = new ApplicationMasterArguments(Array(
"--class", "com.example.ProcessingApp",
"--jar", "hdfs://namenode:9000/apps/processing.jar",
"--executor-memory", "4g",
"--executor-cores", "4",
"--args", "input.txt",
"--args", "output.txt",
"--properties-file", "/opt/spark/conf/production.properties"
))
println(s"Main class: ${args.userClass}")
println(s"Executor memory: ${args.executorMemory}MB") // "4g" is stored as 4096
println(s"Executor cores: ${args.executorCores}")
println(s"User args: ${args.userArgs.mkString(", ")}")
def printUsageAndExit(exitCode: Int, unknownParam: Any = null): Unit
Prints usage information and exits with the specified code.
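The argument rules described for ApplicationMasterArguments (repeated `--args` flags accumulate into `userArgs`, memory strings such as `4g` end up in megabytes, and unset values fall back to 1024 MB and 1 core) can be illustrated with a small immutable parser. This is a sketch, not Spark's implementation: `AmArgs`, `parse`, and `memoryToMb` are hypothetical, and only the `m` and `g` memory suffixes are accepted.

```scala
// Hypothetical, simplified parser for the documented flags; not Spark's
// ApplicationMasterArguments. Unknown flags raise an exception instead of printing usage.
final case class AmArgs(
    userJar: Option[String] = None,
    userClass: Option[String] = None,
    userArgs: Vector[String] = Vector.empty,
    executorMemoryMb: Int = 1024,
    executorCores: Int = 1,
    propertiesFile: Option[String] = None)

object AmArgs {
  // Converts "512m" or "2g" into megabytes; other formats are rejected in this sketch.
  def memoryToMb(s: String): Int = {
    val lower = s.trim.toLowerCase
    if (lower.endsWith("g")) lower.dropRight(1).toInt * 1024
    else if (lower.endsWith("m")) lower.dropRight(1).toInt
    else throw new IllegalArgumentException(s"Expected a size such as 512m or 2g, got: $s")
  }

  // Consumes flag/value pairs left to right, accumulating into an immutable AmArgs.
  @annotation.tailrec
  def parse(args: List[String], acc: AmArgs = AmArgs()): AmArgs = args match {
    case "--jar" :: v :: tail             => parse(tail, acc.copy(userJar = Some(v)))
    case "--class" :: v :: tail           => parse(tail, acc.copy(userClass = Some(v)))
    case "--args" :: v :: tail            => parse(tail, acc.copy(userArgs = acc.userArgs :+ v))
    case "--executor-memory" :: v :: tail => parse(tail, acc.copy(executorMemoryMb = memoryToMb(v)))
    case "--executor-cores" :: v :: tail  => parse(tail, acc.copy(executorCores = v.toInt))
    case "--properties-file" :: v :: tail => parse(tail, acc.copy(propertiesFile = Some(v)))
    case Nil                              => acc
    case unknown :: _                     => throw new IllegalArgumentException(s"Unknown argument: $unknown")
  }
}
```

Returning a case class keeps the parsed result immutable, whereas the documented class exposes mutable `var` fields.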
The ApplicationMaster operates differently depending on the deployment mode:
In cluster mode, the ApplicationMaster runs the driver program within the YARN cluster.
// Properties set automatically in cluster mode
// (appAttemptId is the ApplicationAttemptId assigned by YARN)
System.setProperty("spark.master", "yarn-cluster")
System.setProperty("spark.ui.port", "0") // Use an ephemeral port
System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString)
System.setProperty("spark.yarn.app.attemptId", appAttemptId.getAttemptId().toString)
// ApplicationMaster hosts the driver and manages executors
// Run the user application within the AM (schematic call); the user's main creates the
// SparkContext in this same JVM, e.g. new SparkContext(new SparkConf())
runUserClass(args.userClass, args.userArgs)
In client mode, the ApplicationMaster only manages executor allocation while the driver runs externally.
// ApplicationMaster acts as executor manager only
// Driver runs separately on client machine
val allocator = new YarnAllocator(
driverUrl = driverUrl, // built from spark.driver.host and spark.driver.port
driverRef = driverRef,
conf = yarnConf,
sparkConf = sparkConf,
amClient = amClient,
appAttemptId = attemptId,
args = args,
securityMgr = securityMgr
)
// Heartbeat loop to manage executors
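while (!finished) {
allocator.allocateResources()
// Object.wait must be called while holding the monitor; calling wait on a
// thread object without holding it throws IllegalMonitorStateException
allocatorLock.synchronized {
allocatorLock.wait(heartbeatInterval)
}
}
The ApplicationMaster integrates closely with YarnAllocator for container management: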
import org.apache.spark.deploy.yarn.YarnAllocator
// ApplicationMaster creates and manages YarnAllocator
private var allocator: YarnAllocator = _
def createAllocator(driverRef: RpcEndpointRef): Unit = {
allocator = new YarnAllocator(
driverUrl = driverUrl,
driverRef = driverRef,
conf = yarnConf,
sparkConf = sparkConf,
amClient = client.amClient,
appAttemptId = client.getAttemptId(),
args = args,
securityMgr = securityMgr
)
}
// Heartbeat loop for resource allocation
def allocateExecutors(): Unit = {
allocatorLock.synchronized {
if (allocator != null) {
allocator.allocateResources()
}
}
}
The ApplicationMaster handles security credentials and delegation token renewal:
import org.apache.spark.SecurityManager
import org.apache.spark.deploy.yarn.AMDelegationTokenRenewer
// Set up delegation token renewal for long-running applications
if (sparkConf.contains("spark.yarn.credentials.file")) {
val tokenRenewer = new AMDelegationTokenRenewer(sparkConf, yarnConf)
tokenRenewer.scheduleLoginFromKeytab()
}
// Security manager for authentication
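val securityMgr = new SecurityManager(sparkConf)
The ApplicationMaster installs signal handlers and a shutdown hook so that a final status is reported, the AM is unregistered, and the staging directory is cleaned up even on abnormal termination: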
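// Signal handlers for graceful shutdown
import org.apache.commons.lang3.SystemUtils
import sun.misc.{Signal, SignalHandler} // JDK-internal signal API
if (SystemUtils.IS_OS_UNIX) {
Seq("TERM", "INT", "HUP").foreach { name =>
Signal.handle(new Signal(name), new SignalHandler {
override def handle(signal: Signal): Unit = {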
finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_SIGNAL)
}
})
}
}
// Shutdown hook for cleanup
import org.apache.spark.util.ShutdownHookManager
val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
ShutdownHookManager.addShutdownHook(priority) { () =>
if (!finished) {
finish(finalStatus, ApplicationMaster.EXIT_EARLY, // hook ran before a final status was reported
"Shutdown hook called before final status was reported.")
}
if (!unregistered) {
unregister(finalStatus, finalMsg)
cleanupStagingDir(fs)
}
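}
A complete launcher that runs the ApplicationMaster and reports its exit code:
// Must be compiled inside the org.apache.spark package, because ApplicationMaster is private[spark]
import org.apache.spark.deploy.yarn.{ApplicationMaster, ApplicationMasterArguments, YarnRMClient}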
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
object MyApplicationMaster {
def main(args: Array[String]): Unit = {
val amArgs = new ApplicationMasterArguments(args)
val client = new YarnRMClient(amArgs)
val am = new ApplicationMaster(amArgs, client)
try {
val exitCode = am.run()
exitCode match {
case ApplicationMaster.EXIT_SUCCESS =>
println("ApplicationMaster completed successfully")
case ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES =>
println("ApplicationMaster failed due to executor failures")
case ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION =>
println("ApplicationMaster failed with uncaught exception")
case _ =>
println(s"ApplicationMaster exited with code: $exitCode")
}
System.exit(exitCode)
} catch {
case e: Exception =>
am.finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
s"Uncaught exception: ${e.getMessage}")
System.exit(ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION)
}
}
}
The ApplicationMaster provides the core coordination and management functionality that lets Spark applications run on YARN clusters, handling resource allocation, security, and the application lifecycle in both deployment modes.
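The complete launcher matches only three of the exit codes defined on the companion object. A small lookup table makes any code readable in logs. The following sketch copies the numeric values from the companion object listing; the `AmExitCodes` object and its `describe` method are hypothetical helpers, not part of Spark.

```scala
// Hypothetical helper mapping ApplicationMaster exit codes (values copied from the
// companion object listing) to their constant names for logging.
object AmExitCodes {
  val names: Map[Int, String] = Map(
    0  -> "EXIT_SUCCESS",
    10 -> "EXIT_UNCAUGHT_EXCEPTION",
    11 -> "EXIT_MAX_EXECUTOR_FAILURES",
    12 -> "EXIT_REPORTER_FAILURE",
    13 -> "EXIT_SC_NOT_INITED",
    14 -> "EXIT_SECURITY",
    15 -> "EXIT_EXCEPTION_USER_CLASS",
    16 -> "EXIT_EARLY",
    17 -> "EXIT_SIGNAL")

  // Returns the constant name, or UNKNOWN(<code>) for values not in the table.
  def describe(code: Int): String = names.getOrElse(code, s"UNKNOWN($code)")
}
```

A launcher could then log `s"ApplicationMaster exited with ${AmExitCodes.describe(exitCode)}"` in its catch-all branch instead of the bare number.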