Configuration Management

The configuration management system provides a comprehensive, type-safe approach to configuring Kubernetes deployments for Spark applications, extending Spark's native configuration framework with Kubernetes-specific options and validation.

Configuration Architecture

KubernetesConf Hierarchy { .api }

Base abstract class containing all metadata needed for Kubernetes pod creation and management:

abstract class KubernetesConf(val sparkConf: SparkConf) {
  
  val resourceNamePrefix: String
  def labels: Map[String, String]
  def environment: Map[String, String]
  def annotations: Map[String, String]
  def secretEnvNamesToKeyRefs: Map[String, String]
  def secretNamesToMountPaths: Map[String, String]
  def volumes: Seq[KubernetesVolumeSpec]

  def appName: String = get("spark.app.name", "spark")
  def namespace: String = get(KUBERNETES_NAMESPACE) 
  def imagePullPolicy: String = get(CONTAINER_IMAGE_PULL_POLICY)
  def nodeSelector: Map[String, String] = 
    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)
    
  // Utility methods for configuration access
  def contains(config: ConfigEntry[_]): Boolean = sparkConf.contains(config)
  def get[T](config: ConfigEntry[T]): T = sparkConf.get(config)
  def get(conf: String): String = sparkConf.get(conf)
  def get(conf: String, defaultValue: String): String = sparkConf.get(conf, defaultValue)
  def getOption(key: String): Option[String] = sparkConf.getOption(key)
}

Core Properties:

  • Resource Naming: resourceNamePrefix ensures unique Kubernetes resource names
  • Metadata: labels and annotations for pod identification and configuration
  • Environment: Environment variables and secret references
  • Storage: Volume specifications and secret mount paths
  • Scheduling: Node selectors (parsed from prefixed keys, sketched below) and image pull policies
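
A minimal sketch of the prefix parsing behind nodeSelector, assuming only SparkConf.getAllWithPrefix; this illustrates the mechanism rather than reproducing KubernetesUtils internals:

import org.apache.spark.SparkConf

// Collect every key under a prefix into a map, stripping the prefix
def parsePrefixedKeyValuePairs(conf: SparkConf, prefix: String): Map[String, String] =
  conf.getAllWithPrefix(prefix).toMap

val conf = new SparkConf()
  .set("spark.kubernetes.node.selector.disktype", "ssd")

// Map("disktype" -> "ssd"), applied to pods as a Kubernetes node selector
val nodeSelector = parsePrefixedKeyValuePairs(conf, "spark.kubernetes.node.selector.")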

KubernetesDriverConf { .api }

Driver-specific configuration extending the base configuration:

class KubernetesDriverConf(
  sparkConf: SparkConf,
  val appId: String,
  val mainAppResource: MainAppResource,
  val mainClass: String,
  val appArgs: Array[String]
) extends KubernetesConf(sparkConf) {

  def serviceAnnotations: Map[String, String] =
    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_SERVICE_ANNOTATION_PREFIX)
}

Driver-Specific Features:

  • Application Resources: Main class, application JAR, and command-line arguments
  • Service Configuration: Annotations for Kubernetes service creation
  • Network Configuration: Driver port and service type settings

Creation Pattern:

val driverConf = KubernetesConf.createDriverConf(
  sparkConf = conf,
  appName = "my-spark-app",
  appResourceNamePrefix = s"spark-${UUID.randomUUID().toString.take(8)}",
  appId = "app-123456789",
  mainAppResource = JavaMainAppResource(Some("local:///opt/spark/jars/my-app.jar")),
  mainClass = "com.example.MySparkApp",
  appArgs = Array("--input", "/data", "--output", "/results")
)

KubernetesExecutorConf { .api }

Executor-specific configuration for worker pod creation:

class KubernetesExecutorConf(
  sparkConf: SparkConf,
  val appId: String,
  val executorId: String,
  val driverPod: Option[Pod]
) extends KubernetesConf(sparkConf)

Executor-Specific Features:

  • Executor Identity: Unique executor ID for tracking and management
  • Driver Connection: Reference to driver pod for communication setup
  • Resource Allocation: Executor-specific CPU, memory, and storage configuration (a construction sketch follows)
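
A construction sketch, assuming the constructor is accessible as declared above; Spark itself builds these through a factory on KubernetesConf, and conf and driverPod are placeholders here:

// Hypothetical wiring: one executor conf per requested executor pod
val executorConf = new KubernetesExecutorConf(
  sparkConf = conf,
  appId = "app-123456789",
  executorId = "1",           // unique per executor
  driverPod = Some(driverPod) // resolved via the Kubernetes client in cluster mode
)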

Configuration Definitions

Config Object { .api }

Centralized configuration definitions using Spark's ConfigBuilder pattern:

object Config {
  // Container Images
  val CONTAINER_IMAGE = ConfigBuilder("spark.kubernetes.container.image")
    .doc("Container image to use for Spark pods")
    .stringConf
    .createOptional

  val DRIVER_CONTAINER_IMAGE = ConfigBuilder("spark.kubernetes.driver.container.image")
    .doc("Container image to use for driver pod")
    .fallbackConf(CONTAINER_IMAGE)

  val EXECUTOR_CONTAINER_IMAGE = ConfigBuilder("spark.kubernetes.executor.container.image") 
    .doc("Container image to use for executor pods")
    .fallbackConf(CONTAINER_IMAGE)

  // Namespace and Authentication
  val KUBERNETES_NAMESPACE = ConfigBuilder("spark.kubernetes.namespace")
    .doc("Kubernetes namespace for Spark pods")
    .stringConf
    .createWithDefault("default")

  val KUBERNETES_CONTEXT = ConfigBuilder("spark.kubernetes.context")
    .doc("Kubernetes context to use")
    .stringConf
    .createOptional

  val KUBERNETES_SERVICE_ACCOUNT_NAME = ConfigBuilder("spark.kubernetes.authenticate.serviceAccountName")
    .doc("Service account for Spark pods")
    .stringConf
    .createOptional

  // Resource Limits
  val KUBERNETES_DRIVER_LIMIT_CORES = ConfigBuilder("spark.kubernetes.driver.limit.cores")
    .doc("CPU limit for driver pod")
    .stringConf
    .createOptional

  val KUBERNETES_EXECUTOR_LIMIT_CORES = ConfigBuilder("spark.kubernetes.executor.limit.cores")
    .doc("CPU limit for executor pods")
    .stringConf
    .createOptional

  // Dynamic Allocation
  val DYN_ALLOCATION_ENABLED = ConfigBuilder("spark.dynamicAllocation.enabled")
    .doc("Enable dynamic allocation of executors")
    .booleanConf
    .createWithDefault(false)

  val DYN_ALLOCATION_MIN_EXECUTORS = ConfigBuilder("spark.dynamicAllocation.minExecutors")
    .doc("Minimum number of executors")
    .intConf
    .createWithDefault(0)

  val DYN_ALLOCATION_MAX_EXECUTORS = ConfigBuilder("spark.dynamicAllocation.maxExecutors")
    .doc("Maximum number of executors")
    .intConf
    .createWithDefault(Int.MaxValue)

  // Networking
  val KUBERNETES_DRIVER_POD_NAME = ConfigBuilder("spark.kubernetes.driver.pod.name")
    .doc("Name of the driver pod")
    .stringConf
    .createOptional

  val KUBERNETES_DRIVER_SERVICE_TYPE = ConfigBuilder("spark.kubernetes.driver.service.type")
    .doc("Service type for driver pod")
    .stringConf
    .createWithDefault("ClusterIP")

  // Volume Configuration
  val KUBERNETES_DRIVER_VOLUMES_PREFIX = "spark.kubernetes.driver.volumes."
  val KUBERNETES_EXECUTOR_VOLUMES_PREFIX = "spark.kubernetes.executor.volumes."

  // Pod Templates
  val KUBERNETES_DRIVER_PODTEMPLATE_FILE = ConfigBuilder("spark.kubernetes.driver.podTemplateFile")
    .doc("Path to driver pod template file")
    .stringConf
    .createOptional

  val KUBERNETES_EXECUTOR_PODTEMPLATE_FILE = ConfigBuilder("spark.kubernetes.executor.podTemplateFile")
    .doc("Path to executor pod template file")  
    .stringConf
    .createOptional
}
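
How these entries read back through the KubernetesConf accessors shown earlier, using the driverConf built in the creation pattern above; optional entries come back as Option values, while entries with defaults fall back when unset:

driverConf.get(CONTAINER_IMAGE)                 // Option[String]; None until an image is set
driverConf.get(KUBERNETES_NAMESPACE)            // String; "default" unless overridden
driverConf.get(KUBERNETES_CONTEXT)              // Option[String]; None unless set
driverConf.contains(KUBERNETES_DRIVER_POD_NAME) // false unless explicitly configured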

Configuration Categories

Authentication and Security

// Service Account Configuration
val KUBERNETES_SERVICE_ACCOUNT_NAME = ConfigBuilder("spark.kubernetes.authenticate.serviceAccountName")

// OAuth Token Authentication
val OAUTH_TOKEN = ConfigBuilder("spark.kubernetes.authenticate.oauthToken")
val OAUTH_TOKEN_FILE = ConfigBuilder("spark.kubernetes.authenticate.oauthTokenFile")

// Certificate Authentication  
val CLIENT_KEY_FILE = ConfigBuilder("spark.kubernetes.authenticate.clientKeyFile")
val CLIENT_CERT_FILE = ConfigBuilder("spark.kubernetes.authenticate.clientCertFile")
val CA_CERT_FILE = ConfigBuilder("spark.kubernetes.authenticate.caCertFile")
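
An illustrative combination of these entries for token-based authentication; the file paths are placeholders:

// Token-file authentication plus a custom CA, set at submission time
val authConf = new SparkConf()
  .set("spark.kubernetes.authenticate.serviceAccountName", "spark")
  .set("spark.kubernetes.authenticate.oauthTokenFile", "/var/run/secrets/spark/token")
  .set("spark.kubernetes.authenticate.caCertFile", "/etc/ssl/k8s/ca.crt")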

Resource Management

// CPU Configuration
val KUBERNETES_DRIVER_REQUEST_CORES = ConfigBuilder("spark.kubernetes.driver.request.cores")
val KUBERNETES_EXECUTOR_REQUEST_CORES = ConfigBuilder("spark.kubernetes.executor.request.cores")

// Memory Configuration
val KUBERNETES_DRIVER_MEMORY = ConfigBuilder("spark.kubernetes.driver.memory")
val KUBERNETES_EXECUTOR_MEMORY = ConfigBuilder("spark.kubernetes.executor.memory")
val MEMORY_OVERHEAD_FACTOR = ConfigBuilder("spark.kubernetes.memoryOverheadFactor")

// Instance Configuration
val KUBERNETES_EXECUTOR_INSTANCES = ConfigBuilder("spark.kubernetes.executor.instances")
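
Requests and limits compose in the usual Kubernetes way: the request is what the scheduler reserves, the limit is the hard cap. An illustrative combination (values are placeholders):

val resourceConf = new SparkConf()
  .set("spark.kubernetes.executor.request.cores", "500m") // scheduler reservation
  .set("spark.kubernetes.executor.limit.cores", "2")      // hard cap enforced by the kubelet
  .set("spark.kubernetes.memoryOverheadFactor", "0.2")    // non-heap headroom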

Storage and Volumes

// Local Directory Configuration
val KUBERNETES_LOCAL_DIRS_TMPFS = ConfigBuilder("spark.kubernetes.local.dirs.tmpfs")

// Volume type tokens used inside prefixed keys
val KUBERNETES_VOLUMES_HOSTPATH_TYPE = "hostPath"
val KUBERNETES_VOLUMES_PVC_TYPE = "persistentVolumeClaim"
val KUBERNETES_VOLUMES_EMPTYDIR_TYPE = "emptyDir"

Constants and Defaults

Constants Object { .api }

Kubernetes-specific constants and default values:

object Constants {
  // Label Keys
  val SPARK_APP_ID_LABEL = "spark-app-selector"
  val SPARK_APP_NAME_LABEL = "spark-app-name"
  val SPARK_EXECUTOR_ID_LABEL = "spark-exec-id"
  val SPARK_ROLE_LABEL = "spark-role"
  val SPARK_VERSION_LABEL = "spark-version"

  // Label Values
  val DRIVER_ROLE = "driver"
  val EXECUTOR_ROLE = "executor"

  // Ports
  val DEFAULT_DRIVER_PORT = 7078
  val DEFAULT_BLOCKMANAGER_PORT = 7079
  val DEFAULT_UI_PORT = 4040

  // Environment Variables
  val ENV_DRIVER_URL = "SPARK_DRIVER_URL"
  val ENV_EXECUTOR_CORES = "SPARK_EXECUTOR_CORES"
  val ENV_EXECUTOR_MEMORY = "SPARK_EXECUTOR_MEMORY"
  val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID"
  val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID"

  // Paths
  val SPARK_CONF_DIR_INTERNAL = "/opt/spark/conf"
  val SPARK_CONF_FILE_NAME = "spark.properties"
  val HADOOP_CONF_DIR_PATH = "/opt/hadoop/conf"
  val KERBEROS_CONF_DIR_PATH = "/opt/kerberos"

  // Resource Types
  val APP_RESOURCE_TYPE_JAVA = "java"
  val APP_RESOURCE_TYPE_PYTHON = "python"
  val APP_RESOURCE_TYPE_R = "r"

  // Container Names
  val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
  val EXECUTOR_CONTAINER_NAME = "spark-kubernetes-executor"

  // Annotations
  val CREATED_BY_ANNOTATION = "created-by"
  val SPARK_APP_NAME_ANNOTATION = "spark-app-name"
}
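
A sketch of how these constants might be assembled into pod metadata; Spark attaches equivalent labels to every driver and executor pod it creates:

// Labels that make pods discoverable by application and role
def driverLabels(appId: String): Map[String, String] = Map(
  SPARK_APP_ID_LABEL -> appId,
  SPARK_ROLE_LABEL   -> DRIVER_ROLE)

// e.g. kubectl get pods -l spark-app-selector=app-123456789,spark-role=driver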

Default Values and Validation

// Image Pull Policies
object ImagePullPolicy extends Enumeration {
  val Always, Never, IfNotPresent = Value
}

// Service Types
object ServiceType extends Enumeration {
  val ClusterIP, NodePort, LoadBalancer = Value
}

// Restart Policies  
object RestartPolicy extends Enumeration {
  val Always, OnFailure, Never = Value
}
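
These enumerations pair naturally with validation of user-supplied strings; a small sketch:

import scala.util.Try

// Reject any value that is not a member of the enumeration
def parsePullPolicy(value: String): ImagePullPolicy.Value =
  Try(ImagePullPolicy.withName(value)).getOrElse(
    throw new IllegalArgumentException(
      s"Invalid image pull policy: $value (expected one of ${ImagePullPolicy.values.mkString(", ")})"))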

Volume Configuration

KubernetesVolumeSpec { .api }

Comprehensive volume specification supporting multiple Kubernetes volume types:

case class KubernetesVolumeSpec(
  volumeName: String,
  mountPath: String,
  mountSubPath: String = "",
  mountReadOnly: Boolean = false,
  volumeConf: KubernetesVolumeSpecificConf
)

Volume Types { .api }

// Host Path Volumes
case class KubernetesHostPathVolumeConf(
  hostPath: String
) extends KubernetesVolumeSpecificConf

// Persistent Volume Claims
case class KubernetesPVCVolumeConf(
  claimName: String
) extends KubernetesVolumeSpecificConf

// Empty Directory Volumes
case class KubernetesEmptyDirVolumeConf(
  medium: Option[String] = None,
  sizeLimit: Option[String] = None
) extends KubernetesVolumeSpecificConf

// NFS Volumes
case class KubernetesNFSVolumeConf(
  server: String,
  path: String
) extends KubernetesVolumeSpecificConf
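
The prefix-based properties in the next section ultimately materialize as these case classes; for instance, the hostPath example below corresponds to:

// What the hostPath properties below parse into
val dataVolume = KubernetesVolumeSpec(
  volumeName = "data",
  mountPath = "/data",
  mountReadOnly = true,
  volumeConf = KubernetesHostPathVolumeConf("/host/data"))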

Volume Configuration Examples

// Host path volume (driver scope; use spark.kubernetes.executor.volumes.* for executors)
spark.conf.set("spark.kubernetes.driver.volumes.hostPath.data.options.path", "/host/data")
spark.conf.set("spark.kubernetes.driver.volumes.hostPath.data.mount.path", "/data")
spark.conf.set("spark.kubernetes.driver.volumes.hostPath.data.mount.readOnly", "true")

// PVC volume
spark.conf.set("spark.kubernetes.driver.volumes.persistentVolumeClaim.storage.options.claimName", "spark-pvc")
spark.conf.set("spark.kubernetes.driver.volumes.persistentVolumeClaim.storage.mount.path", "/storage")

// Empty dir with size limit
spark.conf.set("spark.kubernetes.driver.volumes.emptyDir.tmp.options.medium", "Memory")
spark.conf.set("spark.kubernetes.driver.volumes.emptyDir.tmp.options.sizeLimit", "1Gi")
spark.conf.set("spark.kubernetes.driver.volumes.emptyDir.tmp.mount.path", "/tmp")

Configuration Validation

Validation Framework

object ConfigValidation {
  
  def validateDriverConfig(conf: KubernetesDriverConf): Unit = {
    validateBaseConfig(conf)
    validateDriverSpecific(conf)   // driver-only checks (not shown here)
  }
  
  def validateExecutorConfig(conf: KubernetesExecutorConf): Unit = {
    validateBaseConfig(conf)
    validateExecutorSpecific(conf) // executor-only checks (not shown here)
  }
  
  private def validateBaseConfig(conf: KubernetesConf): Unit = {
    // Required configurations
    require(conf.get(CONTAINER_IMAGE).nonEmpty, 
      "Container image must be specified with spark.kubernetes.container.image")
    
    require(conf.namespace.nonEmpty, 
      "Kubernetes namespace must be specified")
    
    // Resource validation
    validateResourceLimits(conf)
    validateVolumeConfiguration(conf)
    validateNetworkConfiguration(conf) // not shown here
  }
  
  private def validateResourceLimits(conf: KubernetesConf): Unit = {
    // CPU validation
    conf.get(KUBERNETES_DRIVER_LIMIT_CORES).foreach { cores =>
      require(cores.toDouble > 0, "Driver CPU limit must be positive")
    }
    
    // Memory validation  
    conf.get(KUBERNETES_DRIVER_MEMORY).foreach { memory =>
      require(Utils.byteStringAsBytes(memory) > 0, "Driver memory must be positive")
    }
  }
  
  private def validateVolumeConfiguration(conf: KubernetesConf): Unit = {
    conf.volumes.foreach { volume =>
      require(volume.volumeName.nonEmpty, "Volume name cannot be empty")
      require(volume.mountPath.nonEmpty, "Mount path cannot be empty")
      require(volume.mountPath.startsWith("/"), "Mount path must be absolute")
    }
  }
}
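
Validation is meant to run before any Kubernetes resources are created, so misconfiguration fails fast at submission time; a minimal wiring sketch using the driverConf from earlier:

// Fail fast: require(...) throws IllegalArgumentException on bad config
ConfigValidation.validateDriverConfig(driverConf)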

Common Validation Rules

// Image name validation (allowLatestTag is a local parameter, not a Spark config)
def validateImageName(image: String, allowLatestTag: Boolean = false): Unit = {
  require(image.nonEmpty, "Container image cannot be empty")
  require(allowLatestTag || !image.endsWith(":latest"),
    "The 'latest' tag is discouraged in production")
}

// Namespace validation  
def validateNamespace(namespace: String): Unit = {
  require(namespace.matches("[a-z0-9]([-a-z0-9]*[a-z0-9])?"),
    "Namespace must be a valid DNS-1123 label")
  require(namespace.length <= 63, "Namespace must be 63 characters or less")
}

// Resource name validation
def validateResourceName(name: String): Unit = {
  require(name.matches("[a-z0-9]([-a-z0-9]*[a-z0-9])?"),
    "Resource name must be a valid DNS-1123 subdomain")
  require(name.length <= 253, "Resource name must be 253 characters or less")
}

Configuration Usage Patterns

Dynamic Configuration

// Environment-based configuration
val conf = new SparkConf()
  .set(KUBERNETES_NAMESPACE, sys.env.getOrElse("SPARK_NAMESPACE", "default"))
  .set(CONTAINER_IMAGE, sys.env("SPARK_IMAGE"))
  .set(KUBERNETES_SERVICE_ACCOUNT_NAME, sys.env.getOrElse("SERVICE_ACCOUNT", "spark"))

// Conditional configuration
if (conf.get(DYN_ALLOCATION_ENABLED)) {
  conf.set(DYN_ALLOCATION_MIN_EXECUTORS, 2)
  conf.set(DYN_ALLOCATION_MAX_EXECUTORS, 20)
}

Configuration Templates

// Development configuration template
def createDevConfig(): SparkConf = new SparkConf()
  .set(KUBERNETES_NAMESPACE, "spark-dev")
  .set(CONTAINER_IMAGE, "spark:3.0.1-dev")
  .set(KUBERNETES_DRIVER_MEMORY, "1g")
  .set(KUBERNETES_EXECUTOR_INSTANCES, "2")
  .set(KUBERNETES_EXECUTOR_MEMORY, "2g")

// Production configuration template  
def createProdConfig(): SparkConf = new SparkConf()
  .set(KUBERNETES_NAMESPACE, "spark-prod")
  .set(CONTAINER_IMAGE, "my-org/spark:3.0.1-stable")
  .set(KUBERNETES_DRIVER_MEMORY, "4g")
  .set(KUBERNETES_EXECUTOR_INSTANCES, "10")
  .set(KUBERNETES_EXECUTOR_MEMORY, "8g")
  .set(KUBERNETES_DRIVER_LIMIT_CORES, "2")
  .set(KUBERNETES_EXECUTOR_LIMIT_CORES, "4")

Configuration Inheritance

// Base configuration
val baseConf = new SparkConf()
  .set(KUBERNETES_NAMESPACE, "spark")
  .set(CONTAINER_IMAGE, "spark:3.0.1")

// Driver-specific additions
val driverConf = baseConf.clone()
  .set(KUBERNETES_DRIVER_MEMORY, "2g")
  .set(KUBERNETES_DRIVER_SERVICE_TYPE, "LoadBalancer")

// Executor-specific additions  
val executorConf = baseConf.clone()
  .set(KUBERNETES_EXECUTOR_MEMORY, "4g")
  .set(KUBERNETES_EXECUTOR_INSTANCES, "5")

Advanced Configuration Features

Prefix-Based Configuration

// Volume configuration using prefixes
spark.conf.set("spark.kubernetes.driver.volumes.hostPath.data.options.path", "/host/data")
spark.conf.set("spark.kubernetes.driver.volumes.persistentVolumeClaim.logs.options.claimName", "logs-pvc")

// Secret configuration using prefixes
spark.conf.set("spark.kubernetes.driver.secrets.db-secret", "/opt/secrets/db")
spark.conf.set("spark.kubernetes.executor.secrets.api-key", "/opt/secrets/api")

// Environment variable secrets
spark.conf.set("spark.kubernetes.driver.secretKeyRef.DB_PASSWORD", "db-secret:password")
spark.conf.set("spark.kubernetes.executor.secretKeyRef.API_TOKEN", "api-secret:token")

Pod Template Integration

// Pod template file configuration
spark.conf.set("spark.kubernetes.driver.podTemplateFile", "/templates/driver-pod.yaml")
spark.conf.set("spark.kubernetes.executor.podTemplateFile", "/templates/executor-pod.yaml")

// Template merging with configuration
val templatePod = KubernetesUtils.loadPodFromTemplate(
  kubernetesClient, 
  new File("/templates/driver-pod.yaml"),
  Some(Constants.DRIVER_CONTAINER_NAME)
)

// Configuration takes precedence over template values
// (mergePodTemplateWithConfig is an illustrative helper, not a Spark API)
val finalPod = mergePodTemplateWithConfig(templatePod, driverConf)

The configuration management system provides a robust, extensible foundation for customizing every aspect of Kubernetes deployments while maintaining compatibility with Spark's existing configuration patterns.