# Utilities and Helpers

The utilities layer provides essential helper functions, client management, and common operations that support the Kubernetes integration throughout the Apache Spark ecosystem. These utilities handle everything from configuration parsing to client creation and volume management.

## Core Utilities

### KubernetesUtils { .api }

Comprehensive collection of Kubernetes-specific utility functions:

```scala
object KubernetesUtils {

  def parsePrefixedKeyValuePairs(
    sparkConf: SparkConf,
    prefix: String
  ): Map[String, String]

  def loadPodFromTemplate(
    kubernetesClient: KubernetesClient,
    templateFile: File,
    containerName: Option[String]
  ): SparkPod

  def selectSparkContainer(
    pod: Pod,
    containerName: Option[String]
  ): SparkPod

  def requireBothOrNeitherDefined[T](
    opt1: Option[T],
    opt2: Option[T],
    errMessage: String
  ): Unit
}
```

### Configuration Parsing Utilities

#### Prefixed Key-Value Parsing

```scala
// Parse configuration properties with a common prefix
def parsePrefixedKeyValuePairs(
  sparkConf: SparkConf,
  prefix: String
): Map[String, String] = {
  sparkConf.getAllWithPrefix(prefix).filter { case (key, value) =>
    key.nonEmpty && value.nonEmpty
  }.toMap
}

// Usage examples
val driverLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
  sparkConf,
  "spark.kubernetes.driver.label."
)
// Input:  spark.kubernetes.driver.label.app=myapp, spark.kubernetes.driver.label.version=1.0
// Output: Map("app" -> "myapp", "version" -> "1.0")

val executorAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
  sparkConf,
  "spark.kubernetes.executor.annotation."
)
// Input:  spark.kubernetes.executor.annotation.prometheus.io/scrape=true
// Output: Map("prometheus.io/scrape" -> "true")
```

#### Validation Utilities

```scala
// Require both options to be defined or both to be None
def requireBothOrNeitherDefined[T](
  opt1: Option[T],
  opt2: Option[T],
  errMessage: String
): Unit = {
  (opt1, opt2) match {
    case (Some(_), None) | (None, Some(_)) =>
      throw new IllegalArgumentException(errMessage)
    case _ => // Both defined or both None - valid
  }
}

// Usage
KubernetesUtils.requireBothOrNeitherDefined(
  conf.get(CLIENT_KEY_FILE),
  conf.get(CLIENT_CERT_FILE),
  "Both client key file and client cert file must be specified for mutual TLS"
)
```

#### Resource Name Generation

```scala
def generateResourceName(
  appName: String,
  appId: String,
  resourceType: String
): String = {
  val sanitizedAppName = appName
    .toLowerCase
    .replaceAll("[^a-z0-9-]", "-")
    .replaceAll("-+", "-")
    .take(30)

  val shortAppId = appId.take(8)

  s"$sanitizedAppName-$shortAppId-$resourceType"
}

// Usage
val driverPodName = KubernetesUtils.generateResourceName(
  "My Spark App",
  "app-123456789",
  "driver"
)
// Result: "my-spark-app-app-1234-driver"
```

### Pod Template Management

#### Template Loading

```scala
def loadPodFromTemplate(
  kubernetesClient: KubernetesClient,
  templateFile: File,
  containerName: Option[String]
): SparkPod = {
  require(templateFile.exists(), s"Pod template file does not exist: ${templateFile.getPath}")

  val podTemplate = try {
    kubernetesClient.pods()
      .load(new FileInputStream(templateFile))
      .get()
  } catch {
    case e: Exception =>
      throw new RuntimeException(s"Failed to load pod template from ${templateFile.getPath}", e)
  }

  selectSparkContainer(podTemplate, containerName)
}

// Usage
val templatePod = KubernetesUtils.loadPodFromTemplate(
  kubernetesClient,
  new File("/templates/driver-pod.yaml"),
  Some(Constants.DRIVER_CONTAINER_NAME)
)
```

#### Container Selection

```scala
def selectSparkContainer(
  pod: Pod,
  containerName: Option[String]
): SparkPod = {
  val containers = pod.getSpec.getContainers.asScala

  val selectedContainer = containerName match {
    case Some(name) =>
      containers.find(_.getName == name).getOrElse {
        throw new RuntimeException(s"Container '$name' not found in pod template")
      }
    case None =>
      if (containers.size == 1) {
        containers.head
      } else {
        throw new RuntimeException(
          s"Pod template contains ${containers.size} containers. " +
          "Must specify containerName when template has multiple containers."
        )
      }
  }

  SparkPod(pod, selectedContainer)
}
```
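
For illustration, here is a minimal sketch of selecting the Spark container from a multi-container pod built with the fabric8 `PodBuilder`. The container names and image tags are hypothetical, not part of the API above.

```scala
import io.fabric8.kubernetes.api.model.PodBuilder

// Hypothetical template with two containers; the Spark container must be named explicitly
val multiContainerPod = new PodBuilder()
  .withNewMetadata().withName("driver-template").endMetadata()
  .withNewSpec()
    .addNewContainer().withName("spark-kubernetes-driver").withImage("spark:3.5.0").endContainer()
    .addNewContainer().withName("logging-sidecar").withImage("fluent/fluent-bit").endContainer()
  .endSpec()
  .build()

// Selecting by name succeeds; omitting the name would fail because two containers are present
val selectedSparkPod = KubernetesUtils.selectSparkContainer(multiContainerPod, Some("spark-kubernetes-driver"))
```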

## Client Management

### SparkKubernetesClientFactory { .api }

Factory for creating configured Kubernetes client instances:

```scala
object SparkKubernetesClientFactory {

  def createKubernetesClient(
    clientType: ClientType,
    kubernetesConf: Option[KubernetesConf] = None,
    sparkConf: Option[SparkConf] = None,
    defaultServiceAccountToken: Option[String] = None,
    clientContext: Option[String] = None
  ): KubernetesClient

  sealed trait ClientType
  object ClientType {
    case object Driver extends ClientType
    case object Executor extends ClientType
    case object Submission extends ClientType
  }
}
```

### Client Configuration

#### Basic Client Creation

```scala
// Driver client for executor management
val driverClient = SparkKubernetesClientFactory.createKubernetesClient(
  ClientType.Driver,
  kubernetesConf = Some(driverConf)
)

// Submission client for application deployment
val submissionClient = SparkKubernetesClientFactory.createKubernetesClient(
  ClientType.Submission,
  sparkConf = Some(conf),
  clientContext = Some("my-cluster-context")
)

// Executor client for pod operations
val executorClient = SparkKubernetesClientFactory.createKubernetesClient(
  ClientType.Executor,
  kubernetesConf = Some(executorConf),
  defaultServiceAccountToken = Some(serviceAccountToken)
)
```

#### Authentication Configuration

```scala
def createKubernetesClient(
  clientType: ClientType,
  kubernetesConf: Option[KubernetesConf] = None,
  sparkConf: Option[SparkConf] = None,
  defaultServiceAccountToken: Option[String] = None,
  clientContext: Option[String] = None
): KubernetesClient = {
  val config = new ConfigBuilder()
    .withApiVersion("v1")

  // Configure API server URL
  sparkConf.flatMap(_.getOption("spark.kubernetes.apiserver.host")).foreach { host =>
    config.withMasterUrl(host)
  }

  // Configure authentication
  configureAuthentication(config, sparkConf, defaultServiceAccountToken)

  // Configure TLS
  configureTls(config, sparkConf)

  // Configure context
  clientContext.foreach(config.withCurrentContext)

  new DefaultKubernetesClient(config.build())
}

private def configureAuthentication(
  config: ConfigBuilder,
  sparkConf: Option[SparkConf],
  defaultServiceAccountToken: Option[String]
): Unit = {
  sparkConf.foreach { conf =>
    // OAuth token authentication: explicit token first, then a token file,
    // then the mounted service account token as a last resort
    conf.getOption(OAUTH_TOKEN.key).orElse {
      conf.getOption(OAUTH_TOKEN_FILE.key).map { tokenFile =>
        Files.readAllLines(Paths.get(tokenFile)).asScala.mkString
      }
    }.orElse(defaultServiceAccountToken)
      .foreach(config.withOauthToken)

    // Username/password authentication
    conf.getOption("spark.kubernetes.authenticate.username").foreach { username =>
      config.withUsername(username)
      conf.getOption("spark.kubernetes.authenticate.password").foreach { password =>
        config.withPassword(password)
      }
    }
  }
}

private def configureTls(
  config: ConfigBuilder,
  sparkConf: Option[SparkConf]
): Unit = {
  sparkConf.foreach { conf =>
    // CA certificate
    conf.getOption(CA_CERT_FILE.key).foreach { caCertFile =>
      config.withCaCertFile(caCertFile)
    }

    // Client certificates for mutual TLS
    val clientCertFile = conf.getOption(CLIENT_CERT_FILE.key)
    val clientKeyFile = conf.getOption(CLIENT_KEY_FILE.key)

    (clientCertFile, clientKeyFile) match {
      case (Some(certFile), Some(keyFile)) =>
        config.withClientCertFile(certFile)
        config.withClientKeyFile(keyFile)
      case _ => // No client certificates
    }

    // Trust all certificates (for development only)
    if (conf.getBoolean("spark.kubernetes.apiserver.trustCerts", false)) {
      config.withTrustCerts(true)
    }
  }
}
```
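
A minimal sketch of driving this from a `SparkConf`, reusing the config constants referenced above; the endpoint value is a placeholder and the file paths are the standard in-cluster service account locations.

```scala
import org.apache.spark.SparkConf

// Token-file authentication against an assumed API server endpoint
val conf = new SparkConf()
  .set("spark.kubernetes.apiserver.host", "https://k8s.example.com:6443")
  .set(OAUTH_TOKEN_FILE.key, "/var/run/secrets/kubernetes.io/serviceaccount/token")
  .set(CA_CERT_FILE.key, "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")

// The factory reads the token file and CA certificate while building the client
val client = SparkKubernetesClientFactory.createKubernetesClient(
  ClientType.Submission,
  sparkConf = Some(conf)
)
```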

## Volume Utilities

### KubernetesVolumeUtils { .api }

Utilities for parsing and handling Kubernetes volume configurations:

```scala
object KubernetesVolumeUtils {

  def parseVolumesWithPrefix(
    sparkConf: SparkConf,
    prefix: String
  ): Seq[KubernetesVolumeSpec]

  private def parseVolumeSpec(
    volumeName: String,
    volumeConf: Map[String, String]
  ): KubernetesVolumeSpec
}
```

### Volume Parsing Implementation

#### Prefix-Based Volume Configuration

```scala
def parseVolumesWithPrefix(
  sparkConf: SparkConf,
  prefix: String
): Seq[KubernetesVolumeSpec] = {
  val volumeConfigs = sparkConf.getAllWithPrefix(prefix)
    .groupBy { case (key, _) =>
      // Extract the volume name from a key like "data.hostPath.path"
      key.split('.').headOption.getOrElse("")
    }
    .filter(_._1.nonEmpty)

  volumeConfigs.map { case (volumeName, configs) =>
    val configMap = configs.map { case (key, value) =>
      // Remove the volume name prefix: "data.hostPath.path" -> "hostPath.path"
      key.substring(volumeName.length + 1) -> value
    }.toMap

    parseVolumeSpec(volumeName, configMap)
  }.toSeq
}

// Usage
val driverVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
  sparkConf,
  "spark.kubernetes.driver.volume."
)

val executorVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
  sparkConf,
  "spark.kubernetes.executor.volume."
)
```

#### Volume Specification Parsing

```scala
private def parseVolumeSpec(
  volumeName: String,
  volumeConf: Map[String, String]
): KubernetesVolumeSpec = {
  val mountPath = volumeConf.getOrElse("mount.path",
    throw new IllegalArgumentException(s"Volume $volumeName missing mount.path"))

  val mountSubPath = volumeConf.getOrElse("mount.subPath", "")
  val mountReadOnly = volumeConf.getOrElse("mount.readOnly", "false").toBoolean

  val volumeType = determineVolumeType(volumeConf)
  val volumeSpecificConf = parseVolumeSpecificConf(volumeType, volumeConf)

  KubernetesVolumeSpec(
    volumeName = volumeName,
    mountPath = mountPath,
    mountSubPath = mountSubPath,
    mountReadOnly = mountReadOnly,
    volumeConf = volumeSpecificConf
  )
}

private def determineVolumeType(volumeConf: Map[String, String]): String = {
  val volumeTypes = Set("hostPath", "persistentVolumeClaim", "emptyDir", "nfs", "configMap", "secret")

  val presentTypes = volumeTypes.filter { volumeType =>
    volumeConf.keys.exists(_.startsWith(s"$volumeType."))
  }

  presentTypes.size match {
    case 0 => throw new IllegalArgumentException("No volume type specified")
    case 1 => presentTypes.head
    case _ => throw new IllegalArgumentException(s"Multiple volume types specified: ${presentTypes.mkString(", ")}")
  }
}

private def parseVolumeSpecificConf(
  volumeType: String,
  volumeConf: Map[String, String]
): KubernetesVolumeSpecificConf = {
  volumeType match {
    case "hostPath" =>
      val hostPath = volumeConf.getOrElse("hostPath.path",
        throw new IllegalArgumentException("hostPath volume missing path"))
      KubernetesHostPathVolumeConf(hostPath)

    case "persistentVolumeClaim" =>
      val claimName = volumeConf.getOrElse("persistentVolumeClaim.claimName",
        throw new IllegalArgumentException("PVC volume missing claimName"))
      KubernetesPVCVolumeConf(claimName)

    case "emptyDir" =>
      val medium = volumeConf.get("emptyDir.medium")
      val sizeLimit = volumeConf.get("emptyDir.sizeLimit")
      KubernetesEmptyDirVolumeConf(medium, sizeLimit)

    case "nfs" =>
      val server = volumeConf.getOrElse("nfs.server",
        throw new IllegalArgumentException("NFS volume missing server"))
      val path = volumeConf.getOrElse("nfs.path",
        throw new IllegalArgumentException("NFS volume missing path"))
      KubernetesNFSVolumeConf(server, path)

    case _ =>
      throw new IllegalArgumentException(s"Unsupported volume type: $volumeType")
  }
}
```

### Volume Configuration Examples

```scala
// Host path volume configuration
spark.conf.set("spark.kubernetes.volume.data.hostPath.path", "/host/data")
spark.conf.set("spark.kubernetes.volume.data.mount.path", "/data")
spark.conf.set("spark.kubernetes.volume.data.mount.readOnly", "true")

// PVC volume configuration
spark.conf.set("spark.kubernetes.volume.storage.persistentVolumeClaim.claimName", "spark-pvc")
spark.conf.set("spark.kubernetes.volume.storage.mount.path", "/storage")
spark.conf.set("spark.kubernetes.volume.storage.mount.subPath", "spark-data")

// EmptyDir volume with memory backing
spark.conf.set("spark.kubernetes.volume.tmp.emptyDir.medium", "Memory")
spark.conf.set("spark.kubernetes.volume.tmp.emptyDir.sizeLimit", "1Gi")
spark.conf.set("spark.kubernetes.volume.tmp.mount.path", "/tmp")

// NFS volume configuration
spark.conf.set("spark.kubernetes.volume.shared.nfs.server", "nfs-server.example.com")
spark.conf.set("spark.kubernetes.volume.shared.nfs.path", "/shared/data")
spark.conf.set("spark.kubernetes.volume.shared.mount.path", "/shared")
```
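
Given the settings above, parsing with the shared prefix yields one spec per volume name. The commented result below is a sketch based on the parsing logic in this document; ordering is not guaranteed because volumes are grouped by name.

```scala
val volumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
  sparkConf,
  "spark.kubernetes.volume."
)

// Approximate result for the configuration above (order not guaranteed):
// KubernetesVolumeSpec("data",    "/data",    "",           mountReadOnly = true,  KubernetesHostPathVolumeConf("/host/data"))
// KubernetesVolumeSpec("storage", "/storage", "spark-data", mountReadOnly = false, KubernetesPVCVolumeConf("spark-pvc"))
// KubernetesVolumeSpec("tmp",     "/tmp",     "",           mountReadOnly = false, KubernetesEmptyDirVolumeConf(Some("Memory"), Some("1Gi")))
// KubernetesVolumeSpec("shared",  "/shared",  "",           mountReadOnly = false, KubernetesNFSVolumeConf("nfs-server.example.com", "/shared/data"))
```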

## Resource Management Utilities

### Resource Requirement Building

```scala
object ResourceUtils {

  def buildResourceRequirements(
    conf: KubernetesConf,
    isDriver: Boolean = false
  ): ResourceRequirements = {
    val prefix = if (isDriver) "spark.kubernetes.driver" else "spark.kubernetes.executor"

    val limits = mutable.Map[String, Quantity]()
    val requests = mutable.Map[String, Quantity]()

    // CPU configuration
    conf.sparkConf.getOption(s"$prefix.limit.cores").foreach { cores =>
      limits("cpu") = new Quantity(cores)
    }

    conf.sparkConf.getOption(s"$prefix.request.cores").foreach { cores =>
      requests("cpu") = new Quantity(cores)
    }

    // Memory configuration
    conf.sparkConf.getOption(s"$prefix.memory").foreach { memory =>
      val memoryBytes = Utils.byteStringAsBytes(memory)

      // Add memory overhead (10% of the requested memory by default)
      val overhead = conf.sparkConf.getOption(s"$prefix.memoryOverhead")
        .map(Utils.byteStringAsBytes)
        .getOrElse((memoryBytes * 0.1).toLong)

      // Express the total as a plain byte quantity
      val totalMemory = memoryBytes + overhead
      limits("memory") = new Quantity(s"$totalMemory")
      requests("memory") = new Quantity(s"$totalMemory")
    }

    new ResourceRequirementsBuilder()
      .withLimits(limits.asJava)
      .withRequests(requests.asJava)
      .build()
  }
}
```
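
A usage sketch, assuming an executor-side `KubernetesConf` (`executorConf`) and a pod under construction (`executorPod`); both names and the sizing values are placeholders.

```scala
import io.fabric8.kubernetes.api.model.ContainerBuilder

// Hypothetical executor sizing
sparkConf.set("spark.kubernetes.executor.request.cores", "2")
sparkConf.set("spark.kubernetes.executor.limit.cores", "4")
sparkConf.set("spark.kubernetes.executor.memory", "4g")

val requirements = ResourceUtils.buildResourceRequirements(executorConf, isDriver = false)

// Attach the computed limits and requests to the executor container
val sizedContainer = new ContainerBuilder(executorPod.container)
  .withResources(requirements)
  .build()
```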

523

524

### Label and Annotation Utilities

525

526

```scala

527

object MetadataUtils {

528

529

def buildLabels(conf: KubernetesConf): Map[String, String] = {

530

val baseLabels = Map(

531

Constants.SPARK_APP_ID_LABEL -> conf.appId,

532

Constants.SPARK_APP_NAME_LABEL -> conf.appName,

533

Constants.SPARK_VERSION_LABEL -> org.apache.spark.SPARK_VERSION

534

)

535

536

baseLabels ++ conf.labels

537

}

538

539

def buildAnnotations(conf: KubernetesConf): Map[String, String] = {

540

val baseAnnotations = Map(

541

Constants.CREATED_BY_ANNOTATION -> "Apache Spark",

542

Constants.SPARK_APP_NAME_ANNOTATION -> conf.appName

543

)

544

545

baseAnnotations ++ conf.annotations

546

}

547

548

def validateLabelKey(key: String): Unit = {

549

require(key.matches("[a-z0-9A-Z]([a-z0-9A-Z._-]*[a-z0-9A-Z])?"),

550

s"Invalid label key: $key")

551

require(key.length <= 63, s"Label key too long: $key")

552

}

553

554

def validateLabelValue(value: String): Unit = {

555

require(value.matches("[a-z0-9A-Z]([a-z0-9A-Z._-]*[a-z0-9A-Z])?") || value.isEmpty,

556

s"Invalid label value: $value")

557

require(value.length <= 63, s"Label value too long: $value")

558

}

559

}

560

```
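
For example, the helpers might be combined when building driver pod metadata. This is a sketch; `conf` and `driverPodName` are assumed from earlier snippets, and validation failures surface as `IllegalArgumentException` from `require`.

```scala
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder
import scala.jdk.CollectionConverters._

val labels = MetadataUtils.buildLabels(conf)
val annotations = MetadataUtils.buildAnnotations(conf)

// Validate user-supplied labels before attaching them to the pod
labels.foreach { case (key, value) =>
  MetadataUtils.validateLabelKey(key)
  MetadataUtils.validateLabelValue(value)
}

val driverMetadata = new ObjectMetaBuilder()
  .withName(driverPodName)
  .addToLabels(labels.asJava)
  .addToAnnotations(annotations.asJava)
  .build()
```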

561

562

## Error Handling Utilities

563

564

### Kubernetes Exception Handling

565

566

```scala

567

object KubernetesExceptionUtils {

568

569

def handleKubernetesApiException[T](operation: String)(block: => T): Try[T] = {

570

Try(block).recover {

571

case e: KubernetesClientException =>

572

logError(s"Kubernetes API error during $operation: ${e.getMessage}")

573

throw new SparkException(s"Failed to $operation: ${e.getMessage}", e)

574

575

case e: IOException =>

576

logError(s"Network error during $operation: ${e.getMessage}")

577

throw new SparkException(s"Network error during $operation: ${e.getMessage}", e)

578

579

case e: Exception =>

580

logError(s"Unexpected error during $operation: ${e.getMessage}")

581

throw new SparkException(s"Unexpected error during $operation: ${e.getMessage}", e)

582

}

583

}

584

585

def withRetry[T](

586

operation: String,

587

maxRetries: Int = 3,

588

backoffMillis: Long = 1000

589

)(block: => T): T = {

590

591

var lastException: Exception = null

592

593

for (attempt <- 1 to maxRetries) {

594

try {

595

return block

596

} catch {

597

case e: Exception =>

598

lastException = e

599

if (attempt < maxRetries) {

600

logWarning(s"$operation failed (attempt $attempt/$maxRetries), retrying in ${backoffMillis}ms")

601

Thread.sleep(backoffMillis * attempt) // Exponential backoff

602

}

603

}

604

}

605

606

throw new SparkException(s"$operation failed after $maxRetries attempts", lastException)

607

}

608

}

609

610

// Usage

611

val pod = KubernetesExceptionUtils.withRetry("create executor pod") {

612

kubernetesClient.pods()

613

.inNamespace(namespace)

614

.create(podSpec)

615

}

616

```
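
The `Try`-returning wrapper suits reads where a failure should not abort the caller. A sketch follows; `namespace` and `podName` are assumed from context.

```scala
import scala.util.{Failure, Success}

val driverPodLookup = KubernetesExceptionUtils.handleKubernetesApiException("get driver pod") {
  kubernetesClient.pods()
    .inNamespace(namespace)
    .withName(podName)
    .get()
}

driverPodLookup match {
  case Success(pod) => logInfo(s"Driver pod phase: ${pod.getStatus.getPhase}")
  case Failure(e)   => logWarning(s"Could not fetch driver pod: ${e.getMessage}")
}
```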

### Configuration Validation Utilities

```scala
object ConfigValidationUtils {

  def validateImageName(image: String): Unit = {
    require(image.nonEmpty, "Container image cannot be empty")

    // Basic image name validation
    val imagePattern = "^([a-zA-Z0-9._-]+(/[a-zA-Z0-9._-]+)*)(:([a-zA-Z0-9._-]+))?$".r
    require(imagePattern.matches(image), s"Invalid container image format: $image")

    // Warn about latest tag
    if (image.endsWith(":latest")) {
      logWarning("Using 'latest' tag is discouraged in production environments")
    }
  }

  def validateNamespaceName(namespace: String): Unit = {
    require(namespace.nonEmpty, "Kubernetes namespace cannot be empty")
    require(namespace.matches("[a-z0-9]([-a-z0-9]*[a-z0-9])?"),
      s"Invalid namespace name: $namespace")
    require(namespace.length <= 63, s"Namespace name too long: $namespace")
  }

  def validateResourceName(name: String): Unit = {
    require(name.nonEmpty, "Resource name cannot be empty")
    require(name.matches("[a-z0-9]([-a-z0-9]*[a-z0-9])?"),
      s"Invalid resource name: $name")
    require(name.length <= 253, s"Resource name too long: $name")
  }

  def validateMemoryString(memory: String): Unit = {
    try {
      val bytes = Utils.byteStringAsBytes(memory)
      require(bytes > 0, "Memory must be positive")
    } catch {
      case _: NumberFormatException =>
        throw new IllegalArgumentException(s"Invalid memory format: $memory")
    }
  }
}
```
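
These checks are intended to run once at submission time, before any Kubernetes resources are created. A sketch with hypothetical values:

```scala
// Fail fast on obviously invalid settings before talking to the API server
ConfigValidationUtils.validateNamespaceName("spark-jobs")
ConfigValidationUtils.validateImageName("registry.example.com/spark/spark:3.5.0")
ConfigValidationUtils.validateResourceName("my-spark-app-app-1234-driver")
ConfigValidationUtils.validateMemoryString("4g")
```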

661

662

## Integration Patterns

663

664

### Utility Composition

665

666

```scala

667

// Common utility operations combined

668

object KubernetesOperations {

669

670

def createConfiguredPod(

671

conf: KubernetesConf,

672

podTemplate: Option[File] = None

673

): SparkPod = {

674

675

val client = SparkKubernetesClientFactory.createKubernetesClient(

676

ClientType.Driver,

677

Some(conf)

678

)

679

680

val basePod = podTemplate match {

681

case Some(template) =>

682

KubernetesUtils.loadPodFromTemplate(client, template, None)

683

case None =>

684

SparkPod.initialPod()

685

}

686

687

val volumes = KubernetesVolumeUtils.parseVolumesWithPrefix(

688

conf.sparkConf,

689

"spark.kubernetes.volume."

690

)

691

692

// Apply volumes and other configurations

693

applyVolumesToPod(basePod, volumes)

694

}

695

696

private def applyVolumesToPod(

697

pod: SparkPod,

698

volumes: Seq[KubernetesVolumeSpec]

699

): SparkPod = {

700

// Implementation using MountVolumesFeatureStep logic

701

new MountVolumesFeatureStep(volumes).configurePod(pod)

702

}

703

}

704

```
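
A possible call site, assuming a driver-side `KubernetesConf` (`driverConf`) and an optional template path; both are placeholders reused from earlier examples.

```scala
// Build the driver pod from an optional template plus any configured volumes
val driverPod = KubernetesOperations.createConfiguredPod(
  driverConf,
  podTemplate = Some(new File("/templates/driver-pod.yaml"))
)
```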

705

706

The utilities layer provides the essential building blocks and helper functions that make the Kubernetes integration robust, flexible, and easy to use across all components of the Spark Kubernetes resource manager.