Apache Spark's integration with the Hadoop YARN cluster manager, used to run Spark applications on YARN clusters.
The Application Master is the central coordinator for Spark applications running on YARN. It manages the application lifecycle, negotiates resources with the YARN ResourceManager, and coordinates between the Spark driver and executor processes within YARN containers.
The main Application Master class that handles application coordination and resource management within the YARN cluster.
/**
* Application Master for Spark on YARN
* Manages application lifecycle and resource allocation
*/
private[spark] class ApplicationMaster(
args: ApplicationMasterArguments,
client: YarnRMClient
) extends Logging {
/**
* Main execution method for the Application Master
* @return Exit code (0 for success, non-zero for failure)
*/
final def run(): Int
}
The Application Master operates in two modes: cluster mode, where the Spark driver runs inside the AM process, and client mode, where the driver runs on the submitting client and the AM only manages executors.
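The mode is chosen at submission time. A minimal spark-submit example (the application class and JAR names are illustrative; the flags are standard spark-submit options):

```shell
# Cluster mode: the driver runs inside the YARN Application Master
spark-submit --master yarn --deploy-mode cluster \
  --class com.example.MyApp myapp.jar

# Client mode: the driver runs on the submitting machine; the AM
# (ExecutorLauncher) only negotiates and manages executor containers
spark-submit --master yarn --deploy-mode client \
  --class com.example.MyApp myapp.jar
```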
Main entry points for launching the Application Master and Executor Launcher processes.
/**
* Entry point for Application Master process
* Called by YARN when launching the AM container
*/
object ApplicationMaster extends Logging {
def main(args: Array[String]): Unit
/**
* Notifies the Application Master that SparkContext has been initialized
* @param sc The initialized SparkContext
*/
private[spark] def sparkContextInitialized(sc: SparkContext): Unit
/**
* Notifies the Application Master that SparkContext has been stopped
* @param sc The stopped SparkContext
*/
private[spark] def sparkContextStopped(sc: SparkContext): Unit
}
/**
* Entry point for Executor Launcher (client mode)
* Launches executors when driver runs outside YARN
*/
object ExecutorLauncher {
def main(args: Array[String]): Unit
}
Usage Example:
The Application Master is automatically launched by YARN, but the main methods are called as follows:
# YARN calls Application Master main method
java -cp $CLASSPATH org.apache.spark.deploy.yarn.ApplicationMaster \
--class com.example.MyApp \
--jar myapp.jar \
--args arg1,arg2 \
--num-executors 4 \
--executor-memory 2g
# For client mode, Executor Launcher is used
java -cp $CLASSPATH org.apache.spark.deploy.yarn.ExecutorLauncher \
--num-executors 4 \
--executor-memory 2g
Argument parsing and configuration management for Application Master initialization.
/**
* Parses command-line arguments for Application Master
*/
class ApplicationMasterArguments(val args: Array[String]) {
var userJar: String = null
var userClass: String = null
var userArgs: Seq[String] = Nil
var propertiesFile: String = null
var numExecutors: Int = 2
var executorMemory: String = "1g"
var executorCores: Int = 1
}
object ApplicationMasterArguments {
// Utility methods for argument parsing
}
Key Arguments:
--class: Main class to execute (cluster mode only)
--jar: User application JAR file
--args: Application arguments (comma-separated)
--num-executors: Number of executor containers to request
--executor-memory: Memory per executor container
--executor-cores: CPU cores per executor container
--properties-file: Spark properties file location
The Application Master integrates with YARN's resource management through the YarnRMClient interface.
/**
* Interface for YARN Resource Manager client operations
* Handles resource requests and container management
*/
trait YarnRMClient {
def getAttemptId(): ApplicationAttemptId
def getMaxRegAttempts(conf: YarnConfiguration): Int
// Additional resource management methods (internal implementation)
}
The Application Master manages executor lifecycle through the YarnAllocator.
/**
* Handles executor allocation and management
* Requests containers from YARN and launches executor processes
*/
private[spark] class YarnAllocator(/* parameters */) extends Logging {
// Internal implementation for:
// - Container allocation requests
// - Executor process launching
// - Container failure handling
// - Dynamic scaling based on workload
}
/**
* Enumeration for allocation types
*/
object AllocationType extends Enumeration {
// Defines different types of resource allocations
}
In cluster mode, the Application Master includes the driver:
// Cluster mode - driver runs in AM
if (isClusterMode) {
// Launch user application class within AM process
runDriver(securityManager)
} else {
// Client mode - launch executors only
runExecutorLauncher(securityManager)
}
The Application Master manages YARN containers for executors, requesting them from the ResourceManager through the YarnAllocator and launching one executor process per allocated container.
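Container demand can also change at runtime. A sketch, assuming the developer API SparkContext.requestExecutors (a real Spark method, but one that requires a running YARN cluster); on YARN such requests flow to the AM's YarnAllocator, which asks the ResourceManager for containers:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: request additional executors at runtime. On YARN, the request
// reaches the Application Master, whose YarnAllocator asks the
// ResourceManager for more containers and launches executors in them.
val sc = new SparkContext(new SparkConf().setAppName("allocator-example"))
val requested: Boolean = sc.requestExecutors(2) // true if the request was delivered
```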
Key Spark properties that affect Application Master behavior:
val sparkConf = new SparkConf()
.set("spark.yarn.max.executor.failures", "6") // Max executor failures before AM fails
.set("spark.yarn.am.memory", "1g") // Application Master memory
.set("spark.yarn.am.cores", "1") // Application Master CPU cores
.set("spark.yarn.am.waitTime", "100s") // Max wait time for SparkContext
.set("spark.yarn.submit.waitAppCompletion", "true") // Wait for app completion
Integration with YARN configuration:
// YARN configuration integration
val yarnConf = new YarnConfiguration(hadoopConf)
// Security configuration
val securityManager = new SecurityManager(sparkConf)
// FileSystem integration
val fs = FileSystem.get(yarnConf)
The Application Master handles various failure conditions: repeated executor failures (bounded by spark.yarn.max.executor.failures), failure of the SparkContext to initialize within spark.yarn.am.waitTime, and uncaught exceptions in the user application, each reported to YARN as a final application status.
Automatic cleanup includes unregistering from the ResourceManager and deleting the application's staging directory when the application finishes.
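Deletion of the staging directory can be disabled for debugging via spark.yarn.preserve.staging.files, an existing Spark-on-YARN property; a minimal sketch:

```scala
import org.apache.spark.SparkConf

// Keep .sparkStaging files after the application finishes (default: false),
// useful when diagnosing Application Master startup problems.
val conf = new SparkConf().set("spark.yarn.preserve.staging.files", "true")
```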
The Application Master provides extensive logging and monitoring:
// Logging integration
extends Logging
// Key log events
logInfo("ApplicationAttemptId: " + appAttemptId)
logInfo("Registered with ResourceManager")
logInfo("Allocating " + numExecutors + " executors")
logWarning("Executor failed: " + executorId)
logError("Fatal error in Application Master", exception)
The Application Master coordinates with Spark's YARN scheduler backends, reporting executor registration and loss so the driver can schedule tasks accordingly.
Comprehensive security integration: the AM creates a SecurityManager from the Spark configuration and, on secure (Kerberos-enabled) clusters, relies on Hadoop delegation tokens to access cluster services.
Integration with distributed storage: the AM uses the Hadoop FileSystem API to read the application JAR and other resources distributed through the YARN staging directory.
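A minimal sketch of that storage path, using the real Hadoop FileSystem API; the staging directory shown is an illustrative example, not a fixed location:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: resolve the default (typically HDFS) filesystem and check for an
// application's staging directory. The path below is an example only.
val fs = FileSystem.get(new Configuration())
val stagingDir = new Path("/user/alice/.sparkStaging/application_1234567890_0001")
val present: Boolean = fs.exists(stagingDir)
```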
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--yarn-parent-2-11