tessl/maven-org-apache-spark--spark-yarn_2-11

Apache Spark YARN resource manager integration component that enables Spark applications to run on Hadoop YARN clusters

Describes
mavenpkg:maven/org.apache.spark/spark-yarn_2.11@2.4.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-yarn_2-11@2.4.0


Apache Spark YARN

Apache Spark YARN resource manager integration component that enables Spark applications to run on Hadoop YARN clusters. This module provides cluster managers, schedulers, and backends specifically designed for YARN environments, enabling seamless integration between Spark's distributed computing capabilities and YARN's resource management.

Package Information

  • Package Name: spark-yarn_2.11
  • Package Type: Maven
  • Language: Scala
  • Installation:
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-yarn_2.11</artifactId>
      <version>2.4.8</version>
    </dependency>

Core Imports

import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnAllocator}
import org.apache.spark.scheduler.cluster.{YarnClusterManager, SchedulerExtensionService, SchedulerExtensionServiceBinding}
import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.yarn.api.records.{ApplicationId, ApplicationAttemptId, Container}
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.spark.util.Clock

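For Java shuffle service integration (YarnShuffleService is packaged in the separate spark-network-yarn_2.11 artifact, not in spark-yarn_2.11):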

import org.apache.spark.network.yarn.YarnShuffleService;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;

Basic Usage

Client Mode Deployment

import org.apache.spark.{SparkConf, SparkContext}

// Configure Spark for YARN client mode
val conf = new SparkConf()
  .setAppName("MyApp")
  .setMaster("yarn")
  .set("spark.submit.deployMode", "client")
  .set("spark.yarn.queue", "default")

val sc = new SparkContext(conf)
// Use SparkContext normally
sc.stop()

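Cluster Mode Deployment

In cluster mode the driver runs inside the YARN ApplicationMaster, so the application has to be launched through spark-submit, for example: spark-submit --master yarn --deploy-mode cluster --queue production --class com.example.MyMainClass /path/to/my-app.jar. Constructing a SparkContext directly with spark.submit.deployMode set to "cluster" fails with a SparkException ("Deployment to YARN is not supported directly by SparkContext. Please use spark-submit."). The application code itself only builds the context:

import org.apache.spark.{SparkConf, SparkContext}

// Master, deploy mode and queue are supplied by spark-submit
val conf = new SparkConf()
  .setAppName("MyApp")

val sc = new SparkContext(conf)
// Use SparkContext normally
sc.stop()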

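Programmatic Application Submission

Client and ClientArguments are declared private[spark] in Spark 2.4. The following snippet therefore only compiles from code placed inside the org.apache.spark package. Applications outside Spark normally submit through spark-submit or the public org.apache.spark.launcher.SparkLauncher API.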

import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.spark.SparkConf

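val sparkConf = new SparkConf()
  .setAppName("MyYarnApp")
  .set("spark.submit.deployMode", "cluster") // the ApplicationMaster runs --class only in cluster mode
  .set("spark.yarn.queue", "default")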

val args = Array(
  "--jar", "/path/to/my-app.jar",
  "--class", "com.example.MyMainClass"
)

val clientArgs = new ClientArguments(args)
val client = new Client(clientArgs, sparkConf)

val applicationId = client.submitApplication()
println(s"Application submitted with ID: $applicationId")
client.stop() // releases the underlying YarnClient
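Code outside the org.apache.spark package can do a comparable submission with the public SparkLauncher API. That API ships in the spark-launcher_2.11 artifact and runs spark-submit as a child process. This is a minimal sketch that assumes SPARK_HOME and HADOOP_CONF_DIR (or YARN_CONF_DIR) are set in the environment. The object name LaunchOnYarn, the jar path and the main class are placeholders.

```scala
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

object LaunchOnYarn {
  // Builds the launcher; jar path, main class and queue are placeholder values.
  def buildLauncher(): SparkLauncher = new SparkLauncher()
    .setAppResource("/path/to/my-app.jar")
    .setMainClass("com.example.MyMainClass")
    .setMaster("yarn")
    .setDeployMode("cluster")
    .setConf("spark.yarn.queue", "default")

  def main(args: Array[String]): Unit = {
    // Starts spark-submit as a child process (needs SPARK_HOME and the Hadoop config dir).
    val handle: SparkAppHandle = buildLauncher().startApplication()
    // Poll until the handle reaches a final state (FINISHED, FAILED, KILLED or LOST).
    while (!handle.getState.isFinal) {
      Thread.sleep(1000L)
    }
    println(s"Application ${handle.getAppId} ended in state ${handle.getState}")
  }
}
```

startApplication also accepts SparkAppHandle.Listener instances, which avoids polling when the caller prefers state-change callbacks.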

Architecture

The YARN integration follows Spark's pluggable cluster manager architecture:

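  • External Cluster Manager: YarnClusterManager is registered through Java's ServiceLoader (META-INF/services/org.apache.spark.scheduler.ExternalClusterManager) and is selected when the master URL is "yarn"
  • Scheduler Backends: Separate implementations for client (YarnClientSchedulerBackend) and cluster (YarnClusterSchedulerBackend) modes
  • Application Master: In cluster mode it runs the driver (the user's main class); in client mode it runs as ExecutorLauncher and only negotiates executor containers for the driver running on the client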
  • Resource Management: YarnAllocator manages container allocation and executor lifecycle
  • Security Integration: Credential providers and delegation token management for secure clusters

Capabilities

Cluster Management

Core cluster manager integration that enables Spark to run on YARN clusters through the external cluster manager SPI.

class YarnClusterManager extends ExternalClusterManager {
  def canCreate(masterURL: String): Boolean
  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
  def createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler): SchedulerBackend
  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}
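The deploy-mode dispatch performed by YarnClusterManager can be modelled without Spark on the classpath. The sketch below is illustrative only, and YarnDispatchModel, Selection and select are hypothetical names. It returns class names instead of constructing Spark's private[spark] schedulers. It also throws IllegalArgumentException where Spark throws SparkException.

```scala
// Standalone model of YarnClusterManager's dispatch logic (illustrative, not Spark's code).
object YarnDispatchModel {
  final case class Selection(taskScheduler: String, schedulerBackend: String)

  // Mirrors canCreate: only the literal master URL "yarn" is accepted.
  def canCreate(masterURL: String): Boolean = masterURL == "yarn"

  // Mirrors createTaskScheduler / createSchedulerBackend, which branch on the deploy mode.
  def select(deployMode: String): Selection = deployMode match {
    case "cluster" => Selection("YarnClusterScheduler", "YarnClusterSchedulerBackend")
    case "client"  => Selection("YarnScheduler", "YarnClientSchedulerBackend")
    case other =>
      throw new IllegalArgumentException(s"Unknown deploy mode '$other' for Yarn")
  }
}
```

initialize then wires the chosen backend into the TaskSchedulerImpl, so the pair above is always created together.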

Application Deployment

Client API for submitting and managing YARN applications programmatically, supporting both client and cluster deployment modes.

class Client(args: ClientArguments, sparkConf: SparkConf) {
  def submitApplication(): ApplicationId
  def run(): Unit
  def stop(): Unit
  def monitorApplication(appId: ApplicationId, returnOnRunning: Boolean, logApplicationReport: Boolean): YarnAppReport
  def getApplicationReport(appId: ApplicationId): ApplicationReport
}
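Client.monitorApplication polls the ResourceManager every spark.yarn.report.interval (1s by default). It returns once the application is FINISHED, FAILED or KILLED, or as soon as it is RUNNING when returnOnRunning is true. The sketch below models that stopping rule and assumes only hadoop-yarn-api on the classpath. MonitorSketch, shouldStop and poll are hypothetical names. The real method also logs reports and cleans up the staging directory.

```scala
import org.apache.hadoop.yarn.api.records.YarnApplicationState

object MonitorSketch {
  // Terminal states at which monitoring stops.
  private val Terminal = Set(
    YarnApplicationState.FINISHED,
    YarnApplicationState.FAILED,
    YarnApplicationState.KILLED)

  // Decides whether polling should stop for the given state.
  def shouldStop(state: YarnApplicationState, returnOnRunning: Boolean): Boolean =
    Terminal.contains(state) || (returnOnRunning && state == YarnApplicationState.RUNNING)

  // Calls fetchState every intervalMs until shouldStop holds, then returns the last state.
  def poll(fetchState: () => YarnApplicationState,
           returnOnRunning: Boolean,
           intervalMs: Long): YarnApplicationState = {
    var state = fetchState()
    while (!shouldStop(state, returnOnRunning)) {
      Thread.sleep(intervalMs)
      state = fetchState()
    }
    state
  }
}
```

Against a live cluster, fetchState could wrap yarnClient.getApplicationReport(appId).getYarnApplicationState using Hadoop's YarnClient.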

Resource Management

Container allocation and executor lifecycle management within YARN resource constraints and scheduling policies.

class YarnAllocator(
  driverUrl: String,
  driverRef: RpcEndpointRef,
  conf: YarnConfiguration,
  sparkConf: SparkConf,
  amClient: AMRMClient[ContainerRequest],
  appAttemptId: ApplicationAttemptId,
  securityMgr: SecurityManager,
  localResources: Map[String, LocalResource],
  resolver: SparkRackResolver,
  clock: Clock = new SystemClock()
) {
  def getNumExecutorsRunning: Int
  def getNumExecutorsFailed: Int
  def numContainersPendingAllocate: Int
  def allocateResources(): Unit
  def killExecutor(executorId: String): Unit
  def stop(): Unit
}
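Each executor container that YarnAllocator requests is sized as the executor heap plus a memory overhead. When spark.executor.memoryOverhead is not set, Spark 2.4 uses max(0.10 * executor memory, 384 MB). These constants come from YarnSparkHadoopUtil.MEMORY_OVERHEAD_FACTOR and MEMORY_OVERHEAD_MIN. The sketch reproduces that arithmetic in plain Scala, and ContainerMemory and its methods are hypothetical names. It ignores spark.executor.pyspark.memory, which Spark 2.4 also adds to the request.

```scala
object ContainerMemory {
  // Same values as YarnSparkHadoopUtil.MEMORY_OVERHEAD_FACTOR / MEMORY_OVERHEAD_MIN.
  val OverheadFactor = 0.10
  val OverheadMinMb = 384L

  // Overhead used when spark.executor.memoryOverhead is not set explicitly.
  def defaultOverheadMb(executorMemoryMb: Long): Long =
    math.max((OverheadFactor * executorMemoryMb).toLong, OverheadMinMb)

  // Memory (MB) requested per executor container, before any rounding done by YARN.
  def containerRequestMb(executorMemoryMb: Long, explicitOverheadMb: Option[Long] = None): Long =
    executorMemoryMb + explicitOverheadMb.getOrElse(defaultOverheadMb(executorMemoryMb))
}
```

For a 4 GB executor this gives 4096 + 409 = 4505 MB. For a 1 GB executor the 384 MB floor applies (1024 + 384 = 1408 MB). YARN may then round the request up according to its scheduler settings, such as yarn.scheduler.minimum-allocation-mb.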

YARN Shuffle Service

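External shuffle service that runs inside each YARN NodeManager as an auxiliary service and serves the shuffle files written by Spark executors. Because the files remain available after an executor exits, executors can be released without losing shuffle output, which is what dynamic allocation relies on.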

public class YarnShuffleService extends AuxiliaryService {
  protected void serviceInit(Configuration conf) throws Exception;
  protected void serviceStart() throws Exception;
  protected void serviceStop() throws Exception;
  public void initializeApplication(ApplicationInitializationContext context) throws Exception;
  public void stopApplication(ApplicationTerminationContext context) throws Exception;
}
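On the NodeManager side, the service is enabled in yarn-site.xml. Add spark_shuffle to yarn.nodemanager.aux-services and set yarn.nodemanager.aux-services.spark_shuffle.class to org.apache.spark.network.yarn.YarnShuffleService. Then put the spark-<version>-yarn-shuffle jar on the NodeManager classpath. Applications opt in through SparkConf. The sketch below assumes spark-core_2.11 on the classpath. ShuffleServiceConf is a hypothetical name, and the executor bounds are example values.

```scala
import org.apache.spark.SparkConf

object ShuffleServiceConf {
  // Application-side settings; the NodeManagers must already run YarnShuffleService.
  def build(): SparkConf = new SparkConf()
    .setAppName("MyApp")
    .setMaster("yarn")
    // Executors register their shuffle output with the external service.
    .set("spark.shuffle.service.enabled", "true")
    // Dynamic allocation in Spark 2.4 requires the external shuffle service.
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.dynamicAllocation.minExecutors", "1")
    .set("spark.dynamicAllocation.maxExecutors", "20")
    // Must match the port the NodeManager-side service listens on (default 7337).
    .set("spark.shuffle.service.port", "7337")
}
```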

Security Integration

Security credential management and delegation token handling for secure YARN clusters with Kerberos authentication.

trait ServiceCredentialProvider {
  def serviceName: String
  def credentialsRequired(hadoopConf: Configuration): Boolean
  def obtainCredentials(hadoopConf: Configuration, sparkConf: SparkConf, creds: Credentials): Option[Long]
}
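On a Kerberized cluster the application side is mostly configuration. The sketch below uses Spark 2.4 property names; Spark 3.x renames several of them to spark.kerberos.*. The principal, keytab path and namenode URIs are placeholders, and KerberosConf is a hypothetical name.

```scala
import org.apache.spark.SparkConf

object KerberosConf {
  // All identities, paths and hosts below are placeholder values.
  def build(): SparkConf = new SparkConf()
    .setAppName("SecureApp")
    .setMaster("yarn")
    // Log in from a keytab so long-running applications can obtain fresh delegation tokens.
    .set("spark.yarn.principal", "user@EXAMPLE.COM")
    .set("spark.yarn.keytab", "/path/to/user.keytab")
    // Extra secure filesystems to fetch tokens for, in addition to the default filesystem.
    .set("spark.yarn.access.hadoopFileSystems",
      "hdfs://nn1.example.com:8020,hdfs://nn2.example.com:8020")
    // Disable token retrieval for one provider, keyed by its serviceName.
    .set("spark.security.credentials.hbase.enabled", "false")
}
```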

Extension Points

Pluggable extension system for custom scheduler services and functionality in YARN deployments.

trait SchedulerExtensionService {
  def start(binding: SchedulerExtensionServiceBinding): Unit
  def stop(): Unit
}
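Extension services are named in the spark.yarn.services property as a comma-separated list of class names. The YARN scheduler backend instantiates and starts them once the application ID is known. The trait's scaladoc asks implementations to provide a no-argument constructor and to make stop() idempotent. A minimal sketch, in which com.example.MySchedulerExtension and ExtensionServiceConf are hypothetical names:

```scala
import org.apache.spark.SparkConf

object ExtensionServiceConf {
  // The extension class must be on the driver classpath and have a no-arg constructor.
  def build(): SparkConf = new SparkConf()
    .setAppName("MyApp")
    .setMaster("yarn")
    // Comma-separated list of SchedulerExtensionService implementations to start.
    .set("spark.yarn.services", "com.example.MySchedulerExtension")
}
```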

Configuration System

YARN-specific configuration options for controlling resource allocation, security, and deployment behavior.

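// Key configuration entries (private[spark], in package object org.apache.spark.deploy.yarn.config)
val APPLICATION_TAGS: OptionalConfigEntry[Seq[String]]  // spark.yarn.tags
val MAX_APP_ATTEMPTS: OptionalConfigEntry[Int]           // spark.yarn.maxAppAttempts
val QUEUE_NAME: ConfigEntry[String]                      // spark.yarn.queue (default "default")
val SPARK_ARCHIVE: OptionalConfigEntry[String]           // spark.yarn.archive
val USER_CLASS_PATH_FIRST: ConfigEntry[Boolean]          // spark.yarn.user.classpath.first (default false)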
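Applications set these entries through their string keys rather than the private[spark] constants. The sketch below uses example values; the archive path and tag names are placeholders, and YarnConfExample is a hypothetical name. Spark's documentation notes that spark.yarn.maxAppAttempts should be no larger than the cluster's yarn.resourcemanager.am.max-attempts.

```scala
import org.apache.spark.SparkConf

object YarnConfExample {
  // Example values only; each comment names the config entry the key backs.
  def build(): SparkConf = new SparkConf()
    .setAppName("MyApp")
    .setMaster("yarn")
    .set("spark.yarn.tags", "etl,nightly")                       // APPLICATION_TAGS
    .set("spark.yarn.maxAppAttempts", "2")                       // MAX_APP_ATTEMPTS
    .set("spark.yarn.queue", "production")                       // QUEUE_NAME
    .set("spark.yarn.archive", "hdfs:///spark/spark-libs.zip")   // SPARK_ARCHIVE
    .set("spark.yarn.user.classpath.first", "false")             // USER_CLASS_PATH_FIRST
}
```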

Command Building Utilities

YARN-specific utilities for building container launch commands and managing Spark distribution.

object YarnSparkHadoopUtil {
  def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit
  val MEMORY_OVERHEAD_FACTOR: Double
  val MEMORY_OVERHEAD_MIN: Long  
  val RM_REQUEST_PRIORITY: Priority
}

object YarnCommandBuilderUtils {
  def quoteForBatchScript(arg: String): String
  def findJarsDir(sparkHome: String): String
}
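addPathToEnvironment appends a value to an existing environment variable (typically CLASSPATH) in the container launch context. It joins entries with Hadoop's ApplicationConstants.CLASS_PATH_SEPARATOR, the literal <CPS>, which the NodeManager expands to the platform path separator when it starts the container. The sketch below reproduces the same logic without dependencies. EnvPathSketch is a hypothetical name, and the separator is copied as a string constant rather than imported from Hadoop.

```scala
import scala.collection.mutable.HashMap

object EnvPathSketch {
  // Copy of Hadoop's ApplicationConstants.CLASS_PATH_SEPARATOR.
  val ClassPathSeparator = "<CPS>"

  // Appends value to env(key) with the separator, or sets it when key is absent.
  def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = {
    val newValue = env.get(key) match {
      case Some(existing) => existing + ClassPathSeparator + value
      case None           => value
    }
    env.put(key, newValue)
  }
}
```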

Common Integration Patterns

Custom Credential Provider

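import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider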

class MyCredentialProvider extends ServiceCredentialProvider {
  override def serviceName: String = "my-service"
  
  override def credentialsRequired(hadoopConf: Configuration): Boolean = {
    // Check if credentials are needed
    hadoopConf.get("my.service.enabled", "false").toBoolean
  }
  
  override def obtainCredentials(
      hadoopConf: Configuration, 
      sparkConf: SparkConf, 
      creds: Credentials): Option[Long] = {
    // Obtain tokens for the service and add them to creds.
    // Return the time of the next renewal (epoch milliseconds), or None if the tokens are not renewable.
    None
  }
}
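Providers are discovered with java.util.ServiceLoader. To register a custom provider, list its fully qualified class name in META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider inside the application jar. An example name is com.example.MyCredentialProvider, which is hypothetical. The class therefore needs a public no-argument constructor. The sketch below lists the providers visible on a classpath, which is a quick way to check that registration worked. ListCredentialProviders is a hypothetical name.

```scala
import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider

object ListCredentialProviders {
  // Returns the serviceName of every ServiceCredentialProvider that ServiceLoader can find.
  def serviceNames(loader: ClassLoader = Thread.currentThread().getContextClassLoader): Seq[String] =
    ServiceLoader.load(classOf[ServiceCredentialProvider], loader)
      .asScala
      .map(_.serviceName)
      .toList
}
```

A registered provider can still be switched off per application with spark.security.credentials.<serviceName>.enabled=false, for example spark.security.credentials.my-service.enabled=false.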

Custom Scheduler Extension

import org.apache.spark.scheduler.cluster.{SchedulerExtensionService, SchedulerExtensionServiceBinding}

class MySchedulerExtension extends SchedulerExtensionService {
  override def start(binding: SchedulerExtensionServiceBinding): Unit = {
    // Initialize extension with access to scheduler components
  }
  
  override def stop(): Unit = {
    // Cleanup extension resources
  }
}

Error Handling

Common exceptions thrown by YARN integration:

  • SparkException: Thrown for unsupported deploy modes or configuration errors
  • IOException: File system operations during staging and cleanup
  • YarnException: YARN-specific errors during application submission or management
  • SecurityException: Credential or authentication failures in secure clusters

Integration Requirements

  • Hadoop/YARN: Compatible Hadoop YARN cluster (2.6+)
  • Scala Version: Built for Scala 2.11 binary compatibility
  • Spark Core: Requires matching spark-core_2.11 dependency
  • Security: Optional Kerberos configuration for secure clusters