
tessl/maven-org-apache-spark--yarn-parent-2-11

Apache Spark's integration with Hadoop YARN cluster manager for running Spark applications on YARN clusters


Application Master

The Application Master is the central coordinator for Spark applications running on YARN. It manages the application lifecycle, negotiates resources with the YARN ResourceManager, and coordinates between the Spark driver and executor processes within YARN containers.

Capabilities

Application Master Core

The main Application Master class that handles application coordination and resource management within the YARN cluster.

/**
 * Application Master for Spark on YARN
 * Manages application lifecycle and resource allocation
 */
private[spark] class ApplicationMaster(
  args: ApplicationMasterArguments,
  client: YarnRMClient
) extends Logging {
  
  /**
   * Main execution method for the Application Master
   * @return Exit code (0 for success, non-zero for failure)
   */
  final def run(): Int
}

The Application Master operates in two modes:

  • Client Mode: The Application Master acts only as an executor launcher; the driver runs outside YARN, in the submitting process
  • Cluster Mode: The driver runs inside the Application Master process

Application Master Entry Points

Main entry points for launching the Application Master and Executor Launcher processes.

/**
 * Entry point for Application Master process
 * Called by YARN when launching the AM container
 */
object ApplicationMaster extends Logging {
  def main(args: Array[String]): Unit
  
  /**
   * Notifies the Application Master that SparkContext has been initialized
   * @param sc The initialized SparkContext
   */
  private[spark] def sparkContextInitialized(sc: SparkContext): Unit
  
  /**
   * Notifies the Application Master that SparkContext has been stopped
   * @param sc The stopped SparkContext
   */
  private[spark] def sparkContextStopped(sc: SparkContext): Unit
}

/**
 * Entry point for Executor Launcher (client mode)
 * Launches executors when driver runs outside YARN
 */
object ExecutorLauncher {
  def main(args: Array[String]): Unit
}

Usage Example:

YARN launches the Application Master automatically, so these commands are not normally run by hand. They illustrate the command line that the container launch resolves to:

# Cluster mode: YARN starts the Application Master main class in the AM container
# (each --args occurrence passes one application argument, in order)
java -cp $CLASSPATH org.apache.spark.deploy.yarn.ApplicationMaster \
  --class com.example.MyApp \
  --jar myapp.jar \
  --args arg1 \
  --args arg2 \
  --num-executors 4 \
  --executor-memory 2g

# Client mode: the Executor Launcher entry point is used instead
java -cp $CLASSPATH org.apache.spark.deploy.yarn.ExecutorLauncher \
  --num-executors 4 \
  --executor-memory 2g

Application Master Arguments

Argument parsing and configuration management for Application Master initialization.

/**
 * Parses command-line arguments for Application Master
 */
class ApplicationMasterArguments(val args: Array[String]) {
  var userJar: String = null
  var userClass: String = null
  var userArgs: Seq[String] = Nil
  var propertiesFile: String = null
  var numExecutors: Int = 2
  var executorMemory: String = "1g"
  var executorCores: Int = 1
}

object ApplicationMasterArguments {
  // Utility methods for argument parsing
}

Key Arguments:

  • --class: Main class to execute (cluster mode only)
  • --jar: User application JAR file
  • --args: One application argument; repeat the flag to pass several, which are forwarded in order
  • --num-executors: Number of executor containers to request
  • --executor-memory: Memory per executor container
  • --executor-cores: CPU cores per executor container
  • --properties-file: Spark properties file location
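
The flag-then-value parsing described above can be sketched with a recursive pattern match over the argument list. This is a minimal illustration, not the actual `ApplicationMasterArguments` implementation: `AmArgs` and `AmArgsParser` are hypothetical names, the defaults are copied from the listing above, and the sketch assumes that each `--args` occurrence contributes exactly one application argument.

```scala
// Hypothetical stand-in for the parsed result; fields mirror the listing above.
final case class AmArgs(
    userJar: Option[String] = None,
    userClass: Option[String] = None,
    userArgs: Vector[String] = Vector.empty,
    propertiesFile: Option[String] = None,
    numExecutors: Int = 2,
    executorMemory: String = "1g",
    executorCores: Int = 1)

object AmArgsParser {
  /** Consumes the list two tokens at a time: a flag followed by its value. */
  @scala.annotation.tailrec
  def parse(remaining: List[String], acc: AmArgs = AmArgs()): AmArgs = remaining match {
    case "--jar" :: value :: tail =>
      parse(tail, acc.copy(userJar = Some(value)))
    case "--class" :: value :: tail =>
      parse(tail, acc.copy(userClass = Some(value)))
    case "--args" :: value :: tail =>
      // Assumption: one application argument per occurrence, kept in order.
      parse(tail, acc.copy(userArgs = acc.userArgs :+ value))
    case "--properties-file" :: value :: tail =>
      parse(tail, acc.copy(propertiesFile = Some(value)))
    case "--num-executors" :: value :: tail =>
      parse(tail, acc.copy(numExecutors = value.toInt))
    case "--executor-memory" :: value :: tail =>
      parse(tail, acc.copy(executorMemory = value))
    case "--executor-cores" :: value :: tail =>
      parse(tail, acc.copy(executorCores = value.toInt))
    case Nil =>
      acc
    case unknown :: _ =>
      // Covers both unrecognised flags and a flag missing its value.
      throw new IllegalArgumentException(s"Unknown or incomplete argument: $unknown")
  }
}
```

Calling `AmArgsParser.parse(args.toList)` returns an immutable `AmArgs`; flags that are not supplied keep the defaults shown in the class listing.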

Resource Management Integration

The Application Master integrates with YARN's resource management through the YarnRMClient interface.

/**
 * Interface for YARN Resource Manager client operations
 * Handles resource requests and container management
 */
trait YarnRMClient {
  def getAttemptId(): ApplicationAttemptId
  def getMaxRegAttempts(conf: YarnConfiguration): Int
  // Additional resource management methods (internal implementation)
}

Executor Management

The Application Master manages executor lifecycle through the YarnAllocator.

/**
 * Handles executor allocation and management
 * Requests containers from YARN and launches executor processes
 */
private[spark] class YarnAllocator(/* parameters */) extends Logging {
  // Internal implementation for:
  // - Container allocation requests
  // - Executor process launching  
  // - Container failure handling
  // - Dynamic scaling based on workload
}

/**
 * Enumeration for allocation types
 */
object AllocationType extends Enumeration {
  // Defines different types of resource allocations
}

Lifecycle Management

Application Master Lifecycle

  1. Initialization: Parse arguments, set up configuration, establish RM connection
  2. Registration: Register with YARN ResourceManager
  3. Resource Allocation: Request executor containers based on configuration
  4. Executor Launch: Start executor processes in allocated containers
  5. Application Execution: Coordinate between driver and executors
  6. Cleanup: Release resources and unregister from ResourceManager
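
The six phases above can be modelled as a small state machine. The sketch below is illustrative only: `AmLifecycle` and its members are hypothetical names, and the rule that any failure jumps straight to Cleanup is an assumption made for the example. In a running application, allocation and executor launch are likely to interleave rather than run strictly one after the other.

```scala
object AmLifecycle {
  sealed trait Phase
  case object Initialization extends Phase
  case object Registration extends Phase
  case object ResourceAllocation extends Phase
  case object ExecutorLaunch extends Phase
  case object ApplicationExecution extends Phase
  case object Cleanup extends Phase

  /** Phases in the order of the numbered list above. */
  val order: Vector[Phase] = Vector(
    Initialization, Registration, ResourceAllocation,
    ExecutorLaunch, ApplicationExecution, Cleanup)

  /** Next phase after a successful step; None once Cleanup has finished. */
  def next(current: Phase): Option[Phase] = {
    val i = order.indexOf(current)
    if (i >= 0 && i + 1 < order.length) Some(order(i + 1)) else None
  }

  /** Assumption: a failure in any phase short-circuits to Cleanup so resources are still released. */
  def onFailure(current: Phase): Option[Phase] =
    if (current == Cleanup) None else Some(Cleanup)
}
```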

Driver Integration

In cluster mode, the Application Master includes the driver:

// Cluster mode - driver runs in AM
if (isClusterMode) {
  // Launch user application class within AM process
  runDriver(securityManager)
} else {
  // Client mode - launch executors only
  runExecutorLauncher(securityManager)
}

Container Management

The Application Master manages YARN containers for executors:

  • Container Requests: Specify memory, CPU, and locality preferences
  • Container Allocation: Handle ResourceManager responses
  • Executor Launch: Start Spark executor processes in containers
  • Health Monitoring: Track container status and handle failures
  • Dynamic Scaling: Add/remove executors based on workload
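
One way to picture the request and scaling bullets above is as a reconciliation step: compare the target executor count with what is already running or pending, then request or release the difference. The following is a minimal sketch under that assumption. `ContainerReconciler` and its `Action` types are hypothetical and do not reproduce `YarnAllocator`'s internals.

```scala
object ContainerReconciler {
  sealed trait Action
  final case class RequestMore(count: Int) extends Action
  final case class CancelOrRelease(count: Int) extends Action
  case object Steady extends Action

  /**
   * Compares the desired executor count with containers that are already
   * running or have been requested but not yet granted.
   */
  def reconcile(target: Int, running: Int, pending: Int): Action = {
    require(target >= 0 && running >= 0 && pending >= 0, "counts must be non-negative")
    val missing = target - running - pending
    if (missing > 0) RequestMore(missing)
    else if (missing < 0) CancelOrRelease(-missing)
    else Steady
  }
}
```

With a target of 4 executors, 1 running and 1 pending, `reconcile` yields `RequestMore(2)`. Lowering the target to 2 while 3 are running yields `CancelOrRelease(1)`.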

Configuration Integration

Spark Configuration

Key Spark properties that affect Application Master behavior:

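import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .set("spark.yarn.max.executor.failures", "6")       // Executor failures tolerated before the application is failed
  .set("spark.yarn.am.memory", "1g")                  // AM memory in client mode (cluster mode uses spark.driver.memory)
  .set("spark.yarn.am.cores", "1")                    // AM CPU cores in client mode (cluster mode uses spark.driver.cores)
  .set("spark.yarn.am.waitTime", "100s")              // Cluster mode: how long the AM waits for SparkContext initialization
  .set("spark.yarn.submit.waitAppCompletion", "true") // Cluster mode: submitting client waits for the application to finish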

YARN Configuration

Integration with YARN configuration:

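import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.{SecurityManager, SparkConf}

// Assumes sparkConf: SparkConf and hadoopConf: Configuration are already in scope

// YARN configuration layered over the Hadoop configuration
val yarnConf = new YarnConfiguration(hadoopConf)

// Security configuration (SecurityManager is private[spark], so this compiles only inside Spark's own packages)
val securityManager = new SecurityManager(sparkConf)

// FileSystem handle resolved from the YARN/Hadoop configuration
val fs = FileSystem.get(yarnConf)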

Error Handling and Recovery

Failure Scenarios

The Application Master handles various failure conditions:

  • Executor Failures: Request replacement containers until the failure count reaches spark.yarn.max.executor.failures, then fail the application
  • Driver Failures: Application termination with proper cleanup
  • ResourceManager Communication: Retry logic for RM interactions
  • Container Preemption: Handle YARN preemption gracefully
  • Network Partitions: Robust handling of connectivity issues
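
The executor-failure limit can be sketched as a counter compared against a threshold. Everything here is a hedged illustration: `ExecutorFailureTracker` is a hypothetical class, the `>=` comparison is an assumption about where the limit bites, and the fallback of `max(2 * numExecutors, 3)` reflects the default I understand Spark to use when `spark.yarn.max.executor.failures` is unset, which should be verified against the version in use.

```scala
final class ExecutorFailureTracker(maxFailures: Int) {
  private var failures = 0

  /** Records one failed executor and reports whether the limit has now been reached. */
  def recordFailure(): Boolean = {
    failures += 1
    limitReached
  }

  /** Assumption: the application is failed once failures reach (not exceed) the limit. */
  def limitReached: Boolean = failures >= maxFailures

  def failureCount: Int = failures
}

object ExecutorFailureTracker {
  /** Assumed fallback when no explicit limit is configured: twice the executor count, at least 3. */
  def defaultMaxFailures(numExecutors: Int): Int = math.max(numExecutors * 2, 3)
}
```

While `limitReached` is false, a coordinator would keep requesting replacement containers. Once it turns true, the application is finished with a failure status and cleanup begins.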

Cleanup Operations

Automatic cleanup includes:

  • Container Release: Return unused containers to YARN
  • File Cleanup: Remove staging files from HDFS
  • Resource Deallocation: Clean up allocated resources
  • Registration Cleanup: Unregister from ResourceManager
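
Cleanup steps like these are usually run so that one failing step does not prevent the rest from executing. The sketch below shows that pattern in isolation. `AmCleanup` is a hypothetical helper, and the step names and their order are placeholders rather than the order the Application Master actually uses.

```scala
import scala.util.{Failure, Success, Try}

object AmCleanup {
  /**
   * Runs every step in order, even if an earlier one throws,
   * and returns the names of the steps that failed.
   */
  def runAll(steps: Seq[(String, () => Unit)]): Seq[String] =
    steps.flatMap { case (name, step) =>
      Try(step()) match {
        case Success(_) => None
        case Failure(_) => Some(name)
      }
    }
}
```

A caller passes named closures, for example `Seq(("release-containers", () => ...), ("delete-staging", () => ...))`, and can log or report whichever names come back.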

Monitoring and Logging

The Application Master mixes in Spark's Logging trait and logs key lifecycle events through it:

// ApplicationMaster extends Logging, which provides logInfo, logWarning and logError

// Representative log events
logInfo("ApplicationAttemptId: " + appAttemptId)
logInfo("Registered with ResourceManager")
logInfo("Allocating " + numExecutors + " executors")
logWarning("Executor failed: " + executorId)
logError("Fatal error in Application Master", exception)

Integration with Spark Components

Scheduler Integration

The Application Master coordinates with Spark schedulers:

  • Task Scheduling: Interface with TaskScheduler for task placement
  • Resource Updates: Notify scheduler of resource changes
  • Locality Preferences: Honor data locality requirements
  • Load Balancing: Distribute tasks across available executors

Security Integration

Comprehensive security integration:

  • Kerberos Authentication: Support for secure clusters
  • Delegation Tokens: Manage HDFS and other service tokens
  • Secret Management: Secure distribution of secrets
  • ACL Management: Application access control lists

Storage Integration

Integration with distributed storage:

  • HDFS Integration: Access to Hadoop Distributed File System
  • Staging Directory: Temporary file storage during application lifecycle
  • Log Aggregation: Collection of executor logs after completion

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--yarn-parent-2-11
