Apache Spark Kubernetes resource manager that enables running Spark applications on Kubernetes clusters.
The utilities layer provides essential helper functions, client management, and common operations that support the Kubernetes integration throughout the Apache Spark ecosystem. These utilities handle everything from configuration parsing to client creation and volume management.
Comprehensive collection of Kubernetes-specific utility functions:
// Public API surface of the Kubernetes helper utilities documented below.
object KubernetesUtils {
// Parse configuration entries sharing `prefix` into a key/value map.
def parsePrefixedKeyValuePairs(
sparkConf: SparkConf,
prefix: String
): Map[String, String]
// Load a pod specification from a template file via the given client.
def loadPodFromTemplate(
kubernetesClient: KubernetesClient,
templateFile: File,
containerName: Option[String]
): SparkPod
// Select the Spark container from a (possibly multi-container) pod.
def selectSparkContainer(
pod: Pod,
containerName: Option[String]
): SparkPod
// Fail with `errMessage` unless both options are defined or both are empty.
def requireBothOrNeitherDefined[T](
opt1: Option[T],
opt2: Option[T],
errMessage: String
): Unit
}
// Parse configuration properties with a common prefix
// Collect all configuration pairs under `prefix`, keeping only entries whose
// remaining key and value are both non-empty.
def parsePrefixedKeyValuePairs(
    sparkConf: SparkConf,
    prefix: String
): Map[String, String] = {
  val prefixedEntries = sparkConf.getAllWithPrefix(prefix)
  prefixedEntries.collect {
    case (key, value) if key.nonEmpty && value.nonEmpty => key -> value
  }.toMap
}
// Usage examples: the returned map keys have the prefix stripped.
val driverLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf,
"spark.kubernetes.driver.label."
)
// Input: spark.kubernetes.driver.label.app=myapp, spark.kubernetes.driver.label.version=1.0
// Output: Map("app" -> "myapp", "version" -> "1.0")
val executorAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf,
"spark.kubernetes.executor.annotation."
)
// Input: spark.kubernetes.executor.annotation.prometheus.io/scrape=true
// Output: Map("prometheus.io/scrape" -> "true")
// Require both options to be defined or both to be None
// Enforce that two related options are supplied together: either both are
// defined or both are absent; otherwise fail with `errMessage`.
def requireBothOrNeitherDefined[T](
    opt1: Option[T],
    opt2: Option[T],
    errMessage: String
): Unit = {
  // Exactly one option defined is the only invalid combination.
  if (opt1.isDefined != opt2.isDefined) {
    throw new IllegalArgumentException(errMessage)
  }
}
// Usage: client key and cert must be supplied together for mutual TLS.
KubernetesUtils.requireBothOrNeitherDefined(
conf.get(CLIENT_KEY_FILE),
conf.get(CLIENT_CERT_FILE),
"Both client key file and client cert file must be specified for mutual TLS"
)

// Build a Kubernetes-compatible resource name from app metadata.
// Format: <sanitized-app-name>-<first-8-chars-of-app-id>-<resourceType>.
def generateResourceName(
    appName: String,
    appId: String,
    resourceType: String
): String = {
  // Lowercase, replace invalid characters, collapse hyphen runs, cap length,
  // then trim leading/trailing hyphens so the segment satisfies the
  // DNS-1123 pattern [a-z0-9]([-a-z0-9]*[a-z0-9])? that validateResourceName
  // in this module enforces (the original could emit "-my-app-...").
  val sanitizedAppName = appName
    .toLowerCase
    .replaceAll("[^a-z0-9-]", "-")
    .replaceAll("-+", "-")
    .take(30)
    .stripPrefix("-")
    .stripSuffix("-")
  val shortAppId = appId.take(8)
  s"$sanitizedAppName-$shortAppId-$resourceType"
}
// Usage: yields "my-spark-app-app-1234-driver" for these inputs.
val driverPodName = KubernetesUtils.generateResourceName(
"My Spark App",
"app-123456789",
"driver"
)
// Result: "my-spark-app-app-1234-driver"

