# Apache Spark Mesos Resource Manager

Apache Spark Mesos resource manager enables Spark applications to run on Apache Mesos clusters with both coarse-grained and fine-grained scheduling modes.
```shell
npx @tessl/cli install tessl/maven-org-apache-spark--spark-mesos_2-11@2.2.0
```

Apache Spark Mesos resource manager provides integration between Apache Spark and Apache Mesos, enabling Spark applications to run on Mesos clusters with flexible resource allocation. It supports both coarse-grained mode (long-running executors with fixed resources) and fine-grained mode (dynamic resource allocation with individual tasks as Mesos tasks).
The Spark Mesos integration is automatically discovered through the ServiceLoader mechanism when the spark-mesos jar is on the classpath. No direct imports are needed for basic usage.
```scala
// Standard Spark imports for Mesos integration
import org.apache.spark.{SparkConf, SparkContext}

// Configuration constants (only needed for advanced configuration)
import org.apache.spark.deploy.mesos.config._
```

Note: The core Mesos integration classes (MesosClusterManager, scheduler backends, etc.) are internal to Spark and marked as `private[spark]`. They are automatically used when you set the master URL to start with `mesos://`.
Coarse-grained mode example:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Configure Spark to use Mesos with coarse-grained mode
val conf = new SparkConf()
  .setAppName("MySparkApp")
  .setMaster("mesos://mesos-master:5050")            // Mesos master URL
  .set("spark.mesos.coarse", "true")                 // Enable coarse-grained mode (default)
  .set("spark.mesos.executor.home", "/opt/spark")    // Required: Spark installation path on agents
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")
  .set("spark.mesos.executor.memoryOverhead", "512") // Additional overhead in MB (an integer, not "512m")
// Optional: Configure constraints and Docker
//  .set("spark.mesos.constraints", "os:linux")
//  .set("spark.mesos.executor.docker.image", "spark:latest")

// Create SparkContext - automatically uses MesosClusterManager
val sc = new SparkContext(conf)

// Your Spark application code
val data = sc.parallelize(1 to 1000)
val result = data.map(_ * 2).collect()

sc.stop()
```

Fine-grained mode example:
```scala
val conf = new SparkConf()
  .setAppName("MySparkApp")
  .setMaster("mesos://mesos-master:5050")
  .set("spark.mesos.coarse", "false")             // Enable fine-grained mode
  .set("spark.mesos.executor.home", "/opt/spark")
  .set("spark.mesos.mesosExecutor.cores", "1.0")  // Cores reserved for the executor backend itself

val sc = new SparkContext(conf)
// Application code...
sc.stop()
```

## Configuration

Spark Mesos integration supports extensive configuration through `spark.mesos.*` properties:
- `spark.mesos.coarse` (default `true`): Enable coarse-grained mode vs fine-grained mode
- `spark.mesos.mesosExecutor.cores` (default `1.0`): CPU cores for the fine-grained executor backend
- `spark.mesos.gpus.max` (default `0`): Maximum GPUs to acquire across all executors
- `spark.mesos.extra.cores` (default `0`): Extra cores per executor for overhead
- `spark.mesos.maxDrivers` (default `200`): Maximum queued drivers in cluster mode
- `spark.mesos.retainedDrivers` (default `200`): Number of completed drivers to retain
- `spark.mesos.executor.docker.volumes`: Docker volume mappings (`/host:/container:mode`)
- `spark.mesos.executor.docker.portmaps`: Docker port mappings (`host:container:protocol`)
- `spark.mesos.containerizer` (default `docker`): Container type (`docker` or `mesos`)
- `spark.mesos.fetcherCache.enable` (default `false`): Enable Mesos fetcher cache
- `spark.mesos.rejectOfferDuration` (default `120s`): Duration to reject unsuitable offers
- `spark.mesos.coarse.shutdownTimeout` (default `10s`): Graceful shutdown timeout
- `spark.mesos.cluster.retry.wait.max` (default `60`): Maximum retry wait time in seconds
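As an illustration, several of these properties might be combined for a Dockerized coarse-grained deployment; the image name, paths, and ports in this sketch are placeholders:

```scala
import org.apache.spark.SparkConf

// Illustrative combination of spark.mesos.* properties for executors
// running in Docker. Image, volume, and port values are placeholders.
val conf = new SparkConf()
  .setAppName("DockerizedApp")
  .setMaster("mesos://mesos-master:5050")
  .set("spark.mesos.containerizer", "docker")                       // run executors in Docker
  .set("spark.mesos.executor.docker.image", "mycompany/spark:2.2.0")
  .set("spark.mesos.executor.docker.volumes", "/data:/data:ro")     // host:container:mode
  .set("spark.mesos.executor.docker.portmaps", "8080:8080:tcp")     // host:container:protocol
  .set("spark.mesos.fetcherCache.enable", "true")                   // cache downloaded artifacts
  .set("spark.mesos.rejectOfferDuration", "120s")                   // back off from unsuitable offers
```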
## Architecture

The Spark Mesos integration consists of several key components:

- `MesosClusterManager` handles integration with Spark's scheduler system
- `MesosCoarseGrainedSchedulerBackend` and `MesosFineGrainedSchedulerBackend` implement coarse-grained and fine-grained scheduling
- `MesosClusterScheduler` and `MesosClusterDispatcher` handle cluster-mode driver submission and lifecycle
- `MesosSchedulerUtils` provides shared resource negotiation and constraint-matching utilities
- `MesosExternalShuffleService` persists and cleans up shuffle data

Note: The following API descriptions document the internal implementation classes that provide Mesos integration functionality. These classes are automatically used by Spark when you configure it to use Mesos (master URL starting with `mesos://`). They are documented here for completeness and understanding of the underlying functionality.
### MesosClusterManager

Core cluster management functionality that integrates Spark with Mesos resource negotiation and task scheduling. Automatically discovered via ServiceLoader.

```scala { .api }
private[spark] class MesosClusterManager extends ExternalClusterManager {
  def canCreate(masterURL: String): Boolean
  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
  def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler
  ): SchedulerBackend
  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}
```
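As a conceptual sketch of the discovery step (this mirrors what SparkContext does internally; `DiscoverClusterManager` is a hypothetical name, and because `ExternalClusterManager` is `private[spark]` the code only compiles inside an `org.apache.spark` subpackage):

```scala
package org.apache.spark.example // hypothetical package, needed for private[spark] access

import java.util.ServiceLoader

import scala.collection.JavaConverters._

import org.apache.spark.scheduler.ExternalClusterManager

object DiscoverClusterManager {
  // Mirrors SparkContext's internal lookup: load every ExternalClusterManager
  // registered via META-INF/services, then keep the one whose canCreate
  // accepts the master URL (MesosClusterManager matches "mesos://...").
  def find(masterURL: String): Option[ExternalClusterManager] = {
    ServiceLoader.load(classOf[ExternalClusterManager]).asScala
      .find(_.canCreate(masterURL))
  }
}
```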
### MesosCoarseGrainedSchedulerBackend

Long-running executor mode that holds onto Mesos resources for the duration of the Spark job, providing lower latency and more predictable performance.

```scala { .api }
private[spark] class MesosCoarseGrainedSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext,
    master: String,
    securityManager: SecurityManager
) extends CoarseGrainedSchedulerBackend with org.apache.mesos.Scheduler {
  def start(): Unit
  def stop(): Unit
  def applicationId(): String
  def sufficientResourcesRegistered(): Boolean
  def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean]
  def doKillExecutors(executorIds: Seq[String]): Future[Boolean]
}
```
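Coarse-grained mode scales executors up and down through `doRequestTotalExecutors` / `doKillExecutors` when dynamic allocation is enabled. A minimal configuration sketch (all values are illustrative); this also requires the Mesos external shuffle service described later to be running on every agent:

```scala
import org.apache.spark.SparkConf

// Sketch: dynamic allocation on Mesos coarse-grained mode. The external
// shuffle service keeps shuffle data available after executors are killed.
val conf = new SparkConf()
  .setMaster("mesos://mesos-master:5050")
  .set("spark.mesos.coarse", "true")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "10")
  .set("spark.shuffle.service.enabled", "true")
```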
### MesosFineGrainedSchedulerBackend

Dynamic resource allocation mode where each Spark task maps to a separate Mesos task, allowing for efficient resource sharing across multiple applications. (Fine-grained mode is deprecated as of Spark 2.0.)

```scala { .api }
private[spark] class MesosFineGrainedSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext,
    master: String
) extends SchedulerBackend with org.apache.mesos.Scheduler {
  def start(): Unit
  def stop(): Unit
  def reviveOffers(): Unit
  def killTask(taskId: Long, executorId: String, interruptThread: Boolean, reason: String): Unit
  def defaultParallelism(): Int
  def applicationId(): String
}
```

### MesosClusterScheduler and MesosClusterDispatcher

Cluster mode deployment functionality for submitting Spark drivers as Mesos tasks, including driver lifecycle management and recovery.
```scala { .api }
private[spark] class MesosClusterScheduler(
    engineFactory: MesosClusterPersistenceEngineFactory,
    conf: SparkConf
) extends Scheduler with MesosSchedulerUtils {
  def submitDriver(desc: MesosDriverDescription): CreateSubmissionResponse
  def killDriver(submissionId: String): KillSubmissionResponse
  def getDriverStatus(submissionId: String): SubmissionStatusResponse
  def start(): Unit
  def stop(): Unit
}

private[mesos] class MesosClusterDispatcher(
    args: MesosClusterDispatcherArguments,
    conf: SparkConf
) {
  def start(): Unit
  def stop(): Unit
  def awaitShutdown(): Unit
}
```
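In practice, cluster mode is used by starting the dispatcher and then submitting against it. A sketch using the scripts shipped with Spark (host names, class name, and jar URL are placeholders; the application jar must be reachable from inside the Mesos cluster):

```shell
# Start the dispatcher on a cluster node
# (it accepts submissions on port 7077 by default)
./sbin/start-mesos-dispatcher.sh --master mesos://mesos-master:5050

# Submit a driver to the dispatcher; it is launched as a Mesos task,
# and --supervise enables automatic driver restart on failure
./bin/spark-submit \
  --class com.example.MySparkApp \
  --master mesos://dispatcher-host:7077 \
  --deploy-mode cluster \
  --supervise \
  http://repo.example.com/jars/my-spark-app.jar
```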
### MesosSchedulerUtils

Utilities for Mesos resource negotiation, constraint matching, and configuration management.

```scala { .api }
trait MesosSchedulerUtils {
  def createSchedulerDriver(
      masterUrl: String,
      scheduler: Scheduler,
      sparkUser: String,
      appName: String,
      conf: SparkConf,
      webuiUrl: Option[String] = None,
      checkpoint: Option[Boolean] = None,
      failoverTimeout: Option[Double] = None,
      frameworkId: Option[String] = None
  ): SchedulerDriver

  def getResource(res: JList[Resource], name: String): Double

  def partitionResources(
      resources: JList[Resource],
      resourceName: String,
      amountToUse: Double
  ): (List[Resource], List[Resource])

  def matchesAttributeRequirements(
      slaveOfferConstraints: Map[String, Set[String]],
      offerAttributes: Map[String, GeneratedMessage]
  ): Boolean
}
```
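`matchesAttributeRequirements` is what enforces `spark.mesos.constraints` against offer attributes. A sketch of how constraints are expressed from application code (attribute names and values below are placeholders):

```scala
import org.apache.spark.SparkConf

// Offers are accepted only from agents whose Mesos attributes satisfy every
// constraint; entries are attribute:value pairs separated by semicolons.
val conf = new SparkConf()
  .setMaster("mesos://mesos-master:5050")
  .set("spark.mesos.constraints", "os:linux;rack:rack-1")
```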
### Resource and Configuration Management

Mesos-specific external shuffle service that provides shuffle data persistence and cleanup when drivers terminate.

```scala { .api }
private[mesos] class MesosExternalShuffleService extends ExternalShuffleService {
  def start(): Unit
  def stop(): Unit
}

private[mesos] class MesosExternalShuffleBlockHandler(
    transportConf: TransportConf,
    cleanerIntervalS: Long
) extends ExternalShuffleBlockHandler with Logging {
  def registerApplication(appShuffleInfo: ApplicationShuffleInfo): Unit
  def applicationRemoved(appId: String, cleanupLocalDirs: Boolean): Unit
}
```
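The shuffle service must be running on every Mesos agent before executors start. Spark ships a launcher script for this; a sketch assuming a Spark installation at /opt/spark (running it under Marathon is a common way to keep it alive):

```shell
# Run on each Mesos agent. Starts the Mesos-aware external shuffle service,
# which cleans up an application's shuffle files once its driver disconnects.
/opt/spark/sbin/start-mesos-shuffle-service.sh
```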
## Types
```scala { .api }
// Required imports for types
import java.util.Date
import org.apache.spark.deploy.Command
import org.apache.mesos.Protos.{TaskID, SlaveID, TaskStatus}
import org.apache.spark.scheduler.cluster.mesos.MesosClusterRetryState
import org.apache.spark.internal.config.ConfigEntry
// Driver submission and state (internal classes)
private[spark] class MesosDriverDescription(
    name: String,
    jarUrl: String,
    mem: Int,
    cores: Double,
    supervise: Boolean,
    command: Command,
    schedulerProperties: Map[String, String],
    submissionId: String,
    submissionDate: Date,
    retryState: Option[MesosClusterRetryState] = None
)

private[spark] class MesosClusterSubmissionState(
    val driverDescription: MesosDriverDescription,
    val taskId: TaskID,
    val slaveId: SlaveID,
    var mesosTaskStatus: Option[TaskStatus],
    var startDate: Date,
    var finishDate: Option[Date],
    val frameworkId: String
) extends Serializable

// Configuration objects
package object config {
  val RECOVERY_MODE: ConfigEntry[String]
  val DISPATCHER_WEBUI_URL: ConfigEntry[Option[String]]
  val ZOOKEEPER_URL: ConfigEntry[Option[String]]
  val HISTORY_SERVER_URL: ConfigEntry[Option[String]]
  val DRIVER_CONSTRAINTS: ConfigEntry[String]
}
```
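These entries back the dispatcher's recovery settings. For example, ZooKeeper-based recovery for the dispatcher might be configured as sketched below, assuming the standard `spark.deploy.recoveryMode` / `spark.deploy.zookeeper.url` keys that `RECOVERY_MODE` and `ZOOKEEPER_URL` are built from (the properties file name and ZooKeeper address are placeholders):

```shell
# dispatcher-ha.conf (hypothetical properties file):
#   spark.deploy.recoveryMode    ZOOKEEPER
#   spark.deploy.zookeeper.url   zk1.example.com:2181,zk2.example.com:2181

# Start the dispatcher with persistent state, so queued and running
# drivers can be recovered after a dispatcher restart
./sbin/start-mesos-dispatcher.sh \
  --master mesos://mesos-master:5050 \
  --properties-file dispatcher-ha.conf
```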