CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-spark--spark-kubernetes-2-12

Apache Spark Kubernetes resource manager that enables running Spark applications on Kubernetes clusters

Overview
Eval results
Files

docs/utilities.md

Utilities and Helpers

The utilities layer provides essential helper functions, client management, and common operations that support the Kubernetes integration throughout the Apache Spark ecosystem. These utilities handle everything from configuration parsing to client creation and volume management.

Core Utilities

KubernetesUtils { .api }

Comprehensive collection of Kubernetes-specific utility functions:

object KubernetesUtils {
  
  /** Returns all Spark conf entries under `prefix`, keyed by the key remainder after the prefix. */
  def parsePrefixedKeyValuePairs(
    sparkConf: SparkConf, 
    prefix: String
  ): Map[String, String]
  
  /** Loads a pod spec from a template file and selects the Spark container from it. */
  def loadPodFromTemplate(
    kubernetesClient: KubernetesClient,
    templateFile: File,
    containerName: Option[String]
  ): SparkPod
  
  /** Selects the Spark container from `pod` — by name when given, else the sole container. */
  def selectSparkContainer(
    pod: Pod, 
    containerName: Option[String]
  ): SparkPod
  
  /** Throws IllegalArgumentException(errMessage) when exactly one of the two options is defined. */
  def requireBothOrNeitherDefined[T](
    opt1: Option[T], 
    opt2: Option[T], 
    errMessage: String
  ): Unit
}

Configuration Parsing Utilities

Prefixed Key-Value Parsing

// Collect configuration properties sharing a common prefix, keeping only
// entries whose stripped key and value are both non-empty.
def parsePrefixedKeyValuePairs(
  sparkConf: SparkConf, 
  prefix: String
): Map[String, String] = {
  
  val prefixedEntries = sparkConf.getAllWithPrefix(prefix)
  prefixedEntries.collect {
    case (strippedKey, value) if strippedKey.nonEmpty && value.nonEmpty =>
      strippedKey -> value
  }.toMap
}

// Usage examples
val driverLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
  sparkConf, 
  "spark.kubernetes.driver.label."
)
// Input: spark.kubernetes.driver.label.app=myapp, spark.kubernetes.driver.label.version=1.0
// Output: Map("app" -> "myapp", "version" -> "1.0")

val executorAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
  sparkConf,
  "spark.kubernetes.executor.annotation."  
)
// Input: spark.kubernetes.executor.annotation.prometheus.io/scrape=true
// Output: Map("prometheus.io/scrape" -> "true")

Validation Utilities

// Require both options to be defined or both to be None.
// Exactly one being defined is the invalid configuration.
def requireBothOrNeitherDefined[T](
  opt1: Option[T], 
  opt2: Option[T], 
  errMessage: String
): Unit = {
  if (opt1.isDefined != opt2.isDefined) {
    throw new IllegalArgumentException(errMessage)
  }
}

// Usage
KubernetesUtils.requireBothOrNeitherDefined(
  conf.get(CLIENT_KEY_FILE),
  conf.get(CLIENT_CERT_FILE),
  "Both client key file and client cert file must be specified for mutual TLS"
)

Resource Name Generation

/**
 * Generates a Kubernetes resource name "<app>-<id>-<type>": lowercase app name
 * with non-alphanumeric runs collapsed to single dashes and truncated to 30
 * chars, app id truncated to 8 chars.
 */
def generateResourceName(
  appName: String, 
  appId: String,
  resourceType: String
): String = {
  val sanitizedAppName = appName
    .toLowerCase
    .replaceAll("[^a-z0-9-]", "-")
    .replaceAll("-+", "-")
    .take(30)
    // Fix: sanitization or truncation can leave edge dashes (e.g. app names
    // with leading/trailing spaces), which are invalid in DNS-1123 names.
    .stripPrefix("-")
    .stripSuffix("-")
  
  val shortAppId = appId.take(8)
  
  s"$sanitizedAppName-$shortAppId-$resourceType"
}

// Usage
val driverPodName = KubernetesUtils.generateResourceName(
  "My Spark App", 
  "app-123456789", 
  "driver"
)
// Result: "my-spark-app-app-1234-driver"

Pod Template Management

Template Loading

