or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-submission.md, cluster-management.md, configuration.md, feature-steps.md, index.md, pod-management.md, utilities.md

docs/feature-steps.md

0

# Feature Steps System

1

2

The feature steps system provides a modular, extensible architecture for configuring Kubernetes pods through composable configuration steps. Each feature step handles a specific aspect of pod configuration, enabling flexible customization of both driver and executor pods.

3

4

## Core Architecture

5

6

### KubernetesFeatureConfigStep { .api }

7

8

Base interface for all feature configuration steps:

9

10

```scala

11

/**
 * Base contract for a single unit of pod configuration.
 *
 * Only `configurePod` is mandatory. The two collection-returning hooks default
 * to empty results so that a step can override just the parts it contributes.
 */
trait KubernetesFeatureConfigStep {

  /** Returns the pod obtained by applying this step's changes to `pod`. */
  def configurePod(pod: SparkPod): SparkPod

  /** System properties contributed by this step; empty unless overridden. */
  def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty

  /** Extra Kubernetes resources contributed by this step; empty unless overridden. */
  def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
}

16

```

17

18

**Key Responsibilities**:

19

- **Pod Configuration**: Apply specific modifications to pod specifications

20

- **System Properties**: Contribute JVM system properties for Spark components

21

- **Additional Resources**: Create supporting Kubernetes resources (Services, ConfigMaps, etc.)

22

23

**Usage Pattern**:

24

```scala

25

// Each step receives the pod produced by the previous step, in list order.
val featureSteps: Seq[KubernetesFeatureConfigStep] = createFeatureSteps(conf)

val finalSpec = featureSteps.foldLeft(initialPod)((pod, step) => step.configurePod(pod))

// `toMap` keeps the last value when two steps contribute the same key.
val allSystemProperties = featureSteps.flatMap(_.getAdditionalPodSystemProperties()).toMap

val additionalResources = featureSteps.flatMap(_.getAdditionalKubernetesResources())

34

```

35

36

## Driver Feature Steps

37

38

### BasicDriverFeatureStep { .api }

39

40

Core driver pod configuration including essential container setup:

41

42

```scala

43

class BasicDriverFeatureStep(conf: KubernetesDriverConf) extends KubernetesFeatureConfigStep {

44

override def configurePod(pod: SparkPod): SparkPod

45

override def getAdditionalPodSystemProperties(): Map[String, String]

46

override def getAdditionalKubernetesResources(): Seq[HasMetadata]

47

}

48

```

49

50

**Configuration Applied**:

51

- **Container Image**: Sets driver container image from configuration

52

- **Resource Limits**: Applies CPU and memory limits and requests

53

- **Basic Labels**: Adds Spark application and role labels

54

- **Environment Variables**: Sets essential Spark environment variables

55

- **Port Configuration**: Configures driver ports for communication

56

57

**Implementation Example**:

58

```scala

59

/**
 * Builds the driver container and driver pod on top of the incoming `pod`.
 *
 * Environment variables are added through the nested `addNewEnv()` builder:
 * the fabric8 `ContainerBuilder.addToEnv` overloads take `EnvVar` objects,
 * not a name/value pair of strings.
 */
override def configurePod(pod: SparkPod): SparkPod = {
  val driverContainer = new ContainerBuilder(pod.container)
    .withName(Constants.DRIVER_CONTAINER_NAME)
    .withImage(conf.get(CONTAINER_IMAGE))
    .withImagePullPolicy(conf.imagePullPolicy)
    .addNewPort()
      .withName("driver-rpc-port")
      .withContainerPort(DEFAULT_DRIVER_PORT)
      .withProtocol("TCP")
      .endPort()
    .addNewEnv()
      .withName(ENV_DRIVER_BIND_ADDRESS)
      .withValue("0.0.0.0")
      .endEnv()
    .addNewEnv()
      .withName(ENV_APPLICATION_ID)
      .withValue(conf.appId)
      .endEnv()
    .withResources(buildResourceRequirements(conf))
    .build()

  val driverPod = new PodBuilder(pod.pod)
    .editOrNewMetadata()
      .withName(conf.resourceNamePrefix + "-driver")
      .addToLabels(SPARK_APP_ID_LABEL, conf.appId)
      .addToLabels(SPARK_ROLE_LABEL, DRIVER_ROLE)
      .endMetadata()
    .editOrNewSpec()
      .withRestartPolicy("Never")
      // `orNull` leaves the service account unset when none is configured.
      .withServiceAccount(conf.get(KUBERNETES_SERVICE_ACCOUNT_NAME).orNull)
      .endSpec()
    .build()

  SparkPod(driverPod, driverContainer)
}

88

```