// Load a pod template file through the Kubernetes client and select the
// Spark container from it. Throws RuntimeException when the template cannot
// be parsed, IllegalArgumentException when the file does not exist.
def loadPodFromTemplate(
    kubernetesClient: KubernetesClient,
    templateFile: File,
    containerName: Option[String]
): SparkPod = {
  require(templateFile.exists(), s"Pod template file does not exist: ${templateFile.getPath}")
  // The original leaked this stream; close it on every path.
  val templateStream = new FileInputStream(templateFile)
  val podTemplate = try {
    kubernetesClient.pods()
      .load(templateStream)
      .get()
  } catch {
    case e: Exception =>
      throw new RuntimeException(s"Failed to load pod template from ${templateFile.getPath}", e)
  } finally {
    templateStream.close()
  }
  selectSparkContainer(podTemplate, containerName)
}
// Usage: load the driver pod template and select the named driver container.
val templatePod = KubernetesUtils.loadPodFromTemplate(
kubernetesClient,
new File("/templates/driver-pod.yaml"),
Some(Constants.DRIVER_CONTAINER_NAME)
)

// Pick the Spark container out of `pod`: by name when one is given, or the
// sole container when the template has exactly one. Fails with
// RuntimeException otherwise.
def selectSparkContainer(
    pod: Pod,
    containerName: Option[String]
): SparkPod = {
  val allContainers = pod.getSpec.getContainers.asScala
  val sparkContainer = containerName
    .map { name =>
      allContainers.find(_.getName == name).getOrElse {
        throw new RuntimeException(s"Container '$name' not found in pod template")
      }
    }
    .getOrElse {
      // Without an explicit name, the template must be unambiguous.
      if (allContainers.size != 1) {
        throw new RuntimeException(
          s"Pod template contains ${allContainers.size} containers. " +
            "Must specify containerName when template has multiple containers.")
      }
      allContainers.head
    }
  SparkPod(pod, sparkContainer)
}

Factory for creating configured Kubernetes client instances:
// Factory building configured fabric8 KubernetesClient instances for the
// three client roles used by Spark on Kubernetes.
object SparkKubernetesClientFactory {
def createKubernetesClient(
clientType: ClientType,
kubernetesConf: Option[KubernetesConf] = None,
sparkConf: Option[SparkConf] = None,
defaultServiceAccountToken: Option[String] = None,
clientContext: Option[String] = None
): KubernetesClient
// Role of the client being created.
sealed trait ClientType
object ClientType {
case object Driver extends ClientType
case object Executor extends ClientType
case object Submission extends ClientType
}
}
// Driver client for executor management
val driverClient = SparkKubernetesClientFactory.createKubernetesClient(
ClientType.Driver,
kubernetesConf = Some(driverConf)
)
// Submission client for application deployment, pinned to a kubeconfig context
val submissionClient = SparkKubernetesClientFactory.createKubernetesClient(
ClientType.Submission,
sparkConf = Some(conf),
clientContext = Some("my-cluster-context")
)
// Executor client for pod operations, with a fallback service account token
val executorClient = SparkKubernetesClientFactory.createKubernetesClient(
ClientType.Executor,
kubernetesConf = Some(executorConf),
defaultServiceAccountToken = Some(serviceAccountToken)
)
// Create a KubernetesClient from the supplied configuration sources.
// NOTE(review): clientType and kubernetesConf are accepted but never read in
// this body — confirm whether role-specific configuration was intended here.
def createKubernetesClient(
clientType: ClientType,
kubernetesConf: Option[KubernetesConf] = None,
sparkConf: Option[SparkConf] = None,
defaultServiceAccountToken: Option[String] = None,
clientContext: Option[String] = None
): KubernetesClient = {
// NOTE(review): the with* results below are discarded — this assumes
// ConfigBuilder's setters mutate the builder in place (fabric8 builders
// return `this`); confirm against the fabric8 version in use.
val config = new ConfigBuilder()
.withApiVersion("v1")
// Configure API server URL
sparkConf.flatMap(_.getOption("spark.kubernetes.apiserver.host")).foreach { host =>
config.withMasterUrl(host)
}
// Configure authentication (token, basic auth, service account)
configureAuthentication(config, sparkConf, defaultServiceAccountToken)
// Configure TLS (CA cert, mutual-TLS client cert/key, trust-all flag)
configureTls(config, sparkConf)
// Configure kubeconfig context, when one was requested
clientContext.foreach(config.withCurrentContext)
DefaultKubernetesClient.fromConfig(config.build())
}
// Apply authentication settings (OAuth token, basic auth, service account)
// from the Spark configuration to the client config builder.
private def configureAuthentication(
config: ConfigBuilder,
sparkConf: Option[SparkConf],
defaultServiceAccountToken: Option[String]
): Unit = {
sparkConf.foreach { conf =>
// OAuth token authentication: an explicit token wins; otherwise the token
// is read from the configured token file.
conf.getOption(OAUTH_TOKEN.key).orElse {
conf.getOption(OAUTH_TOKEN_FILE.key).map { tokenFile =>
Files.readAllLines(Paths.get(tokenFile)).asScala.mkString
}
}.foreach(config.withOauthToken)
// Username/password authentication; the password is only applied when a
// username is also present.
conf.getOption("spark.kubernetes.authenticate.username").foreach { username =>
config.withUsername(username)
conf.getOption("spark.kubernetes.authenticate.password").foreach { password =>
config.withPassword(password)
}
}
// Service account token.
// NOTE(review): this passes the service-account NAME (or the default token)
// to withOauthToken, overwriting any OAuth token configured above — confirm
// this is intended.
conf.getOption(KUBERNETES_SERVICE_ACCOUNT_NAME.key).orElse {
defaultServiceAccountToken
}.foreach(config.withOauthToken)
}
}
// Apply TLS-related settings from the Spark configuration to the client
// config builder.
private def configureTls(
    config: ConfigBuilder,
    sparkConf: Option[SparkConf]
): Unit = {
  sparkConf.foreach { conf =>
    // CA certificate used to verify the API server.
    conf.getOption(CA_CERT_FILE.key).foreach { caCertFile =>
      config.withCaCertFile(caCertFile)
    }
    // Client certificate/key pair for mutual TLS. The pair must be supplied
    // together; rejecting a lone cert or key makes this consistent with the
    // requireBothOrNeitherDefined contract documented for these keys
    // (the original silently ignored the half-configured case).
    val clientCertFile = conf.getOption(CLIENT_CERT_FILE.key)
    val clientKeyFile = conf.getOption(CLIENT_KEY_FILE.key)
    (clientCertFile, clientKeyFile) match {
      case (Some(certFile), Some(keyFile)) =>
        config.withClientCertFile(certFile)
        config.withClientKeyFile(keyFile)
      case (None, None) => // No client certificates configured.
      case _ =>
        throw new IllegalArgumentException(
          "Both client key file and client cert file must be specified for mutual TLS")
    }
    // Trust all certificates (for development only).
    if (conf.getBoolean("spark.kubernetes.apiserver.trustCerts", false)) {
      config.withTrustCerts(true)
    }
  }
}

