Cluster mode deployment functionality for submitting Spark drivers as Mesos tasks, including driver lifecycle management, recovery, and web UI integration.
The MesosClusterScheduler is the Mesos scheduler responsible for launching submitted Spark drivers as Mesos tasks in cluster mode.
/**
* Mesos cluster scheduler for driver submission and management
*/
class MesosClusterScheduler(
engineFactory: MesosClusterPersistenceEngineFactory,
conf: SparkConf
) extends Scheduler with MesosSchedulerUtils {
/**
* Submits a driver for execution on the Mesos cluster
* @param desc Driver description with submission parameters
* @return CreateSubmissionResponse with submission ID and status
*/
def submitDriver(desc: MesosDriverDescription): CreateSubmissionResponse
/**
* Kills a submitted driver
* @param submissionId Driver submission ID to kill
* @return KillSubmissionResponse with operation status
*/
def killDriver(submissionId: String): KillSubmissionResponse
/**
* Gets the status of a submitted driver
* @param submissionId Driver submission ID
* @return SubmissionStatusResponse with current driver state
*/
def getDriverStatus(submissionId: String): SubmissionStatusResponse
/**
* Gets detailed driver state for web UI display
* @param submissionId Driver submission ID
* @return Option[MesosDriverState] with detailed state information
*/
def getDriverState(submissionId: String): Option[MesosDriverState]
/**
* Starts the cluster scheduler and registers with Mesos
*/
def start(): Unit
/**
* Stops the cluster scheduler
*/
def stop(): Unit
/**
* Gets complete scheduler state for monitoring
* @return MesosClusterSchedulerState with all driver states
*/
def getSchedulerState(): MesosClusterSchedulerState
/**
* Returns number of queued drivers
* @return Int count of queued drivers
*/
def getQueuedDriversSize: Int
/**
* Returns number of launched drivers
* @return Int count of launched drivers
*/
def getLaunchedDriversSize: Int
/**
* Returns number of pending retry drivers
* @return Int count of pending retry drivers
*/
def getPendingRetryDriversSize: Int
/**
* Escapes shell arguments for driver command construction
* @param value String value to escape
* @return Escaped string safe for shell execution
*/
def shellEscape(value: String): String
}
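A minimal sketch of exercising the scheduler API directly. Here engineFactory and driverDesc are assumed to exist (constructing a MesosDriverDescription is elided), and the response field names (success, submissionId, driverState) are assumed from the REST protocol responses; in a real deployment the dispatcher's REST endpoint drives these calls:
// Sketch only: engineFactory and driverDesc are assumed to exist.
val scheduler = new MesosClusterScheduler(engineFactory, conf)
scheduler.start()
val submission = scheduler.submitDriver(driverDesc)
if (submission.success) {
  // Poll the driver's current state by submission ID
  val status = scheduler.getDriverStatus(submission.submissionId)
  println(s"Driver ${submission.submissionId}: ${status.driverState}")
  // Kill the driver if it is no longer needed
  scheduler.killDriver(submission.submissionId)
}
scheduler.stop()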
Dispatcher daemon responsible for managing the cluster scheduler and providing REST API access.
/**
* Dispatcher for managing and launching drivers in Mesos cluster mode
*/
class MesosClusterDispatcher(
args: MesosClusterDispatcherArguments,
conf: SparkConf
) {
/**
* Starts the dispatcher, web UI, and cluster scheduler
*/
def start(): Unit
/**
* Stops all dispatcher components
*/
def stop(): Unit
/**
* Waits for dispatcher shutdown
*/
def awaitShutdown(): Unit
}
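A lifecycle sketch for running the dispatcher as a daemon (args and conf are built as in the Usage Examples below); awaitShutdown blocks the calling thread, so a shutdown hook handles cleanup:
// Sketch: args and conf construction as in the Usage Examples section below.
val dispatcher = new MesosClusterDispatcher(args, conf)
dispatcher.start()
sys.addShutdownHook { dispatcher.stop() } // stop components cleanly on JVM exit
dispatcher.awaitShutdown()                // block until the dispatcher is stopped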
Methods for creating and managing driver tasks on Mesos.
/**
* Creates a Mesos TaskInfo for a driver submission
* @param desc Driver description
* @param offer Resource offer to use
* @return TaskInfo for launching the driver
*/
private def createTaskInfo(desc: MesosDriverDescription, offer: ResourceOffer): TaskInfo
/**
* Builds driver command line for execution
* @param desc Driver description
* @return CommandInfo with complete driver command
*/
private def buildDriverCommand(desc: MesosDriverDescription): CommandInfo
/**
* Generates command-line options for spark-submit
* @param desc Driver description
* @param sandboxPath Path to Mesos sandbox
* @return Sequence of command-line options
*/
private def generateCmdOption(desc: MesosDriverDescription, sandboxPath: String): Seq[String]
/**
* Gets driver environment variables
* @param desc Driver description
* @return Environment with driver-specific variables
*/
private def getDriverEnvironment(desc: MesosDriverDescription): Environment
/**
* Gets URIs needed by the driver
* @param desc Driver description
* @return List of URIs to fetch
*/
private def getDriverUris(desc: MesosDriverDescription): List[CommandInfo.URI]
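For orientation, a hedged sketch of the kind of Mesos protobuf these helpers assemble; the values are illustrative only, since the real command line, environment, and URIs are derived from the MesosDriverDescription:
import org.apache.mesos.Protos.{CommandInfo, Environment}
// Illustrative CommandInfo mirroring buildDriverCommand / getDriverEnvironment / getDriverUris.
val env = Environment.newBuilder()
  .addVariables(Environment.Variable.newBuilder()
    .setName("EXAMPLE_VAR").setValue("example")) // hypothetical variable
val command = CommandInfo.newBuilder()
  .setValue("./bin/spark-submit --name MyApp ...") // assembled by buildDriverCommand
  .setEnvironment(env)
  .addUris(CommandInfo.URI.newBuilder().setValue("hdfs://namenode:8020/app.jar")) // from getDriverUris
  .build()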
Data classes for tracking driver and scheduler state.
/**
* Tracks current state of a Mesos task running a Spark driver
*/
class MesosClusterSubmissionState(
val driverDescription: MesosDriverDescription,
val taskId: TaskID,
val slaveId: SlaveID,
var mesosTaskStatus: Option[TaskStatus],
var startDate: Date,
var finishDate: Option[Date],
val frameworkId: String
) extends Serializable {
def copy(): MesosClusterSubmissionState
}
/**
* Tracks retry state of a failed driver
*/
class MesosClusterRetryState(
val lastFailureStatus: TaskStatus,
val retries: Int,
val nextRetry: Date,
val waitTime: Int
) extends Serializable {
def copy(): MesosClusterRetryState
}
/**
* Complete state of the cluster scheduler
*/
class MesosClusterSchedulerState(
val frameworkId: String,
val masterUrl: Option[String],
val queuedDrivers: Iterable[MesosDriverDescription],
val launchedDrivers: Iterable[MesosClusterSubmissionState],
val finishedDrivers: Iterable[MesosClusterSubmissionState],
val pendingRetryDrivers: Iterable[MesosDriverDescription]
)
/**
* Driver state for web UI display
*/
class MesosDriverState(
val state: String,
val description: MesosDriverDescription,
val submissionState: Option[MesosClusterSubmissionState] = None
)
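A small monitoring sketch over these state classes, e.g. for logging or a health endpoint (the helper name and format are arbitrary):
// Summarize scheduler state using the fields declared above.
def summarize(state: MesosClusterSchedulerState): String = {
  val running = state.launchedDrivers.count(_.finishDate.isEmpty)
  s"framework=${state.frameworkId} queued=${state.queuedDrivers.size} " +
    s"launched=${state.launchedDrivers.size} (running=$running) " +
    s"finished=${state.finishedDrivers.size} retrying=${state.pendingRetryDrivers.size}"
}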
Recovery mechanisms for cluster scheduler state.
/**
* Recovers scheduler state from persistence engine
*/
private def recoverState(): Unit
/**
* Persistence engine factory for different recovery modes
*/
trait MesosClusterPersistenceEngineFactory {
def createEngine(name: String): PersistenceEngine
}
// Recovery modes
class BlackHoleMesosClusterPersistenceEngineFactory // No persistence
class ZookeeperMesosClusterPersistenceEngineFactory // ZooKeeper persistence
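As a sketch of the extension point, a hypothetical in-memory factory; the PersistenceEngine operations used here (persist, expunge, fetch, fetchAll) are assumptions for illustration, and state is lost on restart, so this would only suit tests:
import scala.collection.concurrent.TrieMap
// Hypothetical test-only engine; method signatures are assumed, not taken from the trait.
class InMemoryPersistenceEngine extends PersistenceEngine {
  private val store = TrieMap.empty[String, AnyRef]
  def persist(name: String, obj: AnyRef): Unit = store.put(name, obj)
  def expunge(name: String): Unit = store.remove(name)
  def fetch[T](name: String): Option[T] = store.get(name).map(_.asInstanceOf[T])
  def fetchAll[T](): Iterable[T] = store.values.map(_.asInstanceOf[T])
}
class InMemoryMesosClusterPersistenceEngineFactory extends MesosClusterPersistenceEngineFactory {
  def createEngine(name: String): PersistenceEngine = new InMemoryPersistenceEngine
}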
Usage Examples:
import org.apache.spark.deploy.mesos.{MesosClusterDispatcher, MesosClusterDispatcherArguments}
import org.apache.spark.SparkConf
// Start cluster dispatcher
val conf = new SparkConf()
.setMaster("mesos://master:5050")
.setAppName("MesosDispatcher")
.set("spark.mesos.dispatcher.webui.url", "http://dispatcher:8080")
.set("spark.deploy.recoveryMode", "ZOOKEEPER")
.set("spark.deploy.zookeeper.url", "zk1:2181,zk2:2181,zk3:2181")
val args = new MesosClusterDispatcherArguments(Array(
"--master", "mesos://master:5050",
"--zk", "zk1:2181,zk2:2181,zk3:2181"
), conf)
val dispatcher = new MesosClusterDispatcher(args, conf)
dispatcher.start()
// Submit driver via REST API (typically done via spark-submit)
// POST to http://dispatcher:7077/v1/submissions/create
// {
// "appResource": "hdfs://namenode:8020/app.jar",
// "sparkProperties": {
// "spark.app.name": "MyApp",
// "spark.master": "mesos://master:5050",
// "spark.submit.deployMode": "cluster"
// },
// "clientSparkVersion": "2.2.3",
// "mainClass": "com.example.MyApp",
// "environmentVariables": {},
// "appArgs": []
// }
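The same request can also be issued from code; a sketch using Java 11's HttpClient to POST the payload above to the dispatcher's REST port (host, port, and payload exactly as in the comments above):
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
val payload =
  """{
    |  "appResource": "hdfs://namenode:8020/app.jar",
    |  "sparkProperties": {
    |    "spark.app.name": "MyApp",
    |    "spark.master": "mesos://master:5050",
    |    "spark.submit.deployMode": "cluster"
    |  },
    |  "clientSparkVersion": "2.2.3",
    |  "mainClass": "com.example.MyApp",
    |  "environmentVariables": {},
    |  "appArgs": []
    |}""".stripMargin
val request = HttpRequest.newBuilder(URI.create("http://dispatcher:7077/v1/submissions/create"))
  .header("Content-Type", "application/json")
  .POST(HttpRequest.BodyPublishers.ofString(payload))
  .build()
val response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
println(response.body()) // CreateSubmissionResponse JSON with the submission ID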
// Driver recovery example
val recoveryConf = new SparkConf()
.set("spark.deploy.recoveryMode", "ZOOKEEPER")
.set("spark.deploy.zookeeper.url", "zk1:2181")
.set("spark.mesos.retainedDrivers", "1000")
.set("spark.mesos.maxDrivers", "200")
.set("spark.mesos.cluster.retry.wait.max", "300") // 5 minutes max retry waitKey configuration properties for cluster deployment:
spark.deploy.recoveryMode - Recovery mode: "NONE" or "ZOOKEEPER"
spark.deploy.zookeeper.url - ZooKeeper URLs for recovery
spark.mesos.dispatcher.webui.url - Dispatcher web UI URL
spark.mesos.maxDrivers - Maximum queued drivers (default: 200)
spark.mesos.retainedDrivers - Number of finished drivers to retain (default: 200)
spark.mesos.cluster.retry.wait.max - Maximum retry wait time (default: 60s)
spark.mesos.driver.constraints - Driver placement constraints
spark.mesos.dispatcher.historyServer.url - History server URL for linking
// Exponential backoff for failed drivers
val (retries, waitTimeSec) = retryState
  .map { rs => (rs.retries + 1, Math.min(maxRetryWaitTime, rs.waitTime * 2)) }
  .getOrElse((1, 1))
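A standalone illustration of the schedule this produces, assuming the default 60-second cap from spark.mesos.cluster.retry.wait.max:
val maxRetryWaitTime = 60 // seconds; default for spark.mesos.cluster.retry.wait.max
val waits = Iterator.iterate(1)(w => math.min(maxRetryWaitTime, w * 2)).take(8).toList
println(waits.mkString(", ")) // 1, 2, 4, 8, 16, 32, 60, 60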
// Driver failure classification
private def shouldRelaunch(state: MesosTaskState): Boolean = {
state == MesosTaskState.TASK_FAILED || state == MesosTaskState.TASK_LOST
}
// Shell escaping for security
private def shellEscape(value: String): String = {
val WrappedInQuotes = """^(".+"|'.+')$""".r
val ShellSpecialChars = (""".*([ '<>&|\?\*;!#\\(\)"$`]).*""").r
value match {
case WrappedInQuotes(c) => value
case ShellSpecialChars(c) => "\"" + value.replaceAll("""(["`\$\\])""", """\\$1""") + "\""
case _: String => value
}
}
The cluster deployment functionality provides a complete solution for running Spark applications in cluster mode on Mesos, with driver lifecycle management, fault tolerance, recovery, and monitoring.