CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-spark--spark-yarn-2-12

Apache Spark YARN resource manager integration module that enables Spark applications to run on YARN clusters

Pending
Overview
Eval results
Files

docs/application-management.md

Application Management

Core components for submitting and managing Spark applications on YARN clusters. These classes handle application submission, monitoring, lifecycle management, and interaction with the YARN ResourceManager.

Capabilities

Client

Primary entry point for submitting and monitoring Spark applications on YARN. Handles application submission to YARN ResourceManager and provides monitoring capabilities.

/**
 * Client for submitting and monitoring Spark applications on YARN
 * @param args Client arguments containing application details
 * @param sparkConf Spark configuration
 * @param rpcEnv RPC environment for communication
 */
private[spark] class Client(
  val args: ClientArguments,
  val sparkConf: SparkConf,
  val rpcEnv: RpcEnv
) {
  /** Submit application to YARN ResourceManager */
  def submitApplication(): Unit
  
  /** Submit and monitor application until completion */
  def run(): Unit
  
  /** Stop the client and cleanup resources */
  def stop(): Unit
  
  /** Get YARN application ID */
  def getApplicationId(): ApplicationId
  
  /** Monitor application status and return report */
  def monitorApplication(
    returnOnRunning: Boolean = false,
    logApplicationReport: Boolean = true,
    interval: Long = sparkConf.get(REPORT_INTERVAL)
  ): YarnAppReport
  
  /** Report launcher state for external monitoring */
  def reportLauncherState(state: SparkAppHandle.State): Unit
  
  /** Get application report from ResourceManager */
  def getApplicationReport(): ApplicationReport
}

Usage Example:

import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.RpcEnv

val sparkConf = new SparkConf()
  .setAppName("MySparkApp")
  .set("spark.yarn.queue", "default")

val rpcEnv = RpcEnv.create("sparkYarnClient", "localhost", -1, sparkConf, 
  new SecurityManager(sparkConf))

val clientArgs = new ClientArguments(Array(
  "--jar", "/path/to/app.jar",
  "--class", "com.example.MainClass"
))

val client = new Client(clientArgs, sparkConf, rpcEnv)
client.submitApplication()
val appId = client.getApplicationId()
println(s"Application submitted with ID: $appId")

Client Companion Object

Utility methods and constants for YARN client operations.

object Client {
  /** Application JAR file name in YARN */
  val APP_JAR_NAME: String = "__app__.jar"
  
  /** Staging directory name */
  val SPARK_STAGING: String = ".sparkStaging"
  
  /** Localized configuration directory name */
  val LOCALIZED_CONF_DIR: String = "__spark_conf__"
  
  /** Localized library directory name */
  val LOCALIZED_LIB_DIR: String = "__spark_libs__"
  
  /** Localized Python files directory name */
  val LOCALIZED_PYTHON_DIR: String = "__pyfiles__"
  
  /**
   * Get user classpath URIs from Spark configuration
   * @param conf Spark configuration
   * @return Array of classpath URIs
   */
  def getUserClasspath(conf: SparkConf): Array[URI]
  
  /**
   * Get user classpath URLs
   * @param conf Spark configuration
   * @param useClusterPath Whether to use cluster-side paths
   * @return Array of classpath URLs
   */
  def getUserClasspathUrls(conf: SparkConf, useClusterPath: Boolean): Array[URL]
  
  /**
   * Build file system path from components
   * @param components Path components to join
   * @return Combined path string
   */
  def buildPath(components: String*): String
  
  /**
   * Convert gateway path to cluster path
   * @param conf Spark configuration
   * @param path Original path
   * @return Cluster-side path
   */
  def getClusterPath(conf: SparkConf, path: String): String
}

ApplicationMaster

YARN ApplicationMaster for managing Spark application lifecycle within YARN containers. Coordinates between Spark driver and YARN ResourceManager.

/**
 * YARN ApplicationMaster for managing Spark application lifecycle
 * @param args ApplicationMaster arguments
 * @param sparkConf Spark configuration
 * @param yarnConf YARN configuration
 */
private[spark] class ApplicationMaster(
  args: ApplicationMasterArguments,
  sparkConf: SparkConf,
  yarnConf: YarnConfiguration
) {
  /** Main execution method for ApplicationMaster */
  def run(): Int
  
  /** Run in unmanaged mode (client mode) */
  def runUnmanaged(
    clientRpcEnv: RpcEnv,
    appAttemptId: ApplicationAttemptId,
    stagingDir: Path,
    cachedResourcesConf: SparkConf
  ): Unit
  
  /** Stop unmanaged ApplicationMaster */
  def stopUnmanaged(stagingDir: Path): Unit
  
  /** Finish application with final status */
  def finish(status: FinalApplicationStatus, code: Int, msg: String): Unit
  
  /** Unregister from ResourceManager */
  def unregister(status: FinalApplicationStatus, diagnostics: String): Unit
}