Utilities for parsing and handling Kubernetes volume configurations:
// Utilities for parsing Kubernetes volume declarations out of SparkConf.
object KubernetesVolumeUtils {
// Parse every volume declared under `prefix` into a KubernetesVolumeSpec.
def parseVolumesWithPrefix(
sparkConf: SparkConf,
prefix: String
): Seq[KubernetesVolumeSpec]
// Build one volume spec from its name and flattened config map.
private def parseVolumeSpec(
volumeName: String,
volumeConf: Map[String, String]
): KubernetesVolumeSpec
}

// Parse all volume specs declared under `prefix` in the Spark configuration.
// Keys are expected to look like "<volumeName>.<property>", e.g.
// "data.hostPath.path".
def parseVolumesWithPrefix(
    sparkConf: SparkConf,
    prefix: String
): Seq[KubernetesVolumeSpec] = {
  val volumeConfigs = sparkConf.getAllWithPrefix(prefix)
    .groupBy { case (key, _) =>
      // Extract the volume name from a key like "data.hostPath.path".
      key.split('.').headOption.getOrElse("")
    }
    .filter(_._1.nonEmpty)
  volumeConfigs.map { case (volumeName, configs) =>
    val configMap = configs.collect {
      // Keep only properly-nested keys ("data.hostPath.path" -> "hostPath.path").
      // A bare key equal to the volume name itself previously made
      // key.substring(volumeName.length + 1) throw StringIndexOutOfBoundsException.
      case (key, value) if key.length > volumeName.length + 1 =>
        key.substring(volumeName.length + 1) -> value
    }.toMap
    parseVolumeSpec(volumeName, configMap)
  }.toSeq
}
// Usage: volumes are declared per role under distinct config prefixes.
val driverVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
sparkConf,
"spark.kubernetes.driver.volume."
)
val executorVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
sparkConf,
"spark.kubernetes.executor.volume."
)