89

90

### DriverServiceFeatureStep { .api }

91

92

Creates a Kubernetes Service for the driver pod so that executors can connect back to the driver:

93

94

```scala

95

class DriverServiceFeatureStep(conf: KubernetesDriverConf) extends KubernetesFeatureConfigStep {

96

override def configurePod(pod: SparkPod): SparkPod

97

override def getAdditionalKubernetesResources(): Seq[HasMetadata]

98

}

99

```

100

101

**Service Configuration**:

102

```scala

103

/**
 * Produces the single Service that fronts the driver pod, exposing the RPC
 * and block-manager ports and selecting the pod by app-id and driver-role labels.
 */
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
  val serviceName = s"${conf.resourceNamePrefix}-driver-svc"
  val rpcTarget = new IntOrString(DEFAULT_DRIVER_PORT)
  val blockManagerTarget = new IntOrString(DEFAULT_BLOCKMANAGER_PORT)

  val driverService = new ServiceBuilder()
    .withNewMetadata()
      .withName(serviceName)
      .withNamespace(conf.namespace)
      .addToLabels(SPARK_APP_ID_LABEL, conf.appId)
      .addToAnnotations(conf.serviceAnnotations.asJava)
      .endMetadata()
    .withNewSpec()
      .withType(conf.get(KUBERNETES_DRIVER_SERVICE_TYPE))
      .addToSelector(SPARK_APP_ID_LABEL, conf.appId)
      .addToSelector(SPARK_ROLE_LABEL, DRIVER_ROLE)
      .addNewPort()
        .withName("driver-rpc-port")
        .withPort(DEFAULT_DRIVER_PORT)
        .withTargetPort(rpcTarget)
        .endPort()
      .addNewPort()
        .withName("blockmanager")
        .withPort(DEFAULT_BLOCKMANAGER_PORT)
        .withTargetPort(blockManagerTarget)
        .endPort()
      .endSpec()
    .build()

  Seq(driverService)
}

130

```

131

132

### DriverCommandFeatureStep { .api }

133

134

Configures the driver container command and arguments for application execution:

135

136

```scala

137

class DriverCommandFeatureStep(conf: KubernetesDriverConf) extends KubernetesFeatureConfigStep {

138

override def configurePod(pod: SparkPod): SparkPod

139

}

140

```

141

142

**Command Configuration**:

143

```scala

144

/** Replaces the container's command and arguments; the pod itself is left untouched. */
override def configurePod(pod: SparkPod): SparkPod = {
  val launchArgs = buildDriverArgs(conf)

  pod.copy(
    container = new ContainerBuilder(pod.container)
      .withCommand("/opt/spark/bin/spark-submit")
      .withArgs(launchArgs: _*)
      .build()
  )
}

152

153

/**
 * Assembles the argument list in order: fixed submit flags, the primary
 * resource (if the configured resource kind carries one), then the
 * application's own arguments.
 */
private def buildDriverArgs(conf: KubernetesDriverConf): Seq[String] = {
  val primaryResource = conf.mainAppResource match {
    case JavaMainAppResource(Some(jar)) => Seq(jar)
    case PythonMainAppResource(script) => Seq(script)
    case RMainAppResource(script) => Seq(script)
    // Any other resource kind contributes no positional resource argument.
    case _ => Seq.empty
  }

  Seq("--class", conf.mainClass, "--deploy-mode", "client") ++ primaryResource ++ conf.appArgs
}

170

```

