Cluster Management

Core cluster manager integration that enables Spark to run on YARN clusters through the external cluster manager SPI. This module handles the lifecycle of YARN-based Spark applications and provides appropriate schedulers and backends for different deployment modes.

Capabilities

YarnClusterManager

Main entry point for YARN cluster management, registered as an ExternalClusterManager service provider. Automatically activated when master = "yarn" is specified in SparkConf.

class YarnClusterManager extends ExternalClusterManager {
  def canCreate(masterURL: String): Boolean
  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler  
  def createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler): SchedulerBackend
  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}

Parameters:

  • masterURL: Must be "yarn" for this cluster manager to be selected
  • sc: SparkContext instance for the application
  • scheduler: TaskScheduler instance to be initialized
  • backend: SchedulerBackend instance to be initialized

Usage Example:

import org.apache.spark.{SparkConf, SparkContext}

// YarnClusterManager is automatically selected when master is "yarn"
val conf = new SparkConf()
  .setAppName("MyApp")
  .setMaster("yarn")  // This triggers YarnClusterManager selection

val sc = new SparkContext(conf)
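
For orientation, the selection logic branches on the configured deploy mode. The following is an illustrative sketch of that dispatch (mirroring the shape of Spark's implementation, not a verbatim copy):

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.{YarnClientSchedulerBackend, YarnClusterScheduler, YarnClusterSchedulerBackend, YarnScheduler}

private[spark] class YarnClusterManager extends ExternalClusterManager {
  // Only claim "yarn" master URLs; other cluster managers handle the rest.
  override def canCreate(masterURL: String): Boolean = masterURL == "yarn"

  // Pick the scheduler matching the configured deploy mode.
  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler =
    sc.deployMode match {
      case "cluster" => new YarnClusterScheduler(sc)
      case "client"  => new YarnScheduler(sc)
      case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
    }

  // Pick the backend matching the configured deploy mode.
  override def createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler): SchedulerBackend =
    sc.deployMode match {
      case "cluster" => new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
      case "client"  => new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
      case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
    }

  // Wire the scheduler to its backend before the context starts scheduling.
  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit =
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
}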

Task Schedulers

YARN-specific task schedulers that provide rack awareness and optimal task placement within YARN clusters.

class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
  override def getRackForHost(hostPort: String): Option[String]
}

class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc)

YarnScheduler:

  • Used in client deployment mode
  • Provides YARN rack awareness for better locality
  • Extends standard TaskSchedulerImpl with YARN-specific optimizations

YarnClusterScheduler:

  • Used in cluster deployment mode
  • Inherits all YarnScheduler functionality
  • Adds a postStartHook that signals the ApplicationMaster once the SparkContext is ready; otherwise identical to YarnScheduler

Usage Example:

// Schedulers are created automatically based on deploy mode
val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "client")  // Uses YarnScheduler
  // .set("spark.submit.deployMode", "cluster") // Uses YarnClusterScheduler

Scheduler Backends

YARN-specific scheduler backends that manage the communication between Spark and YARN ResourceManager.

abstract class YarnSchedulerBackend(scheduler: TaskSchedulerImpl, sc: SparkContext) 
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
  
  def bindToYarn(appId: ApplicationId, attemptId: Option[ApplicationAttemptId]): Unit
  override def start(): Unit
  override def stop(): Unit
  override def minRegisteredRatio: Double  // Returns 0.8
}

Common Methods:

  • bindToYarn: Associates backend with YARN application and attempt IDs
  • start(): Initializes the backend and begins resource management
  • stop(): Cleanly shuts down the backend and releases resources
  • minRegisteredRatio: Defaults to 0.8 (80%) for YARN, so task scheduling waits until at least 80% of requested executors have registered (overridable, as shown below)
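
The 0.8 default applies only when the user has not set the standard spark.scheduler.minRegisteredResourcesRatio property; an explicit setting takes precedence:

import org.apache.spark.SparkConf

// Lower the threshold so scheduling starts once half of the requested
// executors have registered, overriding YARN's 0.8 default.
val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.scheduler.minRegisteredResourcesRatio", "0.5")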

YarnClientSchedulerBackend

class YarnClientSchedulerBackend(scheduler: TaskSchedulerImpl, sc: SparkContext) 
  extends YarnSchedulerBackend(scheduler, sc) {
  
  override def start(): Unit
  def waitForApplication(): Unit
  override def stop(): Unit
}

Client Mode Specific:

  • start(): Creates and submits YARN application via Client
  • waitForApplication(): Blocks until application reaches RUNNING state
  • Used when spark.submit.deployMode = "client"

Usage Example:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "client")
  .set("spark.yarn.queue", "default")

// YarnClientSchedulerBackend is created automatically
val sc = new SparkContext(conf)
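
Internally, waitForApplication blocks until YARN reports the application as running. A rough sketch of that polling loop, written directly against Hadoop's YarnClient API (the real backend goes through Spark's Client helper, and the poll interval here is an illustrative choice):

import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient

// Sketch: block until the application leaves the submission states,
// failing fast if it terminates before reaching RUNNING.
def waitUntilRunning(yarnClient: YarnClient, appId: ApplicationId): Unit = {
  val pendingStates = Set(
    YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING,
    YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED)
  var state = yarnClient.getApplicationReport(appId).getYarnApplicationState
  while (pendingStates.contains(state)) {
    Thread.sleep(1000)  // illustrative poll interval
    state = yarnClient.getApplicationReport(appId).getYarnApplicationState
  }
  if (state != YarnApplicationState.RUNNING) {
    throw new IllegalStateException(s"YARN application $appId ended before running: $state")
  }
}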

YarnClusterSchedulerBackend

class YarnClusterSchedulerBackend(scheduler: TaskSchedulerImpl, sc: SparkContext) 
  extends YarnSchedulerBackend(scheduler, sc) {
  
  override def start(): Unit
  def getDriverLogUrls: Option[Map[String, String]]
  override def stop(): Unit
}

Cluster Mode Specific:

  • start(): Binds to existing YARN application (running inside ApplicationMaster)
  • getDriverLogUrls: Returns driver log URLs from YARN for monitoring
  • Used when spark.submit.deployMode = "cluster"

Usage Example:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "cluster")
  .set("spark.yarn.queue", "production")

// YarnClusterSchedulerBackend is created automatically
val sc = new SparkContext(conf)
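
The driver log URLs returned by getDriverLogUrls are published on the listener bus when the application starts, so they can be observed from application code. A sketch, assuming the standard SparkListener API:

import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart}

// Sketch: in cluster mode, the driver log URLs published by the backend
// appear on SparkListenerApplicationStart.driverLogs.
class DriverLogUrlListener extends SparkListener {
  override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
    event.driverLogs.foreach { urls =>
      urls.foreach { case (name, url) => println(s"driver log '$name': $url") }
    }
  }
}

// Register it before the application starts, e.g.:
// sc.addSparkListener(new DriverLogUrlListener)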

Service Registration

The YARN cluster manager is automatically registered through Java's ServiceLoader mechanism:

META-INF/services/org.apache.spark.scheduler.ExternalClusterManager:

org.apache.spark.scheduler.cluster.YarnClusterManager

This enables automatic discovery when master = "yarn" without requiring explicit class registration.
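
Conceptually, the discovery step works like the following sketch (Spark's actual lookup additionally verifies that exactly one registered provider claims the URL):

import java.util.ServiceLoader
import scala.jdk.CollectionConverters._
import org.apache.spark.scheduler.ExternalClusterManager

// Sketch: scan all registered ExternalClusterManager providers and keep
// the one whose canCreate accepts the master URL.
def findClusterManager(masterURL: String): Option[ExternalClusterManager] =
  ServiceLoader.load(classOf[ExternalClusterManager])
    .asScala
    .find(_.canCreate(masterURL))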

Deploy Mode Differences

Component                Client Mode                   Cluster Mode
TaskScheduler            YarnScheduler                 YarnClusterScheduler
SchedulerBackend         YarnClientSchedulerBackend    YarnClusterSchedulerBackend
Driver Location          Client machine                YARN ApplicationMaster
Application Submission   Client submits to YARN        Pre-submitted by spark-submit

Error Handling

Common exceptions in cluster management:

// Unsupported deploy mode
throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")

// Backend initialization failure  
throw new SparkException("Failed to initialize YARN scheduler backend")

Integration Points

  • SparkContext: Automatic cluster manager selection based on master URL
  • Configuration: Driven by spark.submit.deployMode and spark.master settings
  • Resource Management: Integrates with YarnAllocator for container management
  • Security: Coordinates with security managers and credential providers