Spark Mesos

Spark Mesos provides Apache Mesos cluster manager integration for Apache Spark, enabling Spark applications to run on Mesos clusters with both coarse-grained and fine-grained scheduling modes. It supports dynamic resource allocation, fault tolerance, and provides utilities for Mesos-specific configuration and monitoring.

Package Information

  • Package Name: spark-mesos_2.13
  • Package Type: maven
  • Language: Scala
  • Group ID: org.apache.spark
  • Installation: Add to your Maven dependencies or include in Spark's classpath
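
For sbt users, a minimal dependency declaration is sketched below; the version number is illustrative (this package tracks the 3.5.x line) and should match your Spark deployment.

// build.sbt — version shown is an assumption; align it with your cluster's Spark version
libraryDependencies += "org.apache.spark" %% "spark-mesos" % "3.5.6"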

Core Imports

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.cluster.mesos.MesosProtoUtils
import org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackendUtil
import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
import org.apache.spark.network.shuffle.mesos.MesosExternalBlockStoreClient
import org.apache.spark.deploy.mesos.config._

For Java applications:

import org.apache.spark.network.shuffle.mesos.MesosExternalBlockStoreClient;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.sasl.SecretKeyHolder;
import org.apache.spark.executor.MesosExecutorBackend;

Basic Usage

import org.apache.spark.{SparkConf, SparkContext}

// Configure Spark to use Mesos
val conf = new SparkConf()
  .setAppName("MySparkApp")
  .setMaster("mesos://mesos-master:5050")  // Connect to Mesos master
  .set("spark.mesos.coarse", "true")       // Use coarse-grained mode
  .set("spark.cores.max", "4")             // Limit maximum cores
  .set("spark.mesos.executor.docker.image", "spark-executor:latest")

val sc = new SparkContext(conf)

// Your Spark application logic here
val data = sc.parallelize(1 to 100)
val result = data.map(_ * 2).collect()

sc.stop()

Architecture

Spark Mesos integration consists of several key components:

  • Cluster Manager: MesosClusterManager automatically selected when using mesos:// URLs
  • Scheduler Backends: Coarse-grained and fine-grained backends for different resource sharing modes
  • Cluster Mode Components: MesosClusterScheduler and dispatcher for cluster mode deployments
  • Scheduler Utilities: MesosSchedulerBackendUtil for container and volume management
  • Configuration System: Extensive configuration options for Mesos-specific settings
  • Security Management: MesosSecretConfig for handling secrets and authentication
  • External Shuffle Service: Optional shuffle service integration for coarse-grained mode
  • Protocol Utilities: Helper functions for working with Mesos protocol buffers

Capabilities

Cluster Configuration

Configure Spark to connect to and run on Mesos clusters using master URL and configuration options.

// Master URL format
val master: String = "mesos://host:port"

// Key configuration options
val conf = new SparkConf()
  .setMaster(master)
  .set("spark.mesos.coarse", "true")  // Enable coarse-grained mode

Execution Modes

Choose between coarse-grained and fine-grained execution modes depending on your resource sharing requirements.

Coarse-grained Mode (default):

  • Spark launches long-lived Mesos tasks on each machine and reuses them across Spark tasks
  • Lower latency, predictable resource allocation
  • Better for batch processing and long-running applications

Fine-grained Mode (deprecated since Spark 2.0.0):

  • Each Spark task maps to a separate Mesos task
  • Dynamic resource sharing between applications
  • Better for resource utilization in multi-tenant environments

// Coarse-grained mode (default)
conf.set("spark.mesos.coarse", "true")

// Fine-grained mode  
conf.set("spark.mesos.coarse", "false")

Resource Management

Control resource allocation and constraints for Spark executors running on Mesos.

// Core and memory configuration
conf.set("spark.cores.max", "8")                    // Maximum cores across all executors
conf.set("spark.executor.cores", "2")               // Cores per executor
conf.set("spark.executor.memory", "2g")             // Memory per executor
conf.set("spark.mesos.executor.memoryOverhead", "512") // Additional memory overhead

// GPU support
conf.set("spark.mesos.gpus.max", "2")               // Maximum GPU resources

// Resource constraints
conf.set("spark.mesos.constraints", "os:centos")    // Attribute-based constraints
conf.set("spark.mesos.role", "spark-framework")     // Mesos framework role

Docker Integration

Run Spark executors in Docker containers with full configuration support.

// Docker executor configuration
conf.set("spark.mesos.executor.docker.image", "spark:3.5.6")
conf.set("spark.mesos.executor.docker.forcePullImage", "true")
conf.set("spark.mesos.containerizer", "mesos")  // Use Mesos containerizer

// Volume mounts
conf.set("spark.mesos.executor.docker.volumes", 
  "/host/data:/container/data:ro,/host/logs:/container/logs:rw")

// Docker parameters
conf.set("spark.mesos.executor.docker.parameters", 
  "memory-swap=-1,ulimit=nofile=65536:65536")

Authentication and Security

Configure authentication credentials for secure Mesos clusters.

// Principal and secret authentication
conf.set("spark.mesos.principal", "spark-framework")
conf.set("spark.mesos.secret", "secret-value")

// File-based authentication
conf.set("spark.mesos.principal.file", "/path/to/principal.txt")
conf.set("spark.mesos.secret.file", "/path/to/secret.txt")

// Driver secrets
conf.set("spark.mesos.driver.secret.names", "secret1,secret2")
conf.set("spark.mesos.driver.secret.values", "value1,value2")
conf.set("spark.mesos.driver.secret.envkeys", "SECRET1_ENV,SECRET2_ENV")

Network Configuration

Configure networking options for containerized and multi-tenant environments.

// Named network attachment
conf.set("spark.mesos.network.name", "spark-network")
conf.set("spark.mesos.network.labels", "env:production,team:data")

// Service URLs
conf.set("spark.mesos.driver.webui.url", "http://driver-host:4040")
conf.set("spark.mesos.dispatcher.webui.url", "http://dispatcher:8080")

Task Management and Labeling

Add metadata and labels to Mesos tasks for monitoring and organization.

// Task labels for monitoring
conf.set("spark.mesos.task.labels", "app:spark,env:prod,team:analytics")
conf.set("spark.mesos.driver.labels", "type:driver,priority:high")

// Task constraints and placement
conf.set("spark.mesos.driver.constraints", "zone:us-west-1")
conf.set("spark.mesos.rejectOfferDuration", "120s")

External Shuffle Service

Configure external shuffle service for improved performance in coarse-grained mode.

/**
 * Client for communicating with external shuffle service in Mesos coarse-grained mode
 */
public class MesosExternalBlockStoreClient extends ExternalBlockStoreClient {
  
  /**
   * Creates a Mesos external shuffle client
   * @param conf Transport configuration
   * @param secretKeyHolder Secret key holder for authentication
   * @param authEnabled Whether authentication is enabled
   * @param registrationTimeoutMs Timeout for registration in milliseconds
   */
  public MesosExternalBlockStoreClient(
      TransportConf conf,
      SecretKeyHolder secretKeyHolder,
      boolean authEnabled,
      long registrationTimeoutMs);

  /**
   * Register driver with the shuffle service
   * @param host Shuffle service host
   * @param port Shuffle service port  
   * @param heartbeatTimeoutMs Heartbeat timeout in milliseconds
   * @param heartbeatIntervalMs Heartbeat interval in milliseconds
   */
  public void registerDriverWithShuffleService(
      String host,
      int port,
      long heartbeatTimeoutMs,
      long heartbeatIntervalMs) throws IOException, InterruptedException;
}

Protocol Utilities

Utility functions for working with Mesos protocol buffers and labels.

/**
 * Utilities for working with Mesos protocol buffers
 */
object MesosProtoUtils {
  /**
   * Parses a label string into Mesos Labels protobuf
   * @param labelsStr Label string in format "key1:value1,key2:value2"
   * @return Mesos Labels.Builder for constructing protobuf messages
   */
  def mesosLabels(labelsStr: String): org.apache.mesos.Protos.Labels.Builder
}
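
A short usage sketch following the signature above:

import org.apache.spark.scheduler.cluster.mesos.MesosProtoUtils

// Parse a label string into a Mesos Labels protobuf
val labels = MesosProtoUtils.mesosLabels("env:prod,team:data").build()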

Scheduler Backend Utilities

Utilities for container and volume management in Mesos environments.

/**
 * Utility object providing helper methods for Mesos scheduler backends
 */
object MesosSchedulerBackendUtil {
  /**
   * Parse volume specifications for container mounts
   * @param volumes Sequence of volume specifications in format "hostPath:containerPath:mode"
   * @return List of Mesos Volume objects
   */
  def parseVolumesSpec(volumes: Seq[String]): List[Volume]
  
  /**
   * Parse port mapping specifications for Docker containers
   * @param portmaps Sequence of port mapping specifications in format "hostPort:containerPort:protocol"
   * @return List of DockerInfo.PortMapping objects
   */
  def parsePortMappingsSpec(portmaps: Seq[String]): List[DockerInfo.PortMapping]
  
  /**
   * Build container information from Spark configuration
   * @param conf Spark configuration containing container settings
   * @return ContainerInfo.Builder for Mesos container setup
   */
  def buildContainerInfo(conf: SparkConf): ContainerInfo.Builder
  
  /**
   * Convert Spark task state to Mesos task state
   * @param state Spark TaskState
   * @return Corresponding MesosTaskState
   */
  def taskStateToMesos(state: TaskState): MesosTaskState
  
  /**
   * Extract secret environment variables from configuration
   * @param conf Spark configuration
   * @param secretConfig Secret configuration for driver or executor
   * @return Sequence of environment variables for secrets
   */
  def getSecretEnvVar(conf: SparkConf, secretConfig: MesosSecretConfig): Seq[Variable]
  
  /**
   * Extract secret volume from configuration
   * @param conf Spark configuration
   * @param secretConfig Secret configuration for driver or executor
   * @return Optional volume for secret mounting
   */
  def getSecretVolume(conf: SparkConf, secretConfig: MesosSecretConfig): Option[Volume]
}

Cluster Mode Management

Components for managing Spark applications in Mesos cluster mode with dispatcher.

/**
 * Scheduler for managing driver lifecycles in Mesos cluster mode
 */
class MesosClusterScheduler extends MesosScheduler {
  /**
   * Submit a new driver to the Mesos cluster
   * @param description Driver description containing application details
   * @return Submission response with driver ID and status
   */
  def submitDriver(description: MesosDriverDescription): CreateSubmissionResponse
  
  /**
   * Kill a running driver
   * @param submissionId Driver submission ID
   * @return Kill response with success status
   */
  def killDriver(submissionId: String): KillSubmissionResponse
  
  /**
   * Get status of a driver
   * @param submissionId Driver submission ID
   * @return Driver status response with current state
   */
  def getDriverStatus(submissionId: String): SubmissionStatusResponse
}

/**
 * Description of a driver to be submitted to Mesos cluster
 */
class MesosDriverDescription(
  jarUrl: String,
  mainClass: String, 
  args: Array[String],
  conf: SparkConf,
  supervise: Boolean = false) {
  
  def appName: String
  def sparkProperties: Map[String, String]
  def environmentVariables: Map[String, String]
}

/**
 * Configuration helper for managing secrets in Mesos environments
 */
class MesosSecretConfig(taskType: String) {
  /**
   * Get comma-separated secret names for the specified task type
   * @return Secret names configuration value
   */
  def secretNames: String
  
  /**
   * Get comma-separated secret values for the specified task type
   * @return Secret values configuration value
   */
  def secretValues: String
  
  /**
   * Get comma-separated environment variable keys for secrets
   * @return Environment variable keys configuration value
   */
  def secretEnvKeys: String
  
  /**
   * Get comma-separated secret filenames for file-based secrets
   * @return Secret filenames configuration value
   */
  def secretFilenames: String
}
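
An illustrative sketch based on the class description above (constructing MesosSecretConfig directly, and the "driver" task type string, are assumptions):

// Read back driver-side secret settings
val driverSecrets = new MesosSecretConfig("driver")
val names = driverSecrets.secretNames      // e.g. "secret1,secret2"
val envKeys = driverSecrets.secretEnvKeys  // e.g. "SECRET1_ENV,SECRET2_ENV"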

Cluster Mode and Dispatcher

Configuration for running Spark applications in Mesos cluster mode with dispatcher.

// Cluster mode configuration
conf.set("spark.mesos.maxDrivers", "100")           // Maximum concurrent drivers
conf.set("spark.mesos.retainedDrivers", "50")       // Number of retained drivers
conf.set("spark.mesos.dispatcher.queue", "default") // Dispatcher queue name

// Failover configuration
conf.set("spark.mesos.driver.failoverTimeout", "600.0") // Driver failover timeout in seconds
conf.set("spark.mesos.cluster.retry.wait.max", "60")    // Maximum retry wait time

Configuration Reference

Core Configuration Options

// Execution mode
"spark.mesos.coarse" -> "true"  // Boolean: Use coarse-grained mode (default: true)

// Resource allocation
"spark.cores.max" -> "8"                              // String: Maximum cores across all executors
"spark.mesos.mesosExecutor.cores" -> "1.0"            // String: Cores per Mesos executor (fine-grained)
"spark.mesos.extra.cores" -> "0"                      // String: Extra cores to advertise per executor
"spark.mesos.executor.memoryOverhead" -> "384"        // String: Additional memory per executor (MiB)
"spark.mesos.gpus.max" -> "0"                         // String: Maximum GPU resources

// Constraints and placement
"spark.mesos.constraints" -> ""                       // String: Attribute-based constraints
"spark.mesos.driver.constraints" -> ""                // String: Driver placement constraints
"spark.mesos.role" -> ""                              // String: Mesos framework role

// Docker configuration
"spark.mesos.executor.docker.image" -> ""             // String: Docker image for executors
"spark.mesos.executor.docker.forcePullImage" -> ""    // String: Force pull Docker image
"spark.mesos.executor.docker.volumes" -> ""           // String: Volume mounts (comma-separated)
"spark.mesos.executor.docker.portmaps" -> ""          // String: Port mappings (comma-separated)
"spark.mesos.executor.docker.parameters" -> ""        // String: Docker run parameters
"spark.mesos.containerizer" -> "docker"               // String: Containerizer type ("docker" or "mesos")

Authentication Configuration

// Principal and secret
"spark.mesos.principal" -> ""                    // String: Kerberos principal name
"spark.mesos.principal.file" -> ""               // String: Path to principal file
"spark.mesos.secret" -> ""                       // String: Authentication secret
"spark.mesos.secret.file" -> ""                  // String: Path to secret file

// Driver secrets
"spark.mesos.driver.secret.names" -> ""          // String: Comma-separated secret names
"spark.mesos.driver.secret.values" -> ""         // String: Comma-separated secret values
"spark.mesos.driver.secret.envkeys" -> ""        // String: Environment variable names
"spark.mesos.driver.secret.filenames" -> ""      // String: Secret file paths

// Executor secrets (same pattern as driver)
"spark.mesos.executor.secret.names" -> ""
"spark.mesos.executor.secret.values" -> ""
"spark.mesos.executor.secret.envkeys" -> ""
"spark.mesos.executor.secret.filenames" -> ""

Network and Service Configuration

// Networking
"spark.mesos.network.name" -> ""                 // String: Named network for containers
"spark.mesos.network.labels" -> ""               // String: Network labels for CNI

// Web UI URLs
"spark.mesos.driver.webui.url" -> ""             // String: Driver web UI URL
"spark.mesos.dispatcher.webui.url" -> ""         // String: Dispatcher web UI URL
"spark.mesos.proxy.baseURL" -> ""                // String: Proxy base URL

// History server
"spark.mesos.dispatcher.historyServer.url" -> "" // String: History server URL

Advanced Configuration

// Offer management
"spark.mesos.rejectOfferDuration" -> "120s"                          // String: Default reject duration
"spark.mesos.rejectOfferDurationForUnmetConstraints" -> ""           // String: Reject duration for unmet constraints
"spark.mesos.rejectOfferDurationForReachedMaxCores" -> ""            // String: Reject duration when max cores reached

// Task and executor configuration
"spark.mesos.task.labels" -> ""                  // String: Labels for tasks
"spark.mesos.driver.labels" -> ""                // String: Labels for driver
"spark.mesos.uris" -> ""                         // String: Comma-separated URIs to download
"spark.executor.uri" -> ""                       // String: Executor URI
"spark.mesos.executor.home" -> ""                // String: Spark home directory on executors

// Fetcher and caching
"spark.mesos.fetcherCache.enable" -> "false"    // String: Enable Mesos fetcher cache
"spark.mesos.appJar.local.resolution.mode" -> "host" // String: Local JAR resolution mode

// Cluster mode
"spark.mesos.maxDrivers" -> "200"                // String: Maximum concurrent drivers
"spark.mesos.retainedDrivers" -> "200"           // String: Number of retained completed drivers
"spark.mesos.driver.failoverTimeout" -> "0.0"   // String: Driver failover timeout (seconds)

Error Handling

Common exceptions and error scenarios:

  • SparkException: Thrown for malformed configuration or connection issues
  • IOException: Network-related errors when connecting to Mesos master or shuffle service
  • IllegalArgumentException: Invalid configuration values or parameters
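
A minimal defensive pattern around context creation and job execution (illustrative only):

import org.apache.spark.{SparkConf, SparkContext, SparkException}

val conf = new SparkConf()
  .setAppName("SafeApp")
  .setMaster("mesos://mesos-master:5050")

try {
  val sc = new SparkContext(conf)
  try {
    val doubled = sc.parallelize(1 to 100).map(_ * 2).collect()
  } finally {
    sc.stop()  // Always release Mesos resources
  }
} catch {
  case e: SparkException =>
    System.err.println(s"Spark/Mesos error: ${e.getMessage}")
  case e: IllegalArgumentException =>
    System.err.println(s"Invalid configuration: ${e.getMessage}")
}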

Usage Examples

Basic Mesos Application

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("Mesos Example")
  .setMaster("mesos://mesos-master:5050")
  .set("spark.mesos.coarse", "true")
  .set("spark.cores.max", "4")

val sc = new SparkContext(conf)
val rdd = sc.parallelize(1 to 1000)
val sum = rdd.reduce(_ + _)
println(s"Sum: $sum")
sc.stop()

Docker-based Execution

val conf = new SparkConf()
  .setAppName("Dockerized Spark")
  .setMaster("mesos://mesos-master:5050")
  .set("spark.mesos.executor.docker.image", "apache/spark:3.5.6")
  .set("spark.mesos.executor.docker.volumes", "/data:/spark-data:ro")
  .set("spark.mesos.containerizer", "mesos")
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")

val sc = new SparkContext(conf)
// Application logic here
sc.stop()

External Shuffle Service Setup

import org.apache.spark.SparkConf;
import org.apache.spark.network.netty.SparkTransportConf;
import org.apache.spark.network.shuffle.mesos.MesosExternalBlockStoreClient;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.sasl.SecretKeyHolder;

// Create and configure client. SparkTransportConf.fromSparkConf is a Scala
// method with default arguments, so Java callers must pass them explicitly;
// the exact signature can vary across Spark versions.
SparkConf conf = new SparkConf();
TransportConf transportConf = SparkTransportConf.fromSparkConf(
    conf, "shuffle", 0, scala.Option.empty());
SecretKeyHolder secretKeyHolder = new SecretKeyHolder() {
    @Override
    public String getSaslUser(String appId) { return "spark"; }
    @Override  
    public String getSecretKey(String appId) { return "secret"; }
};

MesosExternalBlockStoreClient client = new MesosExternalBlockStoreClient(
    transportConf, secretKeyHolder, true, 5000);

// Register with shuffle service
client.registerDriverWithShuffleService("shuffle-host", 7337, 120000, 30000);

Cluster Mode Application Submission

import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
import org.apache.spark.deploy.mesos.MesosDriverDescription

// Create cluster scheduler
val clusterScheduler = new MesosClusterScheduler()

// Configure driver description
val driverConf = new SparkConf()
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")
  .set("spark.mesos.coarse", "true")

val driverDescription = new MesosDriverDescription(
  jarUrl = "hdfs://namenode:9000/spark-apps/my-app.jar",
  mainClass = "com.example.MySparkApp",
  args = Array("--input", "/data/input", "--output", "/data/output"),
  conf = driverConf,
  supervise = true
)

// Submit driver to cluster
val submissionResponse = clusterScheduler.submitDriver(driverDescription)
println(s"Driver submitted with ID: ${submissionResponse.submissionId}")

// Monitor driver status
val statusResponse = clusterScheduler.getDriverStatus(submissionResponse.submissionId)
println(s"Driver status: ${statusResponse.driverState}")

Advanced Container Configuration

import org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackendUtil

val conf = new SparkConf()
  .set("spark.mesos.executor.docker.image", "apache/spark:3.5.6-scala2.13")
  .set("spark.mesos.executor.docker.volumes", 
    "/data:/spark-data:ro,/logs:/spark-logs:rw")
  .set("spark.mesos.executor.docker.portmaps",
    "8080:8080:tcp,8081:8081:tcp")
  .set("spark.mesos.containerizer", "mesos")

// Parse volume specifications
val volumes = MesosSchedulerBackendUtil.parseVolumesSpec(
  Seq("/data:/spark-data:ro", "/logs:/spark-logs:rw"))

// Parse port mappings  
val portMappings = MesosSchedulerBackendUtil.parsePortMappingsSpec(
  Seq("8080:8080:tcp", "8081:8081:tcp"))

// Build container info
val containerInfo = MesosSchedulerBackendUtil.buildContainerInfo(conf)

Types

// Configuration type aliases
type ConfigKey = String
type ConfigValue = String
type MasterURL = String  // Format: "mesos://host:port"

// Label parsing
type LabelString = String  // Format: "key1:value1,key2:value2"

// Cluster mode types
type SubmissionId = String
type DriverState = String  // SUBMITTED, RUNNING, FINISHED, FAILED, KILLED
type FrameworkId = String

// Container and volume types
type VolumeSpec = String   // Format: "hostPath:containerPath:mode"
type PortMapSpec = String  // Format: "hostPort:containerPort:protocol"

// Response types for cluster operations
trait CreateSubmissionResponse {
  def submissionId: String
  def success: Boolean
}

trait KillSubmissionResponse {
  def submissionId: String
  def success: Boolean
}

trait SubmissionStatusResponse {
  def submissionId: String
  def driverState: String
  def success: Boolean
}

// Mesos protocol buffer types (from Apache Mesos)
import org.apache.mesos.Protos.{Volume, ContainerInfo, TaskState, Labels}
import org.apache.mesos.Protos.Environment.Variable

Dependencies

  • Apache Mesos Java libraries (org.apache.mesos:mesos)
  • Apache Spark Core (org.apache.spark:spark-core_2.13)
  • Google Protobuf for Mesos protocol communication

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-mesos-2-13