171

172

### DriverKubernetesCredentialsFeatureStep { .api }

173

174

Configures Kubernetes API credentials for driver pod operations:

175

176

```scala

177

class DriverKubernetesCredentialsFeatureStep(conf: KubernetesDriverConf) extends KubernetesFeatureConfigStep {

178

override def configurePod(pod: SparkPod): SparkPod

179

override def getAdditionalPodSystemProperties(): Map[String, String]

180

}

181

```

182

183

**Credential Configuration**:

184

```scala

185

/**
 * Adds the credential volumes to the pod spec and mounts them in the container.
 *
 * Uses `editOrNewSpec()` (as the basic driver step does) so the step also
 * works on a pod whose spec has not been created yet.
 */
override def configurePod(pod: SparkPod): SparkPod = {
  val credentialVolumes = buildCredentialVolumes(conf)
  val credentialVolumeMounts = buildCredentialVolumeMounts(conf)

  val driverContainer = new ContainerBuilder(pod.container)
    .addAllToVolumeMounts(credentialVolumeMounts.asJava)
    .build()

  val driverPod = new PodBuilder(pod.pod)
    .editOrNewSpec()
      .addAllToVolumes(credentialVolumes.asJava)
      .endSpec()
    .build()

  SparkPod(driverPod, driverContainer)
}

201

202

/** Exposes the executor pod-name prefix, namespace and container image as system properties. */
override def getAdditionalPodSystemProperties(): Map[String, String] = {
  val podNamePrefix: (String, String) =
    "spark.kubernetes.executor.podNamePrefix" -> conf.resourceNamePrefix
  val namespace: (String, String) =
    "spark.kubernetes.executor.namespace" -> conf.namespace
  val image: (String, String) =
    "spark.kubernetes.executor.container.image" -> conf.get(EXECUTOR_CONTAINER_IMAGE)

  Map(podNamePrefix, namespace, image)
}

209

```

210

211

## Executor Feature Steps

212

213

### BasicExecutorFeatureStep { .api }

214

215

Core executor pod configuration for Spark worker functionality:

216

217

```scala

218

class BasicExecutorFeatureStep(

219

conf: KubernetesExecutorConf,

220

secMgr: SecurityManager,

221

clock: Clock

222

) extends KubernetesFeatureConfigStep {

223

224

override def configurePod(pod: SparkPod): SparkPod

225

}

226

```

227

228

**Executor Configuration**:

229

```scala

230

/**
 * Builds the executor container and executor pod on top of the incoming `pod`.
 *
 * Environment variables are added through the nested `addNewEnv()` builder:
 * the fabric8 `ContainerBuilder.addToEnv` overloads take `EnvVar` objects,
 * not a name/value pair of strings.
 */
override def configurePod(pod: SparkPod): SparkPod = {
  val executorContainer = new ContainerBuilder(pod.container)
    .withName(Constants.EXECUTOR_CONTAINER_NAME)
    .withImage(conf.get(EXECUTOR_CONTAINER_IMAGE))
    .withImagePullPolicy(conf.imagePullPolicy)
    .addNewPort()
      .withName("blockmanager")
      .withContainerPort(DEFAULT_BLOCKMANAGER_PORT)
      .withProtocol("TCP")
      .endPort()
    .addNewEnv()
      .withName(ENV_DRIVER_URL)
      .withValue(buildDriverUrl(conf))
      .endEnv()
    // NOTE(review): `withValue` expects a String; confirm the cores and memory
    // config entries are string-typed.
    .addNewEnv()
      .withName(ENV_EXECUTOR_CORES)
      .withValue(conf.get(KUBERNETES_EXECUTOR_CORES))
      .endEnv()
    .addNewEnv()
      .withName(ENV_EXECUTOR_MEMORY)
      .withValue(conf.get(KUBERNETES_EXECUTOR_MEMORY))
      .endEnv()
    .addNewEnv()
      .withName(ENV_EXECUTOR_ID)
      .withValue(conf.executorId)
      .endEnv()
    .withCommand("/opt/spark/bin/spark-class")
    .withArgs("org.apache.spark.executor.CoarseGrainedExecutorBackend")
    .withResources(buildExecutorResourceRequirements(conf))
    .build()

  val executorPod = new PodBuilder(pod.pod)
    .editOrNewMetadata()
      .withName(s"${conf.resourceNamePrefix}-exec-${conf.executorId}")
      .addToLabels(SPARK_APP_ID_LABEL, conf.appId)
      .addToLabels(SPARK_ROLE_LABEL, EXECUTOR_ROLE)
      .addToLabels(SPARK_EXECUTOR_ID_LABEL, conf.executorId)
      .endMetadata()
    .build()

  SparkPod(executorPod, executorContainer)
}

260

```

