or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

cluster-deployment.md, cluster-management.md, coarse-grained-scheduling.md, fine-grained-scheduling.md, index.md, resource-configuration.md
tile.json

docs/resource-configuration.md

Resource and Configuration Management

Utilities for Mesos resource negotiation, constraint matching, configuration management, and container orchestration.

Capabilities

MesosSchedulerUtils

Shared trait providing common utilities for Mesos scheduler implementations.

/**
 * Helpers shared by the Mesos scheduler backends: building and starting the
 * scheduler driver, reading and creating Mesos resources, and signalling
 * registration state.
 */
trait MesosSchedulerUtils extends Logging {

  /**
   * Builds the driver through which the scheduler talks to the Mesos master.
   *
   * @param masterUrl       URL of the Mesos master to connect to
   * @param scheduler       scheduler that receives the Mesos callbacks
   * @param sparkUser       user to impersonate when running tasks
   * @param appName         framework name displayed in the Mesos UI
   * @param conf            Spark configuration
   * @param webuiUrl        web UI URL to link from the Mesos UI, if any
   * @param checkpoint      whether to checkpoint tasks for failover, if specified
   * @param failoverTimeout how long the master expects the scheduler to take to reconnect
   * @param frameworkId     ID of the framework, if any
   * @return the newly created SchedulerDriver
   * @throws SparkException if a secret is provided without a principal
   */
  protected def createSchedulerDriver(
      masterUrl: String,
      scheduler: Scheduler,
      sparkUser: String,
      appName: String,
      conf: SparkConf,
      webuiUrl: Option[String] = None,
      checkpoint: Option[Boolean] = None,
      failoverTimeout: Option[Double] = None,
      frameworkId: Option[String] = None): SchedulerDriver

  /**
   * Starts the given driver and waits for it to register with Mesos.
   *
   * @param newDriver driver to start
   */
  def startScheduler(newDriver: SchedulerDriver): Unit

  /**
   * Adds up the scalar values of every resource carrying the given name.
   *
   * @param res  Mesos resources to inspect
   * @param name name of the resource to read
   * @return sum of all resources named `name`
   */
  def getResource(res: JList[Resource], name: String): Double

  /**
   * Reads a range-valued resource.
   *
   * @param res  Mesos resources to inspect
   * @param name name of the resource to read
   * @return the ranges as (begin, end) tuples
   */
  protected def getRangeResource(res: JList[Resource], name: String): List[(Long, Long)]

  /**
   * Builds a Mesos Resource protobuf.
   *
   * @param name   resource name
   * @param amount resource amount
   * @param role   optional resource role (defaults to None)
   * @return the constructed Resource
   */
  def createResource(name: String, amount: Double, role: Option[String] = None): Resource

  /**
   * Splits resources into what is left over and what is taken for a request.
   *
   * @param resources    resources available to draw from
   * @param resourceName name of the resource to take from
   * @param amountToUse  how much of that resource to take
   * @return (remaining resources, used resources)
   */
  def partitionResources(
      resources: JList[Resource],
      resourceName: String,
      amountToUse: Double): (List[Resource], List[Resource])

  /** Signals that the scheduler has registered with Mesos. */
  protected def markRegistered(): Unit

  /** Signals that the scheduler ran into an error. */
  protected def markErr(): Unit
}

Resource Management

Methods for handling Mesos resource offers and requirements.

/**
 * Calculates the total memory to request for an executor, including overhead.
 *
 * @param sc SparkContext whose configuration supplies the memory settings
 * @return total memory requirement in MB (base executor memory plus overhead)
 */
def executorMemory(sc: SparkContext): Int

/**
 * Adds URIs to a CommandInfo builder.
 *
 * @param uris comma-separated list of URIs
 * @param builder CommandInfo builder that is modified in place
 * @param useFetcherCache whether to enable the fetcher cache for these URIs (defaults to false)
 */
def setupUris(
  uris: String,
  builder: CommandInfo.Builder,
  useFetcherCache: Boolean = false
): Unit

/**
 * Checks whether every port required by the configuration lies within the offered port ranges.
 *
 * @param conf Spark configuration (presumably the source of the required ports, as read by
 *             nonZeroPortValuesFromConfig — confirm)
 * @param ports offered port ranges as (begin, end) pairs
 * @return true if all required ports are available
 */
protected def checkPorts(conf: SparkConf, ports: List[(Long, Long)]): Boolean

/**
 * Splits offered resources into the port resources to use and everything else.
 *
 * @param requestedPorts specific ports to allocate
 * @param offeredResources resources available in the offer
 * @return tuple of (remaining resources, port resources to use)
 */
def partitionPortResources(
  requestedPorts: List[Long], 
  offeredResources: List[Resource]
): (List[Resource], List[Resource])

/**
 * Collects the configured port values, dropping any that are zero.
 *
 * NOTE(review): a zero port presumably means "let the system choose", which is why it is
 * excluded from the ports that must be reserved — confirm.
 *
 * @param conf Spark configuration
 * @return list of configured non-zero port values
 */
def nonZeroPortValuesFromConfig(conf: SparkConf): List[Long]

Constraint Matching

Methods for matching resource offers against placement constraints.

/**
 * Converts an offer's attributes into a map keyed by attribute name, for constraint matching.
 *
 * @param offerAttributes list of Mesos attributes from the offer
 * @return map from attribute name to the attribute value, kept as a protobuf message
 */
protected def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage]

/**
 * Checks whether an offer's attributes satisfy the required placement constraints.
 *
 * @param slaveOfferConstraints required attributes, in the shape returned by
 *                              parseConstraintString (attribute name -> acceptable values)
 * @param offerAttributes offered attributes, in the shape returned by toAttributeMap
 * @return true if the offer satisfies all constraints
 */
def matchesAttributeRequirements(
  slaveOfferConstraints: Map[String, Set[String]],
  offerAttributes: Map[String, GeneratedMessage]
): Boolean

/**
 * Parses a constraint string into a map of attribute name to acceptable values.
 *
 * Constraints are separated by ';' and each constraint's values by ','. For example,
 * "os:centos7;zone:us-east-1a,us-east-1b" yields
 * Map("os" -> Set("centos7"), "zone" -> Set("us-east-1a", "us-east-1b")).
 *
 * @param constraintsVal constraint string to parse
 * @return map of constraint name to set of acceptable values
 * @throws IllegalArgumentException if the constraint string is malformed
 */
def parseConstraintString(constraintsVal: String): Map[String, Set[String]]

/**
 * Extracts an attribute's name and its values from a Mesos attribute.
 *
 * @param attr Mesos attribute protobuf
 * @return tuple of (attribute name, set of values)
 */
protected def getAttribute(attr: Attribute): (String, Set[String])

State Conversion

Utilities for converting between Spark and Mesos task states, plus a helper that clears the framework ID from the SparkContext configuration.

/**
 * Maps a Mesos task state onto the corresponding Spark task state.
 *
 * @param state Mesos task state
 * @return corresponding Spark TaskState
 */
def mesosToTaskState(state: MesosTaskState): TaskState.TaskState

/**
 * Maps a Spark task state onto the corresponding Mesos task state.
 *
 * @param state Spark task state
 * @return corresponding Mesos task state
 */
def taskStateToMesos(state: TaskState.TaskState): MesosTaskState

/**
 * Removes the framework ID from the SparkContext's configuration.
 *
 * @param sc SparkContext whose configuration is modified
 */
def unsetFrameworkID(sc: SparkContext): Unit

Offer Management

Methods for declining and managing Mesos resource offers.

/**
 * Declines a Mesos resource offer.
 *
 * @param driver scheduler driver used to send the decline
 * @param offer offer to decline
 * @param reason optional human-readable reason for declining (defaults to None)
 * @param refuseSeconds optional number of seconds to refuse similar offers (defaults to None)
 */
protected def declineOffer(
  driver: org.apache.mesos.SchedulerDriver,
  offer: Offer,
  reason: Option[String] = None,
  refuseSeconds: Option[Long] = None
): Unit

/**
 * Reads the default duration for which declined offers are refused.
 *
 * NOTE(review): presumably backed by spark.mesos.rejectOfferDuration — confirm.
 *
 * @param conf Spark configuration
 * @return reject duration in seconds
 */
protected def getRejectOfferDuration(conf: SparkConf): Long

/**
 * Reads how long to refuse offers that fail the placement constraints.
 *
 * NOTE(review): presumably backed by spark.mesos.rejectOfferDurationForUnmetConstraints — confirm.
 *
 * @param conf Spark configuration
 * @return reject duration in seconds
 */
protected def getRejectOfferDurationForUnmetConstraints(conf: SparkConf): Long

/**
 * Reads how long to refuse offers once the application has reached its maximum core count.
 *
 * NOTE(review): presumably backed by spark.mesos.rejectOfferDurationForReachedMaxCores — confirm.
 *
 * @param conf Spark configuration
 * @return reject duration in seconds
 */
protected def getRejectOfferDurationForReachedMaxCores(conf: SparkConf): Long

MesosSchedulerBackendUtil

Container and Docker configuration utilities.

/**
 * Utility functions for Mesos scheduler backends, covering container and Docker configuration.
 */
object MesosSchedulerBackendUtil extends Logging {

  /**
   * Builds the container description for a Mesos task from the Spark configuration.
   *
   * @param conf Spark configuration (presumably the spark.mesos.executor.docker.* and
   *             spark.mesos.containerizer settings are read from here — confirm)
   * @return ContainerInfo for the Mesos task
   */
  def containerInfo(conf: SparkConf): ContainerInfo

  /**
   * Parses a volume specification string, e.g. "/host/data:/container/data:ro,/tmp:/tmp:rw".
   * Entries that do not match the expected form are logged as warnings and skipped.
   *
   * @param volumes volume spec of the form "[host-dir:]container-dir[:rw|:ro](, ...)"
   * @return list of Mesos Volume objects
   */
  def parseVolumesSpec(volumes: String): List[Volume]

  /**
   * Parses a port mapping specification string, e.g. "8080:8080:tcp,9090:9090:udp".
   *
   * @param portmaps port mapping spec of the form "host_port:container_port[:udp|:tcp](, ...)"
   * @return list of Docker port mapping objects
   */
  def parsePortMappingsSpec(portmaps: String): List[DockerInfo.PortMapping]

  /**
   * Parses a Docker parameter specification string, e.g. "memory-swap=-1,oom-kill-disable=true".
   *
   * @param params parameter spec of the form "key=value(, ...)"
   * @return list of Docker Parameter objects
   */
  private def parseParamsSpec(params: String): List[Parameter]
}

Usage Examples:

import scala.collection.JavaConverters._

import org.apache.mesos.Protos.Resource
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.cluster.mesos.{MesosSchedulerBackendUtil, MesosSchedulerUtils}

// parseConstraintString, partitionResources, executorMemory, nonZeroPortValuesFromConfig and
// checkPorts are members of the MesosSchedulerUtils trait (checkPorts is protected), so the
// calls below are written as they would appear inside a scheduler class that mixes the trait
// in. `sparkContext` stands for that scheduler's SparkContext.

// Constraint matching example
val constraints = parseConstraintString("os:centos7;zone:us-east-1a,us-east-1b;gpu:tesla")
// Results in: Map(
//   "os" -> Set("centos7"),
//   "zone" -> Set("us-east-1a", "us-east-1b"),
//   "gpu" -> Set("tesla")
// )

// Resource partitioning example. The element type is stated explicitly: an untyped List()
// is a List[Nothing], which cannot be converted to the JList[Resource] the method expects.
val cpuResources: List[Resource] = List(/* Mesos CPU resources */)
val (remainingCpus, usedCpus) = partitionResources(cpuResources.asJava, "cpus", 4.0)

// Container configuration example
val conf = new SparkConf()
  .set("spark.mesos.executor.docker.image", "my-spark:latest")
  .set("spark.mesos.executor.docker.volumes", "/host/data:/container/data:ro,/tmp:/tmp:rw")
  .set("spark.mesos.executor.docker.portmaps", "8080:8080:tcp,9090:9090:udp")
  .set("spark.mesos.executor.docker.parameters", "memory-swap=-1,oom-kill-disable=true")

val containerInfo = MesosSchedulerBackendUtil.containerInfo(conf)

// Executor memory calculation
val totalMemory = executorMemory(sparkContext)
// Includes base executor memory + overhead (max of 10% or 384MB)

// Port requirement checking (uses the same `conf` defined above)
val requiredPorts = nonZeroPortValuesFromConfig(conf)  // Gets non-zero port configs
val portRanges = List((8000L, 9000L), (10000L, 11000L))
val portsAvailable = checkPorts(conf, portRanges)

Configuration Properties

Key configuration properties for resource and container management:

Resource Configuration

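  • spark.mesos.executor.memoryOverhead - Additional memory per executor, in MB, added on top of the base executor memory (by default the larger of 10% of executor memory or 384MB)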
  • spark.mesos.rejectOfferDuration - Default offer rejection duration
  • spark.mesos.rejectOfferDurationForUnmetConstraints - Rejection duration for constraint mismatches
  • spark.mesos.rejectOfferDurationForReachedMaxCores - Rejection duration when max cores reached

Constraint, Role, and Authentication Configuration

  • spark.mesos.constraints - Agent (slave) attribute constraints, e.g. "os:centos7;zone:us-east-1a,us-east-1b"
  • spark.mesos.role - Mesos framework role
  • spark.mesos.principal - Authentication principal
  • spark.mesos.secret - Authentication secret (requires spark.mesos.principal to also be set)

Container Configuration

  • spark.mesos.executor.docker.image - Docker image for executors
  • spark.mesos.executor.docker.volumes - Volume mounts
  • spark.mesos.executor.docker.portmaps - Port mappings
  • spark.mesos.executor.docker.parameters - Docker parameters
  • spark.mesos.executor.docker.forcePullImage - Force image pull
  • spark.mesos.containerizer - Containerizer type ("docker" or "mesos")
  • spark.mesos.network.name - Network name for CNI

Port Configuration

  • spark.executor.port - Executor port
  • spark.blockManager.port - Block manager port

Error Handling

// Constraint parsing validation: any non-fatal failure while parsing is rethrown as an
// IllegalArgumentException that names the offending string and keeps the original cause.
// `try` is an expression, so the parsed map is bound outside it rather than being
// discarded inside the block.
import scala.util.control.NonFatal

val constraints: Map[String, Set[String]] =
  try {
    parseConstraintString(constraintsVal)
  } catch {
    case NonFatal(e) =>
      throw new IllegalArgumentException(s"Bad constraint string: $constraintsVal", e)
  }

// Authentication validation: a secret configured without a principal is rejected
// with a SparkException before the driver is created.
val secretWithoutPrincipal = credBuilder.hasSecret && !fwInfoBuilder.hasPrincipal
if (secretWithoutPrincipal) {
  throw new SparkException(
    "spark.mesos.principal must be configured when spark.mesos.secret is set")
}

// Volume parsing error handling: every comma-separated entry must have the form
// "[host-dir:]container-dir[:rw|:ro]". An entry that does not match is reported via
// logWarning and skipped, instead of failing the whole specification.
// (logWarning comes from Logging, e.g. inside MesosSchedulerBackendUtil.)
import org.apache.mesos.Protos.Volume

volumes.split(",").map(_.split(":")).flatMap { spec =>
  // Read-write unless the entry explicitly ends in ":ro".
  val vol = Volume.newBuilder().setMode(Volume.Mode.RW)
  spec match {
    case Array(containerPath) =>
      Some(vol.setContainerPath(containerPath))
    case Array(containerPath, "rw") =>
      Some(vol.setContainerPath(containerPath))
    case Array(containerPath, "ro") =>
      Some(vol.setContainerPath(containerPath).setMode(Volume.Mode.RO))
    case Array(hostPath, containerPath) =>
      Some(vol.setContainerPath(containerPath).setHostPath(hostPath))
    case Array(hostPath, containerPath, "rw") =>
      Some(vol.setContainerPath(containerPath).setHostPath(hostPath))
    case Array(hostPath, containerPath, "ro") =>
      Some(vol.setContainerPath(containerPath).setHostPath(hostPath).setMode(Volume.Mode.RO))
    case _ =>
      // Matching `_` (rather than rebinding `spec`) avoids shadowing the lambda parameter.
      logWarning(s"Unable to parse volume specs: $volumes. " +
        "Expected form: \"[host-dir:]container-dir[:rw|:ro](, ...)\"")
      None
  }
}.map(_.build()).toList

The resource and configuration management utilities provide comprehensive support for Mesos integration, including constraint matching, resource allocation, container orchestration, and configuration validation.