The configuration management system provides a comprehensive, type-safe approach to configuring Kubernetes deployments for Spark applications, extending Spark's native configuration framework with Kubernetes-specific options and validation.
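For orientation, here is a minimal, hedged sketch of how Kubernetes options sit alongside core Spark settings; the property keys are the standard Spark-on-Kubernetes keys documented throughout this section, and the values are illustrative only:

import org.apache.spark.SparkConf

// Kubernetes-specific keys are ordinary Spark properties, so they compose
// with the rest of SparkConf; the values shown here are placeholders.
val conf = new SparkConf()
.setMaster("k8s://https://kubernetes.default.svc") // standard k8s:// master URL
.setAppName("my-spark-app")
.set("spark.kubernetes.namespace", "spark")
.set("spark.kubernetes.container.image", "spark:3.0.1")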
Base abstract class containing all metadata needed for Kubernetes pod creation and management:
abstract class KubernetesConf(val sparkConf: SparkConf) {
val resourceNamePrefix: String
def labels: Map[String, String]
def environment: Map[String, String]
def annotations: Map[String, String]
def secretEnvNamesToKeyRefs: Map[String, String]
def secretNamesToMountPaths: Map[String, String]
def volumes: Seq[KubernetesVolumeSpec]
def appName: String = get("spark.app.name", "spark")
def namespace: String = get(KUBERNETES_NAMESPACE)
def imagePullPolicy: String = get(CONTAINER_IMAGE_PULL_POLICY)
def nodeSelector: Map[String, String] =
KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)
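// Illustration: prefixed keys become map entries, e.g. setting
// spark.kubernetes.node.selector.disktype=ssd yields
// nodeSelector == Map("disktype" -> "ssd") (assuming the standard
// spark.kubernetes.node.selector. prefix).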
// Utility methods for configuration access
def contains(config: ConfigEntry[_]): Boolean = sparkConf.contains(config)
def get[T](config: ConfigEntry[T]): T = sparkConf.get(config)
def get(conf: String): String = sparkConf.get(conf)
def get(conf: String, defaultValue: String): String = sparkConf.get(conf, defaultValue)
def getOption(key: String): Option[String] = sparkConf.getOption(key)
}

Core Properties:
- resourceNamePrefix ensures unique Kubernetes resource names
- labels and annotations for pod identification and configuration

Driver-specific configuration extending the base configuration:
class KubernetesDriverConf(
sparkConf: SparkConf,
val appId: String,
val mainAppResource: MainAppResource,
val mainClass: String,
val appArgs: Array[String]
) extends KubernetesConf(sparkConf) {
def serviceAnnotations: Map[String, String]
}

Driver-Specific Features:
- appId, mainAppResource, mainClass, and appArgs carry everything needed to launch the driver
- serviceAnnotations supplies annotations for the driver service
Creation Pattern:
val driverConf = KubernetesConf.createDriverConf(
sparkConf = conf,
appName = "my-spark-app",
appResourceNamePrefix = s"spark-${UUID.randomUUID().toString.take(8)}",
appId = "app-123456789",
mainAppResource = JavaMainAppResource(Some("local:///opt/spark/jars/my-app.jar")),
mainClass = "com.example.MySparkApp",
appArgs = Array("--input", "/data", "--output", "/results")
)

Executor-specific configuration for worker pod creation:
class KubernetesExecutorConf(
sparkConf: SparkConf,
val appId: String,
val executorId: String,
val driverPod: Option[Pod]
) extends KubernetesConf(sparkConf)

Executor-Specific Features:
- executorId identifies the individual worker pod
- driverPod optionally references the running driver pod (for example, so executor pods can carry owner references back to the driver)
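For symmetry with the driver example, a hedged sketch of building an executor conf; a factory named createExecutorConf is assumed here to mirror createDriverConf and may differ by Spark version:

// Hedged sketch: one executor conf is built per worker pod.
val executorConf = KubernetesConf.createExecutorConf(
sparkConf = conf,
executorId = "1", // assigned by the cluster scheduler backend
appId = "app-123456789",
driverPod = Some(runningDriverPod) // hypothetical handle to the live driver pod
)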
Centralized configuration definitions using Spark's ConfigBuilder pattern:
object Config {
// Container Images
val CONTAINER_IMAGE = ConfigBuilder("spark.kubernetes.container.image")
.doc("Container image to use for Spark pods")
.stringConf
.createOptional
val DRIVER_CONTAINER_IMAGE = ConfigBuilder("spark.kubernetes.driver.container.image")
.doc("Container image to use for driver pod")
.fallbackConf(CONTAINER_IMAGE)
val EXECUTOR_CONTAINER_IMAGE = ConfigBuilder("spark.kubernetes.executor.container.image")
.doc("Container image to use for executor pods")
.fallbackConf(CONTAINER_IMAGE)
// Namespace and Authentication
val KUBERNETES_NAMESPACE = ConfigBuilder("spark.kubernetes.namespace")
.doc("Kubernetes namespace for Spark pods")
.stringConf
.createWithDefault("default")
val KUBERNETES_CONTEXT = ConfigBuilder("spark.kubernetes.context")
.doc("Kubernetes context to use")
.stringConf
.createOptional
val KUBERNETES_SERVICE_ACCOUNT_NAME = ConfigBuilder("spark.kubernetes.authenticate.serviceAccountName")
.doc("Service account for Spark pods")
.stringConf
.createOptional
// Resource Limits
val KUBERNETES_DRIVER_LIMIT_CORES = ConfigBuilder("spark.kubernetes.driver.limit.cores")
.doc("CPU limit for driver pod")
.stringConf
.createOptional
val KUBERNETES_EXECUTOR_LIMIT_CORES = ConfigBuilder("spark.kubernetes.executor.limit.cores")
.doc("CPU limit for executor pods")
.stringConf
.createOptional
// Dynamic Allocation
val DYN_ALLOCATION_ENABLED = ConfigBuilder("spark.dynamicAllocation.enabled")
.doc("Enable dynamic allocation of executors")
.booleanConf
.createWithDefault(false)
val DYN_ALLOCATION_MIN_EXECUTORS = ConfigBuilder("spark.dynamicAllocation.minExecutors")
.doc("Minimum number of executors")
.intConf
.createWithDefault(0)
val DYN_ALLOCATION_MAX_EXECUTORS = ConfigBuilder("spark.dynamicAllocation.maxExecutors")
.doc("Maximum number of executors")
.intConf
.createWithDefault(Int.MaxValue)
// Networking
val KUBERNETES_DRIVER_POD_NAME = ConfigBuilder("spark.kubernetes.driver.pod.name")
.doc("Name of the driver pod")
.stringConf
.createOptional
val KUBERNETES_DRIVER_SERVICE_TYPE = ConfigBuilder("spark.kubernetes.driver.service.type")
.doc("Service type for driver pod")
.stringConf
.createWithDefault("ClusterIP")
// Volume Configuration
val KUBERNETES_VOLUMES_PREFIX = "spark.kubernetes.volume."
val KUBERNETES_DRIVER_VOLUMES_PREFIX = "spark.kubernetes.driver.volume."
val KUBERNETES_EXECUTOR_VOLUMES_PREFIX = "spark.kubernetes.executor.volume."
// Pod Templates
val KUBERNETES_DRIVER_PODTEMPLATE_FILE = ConfigBuilder("spark.kubernetes.driver.podTemplateFile")
.doc("Path to driver pod template file")
.stringConf
.createOptional
val KUBERNETES_EXECUTOR_PODTEMPLATE_FILE = ConfigBuilder("spark.kubernetes.executor.podTemplateFile")
.doc("Path to executor pod template file")
.stringConf
.createOptional
}

// Service Account Configuration
val KUBERNETES_SERVICE_ACCOUNT_NAME = ConfigBuilder("spark.kubernetes.authenticate.serviceAccountName")
// OAuth Token Authentication
val OAUTH_TOKEN = ConfigBuilder("spark.kubernetes.authenticate.oauthToken")
val OAUTH_TOKEN_FILE = ConfigBuilder("spark.kubernetes.authenticate.oauthTokenFile")
// Certificate Authentication
val CLIENT_KEY_FILE = ConfigBuilder("spark.kubernetes.authenticate.clientKeyFile")
val CLIENT_CERT_FILE = ConfigBuilder("spark.kubernetes.authenticate.clientCertFile")
val CA_CERT_FILE = ConfigBuilder("spark.kubernetes.authenticate.caCertFile")

// CPU Configuration
val KUBERNETES_DRIVER_REQUEST_CORES = ConfigBuilder("spark.kubernetes.driver.request.cores")
val KUBERNETES_EXECUTOR_REQUEST_CORES = ConfigBuilder("spark.kubernetes.executor.request.cores")
// Memory Configuration
val KUBERNETES_DRIVER_MEMORY = ConfigBuilder("spark.kubernetes.driver.memory")
val KUBERNETES_EXECUTOR_MEMORY = ConfigBuilder("spark.kubernetes.executor.memory")
val MEMORY_OVERHEAD_FACTOR = ConfigBuilder("spark.kubernetes.memoryOverheadFactor")
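// Hedged note: with the overhead factor applied, the executor container's memory
// request is roughly executor memory + max(factor * executor memory, a fixed
// floor of a few hundred MiB); the exact formula depends on the Spark version.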
// Instance Configuration
val KUBERNETES_EXECUTOR_INSTANCES = ConfigBuilder("spark.kubernetes.executor.instances")

// Local Directory Configuration
val KUBERNETES_LOCAL_DIRS_TMPFS = ConfigBuilder("spark.kubernetes.local.dirs.tmpfs")
// Volume Mount Prefixes
val KUBERNETES_VOLUMES_HOSTPATH_PREFIX = "spark.kubernetes.volume.hostPath"
val KUBERNETES_VOLUMES_PVC_PREFIX = "spark.kubernetes.volume.persistentVolumeClaim"
val KUBERNETES_VOLUMES_EMPTYDIR_PREFIX = "spark.kubernetes.volume.emptyDir"

Kubernetes-specific constants and default values:
object Constants {
// Labels
val SPARK_APP_ID_LABEL = "spark-app-selector"
val SPARK_EXECUTOR_ID_LABEL = "spark-exec-id"
val SPARK_ROLE_LABEL = "spark-role"
val SPARK_VERSION_LABEL = "spark-version"
val SPARK_APP_NAME_LABEL = "spark-app-name"
// Label Values
val DRIVER_ROLE = "driver"
val EXECUTOR_ROLE = "executor"
// Ports
val DEFAULT_DRIVER_PORT = 7077
val DEFAULT_BLOCKMANAGER_PORT = 7078
val DEFAULT_UI_PORT = 4040
// Environment Variables
val ENV_DRIVER_URL = "SPARK_DRIVER_URL"
val ENV_EXECUTOR_CORES = "SPARK_EXECUTOR_CORES"
val ENV_EXECUTOR_MEMORY = "SPARK_EXECUTOR_MEMORY"
val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID"
val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID"
// Paths
val SPARK_CONF_DIR_INTERNAL = "/opt/spark/conf"
val SPARK_CONF_FILE_NAME = "spark.conf"
val HADOOP_CONF_DIR_PATH = "/opt/hadoop/conf"
val KERBEROS_CONF_DIR_PATH = "/opt/kerberos"
// Resource Types
val APP_RESOURCE_TYPE_JAVA = "java"
val APP_RESOURCE_TYPE_PYTHON = "python"
val APP_RESOURCE_TYPE_R = "r"
// Container Names
val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
val EXECUTOR_CONTAINER_NAME = "spark-kubernetes-executor"
// Annotations
val CREATED_BY_ANNOTATION = "created-by"
val SPARK_APP_NAME_ANNOTATION = "spark-app-name"
}

// Image Pull Policies
object ImagePullPolicy extends Enumeration {
val Always, Never, IfNotPresent = Value
}
// Service Types
object ServiceType extends Enumeration {
val ClusterIP, NodePort, LoadBalancer = Value
}
// Restart Policies
object RestartPolicy extends Enumeration {
val Always, OnFailure, Never = Value
}

Comprehensive volume specification supporting multiple Kubernetes volume types:
case class KubernetesVolumeSpec(
volumeName: String,
mountPath: String,
mountSubPath: String = "",
mountReadOnly: Boolean = false,
volumeConf: KubernetesVolumeSpecificConf
)

sealed trait KubernetesVolumeSpecificConf

// Host Path Volumes
case class KubernetesHostPathVolumeConf(
hostPath: String
) extends KubernetesVolumeSpecificConf
// Persistent Volume Claims
case class KubernetesPVCVolumeConf(
claimName: String
) extends KubernetesVolumeSpecificConf
// Empty Directory Volumes
case class KubernetesEmptyDirVolumeConf(
medium: Option[String] = None,
sizeLimit: Option[String] = None
) extends KubernetesVolumeSpecificConf
// NFS Volumes
case class KubernetesNFSVolumeConf(
server: String,
path: String
) extends KubernetesVolumeSpecificConf

// Host path volume
spark.conf.set("spark.kubernetes.volume.data.hostPath.path", "/host/data")
spark.conf.set("spark.kubernetes.volume.data.mount.path", "/data")
spark.conf.set("spark.kubernetes.volume.data.mount.readOnly", "true")
// PVC volume
spark.conf.set("spark.kubernetes.volume.storage.persistentVolumeClaim.claimName", "spark-pvc")
spark.conf.set("spark.kubernetes.volume.storage.mount.path", "/storage")
// Empty dir with size limit
spark.conf.set("spark.kubernetes.volume.tmp.emptyDir.medium", "Memory")
spark.conf.set("spark.kubernetes.volume.tmp.emptyDir.sizeLimit", "1Gi")
spark.conf.set("spark.kubernetes.volume.tmp.mount.path", "/tmp")object ConfigValidation {
def validateDriverConfig(conf: KubernetesDriverConf): Unit = {
validateBaseConfig(conf)
validateDriverSpecific(conf)
}
def validateExecutorConfig(conf: KubernetesExecutorConf): Unit = {
validateBaseConfig(conf)
validateExecutorSpecific(conf)
}
private def validateBaseConfig(conf: KubernetesConf): Unit = {
// Required configurations
require(conf.get(CONTAINER_IMAGE).nonEmpty,
"Container image must be specified with spark.kubernetes.container.image")
require(conf.namespace.nonEmpty,
"Kubernetes namespace must be specified")
// Resource validation
validateResourceLimits(conf)
validateVolumeConfiguration(conf)
validateNetworkConfiguration(conf)
}
private def validateResourceLimits(conf: KubernetesConf): Unit = {
// CPU validation
conf.get(KUBERNETES_DRIVER_LIMIT_CORES).foreach { cores =>
require(cores.toDouble > 0, "Driver CPU limit must be positive")
}
// Memory validation
conf.get(KUBERNETES_DRIVER_MEMORY).foreach { memory =>
require(Utils.byteStringAsBytes(memory) > 0, "Driver memory must be positive")
}
}
private def validateVolumeConfiguration(conf: KubernetesConf): Unit = {
conf.volumes.foreach { volume =>
require(volume.volumeName.nonEmpty, "Volume name cannot be empty")
require(volume.mountPath.nonEmpty, "Mount path cannot be empty")
require(volume.mountPath.startsWith("/"), "Mount path must be absolute")
}
}
}

// Image name validation
def validateImageName(image: String): Unit = {
require(image.nonEmpty, "Container image cannot be empty")
require(!image.contains("latest") || sparkConf.get(ALLOW_LATEST_IMAGE_TAG),
"Using 'latest' tag is discouraged in production")
}
// Namespace validation
def validateNamespace(namespace: String): Unit = {
require(namespace.matches("[a-z0-9]([-a-z0-9]*[a-z0-9])?"),
"Namespace must be a valid DNS-1123 label")
require(namespace.length <= 63, "Namespace must be 63 characters or less")
}
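// Illustration: validateNamespace("spark-prod") passes, while "Spark_Prod" fails
// the DNS-1123 pattern and triggers an IllegalArgumentException.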
// Resource name validation
def validateResourceName(name: String): Unit = {
require(name.matches("[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*"),
"Resource name must be a valid DNS-1123 subdomain")
require(name.length <= 253, "Resource name must be 253 characters or less")
}

// Environment-based configuration
val conf = new SparkConf()
.set(KUBERNETES_NAMESPACE, sys.env.getOrElse("SPARK_NAMESPACE", "default"))
.set(CONTAINER_IMAGE, sys.env("SPARK_IMAGE"))
.set(KUBERNETES_SERVICE_ACCOUNT_NAME, sys.env.getOrElse("SERVICE_ACCOUNT", "spark"))
// Conditional configuration
if (conf.get(DYN_ALLOCATION_ENABLED)) {
conf.set(DYN_ALLOCATION_MIN_EXECUTORS, 2)
conf.set(DYN_ALLOCATION_MAX_EXECUTORS, 20)
}

// Development configuration template
def createDevConfig(): SparkConf = new SparkConf()
.set(KUBERNETES_NAMESPACE, "spark-dev")
.set(CONTAINER_IMAGE, "spark:3.0.1-dev")
.set(KUBERNETES_DRIVER_MEMORY, "1g")
.set(KUBERNETES_EXECUTOR_INSTANCES, "2")
.set(KUBERNETES_EXECUTOR_MEMORY, "2g")
// Production configuration template
def createProdConfig(): SparkConf = new SparkConf()
.set(KUBERNETES_NAMESPACE, "spark-prod")
.set(CONTAINER_IMAGE, "my-org/spark:3.0.1-stable")
.set(KUBERNETES_DRIVER_MEMORY, "4g")
.set(KUBERNETES_EXECUTOR_INSTANCES, "10")
.set(KUBERNETES_EXECUTOR_MEMORY, "8g")
.set(KUBERNETES_DRIVER_LIMIT_CORES, "2")
.set(KUBERNETES_EXECUTOR_LIMIT_CORES, "4")

// Base configuration
val baseConf = new SparkConf()
.set(KUBERNETES_NAMESPACE, "spark")
.set(CONTAINER_IMAGE, "spark:3.0.1")
// Driver-specific additions
val driverConf = baseConf.clone()
.set(KUBERNETES_DRIVER_MEMORY, "2g")
.set(KUBERNETES_DRIVER_SERVICE_TYPE, "LoadBalancer")
// Executor-specific additions
val executorConf = baseConf.clone()
.set(KUBERNETES_EXECUTOR_MEMORY, "4g")
.set(KUBERNETES_EXECUTOR_INSTANCES, "5")

// Volume configuration using prefixes
spark.conf.set("spark.kubernetes.volume.data.hostPath.path", "/host/data")
spark.conf.set("spark.kubernetes.volume.logs.persistentVolumeClaim.claimName", "logs-pvc")
// Secret configuration using prefixes
spark.conf.set("spark.kubernetes.driver.secrets.db-secret", "/opt/secrets/db")
spark.conf.set("spark.kubernetes.executor.secrets.api-key", "/opt/secrets/api")
// Environment variable secrets
spark.conf.set("spark.kubernetes.driver.secretKeyRef.DB_PASSWORD", "db-secret:password")
spark.conf.set("spark.kubernetes.executor.secretKeyRef.API_TOKEN", "api-secret:token")// Pod template file configuration
spark.conf.set("spark.kubernetes.driver.podTemplateFile", "/templates/driver-pod.yaml")
spark.conf.set("spark.kubernetes.executor.podTemplateFile", "/templates/executor-pod.yaml")
// Template merging with configuration
val templatePod = KubernetesUtils.loadPodFromTemplate(
kubernetesClient,
new File("/templates/driver-pod.yaml"),
Some(Constants.DRIVER_CONTAINER_NAME)
)
// Configuration takes precedence over template
val finalPod = mergePodTemplateWithConfig(templatePod, driverConf)

The configuration management system provides a robust, extensible foundation for customizing every aspect of Kubernetes deployments while maintaining compatibility with Spark's existing configuration patterns.