ApplicationMaster Companion Object

Static utilities and entry point for ApplicationMaster.

object ApplicationMaster {
  /** Main entry point for ApplicationMaster */
  def main(args: Array[String]): Unit
  
  /** Signal SparkContext initialization */
  def sparkContextInitialized(sc: SparkContext): Unit
  
  /** Get current application attempt ID */
  def getAttemptId(): ApplicationAttemptId
  
  /** Get Spark history server address */
  def getHistoryServerAddress(
    sparkConf: SparkConf,
    yarnConf: YarnConfiguration,
    appId: String,
    attemptId: String
  ): String
}

YarnClusterApplication

Application entry point for yarn-cluster mode deployment.

/**
 * Application entry point for yarn-cluster mode
 */
class YarnClusterApplication extends SparkApplication {
  /**
   * Start application in cluster mode
   * @param args Application arguments
   * @param conf Spark configuration
   */
  def start(args: Array[String], conf: SparkConf): Unit
}

Usage Example:

import org.apache.spark.deploy.yarn.YarnClusterApplication
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("MyClusterApp")
  .set("spark.yarn.queue", "default")

val app = new YarnClusterApplication()
app.start(Array("arg1", "arg2"), conf)

ExecutorLauncher

Entry point for client mode executor launcher.

object ExecutorLauncher {
  /** Main entry point for executor launcher */
  def main(args: Array[String]): Unit
}

Types

YarnAppReport

Container for YARN application status information.

/**
 * Container for YARN application status information
 * @param appState Current YARN application state
 * @param finalState Final application status
 * @param diagnostics Optional diagnostic messages
 */
case class YarnAppReport(
  appState: YarnApplicationState,
  finalState: FinalApplicationStatus,
  diagnostics: Option[String]
)

Argument Classes

/**
 * Argument parser for YARN client
 * @param args Command line arguments
 */
class ClientArguments(args: Array[String]) {
  /** User application JAR file */
  var userJar: String
  
  /** Main class to execute */
  var userClass: String
  
  /** Primary Python file for PySpark */
  var primaryPyFile: String
  
  /** Primary R file for SparkR */
  var primaryRFile: String
  
  /** User application arguments */
  var userArgs: ArrayBuffer[String]
  
  /** Enable verbose logging */
  var verbose: Boolean
}

/**
 * Argument parser for ApplicationMaster
 * @param args Command line arguments
 */
class ApplicationMasterArguments(args: Array[String]) {
  /** User application JAR file path */
  var userJar: String
  
  /** Main class name to execute */
  var userClass: String
  
  /** Primary Python file for PySpark */
  var primaryPyFile: String
  
  /** Primary R file for SparkR */
  var primaryRFile: String
  
  /** User application arguments */
  var userArgs: Seq[String]
  
  /** Spark properties file path */
  var propertiesFile: String
  
  /** Distributed cache configuration */
  var distCacheConf: String
}

Integration Patterns

Client Mode Integration

In client mode, the Spark driver runs on the local machine (outside YARN), while executors run on YARN.

// Client mode configuration
// Note: SparkConf has no setDeployMode method; deploy mode is set via the
// "spark.submit.deployMode" configuration key.
val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "client")
  .setAppName("ClientModeApp")

// SparkContext creation automatically uses YARN client mode
val sc = new SparkContext(conf)

Cluster Mode Integration

In cluster mode, both driver and executors run on YARN cluster.

// Cluster mode configuration
val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "cluster")
  .setAppName("ClusterModeApp")

// For cluster mode, typically submitted via spark-submit
// or programmatically via YarnClusterApplication

Programmatic Submission

import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.RpcEnv
import org.apache.hadoop.yarn.api.records.ApplicationId

def submitApplication(jarPath: String, mainClass: String): ApplicationId = {
  val conf = new SparkConf()
    .setAppName("ProgrammaticSubmission")
    .set("spark.yarn.queue", "default")
    .set("spark.yarn.am.memory", "1g")

  val rpcEnv = RpcEnv.create("sparkYarnClient", "localhost", -1, conf,
    new SecurityManager(conf))

  val args = new ClientArguments(Array(
    "--jar", jarPath,
    "--class", mainClass
  ))

  val client = new Client(args, conf, rpcEnv)
  client.submitApplication()
  client.getApplicationId()
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-yarn-2-12

docs

application-management.md

configuration.md

index.md

resource-management.md

scheduler-integration.md

tile.json