or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

cluster-deployment.md
cluster-management.md
coarse-grained-scheduling.md
fine-grained-scheduling.md
index.md
resource-configuration.md
tile.json

docs/coarse-grained-scheduling.md

Coarse-Grained Scheduling

Long-running executor mode that holds onto Mesos resources for the duration of the Spark job, providing lower latency and more predictable performance.

Capabilities

MesosCoarseGrainedSchedulerBackend

A SchedulerBackend that runs tasks on Mesos using "coarse-grained" tasks, holding onto each Mesos node for the duration of the Spark job.

/**
 * Coarse-grained Mesos scheduler backend.
 *
 * A SchedulerBackend that runs tasks on Mesos using "coarse-grained" tasks,
 * holding onto each Mesos node for the duration of the Spark job. The class
 * also implements the org.apache.mesos.Scheduler interface for handling
 * cluster events.
 *
 * Only member signatures are listed here; method bodies are not shown.
 *
 * @param scheduler Task scheduler this backend works with — NOTE(review): role
 *                  inferred from the type name, confirm against the implementation
 * @param sc Spark context of the application
 * @param master Mesos master address — presumably derived from the
 *               "mesos://host:port" master URL; TODO confirm the expected format
 * @param securityManager Spark security manager — how it is used is not visible
 *                        in this listing
 */
private[spark] class MesosCoarseGrainedSchedulerBackend(
  scheduler: TaskSchedulerImpl,
  sc: SparkContext,
  master: String,
  securityManager: SecurityManager
) extends CoarseGrainedSchedulerBackend 
    with org.apache.mesos.Scheduler 
    with MesosSchedulerUtils {

  /**
   * Starts the scheduler backend and registers with Mesos master
   */
  def start(): Unit
  
  /**
   * Stops the scheduler backend gracefully
   */
  def stop(): Unit
  
  /**
   * Returns the application ID, which for this backend is the Mesos framework ID
   * @return Framework ID. NOTE(review): the earlier wording ("or warning if not
   *         initialized") suggests a warning is emitted when the framework ID is
   *         not yet initialized; what is returned in that case is not visible
   *         here — confirm against the implementation
   */
  def applicationId(): String
  
  /**
   * Checks if sufficient resources are registered
   * @return true if total cores >= required cores * minimum ratio
   *         (presumably the minimum registered resources ratio setting — TODO confirm)
   */
  def sufficientResourcesRegistered(): Boolean
  
  /**
   * Requests a specific total number of executors
   * @param requestedTotal Target number of executors
   * @return Future[Boolean] indicating success
   */
  def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean]
  
  /**
   * Kills the specified executors
   * @param executorIds Executor IDs to kill
   * @return Future[Boolean] indicating success
   */
  def doKillExecutors(executorIds: Seq[String]): Future[Boolean]
  
  /**
   * Generates a new unique Mesos task ID
   * @return String task ID
   */
  def newMesosTaskId(): String
  
  /**
   * Creates command info for launching an executor
   * @param offer Mesos resource offer
   * @param numCores Number of cores for the executor
   * @param taskId Task ID for the executor
   * @return CommandInfo for Mesos task
   */
  def createCommand(offer: Offer, numCores: Int, taskId: String): CommandInfo
}

Mesos Scheduler Interface

Implementation of the Mesos Scheduler interface methods for handling cluster events.

/**
 * Called when framework is registered with Mesos master
 * @param driver Scheduler driver
 * @param frameworkId ID of the registered framework
 * @param masterInfo Description of the Mesos master — NOTE(review): which
 *                   fields are consumed is not visible in this listing
 */
def registered(
  driver: org.apache.mesos.SchedulerDriver,
  frameworkId: FrameworkID,
  masterInfo: MasterInfo
): Unit

/**
 * Handles resource offers from Mesos master
 * @param driver Scheduler driver
 * @param offers List of resource offers (JList is presumably an alias for
 *               java.util.List — confirm against the file's imports)
 */
def resourceOffers(
  driver: org.apache.mesos.SchedulerDriver, 
  offers: JList[Offer]
): Unit

/**
 * Handles task status updates
 * @param driver Scheduler driver
 * @param status Task status update — NOTE(review): how individual task states
 *               are handled is not visible in this listing
 */
def statusUpdate(
  driver: org.apache.mesos.SchedulerDriver, 
  status: TaskStatus
): Unit

/**
 * Handles scheduler errors
 * @param driver Scheduler driver
 * @param message Error message — NOTE(review): whether the error is only
 *                reported or also stops the application is not visible here
 */
def error(driver: org.apache.mesos.SchedulerDriver, message: String): Unit

/**
 * Handles offer rescission
 * @param driver Scheduler driver
 * @param offerId ID of the offer that was rescinded
 */
def offerRescinded(driver: org.apache.mesos.SchedulerDriver, offerId: OfferID): Unit

/**
 * Handles disconnection from Mesos master
 * @param driver Scheduler driver
 */
def disconnected(driver: org.apache.mesos.SchedulerDriver): Unit

/**
 * Handles reregistration with Mesos master
 * @param driver Scheduler driver
 * @param masterInfo Description of the Mesos master the framework
 *                   reregistered with
 */
def reregistered(driver: org.apache.mesos.SchedulerDriver, masterInfo: MasterInfo): Unit

/**
 * Handles slave loss events
 * @param driver Scheduler driver
 * @param slaveId ID of the slave that was lost
 */