261

262

### ExecutorKubernetesCredentialsFeatureStep { .api }

263

264

Configures Kubernetes credentials for executor pod operations:

265

266

```scala

267

class ExecutorKubernetesCredentialsFeatureStep(conf: KubernetesExecutorConf) extends KubernetesFeatureConfigStep {

268

override def configurePod(pod: SparkPod): SparkPod

269

}

270

```

271

272

## Shared Feature Steps

273

274

### MountSecretsFeatureStep { .api }

275

276

Mounts Kubernetes secrets as volumes in pods:

277

278

```scala

279

class MountSecretsFeatureStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep {

280

override def configurePod(pod: SparkPod): SparkPod

281

}

282

```

283

284

**Secret Mounting**:

285

```scala

286

/**
 * Mounts every configured secret as a read-only volume.
 *
 * For each (secret name, mount path) entry a secret-backed volume named
 * `<secretName>-volume` is added to the pod spec and mounted at the given path
 * in the container. The pod is returned unchanged when no secrets are configured.
 */
override def configurePod(pod: SparkPod): SparkPod = {
  if (conf.secretNamesToMountPaths.nonEmpty) {
    // The volume definition only needs the secret name; the path is used by the mount.
    val secretVolumes = conf.secretNamesToMountPaths.map { case (secretName, _) =>
      new VolumeBuilder()
        .withName(s"$secretName-volume")
        .withNewSecret()
          .withSecretName(secretName)
          .withDefaultMode(420) // 0644 octal
          .endSecret()
        .build()
    }.toSeq

    val secretVolumeMounts = conf.secretNamesToMountPaths.map { case (secretName, mountPath) =>
      new VolumeMountBuilder()
        .withName(s"$secretName-volume")
        .withMountPath(mountPath)
        .withReadOnly(true)
        .build()
    }.toSeq

    val updatedContainer = new ContainerBuilder(pod.container)
      .addAllToVolumeMounts(secretVolumeMounts.asJava)
      .build()

    // `editOrNewSpec()` keeps the step usable on a pod that has no spec yet.
    val updatedPod = new PodBuilder(pod.pod)
      .editOrNewSpec()
        .addAllToVolumes(secretVolumes.asJava)
        .endSpec()
      .build()

    SparkPod(updatedPod, updatedContainer)
  } else {
    pod
  }
}

321

```

322

323

### EnvSecretsFeatureStep { .api }

324

325

Injects secret values as environment variables using Kubernetes secret references:

326

327

```scala

328

class EnvSecretsFeatureStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep {

329

override def configurePod(pod: SparkPod): SparkPod

330

}

331

```

332

333

**Environment Variable Secrets**:

334

