Feature Steps System

The feature steps system provides a modular, extensible architecture for configuring Kubernetes pods through composable configuration steps. Each feature step handles a specific aspect of pod configuration, enabling flexible customization of both driver and executor pods.

Core Architecture

KubernetesFeatureConfigStep { .api }

Base interface for all feature configuration steps:

trait KubernetesFeatureConfigStep {
  def configurePod(pod: SparkPod): SparkPod
  def getAdditionalPodSystemProperties(): Map[String, String]
  def getAdditionalKubernetesResources(): Seq[HasMetadata]
}

Key Responsibilities:

  • Pod Configuration: Apply specific modifications to pod specifications
  • System Properties: Contribute JVM system properties for Spark components
  • Additional Resources: Create supporting Kubernetes resources (Services, ConfigMaps, etc.)

Usage Pattern:

// Feature steps are applied in sequence during pod building
val featureSteps: Seq[KubernetesFeatureConfigStep] = createFeatureSteps(conf)

val configuredPod = featureSteps.foldLeft(initialPod) { (pod, step) =>
  step.configurePod(pod)
}

val allSystemProperties = featureSteps.flatMap(_.getAdditionalPodSystemProperties()).toMap
val additionalResources = featureSteps.flatMap(_.getAdditionalKubernetesResources())

Driver Feature Steps

BasicDriverFeatureStep { .api }

Core driver pod configuration including essential container setup:

class BasicDriverFeatureStep(conf: KubernetesDriverConf) extends KubernetesFeatureConfigStep {
  override def configurePod(pod: SparkPod): SparkPod
  override def getAdditionalPodSystemProperties(): Map[String, String]
  override def getAdditionalKubernetesResources(): Seq[HasMetadata]
}

Configuration Applied:

  • Container Image: Sets driver container image from configuration
  • Resource Limits: Applies CPU and memory limits and requests
  • Basic Labels: Adds Spark application and role labels
  • Environment Variables: Sets essential Spark environment variables
  • Port Configuration: Configures driver ports for communication

Implementation Example:

override def configurePod(pod: SparkPod): SparkPod = {
  val driverContainer = new ContainerBuilder(pod.container)
    .withName(Constants.DRIVER_CONTAINER_NAME)
    .withImage(conf.get(CONTAINER_IMAGE))
    .withImagePullPolicy(conf.imagePullPolicy)
    .addNewPort()
      .withName("driver-rpc-port")
      .withContainerPort(DEFAULT_DRIVER_PORT)
      .withProtocol("TCP")
    .endPort()
    .addToEnv(ENV_DRIVER_BIND_ADDRESS, "0.0.0.0")
    .addToEnv(ENV_APPLICATION_ID, conf.appId)
    .withResources(buildResourceRequirements(conf))
    .build()

  val driverPod = new PodBuilder(pod.pod)
    .editOrNewMetadata()
      .withName(conf.resourceNamePrefix + "-driver")
      .addToLabels(SPARK_APP_ID_LABEL, conf.appId)
      .addToLabels(SPARK_ROLE_LABEL, DRIVER_ROLE)
    .endMetadata()
    .editOrNewSpec()
      .withRestartPolicy("Never")
      .withServiceAccountName(conf.get(KUBERNETES_SERVICE_ACCOUNT_NAME).orNull)
    .endSpec()
    .build()

  SparkPod(driverPod, driverContainer)
}
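
The `buildResourceRequirements` helper referenced above is not shown in the excerpt; a minimal sketch of how CPU/memory requests and limits are typically assembled (the `DRIVER_CORES`/`DRIVER_MEMORY` keys are illustrative):

private def buildResourceRequirements(conf: KubernetesDriverConf): ResourceRequirements = {
  // Request the configured cores and memory; cap memory with a limit so the
  // driver JVM cannot exceed its pod allocation.
  val driverCpu = conf.get(DRIVER_CORES).toString
  val driverMemory = s"${conf.get(DRIVER_MEMORY)}Mi"
  new ResourceRequirementsBuilder()
    .addToRequests("cpu", new Quantity(driverCpu))
    .addToRequests("memory", new Quantity(driverMemory))
    .addToLimits("memory", new Quantity(driverMemory))
    .build()
}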

DriverServiceFeatureStep { .api }

Creates Kubernetes service for driver pod to enable executor communication:

class DriverServiceFeatureStep(conf: KubernetesDriverConf) extends KubernetesFeatureConfigStep {
  override def configurePod(pod: SparkPod): SparkPod
  override def getAdditionalKubernetesResources(): Seq[HasMetadata]
}

Service Configuration:

override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
  val service = new ServiceBuilder()
    .withNewMetadata()
      .withName(conf.resourceNamePrefix + "-driver-svc")
      .withNamespace(conf.namespace)
      .addToLabels(SPARK_APP_ID_LABEL, conf.appId)
      .addToAnnotations(conf.serviceAnnotations.asJava)
    .endMetadata()
    .withNewSpec()
      .withType(conf.get(KUBERNETES_DRIVER_SERVICE_TYPE))
      .addToSelector(SPARK_APP_ID_LABEL, conf.appId)
      .addToSelector(SPARK_ROLE_LABEL, DRIVER_ROLE)
      .addNewPort()
        .withName("driver-rpc-port")
        .withPort(DEFAULT_DRIVER_PORT)
        .withTargetPort(new IntOrString(DEFAULT_DRIVER_PORT))
      .endPort()
      .addNewPort()
        .withName("blockmanager")
        .withPort(DEFAULT_BLOCKMANAGER_PORT)
        .withTargetPort(new IntOrString(DEFAULT_BLOCKMANAGER_PORT))
      .endPort()
    .endSpec()
    .build()

  Seq(service)
}
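
Though not shown in the class signature above, this step typically also publishes the service's cluster DNS name as the driver address so executors can reach it; a sketch using standard Spark driver properties:

override def getAdditionalPodSystemProperties(): Map[String, String] = {
  // Executors resolve the driver through the service's DNS name.
  val driverHostname = s"${conf.resourceNamePrefix}-driver-svc.${conf.namespace}.svc"
  Map(
    "spark.driver.host" -> driverHostname,
    "spark.driver.port" -> DEFAULT_DRIVER_PORT.toString,
    "spark.driver.blockManager.port" -> DEFAULT_BLOCKMANAGER_PORT.toString
  )
}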

DriverCommandFeatureStep { .api }

Configures the driver container command and arguments for application execution:

class DriverCommandFeatureStep(conf: KubernetesDriverConf) extends KubernetesFeatureConfigStep {
  override def configurePod(pod: SparkPod): SparkPod
}

Command Configuration:

override def configurePod(pod: SparkPod): SparkPod = {
  val driverContainer = new ContainerBuilder(pod.container)
    .withCommand("/opt/spark/bin/spark-submit")
    .withArgs(buildDriverArgs(conf): _*)
    .build()

  pod.copy(container = driverContainer)
}

private def buildDriverArgs(conf: KubernetesDriverConf): Seq[String] = {
  val baseArgs = Seq(
    "--class", conf.mainClass,
    "--deploy-mode", "client"
  )

  val resourceArgs = conf.mainAppResource match {
    case JavaMainAppResource(Some(resource)) => Seq(resource)
    case PythonMainAppResource(resource) => Seq(resource)
    case RMainAppResource(resource) => Seq(resource)
    case _ => Seq.empty
  }

  val appArgs = conf.appArgs

  baseArgs ++ resourceArgs ++ appArgs
}
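
For example, a JVM application with main class com.example.Main and resource local:///opt/app/app.jar would yield arguments roughly equivalent to: --class com.example.Main --deploy-mode client local:///opt/app/app.jar <app args>.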

DriverKubernetesCredentialsFeatureStep { .api }

Configures Kubernetes API credentials for driver pod operations:

class DriverKubernetesCredentialsFeatureStep(conf: KubernetesDriverConf) extends KubernetesFeatureConfigStep {
  override def configurePod(pod: SparkPod): SparkPod
  override def getAdditionalPodSystemProperties(): Map[String, String]
}

Credential Configuration:

override def configurePod(pod: SparkPod): SparkPod = {
  val credentialVolumes = buildCredentialVolumes(conf)
  val credentialVolumeMounts = buildCredentialVolumeMounts(conf)

  val driverContainer = new ContainerBuilder(pod.container)
    .addAllToVolumeMounts(credentialVolumeMounts.asJava)
    .build()

  val driverPod = new PodBuilder(pod.pod)
    .editSpec()
      .addAllToVolumes(credentialVolumes.asJava)
    .endSpec()
    .build()

  SparkPod(driverPod, driverContainer)
}

override def getAdditionalPodSystemProperties(): Map[String, String] = {
  // Point the driver's in-pod Kubernetes client at the mounted credential
  // files (paths illustrative; in practice only properties for credentials
  // that were actually provided are emitted).
  Map(
    "spark.kubernetes.authenticate.driver.mounted.caCertFile" ->
      "/mnt/secrets/spark-kubernetes-credentials/ca.crt",
    "spark.kubernetes.authenticate.driver.mounted.clientKeyFile" ->
      "/mnt/secrets/spark-kubernetes-credentials/client.key",
    "spark.kubernetes.authenticate.driver.mounted.clientCertFile" ->
      "/mnt/secrets/spark-kubernetes-credentials/client.crt"
  )
}

Executor Feature Steps

BasicExecutorFeatureStep { .api }

Core executor pod configuration for Spark worker functionality:

class BasicExecutorFeatureStep(
  conf: KubernetesExecutorConf,
  secMgr: SecurityManager,
  clock: Clock
) extends KubernetesFeatureConfigStep {
  
  override def configurePod(pod: SparkPod): SparkPod
}

Executor Configuration:

override def configurePod(pod: SparkPod): SparkPod = {
  val executorContainer = new ContainerBuilder(pod.container)
    .withName(Constants.EXECUTOR_CONTAINER_NAME)
    .withImage(conf.get(EXECUTOR_CONTAINER_IMAGE))
    .withImagePullPolicy(conf.imagePullPolicy)
    .addNewPort()
      .withName("blockmanager")
      .withContainerPort(DEFAULT_BLOCKMANAGER_PORT)
      .withProtocol("TCP")
    .endPort()
    .addToEnv(ENV_DRIVER_URL, buildDriverUrl(conf))
    .addToEnv(ENV_EXECUTOR_CORES, conf.get(KUBERNETES_EXECUTOR_CORES))
    .addToEnv(ENV_EXECUTOR_MEMORY, conf.get(KUBERNETES_EXECUTOR_MEMORY))
    .addToEnv(ENV_EXECUTOR_ID, conf.executorId)
    .withCommand("/opt/spark/bin/spark-class")
    .withArgs("org.apache.spark.executor.CoarseGrainedExecutorBackend")
    .withResources(buildExecutorResourceRequirements(conf))
    .build()

  val executorPod = new PodBuilder(pod.pod)
    .editOrNewMetadata()
      .withName(s"${conf.resourceNamePrefix}-exec-${conf.executorId}")
      .addToLabels(SPARK_APP_ID_LABEL, conf.appId)
      .addToLabels(SPARK_ROLE_LABEL, EXECUTOR_ROLE)
      .addToLabels(SPARK_EXECUTOR_ID_LABEL, conf.executorId)
    .endMetadata()
    .build()

  SparkPod(executorPod, executorContainer)
}
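
The `buildDriverUrl` helper referenced above constructs the RPC endpoint the executor backend connects to; a sketch, assuming the driver host and port are available through the Spark configuration:

private def buildDriverUrl(conf: KubernetesExecutorConf): String = {
  // Standard CoarseGrainedScheduler endpoint URL format.
  val driverHost = conf.sparkConf.get("spark.driver.host")
  val driverPort = conf.sparkConf.get("spark.driver.port")
  s"spark://CoarseGrainedScheduler@$driverHost:$driverPort"
}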

ExecutorKubernetesCredentialsFeatureStep { .api }

Configures Kubernetes credentials for executor pod operations:

class ExecutorKubernetesCredentialsFeatureStep(conf: KubernetesExecutorConf) extends KubernetesFeatureConfigStep {
  override def configurePod(pod: SparkPod): SparkPod
}
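
A minimal sketch of the typical behavior — assigning the configured executor service account, assuming a `KUBERNETES_EXECUTOR_SERVICE_ACCOUNT_NAME`-style config key:

override def configurePod(pod: SparkPod): SparkPod = {
  conf.get(KUBERNETES_EXECUTOR_SERVICE_ACCOUNT_NAME) match {
    case Some(serviceAccount) =>
      // Run executors under the configured service account.
      val updatedPod = new PodBuilder(pod.pod)
        .editOrNewSpec()
          .withServiceAccountName(serviceAccount)
        .endSpec()
        .build()
      pod.copy(pod = updatedPod)
    case None => pod
  }
}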

Shared Feature Steps

MountSecretsFeatureStep { .api }

Mounts Kubernetes secrets as volumes in pods:

class MountSecretsFeatureStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep {
  override def configurePod(pod: SparkPod): SparkPod
}

Secret Mounting:

override def configurePod(pod: SparkPod): SparkPod = {
  if (conf.secretNamesToMountPaths.nonEmpty) {
    val secretVolumes = conf.secretNamesToMountPaths.map { case (secretName, _) =>
      new VolumeBuilder()
        .withName(s"$secretName-volume")
        .withNewSecret()
          .withSecretName(secretName)
          .withDefaultMode(420) // 420 decimal = 0644 octal (rw-r--r--)
        .endSecret()
        .build()
    }.toSeq

    val secretVolumeMounts = conf.secretNamesToMountPaths.map { case (secretName, mountPath) =>
      new VolumeMountBuilder()
        .withName(s"$secretName-volume")
        .withMountPath(mountPath)
        .withReadOnly(true)
        .build()
    }.toSeq

    val updatedContainer = new ContainerBuilder(pod.container)
      .addAllToVolumeMounts(secretVolumeMounts.asJava)
      .build()

    val updatedPod = new PodBuilder(pod.pod)
      .editSpec()
        .addAllToVolumes(secretVolumes.asJava)
      .endSpec()
      .build()

    SparkPod(updatedPod, updatedContainer)
  } else {
    pod
  }
}
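
The secretNamesToMountPaths map is populated from configuration entries of the form spark.kubernetes.{driver,executor}.secrets.[secretName]=<mountPath>.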

EnvSecretsFeatureStep { .api }

Injects secret values as environment variables using Kubernetes secret references:

class EnvSecretsFeatureStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep {
  override def configurePod(pod: SparkPod): SparkPod
}

Environment Variable Secrets:

override def configurePod(pod: SparkPod): SparkPod = {
  if (conf.secretEnvNamesToKeyRefs.nonEmpty) {
    val secretEnvVars = conf.secretEnvNamesToKeyRefs.map { case (envName, secretKeyRef) =>
      val Array(secretName, key) = secretKeyRef.split(":", 2)
      
      new EnvVarBuilder()
        .withName(envName)
        .withNewValueFrom()
          .withNewSecretKeyRef()
            .withName(secretName)
            .withKey(key)
          .endSecretKeyRef()
        .endValueFrom()
        .build()
    }.toSeq

    val updatedContainer = new ContainerBuilder(pod.container)
      .addAllToEnv(secretEnvVars.asJava)
      .build()

    pod.copy(container = updatedContainer)
  } else {
    pod
  }
}
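
Here secretEnvNamesToKeyRefs comes from entries of the form spark.kubernetes.{driver,executor}.secretKeyRef.[EnvName]=<secretName>:<key>, which is why the value is split on ":" above.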

MountVolumesFeatureStep { .api }

Mounts user-specified volumes in pods:

class MountVolumesFeatureStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep {
  override def configurePod(pod: SparkPod): SparkPod
}

Volume Mounting Implementation:

override def configurePod(pod: SparkPod): SparkPod = {
  if (conf.volumes.nonEmpty) {
    val volumes = conf.volumes.map(createVolumeSpec)
    val volumeMounts = conf.volumes.map(createVolumeMountSpec)

    val updatedContainer = new ContainerBuilder(pod.container)
      .addAllToVolumeMounts(volumeMounts.asJava)
      .build()

    val updatedPod = new PodBuilder(pod.pod)
      .editSpec()
        .addAllToVolumes(volumes.asJava)
      .endSpec()
      .build()

    SparkPod(updatedPod, updatedContainer)
  } else {
    pod
  }
}

private def createVolumeSpec(volumeSpec: KubernetesVolumeSpec): Volume = {
  val volumeBuilder = new VolumeBuilder()
    .withName(volumeSpec.volumeName)

  volumeSpec.volumeConf match {
    case KubernetesHostPathVolumeConf(hostPath) =>
      volumeBuilder.withNewHostPath()
        .withPath(hostPath)
        .withType("Directory")
      .endHostPath()

    case KubernetesPVCVolumeConf(claimName) =>
      volumeBuilder.withNewPersistentVolumeClaim()
        .withClaimName(claimName)
        .withReadOnly(volumeSpec.mountReadOnly)
      .endPersistentVolumeClaim()

    case KubernetesEmptyDirVolumeConf(medium, sizeLimit) =>
      volumeBuilder.withNewEmptyDir()
        .withMedium(medium.orNull)
        .withSizeLimit(sizeLimit.map(new Quantity(_)).orNull)
      .endEmptyDir()
  }

  volumeBuilder.build()
}
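
The companion `createVolumeMountSpec` helper referenced earlier is symmetric; a sketch, assuming KubernetesVolumeSpec carries the mount path and read-only flag:

private def createVolumeMountSpec(volumeSpec: KubernetesVolumeSpec): VolumeMount = {
  new VolumeMountBuilder()
    .withName(volumeSpec.volumeName)
    .withMountPath(volumeSpec.mountPath)
    .withReadOnly(volumeSpec.mountReadOnly)
    .build()
}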

LocalDirsFeatureStep { .api }

Configures local directories for Spark scratch space using emptyDir volumes:

class LocalDirsFeatureStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep {
  override def configurePod(pod: SparkPod): SparkPod
  override def getAdditionalPodSystemProperties(): Map[String, String]
}

Local Directory Configuration:

// localDirPaths / localDirNames are resolved once per step instance; a
// sketch — the real implementation derives them from configuration or a
// generated per-pod default directory.
private val localDirPaths: Seq[String] =
  conf.sparkConf.getOption("spark.local.dir")
    .map(_.split(",").toSeq)
    .getOrElse(Seq(s"/var/data/spark-${java.util.UUID.randomUUID()}"))
private val localDirNames: Seq[String] =
  localDirPaths.indices.map(i => (i + 1).toString)

override def configurePod(pod: SparkPod): SparkPod = {
  val localDirVolumes = createLocalDirVolumes(conf)
  val localDirVolumeMounts = createLocalDirVolumeMounts(conf)

  val updatedContainer = new ContainerBuilder(pod.container)
    .addAllToVolumeMounts(localDirVolumeMounts.asJava)
    .addToEnv("SPARK_LOCAL_DIRS", localDirPaths.mkString(","))
    .build()

  val updatedPod = new PodBuilder(pod.pod)
    .editSpec()
      .addAllToVolumes(localDirVolumes.asJava)
    .endSpec()
    .build()

  SparkPod(updatedPod, updatedContainer)
}

override def getAdditionalPodSystemProperties(): Map[String, String] = {
  Map("spark.local.dir" -> localDirPaths.mkString(","))
}

private def createLocalDirVolumes(conf: KubernetesConf): Seq[Volume] = {
  val useTemporaryFileSystem = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS)
  
  localDirNames.map { dirName =>
    new VolumeBuilder()
      .withName(s"spark-local-dir-$dirName")
      .withNewEmptyDir()
        .withMedium(if (useTemporaryFileSystem) "Memory" else null)
      .endEmptyDir()
      .build()
  }
}
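
The matching mount helper pairs each volume with its target directory; a sketch:

private def createLocalDirVolumeMounts(conf: KubernetesConf): Seq[VolumeMount] = {
  localDirNames.zip(localDirPaths).map { case (dirName, dirPath) =>
    new VolumeMountBuilder()
      .withName(s"spark-local-dir-$dirName")
      .withMountPath(dirPath)
      .build()
  }
}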

HadoopConfDriverFeatureStep { .api }

Configures Hadoop configuration for driver pod:

class HadoopConfDriverFeatureStep(conf: KubernetesDriverConf) extends KubernetesFeatureConfigStep {
  override def configurePod(pod: SparkPod): SparkPod
  override def getAdditionalPodSystemProperties(): Map[String, String]
}
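
A simplified sketch of the typical behavior — mounting a pre-created Hadoop configuration ConfigMap and pointing HADOOP_CONF_DIR at the mount (volume and path names illustrative):

override def configurePod(pod: SparkPod): SparkPod = {
  conf.get(HADOOP_CONF_CONFIGMAP_NAME) match {
    case Some(configMapName) =>
      val hadoopContainer = new ContainerBuilder(pod.container)
        .addNewVolumeMount()
          .withName("hadoop-conf")
          .withMountPath("/opt/hadoop/conf")
        .endVolumeMount()
        .addToEnv("HADOOP_CONF_DIR", "/opt/hadoop/conf")
        .build()

      val hadoopPod = new PodBuilder(pod.pod)
        .editOrNewSpec()
          .addNewVolume()
            .withName("hadoop-conf")
            .withNewConfigMap()
              .withName(configMapName)
            .endConfigMap()
          .endVolume()
        .endSpec()
        .build()

      SparkPod(hadoopPod, hadoopContainer)
    case None => pod
  }
}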

KerberosConfDriverFeatureStep { .api }

Configures Kerberos authentication for driver pod:

class KerberosConfDriverFeatureStep(conf: KubernetesDriverConf) extends KubernetesFeatureConfigStep {
  override def configurePod(pod: SparkPod): SparkPod
}
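
A simplified sketch, assuming a `KUBERNETES_KERBEROS_KRB5_CONFIG_MAP`-style key naming a ConfigMap that holds krb5.conf (the real step also handles keytabs and delegation tokens):

override def configurePod(pod: SparkPod): SparkPod = {
  conf.get(KUBERNETES_KERBEROS_KRB5_CONFIG_MAP) match {
    case Some(krb5ConfigMapName) =>
      // Project the krb5.conf entry to its conventional path.
      val kerberosContainer = new ContainerBuilder(pod.container)
        .addNewVolumeMount()
          .withName("krb5-conf")
          .withMountPath("/etc/krb5.conf")
          .withSubPath("krb5.conf")
        .endVolumeMount()
        .build()

      val kerberosPod = new PodBuilder(pod.pod)
        .editOrNewSpec()
          .addNewVolume()
            .withName("krb5-conf")
            .withNewConfigMap()
              .withName(krb5ConfigMapName)
            .endConfigMap()
          .endVolume()
        .endSpec()
        .build()

      SparkPod(kerberosPod, kerberosContainer)
    case None => pod
  }
}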

PodTemplateConfigMapStep { .api }

Handles pod template configuration via Kubernetes ConfigMaps:

class PodTemplateConfigMapStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep {
  override def getAdditionalKubernetesResources(): Seq[HasMetadata]
}
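
A sketch of the resource this step contributes — a ConfigMap carrying the executor pod template file so it can be referenced in-cluster (config key and file names illustrative; java.nio imports assumed):

override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
  conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).toSeq.map { templateFile =>
    // Read the local template and embed it in a ConfigMap.
    val templateContents = new String(
      Files.readAllBytes(Paths.get(templateFile)), StandardCharsets.UTF_8)

    new ConfigMapBuilder()
      .withNewMetadata()
        .withName(s"${conf.resourceNamePrefix}-podspec-configmap")
      .endMetadata()
      .addToData("pod-spec-template.yml", templateContents)
      .build()
  }
}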

Feature Step Composition

Builder Pattern Integration

Feature steps are integrated into the builder pattern for both driver and executor pods:

// Driver feature step composition
class KubernetesDriverBuilder {
  def buildFromFeatures(
    conf: KubernetesDriverConf, 
    client: KubernetesClient
  ): KubernetesDriverSpec = {
    
    val featureSteps = Seq(
      new BasicDriverFeatureStep(conf),
      new DriverServiceFeatureStep(conf),
      new DriverCommandFeatureStep(conf),
      new DriverKubernetesCredentialsFeatureStep(conf)
    ) ++ createOptionalSteps(conf)

    val initialPod = SparkPod.initialPod()
    val configuredPod = applyFeatureSteps(initialPod, featureSteps)
    val systemProperties = collectSystemProperties(featureSteps)
    val additionalResources = collectAdditionalResources(featureSteps)

    KubernetesDriverSpec(configuredPod, additionalResources, systemProperties)
  }

  private def createOptionalSteps(conf: KubernetesDriverConf): Seq[KubernetesFeatureConfigStep] = {
    val steps = mutable.ArrayBuffer[KubernetesFeatureConfigStep]()

    if (conf.secretNamesToMountPaths.nonEmpty) {
      steps += new MountSecretsFeatureStep(conf)
    }

    if (conf.secretEnvNamesToKeyRefs.nonEmpty) {
      steps += new EnvSecretsFeatureStep(conf)
    }

    if (conf.volumes.nonEmpty) {
      steps += new MountVolumesFeatureStep(conf)
    }

    if (conf.get(HADOOP_CONF_CONFIGMAP_NAME).isDefined) {
      steps += new HadoopConfDriverFeatureStep(conf)
    }

    steps.toSeq
  }
}

Executor Feature Step Composition

// Executor feature step composition
class KubernetesExecutorBuilder {
  def buildFromFeatures(
    conf: KubernetesExecutorConf,
    secMgr: SecurityManager, 
    client: KubernetesClient
  ): SparkPod = {
    
    val featureSteps = Seq(
      new BasicExecutorFeatureStep(conf, secMgr, Clock.systemUTC()),
      new ExecutorKubernetesCredentialsFeatureStep(conf),
      new MountSecretsFeatureStep(conf),
      new EnvSecretsFeatureStep(conf),
      new MountVolumesFeatureStep(conf),
      new LocalDirsFeatureStep(conf)
    )

    val initialPod = SparkPod.initialPod()
    applyFeatureSteps(initialPod, featureSteps)
  }
}
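
The helper methods referenced in both builders are thin wrappers over the step sequence; a sketch:

private def applyFeatureSteps(
    initialPod: SparkPod,
    steps: Seq[KubernetesFeatureConfigStep]): SparkPod =
  steps.foldLeft(initialPod)((pod, step) => step.configurePod(pod))

private def collectSystemProperties(
    steps: Seq[KubernetesFeatureConfigStep]): Map[String, String] =
  steps.flatMap(_.getAdditionalPodSystemProperties()).toMap

private def collectAdditionalResources(
    steps: Seq[KubernetesFeatureConfigStep]): Seq[HasMetadata] =
  steps.flatMap(_.getAdditionalKubernetesResources())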

Advanced Feature Step Patterns

Conditional Feature Steps

// Feature steps can be conditionally applied based on configuration
class ConditionalFeatureStep(
  conf: KubernetesConf,
  condition: KubernetesConf => Boolean,
  step: KubernetesFeatureConfigStep
) extends KubernetesFeatureConfigStep {
  
  override def configurePod(pod: SparkPod): SparkPod = {
    if (condition(conf)) step.configurePod(pod) else pod
  }
  
  override def getAdditionalPodSystemProperties(): Map[String, String] = {
    if (condition(conf)) step.getAdditionalPodSystemProperties() else Map.empty
  }
  
  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
    if (condition(conf)) step.getAdditionalKubernetesResources() else Seq.empty
  }
}

// Usage
val conditionalStep = new ConditionalFeatureStep(
  conf,
  c => c.get(ENABLE_MONITORING).getOrElse(false),
  new MonitoringFeatureStep(conf)
)

Composite Feature Steps

// Combine multiple related feature steps
class CompositeFeatureStep(steps: KubernetesFeatureConfigStep*) extends KubernetesFeatureConfigStep {
  
  override def configurePod(pod: SparkPod): SparkPod = {
    steps.foldLeft(pod) { (currentPod, step) =>
      step.configurePod(currentPod)
    }
  }
  
  override def getAdditionalPodSystemProperties(): Map[String, String] = {
    steps.flatMap(_.getAdditionalPodSystemProperties()).toMap
  }
  
  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
    steps.flatMap(_.getAdditionalKubernetesResources())
  }
}

// Security-related feature steps
val securitySteps = new CompositeFeatureStep(
  new MountSecretsFeatureStep(conf),
  new EnvSecretsFeatureStep(conf),
  new KerberosConfDriverFeatureStep(conf)
)

Custom Feature Steps

// Example: Custom monitoring feature step
class MonitoringFeatureStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep {
  
  override def configurePod(pod: SparkPod): SparkPod = {
    val monitoringContainer = new ContainerBuilder(pod.container)
      .addToEnv("PROMETHEUS_ENABLED", "true")
      .addNewPort()
        .withName("metrics")
        .withContainerPort(8080)
        .withProtocol("TCP")
      .endPort()
      .build()

    val monitoringPod = new PodBuilder(pod.pod)
      .editMetadata()
        .addToAnnotations("prometheus.io/scrape", "true")
        .addToAnnotations("prometheus.io/port", "8080")
        .addToAnnotations("prometheus.io/path", "/metrics")
      .endMetadata()
      .build()

    SparkPod(monitoringPod, monitoringContainer)
  }
  
  override def getAdditionalPodSystemProperties(): Map[String, String] = {
    Map(
      "spark.sql.streaming.metricsEnabled" -> "true",
      "spark.metrics.conf.driver.sink.prometheus.class" -> "org.apache.spark.metrics.sink.PrometheusSink"
    )
  }
  
  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
    val monitoringService = new ServiceBuilder()
      .withNewMetadata()
        .withName(s"${conf.resourceNamePrefix}-metrics")
        .withNamespace(conf.namespace)
      .endMetadata()
      .withNewSpec()
        .addToSelector(SPARK_APP_ID_LABEL, conf.appId)
        .addNewPort()
          .withName("metrics")
          .withPort(8080)
          .withTargetPort(new IntOrString(8080))
        .endPort()
      .endSpec()
      .build()

    Seq(monitoringService)
  }
}

The feature steps system provides a powerful, modular approach to pod configuration that enables extensive customization while maintaining clean separation of concerns and reusability across different deployment scenarios.