tessl/maven-org-apache-spark--spark-mesos-2-13

Apache Mesos cluster manager integration for Apache Spark, enabling distributed computation on Mesos clusters.

Describes: mavenpkg:maven/org.apache.spark/spark-mesos_2.13@3.5.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-mesos-2-13@3.5.0


Spark Mesos

Spark Mesos provides Apache Mesos cluster manager integration for Apache Spark, enabling Spark applications to run on Mesos clusters with both coarse-grained and fine-grained scheduling modes. It supports dynamic resource allocation and fault tolerance, and provides utilities for Mesos-specific configuration and monitoring. Note that fine-grained mode has been deprecated since Spark 2.0.0, and Apache Mesos support as a whole is deprecated as of Spark 3.2.

Package Information

  • Package Name: spark-mesos_2.13
  • Package Type: maven
  • Language: Scala
  • Group ID: org.apache.spark
  • Installation: Add to your Maven dependencies or include in Spark's classpath
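
For sbt builds, an equivalent dependency declaration would be the following (version taken from the install command above; marked "provided" on the assumption that the jar already ships with your Spark distribution, and %% assumes scalaVersion 2.13):

// build.sbt
libraryDependencies += "org.apache.spark" %% "spark-mesos" % "3.5.0" % "provided"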

Core Imports

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.cluster.mesos.MesosProtoUtils
import org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackendUtil
import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
import org.apache.spark.network.shuffle.mesos.MesosExternalBlockStoreClient
import org.apache.spark.deploy.mesos.config._

For Java applications:

import org.apache.spark.network.shuffle.mesos.MesosExternalBlockStoreClient;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.sasl.SecretKeyHolder;
import org.apache.spark.executor.MesosExecutorBackend;

Basic Usage

import org.apache.spark.{SparkConf, SparkContext}

// Configure Spark to use Mesos
val conf = new SparkConf()
  .setAppName("MySparkApp")
  .setMaster("mesos://mesos-master:5050")  // Connect to Mesos master
  .set("spark.mesos.coarse", "true")       // Use coarse-grained mode
  .set("spark.cores.max", "4")             // Limit maximum cores
  .set("spark.mesos.executor.docker.image", "spark-executor:latest")

val sc = new SparkContext(conf)

// Your Spark application logic here
val data = sc.parallelize(1 to 100)
val result = data.map(_ * 2).collect()

sc.stop()

Architecture

Spark Mesos integration consists of several key components:

  • Cluster Manager: MesosClusterManager, automatically selected when using mesos:// URLs (see the sketch after this list)
  • Scheduler Backends: Coarse-grained and fine-grained backends for different resource sharing modes
  • Cluster Mode Components: MesosClusterScheduler and dispatcher for cluster mode deployments
  • Scheduler Utilities: MesosSchedulerBackendUtil for container and volume management
  • Configuration System: Extensive configuration options for Mesos-specific settings
  • Security Management: MesosSecretConfig for handling secrets and authentication
  • External Shuffle Service: Optional shuffle service integration for coarse-grained mode
  • Protocol Utilities: Helper functions for working with Mesos protocol buffers
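
As a sketch of cluster manager selection: the Mesos backend is chosen purely from the master URL scheme, including ZooKeeper-based URLs for highly available Mesos masters (hostnames here are illustrative).

import org.apache.spark.SparkConf

// Single Mesos master
val confSingle = new SparkConf().setMaster("mesos://mesos-master:5050")

// HA Mesos masters discovered through ZooKeeper
val confHA = new SparkConf().setMaster("mesos://zk://zk1:2181,zk2:2181,zk3:2181/mesos")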

Capabilities

Cluster Configuration

Configure Spark to connect to and run on Mesos clusters using master URL and configuration options.

// Master URL format
val master: String = "mesos://host:port"

// Key configuration options
val conf = new SparkConf()
  .setMaster(master)
  .set("spark.mesos.coarse", "true")  // Enable coarse-grained mode

Execution Modes

Choose between coarse-grained and fine-grained execution modes depending on your resource sharing requirements.

Coarse-grained Mode (default):

  • Spark acquires long-lived Mesos tasks on each machine
  • Lower latency, predictable resource allocation
  • Better for batch processing and long-running applications

Fine-grained Mode (deprecated since Spark 2.0.0):

  • Each Spark task maps to a separate Mesos task
  • Dynamic resource sharing between applications
  • Better for resource utilization in multi-tenant environments

// Coarse-grained mode (default)
conf.set("spark.mesos.coarse", "true")

// Fine-grained mode (deprecated)
conf.set("spark.mesos.coarse", "false")

Resource Management

Control resource allocation and constraints for Spark executors running on Mesos.

// Core and memory configuration
conf.set("spark.cores.max", "8")                    // Maximum cores across all executors
conf.set("spark.executor.cores", "2")               // Cores per executor
conf.set("spark.executor.memory", "2g")             // Memory per executor
conf.set("spark.mesos.executor.memoryOverhead", "512") // Additional memory overhead

// GPU support
conf.set("spark.mesos.gpus.max", "2")               // Maximum GPU resources

// Resource constraints
conf.set("spark.mesos.constraints", "os:centos")    // Attribute-based constraints
conf.set("spark.mesos.role", "spark-framework")     // Mesos framework role

Docker Integration

Run Spark executors in Docker containers with full configuration support.

// Docker executor configuration
conf.set("spark.mesos.executor.docker.image", "spark:3.5.6")
conf.set("spark.mesos.executor.docker.forcePullImage", "true")
conf.set("spark.mesos.containerizer", "mesos")  // Use Mesos containerizer

// Volume mounts
conf.set("spark.mesos.executor.docker.volumes", 
  "/host/data:/container/data:ro,/host/logs:/container/logs:rw")

// Docker parameters
conf.set("spark.mesos.executor.docker.parameters", 
  "memory-swap=-1,ulimit=nofile=65536:65536")

Authentication and Security

Configure authentication credentials for secure Mesos clusters.

// Principal and secret authentication
conf.set("spark.mesos.principal", "spark-framework")
conf.set("spark.mesos.secret", "secret-value")

// File-based authentication
conf.set("spark.mesos.principal.file", "/path/to/principal.txt")
conf.set("spark.mesos.secret.file", "/path/to/secret.txt")

// Driver secrets
conf.set("spark.mesos.driver.secret.names", "secret1,secret2")
conf.set("spark.mesos.driver.secret.values", "value1,value2")
conf.set("spark.mesos.driver.secret.envkeys", "SECRET1_ENV,SECRET2_ENV")

Network Configuration

Configure networking options for containerized and multi-tenant environments.

// Named network attachment
conf.set("spark.mesos.network.name", "spark-network")
conf.set("spark.mesos.network.labels", "env:production,team:data")

// Service URLs
conf.set("spark.mesos.driver.webui.url", "http://driver-host:4040")
conf.set("spark.mesos.dispatcher.webui.url", "http://dispatcher:8080")

Task Management and Labeling

Add metadata and labels to Mesos tasks for monitoring and organization.

// Task labels for monitoring
conf.set("spark.mesos.task.labels", "app:spark,env:prod,team:analytics")
conf.set("spark.mesos.driver.labels", "type:driver,priority:high")

// Task constraints and placement
conf.set("spark.mesos.driver.constraints", "zone:us-west-1")
conf.set("spark.mesos.rejectOfferDuration", "120s")

External Shuffle Service

Configure external shuffle service for improved performance in coarse-grained mode.

/**
 * Client for communicating with external shuffle service in Mesos coarse-grained mode
 */
public class MesosExternalBlockStoreClient extends ExternalBlockStoreClient {
  
  /**
   * Creates a Mesos external shuffle client
   * @param conf Transport configuration
   * @param secretKeyHolder Secret key holder for authentication
   * @param authEnabled Whether authentication is enabled
   * @param registrationTimeoutMs Timeout for registration in milliseconds
   */
  public MesosExternalBlockStoreClient(
      TransportConf conf,
      SecretKeyHolder secretKeyHolder,
      boolean authEnabled,
      long registrationTimeoutMs);

  /**
   * Register driver with the shuffle service
   * @param host Shuffle service host
   * @param port Shuffle service port  
   * @param heartbeatTimeoutMs Heartbeat timeout in milliseconds
   * @param heartbeatIntervalMs Heartbeat interval in milliseconds
   */
  public void registerDriverWithShuffleService(
      String host,
      int port,
      long heartbeatTimeoutMs,
      long heartbeatIntervalMs) throws IOException, InterruptedException;
}

Protocol Utilities

Utility functions for working with Mesos protocol buffers and labels.

/**
 * Utilities for working with Mesos protocol buffers
 */
object MesosProtoUtils {
  /**
   * Parses a label string into Mesos Labels protobuf
   * @param labelsStr Label string in format "key1:value1,key2:value2"
   * @return Mesos Labels.Builder for constructing protobuf messages
   */
  def mesosLabels(labelsStr: String): org.apache.mesos.Protos.Labels.Builder
}
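
A short usage sketch based on the signature above (the label string is illustrative):

val labelsBuilder = MesosProtoUtils.mesosLabels("env:prod,team:analytics")
val labels = labelsBuilder.build()  // yields org.apache.mesos.Protos.Labels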

Scheduler Backend Utilities

Utilities for container and volume management in Mesos environments.

/**
 * Utility object providing helper methods for Mesos scheduler backends
 */
object MesosSchedulerBackendUtil {
  /**
   * Parse volume specifications for container mounts
   * @param volumes Sequence of volume specifications in format "hostPath:containerPath:mode"
   * @return List of Mesos Volume objects
   */
  def parseVolumesSpec(volumes: Seq[String]): List[Volume]
  
  /**
   * Parse port mapping specifications for Docker containers
   * @param portmaps Sequence of port mapping specifications in format "hostPort:containerPort:protocol"
   * @return List of DockerInfo.PortMapping objects
   */
  def parsePortMappingsSpec(portmaps: Seq[String]): List[DockerInfo.PortMapping]
  
  /**
   * Build container information from Spark configuration
   * @param conf Spark configuration containing container settings
   * @return ContainerInfo.Builder for Mesos container setup
   */
  def buildContainerInfo(conf: SparkConf): ContainerInfo.Builder
  
  /**
   * Convert Spark task state to Mesos task state
   * @param state Spark TaskState
   * @return Corresponding MesosTaskState
   */
  def taskStateToMesos(state: TaskState): MesosTaskState
  
  /**
   * Extract secret environment variables from configuration
   * @param conf Spark configuration
   * @param secretConfig Secret configuration for driver or executor
   * @return Sequence of environment variables for secrets
   */
  def getSecretEnvVar(conf: SparkConf, secretConfig: MesosSecretConfig): Seq[Variable]
  
  /**
   * Extract secret volume from configuration
   * @param conf Spark configuration
   * @param secretConfig Secret configuration for driver or executor
   * @return Optional volume for secret mounting
   */
  def getSecretVolume(conf: SparkConf, secretConfig: MesosSecretConfig): Option[Volume]
}

Cluster Mode Management

Components for managing Spark applications in Mesos cluster mode with dispatcher.

/**
 * Scheduler for managing driver lifecycles in Mesos cluster mode
 */
class MesosClusterScheduler extends MesosScheduler {
  /**
   * Submit a new driver to the Mesos cluster
   * @param description Driver description containing application details
   * @return Submission response with driver ID and status
   */
  def submitDriver(description: MesosDriverDescription): CreateSubmissionResponse
  
  /**
   * Kill a running driver
   * @param submissionId Driver submission ID
   * @return Kill response with success status
   */
  def killDriver(submissionId: String): KillSubmissionResponse
  
  /**
   * Get status of a driver
   * @param submissionId Driver submission ID
   * @return Driver status response with current state
   */
  def getDriverStatus(submissionId: String): SubmissionStatusResponse
}

/**
 * Description of a driver to be submitted to Mesos cluster
 */
class MesosDriverDescription(
  jarUrl: String,
  mainClass: String, 
  args: Array[String],
  conf: SparkConf,
  supervise: Boolean = false) {
  
  def appName: String
  def sparkProperties: Map[String, String]
  def environmentVariables: Map[String, String]
}

/**
 * Configuration helper for managing secrets in Mesos environments
 */
class MesosSecretConfig(taskType: String) {
  /**
   * Get comma-separated secret names for the specified task type
   * @return Secret names configuration value
   */
  def secretNames: String
  
  /**
   * Get comma-separated secret values for the specified task type
   * @return Secret values configuration value
   */
  def secretValues: String
  
  /**
   * Get comma-separated environment variable keys for secrets
   * @return Environment variable keys configuration value
   */
  def secretEnvKeys: String
  
  /**
   * Get comma-separated secret filenames for file-based secrets
   * @return Secret filenames configuration value
   */
  def secretFilenames: String
}
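
A sketch combining MesosSecretConfig with the scheduler backend utilities, assuming the constructor and signatures as documented above:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.mesos.driver.secret.names", "db-password")
  .set("spark.mesos.driver.secret.envkeys", "DB_PASSWORD")

// Build a secret config for driver tasks and extract the env vars Mesos injects
val driverSecrets = new MesosSecretConfig("driver")
val secretEnvVars = MesosSchedulerBackendUtil.getSecretEnvVar(conf, driverSecrets)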

Cluster Mode and Dispatcher

Configuration for running Spark applications in Mesos cluster mode with dispatcher.

// Cluster mode configuration
conf.set("spark.mesos.maxDrivers", "100")           // Maximum concurrent drivers
conf.set("spark.mesos.retainedDrivers", "50")       // Number of retained drivers
conf.set("spark.mesos.dispatcher.queue", "default") // Dispatcher queue name

// Failover configuration
conf.set("spark.mesos.driver.failoverTimeout", "600.0") // Driver failover timeout in seconds
conf.set("spark.mesos.cluster.retry.wait.max", "60")    // Maximum retry wait time

Configuration Reference

Core Configuration Options

// Execution mode
"spark.mesos.coarse" -> "true"  // Boolean: Use coarse-grained mode (default: true)

// Resource allocation
"spark.cores.max" -> "8"                              // String: Maximum cores across all executors
"spark.mesos.mesosExecutor.cores" -> "1.0"            // String: Cores per Mesos executor (fine-grained)
"spark.mesos.extra.cores" -> "0"                      // String: Extra cores to advertise per executor
"spark.mesos.executor.memoryOverhead" -> "384"        // String: Additional memory per executor (MiB)
"spark.mesos.gpus.max" -> "0"                         // String: Maximum GPU resources

// Constraints and placement
"spark.mesos.constraints" -> ""                       // String: Attribute-based constraints
"spark.mesos.driver.constraints" -> ""                // String: Driver placement constraints
"spark.mesos.role" -> ""                              // String: Mesos framework role

// Docker configuration
"spark.mesos.executor.docker.image" -> ""             // String: Docker image for executors
"spark.mesos.executor.docker.forcePullImage" -> ""    // String: Force pull Docker image
"spark.mesos.executor.docker.volumes" -> ""           // String: Volume mounts (comma-separated)
"spark.mesos.executor.docker.portmaps" -> ""          // String: Port mappings (comma-separated)
"spark.mesos.executor.docker.parameters" -> ""        // String: Docker run parameters
"spark.mesos.containerizer" -> "docker"               // String: Containerizer type ("docker" or "mesos")

Authentication Configuration

// Principal and secret
"spark.mesos.principal" -> ""                    // String: Kerberos principal name
"spark.mesos.principal.file" -> ""               // String: Path to principal file
"spark.mesos.secret" -> ""                       // String: Authentication secret
"spark.mesos.secret.file" -> ""                  // String: Path to secret file

// Driver secrets
"spark.mesos.driver.secret.names" -> ""          // String: Comma-separated secret names
"spark.mesos.driver.secret.values" -> ""         // String: Comma-separated secret values
"spark.mesos.driver.secret.envkeys" -> ""        // String: Environment variable names
"spark.mesos.driver.secret.filenames" -> ""      // String: Secret file paths

// Executor secrets (same pattern as driver)
"spark.mesos.executor.secret.names" -> ""
"spark.mesos.executor.secret.values" -> ""
"spark.mesos.executor.secret.envkeys" -> ""
"spark.mesos.executor.secret.filenames" -> ""

Network and Service Configuration

// Networking
"spark.mesos.network.name" -> ""                 // String: Named network for containers
"spark.mesos.network.labels" -> ""               // String: Network labels for CNI

// Web UI URLs
"spark.mesos.driver.webui.url" -> ""             // String: Driver web UI URL
"spark.mesos.dispatcher.webui.url" -> ""         // String: Dispatcher web UI URL
"spark.mesos.proxy.baseURL" -> ""                // String: Proxy base URL

// History server
"spark.mesos.dispatcher.historyServer.url" -> "" // String: History server URL

Advanced Configuration

// Offer management
"spark.mesos.rejectOfferDuration" -> "120s"                          // String: Default reject duration
"spark.mesos.rejectOfferDurationForUnmetConstraints" -> ""           // String: Reject duration for unmet constraints
"spark.mesos.rejectOfferDurationForReachedMaxCores" -> ""            // String: Reject duration when max cores reached

// Task and executor configuration
"spark.mesos.task.labels" -> ""                  // String: Labels for tasks
"spark.mesos.driver.labels" -> ""                // String: Labels for driver
"spark.mesos.uris" -> ""                         // String: Comma-separated URIs to download
"spark.executor.uri" -> ""                       // String: Executor URI
"spark.mesos.executor.home" -> ""                // String: Spark home directory on executors

// Fetcher and caching
"spark.mesos.fetcherCache.enable" -> "false"    // String: Enable Mesos fetcher cache
"spark.mesos.appJar.local.resolution.mode" -> "host" // String: Local JAR resolution mode

// Cluster mode
"spark.mesos.maxDrivers" -> "200"                // String: Maximum concurrent drivers
"spark.mesos.retainedDrivers" -> "200"           // String: Number of retained completed drivers
"spark.mesos.driver.failoverTimeout" -> "0.0"   // String: Driver failover timeout (seconds)

Error Handling

Common exceptions and error scenarios (a handling sketch follows the list):

  • SparkException: Thrown for malformed configuration or connection issues
  • IOException: Network-related errors when connecting to Mesos master or shuffle service
  • IllegalArgumentException: Invalid configuration values or parameters
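
A minimal handling sketch for these cases (the recovery strategy is illustrative):

import org.apache.spark.{SparkConf, SparkContext, SparkException}

val conf = new SparkConf()
  .setAppName("ResilientApp")
  .setMaster("mesos://mesos-master:5050")

try {
  val sc = new SparkContext(conf)
  try {
    sc.parallelize(1 to 100).count()
  } finally {
    sc.stop()
  }
} catch {
  case e: SparkException =>
    // Malformed configuration or failure to connect to the Mesos master
    System.err.println(s"Spark error: ${e.getMessage}")
  case e: IllegalArgumentException =>
    // Invalid configuration values (e.g. unparsable constraints)
    System.err.println(s"Bad configuration: ${e.getMessage}")
}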

Usage Examples

Basic Mesos Application

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("Mesos Example")
  .setMaster("mesos://mesos-master:5050")
  .set("spark.mesos.coarse", "true")
  .set("spark.cores.max", "4")

val sc = new SparkContext(conf)
val rdd = sc.parallelize(1 to 1000)
val sum = rdd.reduce(_ + _)
println(s"Sum: $sum")
sc.stop()

Docker-based Execution

val conf = new SparkConf()
  .setAppName("Dockerized Spark")
  .setMaster("mesos://mesos-master:5050")
  .set("spark.mesos.executor.docker.image", "apache/spark:3.5.6")
  .set("spark.mesos.executor.docker.volumes", "/data:/spark-data:ro")
  .set("spark.mesos.containerizer", "mesos")
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")

val sc = new SparkContext(conf)
// Application logic here
sc.stop()

External Shuffle Service Setup

import org.apache.spark.SparkConf;
import org.apache.spark.network.netty.SparkTransportConf;
import org.apache.spark.network.shuffle.mesos.MesosExternalBlockStoreClient;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.sasl.SecretKeyHolder;

// Create and configure client
SparkConf conf = new SparkConf();
// Note: SparkTransportConf.fromSparkConf declares additional default parameters
// (e.g. numUsableCores) that may need to be passed explicitly from Java,
// depending on the Spark version
TransportConf transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle");
SecretKeyHolder secretKeyHolder = new SecretKeyHolder() {
    @Override
    public String getSaslUser(String appId) { return "spark"; }
    @Override
    public String getSecretKey(String appId) { return "secret"; }
};

MesosExternalBlockStoreClient client = new MesosExternalBlockStoreClient(
    transportConf, secretKeyHolder, true, 5000);

// Initialize with the application ID before registering (ID is illustrative)
client.init("app-20240101000000-0000");

// Register with shuffle service
client.registerDriverWithShuffleService("shuffle-host", 7337, 120000, 30000);

Cluster Mode Application Submission

import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
import org.apache.spark.deploy.mesos.MesosDriverDescription

// Create cluster scheduler (in real deployments this is constructed and managed
// by the MesosClusterDispatcher; shown here directly for illustration)
val clusterScheduler = new MesosClusterScheduler()

// Configure driver description
val driverConf = new SparkConf()
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")
  .set("spark.mesos.coarse", "true")

val driverDescription = new MesosDriverDescription(
  jarUrl = "hdfs://namenode:9000/spark-apps/my-app.jar",
  mainClass = "com.example.MySparkApp",
  args = Array("--input", "/data/input", "--output", "/data/output"),
  conf = driverConf,
  supervise = true
)

// Submit driver to cluster
val submissionResponse = clusterScheduler.submitDriver(driverDescription)
println(s"Driver submitted with ID: ${submissionResponse.submissionId}")

// Monitor driver status
val statusResponse = clusterScheduler.getDriverStatus(submissionResponse.submissionId)
println(s"Driver status: ${statusResponse.driverState}")

Advanced Container Configuration

import org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackendUtil

val conf = new SparkConf()
  .set("spark.mesos.executor.docker.image", "apache/spark:3.5.6-scala2.13")
  .set("spark.mesos.executor.docker.volumes", 
    "/data:/spark-data:ro,/logs:/spark-logs:rw")
  .set("spark.mesos.executor.docker.portmaps",
    "8080:8080:tcp,8081:8081:tcp")
  .set("spark.mesos.containerizer", "mesos")

// Parse volume specifications
val volumes = MesosSchedulerBackendUtil.parseVolumesSpec(
  Seq("/data:/spark-data:ro", "/logs:/spark-logs:rw"))

// Parse port mappings  
val portMappings = MesosSchedulerBackendUtil.parsePortMappingsSpec(
  Seq("8080:8080:tcp", "8081:8081:tcp"))

// Build container info
val containerInfo = MesosSchedulerBackendUtil.buildContainerInfo(conf)

Types

// Configuration type aliases
type ConfigKey = String
type ConfigValue = String
type MasterURL = String  // Format: "mesos://host:port"

// Label parsing
type LabelString = String  // Format: "key1:value1,key2:value2"

// Cluster mode types
type SubmissionId = String
type DriverState = String  // SUBMITTED, RUNNING, FINISHED, FAILED, KILLED
type FrameworkId = String

// Container and volume types
type VolumeSpec = String   // Format: "hostPath:containerPath:mode"
type PortMapSpec = String  // Format: "hostPort:containerPort:protocol"

// Response types for cluster operations
trait CreateSubmissionResponse {
  def submissionId: String
  def success: Boolean
}

trait KillSubmissionResponse {
  def submissionId: String
  def success: Boolean
}

trait SubmissionStatusResponse {
  def submissionId: String
  def driverState: String
  def success: Boolean
}

// Mesos protocol buffer types (from Apache Mesos)
import org.apache.mesos.Protos.{ContainerInfo, Labels, TaskState, Volume}
import org.apache.mesos.Protos.Environment.Variable

Dependencies

  • Apache Mesos Java libraries (org.apache.mesos:mesos)
  • Apache Spark Core (org.apache.spark:spark-core_2.13)
  • Google Protobuf for Mesos protocol communication