```scala

335

/**
 * Adds one environment variable per configured secret reference.
 *
 * Each reference must have the form `<secretName>:<key>`; it is split on the
 * first `:` only, so the key itself may contain further colons. The pod is
 * returned unchanged when no references are configured.
 *
 * @throws IllegalArgumentException if a reference contains no `:` separator
 */
override def configurePod(pod: SparkPod): SparkPod = {
  if (conf.secretEnvNamesToKeyRefs.nonEmpty) {
    val secretEnvVars = conf.secretEnvNamesToKeyRefs.map { case (envName, secretKeyRef) =>
      // An explicit match gives a descriptive error instead of the bare
      // MatchError a refutable `val Array(...) = ...` pattern would throw.
      secretKeyRef.split(":", 2) match {
        case Array(secretName, key) =>
          new EnvVarBuilder()
            .withName(envName)
            .withNewValueFrom()
              .withNewSecretKeyRef()
                .withName(secretName)
                .withKey(key)
                .endSecretKeyRef()
              .endValueFrom()
            .build()
        case _ =>
          throw new IllegalArgumentException(
            s"Invalid secret key reference '$secretKeyRef' for environment variable " +
              s"'$envName': expected the form <secretName>:<key>")
      }
    }.toSeq

    val updatedContainer = new ContainerBuilder(pod.container)
      .addAllToEnv(secretEnvVars.asJava)
      .build()

    pod.copy(container = updatedContainer)
  } else {
    pod
  }
}

360

```

361

362

### MountVolumesFeatureStep { .api }

363

364

Mounts user-specified volumes in pods:

365

366

```scala

367

class MountVolumesFeatureStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep {

368

override def configurePod(pod: SparkPod): SparkPod

369

}

370

```

371

372

**Volume Mounting Implementation**:

373

```scala

374

/**
 * Adds every configured volume to the pod spec and mounts it in the container.
 * The pod is returned unchanged when no volumes are configured.
 */
override def configurePod(pod: SparkPod): SparkPod = {
  if (conf.volumes.nonEmpty) {
    val volumes = conf.volumes.map(createVolumeSpec)
    val volumeMounts = conf.volumes.map(createVolumeMountSpec)

    val updatedContainer = new ContainerBuilder(pod.container)
      .addAllToVolumeMounts(volumeMounts.asJava)
      .build()

    // `editOrNewSpec()` keeps the step usable on a pod that has no spec yet.
    val updatedPod = new PodBuilder(pod.pod)
      .editOrNewSpec()
        .addAllToVolumes(volumes.asJava)
        .endSpec()
      .build()

    SparkPod(updatedPod, updatedContainer)
  } else {
    pod
  }
}

394

395

/**
 * Translates one volume spec into a Kubernetes `Volume`, choosing the volume
 * source (host path, persistent volume claim or emptyDir) from `volumeConf`.
 */
private def createVolumeSpec(volumeSpec: KubernetesVolumeSpec): Volume = {
  val named = new VolumeBuilder().withName(volumeSpec.volumeName)

  // Each branch returns the same builder with its volume source filled in.
  val withSource = volumeSpec.volumeConf match {
    case KubernetesHostPathVolumeConf(hostPath) =>
      named.withNewHostPath()
        .withPath(hostPath)
        .withType("Directory")
        .endHostPath()

    case KubernetesPVCVolumeConf(claimName) =>
      named.withNewPersistentVolumeClaim()
        .withClaimName(claimName)
        .withReadOnly(volumeSpec.mountReadOnly)
        .endPersistentVolumeClaim()

    case KubernetesEmptyDirVolumeConf(medium, sizeLimit) =>
      named.withNewEmptyDir()
        .withMedium(medium.orNull)
        .withSizeLimit(sizeLimit.map(new Quantity(_)).orNull)
        .endEmptyDir()
  }

  withSource.build()
}

421

```

422

423

### LocalDirsFeatureStep { .api }

424

425

Configures local directories for Spark scratch space using emptyDir volumes:

426

427

```scala

428

class LocalDirsFeatureStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep {

429

override def configurePod(pod: SparkPod): SparkPod

430

override def getAdditionalPodSystemProperties(): Map[String, String]

431

}

432

```

433

434

**Local Directory Configuration**:

435

