The feature steps system provides a modular, extensible architecture for configuring Kubernetes pods through composable configuration steps. Each feature step handles a specific aspect of pod configuration, enabling flexible customization of both driver and executor pods.
Base interface for all feature configuration steps:
trait KubernetesFeatureConfigStep {
def configurePod(pod: SparkPod): SparkPod
def getAdditionalPodSystemProperties(): Map[String, String]
def getAdditionalKubernetesResources(): Seq[HasMetadata]
}

Key Responsibilities:
- configurePod: transforms the pod and its container to apply the step's configuration
- getAdditionalPodSystemProperties: contributes Spark system properties required by the step
- getAdditionalKubernetesResources: contributes extra Kubernetes resources (services, ConfigMaps, secrets) the step depends on
Usage Pattern:
// Feature steps are applied in sequence during pod building
val featureSteps: Seq[KubernetesFeatureConfigStep] = createFeatureSteps(conf)
val finalPod = featureSteps.foldLeft(initialPod) { (pod, step) =>
step.configurePod(pod)
}
val allSystemProperties = featureSteps.flatMap(_.getAdditionalPodSystemProperties()).toMap
val additionalResources = featureSteps.flatMap(_.getAdditionalKubernetesResources())

Core driver pod configuration, including essential container setup:
class BasicDriverFeatureStep(conf: KubernetesDriverConf) extends KubernetesFeatureConfigStep {
override def configurePod(pod: SparkPod): SparkPod
override def getAdditionalPodSystemProperties(): Map[String, String]
override def getAdditionalKubernetesResources(): Seq[HasMetadata]
}

Configuration Applied:
- Container name, image, and image pull policy
- Driver RPC port and bind address
- Application ID environment variable and CPU/memory resource requirements
- Pod name, Spark app and role labels, restart policy, and service account
Implementation Example:
override def configurePod(pod: SparkPod): SparkPod = {
val driverContainer = new ContainerBuilder(pod.container)
.withName(Constants.DRIVER_CONTAINER_NAME)
.withImage(conf.get(CONTAINER_IMAGE))
.withImagePullPolicy(conf.imagePullPolicy)
.addNewPort()
.withName("driver-rpc-port")
.withContainerPort(DEFAULT_DRIVER_PORT)
.withProtocol("TCP")
.endPort()
.addToEnv(ENV_DRIVER_BIND_ADDRESS, "0.0.0.0")
.addToEnv(ENV_APPLICATION_ID, conf.appId)
.withResources(buildResourceRequirements(conf))
.build()
val driverPod = new PodBuilder(pod.pod)
.editOrNewMetadata()
.withName(conf.resourceNamePrefix + "-driver")
.addToLabels(SPARK_APP_ID_LABEL, conf.appId)
.addToLabels(SPARK_ROLE_LABEL, DRIVER_ROLE)
.endMetadata()
.editOrNewSpec()
.withRestartPolicy("Never")
.withServiceAccount(conf.get(KUBERNETES_SERVICE_ACCOUNT_NAME).orNull)
.endSpec()
.build()
SparkPod(driverPod, driverContainer)
}
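// Assumed helper (not in the original excerpt): derive the driver container's
// resource requests and limits from the standard driver cores/memory settings.
// A minimal sketch; the real step also accounts for memory overhead.
private def buildResourceRequirements(conf: KubernetesDriverConf): ResourceRequirements = {
  val driverCpu = new Quantity(conf.get(DRIVER_CORES).toString)
  val driverMemory = new Quantity(s"${conf.get(DRIVER_MEMORY)}Mi")
  new ResourceRequirementsBuilder()
    .addToRequests("cpu", driverCpu)
    .addToRequests("memory", driverMemory)
    .addToLimits("memory", driverMemory)
    .build()
}
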
Creates a Kubernetes service for the driver pod to enable executor communication:
class DriverServiceFeatureStep(conf: KubernetesDriverConf) extends KubernetesFeatureConfigStep {
override def configurePod(pod: SparkPod): SparkPod
override def getAdditionalKubernetesResources(): Seq[HasMetadata]
}

Service Configuration:
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
val service = new ServiceBuilder()
.withNewMetadata()
.withName(conf.resourceNamePrefix + "-driver-svc")
.withNamespace(conf.namespace)
.addToLabels(SPARK_APP_ID_LABEL, conf.appId)
.addToAnnotations(conf.serviceAnnotations.asJava)
.endMetadata()
.withNewSpec()
.withType(conf.get(KUBERNETES_DRIVER_SERVICE_TYPE))
.addToSelector(SPARK_APP_ID_LABEL, conf.appId)
.addToSelector(SPARK_ROLE_LABEL, DRIVER_ROLE)
.addNewPort()
.withName("driver-rpc-port")
.withPort(DEFAULT_DRIVER_PORT)
.withTargetPort(new IntOrString(DEFAULT_DRIVER_PORT))
.endPort()
.addNewPort()
.withName("blockmanager")
.withPort(DEFAULT_BLOCKMANAGER_PORT)
.withTargetPort(new IntOrString(DEFAULT_BLOCKMANAGER_PORT))
.endPort()
.endSpec()
.build()
Seq(service)
}

Configures the driver container command and arguments for application execution:
class DriverCommandFeatureStep(conf: KubernetesDriverConf) extends KubernetesFeatureConfigStep {
override def configurePod(pod: SparkPod): SparkPod
}

Command Configuration:
override def configurePod(pod: SparkPod): SparkPod = {
val driverContainer = new ContainerBuilder(pod.container)
.withCommand("/opt/spark/bin/spark-submit")
.withArgs(buildDriverArgs(conf): _*)
.build()
pod.copy(container = driverContainer)
}
private def buildDriverArgs(conf: KubernetesDriverConf): Seq[String] = {
val baseArgs = Seq(
"--class", conf.mainClass,
"--deploy-mode", "client"
)
val resourceArgs = conf.mainAppResource match {
case JavaMainAppResource(Some(resource)) => Seq(resource)
case PythonMainAppResource(resource) => Seq(resource)
case RMainAppResource(resource) => Seq(resource)
case _ => Seq.empty
}
val appArgs = conf.appArgs
baseArgs ++ resourceArgs ++ appArgs
}

Configures Kubernetes API credentials for driver pod operations:
class DriverKubernetesCredentialsFeatureStep(conf: KubernetesDriverConf) extends KubernetesFeatureConfigStep {
override def configurePod(pod: SparkPod): SparkPod
override def getAdditionalPodSystemProperties(): Map[String, String]
}

Credential Configuration:
override def configurePod(pod: SparkPod): SparkPod = {
val credentialVolumes = buildCredentialVolumes(conf)
val credentialVolumeMounts = buildCredentialVolumeMounts(conf)
val driverContainer = new ContainerBuilder(pod.container)
.addAllToVolumeMounts(credentialVolumeMounts.asJava)
.build()
val driverPod = new PodBuilder(pod.pod)
.editSpec()
.addAllToVolumes(credentialVolumes.asJava)
.endSpec()
.build()
SparkPod(driverPod, driverContainer)
}
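// Assumed helpers (not in the original excerpt): a minimal sketch that wraps the
// driver's Kubernetes credential files in a single secret-backed volume. The
// secret name and mount path shown here are illustrative.
private def buildCredentialVolumes(conf: KubernetesDriverConf): Seq[Volume] = {
  Seq(new VolumeBuilder()
    .withName("kubernetes-credentials")
    .withNewSecret()
      .withSecretName(s"${conf.resourceNamePrefix}-kubernetes-credentials")
    .endSecret()
    .build())
}

private def buildCredentialVolumeMounts(conf: KubernetesDriverConf): Seq[VolumeMount] = {
  Seq(new VolumeMountBuilder()
    .withName("kubernetes-credentials")
    .withMountPath("/mnt/secrets/spark-kubernetes-credentials")
    .withReadOnly(true)
    .build())
}
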
override def getAdditionalPodSystemProperties(): Map[String, String] = {
  // Point the driver's Kubernetes client at the credential files mounted above
  Map(
    "spark.kubernetes.authenticate.driver.mounted.oauthTokenFile" ->
      "/mnt/secrets/spark-kubernetes-credentials/oauth-token",
    "spark.kubernetes.authenticate.driver.mounted.caCertFile" ->
      "/mnt/secrets/spark-kubernetes-credentials/ca-cert",
    "spark.kubernetes.authenticate.driver.mounted.clientKeyFile" ->
      "/mnt/secrets/spark-kubernetes-credentials/client-key",
    "spark.kubernetes.authenticate.driver.mounted.clientCertFile" ->
      "/mnt/secrets/spark-kubernetes-credentials/client-cert"
  )
}

Core executor pod configuration for Spark worker functionality:
class BasicExecutorFeatureStep(
conf: KubernetesExecutorConf,
secMgr: SecurityManager,
clock: Clock
) extends KubernetesFeatureConfigStep {
override def configurePod(pod: SparkPod): SparkPod
}

Executor Configuration:
override def configurePod(pod: SparkPod): SparkPod = {
val executorContainer = new ContainerBuilder(pod.container)
.withName(Constants.EXECUTOR_CONTAINER_NAME)
.withImage(conf.get(EXECUTOR_CONTAINER_IMAGE))
.withImagePullPolicy(conf.imagePullPolicy)
.addNewPort()
.withName("blockmanager")
.withContainerPort(DEFAULT_BLOCKMANAGER_PORT)
.withProtocol("TCP")
.endPort()
.addToEnv(ENV_DRIVER_URL, buildDriverUrl(conf))
.addToEnv(ENV_EXECUTOR_CORES, conf.get(KUBERNETES_EXECUTOR_CORES))
.addToEnv(ENV_EXECUTOR_MEMORY, conf.get(KUBERNETES_EXECUTOR_MEMORY))
.addToEnv(ENV_EXECUTOR_ID, conf.executorId)
.withCommand("/opt/spark/bin/spark-class")
.withArgs("org.apache.spark.executor.CoarseGrainedExecutorBackend")
.withResources(buildExecutorResourceRequirements(conf))
.build()
val executorPod = new PodBuilder(pod.pod)
.editOrNewMetadata()
.withName(s"${conf.resourceNamePrefix}-exec-${conf.executorId}")
.addToLabels(SPARK_APP_ID_LABEL, conf.appId)
.addToLabels(SPARK_ROLE_LABEL, EXECUTOR_ROLE)
.addToLabels(SPARK_EXECUTOR_ID_LABEL, conf.executorId)
.endMetadata()
.build()
SparkPod(executorPod, executorContainer)
}
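// Assumed helpers (not in the original excerpt). The driver URL points executors
// at the driver's RPC endpoint, which DriverServiceFeatureStep makes resolvable
// in-cluster; the resource requirements translate the executor cores/memory
// settings into Kubernetes requests and limits.
private def buildDriverUrl(conf: KubernetesExecutorConf): String = {
  val driverHost = conf.sparkConf.get("spark.driver.host")
  val driverPort = conf.sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT)
  s"spark://CoarseGrainedScheduler@$driverHost:$driverPort"
}

private def buildExecutorResourceRequirements(conf: KubernetesExecutorConf): ResourceRequirements = {
  val cores = new Quantity(conf.get(KUBERNETES_EXECUTOR_CORES))
  val memory = new Quantity(conf.get(KUBERNETES_EXECUTOR_MEMORY))
  new ResourceRequirementsBuilder()
    .addToRequests("cpu", cores)
    .addToRequests("memory", memory)
    .addToLimits("memory", memory)
    .build()
}
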
Configures Kubernetes credentials for executor pod operations:
class ExecutorKubernetesCredentialsFeatureStep(conf: KubernetesExecutorConf) extends KubernetesFeatureConfigStep {
override def configurePod(pod: SparkPod): SparkPod
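  // Sketch of a typical implementation (assumed): set the executor pod's service
  // account so executors can authenticate to the Kubernetes API, e.g.:
  //   new PodBuilder(pod.pod)
  //     .editOrNewSpec()
  //       .withServiceAccountName(conf.get(KUBERNETES_EXECUTOR_SERVICE_ACCOUNT_NAME).orNull)
  //     .endSpec()
  //     .build()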
}

Mounts Kubernetes secrets as volumes in pods:
class MountSecretsFeatureStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep {
override def configurePod(pod: SparkPod): SparkPod
}

Secret Mounting:
override def configurePod(pod: SparkPod): SparkPod = {
if (conf.secretNamesToMountPaths.nonEmpty) {
val secretVolumes = conf.secretNamesToMountPaths.map { case (secretName, mountPath) =>
new VolumeBuilder()
.withName(s"$secretName-volume")
.withNewSecret()
.withSecretName(secretName)
.withDefaultMode(420) // 0644 octal
.endSecret()
.build()
}.toSeq
val secretVolumeMounts = conf.secretNamesToMountPaths.map { case (secretName, mountPath) =>
new VolumeMountBuilder()
.withName(s"$secretName-volume")
.withMountPath(mountPath)
.withReadOnly(true)
.build()
}.toSeq
val updatedContainer = new ContainerBuilder(pod.container)
.addAllToVolumeMounts(secretVolumeMounts.asJava)
.build()
val updatedPod = new PodBuilder(pod.pod)
.editSpec()
.addAllToVolumes(secretVolumes.asJava)
.endSpec()
.build()
SparkPod(updatedPod, updatedContainer)
} else {
pod
}
}

Injects secret values as environment variables using Kubernetes secret references:
class EnvSecretsFeatureStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep {
override def configurePod(pod: SparkPod): SparkPod
}

Environment Variable Secrets:
override def configurePod(pod: SparkPod): SparkPod = {
if (conf.secretEnvNamesToKeyRefs.nonEmpty) {
val secretEnvVars = conf.secretEnvNamesToKeyRefs.map { case (envName, secretKeyRef) =>
val Array(secretName, key) = secretKeyRef.split(":", 2)
new EnvVarBuilder()
.withName(envName)
.withNewValueFrom()
.withNewSecretKeyRef()
.withName(secretName)
.withKey(key)
.endSecretKeyRef()
.endValueFrom()
.build()
}.toSeq
val updatedContainer = new ContainerBuilder(pod.container)
.addAllToEnv(secretEnvVars.asJava)
.build()
pod.copy(container = updatedContainer)
} else {
pod
}
}

Mounts user-specified volumes in pods:
class MountVolumesFeatureStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep {
override def configurePod(pod: SparkPod): SparkPod
}

Volume Mounting Implementation:
override def configurePod(pod: SparkPod): SparkPod = {
if (conf.volumes.nonEmpty) {
val volumes = conf.volumes.map(createVolumeSpec)
val volumeMounts = conf.volumes.map(createVolumeMountSpec)
val updatedContainer = new ContainerBuilder(pod.container)
.addAllToVolumeMounts(volumeMounts.asJava)
.build()
val updatedPod = new PodBuilder(pod.pod)
.editSpec()
.addAllToVolumes(volumes.asJava)
.endSpec()
.build()
SparkPod(updatedPod, updatedContainer)
} else {
pod
}
}
private def createVolumeSpec(volumeSpec: KubernetesVolumeSpec): Volume = {
val volumeBuilder = new VolumeBuilder()
.withName(volumeSpec.volumeName)
volumeSpec.volumeConf match {
case KubernetesHostPathVolumeConf(hostPath) =>
volumeBuilder.withNewHostPath()
.withPath(hostPath)
.withType("Directory")
.endHostPath()
case KubernetesPVCVolumeConf(claimName) =>
volumeBuilder.withNewPersistentVolumeClaim()
.withClaimName(claimName)
.withReadOnly(volumeSpec.mountReadOnly)
.endPersistentVolumeClaim()
case KubernetesEmptyDirVolumeConf(medium, sizeLimit) =>
volumeBuilder.withNewEmptyDir()
.withMedium(medium.orNull)
.withSizeLimit(sizeLimit.map(new Quantity(_)).orNull)
.endEmptyDir()
}
volumeBuilder.build()
}
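// Assumed companion helper (referenced above but not shown in the original);
// the field names on KubernetesVolumeSpec follow the obvious reading of the code above.
private def createVolumeMountSpec(volumeSpec: KubernetesVolumeSpec): VolumeMount = {
  new VolumeMountBuilder()
    .withName(volumeSpec.volumeName)
    .withMountPath(volumeSpec.mountPath)
    .withReadOnly(volumeSpec.mountReadOnly)
    .build()
}
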
Configures local directories for Spark scratch space using emptyDir volumes:
class LocalDirsFeatureStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep {
override def configurePod(pod: SparkPod): SparkPod
override def getAdditionalPodSystemProperties(): Map[String, String]
}

Local Directory Configuration:
override def configurePod(pod: SparkPod): SparkPod = {
val localDirVolumes = createLocalDirVolumes(conf)
val localDirVolumeMounts = createLocalDirVolumeMounts(conf)
val updatedContainer = new ContainerBuilder(pod.container)
.addAllToVolumeMounts(localDirVolumeMounts.asJava)
.addToEnv("SPARK_LOCAL_DIRS", localDirPaths.mkString(","))
.build()
val updatedPod = new PodBuilder(pod.pod)
.editSpec()
.addAllToVolumes(localDirVolumes.asJava)
.endSpec()
.build()
SparkPod(updatedPod, updatedContainer)
}
override def getAdditionalPodSystemProperties(): Map[String, String] = {
Map("spark.local.dir" -> localDirPaths.mkString(","))
}
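// Assumed supporting members (referenced above but not shown in the original):
// local directory paths default to a per-pod scratch directory unless
// spark.local.dir is set explicitly; names are derived from the indices.
private val localDirPaths: Seq[String] =
  conf.sparkConf.getOption("spark.local.dir")
    .map(_.split(",").toSeq)
    .getOrElse(Seq(s"/var/data/spark-${java.util.UUID.randomUUID()}"))
private val localDirNames: Seq[String] = localDirPaths.indices.map(i => (i + 1).toString)

private def createLocalDirVolumeMounts(conf: KubernetesConf): Seq[VolumeMount] = {
  localDirNames.zip(localDirPaths).map { case (dirName, dirPath) =>
    new VolumeMountBuilder()
      .withName(s"spark-local-dir-$dirName")
      .withMountPath(dirPath)
      .build()
  }
}
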
private def createLocalDirVolumes(conf: KubernetesConf): Seq[Volume] = {
val useTemporaryFileSystem = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS)
localDirNames.map { dirName =>
new VolumeBuilder()
.withName(s"spark-local-dir-$dirName")
.withNewEmptyDir()
.withMedium(if (useTemporaryFileSystem) "Memory" else null)
.endEmptyDir()
.build()
}
}

Configures Hadoop settings for the driver pod:
class HadoopConfDriverFeatureStep(conf: KubernetesDriverConf) extends KubernetesFeatureConfigStep {
override def configurePod(pod: SparkPod): SparkPod
override def getAdditionalPodSystemProperties(): Map[String, String]
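  // Sketch of a typical implementation (assumed): mount the ConfigMap holding
  // core-site.xml / hdfs-site.xml at a fixed path and export HADOOP_CONF_DIR
  // so Hadoop clients in the driver pick the files up, e.g.:
  //   .addToEnv("HADOOP_CONF_DIR", "/opt/hadoop/conf")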
}

Configures Kerberos authentication for the driver pod:
class KerberosConfDriverFeatureStep(conf: KubernetesDriverConf) extends KubernetesFeatureConfigStep {
override def configurePod(pod: SparkPod): SparkPod
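  // Sketch (assumed): mount the krb5.conf ConfigMap and, when a keytab or an
  // existing delegation-token secret is configured, mount that secret as well,
  // letting the driver authenticate to Kerberized services such as secure HDFS.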
}

Handles pod template configuration via Kubernetes ConfigMaps:
class PodTemplateConfigMapStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep {
override def getAdditionalKubernetesResources(): Seq[HasMetadata]
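  // Sketch of a typical implementation (assumed identifiers): store the executor
  // pod template file's contents in a ConfigMap that the driver mounts and passes
  // on to executors.
  // override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
  //   val templateContents = Files.readString(Paths.get(templateFilePath)) // templateFilePath: assumed field
  //   Seq(new ConfigMapBuilder()
  //     .withNewMetadata()
  //       .withName(s"${conf.resourceNamePrefix}-podspec-configmap")
  //     .endMetadata()
  //     .addToData("podspec-configmap-key", templateContents)
  //     .build())
  // }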
}

Feature steps are integrated into the builder pattern for both driver and executor pods:
// Driver feature step composition
class KubernetesDriverBuilder {
def buildFromFeatures(
conf: KubernetesDriverConf,
client: KubernetesClient
): KubernetesDriverSpec = {
val featureSteps = Seq(
new BasicDriverFeatureStep(conf),
new DriverServiceFeatureStep(conf),
new DriverCommandFeatureStep(conf),
new DriverKubernetesCredentialsFeatureStep(conf)
) ++ createOptionalSteps(conf)
val initialPod = SparkPod.initialPod()
val configuredPod = applyFeatureSteps(initialPod, featureSteps)
val systemProperties = collectSystemProperties(featureSteps)
val additionalResources = collectAdditionalResources(featureSteps)
KubernetesDriverSpec(configuredPod, additionalResources, systemProperties)
}
private def createOptionalSteps(conf: KubernetesDriverConf): Seq[KubernetesFeatureConfigStep] = {
val steps = mutable.ArrayBuffer[KubernetesFeatureConfigStep]()
if (conf.secretNamesToMountPaths.nonEmpty) {
steps += new MountSecretsFeatureStep(conf)
}
if (conf.secretEnvNamesToKeyRefs.nonEmpty) {
steps += new EnvSecretsFeatureStep(conf)
}
if (conf.volumes.nonEmpty) {
steps += new MountVolumesFeatureStep(conf)
}
if (conf.get(HADOOP_CONF_CONFIGMAP_NAME).isDefined) {
steps += new HadoopConfDriverFeatureStep(conf)
}
steps.toSeq
}
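  // The helpers used above are not shown in the original; minimal sketches,
  // assuming they fold and aggregate over the steps exactly as in the usage
  // pattern shown at the top of this page.
  private def applyFeatureSteps(
      pod: SparkPod,
      steps: Seq[KubernetesFeatureConfigStep]): SparkPod = {
    steps.foldLeft(pod) { (currentPod, step) => step.configurePod(currentPod) }
  }

  private def collectSystemProperties(
      steps: Seq[KubernetesFeatureConfigStep]): Map[String, String] = {
    steps.flatMap(_.getAdditionalPodSystemProperties()).toMap
  }

  private def collectAdditionalResources(
      steps: Seq[KubernetesFeatureConfigStep]): Seq[HasMetadata] = {
    steps.flatMap(_.getAdditionalKubernetesResources())
  }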
}

// Executor feature step composition
class KubernetesExecutorBuilder {
def buildFromFeatures(
conf: KubernetesExecutorConf,
secMgr: SecurityManager,
client: KubernetesClient
): SparkPod = {
val featureSteps = Seq(
new BasicExecutorFeatureStep(conf, secMgr, Clock.systemUTC()),
new ExecutorKubernetesCredentialsFeatureStep(conf),
new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf),
new MountVolumesFeatureStep(conf),
new LocalDirsFeatureStep(conf)
)
val initialPod = SparkPod.initialPod()
applyFeatureSteps(initialPod, featureSteps)
}
}

// Feature steps can be conditionally applied based on configuration
class ConditionalFeatureStep(
  conf: KubernetesConf,
  condition: KubernetesConf => Boolean,
  step: KubernetesFeatureConfigStep
) extends KubernetesFeatureConfigStep {
override def configurePod(pod: SparkPod): SparkPod = {
if (condition(conf)) step.configurePod(pod) else pod
}
override def getAdditionalPodSystemProperties(): Map[String, String] = {
if (condition(conf)) step.getAdditionalPodSystemProperties() else Map.empty
}
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
if (condition(conf)) step.getAdditionalKubernetesResources() else Seq.empty
}
}
// Usage
val conditionalStep = new ConditionalFeatureStep(
  conf,
  c => c.get(ENABLE_MONITORING).getOrElse(false),
  new MonitoringFeatureStep(conf)
)

// Combine multiple related feature steps
class CompositeFeatureStep(steps: KubernetesFeatureConfigStep*) extends KubernetesFeatureConfigStep {
override def configurePod(pod: SparkPod): SparkPod = {
steps.foldLeft(pod) { (currentPod, step) =>
step.configurePod(currentPod)
}
}
override def getAdditionalPodSystemProperties(): Map[String, String] = {
steps.flatMap(_.getAdditionalPodSystemProperties()).toMap
}
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
steps.flatMap(_.getAdditionalKubernetesResources())
}
}
// Security-related feature steps
val securitySteps = new CompositeFeatureStep(
new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf),
new KerberosConfDriverFeatureStep(conf)
)

// Example: Custom monitoring feature step
class MonitoringFeatureStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep {
override def configurePod(pod: SparkPod): SparkPod = {
val monitoringContainer = new ContainerBuilder(pod.container)
.addToEnv("PROMETHEUS_ENABLED", "true")
.addNewPort()
.withName("metrics")
.withContainerPort(8080)
.withProtocol("TCP")
.endPort()
.build()
val monitoringPod = new PodBuilder(pod.pod)
.editOrNewMetadata()
.addToAnnotations("prometheus.io/scrape", "true")
.addToAnnotations("prometheus.io/port", "8080")
.addToAnnotations("prometheus.io/path", "/metrics")
.endMetadata()
.build()
SparkPod(monitoringPod, monitoringContainer)
}
override def getAdditionalPodSystemProperties(): Map[String, String] = {
Map(
"spark.sql.streaming.metricsEnabled" -> "true",
"spark.metrics.conf.driver.sink.prometheus.class" -> "org.apache.spark.metrics.sink.PrometheusSink"
)
}
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
val monitoringService = new ServiceBuilder()
.withNewMetadata()
.withName(s"${conf.resourceNamePrefix}-metrics")
.withNamespace(conf.namespace)
.endMetadata()
.withNewSpec()
.addToSelector(SPARK_APP_ID_LABEL, conf.appId)
.addNewPort()
.withName("metrics")
.withPort(8080)
.withTargetPort(new IntOrString(8080))
.endPort()
.endSpec()
.build()
Seq(monitoringService)
}
}

The feature steps system provides a modular approach to pod configuration that enables extensive customization while maintaining a clean separation of concerns and reusability across deployment scenarios.