Apache Mesos cluster manager integration for Apache Spark enabling distributed computation on Mesos clusters.
npx @tessl/cli install tessl/maven-org-apache-spark--spark-mesos-2-13@3.5.0

Spark Mesos provides Apache Mesos cluster manager integration for Apache Spark, enabling Spark applications to run on Mesos clusters with both coarse-grained and fine-grained scheduling modes. It supports dynamic resource allocation and fault tolerance, and provides utilities for Mesos-specific configuration and monitoring.
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.cluster.mesos.MesosProtoUtils
import org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackendUtil
import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
import org.apache.spark.network.shuffle.mesos.MesosExternalBlockStoreClient
import org.apache.spark.deploy.mesos.config._

For Java applications:
import org.apache.spark.network.shuffle.mesos.MesosExternalBlockStoreClient;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.sasl.SecretKeyHolder;
import org.apache.spark.executor.MesosExecutorBackend;

import org.apache.spark.{SparkConf, SparkContext}
// Configure Spark to use Mesos
val conf = new SparkConf()
.setAppName("MySparkApp")
.setMaster("mesos://mesos-master:5050") // Connect to Mesos master
.set("spark.mesos.coarse", "true") // Use coarse-grained mode
.set("spark.cores.max", "4") // Limit maximum cores
.set("spark.mesos.executor.docker.image", "spark-executor:latest")
val sc = new SparkContext(conf)
// Your Spark application logic here
val data = sc.parallelize(1 to 100)
val result = data.map(_ * 2).collect()
sc.stop()

Spark Mesos integration consists of several key components:
- MesosClusterManager, automatically selected when using mesos:// URLs
- MesosClusterScheduler and dispatcher for cluster mode deployments
- MesosSchedulerBackendUtil for container and volume management
- MesosSecretConfig for handling secrets and authentication

Configure Spark to connect to and run on Mesos clusters using master URL and configuration options.
// Master URL format
val master: String = "mesos://host:port"
// Key configuration options
val conf = new SparkConf()
.setMaster(master)
.set("spark.mesos.coarse", "true") // Enable coarse-grained mode

Choose between coarse-grained and fine-grained execution modes depending on your resource sharing requirements.
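Coarse-grained Mode (default): each Spark executor runs as a single long-lived Mesos task, so startup overhead is low but the resources stay reserved for the whole lifetime of the application.
Fine-grained Mode: each Spark task runs as a separate Mesos task, so cores can be shared with other frameworks at a fine granularity at the cost of extra overhead per task launch.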
// Coarse-grained mode (default)
conf.set("spark.mesos.coarse", "true")
// Fine-grained mode
conf.set("spark.mesos.coarse", "false")

Control resource allocation and constraints for Spark executors running on Mesos.
// Core and memory configuration
conf.set("spark.cores.max", "8") // Maximum cores across all executors
conf.set("spark.executor.cores", "2") // Cores per executor
conf.set("spark.executor.memory", "2g") // Memory per executor
conf.set("spark.mesos.executor.memoryOverhead", "512") // Additional memory overhead
// GPU support
conf.set("spark.mesos.gpus.max", "2") // Maximum GPU resources
// Resource constraints
conf.set("spark.mesos.constraints", "os:centos") // Attribute-based constraints
conf.set("spark.mesos.role", "spark-framework") // Mesos framework role

Run Spark executors in Docker containers with full configuration support.
// Docker executor configuration
conf.set("spark.mesos.executor.docker.image", "spark:3.5.6")
conf.set("spark.mesos.executor.docker.forcePullImage", "true")
conf.set("spark.mesos.containerizer", "mesos") // Use Mesos containerizer
// Volume mounts
conf.set("spark.mesos.executor.docker.volumes",
"/host/data:/container/data:ro,/host/logs:/container/logs:rw")
// Docker parameters
conf.set("spark.mesos.executor.docker.parameters",
"memory-swap=-1,ulimit=nofile=65536:65536")

Configure authentication credentials for secure Mesos clusters.
// Principal and secret authentication
conf.set("spark.mesos.principal", "spark-framework")
conf.set("spark.mesos.secret", "secret-value")
// File-based authentication
conf.set("spark.mesos.principal.file", "/path/to/principal.txt")
conf.set("spark.mesos.secret.file", "/path/to/secret.txt")
// Driver secrets
conf.set("spark.mesos.driver.secret.names", "secret1,secret2")
conf.set("spark.mesos.driver.secret.values", "value1,value2")
conf.set("spark.mesos.driver.secret.envkeys", "SECRET1_ENV,SECRET2_ENV")

Configure networking options for containerized and multi-tenant environments.
// Named network attachment
conf.set("spark.mesos.network.name", "spark-network")
conf.set("spark.mesos.network.labels", "env:production,team:data")
// Service URLs
conf.set("spark.mesos.driver.webui.url", "http://driver-host:4040")
conf.set("spark.mesos.dispatcher.webui.url", "http://dispatcher:8080")

Add metadata and labels to Mesos tasks for monitoring and organization.
// Task labels for monitoring
conf.set("spark.mesos.task.labels", "app:spark,env:prod,team:analytics")
conf.set("spark.mesos.driver.labels", "type:driver,priority:high")
// Task constraints and placement
conf.set("spark.mesos.driver.constraints", "zone:us-west-1")
conf.set("spark.mesos.rejectOfferDuration", "120s")

Configure external shuffle service for improved performance in coarse-grained mode.
/**
* Client for communicating with external shuffle service in Mesos coarse-grained mode
*/
public class MesosExternalBlockStoreClient extends ExternalBlockStoreClient {
/**
* Creates a Mesos external shuffle client
* @param conf Transport configuration
* @param secretKeyHolder Secret key holder for authentication
* @param authEnabled Whether authentication is enabled
* @param registrationTimeoutMs Timeout for registration in milliseconds
*/
public MesosExternalBlockStoreClient(
TransportConf conf,
SecretKeyHolder secretKeyHolder,
boolean authEnabled,
long registrationTimeoutMs);
/**
* Register driver with the shuffle service
* @param host Shuffle service host
* @param port Shuffle service port
* @param heartbeatTimeoutMs Heartbeat timeout in milliseconds
* @param heartbeatIntervalMs Heartbeat interval in milliseconds
*/
public void registerDriverWithShuffleService(
String host,
int port,
long heartbeatTimeoutMs,
long heartbeatIntervalMs) throws IOException, InterruptedException;
}

Utility functions for working with Mesos protocol buffers and labels.
/**
* Utilities for working with Mesos protocol buffers
*/
object MesosProtoUtils {
/**
* Parses a label string into Mesos Labels protobuf
* @param labelsStr Label string in format "key1:value1,key2:value2"
* @return Mesos Labels.Builder for constructing protobuf messages
*/
def mesosLabels(labelsStr: String): org.apache.mesos.Protos.Labels.Builder
}

Utilities for container and volume management in Mesos environments.
/**
* Utility object providing helper methods for Mesos scheduler backends
*/
object MesosSchedulerBackendUtil {
/**
* Parse volume specifications for container mounts
* @param volumes Sequence of volume specifications in format "hostPath:containerPath:mode"
* @return List of Mesos Volume objects
*/
def parseVolumesSpec(volumes: Seq[String]): List[Volume]
/**
* Parse port mapping specifications for Docker containers
* @param portmaps Sequence of port mapping specifications in format "hostPort:containerPort:protocol"
* @return List of DockerInfo.PortMapping objects
*/
def parsePortMappingsSpec(portmaps: Seq[String]): List[DockerInfo.PortMapping]
/**
* Build container information from Spark configuration
* @param conf Spark configuration containing container settings
* @return ContainerInfo.Builder for Mesos container setup
*/
def buildContainerInfo(conf: SparkConf): ContainerInfo.Builder
/**
* Convert Spark task state to Mesos task state
* @param state Spark TaskState
* @return Corresponding MesosTaskState
*/
def taskStateToMesos(state: TaskState): MesosTaskState
/**
* Extract secret environment variables from configuration
* @param conf Spark configuration
* @param secretConfig Secret configuration for driver or executor
* @return Sequence of environment variables for secrets
*/
def getSecretEnvVar(conf: SparkConf, secretConfig: MesosSecretConfig): Seq[Variable]
/**
* Extract secret volume from configuration
* @param conf Spark configuration
* @param secretConfig Secret configuration for driver or executor
* @return Optional volume for secret mounting
*/
def getSecretVolume(conf: SparkConf, secretConfig: MesosSecretConfig): Option[Volume]
}

Components for managing Spark applications in Mesos cluster mode with dispatcher.
/**
* Scheduler for managing driver lifecycles in Mesos cluster mode.
*
* This is a signature-only listing: each operation is shown without a body.
* A driver is submitted from a description, and is afterwards addressed by
* its submission ID when it is killed or its status is queried.
*/
class MesosClusterScheduler extends MesosScheduler {
/**
* Submit a new driver to the Mesos cluster.
* @param description Driver description containing application details
* @return Submission response with driver ID and status
*/
def submitDriver(description: MesosDriverDescription): CreateSubmissionResponse
/**
* Kill a running driver.
* @param submissionId Driver submission ID (presumably the ID carried by the
* response of submitDriver; confirm against the implementation)
* @return Kill response with success status
*/
def killDriver(submissionId: String): KillSubmissionResponse
/**
* Get status of a driver.
* @param submissionId Driver submission ID (presumably the ID carried by the
* response of submitDriver; confirm against the implementation)
* @return Driver status response with current state
*/
def getDriverStatus(submissionId: String): SubmissionStatusResponse
}
/**
* Description of a driver to be submitted to Mesos cluster.
*
* This is a signature-only listing: the three accessors are shown without bodies.
*
* @param jarUrl Location of the application JAR, given as a URL string
* @param mainClass Name of the application's main class
* @param args Arguments passed to the application
* @param conf Spark configuration associated with the driver
* @param supervise Defaults to false. NOTE(review): presumably requests that the
* driver be restarted if it fails; the listing does not show this, confirm
*/
class MesosDriverDescription(
jarUrl: String,
mainClass: String,
args: Array[String],
conf: SparkConf,
supervise: Boolean = false) {
// Application name as a string; how it is derived is not shown in this listing.
def appName: String
// String-to-string map of Spark properties; presumably derived from `conf` (confirm).
def sparkProperties: Map[String, String]
// String-to-string map of environment variables; their source is not shown here.
def environmentVariables: Map[String, String]
}
/**
* Configuration helper for managing secrets in Mesos environments
*/
class MesosSecretConfig(taskType: String) {
/**
* Get comma-separated secret names for the specified task type
* @return Secret names configuration value
*/
def secretNames: String
/**
* Get comma-separated secret values for the specified task type
* @return Secret values configuration value
*/
def secretValues: String
/**
* Get comma-separated environment variable keys for secrets
* @return Environment variable keys configuration value
*/
def secretEnvKeys: String
/**
* Get comma-separated secret filenames for file-based secrets
* @return Secret filenames configuration value
*/
def secretFilenames: String
}

Configuration for running Spark applications in Mesos cluster mode with dispatcher.
// Cluster mode configuration
conf.set("spark.mesos.maxDrivers", "100") // Maximum concurrent drivers
conf.set("spark.mesos.retainedDrivers", "50") // Number of retained drivers
conf.set("spark.mesos.dispatcher.queue", "default") // Dispatcher queue name
// Failover configuration
conf.set("spark.mesos.driver.failoverTimeout", "600.0") // Driver failover timeout in seconds
conf.set("spark.mesos.cluster.retry.wait.max", "60") // Maximum retry wait time

// Execution mode
"spark.mesos.coarse" -> "true" // Boolean: Use coarse-grained mode (default: true)
// Resource allocation
"spark.cores.max" -> "8" // String: Maximum cores across all executors
"spark.mesos.mesosExecutor.cores" -> "1.0" // String: Cores per Mesos executor (fine-grained)
"spark.mesos.extra.cores" -> "0" // String: Extra cores to advertise per executor
"spark.mesos.executor.memoryOverhead" -> "384" // String: Additional memory per executor (MiB)
"spark.mesos.gpus.max" -> "0" // String: Maximum GPU resources
// Constraints and placement
"spark.mesos.constraints" -> "" // String: Attribute-based constraints
"spark.mesos.driver.constraints" -> "" // String: Driver placement constraints
"spark.mesos.role" -> "" // String: Mesos framework role
// Docker configuration
"spark.mesos.executor.docker.image" -> "" // String: Docker image for executors
"spark.mesos.executor.docker.forcePullImage" -> "" // String: Force pull Docker image
"spark.mesos.executor.docker.volumes" -> "" // String: Volume mounts (comma-separated)
"spark.mesos.executor.docker.portmaps" -> "" // String: Port mappings (comma-separated)
"spark.mesos.executor.docker.parameters" -> "" // String: Docker run parameters
"spark.mesos.containerizer" -> "docker" // String: Containerizer type ("docker" or "mesos")

// Principal and secret
"spark.mesos.principal" -> "" // String: Principal the Spark framework uses to authenticate with Mesos
"spark.mesos.principal.file" -> "" // String: Path to principal file
"spark.mesos.secret" -> "" // String: Authentication secret
"spark.mesos.secret.file" -> "" // String: Path to secret file
// Driver secrets
"spark.mesos.driver.secret.names" -> "" // String: Comma-separated secret names
"spark.mesos.driver.secret.values" -> "" // String: Comma-separated secret values
"spark.mesos.driver.secret.envkeys" -> "" // String: Environment variable names
"spark.mesos.driver.secret.filenames" -> "" // String: Secret file paths
// Executor secrets (same pattern as driver)
"spark.mesos.executor.secret.names" -> ""
"spark.mesos.executor.secret.values" -> ""
"spark.mesos.executor.secret.envkeys" -> ""
"spark.mesos.executor.secret.filenames" -> ""

// Networking
"spark.mesos.network.name" -> "" // String: Named network for containers
"spark.mesos.network.labels" -> "" // String: Network labels for CNI
// Web UI URLs
"spark.mesos.driver.webui.url" -> "" // String: Driver web UI URL
"spark.mesos.dispatcher.webui.url" -> "" // String: Dispatcher web UI URL
"spark.mesos.proxy.baseURL" -> "" // String: Proxy base URL
// History server
"spark.mesos.dispatcher.historyServer.url" -> "" // String: History server URL

// Offer management
"spark.mesos.rejectOfferDuration" -> "120s" // String: Default reject duration
"spark.mesos.rejectOfferDurationForUnmetConstraints" -> "" // String: Reject duration for unmet constraints
"spark.mesos.rejectOfferDurationForReachedMaxCores" -> "" // String: Reject duration when max cores reached
// Task and executor configuration
"spark.mesos.task.labels" -> "" // String: Labels for tasks
"spark.mesos.driver.labels" -> "" // String: Labels for driver
"spark.mesos.uris" -> "" // String: Comma-separated URIs to download
"spark.executor.uri" -> "" // String: Executor URI
"spark.mesos.executor.home" -> "" // String: Spark home directory on executors
// Fetcher and caching
"spark.mesos.fetcherCache.enable" -> "false" // String: Enable Mesos fetcher cache
"spark.mesos.appJar.local.resolution.mode" -> "host" // String: Local JAR resolution mode
// Cluster mode
"spark.mesos.maxDrivers" -> "200" // String: Maximum concurrent drivers
"spark.mesos.retainedDrivers" -> "200" // String: Number of retained completed drivers
"spark.mesos.driver.failoverTimeout" -> "0.0" // String: Driver failover timeout (seconds)

Common exceptions and error scenarios:
import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf()
.setAppName("Mesos Example")
.setMaster("mesos://mesos-master:5050")
.set("spark.mesos.coarse", "true")
.set("spark.cores.max", "4")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(1 to 1000)
val sum = rdd.reduce(_ + _)
println(s"Sum: $sum")
sc.stop()

val conf = new SparkConf()
.setAppName("Dockerized Spark")
.setMaster("mesos://mesos-master:5050")
.set("spark.mesos.executor.docker.image", "apache/spark:3.5.6")
.set("spark.mesos.executor.docker.volumes", "/data:/spark-data:ro")
.set("spark.mesos.containerizer", "mesos")
.set("spark.executor.memory", "2g")
.set("spark.executor.cores", "2")
val sc = new SparkContext(conf)
// Application logic here
sc.stop()

import org.apache.spark.network.shuffle.mesos.MesosExternalBlockStoreClient;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.sasl.SecretKeyHolder;
// Create and configure client
TransportConf transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle");
SecretKeyHolder secretKeyHolder = new SecretKeyHolder() {
@Override
public String getSaslUser(String appId) { return "spark"; }
@Override
public String getSecretKey(String appId) { return "secret"; }
};
MesosExternalBlockStoreClient client = new MesosExternalBlockStoreClient(
transportConf, secretKeyHolder, true, 5000);
// Register with shuffle service
client.registerDriverWithShuffleService("shuffle-host", 7337, 120000, 30000);

import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
import org.apache.spark.deploy.mesos.MesosDriverDescription
// Create cluster scheduler
val clusterScheduler = new MesosClusterScheduler()
// Configure driver description
val driverConf = new SparkConf()
.set("spark.executor.memory", "2g")
.set("spark.executor.cores", "2")
.set("spark.mesos.coarse", "true")
val driverDescription = new MesosDriverDescription(
jarUrl = "hdfs://namenode:9000/spark-apps/my-app.jar",
mainClass = "com.example.MySparkApp",
args = Array("--input", "/data/input", "--output", "/data/output"),
conf = driverConf,
supervise = true
)
// Submit driver to cluster
val submissionResponse = clusterScheduler.submitDriver(driverDescription)
println(s"Driver submitted with ID: ${submissionResponse.submissionId}")
// Monitor driver status
val statusResponse = clusterScheduler.getDriverStatus(submissionResponse.submissionId)
println(s"Driver status: ${statusResponse.driverState}")

import org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackendUtil
val conf = new SparkConf()
.set("spark.mesos.executor.docker.image", "apache/spark:3.5.6-scala2.13")
.set("spark.mesos.executor.docker.volumes",
"/data:/spark-data:ro,/logs:/spark-logs:rw")
.set("spark.mesos.executor.docker.portmaps",
"8080:8080:tcp,8081:8081:tcp")
.set("spark.mesos.containerizer", "mesos")
// Parse volume specifications
val volumes = MesosSchedulerBackendUtil.parseVolumesSpec(
Seq("/data:/spark-data:ro", "/logs:/spark-logs:rw"))
// Parse port mappings
val portMappings = MesosSchedulerBackendUtil.parsePortMappingsSpec(
Seq("8080:8080:tcp", "8081:8081:tcp"))
// Build container info
val containerInfo = MesosSchedulerBackendUtil.buildContainerInfo(conf)

// Configuration type aliases
type ConfigKey = String
type ConfigValue = String
type MasterURL = String // Format: "mesos://host:port"
// Label parsing
type LabelString = String // Format: "key1:value1,key2:value2"
// Cluster mode types
type SubmissionId = String
type DriverState = String // SUBMITTED, RUNNING, FINISHED, FAILED, KILLED
type FrameworkId = String
// Container and volume types
type VolumeSpec = String // Format: "hostPath:containerPath:mode"
type PortMapSpec = String // Format: "hostPort:containerPort:protocol"
// Response types for cluster operations
/**
* Response to a driver submission; this is the return type of
* MesosClusterScheduler.submitDriver in this listing.
*/
trait CreateSubmissionResponse {
// ID assigned to the submitted driver; used afterwards to query or kill it.
def submissionId: String
// Success flag for the request; exact semantics are not shown in this listing.
def success: Boolean
}
/**
* Response to a kill request; this is the return type of
* MesosClusterScheduler.killDriver in this listing.
*/
trait KillSubmissionResponse {
// Submission ID carried by the response; presumably the driver the kill targeted (confirm).
def submissionId: String
// Success flag for the request; exact semantics are not shown in this listing.
def success: Boolean
}
/**
* Response to a status query; this is the return type of
* MesosClusterScheduler.getDriverStatus in this listing.
*/
trait SubmissionStatusResponse {
// Submission ID carried by the response; presumably the driver that was queried (confirm).
def submissionId: String
// Driver state as a plain string. NOTE(review): presumably one of the values listed
// next to the DriverState alias (SUBMITTED, RUNNING, FINISHED, FAILED, KILLED); confirm.
def driverState: String
// Success flag for the request; exact semantics are not shown in this listing.
def success: Boolean
}
// Mesos protocol buffer types (from Apache Mesos)
import org.apache.mesos.Protos.{Volume, ContainerInfo, TaskState}
import org.apache.mesos.v1.Protos.{Variable, Labels}