```scala

436

/**
 * Adds the local-directory volumes to the pod, mounts them in the container
 * and publishes the comma-separated directory list as `SPARK_LOCAL_DIRS`.
 *
 * The environment variable is added through the nested `addNewEnv()` builder:
 * the fabric8 `ContainerBuilder.addToEnv` overloads take `EnvVar` objects,
 * not a name/value pair of strings.
 */
override def configurePod(pod: SparkPod): SparkPod = {
  val localDirVolumes = createLocalDirVolumes(conf)
  val localDirVolumeMounts = createLocalDirVolumeMounts(conf)

  val updatedContainer = new ContainerBuilder(pod.container)
    .addAllToVolumeMounts(localDirVolumeMounts.asJava)
    .addNewEnv()
      .withName("SPARK_LOCAL_DIRS")
      .withValue(localDirPaths.mkString(","))
      .endEnv()
    .build()

  // `editOrNewSpec()` keeps the step usable on a pod that has no spec yet.
  val updatedPod = new PodBuilder(pod.pod)
    .editOrNewSpec()
      .addAllToVolumes(localDirVolumes.asJava)
      .endSpec()
    .build()

  SparkPod(updatedPod, updatedContainer)
}

453

454

/** Publishes the comma-separated local directory list as `spark.local.dir`. */
override def getAdditionalPodSystemProperties(): Map[String, String] = {
  val joinedDirs = localDirPaths.mkString(",")
  Map("spark.local.dir" -> joinedDirs)
}

457

458

/**
 * Creates one emptyDir volume per local directory name. When the tmpfs
 * setting is enabled the volumes use the "Memory" medium; otherwise the
 * medium is left unset.
 */
private def createLocalDirVolumes(conf: KubernetesConf): Seq[Volume] = {
  // The medium is the same for every volume, so it is decided once up front.
  val medium: String = if (conf.get(KUBERNETES_LOCAL_DIRS_TMPFS)) "Memory" else null

  localDirNames.map { dirName =>
    new VolumeBuilder()
      .withName(s"spark-local-dir-$dirName")
      .withNewEmptyDir()
        .withMedium(medium)
        .endEmptyDir()
      .build()
  }
}

470

```

471

472

### HadoopConfDriverFeatureStep { .api }

473

474

Configures Hadoop configuration for driver pod:

475

476

```scala

477

class HadoopConfDriverFeatureStep(conf: KubernetesDriverConf) extends KubernetesFeatureConfigStep {

478

override def configurePod(pod: SparkPod): SparkPod

479

override def getAdditionalPodSystemProperties(): Map[String, String]

480

}

481

```

482

483

### KerberosConfDriverFeatureStep { .api }

484

485

Configures Kerberos authentication for driver pod:

486

487

```scala

488

class KerberosConfDriverFeatureStep(conf: KubernetesDriverConf) extends KubernetesFeatureConfigStep {

489

override def configurePod(pod: SparkPod): SparkPod

490

}

491

```

492

493

### PodTemplateConfigMapStep { .api }

494

495

Handles pod template configuration via Kubernetes ConfigMaps:

496

497

```scala

498

class PodTemplateConfigMapStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep {

499

override def getAdditionalKubernetesResources(): Seq[HasMetadata]

500

}

501

```

502

503

## Feature Step Composition

504

505

### Builder Pattern Integration

506

507

Feature steps are integrated into the builder pattern for both driver and executor pods:

508

509

