Application Deployment

Client API for submitting and managing YARN applications programmatically. This module provides comprehensive support for both client and cluster deployment modes, including application lifecycle management and monitoring capabilities.

Capabilities

Client

Main client interface for YARN application submission and management. Handles resource staging, application submission, and optional monitoring.

class Client(args: ClientArguments, sparkConf: SparkConf) {
  def submitApplication(): ApplicationId
  def run(): Unit  
  def stop(): Unit
  def monitorApplication(appId: ApplicationId, returnOnRunning: Boolean, logApplicationReport: Boolean): YarnAppReport
  def getApplicationReport(appId: ApplicationId): ApplicationReport
  def reportLauncherState(state: SparkAppHandle.State): Unit
  def cleanupStagingDir(appId: ApplicationId): Unit
  def createApplicationSubmissionContext(newApp: YarnClientApplication, containerContext: ContainerLaunchContext, appStagingBaseDir: String): ApplicationSubmissionContext
  def prepareLocalResources(destDir: Path, pySparkArchives: Seq[String]): HashMap[String, LocalResource]
  def copyFileToRemote(destDir: Path, srcPath: Path, replication: Short, symlinkCache: HashMap[String, Path] = null): Path
  def verifyClusterResources(newAppResponse: GetNewApplicationResponse): Unit
}

Constructor Parameters:

  • args: ClientArguments containing application jar, main class, and program arguments
  • sparkConf: SparkConf with YARN-specific configuration settings

Core Methods:

submitApplication(): ApplicationId

  • Submits application to YARN ResourceManager
  • Stages resources and creates ApplicationMaster
  • Returns YARN ApplicationId for tracking

run(): Unit

  • Submits application and optionally monitors execution
  • Blocks until completion if monitoring is enabled
  • Combines submitApplication() with optional monitoring

stop(): Unit

  • Stops client and performs cleanup
  • Cancels any ongoing monitoring
  • Releases staged resources

monitorApplication(appId, returnOnRunning, logApplicationReport): YarnAppReport

  • Monitors application state changes
  • returnOnRunning: If true, returns when app reaches RUNNING state
  • logApplicationReport: If true, logs periodic status updates
  • Returns final application report

getApplicationReport(appId: ApplicationId): ApplicationReport

  • Retrieves current YARN application report
  • Provides state, progress, and diagnostic information
  • Non-blocking status query

Usage Examples:

Basic Application Submission:

import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("MyYarnApp")
  .set("spark.yarn.queue", "default")
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")

val args = Array(
  "--jar", "/path/to/my-app.jar",
  "--class", "com.example.MyMainClass",
  "--arg", "arg1",
  "--arg", "arg2"
)

val clientArgs = new ClientArguments(args)
val client = new Client(clientArgs, sparkConf)

try {
  val applicationId = client.submitApplication()
  println(s"Application submitted with ID: $applicationId")
  
  // Monitor until completion
  val finalReport = client.monitorApplication(applicationId, returnOnRunning = false, logApplicationReport = true)
  println(s"Application finished with state: ${finalReport.appState}")
} finally {
  client.stop()
}

Submit and Monitor Separately:

import org.apache.hadoop.yarn.api.records.YarnApplicationState

val client = new Client(clientArgs, sparkConf)

// Submit application
val appId = client.submitApplication()

// Monitor periodically  
var done = false
while (!done) {
  val report = client.getApplicationReport(appId)
  println(s"App state: ${report.getYarnApplicationState}")

  report.getYarnApplicationState match {
    case YarnApplicationState.FINISHED |
         YarnApplicationState.FAILED |
         YarnApplicationState.KILLED => done = true
    case _ => Thread.sleep(5000)
  }
}

client.stop()

ClientArguments

Command-line argument parser for YARN client configuration.

class ClientArguments(args: Array[String]) {
  var userJar: String = null
  var userClass: String = null  
  var primaryPyFile: String = null
  var pyFiles: String = null
  var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
  var propertiesFile: String = null
}

Key Fields:

  • userJar: Path to application JAR file
  • userClass: Main class name to execute
  • primaryPyFile: Primary Python file for PySpark applications
  • pyFiles: Additional Python files (comma-separated)
  • userArgs: Arguments to pass to user application
  • propertiesFile: Additional Spark properties file

Supported Arguments:

// JAR and class specification
--jar /path/to/app.jar
--class com.example.MainClass

// Python applications  
--primary-py-file /path/to/main.py
--py-files file1.py,file2.py

// Application arguments
--arg value1
--arg value2

// Additional configuration
--properties-file /path/to/spark.properties

Usage Example:

val args = Array(
  "--jar", "/path/to/analytics.jar",
  "--class", "com.company.analytics.DataProcessor", 
  "--arg", "input-path=/data/input",
  "--arg", "output-path=/data/output",
  "--arg", "date=2024-01-01",
  "--properties-file", "/etc/spark/analytics.properties"
)

val clientArgs = new ClientArguments(args)
println(s"Main class: ${clientArgs.userClass}")
println(s"Arguments: ${clientArgs.userArgs.mkString(", ")}")

ApplicationMaster

YARN ApplicationMaster implementation that manages Spark applications in both cluster and client modes.

class ApplicationMaster(args: ApplicationMasterArguments) {
  def getAttemptId(): ApplicationAttemptId
  final def run(): Int
  final def getDefaultFinalStatus(): FinalApplicationStatus
  final def unregister(status: FinalApplicationStatus, diagnostics: String = null): Unit
  final def finish(status: FinalApplicationStatus, code: Int, msg: String = null): Unit
  // Internal implementation - handles driver execution (cluster mode) or coordination (client mode)
}

object ApplicationMaster {
  def main(args: Array[String]): Unit
  private[spark] def sparkContextInitialized(sc: SparkContext): Unit
  private[spark] def getAttemptId(): ApplicationAttemptId
  private[spark] def getHistoryServerAddress(sparkConf: SparkConf, yarnConf: YarnConfiguration, appId: ApplicationId, attemptId: ApplicationAttemptId): String
}

Core Methods:

getAttemptId(): ApplicationAttemptId

  • Returns current YARN application attempt identifier
  • Used for tracking and logging purposes

run(): Int

  • Main execution method for ApplicationMaster
  • Returns exit code (0 for success, non-zero for failure)
  • Handles both cluster and client mode execution paths

getDefaultFinalStatus(): FinalApplicationStatus

  • Determines default final status based on execution outcome
  • Returns SUCCEEDED, FAILED, or KILLED status

unregister(status, diagnostics): Unit

  • Unregisters ApplicationMaster from YARN ResourceManager
  • Reports final status and diagnostic information
  • Performs cleanup operations

finish(status, code, msg): Unit

  • Finalizes ApplicationMaster execution
  • Sets exit code and status message
  • Initiates shutdown procedures

Companion Object Methods:

main(args: Array[String]): Unit

  • Entry point for ApplicationMaster execution
  • Parses command line arguments and starts ApplicationMaster

sparkContextInitialized(sc: SparkContext): Unit

  • Called when SparkContext is initialized in cluster mode
  • Sets up application-specific configuration and monitoring

Responsibilities:

  • Cluster Mode: Runs the Spark driver within the ApplicationMaster
  • Client Mode: Coordinates with external driver and manages executors
  • Resource Management: Requests and manages executor containers
  • Security: Handles credential renewal and security context
  • Monitoring: Reports application progress to YARN ResourceManager

ApplicationMasterArguments:

class ApplicationMasterArguments(args: Array[String]) {
  var userJar: String = null
  var userClass: String = null
  var primaryPyFile: String = null  
  var userArgs: Seq[String] = Nil
  var propertiesFile: String = null
  // Additional AM-specific arguments
}

Resource Staging

Internal resource staging functionality for preparing application files.

// Key staging methods (internal to Client)
def prepareLocalResources(destDir: Path, pySparkArchives: Seq[String]): HashMap[String, LocalResource]
def createApplicationSubmissionContext(newApp: YarnClientApplication, containerContext: ContainerLaunchContext, appStagingBaseDir: String): ApplicationSubmissionContext

Staging Process:

  1. Creates staging directory in HDFS/distributed filesystem
  2. Uploads application JAR and dependencies
  3. Prepares executor launch scripts and configuration
  4. Sets appropriate file permissions and visibility
  5. Creates LocalResource references for YARN
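The steps above can be sketched with the Hadoop FileSystem API. This is illustrative only: the paths and application id below are made up, and the real Client additionally handles replication, the distributed cache, symlinks, and resource visibility.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsPermission

val hadoopConf = new Configuration()
val fs = FileSystem.get(hadoopConf)

// 1. Create the staging directory (owner-only access), e.g.
//    /user/<name>/.sparkStaging/<appId>
val stagingDir = new Path("/user/alice/.sparkStaging/application_1700000000000_0001")
fs.mkdirs(stagingDir, new FsPermission(Integer.parseInt("700", 8).toShort))

// 2. Upload the application JAR under its standard staged name
val appJar = new Path(stagingDir, "__app__.jar")
fs.copyFromLocalFile(new Path("/path/to/my-app.jar"), appJar)

// 3-5. The Client then builds LocalResource entries that point at the
// uploaded files so YARN can localize them into each container.
```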

Application Monitoring

Application state tracking and reporting functionality.

case class YarnAppReport(
  appState: YarnApplicationState,
  finalState: FinalApplicationStatus, 
  diagnostics: String
)

Application States:

  • NEW: Application is being created; not yet submitted
  • SUBMITTED: Application submitted to the ResourceManager
  • ACCEPTED: Application accepted by the scheduler; ApplicationMaster not yet running
  • RUNNING: ApplicationMaster started and running
  • FINISHED: Application finished (check the final status for the outcome)
  • FAILED: Application failed with errors
  • KILLED: Application terminated by user or system

Final States:

  • SUCCEEDED: Application completed successfully
  • FAILED: Application failed
  • KILLED: Application was terminated
  • UNDEFINED: Final state not yet determined
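A small helper (illustrative, not part of the API) that classifies the states above as terminal or still in flight:

```scala
import org.apache.hadoop.yarn.api.records.YarnApplicationState

def isTerminal(state: YarnApplicationState): Boolean = state match {
  case YarnApplicationState.FINISHED |
       YarnApplicationState.FAILED |
       YarnApplicationState.KILLED => true
  case _ => false  // NEW, SUBMITTED, ACCEPTED, RUNNING are still in flight
}
```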

Usage Example:

def waitForCompletion(client: Client, appId: ApplicationId): Unit = {
  // With returnOnRunning = false, monitorApplication blocks until the
  // application reaches a terminal state, so a single call suffices.
  val finalReport = client.monitorApplication(appId, returnOnRunning = false, logApplicationReport = true)

  finalReport.appState match {
    case YarnApplicationState.FINISHED =>
      println(s"Application completed: ${finalReport.finalState}")
    case YarnApplicationState.FAILED =>
      println(s"Application failed: ${finalReport.diagnostics}")
    case YarnApplicationState.KILLED =>
      println("Application was killed")
    case other =>
      println(s"Unexpected terminal state: $other")
  }
}

Client Constants

object Client {
  // Constants
  val APP_JAR_NAME: String = "__app__.jar"
  val LOCAL_SCHEME: String = "local"  
  val SPARK_STAGING: String = ".sparkStaging"
  val ENV_DIST_CLASSPATH: String = "SPARK_DIST_CLASSPATH"
  
  // File permissions
  val STAGING_DIR_PERMISSION: FsPermission
  val APP_FILE_PERMISSION: FsPermission
  
  // Utility methods
  def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]): Unit
  def populateClasspath(conf: SparkConf, hadoopConf: Configuration, env: HashMap[String, String], extraClassPath: Option[String]): Unit
  def getUserClasspath(conf: SparkConf): Array[URI]
  def getClusterPath(conf: SparkConf, path: String): String
  def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean
  def isUserClassPathFirst(conf: SparkConf, isDriver: Boolean): Boolean
  def buildPath(components: String*): String
  def isLocalUri(uri: String): Boolean
  def createAppReport(report: ApplicationReport): YarnAppReport
  def createLibraryPathPrefix(libpath: String, conf: SparkConf): String
}

Key Constants:

  • APP_JAR_NAME: Standard name for uploaded application JAR
  • LOCAL_SCHEME: URI scheme for local filesystem resources
  • SPARK_STAGING: Default staging directory name in distributed filesystem
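The documented semantics of two of the utility methods can be re-sketched in plain Scala. Note this is a hedged illustration: the actual implementations live on the Client companion object and are mostly private[spark], so they are reimplemented here rather than called directly.

```scala
// Illustrative re-sketch, not the Spark source:
def buildPath(components: String*): String =
  components.mkString("/")  // joins components with the Hadoop path separator

def isLocalUri(uri: String): Boolean =
  uri.startsWith("local:")  // "local" scheme: resolved on each node, never uploaded

buildPath("hdfs://nn:8020/user/alice", ".sparkStaging", "app_01")
// "hdfs://nn:8020/user/alice/.sparkStaging/app_01"
isLocalUri("local:/opt/libs/app.jar")  // true
```

Resources with the local scheme are skipped during staging, which is why isLocalUri matters when preparing local resources.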

Error Handling

Common exceptions during application deployment:

// Application submission failures
throw new IOException("Failed to upload application resources")
throw new YarnException("ResourceManager rejected application")

// Invalid arguments
throw new IllegalArgumentException("Missing required argument: --jar")
throw new SparkException("Cannot specify both --jar and --primary-py-file")

// Resource staging failures  
throw new IOException("Failed to create staging directory")
throw new AccessControlException("Insufficient permissions for staging")
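These failure modes suggest wrapping submission in a try/catch; a minimal sketch, assuming client is a configured Client as in the examples above:

```scala
import java.io.IOException
import org.apache.hadoop.yarn.exceptions.YarnException

try {
  val appId = client.submitApplication()
  println(s"Submitted: $appId")
} catch {
  case e: YarnException =>
    println(s"ResourceManager rejected the application: ${e.getMessage}")
  case e: IOException =>
    println(s"Resource staging or submission failed: ${e.getMessage}")
} finally {
  client.stop()  // always release staged resources and stop monitoring
}
```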

Integration Patterns

Configuration-driven Deployment:

val sparkConf = new SparkConf()
  .setAppName("DataPipeline")
  .set("spark.yarn.queue", "analytics")
  .set("spark.yarn.maxAppAttempts", "3")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "100")

val client = new Client(clientArgs, sparkConf)
val appId = client.submitApplication()

Custom Resource Management:

val sparkConf = new SparkConf()
  .set("spark.executor.memory", "8g")
  .set("spark.executor.cores", "4") 
  .set("spark.executor.instances", "10")
  .set("spark.yarn.executor.memoryOverhead", "1g")
  .set("spark.yarn.am.memory", "2g")
  .set("spark.yarn.am.cores", "2")

Deploy Mode Considerations

Aspect            | Client Mode                   | Cluster Mode
Driver Location   | Client machine                | YARN cluster
Network Access    | Driver needs cluster access   | Self-contained
Resource Usage    | Client machine resources      | Cluster resources only
Failure Handling  | Client failure kills app      | More resilient
Interactive Use   | Suitable for shells/notebooks | Batch processing
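The deploy mode is typically selected through configuration (the programmatic equivalent of spark-submit's --deploy-mode flag):

```scala
import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("MyYarnApp")
  .set("spark.submit.deployMode", "cluster")  // or "client" for interactive work
```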