Long-running executor mode that holds onto Mesos resources for the lifetime of the Spark application, providing lower task-launch latency and more predictable performance.
A SchedulerBackend that runs Spark executors on Mesos as "coarse-grained" tasks, holding onto each Mesos node for the lifetime of the Spark application.
/**
 * Coarse-grained Mesos scheduler backend.
 *
 * Runs Spark executors as long-lived Mesos tasks that keep their resources for the
 * lifetime of the application.
 *
 * NOTE(review): this listing shows signatures only; method bodies (and any `override`
 * modifiers or superclass constructor arguments) are elided here — confirm against the
 * implementation.
 *
 * @param scheduler       task scheduler whose tasks this backend runs
 * @param sc              the application's SparkContext
 * @param master          Mesos master URL, e.g. "mesos://master:5050"
 * @param securityManager the application's security manager
 */
private[spark] class MesosCoarseGrainedSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext,
    master: String,
    securityManager: SecurityManager
) extends CoarseGrainedSchedulerBackend
    with org.apache.mesos.Scheduler
    with MesosSchedulerUtils {

  /**
   * Starts the scheduler backend and registers it with the Mesos master.
   */
  def start(): Unit

  /**
   * Stops the scheduler backend gracefully.
   */
  def stop(): Unit

  /**
   * Returns the application ID, which for this backend is the Mesos framework ID.
   *
   * @return the framework ID once the framework has been registered. If the ID is not
   *         yet initialized, a warning is emitted; the value returned in that case is not
   *         shown here (presumably a default application ID — confirm).
   */
  def applicationId(): String

  /**
   * Checks whether enough resources have been registered to begin scheduling.
   *
   * @return true if total acquired cores >= required cores * minimum registered ratio
   */
  def sufficientResourcesRegistered(): Boolean

  /**
   * Requests a specific total number of executors.
   *
   * @param requestedTotal target number of executors
   * @return Future[Boolean] indicating success
   */
  def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean]

  /**
   * Kills the specified executors.
   *
   * @param executorIds IDs of the executors to kill
   * @return Future[Boolean] indicating success
   */
  def doKillExecutors(executorIds: Seq[String]): Future[Boolean]

  /**
   * Generates a new unique Mesos task ID.
   *
   * @return the new task ID as a String
   */
  def newMesosTaskId(): String

  /**
   * Creates the Mesos command used to launch an executor.
   *
   * @param offer    Mesos resource offer the executor will be launched on
   * @param numCores number of cores for the executor
   * @param taskId   Mesos task ID of the executor
   * @return CommandInfo for the Mesos task
   */
  def createCommand(offer: Offer, numCores: Int, taskId: String): CommandInfo
}
Implementation of the Mesos Scheduler interface methods for handling cluster events.
/**
 * Mesos callback: this framework has been registered with the Mesos master.
 *
 * @param driver      scheduler driver for this framework
 * @param frameworkId ID the master assigned to this framework
 * @param masterInfo  description of the master that accepted the registration
 */
def registered(driver: org.apache.mesos.SchedulerDriver,
               frameworkId: FrameworkID,
               masterInfo: MasterInfo): Unit
/**
 * Mesos callback: the master is offering resources to this framework.
 *
 * @param driver scheduler driver for this framework
 * @param offers resource offers made available in this round
 */
def resourceOffers(driver: org.apache.mesos.SchedulerDriver, offers: JList[Offer]): Unit
/**
 * Mesos callback: the state of one of this framework's tasks has changed.
 *
 * @param driver scheduler driver for this framework
 * @param status the reported task status
 */
def statusUpdate(driver: org.apache.mesos.SchedulerDriver, status: TaskStatus): Unit
/**
 * Mesos callback: reports a scheduler error.
 *
 * @param driver  scheduler driver for this framework
 * @param message description of the error
 */
def error(
    driver: org.apache.mesos.SchedulerDriver,
    message: String
): Unit
/**
 * Mesos callback: a previously made offer has been rescinded (withdrawn).
 *
 * @param driver  scheduler driver for this framework
 * @param offerId ID of the rescinded offer
 */
def offerRescinded(
    driver: org.apache.mesos.SchedulerDriver,
    offerId: OfferID
): Unit
/**
 * Mesos callback: the scheduler has lost its connection to the Mesos master.
 *
 * @param driver scheduler driver for this framework
 */
def disconnected(
    driver: org.apache.mesos.SchedulerDriver
): Unit
/**
 * Mesos callback: this framework has re-registered with a Mesos master.
 *
 * @param driver     scheduler driver for this framework
 * @param masterInfo description of the master the framework re-registered with
 */
def reregistered(
    driver: org.apache.mesos.SchedulerDriver,
    masterInfo: MasterInfo
): Unit
/**
 * Mesos callback: an agent (slave) has been lost.
 *
 * @param driver  scheduler driver for this framework
 * @param slaveId ID of the lost agent
 */
def slaveLost(
    driver: org.apache.mesos.SchedulerDriver,
    slaveId: SlaveID
): Unit
/**
 * Mesos callback: an executor has been lost.
 *
 * @param driver     scheduler driver for this framework
 * @param executorId ID of the lost executor
 * @param slaveId    agent (slave) the executor was running on
 * @param status     status code reported for the executor (presumably its exit status — confirm)
 */