```scala

510

// Driver feature step composition
class KubernetesDriverBuilder {

  /**
   * Assembles the driver spec: the four mandatory steps come first, followed by
   * whichever optional steps `conf` enables, all applied to a fresh initial pod.
   *
   * @param conf   driver configuration that selects and parameterizes the steps
   * @param client Kubernetes client (not used by this method)
   * @return the configured pod together with the collected resources and properties
   */
  def buildFromFeatures(
      conf: KubernetesDriverConf,
      client: KubernetesClient
  ): KubernetesDriverSpec = {

    val featureSteps = Seq(
      new BasicDriverFeatureStep(conf),
      new DriverServiceFeatureStep(conf),
      new DriverCommandFeatureStep(conf),
      new DriverKubernetesCredentialsFeatureStep(conf)
    ) ++ createOptionalSteps(conf)

    val initialPod = SparkPod.initialPod()
    val configuredPod = applyFeatureSteps(initialPod, featureSteps)
    val systemProperties = collectSystemProperties(featureSteps)
    val additionalResources = collectAdditionalResources(featureSteps)

    KubernetesDriverSpec(configuredPod, additionalResources, systemProperties)
  }

  /**
   * Returns the optional steps enabled by `conf`, in a fixed order:
   * secret mounts, secret env vars, user volumes, Hadoop configuration.
   */
  private def createOptionalSteps(conf: KubernetesDriverConf): Seq[KubernetesFeatureConfigStep] = {
    // By-name `step` means a step is only constructed when its condition holds.
    def stepIf(enabled: Boolean)(
        step: => KubernetesFeatureConfigStep): Option[KubernetesFeatureConfigStep] =
      if (enabled) Some(step) else None

    Seq(
      stepIf(conf.secretNamesToMountPaths.nonEmpty)(new MountSecretsFeatureStep(conf)),
      stepIf(conf.secretEnvNamesToKeyRefs.nonEmpty)(new EnvSecretsFeatureStep(conf)),
      stepIf(conf.volumes.nonEmpty)(new MountVolumesFeatureStep(conf)),
      stepIf(conf.get(HADOOP_CONF_CONFIGMAP_NAME).isDefined)(new HadoopConfDriverFeatureStep(conf))
    ).flatten
  }
}

554

```

555

556

### Executor Feature Step Composition

557

558

```scala

559

// Executor feature step composition
class KubernetesExecutorBuilder {

  /**
   * Runs the fixed executor step pipeline over a fresh initial pod and
   * returns the resulting pod.
   */
  def buildFromFeatures(
      conf: KubernetesExecutorConf,
      secMgr: SecurityManager,
      client: KubernetesClient
  ): SparkPod = {
    val basicStep = new BasicExecutorFeatureStep(conf, secMgr, Clock.systemUTC())
    val credentialsStep = new ExecutorKubernetesCredentialsFeatureStep(conf)
    val mountSecretsStep = new MountSecretsFeatureStep(conf)
    val envSecretsStep = new EnvSecretsFeatureStep(conf)
    val mountVolumesStep = new MountVolumesFeatureStep(conf)
    val localDirsStep = new LocalDirsFeatureStep(conf)

    // Order matters: each step sees the pod produced by the one before it.
    val pipeline = Seq(
      basicStep,
      credentialsStep,
      mountSecretsStep,
      envSecretsStep,
      mountVolumesStep,
      localDirsStep
    )

    applyFeatureSteps(SparkPod.initialPod(), pipeline)
  }
}

580

```

581

582

## Advanced Feature Step Patterns

583

584

### Conditional Feature Steps

585

586

```scala

587

// Feature steps can be conditionally applied based on configuration

/**
 * Wraps another step and delegates to it only while `condition(conf)` holds.
 * When the condition is false the pod passes through unchanged and no system
 * properties or additional resources are contributed.
 *
 * @param conf      configuration handed to `condition` on every call
 * @param condition predicate deciding whether the wrapped step is active
 * @param step      the step to delegate to when the condition holds
 */
class ConditionalFeatureStep(
    conf: KubernetesConf,
    condition: KubernetesConf => Boolean,
    step: KubernetesFeatureConfigStep
) extends KubernetesFeatureConfigStep {

  override def configurePod(pod: SparkPod): SparkPod = {
    if (condition(conf)) step.configurePod(pod) else pod
  }

  override def getAdditionalPodSystemProperties(): Map[String, String] = {
    if (condition(conf)) step.getAdditionalPodSystemProperties() else Map.empty
  }

  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
    if (condition(conf)) step.getAdditionalKubernetesResources() else Seq.empty
  }
}

// Usage
val conditionalStep = new ConditionalFeatureStep(
  conf,
  c => c.get(ENABLE_MONITORING).getOrElse(false),
  new MonitoringFeatureStep(conf)
)

611

```

