A SchedulerBackend for running fine-grained tasks on Mesos: each Spark task is mapped to a separate Mesos task, allowing resources to be shared efficiently across multiple applications and frameworks.
/**
 * Fine-grained Mesos scheduler backend.
 */
class MesosFineGrainedSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext,
    master: String)
  extends SchedulerBackend
  with org.apache.mesos.Scheduler
  with MesosSchedulerUtils {
  /**
   * Starts the scheduler backend.
   */
  def start(): Unit

  /**
   * Stops the scheduler backend.
   */
  def stop(): Unit

  /**
   * Revives offers from the Mesos master.
   */
  def reviveOffers(): Unit

  /**
   * Kills a specific task.
   * @param taskId Task ID to kill
   * @param executorId Executor ID (the slave ID in fine-grained mode)
   * @param interruptThread Whether to interrupt the task's thread
   * @param reason Reason for killing the task
   */
  def killTask(taskId: Long, executorId: String, interruptThread: Boolean, reason: String): Unit

  /**
   * Returns the default parallelism for the application.
   * @return Default parallelism value
   */
  def defaultParallelism(): Int

  /**
   * Returns the application ID (the Mesos framework ID).
   * @return The framework ID; logs a warning and falls back if not yet initialized
   */
  def applicationId(): String
}
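As a rough illustration, here is how two of these overrides typically resolve in Spark's Mesos backend. This is a sketch: the fallback value of 8 and the exact warning wording are assumptions, not guarantees of any particular Spark version.

override def defaultParallelism(): Int =
  // Falls back to 8 when spark.default.parallelism is unset (assumed default)
  sc.conf.getInt("spark.default.parallelism", 8)

override def applicationId(): String =
  Option(appId).getOrElse {
    // Called before registered() has recorded the framework ID (sketch)
    logWarning("Application ID is not initialized yet.")
    super.applicationId()
  }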
Methods for creating and managing Mesos executors in fine-grained mode:

/**
 * Creates a MesosExecutorInfo for launching a Mesos executor.
 * @param availableResources Available resources from a Mesos offer
 * @param execId Executor ID to assign
 * @return Tuple of (MesosExecutorInfo, remaining resources)
 * @throws SparkException if the executor Spark home is not configured
 */
def createExecutorInfo(
    availableResources: JList[Resource],
    execId: String): (MesosExecutorInfo, JList[Resource])
/**
 * Creates a Mesos task from a Spark TaskDescription.
 * @param task Spark task description
 * @param resources Available resources
 * @param slaveId Slave ID where the task will run
 * @return Tuple of (MesosTaskInfo, remaining resources)
 */
def createMesosTask(
    task: TaskDescription,
    resources: JList[Resource],
    slaveId: String): (MesosTaskInfo, JList[Resource])

/**
 * Creates serialized executor arguments to pass to the Mesos executor.
 * @return Serialized executor arguments as a byte array
 */
private def createExecArg(): Array[Byte]
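For orientation, here is a minimal sketch of the kind of TaskInfo that createMesosTask assembles, using the Mesos protobuf builders. The task ID, slave ID, and CPU count are hypothetical, and the executor and data wiring (results of createExecutorInfo and task serialization) are elided:

import org.apache.mesos.Protos._

// One scalar "cpus" resource equal to the per-task CPU count (assumed 1.0 here)
val cpuResource = Resource.newBuilder()
  .setName("cpus")
  .setType(Value.Type.SCALAR)
  .setScalar(Value.Scalar.newBuilder().setValue(1.0))
  .build()

val taskInfo = TaskInfo.newBuilder()
  .setTaskId(TaskID.newBuilder().setValue("42").build())        // hypothetical Spark task ID
  .setSlaveId(SlaveID.newBuilder().setValue("slave-1").build()) // hypothetical slave
  .setName("task 42")
  .addResources(cpuResource)
  // .setExecutor(executorInfo) // result of createExecutorInfo
  // .setData(...)              // serialized TaskDescription
  .build()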
Fine-grained resource offer processing with dynamic task allocation:

/**
 * Handles resource offers from the Mesos master.
 * Processes offers by priority and fills nodes in a round-robin manner.
 * @param driver Scheduler driver
 * @param offers List of resource offers from Mesos
 */
def resourceOffers(driver: org.apache.mesos.SchedulerDriver, offers: JList[Offer]): Unit
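Offers that cannot be used are typically declined with a filter so that Mesos stops re-offering them for a while. A sketch using the Mesos driver API; the 120-second figure mirrors spark.mesos.rejectOfferDurationForUnmetConstraints and is an assumed default here:

import org.apache.mesos.Protos.{Filters, OfferID}
import org.apache.mesos.SchedulerDriver

// Decline an offer so Mesos will not re-offer it for `refuseSeconds`
// (configured via spark.mesos.rejectOfferDurationForUnmetConstraints)
def declineUnmatched(driver: SchedulerDriver, offerId: OfferID, refuseSeconds: Double = 120.0): Unit = {
  val filters = Filters.newBuilder().setRefuseSeconds(refuseSeconds).build()
  driver.declineOffer(offerId, filters)
}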
/**
 * Handles task status updates from running tasks.
 * @param driver Scheduler driver
 * @param status Task status update
 */
def statusUpdate(driver: org.apache.mesos.SchedulerDriver, status: TaskStatus): Unit

/**
 * Removes an executor when its slave is lost or a task fails.
 * @param slaveId Slave ID to remove
 * @param reason Reason for removal
 */
private def removeExecutor(slaveId: String, reason: String): Unit
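statusUpdate translates Mesos task states into Spark task states before forwarding them to the TaskSchedulerImpl. A sketch of that mapping, approximating the mesosToTaskState helper referenced below (the exact cases, e.g. TASK_KILLING, vary by Mesos and Spark version):

import org.apache.mesos.Protos.{TaskState => MesosTaskState}
import org.apache.spark.TaskState

def mesosToTaskState(state: MesosTaskState): TaskState.TaskState = state match {
  case MesosTaskState.TASK_STAGING | MesosTaskState.TASK_STARTING => TaskState.LAUNCHING
  case MesosTaskState.TASK_RUNNING | MesosTaskState.TASK_KILLING  => TaskState.RUNNING
  case MesosTaskState.TASK_FINISHED                               => TaskState.FINISHED
  case MesosTaskState.TASK_FAILED                                 => TaskState.FAILED
  case MesosTaskState.TASK_KILLED                                 => TaskState.KILLED
  case MesosTaskState.TASK_LOST | MesosTaskState.TASK_ERROR       => TaskState.LOST
}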
Internal state tracking for executors and tasks:

// Executor state tracking
val slaveIdToExecutorInfo: HashMap[String, MesosExecutorInfo]
val taskIdToSlaveId: HashMap[Long, String]

// Configuration values
private[mesos] val mesosExecutorCores: Double // Cores reserved for the Mesos executor itself
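A toy model of how these two maps evolve across callbacks. IDs are hypothetical and the executor-info value type is simplified to String for the sketch:

import scala.collection.mutable.HashMap

val slaveIdToExecutorInfo = new HashMap[String, String]() // value type simplified for the sketch
val taskIdToSlaveId = new HashMap[Long, String]()

// resourceOffers: task 42 is launched on slave-1; the first task on a slave
// also registers the executor created for it
taskIdToSlaveId(42L) = "slave-1"
slaveIdToExecutorInfo.getOrElseUpdate("slave-1", "executor-info-for-slave-1")

// statusUpdate: task 42 reached a terminal state, so its entry is dropped;
// on failure, removeExecutor would also evict the slave's executor entry
taskIdToSlaveId.remove(42L)
slaveIdToExecutorInfo.remove("slave-1")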
Usage Examples:

import org.apache.spark.{SparkConf, SparkContext}

// Basic fine-grained setup
val conf = new SparkConf()
  .setAppName("FineGrainedApp")
  .setMaster("mesos://master:5050")
  .set("spark.mesos.coarse", "false")            // Enable fine-grained mode
  .set("spark.mesos.mesosExecutor.cores", "0.5") // Cores for Mesos executor overhead
val sc = new SparkContext(conf)

// Fine-grained with constraints
val constrainedConf = new SparkConf()
  .setAppName("ConstrainedFineApp")
  .setMaster("mesos://master:5050")
  .set("spark.mesos.coarse", "false")
  .set("spark.mesos.constraints", "zone:us-west-1")
  .set("spark.mesos.rejectOfferDurationForUnmetConstraints", "30s")

// Multi-tenant fine-grained setup
val multiTenantConf = new SparkConf()
  .setAppName("MultiTenantApp")
  .setMaster("mesos://master:5050")
  .set("spark.mesos.coarse", "false")
  .set("spark.mesos.role", "analytics")          // Specific role for resource allocation
  .set("spark.mesos.principal", "analytics-user")
  .set("spark.mesos.mesosExecutor.cores", "1.0")
.set("spark.executor.memory", "1g")Fine-grained mode uses a different resource allocation strategy:
spark.mesos.mesosExecutor.cores// Offer constraint matching
val (offersMatchingConstraints, offersNotMatchingConstraints) =
  offers.asScala.partition { offer =>
    val offerAttributes = toAttributeMap(offer.getAttributesList)
    matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
  }
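To make the partition above concrete, here is a simplified standalone re-implementation of the attribute check for the constraint spark.mesos.constraints=zone:us-west-1. The real matching lives in MesosSchedulerUtils and also handles value-less and set-valued attributes:

// Parsed form of "zone:us-west-1" (hypothetical constraint)
val slaveOfferConstraints: Map[String, Set[String]] = Map("zone" -> Set("us-west-1"))

// Text attributes of a hypothetical offer
val offerAttributes: Map[String, String] = Map("zone" -> "us-west-1", "rack" -> "r42")

// An offer matches when every constrained attribute is present and, if values
// were given, the offer's value is among them (simplified)
val matches = slaveOfferConstraints.forall { case (name, requiredValues) =>
  offerAttributes.get(name).exists { v =>
    requiredValues.isEmpty || requiredValues.contains(v)
  }
}
// matches == true for this offer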
// Resource requirement checking
val meetsMemoryRequirements = mem >= executorMemory(sc)
val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)
val meetsRequirements =
  (meetsMemoryRequirements && meetsCPURequirements) ||
  (slaveIdToExecutorInfo.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)
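A quick numeric walk-through of the CPU side of this check, under assumed values (an offer of 2 CPUs, spark.mesos.mesosExecutor.cores=0.5, spark.task.cpus=1):

val cpus = 2.0               // CPUs in the offer (hypothetical)
val mesosExecutorCores = 0.5 // spark.mesos.mesosExecutor.cores
val cpusPerTask = 1          // scheduler.CPUS_PER_TASK

// A slave without an executor must cover overhead + one task: 0.5 + 1 = 1.5 <= 2.0
val newSlaveOk = cpus >= (mesosExecutorCores + cpusPerTask) // true

// A slave already running an executor only needs the task cores: 1 <= 2.0
val existingSlaveOk = cpus >= cpusPerTask                   // true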
Key configuration properties for fine-grained mode:

spark.mesos.coarse - Set to "false" to enable fine-grained mode
spark.mesos.mesosExecutor.cores - CPU overhead reserved for each Mesos executor (default: 1.0)
spark.mesos.constraints - Mesos slave attribute constraints
spark.mesos.rejectOfferDurationForUnmetConstraints - How long to decline offers that do not meet the constraints
spark.executor.memory - Memory per executor
spark.default.parallelism - Default parallelism (used if not auto-detected)
// Executor creation validation
if (uri.isEmpty) {
  val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home")
    .orElse(sc.getSparkHome())
    .getOrElse {
      throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
    }
}
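In practice this means that when executors are not told to fetch a Spark distribution via spark.executor.uri, the job must point them at an installation already present on every slave. The paths below are illustrative:

import org.apache.spark.SparkConf

val executorHomeConf = new SparkConf()
  .set("spark.mesos.executor.home", "/opt/spark") // hypothetical install path on each slave
  // Alternatively, have slaves fetch a distribution instead:
  // .set("spark.executor.uri", "hdfs://namenode/dist/spark.tgz") // hypothetical URI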
// Task failure handling
if (TaskState.isFailed(mesosToTaskState(status.getState)) && taskIdToSlaveId.contains(tid)) {
  removeExecutor(taskIdToSlaveId(tid), "Lost executor")
}

Fine-grained scheduling provides maximum resource efficiency and sharing, but with higher task-launch latency than coarse-grained mode. It is ideal for clusters where multiple frameworks and applications need dynamic resource allocation.