Fine-Grained Scheduling

Scheduling mode in which each Spark task runs as a separate Mesos task, so that multiple applications (and other Mesos frameworks) can share cluster CPUs at fine granularity.

Capabilities

MesosFineGrainedSchedulerBackend

A SchedulerBackend for running fine-grained tasks on Mesos, where each Spark task is mapped to a separate Mesos task for maximum resource sharing.

/**
 * Fine-grained Mesos scheduler backend
 */
class MesosFineGrainedSchedulerBackend(
  scheduler: TaskSchedulerImpl,
  sc: SparkContext,
  master: String
) extends SchedulerBackend 
    with org.apache.mesos.Scheduler 
    with MesosSchedulerUtils {

  /**
   * Starts the scheduler backend
   */
  def start(): Unit
  
  /**
   * Stops the scheduler backend
   */
  def stop(): Unit
  
  /**
   * Revives offers from Mesos master
   */
  def reviveOffers(): Unit
  
  /**
   * Kills a specific task
   * @param taskId Task ID to kill
   * @param executorId Executor ID (slave ID in fine-grained mode)
   * @param interruptThread Whether to interrupt the thread
   * @param reason Reason for killing the task
   */
  def killTask(taskId: Long, executorId: String, interruptThread: Boolean, reason: String): Unit
  
  /**
   * Returns default parallelism for the application
   * @return Default parallelism value
   */
  def defaultParallelism(): Int
  
  /**
   * Returns the application ID (framework ID)
   * @return Framework ID, or the default application ID (with a logged warning) if the framework has not registered yet
   */
  def applicationId(): String
}

Executor Management

Methods for creating and managing Mesos executors in fine-grained mode.

/**
 * Creates a MesosExecutorInfo for launching a Mesos executor
 * @param availableResources Available resources from Mesos offer
 * @param execId Executor ID to assign
 * @return Tuple of (MesosExecutorInfo, remaining resources)
 * @throws SparkException if executor Spark home is not configured
 */
def createExecutorInfo(
  availableResources: JList[Resource],
  execId: String
): (MesosExecutorInfo, JList[Resource])

/**
 * Creates a Mesos task from a Spark TaskDescription
 * @param task Spark task description
 * @param resources Available resources
 * @param slaveId Slave ID where task will run
 * @return Tuple of (MesosTaskInfo, remaining resources)
 */
def createMesosTask(
  task: TaskDescription,
  resources: JList[Resource], 
  slaveId: String
): (MesosTaskInfo, JList[Resource])

/**
 * Creates executor arguments serialized for Mesos
 * @return Serialized executor arguments as byte array
 */
private def createExecArg(): Array[Byte]
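Both `createExecutorInfo` and `createMesosTask` return what they built together with the resources left over, so that a single offer can be carved up step by step: first the executor, then one or more tasks. The following is a minimal, hypothetical model of that carving in plain Scala with no Mesos classes. `Res` and `ResourceSplit.take` are illustrative names, not Spark APIs, and the sketch ignores resource roles and reservations.

```scala
// Hypothetical, simplified model of a Mesos offer's resource list:
// each entry is a named scalar resource such as "cpus" or "mem".
final case class Res(name: String, amount: Double)

object ResourceSplit {
  /** Takes `amount` of resource `name` from `resources`.
    * Returns (remaining, taken) and fails if the offer holds too little. */
  def take(resources: List[Res], name: String, amount: Double): (List[Res], List[Res]) = {
    val available = resources.filter(_.name == name).map(_.amount).sum
    require(available >= amount, s"need $amount $name but only $available offered")
    var left = amount
    val remaining = List.newBuilder[Res]
    val taken = List.newBuilder[Res]
    for (r <- resources) {
      if (r.name == name && left > 0) {
        // Consume as much of this entry as still needed
        val used = math.min(r.amount, left)
        left -= used
        taken += Res(name, used)
        if (r.amount > used) remaining += Res(name, r.amount - used)
      } else {
        remaining += r // untouched entries pass through unchanged
      }
    }
    (remaining.result(), taken.result())
  }
}
```

For example, taking 1 CPU for the executor from an offer of 4 CPUs and 2048 MB leaves 3 CPUs and the full memory for subsequent tasks.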

Resource Offer Handling

Fine-grained resource offer processing with dynamic task allocation.

/**
 * Handles resource offers from the Mesos master.
 * Asks the active task sets for tasks in priority order and fills each node
 * with tasks in a round-robin manner, so that tasks are balanced across the cluster
 * @param driver Scheduler driver
 * @param offers List of resource offers from Mesos
 */
def resourceOffers(driver: org.apache.mesos.SchedulerDriver, offers: JList[Offer]): Unit

/**
 * Handles task status updates from running tasks
 * @param driver Scheduler driver  
 * @param status Task status update
 */
def statusUpdate(driver: org.apache.mesos.SchedulerDriver, status: TaskStatus): Unit

/**
 * Removes the executor recorded for a slave when the slave is lost or a task on it fails
 * @param slaveId Slave ID to remove
 * @param reason Reason for removal
 */
private def removeExecutor(slaveId: String, reason: String): Unit
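The round-robin fill described for `resourceOffers` can be illustrated with a small, hypothetical model: each pass places at most one task per offer, and passes repeat until tasks run out or no offer has enough free CPUs left. `RoundRobinFill.fill` is an illustrative name, not a Spark API, and the sketch deliberately ignores task-set priority and data locality.

```scala
object RoundRobinFill {
  /** Hypothetical model: places up to `pendingTasks` tasks on offers, one task
    * per offer per pass, while an offer still has `cpusPerTask` free CPUs.
    * Returns the number of tasks placed on each offer. */
  def fill(freeCpus: Vector[Double], cpusPerTask: Double, pendingTasks: Int): Vector[Int] = {
    val free = freeCpus.toArray
    val placed = Array.fill(free.length)(0)
    var remaining = pendingTasks
    var progress = true
    while (remaining > 0 && progress) {
      progress = false
      for (i <- free.indices) {
        // Give this offer one more task if it can still fit one
        if (remaining > 0 && free(i) >= cpusPerTask) {
          free(i) -= cpusPerTask
          placed(i) += 1
          remaining -= 1
          progress = true
        }
      }
    }
    placed.toVector
  }
}
```

With offers of 3, 1 and 2 free CPUs and five 1-CPU tasks, the first pass places one task on each offer and the second pass places one more on the first and third, giving (2, 1, 2) rather than packing the first node full.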

State Management

Internal state tracking for executors and tasks.

// Executor state tracking
val slaveIdToExecutorInfo: HashMap[String, MesosExecutorInfo]
val taskIdToSlaveId: HashMap[Long, String]

// Configuration values
private[mesos] val mesosExecutorCores: Double  // Cores allocated to Mesos executor
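How the two maps evolve can be sketched with a hypothetical, simplified model: a launch records the task's slave and creates an executor entry the first time a slave is used; a terminal status forgets the task and, if the task failed, drops that slave's executor, matching the failure rule shown under Error Handling. `FineGrainedState` and its methods are illustrative names, and executor info is reduced to a string.

```scala
import scala.collection.mutable

/** Hypothetical, simplified model of the backend's two bookkeeping maps.
  * Executor info is reduced to a String label; task IDs are Longs. */
final class FineGrainedState {
  val slaveIdToExecutorInfo = mutable.HashMap.empty[String, String]
  val taskIdToSlaveId = mutable.HashMap.empty[Long, String]

  /** Records a task launch; creates an executor entry on first use of a slave. */
  def launch(taskId: Long, slaveId: String): Unit = {
    slaveIdToExecutorInfo.getOrElseUpdate(slaveId, s"executor-on-$slaveId")
    taskIdToSlaveId(taskId) = slaveId
  }

  /** Handles a terminal status: a failed task removes its slave's executor,
    * and every terminal task is removed from taskIdToSlaveId. */
  def onTerminalStatus(taskId: Long, failed: Boolean): Unit = {
    if (failed) taskIdToSlaveId.get(taskId).foreach(s => slaveIdToExecutorInfo.remove(s))
    taskIdToSlaveId.remove(taskId)
  }
}
```

Keying executors by slave ID reflects the fine-grained design, where each slave runs at most one executor per application.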

Usage Examples:

import org.apache.spark.{SparkConf, SparkContext}

// Basic fine-grained setup
val conf = new SparkConf()
  .setAppName("FineGrainedApp")
  .setMaster("mesos://master:5050")
  .set("spark.mesos.coarse", "false")         // Enable fine-grained mode
  .set("spark.mesos.mesosExecutor.cores", "0.5")  // Cores for Mesos executor overhead

val sc = new SparkContext(conf)

// Fine-grained with constraints
val constrainedConf = new SparkConf()
  .setAppName("ConstrainedFineApp")
  .setMaster("mesos://master:5050")
  .set("spark.mesos.coarse", "false")
  .set("spark.mesos.constraints", "zone:us-west-1")
  .set("spark.mesos.rejectOfferDurationForUnmetConstraints", "30s")

// Multi-tenant fine-grained setup
val multiTenantConf = new SparkConf()
  .setAppName("MultiTenantApp")
  .setMaster("mesos://master:5050")
  .set("spark.mesos.coarse", "false")
  .set("spark.mesos.role", "analytics")        // Specific role for resource allocation
  .set("spark.mesos.principal", "analytics-user")
  .set("spark.mesos.mesosExecutor.cores", "1.0")
  .set("spark.executor.memory", "1g")

Resource Allocation Strategy

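Fine-grained mode allocates resources as follows:

  1. Per-Task Resources: Each Spark task runs as its own Mesos task
  2. Dynamic Sharing: A task's CPUs are claimed when its Mesos task launches and returned to Mesos when it finishes, while the executor's memory stays reserved for as long as the executor runs
  3. Executor Overhead: The Mesos executor's own CPU overhead is configured separately via spark.mesos.mesosExecutor.cores
  4. Memory Requirements: An offer must cover executor memory plus memory overhead (spark.mesos.executor.memoryOverhead, by default the larger of 10% of executor memory and 384 MB)
  5. CPU Requirements: An offer needs at least mesosExecutor.cores + spark.task.cpus on a slave without a running executor, or only spark.task.cpus on a slave that already runs one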
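The requirement arithmetic can be worked through with a short sketch. It assumes Spark's default memory-overhead rule of max(10% of executor memory, 384 MB) when spark.mesos.executor.memoryOverhead is not set; `OfferRequirements` and its methods are hypothetical names, not Spark APIs.

```scala
object OfferRequirements {
  // Assumed defaults: overhead = max(10% of executor memory, 384 MB)
  val OverheadFraction = 0.10
  val OverheadMinimumMb = 384

  /** Memory (MB) an offer must carry to start a new executor.
    * `overrideOverheadMb` stands in for spark.mesos.executor.memoryOverhead. */
  def requiredMemoryMb(executorMemoryMb: Int, overrideOverheadMb: Option[Int] = None): Int =
    executorMemoryMb + overrideOverheadMb.getOrElse(
      math.max(OverheadFraction * executorMemoryMb, OverheadMinimumMb.toDouble).toInt)

  /** CPUs an offer must carry: executor cores plus one task's CPUs,
    * or only the task's CPUs if an executor already runs on that slave. */
  def requiredCpus(mesosExecutorCores: Double, cpusPerTask: Double, executorExists: Boolean): Double =
    if (executorExists) cpusPerTask else mesosExecutorCores + cpusPerTask
}
```

With spark.executor.memory=1g the 384 MB floor applies, so an offer needs 1024 + 384 = 1408 MB; with 4g the 10% term wins and it needs 4096 + 409 = 4505 MB. With the default 1.0 executor cores and 1 CPU per task, the first task on a node needs a 2-CPU offer, later tasks on that node only 1 CPU.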

Offer Processing Logic

// Offer constraint matching
val (offersMatchingConstraints, offersNotMatchingConstraints) = 
  offers.asScala.partition { offer =>
    val offerAttributes = toAttributeMap(offer.getAttributesList)
    matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
  }

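// Resource requirement checking, for each offer in offersMatchingConstraints
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus")
val slaveId = offer.getSlaveId.getValue
val meetsMemoryRequirements = mem >= executorMemory(sc)
val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)
// A slave that already runs this application's executor only needs CPUs for the task
val meetsRequirements = 
  (meetsMemoryRequirements && meetsCPURequirements) ||
  (slaveIdToExecutorInfo.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)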
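The partition-then-filter logic above can be exercised outside Spark with a simplified, hypothetical offer model. `SimpleOffer` and `OfferFilter` are illustrative names; constraints are treated as text attributes only (attribute name to accepted values), and the required memory is passed in rather than computed.

```scala
// Hypothetical, simplified offer: text attributes only
final case class SimpleOffer(slaveId: String, cpus: Double, memMb: Double, attributes: Map[String, String])

object OfferFilter {
  /** Every constrained attribute must be present with one of the accepted values. */
  def matches(constraints: Map[String, Set[String]], attrs: Map[String, String]): Boolean =
    constraints.forall { case (k, accepted) => attrs.get(k).exists(v => accepted.contains(v)) }

  /** Same shape as the meetsRequirements expression above. */
  def usable(o: SimpleOffer, requiredMemMb: Double, mesosExecutorCores: Double,
             cpusPerTask: Double, slavesWithExecutor: Set[String]): Boolean = {
    val meetsMemory = o.memMb >= requiredMemMb
    val meetsCpu = o.cpus >= mesosExecutorCores + cpusPerTask
    (meetsMemory && meetsCpu) || (slavesWithExecutor.contains(o.slaveId) && o.cpus >= cpusPerTask)
  }
}
```

A small 1-CPU, 512 MB offer is rejected on a fresh slave but accepted once that slave already hosts an executor, which is exactly the second branch of the predicate.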

Configuration Properties

Key configuration properties for fine-grained mode:

  • spark.mesos.coarse - Set to "false" to enable fine-grained mode
  • spark.mesos.mesosExecutor.cores - CPU cores reserved for each Mesos executor, in addition to task CPUs (default: 1.0)
  • spark.mesos.constraints - Attribute constraints that a Mesos slave's offers must satisfy
  • spark.mesos.rejectOfferDurationForUnmetConstraints - How long to decline offers that do not satisfy the constraints (default: 120s)
  • spark.executor.memory - Memory per executor
  • spark.default.parallelism - Value returned by defaultParallelism() (default: 8 in fine-grained mode)
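spark.mesos.constraints takes semicolon-separated `attribute:value` entries, with alternative values separated by commas (for example `os:centos7;zone:us-west-1a,us-west-1b`). The sketch below parses that format into a map from attribute to accepted values. `ConstraintParser` is a hypothetical name; Spark also supports non-text attribute types, which this sketch ignores.

```scala
object ConstraintParser {
  /** Hypothetical parser for text constraints of the form "attr:v1,v2;attr2:v3".
    * Returns attribute -> accepted values; an entry without values maps to an empty set. */
  def parse(spec: String): Map[String, Set[String]] =
    spec.split(";").iterator
      .map(_.trim)
      .filter(_.nonEmpty)
      .map { entry =>
        val parts = entry.split(":", 2) // split only on the first colon
        val values =
          if (parts.length > 1) parts(1).split(",").iterator.map(_.trim).filter(_.nonEmpty).toSet
          else Set.empty[String]
        parts(0).trim -> values
      }
      .toMap
}
```

The `zone:us-west-1` value used in the usage examples parses to a single attribute with one accepted value.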

Error Handling

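// Executor creation validation
// uri comes from spark.executor.uri, falling back to the SPARK_EXECUTOR_URI environment variable
val uri = sc.conf.getOption("spark.executor.uri")
  .orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
if (uri.isEmpty) {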
  val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home")
    .orElse(sc.getSparkHome())
    .getOrElse {
      throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
    }
}

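// Task failure handling (in statusUpdate)
val tid = status.getTaskId.getValue.toLong
if (TaskState.isFailed(mesosToTaskState(status.getState)) && taskIdToSlaveId.contains(tid)) {
  // A failed task means the executor on that slave is treated as lost
  removeExecutor(taskIdToSlaveId(tid), "Lost executor")
}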
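The Spark-home fallback chain in the first snippet can be reproduced in plain Scala to show its three outcomes. In this hypothetical sketch the configuration is a simple `Map`, the driver's Spark home is an `Option`, and `ConfigError` stands in for `SparkException`; none of these names are Spark APIs.

```scala
object ExecutorHome {
  // Stand-in for SparkException in this sketch
  final class ConfigError(msg: String) extends Exception(msg)

  /** Hypothetical model of the fallback chain: spark.mesos.executor.home,
    * then the driver's Spark home, otherwise fail. */
  def resolve(conf: Map[String, String], driverSparkHome: Option[String]): String =
    conf.get("spark.mesos.executor.home")
      .orElse(driverSparkHome)
      .getOrElse(throw new ConfigError("Executor Spark home `spark.mesos.executor.home` is not set!"))
}
```

An explicit spark.mesos.executor.home wins, the driver's Spark home is used only as a fallback, and with neither set the lookup fails before any executor is described to Mesos.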

Fine-grained scheduling lets applications and other frameworks share CPUs at task granularity, at the cost of higher task launch latency than coarse-grained mode, because every Spark task incurs a separate Mesos task launch. It suits clusters where several frameworks and applications need to share resources as their load rises and falls.