612

613

### Composite Feature Steps

614

615

```scala

616

// Combine multiple related feature steps

/**
 * Presents several steps as one: pods are threaded through the steps in
 * order, and the steps' properties and resources are concatenated.
 */
class CompositeFeatureStep(steps: KubernetesFeatureConfigStep*) extends KubernetesFeatureConfigStep {

  override def configurePod(pod: SparkPod): SparkPod =
    steps.foldLeft(pod)((acc, member) => member.configurePod(acc))

  // On duplicate keys the value from the later step wins.
  override def getAdditionalPodSystemProperties(): Map[String, String] =
    steps.foldLeft(Map.empty[String, String]) { (merged, member) =>
      merged ++ member.getAdditionalPodSystemProperties()
    }

  override def getAdditionalKubernetesResources(): Seq[HasMetadata] =
    for {
      member <- steps
      resource <- member.getAdditionalKubernetesResources()
    } yield resource
}

// Security-related feature steps
val securitySteps = new CompositeFeatureStep(
  new MountSecretsFeatureStep(conf),
  new EnvSecretsFeatureStep(conf),
  new KerberosConfDriverFeatureStep(conf)
)

640

```

641

642

### Custom Feature Steps

643

644

```scala

645

// Example: Custom monitoring feature step

/**
 * Exposes a metrics endpoint: adds a "metrics" container port and Prometheus
 * scrape annotations to the pod, contributes metrics-related system
 * properties, and creates a Service in front of the port.
 */
class MonitoringFeatureStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep {

  // One definition for the port used by the container, the annotation and the Service.
  private val metricsPort = 8080

  override def configurePod(pod: SparkPod): SparkPod = {
    // `addNewEnv()` is used because fabric8's `addToEnv` overloads take
    // `EnvVar` objects, not a name/value pair of strings.
    val monitoringContainer = new ContainerBuilder(pod.container)
      .addNewEnv()
        .withName("PROMETHEUS_ENABLED")
        .withValue("true")
        .endEnv()
      .addNewPort()
        .withName("metrics")
        .withContainerPort(metricsPort)
        .withProtocol("TCP")
        .endPort()
      .build()

    // `editOrNewMetadata()` keeps the step usable on a pod that has no metadata yet.
    val monitoringPod = new PodBuilder(pod.pod)
      .editOrNewMetadata()
        .addToAnnotations("prometheus.io/scrape", "true")
        .addToAnnotations("prometheus.io/port", metricsPort.toString)
        .addToAnnotations("prometheus.io/path", "/metrics")
        .endMetadata()
      .build()

    SparkPod(monitoringPod, monitoringContainer)
  }

  override def getAdditionalPodSystemProperties(): Map[String, String] = {
    Map(
      "spark.sql.streaming.metricsEnabled" -> "true",
      // NOTE(review): confirm this sink class is on the classpath of the target Spark build.
      "spark.metrics.conf.driver.sink.prometheus.class" -> "org.apache.spark.metrics.sink.PrometheusSink"
    )
  }

  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
    // The selector uses only the app-id label, so it matches every pod carrying that label.
    val monitoringService = new ServiceBuilder()
      .withNewMetadata()
        .withName(s"${conf.resourceNamePrefix}-metrics")
        .withNamespace(conf.namespace)
        .endMetadata()
      .withNewSpec()
        .addToSelector(SPARK_APP_ID_LABEL, conf.appId)
        .addNewPort()
          .withName("metrics")
          .withPort(metricsPort)
          .withTargetPort(new IntOrString(metricsPort))
          .endPort()
        .endSpec()
      .build()

    Seq(monitoringService)
  }
}

695

```

696

697

The feature steps system provides a powerful, modular approach to pod configuration that enables extensive customization while maintaining clean separation of concerns and reusability across different deployment scenarios.