/**
 * Loads a pod spec from a template file via the Kubernetes client, then selects
 * the Spark container from it.
 *
 * Fix: the FileInputStream was never closed, leaking one file handle per call;
 * it is now closed in a finally block.
 *
 * @throws IllegalArgumentException if the template file does not exist
 * @throws RuntimeException if the template cannot be parsed
 */
def loadPodFromTemplate(
  kubernetesClient: KubernetesClient,
  templateFile: File,
  containerName: Option[String]
): SparkPod = {
  
  require(templateFile.exists(), s"Pod template file does not exist: ${templateFile.getPath}")
  
  val templateStream = new FileInputStream(templateFile)
  val podTemplate = try {
    kubernetesClient.pods()
      .load(templateStream)
      .get()
  } catch {
    case e: Exception =>
      throw new RuntimeException(s"Failed to load pod template from ${templateFile.getPath}", e)
  } finally {
    templateStream.close()
  }
  
  selectSparkContainer(podTemplate, containerName)
}

// Usage
val templatePod = KubernetesUtils.loadPodFromTemplate(
  kubernetesClient,
  new File("/templates/driver-pod.yaml"),
  Some(Constants.DRIVER_CONTAINER_NAME)
)

Container Selection

/**
 * Picks the Spark container out of a pod template: by name when `containerName`
 * is given, otherwise the single container of a one-container pod.
 *
 * Fix: a template with zero containers previously produced the misleading
 * "must specify containerName" advice; it now fails with an explicit message.
 *
 * @throws RuntimeException if no container can be unambiguously selected
 */
def selectSparkContainer(
  pod: Pod, 
  containerName: Option[String]
): SparkPod = {
  
  val containers = pod.getSpec.getContainers.asScala
  
  if (containers.isEmpty) {
    throw new RuntimeException("Pod template does not define any containers")
  }
  
  val selectedContainer = containerName match {
    case Some(name) => 
      containers.find(_.getName == name).getOrElse {
        throw new RuntimeException(s"Container '$name' not found in pod template")
      }
    case None =>
      if (containers.size == 1) {
        containers.head
      } else {
        throw new RuntimeException(
          s"Pod template contains ${containers.size} containers. " +
          "Must specify containerName when template has multiple containers."
        )
      }
  }
  
  SparkPod(pod, selectedContainer)
}

Client Management

SparkKubernetesClientFactory { .api }

Factory for creating configured Kubernetes client instances:

/**
 * Factory for fabric8 Kubernetes client instances used by the driver, the
 * executors, and the submission client.
 */
object SparkKubernetesClientFactory {
  
  /** Creates a client configured for the given role from whichever confs are provided. */
  def createKubernetesClient(
    clientType: ClientType,
    kubernetesConf: Option[KubernetesConf] = None,
    sparkConf: Option[SparkConf] = None,
    defaultServiceAccountToken: Option[String] = None,
    clientContext: Option[String] = None
  ): KubernetesClient
  
  // Role the client is created for (sealed so matches are exhaustive).
  sealed trait ClientType
  object ClientType {
    case object Driver extends ClientType
    case object Executor extends ClientType  
    case object Submission extends ClientType
  }
}

Client Configuration

Basic Client Creation

// Driver client for executor management
val driverClient = SparkKubernetesClientFactory.createKubernetesClient(
  ClientType.Driver,
  kubernetesConf = Some(driverConf)
)

// Submission client for application deployment
val submissionClient = SparkKubernetesClientFactory.createKubernetesClient(
  ClientType.Submission,
  sparkConf = Some(conf),
  clientContext = Some("my-cluster-context")
)

// Executor client for pod operations  
val executorClient = SparkKubernetesClientFactory.createKubernetesClient(
  ClientType.Executor,
  kubernetesConf = Some(executorConf),
  defaultServiceAccountToken = Some(serviceAccountToken)
)

Authentication Configuration

/**
 * Builds a fabric8 KubernetesClient from the available Spark settings.
 *
 * NOTE(review): `clientType` and `kubernetesConf` are never consulted in this
 * body — presumably reserved for role-specific configuration; confirm intent.
 */
