Client API for submitting and managing YARN applications programmatically. This module supports both client and cluster deployment modes, including application lifecycle management and monitoring.
Main client interface for YARN application submission and management. Handles resource staging, application submission, and optional monitoring.
class Client(args: ClientArguments, sparkConf: SparkConf) {
def submitApplication(): ApplicationId
def run(): Unit
def stop(): Unit
def monitorApplication(appId: ApplicationId, returnOnRunning: Boolean, logApplicationReport: Boolean): YarnAppReport
def getApplicationReport(appId: ApplicationId): ApplicationReport
def reportLauncherState(state: SparkAppHandle.State): Unit
def cleanupStagingDir(appId: ApplicationId): Unit
def createApplicationSubmissionContext(newApp: YarnClientApplication, containerContext: ContainerLaunchContext, appStagingBaseDir: String): ApplicationSubmissionContext
def prepareLocalResources(destDir: Path, pySparkArchives: Seq[String]): HashMap[String, LocalResource]
def copyFileToRemote(destDir: Path, srcPath: Path, replication: Short, symlinkCache: HashMap[String, Path] = null): Path
def verifyClusterResources(newAppResponse: GetNewApplicationResponse): Unit
}

Constructor Parameters:
- args: ClientArguments containing the application jar, main class, and program arguments
- sparkConf: SparkConf with YARN-specific configuration settings

Core Methods:
submitApplication(): ApplicationId
run(): Unit
stop(): Unit
monitorApplication(appId, returnOnRunning, logApplicationReport): YarnAppReport
- returnOnRunning: if true, returns once the app reaches the RUNNING state
- logApplicationReport: if true, logs periodic status updates

getApplicationReport(appId: ApplicationId): ApplicationReport
Usage Examples:
Basic Application Submission:
import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.spark.SparkConf
val sparkConf = new SparkConf()
.setAppName("MyYarnApp")
.set("spark.yarn.queue", "default")
.set("spark.executor.memory", "2g")
.set("spark.executor.cores", "2")
val args = Array(
"--jar", "/path/to/my-app.jar",
"--class", "com.example.MyMainClass",
"--arg", "arg1",
"--arg", "arg2"
)
val clientArgs = new ClientArguments(args)
val client = new Client(clientArgs, sparkConf)
try {
val applicationId = client.submitApplication()
println(s"Application submitted with ID: $applicationId")
// Monitor until completion
val finalReport = client.monitorApplication(applicationId, returnOnRunning = false, logApplicationReport = true)
println(s"Application finished with state: ${finalReport.appState}")
} finally {
client.stop()
}

Submit and Monitor Separately:
import org.apache.hadoop.yarn.api.records.YarnApplicationState
val client = new Client(clientArgs, sparkConf)
// Submit application
val appId = client.submitApplication()
// Monitor periodically
var continue = true
while (continue) {
val report = client.getApplicationReport(appId)
println(s"App state: ${report.getYarnApplicationState}")
report.getYarnApplicationState match {
case YarnApplicationState.FINISHED |
YarnApplicationState.FAILED |
YarnApplicationState.KILLED => continue = false
case _ =>
Thread.sleep(5000)
}
}
client.stop()

Command-line argument parser for YARN client configuration.
class ClientArguments(args: Array[String]) {
var userJar: String = null
var userClass: String = null
var primaryPyFile: String = null
var pyFiles: String = null
var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
var propertiesFile: String = null
}

Key Fields:
- userJar: Path to the application JAR file
- userClass: Main class name to execute
- primaryPyFile: Primary Python file for PySpark applications
- pyFiles: Additional Python files (comma-separated)
- userArgs: Arguments to pass to the user application
- propertiesFile: Additional Spark properties file

Supported Arguments:
// JAR and class specification
--jar /path/to/app.jar
--class com.example.MainClass
// Python applications
--primary-py-file /path/to/main.py
--py-files file1.py,file2.py
// Application arguments
--arg value1
--arg value2
// Additional configuration
--properties-file /path/to/spark.properties

Usage Example:
val args = Array(
"--jar", "/path/to/analytics.jar",
"--class", "com.company.analytics.DataProcessor",
"--arg", "input-path=/data/input",
"--arg", "output-path=/data/output",
"--arg", "date=2024-01-01",
"--properties-file", "/etc/spark/analytics.properties"
)
val clientArgs = new ClientArguments(args)
println(s"Main class: ${clientArgs.userClass}")
println(s"Arguments: ${clientArgs.userArgs.mkString(", ")}")

YARN ApplicationMaster implementation that manages Spark applications in both cluster and client modes.
class ApplicationMaster(args: ApplicationMasterArguments) {
def getAttemptId(): ApplicationAttemptId
final def run(): Int
final def getDefaultFinalStatus(): FinalApplicationStatus
final def unregister(status: FinalApplicationStatus, diagnostics: String = null): Unit
final def finish(status: FinalApplicationStatus, code: Int, msg: String = null): Unit
// Internal implementation - handles driver execution (cluster mode) or coordination (client mode)
}
object ApplicationMaster {
def main(args: Array[String]): Unit
private[spark] def sparkContextInitialized(sc: SparkContext): Unit
private[spark] def getAttemptId(): ApplicationAttemptId
private[spark] def getHistoryServerAddress(sparkConf: SparkConf, yarnConf: YarnConfiguration, appId: ApplicationId, attemptId: ApplicationAttemptId): String
}

Core Methods:
getAttemptId(): ApplicationAttemptId
run(): Int
getDefaultFinalStatus(): FinalApplicationStatus
unregister(status, diagnostics): Unit
finish(status, code, msg): Unit
Companion Object Methods:
main(args: Array[String]): Unit
sparkContextInitialized(sc: SparkContext): Unit
Responsibilities:
- Registering the application attempt with the ResourceManager and unregistering on shutdown
- In cluster mode, launching the user class (the driver) inside the AM container
- In client mode, coordinating with the externally running driver
- Requesting and releasing executor containers over the application's lifetime
- Reporting final status and diagnostics back to YARN
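A heavily simplified, hypothetical model of the AM lifecycle may help: register with the ResourceManager, run the driver body (cluster mode), then unregister and report a final status. The object and helper names below are illustrative only, not the real (mostly private) ApplicationMaster internals:

```scala
// Hypothetical, simplified model of the AM lifecycle; the real
// ApplicationMaster also wires up the SparkContext, the executor
// allocator, and attempt-retry logic.
object AmLifecycleSketch {
  sealed trait FinalStatus
  case object Succeeded extends FinalStatus
  case object Failed extends FinalStatus

  // register -> run driver body -> unregister, mapping the outcome to a
  // YARN-style (finalStatus, exitCode) pair, roughly as finish() would.
  def runAm(driver: () => Unit): (FinalStatus, Int) =
    try {
      // registration with the ResourceManager would happen here (elided)
      driver() // cluster mode: the user's main class runs in the AM
      (Succeeded, 0)
    } catch {
      case _: Throwable =>
        // diagnostics would be reported on unregistration (elided)
        (Failed, 1)
    } finally {
      // unregister from the RM and clean up the staging dir (elided)
    }

  def main(args: Array[String]): Unit = {
    val ok = runAm(() => ())
    val bad = runAm(() => throw new RuntimeException("boom"))
    println(s"ok=$ok bad=$bad")
  }
}
```

The point of the sketch is the control flow: the user code runs inside the AM's try block, and the final status reported to YARN is derived from whether that code completed or threw.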
ApplicationMasterArguments:
class ApplicationMasterArguments(args: Array[String]) {
var userJar: String = null
var userClass: String = null
var primaryPyFile: String = null
var userArgs: Seq[String] = Nil
var propertiesFile: String = null
// Additional AM-specific arguments
}

Internal resource staging functionality for preparing application files.
// Key staging methods (internal to Client)
def prepareLocalResources(destDir: Path, pySparkArchives: Seq[String]): HashMap[String, LocalResource]
def createApplicationSubmissionContext(newApp: YarnClientApplication, containerContext: ContainerLaunchContext, appStagingBaseDir: String): ApplicationSubmissionContext

Staging Process: the client creates a per-application directory under .sparkStaging in the distributed filesystem, uploads the application JAR and its dependencies there, and registers each uploaded file as a LocalResource so YARN can localize it into the ApplicationMaster container.
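A minimal sketch of the staging-path and app-jar naming conventions (the object and helper names here are illustrative, not Spark's private API):

```scala
// Illustrative sketch of the .sparkStaging path convention;
// StagingSketch and stagingDir are hypothetical helpers.
object StagingSketch {
  val SPARK_STAGING = ".sparkStaging" // matches Client.SPARK_STAGING
  val APP_JAR_NAME = "__app__.jar"    // matches Client.APP_JAR_NAME

  // Per-application staging directory: <fs home>/.sparkStaging/<appId>
  def stagingDir(homeDir: String, appId: String): String =
    Seq(homeDir, SPARK_STAGING, appId).mkString("/")

  def main(args: Array[String]): Unit = {
    val dir = stagingDir("/user/alice", "application_1700000000000_0001")
    println(dir) // /user/alice/.sparkStaging/application_1700000000000_0001
  }
}
```

Each file copied into this directory is then wrapped in a LocalResource entry keyed by its name (the primary jar under the well-known name `__app__.jar`), which YARN uses to download the file into the container's working directory.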
Application state tracking and reporting functionality.
case class YarnAppReport(
appState: YarnApplicationState,
finalState: FinalApplicationStatus,
diagnostics: String
)

Application States:
- NEW: Application submitted but not yet accepted
- SUBMITTED: Application accepted by the ResourceManager
- RUNNING: ApplicationMaster started and running
- FINISHED: Application completed successfully
- FAILED: Application failed with errors
- KILLED: Application terminated by the user or system

Final States:
- SUCCEEDED: Application completed successfully
- FAILED: Application failed
- KILLED: Application was terminated
- UNDEFINED: Final state not yet determined

Usage Example:
def waitForCompletion(client: Client, appId: ApplicationId): Unit = {
var finalReport: YarnAppReport = null
do {
Thread.sleep(2000)
finalReport = client.monitorApplication(appId, returnOnRunning = true, logApplicationReport = true)
finalReport.appState match {
case YarnApplicationState.RUNNING =>
println("Application is running...")
case YarnApplicationState.FINISHED =>
println(s"Application completed: ${finalReport.finalState}")
case YarnApplicationState.FAILED =>
println(s"Application failed: ${finalReport.diagnostics}")
case YarnApplicationState.KILLED =>
println("Application was killed")
case _ =>
println(s"Application state: ${finalReport.appState}")
}
} while (!Set(YarnApplicationState.FINISHED, YarnApplicationState.FAILED, YarnApplicationState.KILLED).contains(finalReport.appState))
}

object Client {
// Constants
val APP_JAR_NAME: String = "__app__.jar"
val LOCAL_SCHEME: String = "local"
val SPARK_STAGING: String = ".sparkStaging"
val ENV_DIST_CLASSPATH: String = "SPARK_DIST_CLASSPATH"
// File permissions
val STAGING_DIR_PERMISSION: FsPermission
val APP_FILE_PERMISSION: FsPermission
// Utility methods
def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]): Unit
def populateClasspath(conf: SparkConf, hadoopConf: Configuration, env: HashMap[String, String], extraClassPath: Option[String]): Unit
def getUserClasspath(conf: SparkConf): Array[URI]
def getClusterPath(conf: SparkConf, path: String): String
def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean
def isUserClassPathFirst(conf: SparkConf, isDriver: Boolean): Boolean
def buildPath(components: String*): String
def isLocalUri(uri: String): Boolean
def createAppReport(report: ApplicationReport): YarnAppReport
def createLibraryPathPrefix(libpath: String, conf: SparkConf): String
}

Key Constants:
- APP_JAR_NAME: Standard name for the uploaded application JAR
- LOCAL_SCHEME: URI scheme for local-filesystem resources
- SPARK_STAGING: Default staging directory name in the distributed filesystem

Common exceptions during application deployment:
// Application submission failures
throw new IOException("Failed to upload application resources")
throw new YarnException("ResourceManager rejected application")
// Invalid arguments
throw new IllegalArgumentException("Missing required argument: --jar")
throw new SparkException("Cannot specify both --jar and --primary-py-file")
// Resource staging failures
throw new IOException("Failed to create staging directory")
throw new AccessControlException("Insufficient permissions for staging")

Configuration-driven Deployment:
val sparkConf = new SparkConf()
.setAppName("DataPipeline")
.set("spark.yarn.queue", "analytics")
.set("spark.yarn.maxAppAttempts", "3")
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.minExecutors", "2")
.set("spark.dynamicAllocation.maxExecutors", "100")
val client = new Client(clientArgs, sparkConf)
val appId = client.submitApplication()

Custom Resource Management:
val sparkConf = new SparkConf()
.set("spark.executor.memory", "8g")
.set("spark.executor.cores", "4")
.set("spark.executor.instances", "10")
.set("spark.executor.memoryOverhead", "1g")
.set("spark.yarn.am.memory", "2g")
.set("spark.yarn.am.cores", "2")

| Aspect | Client Mode | Cluster Mode |
|---|---|---|
| Driver Location | Client machine | YARN cluster |
| Network Access | Driver needs cluster access | Self-contained |
| Resource Usage | Client machine resources | Cluster resources only |
| Failure Handling | Client failure kills app | More resilient |
| Interactive Use | Suitable for shells/notebooks | Batch processing |
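The trade-offs above are selected at submission time; when launching through spark-submit rather than the Client API directly, the mode is chosen with the `--deploy-mode` flag (a CLI fragment for reference, with illustrative paths and class names):

```shell
# Cluster mode: driver runs inside YARN, suited to batch jobs
spark-submit --master yarn --deploy-mode cluster \
  --class com.example.MyMainClass /path/to/my-app.jar arg1 arg2

# Client mode: driver runs on the submitting machine, suited to shells/notebooks
spark-submit --master yarn --deploy-mode client \
  --class com.example.MyMainClass /path/to/my-app.jar arg1 arg2
```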