def slaveLost(driver: org.apache.mesos.SchedulerDriver, slaveId: SlaveID): Unit

/**
 * Handles executor loss events
 * @param driver Scheduler driver
 * @param executorId ID of the executor that was lost
 * @param slaveId ID of the slave the executor was running on
 * @param status Integer status reported with the loss — presumably the
 *               executor's exit status; TODO confirm
 */
def executorLost(
  driver: org.apache.mesos.SchedulerDriver, 
  executorId: ExecutorID, 
  slaveId: SlaveID, 
  status: Int
): Unit

/**
 * Handles framework messages (no-op in current implementation)
 * @param driver Scheduler driver
 * @param executorId ID of the executor associated with the message
 * @param slaveId ID of the slave associated with the message
 * @param message Raw message payload
 */
def frameworkMessage(
  driver: org.apache.mesos.SchedulerDriver,
  executorId: ExecutorID,
  slaveId: SlaveID, 
  message: Array[Byte]
): Unit

Resource Management

Methods for managing Mesos resource offers and executor allocation.

/**
 * Builds Mesos tasks from resource offers
 * @param offers Available resource offers
 * @return Map from OfferID to list of tasks to launch. NOTE(review): whether
 *         offers with nothing to launch are absent or mapped to an empty list
 *         is not visible in this listing — confirm before relying on either
 */
private def buildMesosTasks(offers: mutable.Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]]

/**
 * Checks if a task can be launched on the given slave
 * @param slaveId Slave identifier
 * @param resources Available resources
 * @return true if task can be launched. NOTE(review): the exact criteria
 *         (cores, memory, limits) are not visible in this listing
 */
private def canLaunchTask(slaveId: String, resources: JList[Resource]): Boolean

/**
 * Determines executor cores based on offer and configuration
 * @param offerCPUs Available CPUs in offer
 * @return Number of cores for executor. NOTE(review): presumably driven by the
 *         spark.executor.cores and spark.cores.max settings — confirm
 */
private def executorCores(offerCPUs: Int): Int

/**
 * Partitions task resources from available resources
 * @param resources Available resources
 * @param taskCPUs Required CPU cores
 * @param taskMemory Required memory — unit is not stated here; presumably
 *                   megabytes, TODO confirm
 * @param taskGPUs Required GPUs
 * @return Tuple of (remaining resources, resources to use)
 */
private def partitionTaskResources(
  resources: JList[Resource],
  taskCPUs: Int,
  taskMemory: Int, 
  taskGPUs: Int
): (List[Resource], List[Resource])

Usage Examples:

import org.apache.spark.{SparkConf, SparkContext}

// Minimal coarse-grained configuration: application identity first,
// then the resource settings applied in one batch
val conf = new SparkConf()
  .setMaster("mesos://master:5050")
  .setAppName("CoarseGrainedApp")
  .setAll(Seq(
    "spark.mesos.coarse" -> "true",    // Enable coarse-grained mode
    "spark.executor.cores" -> "2",     // Cores per executor
    "spark.executor.memory" -> "2g",   // Memory per executor
    "spark.cores.max" -> "16"          // Maximum total cores
  ))

// Build the context from the configuration above
val sc = new SparkContext(conf)

// Advanced configuration: placement, identity, container and GPU options
val advancedConf = new SparkConf()
  .setMaster("mesos://master:5050")
  .setAppName("AdvancedCoarseApp")
  .setAll(Seq(
    "spark.mesos.coarse" -> "true",
    "spark.mesos.constraints" -> "os:centos7",               // Constraint matching
    "spark.mesos.role" -> "spark",                           // Mesos role
    "spark.mesos.principal" -> "spark-principal",            // Authentication
    "spark.mesos.executor.docker.image" -> "spark:latest",   // Docker image
    "spark.mesos.fetcherCache.enable" -> "true",             // Enable fetcher cache
    "spark.mesos.gpus.max" -> "4"                            // GPU support
  ))

// Dynamic executor scaling.
// Dynamic allocation removes idle executors, so the shuffle files they wrote
// must remain available afterwards. That is why the external shuffle service
// is enabled together with dynamic allocation; on Mesos the shuffle service
// also has to be running on each agent node.
val dynamicConf = new SparkConf()
  .setAppName("DynamicApp")
  .setMaster("mesos://master:5050")
  .set("spark.mesos.coarse", "true")
  .set("spark.shuffle.service.enabled", "true")         // Required by dynamic allocation
  .set("spark.dynamicAllocation.enabled", "true")       // Enable dynamic allocation
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "10")
  .set("spark.dynamicAllocation.initialExecutors", "3")

Configuration Properties

Key configuration properties for coarse-grained mode:

  • spark.mesos.coarse - Set to "true" to enable coarse-grained mode
  • spark.executor.cores - Number of cores per executor
  • spark.executor.memory - Memory per executor
  • spark.cores.max - Maximum total cores to acquire
  • spark.mesos.constraints - Mesos slave constraints
  • spark.mesos.coarse.shutdownTimeout - Executor shutdown timeout
  • spark.mesos.executor.docker.image - Docker image for executors
  • spark.mesos.fetcherCache.enable - Enable Mesos fetcher cache
  • spark.mesos.gpus.max - Maximum GPUs to acquire

The coarse-grained scheduler backend provides predictable resource allocation and lower task launch latency by maintaining long-running executors throughout the application lifecycle.