def createKubernetesClient(
  clientType: ClientType,
  kubernetesConf: Option[KubernetesConf] = None,
  sparkConf: Option[SparkConf] = None,
  defaultServiceAccountToken: Option[String] = None,
  clientContext: Option[String] = None
): KubernetesClient = {

  // NOTE(review): the with* calls below discard the returned builder; this only
  // works if ConfigBuilder mutates in place — confirm for the fabric8 version in use.
  val config = new ConfigBuilder()
    .withApiVersion("v1")
    
  // Configure API server URL from the Spark conf, when present.
  sparkConf.flatMap(_.getOption("spark.kubernetes.apiserver.host")).foreach { host =>
    config.withMasterUrl(host)
  }
  
  // Configure authentication (OAuth token, user/password, service account).
  configureAuthentication(config, sparkConf, defaultServiceAccountToken)
  
  // Configure TLS (CA cert, client cert/key pair, trust-all switch).
  configureTls(config, sparkConf)
  
  // Configure the kubeconfig context to use, when explicitly requested.
  clientContext.foreach(config.withCurrentContext)
  
  DefaultKubernetesClient.fromConfig(config.build())
}

/**
 * Applies authentication settings from the Spark conf to the client config
 * builder. As written, precedence is: explicit OAuth token, then token-file
 * contents, then username/password, then the service-account branch below.
 */
private def configureAuthentication(
  config: ConfigBuilder, 
  sparkConf: Option[SparkConf],
  defaultServiceAccountToken: Option[String]
): Unit = {
  
  sparkConf.foreach { conf =>
    // OAuth token authentication: a directly configured token wins over a token file.
    conf.getOption(OAUTH_TOKEN.key).orElse {
      conf.getOption(OAUTH_TOKEN_FILE.key).map { tokenFile =>
        // Token files may span several lines; concatenate them into one string.
        Files.readAllLines(Paths.get(tokenFile)).asScala.mkString
      }
    }.foreach(config.withOauthToken)
    
    // Username/password authentication; password is only applied when a username exists.
    conf.getOption("spark.kubernetes.authenticate.username").foreach { username =>
      config.withUsername(username)
      conf.getOption("spark.kubernetes.authenticate.password").foreach { password =>
        config.withPassword(password)
      }
    }
    
    // NOTE(review): this passes a service-account *name* (or the default token)
    // to withOauthToken and silently overwrites any token configured above.
    // A service-account name is not a bearer token — likely a bug; confirm
    // the intended behavior before relying on this path.
    conf.getOption(KUBERNETES_SERVICE_ACCOUNT_NAME.key).orElse {
      defaultServiceAccountToken
    }.foreach(config.withOauthToken)
  }
}

/**
 * Applies TLS settings from the Spark conf to the client config builder:
 * CA certificate, an optional mutual-TLS client cert/key pair (only when both
 * halves are configured), and a development-only trust-all switch.
 */
private def configureTls(
  config: ConfigBuilder, 
  sparkConf: Option[SparkConf]
): Unit = {
  
  sparkConf.foreach { conf =>
    // CA certificate used to verify the API server.
    conf.getOption(CA_CERT_FILE.key).foreach(config.withCaCertFile)
    
    // Client cert/key pair for mutual TLS; applied only when both are present.
    for {
      certFile <- conf.getOption(CLIENT_CERT_FILE.key)
      keyFile <- conf.getOption(CLIENT_KEY_FILE.key)
    } {
      config.withClientCertFile(certFile)
      config.withClientKeyFile(keyFile)
    }
    
    // Development-only escape hatch: skip server certificate verification.
    if (conf.getBoolean("spark.kubernetes.apiserver.trustCerts", false)) {
      config.withTrustCerts(true)
    }
  }
}

Volume Utilities

KubernetesVolumeUtils { .api }

Utilities for parsing and handling Kubernetes volume configurations:

object KubernetesVolumeUtils {
  
  /** Parses all volumes declared under `prefix` into typed volume specs. */
  def parseVolumesWithPrefix(
    sparkConf: SparkConf, 
    prefix: String
  ): Seq[KubernetesVolumeSpec]
  
  /** Builds one volume spec from its name and its per-volume config map. */
  private def parseVolumeSpec(
    volumeName: String,
    volumeConf: Map[String, String]
  ): KubernetesVolumeSpec
}

Volume Parsing Implementation

