# Configuration Management

The configuration management system provides a comprehensive, type-safe approach to configuring Kubernetes deployments for Spark applications, extending Spark's native configuration framework with Kubernetes-specific options and validation.

## Configuration Architecture

### KubernetesConf Hierarchy { .api }

Base abstract class containing all metadata needed for Kubernetes pod creation and management:

```scala
abstract class KubernetesConf(val sparkConf: SparkConf) {

  val resourceNamePrefix: String
  def labels: Map[String, String]
  def environment: Map[String, String]
  def annotations: Map[String, String]
  def secretEnvNamesToKeyRefs: Map[String, String]
  def secretNamesToMountPaths: Map[String, String]
  def volumes: Seq[KubernetesVolumeSpec]

  def appName: String = get("spark.app.name", "spark")
  def namespace: String = get(KUBERNETES_NAMESPACE)
  def imagePullPolicy: String = get(CONTAINER_IMAGE_PULL_POLICY)
  def nodeSelector: Map[String, String] =
    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)

  // Utility methods for configuration access
  def contains(config: ConfigEntry[_]): Boolean = sparkConf.contains(config)
  def get[T](config: ConfigEntry[T]): T = sparkConf.get(config)
  def get(conf: String): String = sparkConf.get(conf)
  def get(conf: String, defaultValue: String): String = sparkConf.get(conf, defaultValue)
  def getOption(key: String): Option[String] = sparkConf.getOption(key)
}
```

**Core Properties**:

- **Resource Naming**: `resourceNamePrefix` ensures unique Kubernetes resource names
- **Metadata**: `labels` and `annotations` for pod identification and configuration
- **Environment**: Environment variables and secret references
- **Storage**: Volume specifications and secret mount paths
- **Scheduling**: Node selectors and image pull policies
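
A minimal sketch of how a pod-building feature step might consume this metadata; `buildLabels` is a hypothetical helper, not part of the documented API:

```scala
// Hypothetical helper: merge system-managed labels with user-supplied ones.
// SPARK_APP_ID_LABEL and SPARK_ROLE_LABEL are defined in the Constants object below.
def buildLabels(conf: KubernetesConf, appId: String, role: String): Map[String, String] =
  conf.labels ++ Map(
    Constants.SPARK_APP_ID_LABEL -> appId,
    Constants.SPARK_ROLE_LABEL -> role
  )
```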

### KubernetesDriverConf { .api }

Driver-specific configuration extending the base configuration:

```scala
class KubernetesDriverConf(
    sparkConf: SparkConf,
    val appId: String,
    val mainAppResource: MainAppResource,
    val mainClass: String,
    val appArgs: Array[String])
  extends KubernetesConf(sparkConf) {

  def serviceAnnotations: Map[String, String]
}
```

**Driver-Specific Features**:

- **Application Resources**: Main class, application JAR, and command-line arguments
- **Service Configuration**: Annotations for Kubernetes service creation
- **Network Configuration**: Driver port and service type settings

**Creation Pattern**:

```scala
val driverConf = KubernetesConf.createDriverConf(
  sparkConf = conf,
  appName = "my-spark-app",
  appResourceNamePrefix = s"spark-${UUID.randomUUID().toString.take(8)}",
  appId = "app-123456789",
  mainAppResource = JavaMainAppResource(Some("local:///opt/spark/jars/my-app.jar")),
  mainClass = "com.example.MySparkApp",
  appArgs = Array("--input", "/data", "--output", "/results")
)
```

### KubernetesExecutorConf { .api }

Executor-specific configuration for worker pod creation:

```scala
class KubernetesExecutorConf(
    sparkConf: SparkConf,
    val appId: String,
    val executorId: String,
    val driverPod: Option[Pod])
  extends KubernetesConf(sparkConf)
```

**Executor-Specific Features**:

- **Executor Identity**: Unique executor ID for tracking and management
- **Driver Connection**: Reference to driver pod for communication setup
- **Resource Allocation**: Executor-specific CPU, memory, and storage configuration
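
By analogy with the driver creation pattern above, an executor conf is constructed once per executor. The sketch below assumes a `createExecutorConf` factory mirroring `createDriverConf`; the exact signature may differ:

```scala
// Sketch only: parameter names follow the class definition above.
val executorConf = KubernetesConf.createExecutorConf(
  sparkConf = conf,
  executorId = "1",
  appId = "app-123456789",
  driverPod = Some(driverPod) // enables owner-reference setup against the driver pod
)
```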

## Configuration Definitions

### Config Object { .api }

Centralized configuration definitions using Spark's ConfigBuilder pattern:

```scala
object Config {
  // Container Images
  val CONTAINER_IMAGE = ConfigBuilder("spark.kubernetes.container.image")
    .doc("Container image to use for Spark pods")
    .stringConf
    .createOptional

  val DRIVER_CONTAINER_IMAGE = ConfigBuilder("spark.kubernetes.driver.container.image")
    .doc("Container image to use for driver pod")
    .fallbackConf(CONTAINER_IMAGE)

  val EXECUTOR_CONTAINER_IMAGE = ConfigBuilder("spark.kubernetes.executor.container.image")
    .doc("Container image to use for executor pods")
    .fallbackConf(CONTAINER_IMAGE)

  // Namespace and Authentication
  val KUBERNETES_NAMESPACE = ConfigBuilder("spark.kubernetes.namespace")
    .doc("Kubernetes namespace for Spark pods")
    .stringConf
    .createWithDefault("default")

  val KUBERNETES_CONTEXT = ConfigBuilder("spark.kubernetes.context")
    .doc("Kubernetes context to use")
    .stringConf
    .createOptional

  val KUBERNETES_SERVICE_ACCOUNT_NAME = ConfigBuilder("spark.kubernetes.authenticate.serviceAccountName")
    .doc("Service account for Spark pods")
    .stringConf
    .createOptional

  // Resource Limits
  val KUBERNETES_DRIVER_LIMIT_CORES = ConfigBuilder("spark.kubernetes.driver.limit.cores")
    .doc("CPU limit for driver pod")
    .stringConf
    .createOptional

  val KUBERNETES_EXECUTOR_LIMIT_CORES = ConfigBuilder("spark.kubernetes.executor.limit.cores")
    .doc("CPU limit for executor pods")
    .stringConf
    .createOptional

  // Dynamic Allocation
  val DYN_ALLOCATION_ENABLED = ConfigBuilder("spark.dynamicAllocation.enabled")
    .doc("Enable dynamic allocation of executors")
    .booleanConf
    .createWithDefault(false)

  val DYN_ALLOCATION_MIN_EXECUTORS = ConfigBuilder("spark.dynamicAllocation.minExecutors")
    .doc("Minimum number of executors")
    .intConf
    .createWithDefault(0)

  val DYN_ALLOCATION_MAX_EXECUTORS = ConfigBuilder("spark.dynamicAllocation.maxExecutors")
    .doc("Maximum number of executors")
    .intConf
    .createWithDefault(Int.MaxValue)

  // Networking
  val KUBERNETES_DRIVER_POD_NAME = ConfigBuilder("spark.kubernetes.driver.pod.name")
    .doc("Name of the driver pod")
    .stringConf
    .createOptional

  val KUBERNETES_DRIVER_SERVICE_TYPE = ConfigBuilder("spark.kubernetes.driver.service.type")
    .doc("Service type for the driver service")
    .stringConf
    .createWithDefault("ClusterIP")

  // Volume Configuration
  val KUBERNETES_VOLUMES_PREFIX = "spark.kubernetes.volume."
  val KUBERNETES_DRIVER_VOLUMES_PREFIX = "spark.kubernetes.driver.volume."
  val KUBERNETES_EXECUTOR_VOLUMES_PREFIX = "spark.kubernetes.executor.volume."

  // Pod Templates
  val KUBERNETES_DRIVER_PODTEMPLATE_FILE = ConfigBuilder("spark.kubernetes.driver.podTemplateFile")
    .doc("Path to driver pod template file")
    .stringConf
    .createOptional

  val KUBERNETES_EXECUTOR_PODTEMPLATE_FILE = ConfigBuilder("spark.kubernetes.executor.podTemplateFile")
    .doc("Path to executor pod template file")
    .stringConf
    .createOptional
}
```
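
Since these are typed `ConfigEntry` definitions, they are read through the typed accessors on `KubernetesConf` shown earlier. For example:

```scala
// Optional entries come back as Option[T]; entries with defaults come back as T.
val image: Option[String] = conf.get(CONTAINER_IMAGE)
val namespace: String = conf.get(KUBERNETES_NAMESPACE) // "default" unless overridden
val maxExecutors: Int = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
```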

### Configuration Categories

#### Authentication and Security

```scala
// Service Account Configuration
val KUBERNETES_SERVICE_ACCOUNT_NAME = ConfigBuilder("spark.kubernetes.authenticate.serviceAccountName")

// OAuth Token Authentication
val OAUTH_TOKEN = ConfigBuilder("spark.kubernetes.authenticate.oauthToken")
val OAUTH_TOKEN_FILE = ConfigBuilder("spark.kubernetes.authenticate.oauthTokenFile")

// Certificate Authentication
val CLIENT_KEY_FILE = ConfigBuilder("spark.kubernetes.authenticate.clientKeyFile")
val CLIENT_CERT_FILE = ConfigBuilder("spark.kubernetes.authenticate.clientCertFile")
val CA_CERT_FILE = ConfigBuilder("spark.kubernetes.authenticate.caCertFile")
```

#### Resource Management

```scala
// CPU Configuration
val KUBERNETES_DRIVER_REQUEST_CORES = ConfigBuilder("spark.kubernetes.driver.request.cores")
val KUBERNETES_EXECUTOR_REQUEST_CORES = ConfigBuilder("spark.kubernetes.executor.request.cores")

// Memory Configuration
val KUBERNETES_DRIVER_MEMORY = ConfigBuilder("spark.kubernetes.driver.memory")
val KUBERNETES_EXECUTOR_MEMORY = ConfigBuilder("spark.kubernetes.executor.memory")
val MEMORY_OVERHEAD_FACTOR = ConfigBuilder("spark.kubernetes.memoryOverheadFactor")

// Instance Configuration
val KUBERNETES_EXECUTOR_INSTANCES = ConfigBuilder("spark.kubernetes.executor.instances")
```

#### Storage and Volumes

```scala
// Local Directory Configuration
val KUBERNETES_LOCAL_DIRS_TMPFS = ConfigBuilder("spark.kubernetes.local.dirs.tmpfs")

// Volume Mount Prefixes
val KUBERNETES_VOLUMES_HOSTPATH_PREFIX = "spark.kubernetes.volume.hostPath"
val KUBERNETES_VOLUMES_PVC_PREFIX = "spark.kubernetes.volume.persistentVolumeClaim"
val KUBERNETES_VOLUMES_EMPTYDIR_PREFIX = "spark.kubernetes.volume.emptyDir"
```

## Constants and Defaults

### Constants Object { .api }

Kubernetes-specific constants and default values:

```scala
object Constants {
  // Labels
  val SPARK_APP_ID_LABEL = "spark-app-selector"
  val SPARK_APP_NAME_LABEL = "spark-app-name"
  val SPARK_EXECUTOR_ID_LABEL = "spark-exec-id"
  val SPARK_ROLE_LABEL = "spark-role"
  val SPARK_VERSION_LABEL = "spark-version"

  // Label Values
  val DRIVER_ROLE = "driver"
  val EXECUTOR_ROLE = "executor"

  // Ports
  val DEFAULT_DRIVER_PORT = 7077
  val DEFAULT_BLOCKMANAGER_PORT = 7078
  val DEFAULT_UI_PORT = 4040

  // Environment Variables
  val ENV_DRIVER_URL = "SPARK_DRIVER_URL"
  val ENV_EXECUTOR_CORES = "SPARK_EXECUTOR_CORES"
  val ENV_EXECUTOR_MEMORY = "SPARK_EXECUTOR_MEMORY"
  val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID"
  val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID"

  // Paths
  val SPARK_CONF_DIR_INTERNAL = "/opt/spark/conf"
  val SPARK_CONF_FILE_NAME = "spark.conf"
  val HADOOP_CONF_DIR_PATH = "/opt/hadoop/conf"
  val KERBEROS_CONF_DIR_PATH = "/opt/kerberos"

  // Resource Types
  val APP_RESOURCE_TYPE_JAVA = "java"
  val APP_RESOURCE_TYPE_PYTHON = "python"
  val APP_RESOURCE_TYPE_R = "r"

  // Container Names
  val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
  val EXECUTOR_CONTAINER_NAME = "spark-kubernetes-executor"

  // Annotations
  val CREATED_BY_ANNOTATION = "created-by"
  val SPARK_APP_NAME_ANNOTATION = "spark-app-name"
}
```

### Default Values and Validation

```scala
// Image Pull Policies
object ImagePullPolicy extends Enumeration {
  val Always, Never, IfNotPresent = Value
}

// Service Types
object ServiceType extends Enumeration {
  val ClusterIP, NodePort, LoadBalancer = Value
}

// Restart Policies
object RestartPolicy extends Enumeration {
  val Always, OnFailure, Never = Value
}
```
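
These enumerations make it straightforward to reject invalid string values early. A small sketch using the standard `Enumeration.withName` lookup:

```scala
// withName throws NoSuchElementException for values outside the enumeration,
// so an invalid imagePullPolicy setting fails fast.
val policy: ImagePullPolicy.Value = ImagePullPolicy.withName(conf.imagePullPolicy)
```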

## Volume Configuration

### KubernetesVolumeSpec { .api }

Comprehensive volume specification supporting multiple Kubernetes volume types:

```scala
case class KubernetesVolumeSpec(
  volumeName: String,
  mountPath: String,
  mountSubPath: String = "",
  mountReadOnly: Boolean = false,
  volumeConf: KubernetesVolumeSpecificConf
)
```

### Volume Types { .api }

```scala
// Host Path Volumes
case class KubernetesHostPathVolumeConf(
  hostPath: String
) extends KubernetesVolumeSpecificConf

// Persistent Volume Claims
case class KubernetesPVCVolumeConf(
  claimName: String
) extends KubernetesVolumeSpecificConf

// Empty Directory Volumes
case class KubernetesEmptyDirVolumeConf(
  medium: Option[String] = None,
  sizeLimit: Option[String] = None
) extends KubernetesVolumeSpecificConf

// NFS Volumes
case class KubernetesNFSVolumeConf(
  server: String,
  path: String
) extends KubernetesVolumeSpecificConf
```

### Volume Configuration Examples

```scala
// Host path volume
spark.conf.set("spark.kubernetes.volume.data.hostPath.path", "/host/data")
spark.conf.set("spark.kubernetes.volume.data.mount.path", "/data")
spark.conf.set("spark.kubernetes.volume.data.mount.readOnly", "true")

// PVC volume
spark.conf.set("spark.kubernetes.volume.storage.persistentVolumeClaim.claimName", "spark-pvc")
spark.conf.set("spark.kubernetes.volume.storage.mount.path", "/storage")

// Empty dir with size limit
spark.conf.set("spark.kubernetes.volume.tmp.emptyDir.medium", "Memory")
spark.conf.set("spark.kubernetes.volume.tmp.emptyDir.sizeLimit", "1Gi")
spark.conf.set("spark.kubernetes.volume.tmp.mount.path", "/tmp")
```
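
For reference, the `data` volume settings above correspond to the following `KubernetesVolumeSpec`, hand-constructed here for illustration:

```scala
// What parsing the "data" volume keys above would produce.
val dataVolume = KubernetesVolumeSpec(
  volumeName = "data",
  mountPath = "/data",
  mountReadOnly = true,
  volumeConf = KubernetesHostPathVolumeConf(hostPath = "/host/data")
)
```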

363

364

## Configuration Validation

365

366

### Validation Framework

367

368

```scala
object ConfigValidation {

  def validateDriverConfig(conf: KubernetesDriverConf): Unit = {
    validateBaseConfig(conf)
    validateDriverSpecific(conf)
  }

  def validateExecutorConfig(conf: KubernetesExecutorConf): Unit = {
    validateBaseConfig(conf)
    validateExecutorSpecific(conf)
  }

  private def validateBaseConfig(conf: KubernetesConf): Unit = {
    // Required configurations
    require(conf.get(CONTAINER_IMAGE).nonEmpty,
      "Container image must be specified with spark.kubernetes.container.image")

    require(conf.namespace.nonEmpty,
      "Kubernetes namespace must be specified")

    // Resource validation
    validateResourceLimits(conf)
    validateVolumeConfiguration(conf)
    validateNetworkConfiguration(conf)
  }

  private def validateResourceLimits(conf: KubernetesConf): Unit = {
    // CPU validation
    conf.get(KUBERNETES_DRIVER_LIMIT_CORES).foreach { cores =>
      require(cores.toDouble > 0, "Driver CPU limit must be positive")
    }

    // Memory validation
    conf.get(KUBERNETES_DRIVER_MEMORY).foreach { memory =>
      require(Utils.byteStringAsBytes(memory) > 0, "Driver memory must be positive")
    }
  }

  private def validateVolumeConfiguration(conf: KubernetesConf): Unit = {
    conf.volumes.foreach { volume =>
      require(volume.volumeName.nonEmpty, "Volume name cannot be empty")
      require(volume.mountPath.nonEmpty, "Mount path cannot be empty")
      require(volume.mountPath.startsWith("/"), "Mount path must be absolute")
    }
  }
}
```
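
Validation is typically invoked after the conf objects are built and before any Kubernetes resources are created; `require` throws `IllegalArgumentException` with the messages above on failure:

```scala
// Fail fast before contacting the Kubernetes API server.
ConfigValidation.validateDriverConfig(driverConf)
```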

### Common Validation Rules

```scala
// Image name validation
def validateImageName(image: String, allowLatestTag: Boolean = false): Unit = {
  require(image.nonEmpty, "Container image cannot be empty")
  require(!image.endsWith(":latest") || allowLatestTag,
    "Using 'latest' tag is discouraged in production")
}

// Namespace validation
def validateNamespace(namespace: String): Unit = {
  require(namespace.matches("[a-z0-9]([-a-z0-9]*[a-z0-9])?"),
    "Namespace must be a valid DNS-1123 label")
  require(namespace.length <= 63, "Namespace must be 63 characters or less")
}

// Resource name validation
def validateResourceName(name: String): Unit = {
  require(name.matches("[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*"),
    "Resource name must be a valid DNS-1123 subdomain")
  require(name.length <= 253, "Resource name must be 253 characters or less")
}
```

## Configuration Usage Patterns

### Dynamic Configuration

```scala
// Environment-based configuration
val conf = new SparkConf()
  .set(KUBERNETES_NAMESPACE, sys.env.getOrElse("SPARK_NAMESPACE", "default"))
  .set(CONTAINER_IMAGE, sys.env("SPARK_IMAGE"))
  .set(KUBERNETES_SERVICE_ACCOUNT_NAME, sys.env.getOrElse("SERVICE_ACCOUNT", "spark"))

// Conditional configuration
if (conf.get(DYN_ALLOCATION_ENABLED)) {
  conf.set(DYN_ALLOCATION_MIN_EXECUTORS, 2)
  conf.set(DYN_ALLOCATION_MAX_EXECUTORS, 20)
}
```

### Configuration Templates

```scala
// Development configuration template
def createDevConfig(): SparkConf = new SparkConf()
  .set(KUBERNETES_NAMESPACE, "spark-dev")
  .set(CONTAINER_IMAGE, "spark:3.0.1-dev")
  .set(KUBERNETES_DRIVER_MEMORY, "1g")
  .set(KUBERNETES_EXECUTOR_INSTANCES, "2")
  .set(KUBERNETES_EXECUTOR_MEMORY, "2g")

// Production configuration template
def createProdConfig(): SparkConf = new SparkConf()
  .set(KUBERNETES_NAMESPACE, "spark-prod")
  .set(CONTAINER_IMAGE, "my-org/spark:3.0.1-stable")
  .set(KUBERNETES_DRIVER_MEMORY, "4g")
  .set(KUBERNETES_EXECUTOR_INSTANCES, "10")
  .set(KUBERNETES_EXECUTOR_MEMORY, "8g")
  .set(KUBERNETES_DRIVER_LIMIT_CORES, "2")
  .set(KUBERNETES_EXECUTOR_LIMIT_CORES, "4")
```

### Configuration Inheritance

```scala
// Base configuration
val baseConf = new SparkConf()
  .set(KUBERNETES_NAMESPACE, "spark")
  .set(CONTAINER_IMAGE, "spark:3.0.1")

// Driver-specific additions
val driverConf = baseConf.clone()
  .set(KUBERNETES_DRIVER_MEMORY, "2g")
  .set(KUBERNETES_DRIVER_SERVICE_TYPE, "LoadBalancer")

// Executor-specific additions
val executorConf = baseConf.clone()
  .set(KUBERNETES_EXECUTOR_MEMORY, "4g")
  .set(KUBERNETES_EXECUTOR_INSTANCES, "5")
```

## Advanced Configuration Features

### Prefix-Based Configuration

```scala
// Volume configuration using prefixes
spark.conf.set("spark.kubernetes.volume.data.hostPath.path", "/host/data")
spark.conf.set("spark.kubernetes.volume.logs.persistentVolumeClaim.claimName", "logs-pvc")

// Secret configuration using prefixes
spark.conf.set("spark.kubernetes.driver.secrets.db-secret", "/opt/secrets/db")
spark.conf.set("spark.kubernetes.executor.secrets.api-key", "/opt/secrets/api")

// Environment variable secrets
spark.conf.set("spark.kubernetes.driver.secretKeyRef.DB_PASSWORD", "db-secret:password")
spark.conf.set("spark.kubernetes.executor.secretKeyRef.API_TOKEN", "api-secret:token")
```
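
Prefixed keys like these are resolved with `KubernetesUtils.parsePrefixedKeyValuePairs`, seen earlier in `nodeSelector`. A simplified sketch of that behavior, assuming it only strips the prefix:

```scala
// Strip the prefix and keep the remainder of each key, e.g.
// "spark.kubernetes.driver.secrets.db-secret" -> "db-secret".
def parsePrefixedKeyValuePairs(sparkConf: SparkConf, prefix: String): Map[String, String] =
  sparkConf.getAllWithPrefix(prefix).toMap

// parsePrefixedKeyValuePairs(conf, "spark.kubernetes.driver.secrets.")
//   => Map("db-secret" -> "/opt/secrets/db")
```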

### Pod Template Integration

```scala
// Pod template file configuration
spark.conf.set("spark.kubernetes.driver.podTemplateFile", "/templates/driver-pod.yaml")
spark.conf.set("spark.kubernetes.executor.podTemplateFile", "/templates/executor-pod.yaml")

// Template merging with configuration
val templatePod = KubernetesUtils.loadPodFromTemplate(
  kubernetesClient,
  new File("/templates/driver-pod.yaml"),
  Some(Constants.DRIVER_CONTAINER_NAME)
)

// Configuration takes precedence over template
val finalPod = mergePodTemplateWithConfig(templatePod, driverConf)
```

The configuration management system provides a robust, extensible foundation for customizing every aspect of Kubernetes deployments while maintaining compatibility with Spark's existing configuration patterns.