def executorLost(driver: org.apache.mesos.SchedulerDriver,
                 executorId: ExecutorID,
                 slaveId: SlaveID,
                 status: Int): Unit
/**
 * Mesos callback: a framework message arrived from an executor.
 * This is a no-op in the current implementation.
 *
 * @param driver     scheduler driver for this framework
 * @param executorId executor the message came from
 * @param slaveId    agent (slave) hosting that executor
 * @param message    raw message payload
 */
def frameworkMessage(
    driver: org.apache.mesos.SchedulerDriver,
    executorId: ExecutorID,
    slaveId: SlaveID,
    message: Array[Byte]
): Unit
Methods for managing Mesos resource offers and executor allocation.
/**
 * Plans which Mesos tasks to launch against the given resource offers.
 *
 * @param offers resource offers currently available
 * @return for each offer ID, the list of tasks to launch on that offer
 */
private def buildMesosTasks(
    offers: mutable.Buffer[Offer]
): Map[OfferID, List[MesosTaskInfo]]
/**
 * Decides whether a task may be launched on the given agent with the given resources.
 *
 * @param slaveId   identifier of the agent (slave)
 * @param resources resources available on that agent
 * @return true when a task can be launched there
 */
private def canLaunchTask(
    slaveId: String,
    resources: JList[Resource]
): Boolean
/**
 * Chooses how many cores an executor gets, based on the offer and the configuration.
 *
 * @param offerCPUs number of CPUs available in the offer
 * @return number of cores to assign to the executor
 */
private def executorCores(
    offerCPUs: Int
): Int
/**
 * Splits the resources a task needs off from the resources available in an offer.
 *
 * @param resources  resources available in the offer
 * @param taskCPUs   CPU cores required by the task
 * @param taskMemory memory required by the task (units not shown here — presumably MB, confirm)
 * @param taskGPUs   GPUs required by the task
 * @return tuple of (remaining resources, resources to use for the task)
 */
private def partitionTaskResources(
    resources: JList[Resource],
    taskCPUs: Int,
    taskMemory: Int,
    taskGPUs: Int
): (List[Resource], List[Resource])
Usage Examples:
import org.apache.spark.{SparkConf, SparkContext}
// Basic coarse-grained setup: fixed-size executors with a cap on total cores.
// SparkConf setters mutate the conf in place, so the settings can be applied one per statement.
val conf = new SparkConf()
conf.setAppName("CoarseGrainedApp")
conf.setMaster("mesos://master:5050")
conf.set("spark.mesos.coarse", "true")   // Run executors as long-lived (coarse-grained) tasks
conf.set("spark.executor.cores", "2")    // Cores per executor
conf.set("spark.executor.memory", "2g")  // Memory per executor
conf.set("spark.cores.max", "16")        // Upper bound on total cores acquired
val sc = new SparkContext(conf)
// Advanced configuration: agent constraints, role/principal, Docker image,
// fetcher cache and GPUs, applied in one batch (in this order) via setAll.
val advancedConf = new SparkConf()
  .setAppName("AdvancedCoarseApp")
  .setMaster("mesos://master:5050")
  .setAll(Seq(
    "spark.mesos.coarse"                -> "true",
    "spark.mesos.constraints"           -> "os:centos7",      // Constraint matching on agent attributes
    "spark.mesos.role"                  -> "spark",           // Mesos role
    "spark.mesos.principal"             -> "spark-principal", // Authentication principal
    "spark.mesos.executor.docker.image" -> "spark:latest",    // Docker image for executors
    "spark.mesos.fetcherCache.enable"   -> "true",            // Enable the Mesos fetcher cache
    "spark.mesos.gpus.max"              -> "4"                // Maximum GPUs to acquire
  ))
// Dynamic executor scaling
// Dynamic allocation removes idle executors, so their shuffle output must be served by the
// external shuffle service: Spark 2.x refuses to start dynamic allocation without
// spark.shuffle.service.enabled=true. On Mesos, the Mesos external shuffle service
// (sbin/start-mesos-shuffle-service.sh) must also be running on every agent.
val dynamicConf = new SparkConf()
  .setAppName("DynamicApp")
  .setMaster("mesos://master:5050")
  .set("spark.mesos.coarse", "true")
  .set("spark.dynamicAllocation.enabled", "true")  // Enable dynamic allocation
  .set("spark.shuffle.service.enabled", "true")    // Required by dynamic allocation (see above)
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "10")
  .set("spark.dynamicAllocation.initialExecutors", "3")
Key configuration properties for coarse-grained mode:
- spark.mesos.coarse - Set to "true" to enable coarse-grained mode
- spark.executor.cores - Number of cores per executor
- spark.executor.memory - Memory per executor
- spark.cores.max - Maximum total cores to acquire
- spark.mesos.constraints - Mesos agent (slave) constraints that resource offers must satisfy
- spark.mesos.coarse.shutdownTimeout - Executor shutdown timeout
- spark.mesos.executor.docker.image - Docker image for executors
- spark.mesos.fetcherCache.enable - Enable the Mesos fetcher cache
- spark.mesos.gpus.max - Maximum GPUs to acquire

The coarse-grained scheduler backend provides predictable resource allocation and lower task-launch latency by keeping long-running executors alive throughout the application lifecycle.