Prefix-Based Volume Configuration

/**
 * Parses every volume declared under `prefix` into a KubernetesVolumeSpec.
 * Keys look like "<volumeName>.<section>.<option>", e.g. "data.hostPath.path".
 *
 * Fix: a key with no '.' sub-key (e.g. a bare "data") previously crashed the
 * substring below with StringIndexOutOfBoundsException; such malformed keys
 * are now filtered out before grouping.
 */
def parseVolumesWithPrefix(
  sparkConf: SparkConf, 
  prefix: String
): Seq[KubernetesVolumeSpec] = {
  
  val volumeConfigs = sparkConf.getAllWithPrefix(prefix)
    // A well-formed key always carries a sub-key after the volume name.
    .filter { case (key, _) => key.contains('.') }
    .groupBy { case (key, _) => 
      // Extract volume name from key like "volume.data.hostPath.path"
      key.takeWhile(_ != '.')
    }
    .filter(_._1.nonEmpty)
  
  volumeConfigs.map { case (volumeName, configs) =>
    val configMap = configs.map { case (key, value) =>
      // Remove volume name prefix: "data.hostPath.path" -> "hostPath.path"
      key.stripPrefix(s"$volumeName.") -> value
    }.toMap
    
    parseVolumeSpec(volumeName, configMap)
  }.toSeq
}

// Usage
val driverVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
  sparkConf, 
  "spark.kubernetes.driver.volume."
)

val executorVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
  sparkConf,
  "spark.kubernetes.executor.volume."
)

Volume Specification Parsing

/**
 * Builds a KubernetesVolumeSpec for one named volume from its flattened config
 * map (keys already stripped of the per-volume prefix).
 *
 * Note: getOrElse's default is by-name, so the `throw` expressions below are
 * only evaluated when the key is actually absent.
 */
private def parseVolumeSpec(
  volumeName: String,
  volumeConf: Map[String, String]
): KubernetesVolumeSpec = {
  
  // mount.path is the only universally required mount option.
  val mountPath = volumeConf.getOrElse("mount.path", 
    throw new IllegalArgumentException(s"Volume $volumeName missing mount.path"))
  
  val mountSubPath = volumeConf.getOrElse("mount.subPath", "")
  // .toBoolean throws IllegalArgumentException for anything but "true"/"false".
  val mountReadOnly = volumeConf.getOrElse("mount.readOnly", "false").toBoolean
  
  // Exactly one volume-type section (hostPath, nfs, ...) must be present.
  val volumeType = determineVolumeType(volumeConf)
  val volumeSpecificConf = parseVolumeSpecificConf(volumeType, volumeConf)
  
  KubernetesVolumeSpec(
    volumeName = volumeName,
    mountPath = mountPath,
    mountSubPath = mountSubPath,
    mountReadOnly = mountReadOnly,
    volumeConf = volumeSpecificConf
  )
}

/**
 * Determines which volume type a per-volume config map declares, by looking for
 * keys beginning with a known type prefix. Exactly one type must be present.
 */
private def determineVolumeType(volumeConf: Map[String, String]): String = {
  val knownTypes = Set("hostPath", "persistentVolumeClaim", "emptyDir", "nfs", "configMap", "secret")
  
  val detected = knownTypes.filter { candidate =>
    volumeConf.keysIterator.exists(_.startsWith(s"$candidate."))
  }
  
  detected.toList match {
    case Nil =>
      throw new IllegalArgumentException("No volume type specified")
    case single :: Nil =>
      single
    case _ =>
      throw new IllegalArgumentException(s"Multiple volume types specified: ${detected.mkString(", ")}")
  }
}

/**
 * Builds the type-specific configuration object for one volume. Required keys
 * fail fast with an IllegalArgumentException naming the missing option.
 */
