Apache Spark Mesos Resource Manager

The Apache Spark Mesos resource manager integrates Apache Spark with Apache Mesos, enabling Spark applications to run on Mesos clusters with flexible resource allocation. It supports both coarse-grained mode (long-running executors holding fixed resources) and fine-grained mode (dynamic resource allocation, with each Spark task launched as a separate Mesos task).

Package Information

  • Package Name: spark-mesos_2.11
  • Package Type: Maven
  • Language: Scala
  • Version: 2.2.3
  • GroupId: org.apache.spark
  • ArtifactId: spark-mesos_2.11
  • Installation: add as a Maven or sbt dependency
  • License: Apache License 2.0
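
For example, with sbt, using the coordinates above:

// sbt: %% resolves to spark-mesos_2.11 when scalaVersion is 2.11.x
libraryDependencies += "org.apache.spark" %% "spark-mesos" % "2.2.3"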

Core Imports

The Spark Mesos integration is automatically discovered through the ServiceLoader mechanism when the spark-mesos jar is on the classpath. No direct imports are needed for basic usage.

// Standard Spark imports for Mesos integration
import org.apache.spark.{SparkConf, SparkContext}

// Configuration constants (if needed for advanced configuration)
import org.apache.spark.deploy.mesos.config._

Note: The core Mesos integration classes (MesosClusterManager, the scheduler backends, etc.) are internal to Spark and marked private[spark]. They are used automatically when the master URL starts with "mesos://".

Basic Usage

import org.apache.spark.{SparkConf, SparkContext}

// Configure Spark to use Mesos with coarse-grained mode
val conf = new SparkConf()
  .setAppName("MySparkApp")
  .setMaster("mesos://mesos-master:5050")      // Mesos master URL
  .set("spark.mesos.coarse", "true")           // Enable coarse-grained mode (default)
  .set("spark.mesos.executor.home", "/opt/spark") // Required: Spark installation path
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")
  .set("spark.mesos.executor.memoryOverhead", "512m") // Additional memory overhead

// Optional: Configure constraints and Docker
// .set("spark.mesos.constraints", "os:linux")
// .set("spark.mesos.executor.docker.image", "spark:latest")

// Create SparkContext - automatically uses MesosClusterManager
val sc = new SparkContext(conf)

// Your Spark application code
val data = sc.parallelize(1 to 1000)
val result = data.map(_ * 2).collect()

sc.stop()

Fine-grained mode example:

val conf = new SparkConf()
  .setAppName("MySparkApp")
  .setMaster("mesos://mesos-master:5050")
  .set("spark.mesos.coarse", "false")           // Enable fine-grained mode
  .set("spark.mesos.executor.home", "/opt/spark")
  .set("spark.mesos.mesosExecutor.cores", "1.0") // Cores for executor backend

val sc = new SparkContext(conf)
// Application code...
sc.stop()

Configuration

Spark Mesos integration supports extensive configuration through spark.mesos.* properties:

Core Configuration

  • spark.mesos.coarse (default: true): Use coarse-grained mode instead of fine-grained mode
  • spark.mesos.executor.home: Spark installation directory on the Mesos slaves (defaults to the driver's SPARK_HOME)
  • spark.mesos.executor.cores: Number of cores per executor (coarse-grained mode)
  • spark.mesos.executor.memoryOverhead: Additional memory overhead per executor, in MiB (default: 10% of executor memory, at least 384)
  • spark.mesos.mesosExecutor.cores (default: 1.0): CPU cores reserved for the fine-grained executor backend itself

Resource Limits

  • spark.mesos.gpus.max (default: 0): Maximum GPUs to acquire across all executors
  • spark.mesos.extra.cores (default: 0): Extra cores per executor for overhead
  • spark.mesos.maxDrivers (default: 200): Maximum queued drivers in cluster mode
  • spark.mesos.retainedDrivers (default: 200): Number of completed drivers to retain
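
For example, a job that wants at most two GPUs cluster-wide and one padded core per executor could set (values are illustrative):

import org.apache.spark.SparkConf

// Illustrative values: cap total GPUs and advertise one extra core per executor
val conf = new SparkConf()
  .set("spark.mesos.gpus.max", "2")
  .set("spark.mesos.extra.cores", "1")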

Security and Authentication

  • spark.mesos.principal: Mesos principal for framework authentication
  • spark.mesos.secret: Secret for framework authentication (requires principal)
  • spark.mesos.role: Mesos role for resource allocation
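
A minimal sketch of framework authentication, assuming a principal and secret already registered with the Mesos master (values are illustrative):

import org.apache.spark.SparkConf

// Illustrative credentials; read the secret from the environment rather than hard-coding it
val conf = new SparkConf()
  .set("spark.mesos.principal", "spark-framework")
  .set("spark.mesos.secret", sys.env("MESOS_SECRET"))
  .set("spark.mesos.role", "spark")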

Docker Support

  • spark.mesos.executor.docker.image: Docker image for executors
  • spark.mesos.executor.docker.forcePullImage: Always pull Docker image
  • spark.mesos.executor.docker.volumes: Volume mappings (format: /host:/container:mode)
  • spark.mesos.executor.docker.portmaps: Port mappings (format: host:container:protocol)
  • spark.mesos.executor.docker.parameters: Custom Docker parameters
  • spark.mesos.containerizer (default: docker): Container type (docker or mesos)
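
A sketch of running executors inside a Docker container (image name and mappings are illustrative):

import org.apache.spark.SparkConf

// Illustrative Docker setup for executors
val conf = new SparkConf()
  .set("spark.mesos.executor.docker.image", "example/spark:2.2.3")
  .set("spark.mesos.executor.docker.forcePullImage", "true")
  .set("spark.mesos.executor.docker.volumes", "/data:/data:ro")  // /host:/container:mode
  .set("spark.mesos.executor.docker.portmaps", "8080:8080:tcp")  // host:container:protocol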

Networking and URIs

  • spark.mesos.network.name: Custom network name for containers
  • spark.mesos.uris: Comma-separated URIs to download to sandbox
  • spark.mesos.fetcherCache.enable (default: false): Enable Mesos fetcher cache
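
For example, to stage extra files into every executor sandbox and reuse the Mesos fetcher cache (URIs are illustrative):

import org.apache.spark.SparkConf

// Illustrative URIs, downloaded into each sandbox before the executor starts
val conf = new SparkConf()
  .set("spark.mesos.uris", "http://repo.example.com/app.conf,hdfs://namenode/libs/extra.jar")
  .set("spark.mesos.fetcherCache.enable", "true")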

Constraints and Filtering

  • spark.mesos.constraints: Attribute constraints for resource offers
  • spark.mesos.rejectOfferDuration (default: 120s): Duration to reject unsuitable offers
  • spark.mesos.rejectOfferDurationForUnmetConstraints: Reject duration for constraint mismatches
  • spark.mesos.rejectOfferDurationForReachedMaxCores: Reject duration when max cores reached
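
Constraints are attribute:value pairs separated by semicolons; an offer is accepted only if the slave's attributes match. A sketch (attribute names are illustrative):

import org.apache.spark.SparkConf

// Only accept offers from slaves whose attributes match the constraints
val conf = new SparkConf()
  .set("spark.mesos.constraints", "os:linux;zone:us-east-1a")
  .set("spark.mesos.rejectOfferDuration", "120s")  // how long to decline unsuitable offers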

Cluster Mode Configuration

  • spark.mesos.dispatcher.webui.url: Dispatcher web UI URL
  • spark.mesos.dispatcher.historyServer.url: History server URL for driver links
  • spark.mesos.driver.constraints: Constraints for driver placement
  • spark.mesos.driver.webui.url: Driver web UI URL
  • spark.mesos.driver.frameworkId: Framework ID for driver correlation
  • spark.mesos.driverEnv.*: Environment variables for driver
  • spark.mesos.dispatcher.driverDefault.*: Default configuration for submitted drivers
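
These properties are consumed by the dispatcher rather than by the application itself; in practice they are supplied with spark-submit --conf when deploying in cluster mode. A sketch of the equivalent settings (values are illustrative):

import org.apache.spark.SparkConf

// Illustrative cluster-mode settings, normally passed as --conf to spark-submit
val conf = new SparkConf()
  .set("spark.mesos.driver.constraints", "rack:dc1")            // restrict driver placement
  .set("spark.mesos.driverEnv.SPARK_LOCAL_DIRS", "/mnt/spark")  // environment variable for the driver
  // Set on the dispatcher: a default applied to every submitted driver
  .set("spark.mesos.dispatcher.driverDefault.spark.executor.memory", "2g")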

Advanced Configuration

  • spark.mesos.coarse.shutdownTimeout (default: 10s): Graceful shutdown timeout
  • spark.mesos.task.labels: Labels to apply to Mesos tasks
  • spark.mesos.cluster.retry.wait.max (default: 60): Maximum retry wait time in seconds

Architecture

The Spark Mesos integration consists of several key components:

  • Cluster Manager: MesosClusterManager handles integration with Spark's scheduler system
  • Scheduler Backends: Two modes of operation - coarse-grained and fine-grained scheduling
  • Cluster Dispatcher: For cluster deployment mode, manages driver submission and lifecycle
  • Executor Backend: Runs on Mesos slaves to execute Spark tasks
  • Utilities: Shared components for Mesos integration, resource management, and configuration

Capabilities

Note: The following API descriptions document the internal implementation classes that provide the Mesos integration. Spark uses these classes automatically when the master URL starts with "mesos://"; they are documented here for completeness.

Cluster Management

Core cluster management functionality that integrates Spark with Mesos resource negotiation and task scheduling. Automatically discovered via ServiceLoader.

private[spark] class MesosClusterManager extends ExternalClusterManager {
  def canCreate(masterURL: String): Boolean   // true for URLs starting with "mesos://"
  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
  def createSchedulerBackend(
    sc: SparkContext,
    masterURL: String,
    scheduler: TaskScheduler
  ): SchedulerBackend                         // coarse- or fine-grained, per spark.mesos.coarse
  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}
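
Discovery works through Java's ServiceLoader: the spark-mesos jar registers MesosClusterManager under META-INF/services/org.apache.spark.scheduler.ExternalClusterManager, and canCreate selects it for any master URL that starts with "mesos://".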

Coarse-Grained Scheduling

Long-running executor mode that holds onto Mesos resources for the duration of the Spark job, providing lower latency and more predictable performance.

private[spark] class MesosCoarseGrainedSchedulerBackend(
  scheduler: TaskSchedulerImpl,
  sc: SparkContext,
  master: String,
  securityManager: SecurityManager
) extends CoarseGrainedSchedulerBackend with org.apache.mesos.Scheduler {
  def start(): Unit
  def stop(): Unit
  def applicationId(): String
  def sufficientResourcesRegistered(): Boolean
  def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean]
  def doKillExecutors(executorIds: Seq[String]): Future[Boolean]
}
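
In coarse-grained mode, executor counts can also be scaled at runtime via Spark's dynamic allocation, which requires the external shuffle service described later in this document. A minimal sketch:

import org.apache.spark.SparkConf

// Dynamic allocation on Mesos coarse-grained mode; the Mesos external
// shuffle service must be running on each slave
val conf = new SparkConf()
  .setMaster("mesos://mesos-master:5050")
  .set("spark.mesos.coarse", "true")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")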

Fine-Grained Scheduling

Dynamic resource allocation mode in which each Spark task maps to a separate Mesos task, allowing efficient resource sharing across multiple applications. Note that fine-grained mode is deprecated as of Spark 2.0.

private[spark] class MesosFineGrainedSchedulerBackend(
  scheduler: TaskSchedulerImpl,
  sc: SparkContext,
  master: String
) extends SchedulerBackend with org.apache.mesos.Scheduler {
  def start(): Unit
  def stop(): Unit
  def reviveOffers(): Unit
  def killTask(taskId: Long, executorId: String, interruptThread: Boolean, reason: String): Unit
  def defaultParallelism(): Int
  def applicationId(): String
}

Cluster Deployment

Cluster mode deployment functionality for submitting Spark drivers as Mesos tasks, including driver lifecycle management and recovery.

private[spark] class MesosClusterScheduler(
  engineFactory: MesosClusterPersistenceEngineFactory,
  conf: SparkConf
) extends Scheduler with MesosSchedulerUtils {
  def submitDriver(desc: MesosDriverDescription): CreateSubmissionResponse
  def killDriver(submissionId: String): KillSubmissionResponse
  def getDriverStatus(submissionId: String): SubmissionStatusResponse
  def start(): Unit
  def stop(): Unit
}

private[mesos] class MesosClusterDispatcher(
  args: MesosClusterDispatcherArguments,
  conf: SparkConf
) {
  def start(): Unit
  def stop(): Unit
  def awaitShutdown(): Unit
}
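
In practice the dispatcher is started with sbin/start-mesos-dispatcher.sh, and applications are then submitted with spark-submit --deploy-mode cluster against the dispatcher's URL (mesos://dispatcher-host:7077 by default); submitDriver, killDriver, and getDriverStatus back the dispatcher's REST submission endpoints.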

Resource and Configuration Management

Utilities for Mesos resource negotiation, constraint matching, and configuration management.

trait MesosSchedulerUtils {
  def createSchedulerDriver(
    masterUrl: String,
    scheduler: Scheduler,
    sparkUser: String,
    appName: String,
    conf: SparkConf,
    webuiUrl: Option[String] = None,
    checkpoint: Option[Boolean] = None,
    failoverTimeout: Option[Double] = None,
    frameworkId: Option[String] = None
  ): SchedulerDriver
  
  def getResource(res: JList[Resource], name: String): Double
  def partitionResources(
    resources: JList[Resource],
    resourceName: String,
    amountToUse: Double
  ): (List[Resource], List[Resource])
  def matchesAttributeRequirements(
    slaveOfferConstraints: Map[String, Set[String]],
    offerAttributes: Map[String, GeneratedMessage]
  ): Boolean
}

External Shuffle Service

Mesos-specific external shuffle service that provides shuffle data persistence and cleanup when drivers terminate.

private[mesos] class MesosExternalShuffleService extends ExternalShuffleService {
  def start(): Unit
  def stop(): Unit
}

private[mesos] class MesosExternalShuffleBlockHandler(
  transportConf: TransportConf,
  cleanerIntervalS: Long
) extends ExternalShuffleBlockHandler with Logging {
  def registerApplication(appShuffleInfo: ApplicationShuffleInfo): Unit
  def applicationRemoved(appId: String, cleanupLocalDirs: Boolean): Unit
}
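
The service is typically launched on each Mesos slave with sbin/start-mesos-shuffle-service.sh; executors register with it when spark.shuffle.service.enabled is true, and shuffle files are cleaned up once the owning driver disconnects.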

Types

// Required imports for types
import java.util.Date
import org.apache.spark.deploy.Command
import org.apache.mesos.Protos.{TaskID, SlaveID, TaskStatus}
import org.apache.spark.scheduler.cluster.mesos.MesosClusterRetryState
import org.apache.spark.internal.config.ConfigEntry

// Driver submission and state (internal classes)
private[spark] class MesosDriverDescription(
  name: String,
  jarUrl: String,
  mem: Int,
  cores: Double,
  supervise: Boolean,
  command: Command,
  schedulerProperties: Map[String, String],
  submissionId: String,
  submissionDate: Date,
  retryState: Option[MesosClusterRetryState] = None
)

private[spark] class MesosClusterSubmissionState(
  val driverDescription: MesosDriverDescription,
  val taskId: TaskID,
  val slaveId: SlaveID,
  var mesosTaskStatus: Option[TaskStatus],
  var startDate: Date,
  var finishDate: Option[Date],
  val frameworkId: String
) extends Serializable

// Configuration objects
package object config {
  val RECOVERY_MODE: ConfigEntry[String]
  val DISPATCHER_WEBUI_URL: ConfigEntry[Option[String]]
  val ZOOKEEPER_URL: ConfigEntry[Option[String]]
  val HISTORY_SERVER_URL: ConfigEntry[Option[String]]
  val DRIVER_CONSTRAINTS: ConfigEntry[String]
}