// Build one KubernetesVolumeSpec from the flattened config for `volumeName`.
private def parseVolumeSpec(
    volumeName: String,
    volumeConf: Map[String, String]
): KubernetesVolumeSpec = {
  // mount.path is mandatory; the remaining mount settings have defaults.
  val mountPath = volumeConf.get("mount.path") match {
    case Some(path) => path
    case None =>
      throw new IllegalArgumentException(s"Volume $volumeName missing mount.path")
  }
  val mountSubPath = volumeConf.getOrElse("mount.subPath", "")
  val mountReadOnly = volumeConf.getOrElse("mount.readOnly", "false").toBoolean
  val volumeType = determineVolumeType(volumeConf)
  val typeSpecificConf = parseVolumeSpecificConf(volumeType, volumeConf)
  KubernetesVolumeSpec(
    volumeName = volumeName,
    mountPath = mountPath,
    mountSubPath = mountSubPath,
    mountReadOnly = mountReadOnly,
    volumeConf = typeSpecificConf
  )
}
// Infer the volume type from the config keys; exactly one of the supported
// type prefixes must be present.
private def determineVolumeType(volumeConf: Map[String, String]): String = {
  val supportedTypes = Set("hostPath", "persistentVolumeClaim", "emptyDir", "nfs", "configMap", "secret")
  val detectedTypes = supportedTypes.filter { t =>
    volumeConf.keys.exists(_.startsWith(t + "."))
  }
  if (detectedTypes.isEmpty) {
    throw new IllegalArgumentException("No volume type specified")
  } else if (detectedTypes.size > 1) {
    throw new IllegalArgumentException(s"Multiple volume types specified: ${detectedTypes.mkString(", ")}")
  } else {
    detectedTypes.head
  }
}
// Build the type-specific volume configuration object for `volumeType`.
private def parseVolumeSpecificConf(
    volumeType: String,
    volumeConf: Map[String, String]
): KubernetesVolumeSpecificConf = {
  // Fetch a mandatory key or fail with the given message.
  def required(key: String, errMessage: String): String =
    volumeConf.getOrElse(key, throw new IllegalArgumentException(errMessage))
  volumeType match {
    case "hostPath" =>
      KubernetesHostPathVolumeConf(
        required("hostPath.path", "hostPath volume missing path"))
    case "persistentVolumeClaim" =>
      KubernetesPVCVolumeConf(
        required("persistentVolumeClaim.claimName", "PVC volume missing claimName"))
    case "emptyDir" =>
      // Both medium and sizeLimit are optional for emptyDir.
      KubernetesEmptyDirVolumeConf(
        volumeConf.get("emptyDir.medium"),
        volumeConf.get("emptyDir.sizeLimit"))
    case "nfs" =>
      KubernetesNFSVolumeConf(
        required("nfs.server", "NFS volume missing server"),
        required("nfs.path", "NFS volume missing path"))
    case other =>
      throw new IllegalArgumentException(s"Unsupported volume type: $other")
  }
}
// Host path volume configuration
spark.conf.set("spark.kubernetes.volume.data.hostPath.path", "/host/data")
spark.conf.set("spark.kubernetes.volume.data.mount.path", "/data")
spark.conf.set("spark.kubernetes.volume.data.mount.readOnly", "true")
// PVC volume configuration
spark.conf.set("spark.kubernetes.volume.storage.persistentVolumeClaim.claimName", "spark-pvc")
spark.conf.set("spark.kubernetes.volume.storage.mount.path", "/storage")
spark.conf.set("spark.kubernetes.volume.storage.mount.subPath", "spark-data")
// EmptyDir volume with memory backing
// NOTE(review): these examples use the "spark.kubernetes.volume." prefix while
// the parse calls above use role-specific prefixes — confirm which is canonical.
spark.conf.set("spark.kubernetes.volume.tmp.emptyDir.medium", "Memory")
spark.conf.set("spark.kubernetes.volume.tmp.emptyDir.sizeLimit", "1Gi")
spark.conf.set("spark.kubernetes.volume.tmp.mount.path", "/tmp")
// NFS volume configuration
spark.conf.set("spark.kubernetes.volume.shared.nfs.server", "nfs-server.example.com")
spark.conf.set("spark.kubernetes.volume.shared.nfs.path", "/shared/data")
spark.conf.set("spark.kubernetes.volume.shared.mount.path", "/shared")