private def parseVolumeSpecificConf(
  volumeType: String, 
  volumeConf: Map[String, String]
): KubernetesVolumeSpecificConf = {
  
  // Lazily-thrown lookup for options that must be present.
  def required(key: String, message: String): String =
    volumeConf.getOrElse(key, throw new IllegalArgumentException(message))
  
  volumeType match {
    case "hostPath" =>
      KubernetesHostPathVolumeConf(
        required("hostPath.path", "hostPath volume missing path"))
      
    case "persistentVolumeClaim" =>
      KubernetesPVCVolumeConf(
        required("persistentVolumeClaim.claimName", "PVC volume missing claimName"))
      
    case "emptyDir" =>
      // Both emptyDir options are genuinely optional.
      KubernetesEmptyDirVolumeConf(
        volumeConf.get("emptyDir.medium"),
        volumeConf.get("emptyDir.sizeLimit"))
      
    case "nfs" =>
      KubernetesNFSVolumeConf(
        required("nfs.server", "NFS volume missing server"),
        required("nfs.path", "NFS volume missing path"))
      
    case other =>
      throw new IllegalArgumentException(s"Unsupported volume type: $other")
  }
}

Volume Configuration Examples

// Host path volume configuration
spark.conf.set("spark.kubernetes.volume.data.hostPath.path", "/host/data")
spark.conf.set("spark.kubernetes.volume.data.mount.path", "/data")
spark.conf.set("spark.kubernetes.volume.data.mount.readOnly", "true")

// PVC volume configuration  
spark.conf.set("spark.kubernetes.volume.storage.persistentVolumeClaim.claimName", "spark-pvc")
spark.conf.set("spark.kubernetes.volume.storage.mount.path", "/storage")
spark.conf.set("spark.kubernetes.volume.storage.mount.subPath", "spark-data")

// EmptyDir volume with memory backing
spark.conf.set("spark.kubernetes.volume.tmp.emptyDir.medium", "Memory")
spark.conf.set("spark.kubernetes.volume.tmp.emptyDir.sizeLimit", "1Gi")
spark.conf.set("spark.kubernetes.volume.tmp.mount.path", "/tmp")

// NFS volume configuration
spark.conf.set("spark.kubernetes.volume.shared.nfs.server", "nfs-server.example.com")
spark.conf.set("spark.kubernetes.volume.shared.nfs.path", "/shared/data")
spark.conf.set("spark.kubernetes.volume.shared.mount.path", "/shared")

Resource Management Utilities

Resource Requirement Building

object ResourceUtils {
  
  /**
   * Builds Kubernetes CPU/memory resource requirements for a driver or executor
   * pod from the Spark configuration.
   *
   * Fix: removed the dead local `memoryMb`, which was computed but never used.
   *
   * @param conf the Kubernetes-specific Spark configuration
   * @param isDriver selects the "driver" vs "executor" config prefix
   */
  def buildResourceRequirements(
    conf: KubernetesConf,
    isDriver: Boolean = false
  ): ResourceRequirements = {
    
    val prefix = if (isDriver) "spark.kubernetes.driver" else "spark.kubernetes.executor"
    
    val limits = mutable.Map[String, Quantity]()
    val requests = mutable.Map[String, Quantity]()
    
    // CPU: limit and request are configured independently.
    conf.sparkConf.getOption(s"$prefix.limit.cores").foreach { cores =>
      limits("cpu") = new Quantity(cores)
    }
    
    conf.sparkConf.getOption(s"$prefix.request.cores").foreach { cores =>
      requests("cpu") = new Quantity(cores)
    }
    
    // Memory: request and limit are both heap + overhead, expressed in bytes.
    conf.sparkConf.getOption(s"$prefix.memory").foreach { memory =>
      val memoryBytes = Utils.byteStringAsBytes(memory)
      
      // Overhead defaults to 10% of the configured memory when not set explicitly.
      val overhead = conf.sparkConf.getOption(s"$prefix.memoryOverhead")
        .map(Utils.byteStringAsBytes)
        .getOrElse((memoryBytes * 0.1).toLong)
      
      val totalMemory = memoryBytes + overhead
      // Plain integers are interpreted by Kubernetes as bytes.
      limits("memory") = new Quantity(s"${totalMemory}")
      requests("memory") = new Quantity(s"${totalMemory}")
    }
    
    new ResourceRequirementsBuilder()
      .withLimits(limits.asJava)
      .withRequests(requests.asJava)
      .build()
  }
}

Label and Annotation Utilities

object MetadataUtils {
  
