Utilities for Mesos resource negotiation, constraint matching, configuration management, and container orchestration.
Shared trait providing common utilities for Mesos scheduler implementations.
/**
* Shared utilities for Mesos scheduler implementations
*/
trait MesosSchedulerUtils extends Logging {
/**
* Creates a new MesosSchedulerDriver for communicating with Mesos master
* @param masterUrl URL to connect to Mesos master
* @param scheduler Scheduler class to receive callbacks
* @param sparkUser User to impersonate when running tasks
* @param appName Framework name to display on Mesos UI
* @param conf Spark configuration
* @param webuiUrl WebUI URL to link from Mesos UI
* @param checkpoint Option to checkpoint tasks for failover
* @param failoverTimeout Duration master expects scheduler to reconnect
* @param frameworkId ID of the framework
* @return SchedulerDriver instance
* @throws SparkException if secret provided without principal
*/
protected def createSchedulerDriver(
masterUrl: String,
scheduler: Scheduler,
sparkUser: String,
appName: String,
conf: SparkConf,
webuiUrl: Option[String] = None,
checkpoint: Option[Boolean] = None,
failoverTimeout: Option[Double] = None,
frameworkId: Option[String] = None
): SchedulerDriver
/**
* Starts MesosSchedulerDriver and waits for registration
* @param newDriver SchedulerDriver to start
*/
def startScheduler(newDriver: SchedulerDriver): Unit
/**
* Gets scalar resource value from resource list
* @param res List of Mesos resources
* @param name Resource name to extract
* @return Sum of all resources with the given name
*/
def getResource(res: JList[Resource], name: String): Double
/**
* Gets range resource as list of (begin, end) tuples
* @param res List of Mesos resources
* @param name Resource name to extract
* @return List of range tuples
*/
protected def getRangeResource(res: JList[Resource], name: String): List[(Long, Long)]
/**
* Creates a Mesos resource protobuf
* @param name Resource name
* @param amount Resource amount
* @param role Resource role (optional)
* @return Mesos Resource
*/
def createResource(name: String, amount: Double, role: Option[String] = None): Resource
/**
* Partitions resources into remaining and requested portions
* @param resources Available resources
* @param resourceName Name of resource to partition
* @param amountToUse Amount of resource to extract
* @return Tuple of (remaining resources, used resources)
*/
def partitionResources(
resources: JList[Resource],
resourceName: String,
amountToUse: Double
): (List[Resource], List[Resource])
/**
* Signal that scheduler has registered with Mesos
*/
protected def markRegistered(): Unit
/**
* Signal scheduler error occurred
*/
protected def markErr(): Unit
}

Methods for handling Mesos resource offers and requirements.
/**
* Calculates executor memory requirements including overhead
* @param sc SparkContext for configuration access
* @return Total memory requirement in MB
*/
def executorMemory(sc: SparkContext): Int
/**
* Sets up URIs for CommandInfo builder
* @param uris Comma-separated URI string
* @param builder CommandInfo builder to modify
* @param useFetcherCache Whether to enable fetcher cache
*/
def setupUris(
uris: String,
builder: CommandInfo.Builder,
useFetcherCache: Boolean = false
): Unit
/**
* Checks if required ports are available in offered port ranges
* @param conf Spark configuration
* @param ports Available port ranges
* @return true if all required ports are available
*/
protected def checkPorts(conf: SparkConf, ports: List[(Long, Long)]): Boolean
/**
* Partitions port resources from offers
* @param requestedPorts Specific ports to allocate
* @param offeredResources Available resources
* @return Tuple of (remaining resources, port resources to use)
*/
def partitionPortResources(
requestedPorts: List[Long],
offeredResources: List[Resource]
): (List[Resource], List[Resource])
/**
* Gets non-zero port values from configuration
* @param conf Spark configuration
* @return List of configured port values
*/
def nonZeroPortValuesFromConfig(conf: SparkConf): List[Long]

Methods for matching resource offers against placement constraints.
/**
* Converts offer attributes to a map for constraint matching
* @param offerAttributes List of Mesos attributes
* @return Map from attribute name to attribute value
*/
protected def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage]
/**
* Matches resource offer attributes against requirements
* @param slaveOfferConstraints Map of required attributes
* @param offerAttributes Map of offered attributes
* @return true if offer satisfies all constraints
*/
def matchesAttributeRequirements(
slaveOfferConstraints: Map[String, Set[String]],
offerAttributes: Map[String, GeneratedMessage]
): Boolean
/**
* Parses constraint string into structured format
* @param constraintsVal Constraint string (e.g., "os:centos7;zone:us-east-1a,us-east-1b")
* @return Map of constraint name to set of acceptable values
* @throws IllegalArgumentException if constraint string is malformed
*/
def parseConstraintString(constraintsVal: String): Map[String, Set[String]]
/**
* Gets attribute key-value pairs from Mesos attribute
* @param attr Mesos attribute protobuf
* @return Tuple of (attribute name, set of values)
*/
protected def getAttribute(attr: Attribute): (String, Set[String])

Utilities for converting between Spark and Mesos task states.
/**
* Converts Mesos task state to Spark task state
* @param state Mesos task state
* @return Corresponding Spark TaskState
*/
def mesosToTaskState(state: MesosTaskState): TaskState.TaskState
/**
* Converts Spark task state to Mesos task state
* @param state Spark task state
* @return Corresponding Mesos task state
*/
def taskStateToMesos(state: TaskState.TaskState): MesosTaskState
/**
* Unsets framework ID from SparkContext configuration
* @param sc SparkContext to modify
*/
def unsetFrameworkID(sc: SparkContext): Unit

