or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

cluster-deployment.md, cluster-management.md, coarse-grained-scheduling.md, fine-grained-scheduling.md, index.md, resource-configuration.md
tile.json

docs/cluster-deployment.md

Cluster Deployment

Cluster mode deployment functionality for submitting Spark drivers as Mesos tasks, including driver lifecycle management, recovery, and web UI integration.

Capabilities

MesosClusterScheduler

Mesos scheduler responsible for launching submitted Spark drivers in cluster mode as Mesos tasks.

/**
 * Mesos cluster scheduler for driver submission and management.
 *
 * Acts as a Mesos framework `Scheduler` (it `extends Scheduler`) that launches
 * Spark drivers submitted in cluster mode as Mesos tasks.
 *
 * NOTE(review): this is a signature summary only — method bodies are omitted, so the
 * class is not compilable as written.
 *
 * @param engineFactory factory used to create the persistence engine(s) that store
 *                      scheduler state for recovery
 * @param conf Spark configuration for the scheduler
 */
class MesosClusterScheduler(
  engineFactory: MesosClusterPersistenceEngineFactory,
  conf: SparkConf
) extends Scheduler with MesosSchedulerUtils {

  /**
   * Submits a driver for execution on the Mesos cluster.
   * @param desc Driver description with submission parameters
   * @return CreateSubmissionResponse with the submission ID and whether it was accepted
   */
  def submitDriver(desc: MesosDriverDescription): CreateSubmissionResponse
  
  /**
   * Kills a previously submitted driver.
   * @param submissionId Submission ID of the driver to kill
   * @return KillSubmissionResponse reporting whether the kill request succeeded
   */
  def killDriver(submissionId: String): KillSubmissionResponse
  
  /**
   * Gets the status of a submitted driver, for REST API status queries.
   * @param submissionId Driver submission ID
   * @return SubmissionStatusResponse with the driver's current state
   */
  def getDriverStatus(submissionId: String): SubmissionStatusResponse
  
  /**
   * Gets detailed driver state for web UI display.
   * @param submissionId Driver submission ID
   * @return detailed state wrapped in an Option (presumably None for an unknown
   *         submission ID — confirm)
   */
  def getDriverState(submissionId: String): Option[MesosDriverState]
  
  /**
   * Starts the cluster scheduler and registers it with Mesos.
   */
  def start(): Unit
  
  /**
   * Stops the cluster scheduler.
   */
  def stop(): Unit
  
  /**
   * Gets a snapshot of the complete scheduler state for monitoring.
   * @return MesosClusterSchedulerState with queued, launched, finished and
   *         pending-retry drivers
   */
  def getSchedulerState(): MesosClusterSchedulerState
  
  /**
   * Returns the number of drivers waiting in the launch queue.
   * @return count of queued drivers
   */
  def getQueuedDriversSize: Int
  
  /**
   * Returns the number of drivers that have been launched.
   * @return count of launched drivers
   */
  def getLaunchedDriversSize: Int
  
  /**
   * Returns the number of failed drivers waiting to be retried.
   * @return count of pending-retry drivers
   */
  def getPendingRetryDriversSize: Int
  
  /**
   * Escapes a single argument for inclusion in the driver's shell command line.
   *
   * NOTE(review): declared public here, but the implementation shown under
   * "Error Handling and Retry Logic" is `private`; confirm the intended visibility.
   * @param value String value to escape
   * @return Escaped string safe for shell execution
   */
  def shellEscape(value: String): String
}

MesosClusterDispatcher

Dispatcher daemon responsible for managing the cluster scheduler and providing REST API access.

/**
 * Dispatcher daemon for launching and managing drivers in Mesos cluster mode.
 *
 * NOTE(review): signature summary only — method bodies are omitted.
 *
 * @param args parsed dispatcher command-line arguments (e.g. `--master`, `--zk`)
 * @param conf Spark configuration for the dispatcher
 */
class MesosClusterDispatcher(
  args: MesosClusterDispatcherArguments,
  conf: SparkConf
) {
  /**
   * Starts the dispatcher, its web UI, and the cluster scheduler.
   */
  def start(): Unit
  
  /**
   * Stops all dispatcher components.
   */
  def stop(): Unit
  
  /**
   * Blocks until the dispatcher shuts down.
   */
  def awaitShutdown(): Unit
}

Driver Task Management

Private helper methods that build the Mesos task for a submitted driver: its TaskInfo, command line, environment variables, and URIs to fetch.

/**
 * Creates the Mesos TaskInfo used to launch a driver submission.
 * (Declaration only; the implementation is not shown here.)
 *
 * @param desc Driver description
 * @param offer Resource offer the driver task will be launched against
 * @return TaskInfo for launching the driver
 */
private def createTaskInfo(desc: MesosDriverDescription, offer: ResourceOffer): TaskInfo

/**
 * Builds the command the Mesos task runs to start the driver.
 * (Declaration only; the implementation is not shown here.)
 *
 * @param desc Driver description
 * @return CommandInfo containing the complete driver command
 */
private def buildDriverCommand(desc: MesosDriverDescription): CommandInfo

/**
 * Generates the command-line options passed to spark-submit for the driver.
 * (Declaration only; the implementation is not shown here.)
 *
 * @param desc Driver description
 * @param sandboxPath Path to the Mesos sandbox directory of the driver task
 * @return Sequence of command-line options
 */
private def generateCmdOption(desc: MesosDriverDescription, sandboxPath: String): Seq[String]

/**
 * Builds the environment variables for the driver task.
 * (Declaration only; the implementation is not shown here.)
 *
 * @param desc Driver description
 * @return Mesos Environment with driver-specific variables
 */
private def getDriverEnvironment(desc: MesosDriverDescription): Environment

/**
 * Collects the URIs Mesos must fetch into the sandbox before starting the driver.
 * (Declaration only; the implementation is not shown here.)
 *
 * @param desc Driver description
 * @return List of URIs to fetch
 */
private def getDriverUris(desc: MesosDriverDescription): List[CommandInfo.URI]

State Management Classes

Data classes for tracking driver and scheduler state.

/**
 * Tracks the current state of a Mesos task running a Spark driver.
 *
 * @param driverDescription the submitted driver's description
 * @param taskId Mesos task ID of the driver task
 * @param slaveId Mesos agent (slave) ID the task was launched on
 * @param mesosTaskStatus most recent Mesos task status, if one has been received
 * @param startDate when the driver task was started
 * @param finishDate when the driver task finished (presumably None while still running — confirm)
 * @param frameworkId ID of the framework the driver task belongs to
 */
class MesosClusterSubmissionState(
  val driverDescription: MesosDriverDescription,
  val taskId: TaskID,
  val slaveId: SlaveID,
  var mesosTaskStatus: Option[TaskStatus],
  var startDate: Date,
  var finishDate: Option[Date],
  val frameworkId: String
) extends Serializable {
  // Returns a copy of this state. NOTE(review): several fields are `var`, so this is
  // presumably used to hand out snapshots that later updates do not affect — confirm.
  def copy(): MesosClusterSubmissionState
}

/**
 * Tracks the retry state of a failed driver.
 *
 * @param lastFailureStatus Mesos task status reported by the most recent failure
 * @param retries number of retries attempted so far
 * @param nextRetry earliest time at which the driver should be relaunched
 * @param waitTime current backoff wait; doubled on each retry
 *                 (presumably in seconds — confirm)
 */
class MesosClusterRetryState(
  val lastFailureStatus: TaskStatus,
  val retries: Int,
  val nextRetry: Date,
  val waitTime: Int
) extends Serializable {
  // Returns a copy of this retry state.
  def copy(): MesosClusterRetryState
}

/**
 * Snapshot of the cluster scheduler's complete state, as returned by
 * `MesosClusterScheduler.getSchedulerState()` for monitoring.
 *
 * @param frameworkId Mesos framework ID of the scheduler
 * @param masterUrl Mesos master URL, if known
 * @param queuedDrivers drivers waiting to be launched
 * @param launchedDrivers submission state of drivers that have been launched
 * @param finishedDrivers submission state of drivers that have finished
 * @param pendingRetryDrivers failed drivers waiting to be relaunched
 */
class MesosClusterSchedulerState(
  val frameworkId: String,
  val masterUrl: Option[String],
  val queuedDrivers: Iterable[MesosDriverDescription],
  val launchedDrivers: Iterable[MesosClusterSubmissionState],
  val finishedDrivers: Iterable[MesosClusterSubmissionState],
  val pendingRetryDrivers: Iterable[MesosDriverDescription]
)

/**
 * Driver state for web UI display, as returned by `MesosClusterScheduler.getDriverState`.
 *
 * @param state textual label of the driver's current state
 * @param description the driver's submission description
 * @param submissionState Mesos task state for the driver; defaults to None
 *                        (presumably set only once the driver has been launched — confirm)
 */
class MesosDriverState(
  val state: String,
  val description: MesosDriverDescription,
  val submissionState: Option[MesosClusterSubmissionState] = None
)

Recovery and Persistence

Recovery mechanisms for cluster scheduler state.

/**
 * Restores scheduler state from the persistence engine.
 * (Declaration only; the implementation is not shown here.)
 * NOTE(review): presumably invoked when the scheduler (re)starts, so that queued,
 * launched and pending-retry drivers survive a dispatcher restart — confirm.
 */
private def recoverState(): Unit

/**
 * Creates persistence engines for the configured recovery mode.
 */
trait MesosClusterPersistenceEngineFactory {
  // Creates a persistence engine identified by `name`.
  // NOTE(review): `name` presumably selects the state category / storage path
  // (e.g. queued vs. launched drivers) — confirm against the implementation.
  def createEngine(name: String): PersistenceEngine
}

// Persistence engine factories for the supported recovery modes.
// NOTE(review): presumably chosen by spark.deploy.recoveryMode ("NONE" vs "ZOOKEEPER") — confirm.
class BlackHoleMesosClusterPersistenceEngineFactory  // No persistence: scheduler state is not stored
class ZookeeperMesosClusterPersistenceEngineFactory  // Persists scheduler state in ZooKeeper

Usage Examples:

import org.apache.spark.deploy.mesos.{MesosClusterDispatcher, MesosClusterDispatcherArguments}
import org.apache.spark.SparkConf

// Start a cluster dispatcher with ZooKeeper-backed state recovery.
// NOTE(review): upstream Spark declares MesosClusterDispatcher and
// MesosClusterDispatcherArguments as package-private (private[mesos]); the usual way to
// run the dispatcher is sbin/start-mesos-dispatcher.sh. Confirm before copying this
// into application code.
val conf = new SparkConf()
  .setMaster("mesos://master:5050")
  .setAppName("MesosDispatcher")
  .set("spark.mesos.dispatcher.webui.url", "http://dispatcher:8080")
  .set("spark.deploy.recoveryMode", "ZOOKEEPER")
  .set("spark.deploy.zookeeper.url", "zk1:2181,zk2:2181,zk3:2181")

val args = new MesosClusterDispatcherArguments(Array(
  "--master", "mesos://master:5050",
  "--zk", "zk1:2181,zk2:2181,zk3:2181"
), conf)

val dispatcher = new MesosClusterDispatcher(args, conf)
dispatcher.start()

// Submit a driver via the dispatcher's REST API (typically done for you by spark-submit
// in cluster deploy mode when pointed at the dispatcher).
// POST to http://dispatcher:7077/v1/submissions/create
// The "action" field is required: the REST server uses it to determine the message
// type and rejects a request that omits it.
// {
//   "action": "CreateSubmissionRequest",
//   "appResource": "hdfs://namenode:8020/app.jar",
//   "sparkProperties": {
//     "spark.app.name": "MyApp",
//     "spark.master": "mesos://master:5050",
//     "spark.submit.deployMode": "cluster"
//   },
//   "clientSparkVersion": "2.2.3",
//   "mainClass": "com.example.MyApp",
//   "environmentVariables": {},
//   "appArgs": []
// }

// Driver recovery and retry settings: pass a conf like this to the dispatcher (as above)
// to enable ZooKeeper-backed recovery and tune driver retention, queue size and retries.
val recoveryConf = new SparkConf()
  .set("spark.deploy.recoveryMode", "ZOOKEEPER")
  .set("spark.deploy.zookeeper.url", "zk1:2181")
  .set("spark.mesos.retainedDrivers", "1000")
  .set("spark.mesos.maxDrivers", "200")
  .set("spark.mesos.cluster.retry.wait.max", "300")  // 5 minutes max retry wait (value in seconds)

Driver Submission Process

  1. Submission: Driver submitted via REST API or spark-submit
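  2. Queueing: The accepted driver is added to the scheduler's queue of drivers awaiting launch; new submissions are rejected once the queue holds spark.mesos.maxDrivers entries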
  3. Scheduling: Resource offers processed and driver tasks created
  4. Launching: Driver launched as Mesos task with spark-submit command
  5. Monitoring: Task status tracked and reported via REST API
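  6. Retry: A driver whose task ends in TASK_FAILED or TASK_LOST can be relaunched with exponential backoff (the wait doubles on each retry, capped by spark.mesos.cluster.retry.wait.max); in upstream Spark this applies only to drivers submitted with --supervise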

Configuration Properties

Key configuration properties for cluster deployment:

  • spark.deploy.recoveryMode - Recovery mode: "NONE" or "ZOOKEEPER"
  • spark.deploy.zookeeper.url - ZooKeeper URLs for recovery
  • spark.mesos.dispatcher.webui.url - Dispatcher web UI URL
  • spark.mesos.maxDrivers - Maximum queued drivers (default: 200)
  • spark.mesos.retainedDrivers - Number of finished drivers to retain (default: 200)
  • spark.mesos.cluster.retry.wait.max - Maximum wait between driver retries, in seconds (default: 60)
  • spark.mesos.driver.constraints - Driver placement constraints
  • spark.mesos.dispatcher.historyServer.url - History server URL for linking

Error Handling and Retry Logic

// Exponential backoff for failed drivers: with no previous retry state this is retry #1
// with a 1-second wait; otherwise the retry count is incremented and the previous wait is
// doubled, never exceeding maxRetryWaitTime.
val (retries, waitTimeSec) = retryState.fold((1, 1)) { prev =>
  val doubledWait = prev.waitTime * 2
  (prev.retries + 1, Math.min(maxRetryWaitTime, doubledWait))
}

// Driver failure classification: a driver is eligible for relaunch only when its task
// ended in TASK_FAILED or TASK_LOST; every other state yields false.
private def shouldRelaunch(state: MesosTaskState): Boolean =
  state match {
    case MesosTaskState.TASK_FAILED | MesosTaskState.TASK_LOST => true
    case _ => false
  }

// Shell escaping for security
/**
 * Makes `value` safe to splice into a POSIX shell command line as a single word.
 *
 *  - The empty string becomes `""` so it survives as an (empty) argument instead of vanishing.
 *  - A value already wrapped in a matching pair of quotes is passed through unchanged, but
 *    only when the quoted body cannot close the quoting early: a single-quoted body may not
 *    contain `'`, and a double-quoted body may not contain an unescaped `"`. (Previously a
 *    greedy match treated `"a"; cmd; "b"` as pre-quoted and let `cmd` run.)
 *  - A value containing whitespace (space, tab, CR, LF) or a shell metacharacter is wrapped
 *    in double quotes, with `"`, backtick, `$` and `\` backslash-escaped.
 *  - Anything else is returned unchanged.
 *
 * Note: a value the caller already wrapped in double quotes still undergoes `$` and
 * backtick expansion by the shell; that passthrough is kept for backward compatibility.
 *
 * @param value the argument to escape (must be non-null)
 * @return the escaped argument
 */
private def shellEscape(value: String): String = {
  // (?s) lets `.` match line terminators so multi-line values are classified as a whole.
  val WrappedInQuotes = """(?s)^('[^']+'|"(?:[^"\\]|\\.)+")$""".r
  val ShellSpecialChars = """(?s).*([ \t\n\r'<>&|\?\*;!#\\(\)\[\]{}"$`]).*""".r
  value match {
    case "" => "\"\""
    case WrappedInQuotes(_) => value // caller quoted the argument safely; leave it as is
    case ShellSpecialChars(_) => "\"" + value.replaceAll("""(["`\$\\])""", """\\$1""") + "\""
    case _: String => value // harmless string, nothing to escape
  }
}

The cluster deployment functionality provides a complete solution for running Spark applications in cluster mode on Mesos, with features like driver lifecycle management, fault tolerance, recovery, and monitoring capabilities.