// Helpers for translating Spark memory/CPU settings into Kubernetes
// ResourceRequirements.
object ResourceUtils {
  // Build CPU/memory limits and requests for the driver or an executor pod.
  def buildResourceRequirements(
      conf: KubernetesConf,
      isDriver: Boolean = false
  ): ResourceRequirements = {
    val prefix = if (isDriver) "spark.kubernetes.driver" else "spark.kubernetes.executor"
    val limits = mutable.Map[String, Quantity]()
    val requests = mutable.Map[String, Quantity]()
    // CPU configuration: limit and request are set independently.
    conf.sparkConf.getOption(s"$prefix.limit.cores").foreach { cores =>
      limits("cpu") = new Quantity(cores)
    }
    conf.sparkConf.getOption(s"$prefix.request.cores").foreach { cores =>
      requests("cpu") = new Quantity(cores)
    }
    // Memory configuration: total = requested memory + overhead (explicit
    // setting, or 10% of the requested memory by default), expressed in bytes.
    // (The original also computed an unused `memoryMb` local; removed.)
    conf.sparkConf.getOption(s"$prefix.memory").foreach { memory =>
      val memoryBytes = Utils.byteStringAsBytes(memory)
      val overhead = conf.sparkConf.getOption(s"$prefix.memoryOverhead")
        .map(Utils.byteStringAsBytes)
        .getOrElse((memoryBytes * 0.1).toLong) // 10% default overhead
      val totalMemory = memoryBytes + overhead
      limits("memory") = new Quantity(s"${totalMemory}")
      requests("memory") = new Quantity(s"${totalMemory}")
    }
    new ResourceRequirementsBuilder()
      .withLimits(limits.asJava)
      .withRequests(requests.asJava)
      .build()
  }
}

// Helpers for standard Spark labels/annotations and Kubernetes label
// validation.
object MetadataUtils {
  // Standard Spark-identifying labels merged with user-supplied labels
  // (user labels win on key collisions).
  def buildLabels(conf: KubernetesConf): Map[String, String] = {
    Map(
      Constants.SPARK_APP_ID_LABEL -> conf.appId,
      Constants.SPARK_APP_NAME_LABEL -> conf.appName,
      Constants.SPARK_VERSION_LABEL -> org.apache.spark.SPARK_VERSION
    ) ++ conf.labels
  }
  // Standard annotations merged with user-supplied annotations
  // (user annotations win on key collisions).
  def buildAnnotations(conf: KubernetesConf): Map[String, String] = {
    Map(
      Constants.CREATED_BY_ANNOTATION -> "Apache Spark",
      Constants.SPARK_APP_NAME_ANNOTATION -> conf.appName
    ) ++ conf.annotations
  }
  // Validate a label key: alphanumeric edges, [._-] allowed inside, <= 63 chars.
  // NOTE(review): prefixed keys of the form "example.com/name" are rejected by
  // this pattern — confirm that restriction is intended.
  def validateLabelKey(key: String): Unit = {
    val segmentPattern = "[a-z0-9A-Z]([a-z0-9A-Z._-]*[a-z0-9A-Z])?"
    require(key.matches(segmentPattern), s"Invalid label key: $key")
    require(key.length <= 63, s"Label key too long: $key")
  }
  // Validate a label value: empty is allowed, otherwise same rules as keys.
  def validateLabelValue(value: String): Unit = {
    val segmentPattern = "[a-z0-9A-Z]([a-z0-9A-Z._-]*[a-z0-9A-Z])?"
    require(value.isEmpty || value.matches(segmentPattern), s"Invalid label value: $value")
    require(value.length <= 63, s"Label value too long: $value")
  }
}
// Helpers that wrap Kubernetes API calls with logging, error translation, and
// retries.
object KubernetesExceptionUtils {
// Run `block`, logging and translating known failures into SparkException.
// The recover partial function throws; scala.util.Try.recover catches that
// non-fatal throw, so callers receive Failure(SparkException) rather than a
// propagated exception.
def handleKubernetesApiException[T](operation: String)(block: => T): Try[T] = {
Try(block).recover {
case e: KubernetesClientException =>
logError(s"Kubernetes API error during $operation: ${e.getMessage}")
throw new SparkException(s"Failed to $operation: ${e.getMessage}", e)
case e: IOException =>
logError(s"Network error during $operation: ${e.getMessage}")
throw new SparkException(s"Network error during $operation: ${e.getMessage}", e)
case e: Exception =>
logError(s"Unexpected error during $operation: ${e.getMessage}")
throw new SparkException(s"Unexpected error during $operation: ${e.getMessage}", e)
}
}
// Retry `block` up to `maxRetries` times, sleeping between attempts; the
// delay grows linearly with the attempt number (backoffMillis * attempt).
// NOTE(review): the log message reports `backoffMillis` but the actual sleep
// is `backoffMillis * attempt` — the logged delay understates the real one.
def withRetry[T](
operation: String,
maxRetries: Int = 3,
backoffMillis: Long = 1000
)(block: => T): T = {
var lastException: Exception = null
for (attempt <- 1 to maxRetries) {
try {
return block
} catch {
case e: Exception =>
lastException = e
if (attempt < maxRetries) {
logWarning(s"$operation failed (attempt $attempt/$maxRetries), retrying in ${backoffMillis}ms")
Thread.sleep(backoffMillis * attempt) // Linear backoff (the original comment said "exponential")
}
}
}
throw new SparkException(s"$operation failed after $maxRetries attempts", lastException)
}
}
// Usage: retry pod creation up to the default three attempts.
val pod = KubernetesExceptionUtils.withRetry("create executor pod") {
kubernetesClient.pods()
.inNamespace(namespace)
.create(podSpec)
}

