Apache Spark YARN Module

Apache Spark YARN integration module that enables Spark applications to run on YARN (Yet Another Resource Negotiator) clusters, providing cluster manager functionality for distributed Spark computing workloads. The module includes scheduler backends, resource allocation and management components, client interfaces for submitting applications to YARN, container placement strategies, and delegation token management for secure authentication in enterprise environments.

Package Information

  • Package Name: spark-yarn_2.11
  • Package Type: maven
  • Language: Scala
  • Group ID: org.apache.spark
  • Artifact ID: spark-yarn_2.11
  • Installation: Add to Maven dependencies:
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-yarn_2.11</artifactId>
      <version>1.6.3</version>
    </dependency>
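  • SBT alternative (equivalent coordinates; assumes scalaVersion is set to a 2.11.x release so that %% resolves the _2.11 artifact):
    libraryDependencies += "org.apache.spark" %% "spark-yarn" % "1.6.3"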

Core Imports

import org.apache.spark.deploy.yarn.Client
import org.apache.spark.deploy.yarn.ApplicationMaster
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend
import org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend

Basic Usage

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.hadoop.conf.Configuration

// Configure SparkConf for YARN deployment
val sparkConf = new SparkConf()
  .setAppName("MySparkApp")
  .set("spark.yarn.am.memory", "512m")
  .set("spark.yarn.am.cores", "1")
  .set("spark.executor.memory", "1g")
  .set("spark.executor.cores", "2")

// Set up client arguments for YARN submission
val args = Array(
  "--jar", "/path/to/my-app.jar",
  "--class", "com.example.MyMainClass",
  "--num-executors", "2",
  "--executor-memory", "1g",
  "--executor-cores", "2"
)

// Create and run YARN client
val clientArgs = new ClientArguments(args, sparkConf)
val hadoopConf = new Configuration()
val client = new Client(clientArgs, hadoopConf, sparkConf)

// Submit the application and monitor it until completion.
// Note: run() submits internally via submitApplication(), so call run()
// for fire-and-monitor, or call submitApplication() alone to manage the
// returned ApplicationId yourself.
client.run()

Architecture

The Apache Spark YARN module is built around several key components:

  • Client System: the Client class handles application submission, monitoring, and communication with the YARN ResourceManager
  • Application Master: ApplicationMaster manages the Spark application lifecycle within YARN containers
  • Scheduler Backends: Different backends (YarnClientSchedulerBackend, YarnClusterSchedulerBackend) for client and cluster deployment modes
  • Resource Management: YarnAllocator handles dynamic container allocation and executor management
  • Security Integration: Token management and delegation for secure HDFS access in enterprise environments
  • Container Management: Executor launching, placement strategies, and distributed cache management
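
A minimal sketch of how the deployment mode selects a backend, assuming Spark 1.6-era master URLs ("yarn-client" keeps the driver in the submitting JVM; "yarn-cluster" runs it inside the ApplicationMaster and is normally entered through spark-submit):

import org.apache.spark.{SparkConf, SparkContext}

// Client mode: the driver runs locally and Spark wires in
// YarnClientSchedulerBackend behind the scenes.
val conf = new SparkConf()
  .setAppName("ClientModeApp")
  .setMaster("yarn-client")
val sc = new SparkContext(conf)  // requires HADOOP_CONF_DIR/YARN_CONF_DIR to be set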

Capabilities

Application Submission and Management

Core functionality for submitting Spark applications to YARN clusters, including client-side submission, application monitoring, and resource management.

private[spark] class Client(
  args: ClientArguments,
  hadoopConf: Configuration,
  sparkConf: SparkConf
) extends Logging {
  def this(clientArgs: ClientArguments, spConf: SparkConf)
  def submitApplication(): ApplicationId
  def run(): Unit
  def monitorApplication(
    appId: ApplicationId,
    returnOnRunning: Boolean = false,
    logApplicationReport: Boolean = true
  ): (YarnApplicationState, FinalApplicationStatus)
  def stop(): Unit
  def getApplicationReport(appId: ApplicationId): ApplicationReport
}
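
For example, monitorApplication can poll a submitted application until it starts running, reusing the client built in Basic Usage but submitting explicitly so the ApplicationId is available (a sketch against the signatures above):

val appId = client.submitApplication()
val (state, finalStatus) = client.monitorApplication(appId, returnOnRunning = true)
println(s"Application $appId is in state $state (final status so far: $finalStatus)")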


YARN Application Master

ApplicationMaster implementation that manages Spark application execution within YARN, handling resource allocation, executor management, and communication with ResourceManager.

class ApplicationMaster(
  args: ApplicationMasterArguments,
  client: YarnRMClient
) {
  def run(): Int
  def finish(status: FinalApplicationStatus, code: Int, msg: String): Unit
  def unregister(status: FinalApplicationStatus, diagnostics: String): Unit
}

object ApplicationMaster {
  def main(args: Array[String]): Unit
  def sparkContextInitialized(sc: SparkContext): Unit
  def sparkContextStopped(sc: SparkContext): Boolean
}


Scheduler Backends

YARN-specific scheduler backends that integrate Spark's task scheduling with YARN's resource management for both client and cluster deployment modes.

class YarnClientSchedulerBackend(
  scheduler: TaskSchedulerImpl,
  sc: SparkContext
) extends YarnSchedulerBackend(scheduler, sc) {
  def start(): Unit
  def stop(): Unit
  def applicationId(): String
}

class YarnClusterSchedulerBackend(
  scheduler: TaskSchedulerImpl,
  sc: SparkContext
) extends YarnSchedulerBackend(scheduler, sc) {
  def start(): Unit
  def applicationId(): String
  def applicationAttemptId(): Option[String]
  def getDriverLogUrls: Option[Map[String, String]]
}
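
Both backends are instantiated by Spark itself from the master URL; application code usually observes them only indirectly, for example through the application id they register (a sketch assuming an active YARN-backed SparkContext named sc):

// On YARN this surfaces the backend's registered YARN application id,
// in the usual application_<clusterTimestamp>_<sequence> form
val appId: String = sc.applicationId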


Resource Allocation and Container Management

Dynamic resource allocation system that manages executor containers, handles container placement strategies, and provides optimal resource utilization on YARN clusters.

private[yarn] class YarnAllocator(
  driverUrl: String,
  driverRef: RpcEndpointRef,
  conf: Configuration,
  sparkConf: SparkConf,
  amClient: AMRMClient[ContainerRequest],
  appAttemptId: ApplicationAttemptId,
  args: ApplicationMasterArguments,
  securityMgr: SecurityManager
) extends Logging {
  def allocateResources(): Unit
  def killExecutor(executorId: String): Unit
  def updateResourceRequests(): Unit
  def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit
  def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit
  def getNumExecutorsRunning: Int
  def getNumExecutorsFailed: Int
}
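
YarnAllocator itself is internal, but its container requests are driven by user-facing settings; a minimal sketch of enabling dynamic executor allocation (illustrative values; on YARN this also requires the external shuffle service):

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")      // external shuffle service, required on YARN
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "20")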


Security and Authentication

Comprehensive security system for YARN deployments including Kerberos authentication, delegation token management, and secure communication with Hadoop services.

class YarnSparkHadoopUtil extends SparkHadoopUtil {
  def obtainTokensForNamenodes(
    paths: Set[Path],
    conf: Configuration,
    creds: Credentials
  ): Unit
  def obtainTokenForHiveMetastore(conf: Configuration): Option[Credentials]
  def startExecutorDelegationTokenRenewer(sparkConf: SparkConf): Unit
  def stopExecutorDelegationTokenRenewer(): Unit
  def getContainerId: ContainerId
}

class ExecutorDelegationTokenUpdater(
  sparkConf: SparkConf,
  hadoopConf: Configuration
) {
  def updateCredentialsIfRequired(): Unit
  def stop(): Unit
}
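
For long-running applications on secure clusters, these classes are typically driven through configuration rather than called directly; a minimal sketch with illustrative principal and keytab values:

val conf = new SparkConf()
  .set("spark.yarn.principal", "spark-user@EXAMPLE.COM")               // illustrative principal
  .set("spark.yarn.keytab", "/etc/security/keytabs/spark-user.keytab") // illustrative keytab path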


Configuration and Utilities

YARN-specific configuration properties and utility functions for environment setup, command building, and integration with Hadoop ecosystem components.

object YarnSparkHadoopUtil {
  val MEMORY_OVERHEAD_FACTOR: Double
  val MEMORY_OVERHEAD_MIN: Int
  val ANY_HOST: String
  val DEFAULT_NUMBER_EXECUTORS: Int
  
  def get: YarnSparkHadoopUtil
  def addPathToEnvironment(
    env: HashMap[String, String],
    key: String,
    value: String,
    classPathSeparator: String
  ): Unit
  def getInitialTargetExecutorNumber(
    conf: SparkConf,
    numExecutors: Int
  ): Int
}
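
As a worked example of how the overhead constants are commonly applied (assuming the widely cited Spark 1.6 defaults of MEMORY_OVERHEAD_FACTOR = 0.10 and MEMORY_OVERHEAD_MIN = 384 MiB):

val executorMemoryMiB = 4096
val overheadMiB = math.max((0.10 * executorMemoryMiB).toInt, 384)  // max(409, 384) = 409
val containerSizeMiB = executorMemoryMiB + overheadMiB             // 4505 MiB requested from YARN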


Configuration Properties

Key YARN-specific Spark configuration properties:

  • spark.yarn.queue: YARN queue name to submit the application to
  • spark.yarn.am.memory: Amount of memory to use for the ApplicationMaster
  • spark.yarn.am.cores: Number of cores to use for the ApplicationMaster
  • spark.yarn.executor.memoryOverhead: Amount of off-heap memory (in MiB) to allocate per executor
  • spark.yarn.maxAppAttempts: Maximum number of application attempts
  • spark.yarn.submit.waitAppCompletion: Whether to wait for completion
  • spark.yarn.principal: Kerberos principal for secure clusters
  • spark.yarn.keytab: Kerberos keytab file path
  • spark.yarn.tags: Comma-separated list of YARN application tags
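
For example, several of these can be set together on a SparkConf before submission (illustrative values):

val conf = new SparkConf()
  .set("spark.yarn.queue", "analytics")                // illustrative queue name
  .set("spark.yarn.maxAppAttempts", "2")
  .set("spark.yarn.submit.waitAppCompletion", "false")
  .set("spark.yarn.tags", "etl,nightly")               // illustrative tags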

For complete configuration reference, see Configuration and Utilities.

Entry Points

Main entry points for interacting with YARN functionality:

  1. Client.main() - Submit Spark applications to YARN
  2. ApplicationMaster.main() - ApplicationMaster process entry point
  3. ExecutorLauncher.main() - Client mode ApplicationMaster entry point
  4. YarnSparkHadoopUtil.get - Access YARN-specific utilities