# Configuration System

Comprehensive configuration system for YARN-specific settings, including resource allocation, security, deployment options, and application parameters. Provides both programmatic configuration through `SparkConf` and command-line argument parsing.

## Capabilities

### YARN Configuration Properties

The YARN module provides extensive configuration options through the `config` package object.

```scala { .api }
/**
 * YARN-specific configuration keys and defaults.
 * All configuration properties are accessed through SparkConf.
 */
package object config {

  /** YARN application tags for resource management and monitoring */
  val APPLICATION_TAGS: ConfigEntry[Set[String]]

  /** Application priority in YARN queue (0-10, higher = more priority) */
  val APPLICATION_PRIORITY: ConfigEntry[Int]

  /** YARN queue name for application submission */
  val QUEUE_NAME: ConfigEntry[String]

  /** Maximum application attempts allowed by YARN */
  val MAX_APP_ATTEMPTS: ConfigEntry[Int]

  /** ApplicationMaster memory allocation */
  val AM_MEMORY: ConfigEntry[Long]

  /** ApplicationMaster CPU core allocation */
  val AM_CORES: ConfigEntry[Int]

  /** ApplicationMaster memory overhead for the YARN container */
  val AM_MEMORY_OVERHEAD: ConfigEntry[Long]

  /** Node label expression for executor placement */
  val EXECUTOR_NODE_LABEL_EXPRESSION: OptionalConfigEntry[String]

  /** Node label expression for ApplicationMaster placement */
  val AM_NODE_LABEL_EXPRESSION: OptionalConfigEntry[String]

  /** Location of the Spark archive file for distribution */
  val SPARK_ARCHIVE: OptionalConfigEntry[String]

  /** Whether to wait for application completion */
  val WAIT_FOR_APP_COMPLETION: ConfigEntry[Boolean]

  /** Kerberos keytab file location for security */
  val KEYTAB: OptionalConfigEntry[String]

  /** Kerberos principal name for authentication */
  val PRINCIPAL: OptionalConfigEntry[String]

  /** Staging directory for application files */
  val STAGING_DIR: OptionalConfigEntry[String]

  /** Executor failure validity interval */
  val EXECUTOR_FAILURES_VALIDITY_INTERVAL: ConfigEntry[Long]

  /** Maximum executor failures before application failure */
  val MAX_EXECUTOR_FAILURES: ConfigEntry[Int]

  /** ApplicationMaster attempt failures validity interval */
  val AM_ATTEMPT_FAILURES_VALIDITY_INTERVAL: ConfigEntry[Long]

  /** Container launcher maximum threads */
  val CONTAINER_LAUNCHER_MAX_THREADS: ConfigEntry[Int]

  /** Scheduler heartbeat interval */
  val SCHEDULER_HEARTBEAT_INTERVAL: ConfigEntry[Long]

  /** Initial allocation interval */
  val SCHEDULER_INITIAL_ALLOCATION_INTERVAL: ConfigEntry[Long]

  /** Whether to preserve staging files after completion */
  val PRESERVE_STAGING_FILES: ConfigEntry[Boolean]

  /** File replication factor for staging files */
  val STAGING_FILE_REPLICATION: ConfigEntry[Short]

  /** Whether to roll ApplicationMaster logs */
  val AM_LOG_ROLL_ENABLE: ConfigEntry[Boolean]

  /** ApplicationMaster log roll size */
  val AM_LOG_ROLL_SIZE: ConfigEntry[Long]

  /** ApplicationMaster log roll interval */
  val AM_LOG_ROLL_INTERVAL: ConfigEntry[Long]
}
```
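
Each entry above backs a `spark.yarn.*` string property (for example `QUEUE_NAME` corresponds to `spark.yarn.queue` and `AM_MEMORY` to `spark.yarn.am.memory`, matching the usage examples below). A minimal sketch of setting and reading these properties through the public `SparkConf` string-key API:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.yarn.queue", "analytics")   // QUEUE_NAME
  .set("spark.yarn.priority", "5")        // APPLICATION_PRIORITY
  .set("spark.yarn.am.memory", "2g")      // AM_MEMORY

// Read the same properties back; the second argument is a local fallback value.
val queue = conf.get("spark.yarn.queue", "default")
val priority = conf.getInt("spark.yarn.priority", 0)
```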

### Configuration Usage Examples

#### Basic YARN Configuration

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("BasicYarnConfiguration")
  // Application settings
  .set("spark.yarn.queue", "production")
  .set("spark.yarn.tags", "spark,analytics,batch")
  .set("spark.yarn.priority", "5")
  // ApplicationMaster settings
  .set("spark.yarn.am.memory", "2g")
  .set("spark.yarn.am.cores", "2")
  .set("spark.yarn.am.memoryOverhead", "512m")
  // Executor settings
  .set("spark.executor.instances", "10")
  .set("spark.executor.memory", "4g")
  .set("spark.executor.cores", "2")
  .set("spark.executor.memoryOverhead", "1g")
```
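
The configured `SparkConf` is then handed to the session builder (or a `SparkContext`) at startup; a brief sketch:

```scala
import org.apache.spark.sql.SparkSession

// Build the session from the conf above; with master "yarn" this runs
// against the cluster configured via HADOOP_CONF_DIR / YARN_CONF_DIR.
val spark = SparkSession.builder()
  .config(conf)
  .getOrCreate()
```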

#### Security Configuration

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("SecureYarnConfiguration")
  // Kerberos authentication
  .set("spark.yarn.keytab", "/path/to/user.keytab")
  .set("spark.yarn.principal", "user@REALM.COM")
  // Security settings
  .set("spark.authenticate", "true")
  .set("spark.authenticate.secret", "shared-secret")
  .set("spark.network.crypto.enabled", "true")
  .set("spark.io.encryption.enabled", "true")
  // YARN security integration
  .set("spark.yarn.security.credentials.hive.enabled", "true")
  .set("spark.yarn.security.credentials.hbase.enabled", "true")
```

#### Advanced Resource Configuration

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("AdvancedResourceConfiguration")
  // Node labeling and placement
  .set("spark.yarn.executor.nodeLabelExpression", "compute-nodes")
  .set("spark.yarn.am.nodeLabelExpression", "management-nodes")
  // Custom resources (GPUs, FPGAs, etc.)
  .set("spark.executor.resource.gpu.amount", "1")
  .set("spark.executor.resource.gpu.discoveryScript", "/opt/spark/gpu-discovery.sh")
  // Container settings
  .set("spark.yarn.containerLauncherMaxThreads", "50")
  .set("spark.yarn.scheduler.heartbeat.interval-ms", "5000")
  .set("spark.yarn.scheduler.initial-allocation.interval", "100ms")
  // Failure handling
  .set("spark.yarn.maxAppAttempts", "3")
  .set("spark.yarn.max.executor.failures", "10")
  .set("spark.yarn.executor.failuresValidityInterval", "2h")
```

### ClientArguments

Argument parser for YARN client applications, supporting JVM (JAR/main class), PySpark, and SparkR application types.

```scala { .api }
/**
 * Argument parser for the YARN client.
 * Handles command-line arguments for application submission.
 * @param args Command line arguments array
 */
class ClientArguments(args: Array[String]) {

  /** User application JAR file path */
  var userJar: String = null

  /** Main class to execute */
  var userClass: String = null

  /** Primary Python file for PySpark applications */
  var primaryPyFile: String = null

  /** Primary R file for SparkR applications */
  var primaryRFile: String = null

  /** User application arguments */
  var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()

  /** Enable verbose logging */
  var verbose: Boolean = false

  /** Application name */
  var name: String = "Spark"

  /** Spark properties file */
  var propertiesFile: String = null

  /** Additional Python files */
  var pyFiles: String = null

  /** Additional files to distribute */
  var files: String = null

  /** Additional archives to distribute */
  var archives: String = null
}
```

**Usage Example:**

```scala
import org.apache.spark.deploy.yarn.ClientArguments

// Parse command-line arguments
val args = Array(
  "--jar", "/path/to/myapp.jar",
  "--class", "com.example.MyMainClass",
  "--name", "MySparkApplication",
  "--files", "config.properties,data.txt",
  "--py-files", "utils.py,helpers.py",
  "--archives", "data.zip#data",
  "--verbose",
  "--", "app-arg1", "app-arg2" // Application arguments after --
)

val clientArgs = new ClientArguments(args)
println(s"JAR: ${clientArgs.userJar}")
println(s"Main class: ${clientArgs.userClass}")
println(s"App args: ${clientArgs.userArgs}")
```

### ApplicationMasterArguments

Argument parser for the ApplicationMaster, covering application metadata, distributed files, and executor sizing.

```scala { .api }
/**
 * Argument parser for the ApplicationMaster.
 * Handles ApplicationMaster startup arguments and configuration.
 * @param args Command line arguments array
 */
class ApplicationMasterArguments(args: Array[String]) {

  /** User application JAR file path */
  var userJar: String = null

  /** Main class name to execute */
  var userClass: String = null

  /** Primary Python file for PySpark */
  var primaryPyFile: String = null

  /** Primary R file for SparkR */
  var primaryRFile: String = null

  /** User application arguments sequence */
  var userArgs: Seq[String] = Nil

  /** Spark properties file path */
  var propertiesFile: String = null

  /** Distributed cache configuration */
  var distCacheConf: String = null

  /** Additional Python files */
  var pyFiles: String = null

  /** Additional files to distribute */
  var files: String = null

  /** Additional archives to distribute */
  var archives: String = null

  /** Executor memory setting */
  var executorMemory: String = "1g"

  /** Executor cores setting */
  var executorCores: Int = 1

  /** Number of executors */
  var numExecutors: Int = 2
}
```

**Usage Example:**

```scala
import org.apache.spark.deploy.yarn.ApplicationMasterArguments

// ApplicationMaster arguments (typically set by YARN when launching the AM container)
val amArgs = Array(
  "--jar", "hdfs://cluster/apps/myapp.jar",
  "--class", "com.example.MyMainClass",
  "--properties-file", "__spark_conf__/spark-defaults.conf",
  "--dist-cache-conf", "cache-config.txt",
  "--executor-memory", "4g",
  "--executor-cores", "2",
  "--num-executors", "10",
  "--", "user-arg1", "user-arg2"
)

val amArguments = new ApplicationMasterArguments(amArgs)
println(s"Executor memory: ${amArguments.executorMemory}")
println(s"Executor cores: ${amArguments.executorCores}")
println(s"Number of executors: ${amArguments.numExecutors}")
```

## Configuration Patterns

### Environment-Specific Configuration

```scala
import org.apache.spark.SparkConf

def createYarnConfig(environment: String): SparkConf = {
  val baseConf = new SparkConf()
    .setMaster("yarn")
    .setAppName(s"MyApp-$environment")

  environment match {
    case "development" =>
      baseConf
        .set("spark.yarn.queue", "dev")
        .set("spark.executor.instances", "2")
        .set("spark.executor.memory", "2g")
        .set("spark.yarn.am.memory", "1g")

    case "staging" =>
      baseConf
        .set("spark.yarn.queue", "staging")
        .set("spark.executor.instances", "5")
        .set("spark.executor.memory", "4g")
        .set("spark.yarn.am.memory", "2g")
        .set("spark.yarn.max.executor.failures", "3")

    case "production" =>
      baseConf
        .set("spark.yarn.queue", "production")
        .set("spark.executor.instances", "20")
        .set("spark.executor.memory", "8g")
        .set("spark.yarn.am.memory", "4g")
        .set("spark.yarn.max.executor.failures", "10")
        .set("spark.yarn.keytab", "/etc/security/keytabs/spark.keytab")
        .set("spark.yarn.principal", "spark-user@PRODUCTION.COM")

    case _ =>
      throw new IllegalArgumentException(s"Unknown environment: $environment")
  }
}
```
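
A short usage sketch, picking the environment from a deployment variable (`DEPLOY_ENV` here is only illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Resolve the target environment and build the session from the matching conf.
val conf = createYarnConfig(sys.env.getOrElse("DEPLOY_ENV", "development"))
val spark = SparkSession.builder().config(conf).getOrCreate()
```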

### Configuration Validation

```scala
import org.apache.spark.SparkConf

def validateYarnConfiguration(conf: SparkConf): Unit = {
  // Validate required settings
  require(conf.get("spark.master", "").startsWith("yarn"),
    "Master must be 'yarn' for YARN deployment")

  // Validate memory settings (parseMemory returns bytes)
  val amMemory = conf.get("spark.yarn.am.memory", "512m")
  val executorMemory = conf.get("spark.executor.memory", "1g")

  require(parseMemory(amMemory) >= 512L * 1024 * 1024,
    "ApplicationMaster memory must be at least 512MB")
  require(parseMemory(executorMemory) >= 1024L * 1024 * 1024,
    "Executor memory must be at least 1GB")

  // Validate core settings
  val amCores = conf.getInt("spark.yarn.am.cores", 1)
  val executorCores = conf.getInt("spark.executor.cores", 1)

  require(amCores >= 1 && amCores <= 8,
    "ApplicationMaster cores must be between 1 and 8")
  require(executorCores >= 1 && executorCores <= 32,
    "Executor cores must be between 1 and 32")

  // Validate security settings if enabled
  if (conf.getBoolean("spark.authenticate", false)) {
    require(conf.contains("spark.yarn.keytab") && conf.contains("spark.yarn.principal"),
      "Kerberos authentication requires both keytab and principal")
  }
}

def parseMemory(memoryStr: String): Long = {
  // Simple memory parsing: returns the size in bytes
  val pattern = """(\d+)([gmk]?)""".r
  memoryStr.toLowerCase match {
    case pattern(amount, unit) =>
      val multiplier = unit match {
        case "g" => 1024L * 1024 * 1024
        case "m" => 1024L * 1024
        case "k" => 1024L
        case "" => 1L
      }
      amount.toLong * multiplier
    case _ => throw new IllegalArgumentException(s"Invalid memory format: $memoryStr")
  }
}
```
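
For example, running the checks before submission (a sketch using `validateYarnConfiguration` and `parseMemory` as defined above):

```scala
import org.apache.spark.SparkConf

val candidate = new SparkConf()
  .setMaster("yarn")
  .set("spark.yarn.am.memory", "1g")
  .set("spark.executor.memory", "4g")

// Throws IllegalArgumentException with the corresponding message if a check fails.
validateYarnConfiguration(candidate)
```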

### Dynamic Configuration Updates

```scala
import org.apache.spark.SparkContext

// Adjust the executor target of a running application.
// Note: spark.dynamicAllocation.* properties are read at startup, so changing
// them on sc.conf afterwards has no effect; the executor-request API
// (a developer API) is used directly instead.
def updateDynamicAllocation(sc: SparkContext,
                            minExecutors: Int,
                            maxExecutors: Int): Unit = {

  // Current executor count: getExecutorMemoryStatus has one entry per executor plus the driver
  val currentExecutors = sc.getExecutorMemoryStatus.size - 1

  // Clamp the current count into the requested [min, max] range
  val targetExecutors = math.max(minExecutors, math.min(maxExecutors, currentExecutors))

  if (targetExecutors != currentExecutors) {
    // Ask the cluster manager for the new total; a lower target lets idle
    // executors be released rather than killing specific ones.
    sc.requestTotalExecutors(targetExecutors, 0, Map.empty)
  }
}
```
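
The startup-time counterpart is to enable dynamic allocation before launch; a sketch with the standard properties (the external shuffle service, or shuffle tracking on newer Spark versions, must be available on the cluster):

```scala
import org.apache.spark.SparkConf

// Let YARN scale executors between 2 and 20 based on load.
val dynConf = new SparkConf()
  .setMaster("yarn")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.shuffle.service.enabled", "true")
```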

### Configuration Templates

```scala
import org.apache.spark.SparkConf

object YarnConfigTemplates {

  /** Template for batch processing workloads */
  def batchProcessingConfig(appName: String): SparkConf = {
    new SparkConf()
      .setMaster("yarn")
      .setAppName(appName)
      .set("spark.yarn.queue", "batch")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.sql.adaptive.enabled", "true")
      .set("spark.sql.adaptive.coalescePartitions.enabled", "true")
      .set("spark.executor.instances", "10")
      .set("spark.executor.memory", "6g")
      .set("spark.executor.cores", "3")
      .set("spark.executor.memoryOverhead", "1g")
      .set("spark.yarn.am.memory", "2g")
      .set("spark.yarn.am.cores", "2")
  }

  /** Template for streaming workloads */
  def streamingConfig(appName: String): SparkConf = {
    new SparkConf()
      .setMaster("yarn")
      .setAppName(appName)
      .set("spark.yarn.queue", "streaming")
      .set("spark.streaming.backpressure.enabled", "true")
      .set("spark.streaming.kafka.maxRatePerPartition", "1000")
      .set("spark.streaming.dynamicAllocation.enabled", "true")
      .set("spark.executor.instances", "5")
      .set("spark.executor.memory", "4g")
      .set("spark.executor.cores", "2")
      .set("spark.yarn.am.memory", "1g")
  }

  /** Template for machine learning workloads */
  def mlConfig(appName: String): SparkConf = {
    new SparkConf()
      .setMaster("yarn")
      .setAppName(appName)
      .set("spark.yarn.queue", "ml")
      .set("spark.executor.instances", "20")
      .set("spark.executor.memory", "8g")
      .set("spark.executor.cores", "4")
      .set("spark.executor.memoryOverhead", "2g")
      .set("spark.executor.resource.gpu.amount", "1")
      .set("spark.task.resource.gpu.amount", "0.25")
      .set("spark.yarn.am.memory", "4g")
      .set("spark.yarn.am.cores", "2")
  }
}
```
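
Templates are meant as a base that individual jobs override; a brief sketch:

```scala
// Start from the batch template and override a few settings for this job.
val conf = YarnConfigTemplates.batchProcessingConfig("nightly-etl")
  .set("spark.executor.instances", "15")
  .set("spark.yarn.tags", "etl,nightly")
```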

## Configuration Best Practices

### Resource Sizing Guidelines

```scala
import org.apache.spark.SparkConf

def calculateOptimalResources(
    totalDataSize: Long,   // total input size in bytes
    availableNodes: Int,
    coresPerNode: Int,
    memoryPerNode: Long    // usable memory per node in MB
): SparkConf = {

  // Calculate executor count and sizing
  val optimalExecutorsPerNode = Math.max(1, coresPerNode / 4) // ~4 cores per executor
  val totalExecutors = availableNodes * optimalExecutorsPerNode
  val executorCores = Math.min(5, coresPerNode / optimalExecutorsPerNode) // max 5 cores
  val executorMemory = (memoryPerNode * 0.8 / optimalExecutorsPerNode).toLong // 80% of node memory
  val memoryOverhead = Math.max(384, (executorMemory * 0.1).toLong) // 10% overhead, min 384MB

  new SparkConf()
    .setMaster("yarn")
    .set("spark.executor.instances", totalExecutors.toString)
    .set("spark.executor.cores", executorCores.toString)
    .set("spark.executor.memory", s"${executorMemory}m")
    .set("spark.executor.memoryOverhead", s"${memoryOverhead}m")
    .set("spark.yarn.am.memory", "2g")
    .set("spark.yarn.am.cores", "2")
    // Additional optimization based on data size: cap partition size at 128MB
    .set("spark.sql.files.maxPartitionBytes",
      Math.min(134217728L, totalDataSize / (totalExecutors * executorCores)).toString)
}
```
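
A usage sketch with hypothetical cluster numbers (10 worker nodes with 16 cores and 64 GB of usable memory each, roughly 1 TB of input):

```scala
val conf = calculateOptimalResources(
  totalDataSize = 1L << 40,   // ~1 TB in bytes
  availableNodes = 10,
  coresPerNode = 16,
  memoryPerNode = 64L * 1024  // MB
)

println(conf.get("spark.executor.instances")) // 40 (4 executors per node)
println(conf.get("spark.executor.memory"))    // 13107m (~80% of 64 GB split 4 ways)
```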