  // Label/annotation name segment: alphanumeric start/end; '.', '_', '-' allowed inside.
  private val NameSegmentPattern = "[a-z0-9A-Z]([a-z0-9A-Z._-]*[a-z0-9A-Z])?"
  // DNS subdomain, used as an optional label-key prefix (e.g. "app.kubernetes.io").
  private val DnsSubdomainPattern = "[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*"
  
  /** Standard Spark identification labels merged with user labels (user entries win). */
  def buildLabels(conf: KubernetesConf): Map[String, String] = {
    val baseLabels = Map(
      Constants.SPARK_APP_ID_LABEL -> conf.appId,
      Constants.SPARK_APP_NAME_LABEL -> conf.appName,
      Constants.SPARK_VERSION_LABEL -> org.apache.spark.SPARK_VERSION
    )
    
    baseLabels ++ conf.labels
  }
  
  /** Standard Spark annotations merged with user annotations (user entries win). */
  def buildAnnotations(conf: KubernetesConf): Map[String, String] = {
    val baseAnnotations = Map(
      Constants.CREATED_BY_ANNOTATION -> "Apache Spark",
      Constants.SPARK_APP_NAME_ANNOTATION -> conf.appName
    )
    
    baseAnnotations ++ conf.annotations
  }
  
  /**
   * Validates a Kubernetes label key.
   *
   * Fix: label keys may carry an optional DNS-subdomain prefix separated by '/'
   * (e.g. "app.kubernetes.io/name"); the previous check rejected every prefixed
   * key. Per the Kubernetes spec, the 63-character limit applies to the name
   * segment, while the prefix may be up to 253 characters. Unprefixed keys are
   * validated exactly as before.
   */
  def validateLabelKey(key: String): Unit = {
    key.split("/", -1) match {
      case Array(name) =>
        require(name.matches(NameSegmentPattern), s"Invalid label key: $key")
        require(name.length <= 63, s"Label key too long: $key")
      case Array(prefix, name) =>
        require(prefix.nonEmpty && prefix.length <= 253 && prefix.matches(DnsSubdomainPattern),
          s"Invalid label key: $key")
        require(name.matches(NameSegmentPattern), s"Invalid label key: $key")
        require(name.length <= 63, s"Label key too long: $key")
      case _ =>
        // More than one '/' is never a valid label key.
        throw new IllegalArgumentException(s"Invalid label key: $key")
    }
  }
  
  /** Validates a label value: empty, or a name segment of at most 63 characters. */
  def validateLabelValue(value: String): Unit = {
    require(value.matches(NameSegmentPattern) || value.isEmpty,
      s"Invalid label value: $value")
    require(value.length <= 63, s"Label value too long: $value")
  }
}

Error Handling Utilities

Kubernetes Exception Handling

object KubernetesExceptionUtils {
  
  /**
   * Runs `block`, translating failures into SparkException with an
   * operation-specific message.
   *
   * NOTE(review): the `throw` inside `recover` is caught by Try's own handling,
   * so callers receive a Failure(SparkException) rather than a thrown exception
   * — confirm that is the intended contract given the Try[T] return type.
   */
  def handleKubernetesApiException[T](operation: String)(block: => T): Try[T] = {
    Try(block).recover {
      case e: KubernetesClientException =>
        logError(s"Kubernetes API error during $operation: ${e.getMessage}")
        throw new SparkException(s"Failed to $operation: ${e.getMessage}", e)
        
      case e: IOException =>
        logError(s"Network error during $operation: ${e.getMessage}")
        throw new SparkException(s"Network error during $operation: ${e.getMessage}", e)
        
      case e: Exception =>
        logError(s"Unexpected error during $operation: ${e.getMessage}")
        throw new SparkException(s"Unexpected error during $operation: ${e.getMessage}", e)
    }
  }
  
  /**
   * Retries `block` up to `maxRetries` times, blocking the calling thread with
   * Thread.sleep between attempts. The delay grows linearly with the attempt
   * number; the final failure is wrapped in a SparkException with the last
   * underlying exception as its cause.
   */
  def withRetry[T](
    operation: String,
    maxRetries: Int = 3,
    backoffMillis: Long = 1000
  )(block: => T): T = {
    
    var lastException: Exception = null
    
    for (attempt <- 1 to maxRetries) {
      try {
        return block
      } catch {
        case e: Exception =>
          lastException = e
          if (attempt < maxRetries) {
            logWarning(s"$operation failed (attempt $attempt/$maxRetries), retrying in ${backoffMillis}ms")
            // Linear backoff (delay = backoffMillis * attempt). Note the logged
            // delay above understates the actual sleep for attempt > 1.
            Thread.sleep(backoffMillis * attempt)
          }
      }
    }
    
    throw new SparkException(s"$operation failed after $maxRetries attempts", lastException)
  }
}