Methods for declining and managing Mesos resource offers.
/**
* Declines a Mesos resource offer with optional reason and duration
* @param driver Scheduler driver
* @param offer Offer to decline
* @param reason Optional reason for declining
* @param refuseSeconds Optional duration to refuse similar offers
*/
protected def declineOffer(
driver: org.apache.mesos.SchedulerDriver,
offer: Offer,
reason: Option[String] = None,
refuseSeconds: Option[Long] = None
): Unit
/**
* Gets reject offer duration from configuration
* @param conf Spark configuration
* @return Reject duration in seconds
*/
protected def getRejectOfferDuration(conf: SparkConf): Long
/**
* Gets reject duration for offers with unmet constraints
* @param conf Spark configuration
* @return Reject duration in seconds
*/
protected def getRejectOfferDurationForUnmetConstraints(conf: SparkConf): Long
/**
* Gets reject duration when max cores reached
* @param conf Spark configuration
* @return Reject duration in seconds
*/
protected def getRejectOfferDurationForReachedMaxCores(conf: SparkConf): Long

Container and Docker configuration utilities.
/**
* Utility functions for Mesos scheduler backends
*/
object MesosSchedulerBackendUtil extends Logging {
/**
* Creates container info from Spark configuration
* @param conf Spark configuration
* @return ContainerInfo for Mesos task
*/
def containerInfo(conf: SparkConf): ContainerInfo
/**
* Parses volume specification string
* @param volumes Volume spec: "[host-dir:]container-dir[:rw|:ro](, ...)"
* @return List of Mesos Volume objects
*/
def parseVolumesSpec(volumes: String): List[Volume]
/**
* Parses port mapping specification string
* @param portmaps Port mapping spec: "host_port:container_port[:udp|:tcp](, ...)"
* @return List of Docker port mapping objects
*/
def parsePortMappingsSpec(portmaps: String): List[DockerInfo.PortMapping]
/**
* Parses Docker parameter specification string
* @param params Parameter spec: "key=value(, ...)"
* @return List of Docker Parameter objects
*/
private def parseParamsSpec(params: String): List[Parameter]
}

Usage Examples:
import org.apache.spark.scheduler.cluster.mesos.MesosSchedulerUtils
import org.apache.spark.SparkConf
// Constraint matching example
val constraints = parseConstraintString("os:centos7;zone:us-east-1a,us-east-1b;gpu:tesla")
// Results in: Map(
// "os" -> Set("centos7"),
// "zone" -> Set("us-east-1a", "us-east-1b"),
// "gpu" -> Set("tesla")
// )
// Resource partitioning example
val cpuResources = List(/* Mesos CPU resources */)
val (remainingCpus, usedCpus) = partitionResources(cpuResources.asJava, "cpus", 4.0)
// Container configuration example
val containerConf = new SparkConf()
.set("spark.mesos.executor.docker.image", "my-spark:latest")
.set("spark.mesos.executor.docker.volumes", "/host/data:/container/data:ro,/tmp:/tmp:rw")
.set("spark.mesos.executor.docker.portmaps", "8080:8080:tcp,9090:9090:udp")
.set("spark.mesos.executor.docker.parameters", "memory-swap=-1,oom-kill-disable=true")
val containerInfo = MesosSchedulerBackendUtil.containerInfo(containerConf)
// Executor memory calculation
val totalMemory = executorMemory(sparkContext)
// Includes base executor memory + overhead (the larger of 10% of executor memory or 384 MB)
// Port requirement checking
val requiredPorts = nonZeroPortValuesFromConfig(conf) // Gets non-zero port configs
val portRanges = List((8000L, 9000L), (10000L, 11000L))
val portsAvailable = checkPorts(conf, portRanges)

Key configuration properties for resource and container management:
- spark.mesos.executor.memoryOverhead - Executor memory overhead
- spark.mesos.rejectOfferDuration - Default offer rejection duration
- spark.mesos.rejectOfferDurationForUnmetConstraints - Rejection duration for constraint mismatches
- spark.mesos.rejectOfferDurationForReachedMaxCores - Rejection duration when max cores reached
- spark.mesos.constraints - Slave attribute constraints
- spark.mesos.role - Mesos framework role
- spark.mesos.principal - Authentication principal
- spark.mesos.secret - Authentication secret
- spark.mesos.executor.docker.image - Docker image for executors
- spark.mesos.executor.docker.volumes - Volume mounts
- spark.mesos.executor.docker.portmaps - Port mappings
- spark.mesos.executor.docker.parameters - Docker parameters
- spark.mesos.executor.docker.forcePullImage - Force image pull
- spark.mesos.containerizer - Containerizer type ("docker" or "mesos")
- spark.mesos.network.name - Network name for CNI
- spark.executor.port - Executor port
- spark.blockManager.port - Block manager port

// Constraint parsing validation
try {
val constraints = parseConstraintString(constraintsVal)
} catch {
case NonFatal(e) =>
throw new IllegalArgumentException(s"Bad constraint string: $constraintsVal", e)
}
// Authentication validation
if (credBuilder.hasSecret && !fwInfoBuilder.hasPrincipal) {
throw new SparkException(
"spark.mesos.principal must be configured when spark.mesos.secret is set")
}
// Volume parsing error handling
volumes.split(",").map(_.split(":")).flatMap { spec =>
spec match {
case Array(container_path) => Some(vol.setContainerPath(container_path))
case Array(host_path, container_path, mode) => /* ... */
case spec =>
logWarning(s"Unable to parse volume specs: $volumes. " +
"Expected form: \"[host-dir:]container-dir[:rw|:ro](, ...)\"")
None
}
}

The resource and configuration management utilities provide comprehensive support for Mesos integration, including constraint matching, resource allocation, container orchestration, and configuration validation.