// Validation helpers for user-supplied Kubernetes configuration values.
object ConfigValidationUtils {
  // Validate a container image reference.
  // NOTE(review): this pattern rejects registry ports ("host:5000/img") and
  // digests ("img@sha256:...") — confirm that restriction is intended.
  def validateImageName(image: String): Unit = {
    require(image.nonEmpty, "Container image cannot be empty")
    // Basic image name validation
    val imagePattern = "^([a-zA-Z0-9._-]+(/[a-zA-Z0-9._-]+)*)(:([a-zA-Z0-9._-]+))?$".r
    // Use the underlying java.util.regex matcher: Regex.matches(CharSequence)
    // only exists on Scala 2.13+, and this module targets Scala 2.12.
    require(imagePattern.pattern.matcher(image).matches(),
      s"Invalid container image format: $image")
    // Warn about the mutable 'latest' tag
    if (image.endsWith(":latest")) {
      logWarning("Using 'latest' tag is discouraged in production environments")
    }
  }
  // Validate a namespace: DNS-1123 label rules, max 63 characters.
  def validateNamespaceName(namespace: String): Unit = {
    require(namespace.nonEmpty, "Kubernetes namespace cannot be empty")
    require(namespace.matches("[a-z0-9]([-a-z0-9]*[a-z0-9])?"),
      s"Invalid namespace name: $namespace")
    require(namespace.length <= 63, s"Namespace name too long: $namespace")
  }
  // Validate a resource name: same character rules, max 253 characters.
  def validateResourceName(name: String): Unit = {
    require(name.nonEmpty, "Resource name cannot be empty")
    require(name.matches("[a-z0-9]([-a-z0-9]*[a-z0-9])?"),
      s"Invalid resource name: $name")
    require(name.length <= 253, s"Resource name too long: $name")
  }
  // Validate a Spark memory string (e.g. "1g", "512m") by parsing it.
  def validateMemoryString(memory: String): Unit = {
    try {
      val bytes = Utils.byteStringAsBytes(memory)
      require(bytes > 0, "Memory must be positive")
    } catch {
      case _: NumberFormatException =>
        throw new IllegalArgumentException(s"Invalid memory format: $memory")
    }
  }
}
// Common utility operations combined
object KubernetesOperations {
  // Build a SparkPod from an optional pod template plus configured volumes.
  def createConfiguredPod(
      conf: KubernetesConf,
      podTemplate: Option[File] = None
  ): SparkPod = {
    val client = SparkKubernetesClientFactory.createKubernetesClient(
      ClientType.Driver,
      Some(conf)
    )
    // Close the client once the template has been loaded: fabric8 clients are
    // Closeable and the original code leaked this one.
    val basePod = try {
      podTemplate match {
        case Some(template) =>
          KubernetesUtils.loadPodFromTemplate(client, template, None)
        case None =>
          SparkPod.initialPod()
      }
    } finally {
      client.close()
    }
    val volumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
      conf.sparkConf,
      "spark.kubernetes.volume."
    )
    // Apply volumes and other configurations
    applyVolumesToPod(basePod, volumes)
  }
  // Delegate volume mounting to the MountVolumesFeatureStep implementation.
  private def applyVolumesToPod(
      pod: SparkPod,
      volumes: Seq[KubernetesVolumeSpec]
  ): SparkPod = {
    new MountVolumesFeatureStep(volumes).configurePod(pod)
  }
}
The utilities layer provides the essential building blocks and helper functions that make the Kubernetes integration robust, flexible, and easy to use across all components of the Spark Kubernetes resource manager.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-kubernetes-2-12