// Usage
val pod = KubernetesExceptionUtils.withRetry("create executor pod") {
  kubernetesClient.pods()
    .inNamespace(namespace)  
    .create(podSpec)
}

Configuration Validation Utilities

object ConfigValidationUtils {
  
  /**
   * Validates a container image reference (repository path with optional tag).
   *
   * Fix: `Regex#matches(CharSequence)` only exists since Scala 2.13; this
   * module targets Scala 2.12, so the check goes through the underlying
   * java.util.regex.Pattern, which works on both versions.
   */
  def validateImageName(image: String): Unit = {
    require(image.nonEmpty, "Container image cannot be empty")
    
    // Basic image name validation
    val imagePattern = "^([a-zA-Z0-9._-]+(/[a-zA-Z0-9._-]+)*)(:([a-zA-Z0-9._-]+))?$".r
    require(imagePattern.pattern.matcher(image).matches(),
      s"Invalid container image format: $image")
    
    // Warn about latest tag: mutable tags make deployments non-reproducible.
    if (image.endsWith(":latest")) {
      logWarning("Using 'latest' tag is discouraged in production environments")
    }
  }
  
  /** Validates a namespace name: RFC 1123 label rules, at most 63 characters. */
  def validateNamespaceName(namespace: String): Unit = {
    require(namespace.nonEmpty, "Kubernetes namespace cannot be empty")
    require(namespace.matches("[a-z0-9]([-a-z0-9]*[a-z0-9])?"), 
      s"Invalid namespace name: $namespace")
    require(namespace.length <= 63, s"Namespace name too long: $namespace")
  }
  
  /** Validates a resource name: lowercase alphanumerics and dashes, at most 253 characters. */
  def validateResourceName(name: String): Unit = {
    require(name.nonEmpty, "Resource name cannot be empty")
    require(name.matches("[a-z0-9]([-a-z0-9]*[a-z0-9])?"),
      s"Invalid resource name: $name")
    require(name.length <= 253, s"Resource name too long: $name")
  }
  
  /** Validates a Spark memory string (e.g. "2g") parses to a positive byte count. */
  def validateMemoryString(memory: String): Unit = {
    try {
      val bytes = Utils.byteStringAsBytes(memory)
      require(bytes > 0, "Memory must be positive")
    } catch {
      case _: NumberFormatException =>
        throw new IllegalArgumentException(s"Invalid memory format: $memory")
    }
  }
}

Integration Patterns

Utility Composition

// Common utility operations combined
object KubernetesOperations {
  
  def createConfiguredPod(
    conf: KubernetesConf,
    podTemplate: Option[File] = None
  ): SparkPod = {
    
    val client = SparkKubernetesClientFactory.createKubernetesClient(
      ClientType.Driver,
      Some(conf)
    )
    
    val basePod = podTemplate match {
      case Some(template) =>
        KubernetesUtils.loadPodFromTemplate(client, template, None)
      case None =>
        SparkPod.initialPod()
    }
    
    val volumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
      conf.sparkConf,
      "spark.kubernetes.volume."
    )
    
    // Apply volumes and other configurations
    applyVolumesToPod(basePod, volumes)
  }
  
  private def applyVolumesToPod(
    pod: SparkPod, 
    volumes: Seq[KubernetesVolumeSpec]
  ): SparkPod = {
    // Implementation using MountVolumesFeatureStep logic
    new MountVolumesFeatureStep(volumes).configurePod(pod)
  }
}

The utilities layer provides the essential building blocks and helper functions that make the Kubernetes integration robust, flexible, and easy to use across all components of the Spark Kubernetes resource manager.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-kubernetes-2-12

docs

application-submission.md

cluster-management.md

configuration.md

feature-steps.md

index.md

pod-management.md

utilities.md

tile.json