# Configuration System

Comprehensive configuration system for YARN-specific settings, including resource allocation, security, deployment options, and application parameters. It provides both programmatic configuration through `SparkConf` and command-line argument parsing.

## Capabilities

### YARN Configuration Properties

The YARN module provides extensive configuration options through the `config` package object.

```scala { .api }
/**
 * YARN-specific configuration keys and defaults.
 * All configuration properties are accessed through SparkConf.
 */
package object config {

  /** YARN application tags for resource management and monitoring */
  val APPLICATION_TAGS: ConfigEntry[Set[String]]

  /** Application priority in the YARN queue (0-10, higher = more priority) */
  val APPLICATION_PRIORITY: ConfigEntry[Int]

  /** YARN queue name for application submission */
  val QUEUE_NAME: ConfigEntry[String]

  /** Maximum application attempts allowed by YARN */
  val MAX_APP_ATTEMPTS: ConfigEntry[Int]

  /** ApplicationMaster memory allocation */
  val AM_MEMORY: ConfigEntry[Long]

  /** ApplicationMaster CPU core allocation */
  val AM_CORES: ConfigEntry[Int]

  /** ApplicationMaster memory overhead for the YARN container */
  val AM_MEMORY_OVERHEAD: ConfigEntry[Long]

  /** Node label expression for executor placement */
  val EXECUTOR_NODE_LABEL_EXPRESSION: OptionalConfigEntry[String]

  /** Node label expression for ApplicationMaster placement */
  val AM_NODE_LABEL_EXPRESSION: OptionalConfigEntry[String]

  /** Location of the Spark archive file for distribution */
  val SPARK_ARCHIVE: OptionalConfigEntry[String]

  /** Whether to wait for application completion */
  val WAIT_FOR_APP_COMPLETION: ConfigEntry[Boolean]

  /** Kerberos keytab file location for security */
  val KEYTAB: OptionalConfigEntry[String]

  /** Kerberos principal name for authentication */
  val PRINCIPAL: OptionalConfigEntry[String]

  /** Staging directory for application files */
  val STAGING_DIR: OptionalConfigEntry[String]

  /** Executor failure validity interval */
  val EXECUTOR_FAILURES_VALIDITY_INTERVAL: ConfigEntry[Long]

  /** Maximum executor failures before application failure */
  val MAX_EXECUTOR_FAILURES: ConfigEntry[Int]

  /** ApplicationMaster attempt failure validity interval */
  val AM_ATTEMPT_FAILURES_VALIDITY_INTERVAL: ConfigEntry[Long]

  /** Container launcher maximum threads */
  val CONTAINER_LAUNCHER_MAX_THREADS: ConfigEntry[Int]

  /** Scheduler heartbeat interval */
  val SCHEDULER_HEARTBEAT_INTERVAL: ConfigEntry[Long]

  /** Initial allocation interval */
  val SCHEDULER_INITIAL_ALLOCATION_INTERVAL: ConfigEntry[Long]

  /** Whether to preserve staging files after completion */
  val PRESERVE_STAGING_FILES: ConfigEntry[Boolean]

  /** File replication factor for staging files */
  val STAGING_FILE_REPLICATION: ConfigEntry[Short]

  /** Whether to roll ApplicationMaster logs */
  val AM_LOG_ROLL_ENABLE: ConfigEntry[Boolean]

  /** ApplicationMaster log roll size */
  val AM_LOG_ROLL_SIZE: ConfigEntry[Long]

  /** ApplicationMaster log roll interval */
  val AM_LOG_ROLL_INTERVAL: ConfigEntry[Long]
}
```
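
These entries follow Spark's internal `ConfigEntry`/`ConfigBuilder` pattern. As a rough, illustrative sketch of how one such key is typically declared inside the `config` package object (assuming Spark's internal `ConfigBuilder` API, which is only accessible inside the Spark source tree; the actual declarations may differ in doc strings, defaults, and builder options):

```scala
import org.apache.spark.internal.config.ConfigBuilder

// Sketch of a declaration as it would appear inside the yarn `config` package object.
// Illustrative only -- the real entry may use a different doc string or default.
private[spark] val QUEUE_NAME =
  ConfigBuilder("spark.yarn.queue")
    .doc("The YARN queue to submit the application to.")
    .stringConf
    .createWithDefault("default")
```

Application code normally reads and writes these through the plain string keys on `SparkConf` (for example `spark.yarn.queue`), as shown in the usage examples below.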

### Configuration Usage Examples

#### Basic YARN Configuration

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("BasicYarnConfiguration")
  // Application settings
  .set("spark.yarn.queue", "production")
  .set("spark.yarn.tags", "spark,analytics,batch")
  .set("spark.yarn.priority", "5")
  // ApplicationMaster settings
  .set("spark.yarn.am.memory", "2g")
  .set("spark.yarn.am.cores", "2")
  .set("spark.yarn.am.memoryOverhead", "512m")
  // Executor settings
  .set("spark.executor.instances", "10")
  .set("spark.executor.memory", "4g")
  .set("spark.executor.cores", "2")
  .set("spark.executor.memoryOverhead", "1g")
```
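
The assembled `SparkConf` is then handed to the session (or context) builder in the usual way, for example:

```scala
import org.apache.spark.sql.SparkSession

// Build a session on YARN from the configuration above
val spark = SparkSession.builder.config(conf).getOrCreate()
```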

#### Security Configuration

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("SecureYarnConfiguration")
  // Kerberos authentication
  .set("spark.yarn.keytab", "/path/to/user.keytab")
  .set("spark.yarn.principal", "user@REALM.COM")
  // Security settings
  .set("spark.authenticate", "true")
  .set("spark.authenticate.secret", "shared-secret")
  .set("spark.network.crypto.enabled", "true")
  .set("spark.io.encryption.enabled", "true")
  // YARN security integration
  .set("spark.yarn.security.credentials.hive.enabled", "true")
  .set("spark.yarn.security.credentials.hbase.enabled", "true")
```
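
A common pattern is to apply the Kerberos settings only when credentials are actually available, so the same build works on both secured and unsecured clusters. A minimal sketch, assuming hypothetical `SPARK_KEYTAB` and `SPARK_PRINCIPAL` environment variables:

```scala
import org.apache.spark.SparkConf

// SPARK_KEYTAB / SPARK_PRINCIPAL are hypothetical variable names used for illustration
val baseConf = new SparkConf().setMaster("yarn").setAppName("SecureYarnConfiguration")

val securedConf = (sys.env.get("SPARK_KEYTAB"), sys.env.get("SPARK_PRINCIPAL")) match {
  case (Some(keytab), Some(principal)) =>
    baseConf.set("spark.yarn.keytab", keytab).set("spark.yarn.principal", principal)
  case _ =>
    baseConf // leave Kerberos settings unset when no credentials are provided
}
```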

#### Advanced Resource Configuration

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("AdvancedResourceConfiguration")
  // Node labeling and placement
  .set("spark.yarn.executor.nodeLabelExpression", "compute-nodes")
  .set("spark.yarn.am.nodeLabelExpression", "management-nodes")
  // Custom resources (GPUs, FPGAs, etc.)
  .set("spark.executor.resource.gpu.amount", "1")
  .set("spark.executor.resource.gpu.discoveryScript", "/opt/spark/gpu-discovery.sh")
  // Container settings
  .set("spark.yarn.containerLauncherMaxThreads", "50")
  .set("spark.yarn.scheduler.heartbeat.interval-ms", "5000")
  .set("spark.yarn.scheduler.initial-allocation.interval", "100ms")
  // Failure handling
  .set("spark.yarn.maxAppAttempts", "3")
  .set("spark.yarn.max.executor.failures", "10")
  .set("spark.yarn.executor.failuresValidityInterval", "2h")
```

### ClientArguments

Argument parser for YARN client applications with support for various application types.

```scala { .api }
/**
 * Argument parser for the YARN client.
 * Handles command-line arguments for application submission.
 * @param args Command line arguments array
 */
class ClientArguments(args: Array[String]) {

  /** User application JAR file path */
  var userJar: String = null

  /** Main class to execute */
  var userClass: String = null

  /** Primary Python file for PySpark applications */
  var primaryPyFile: String = null

  /** Primary R file for SparkR applications */
  var primaryRFile: String = null

  /** User application arguments */
  var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()

  /** Enable verbose logging */
  var verbose: Boolean = false

  /** Application name */
  var name: String = "Spark"

  /** Spark properties file */
  var propertiesFile: String = null

  /** Additional Python files */
  var pyFiles: String = null

  /** Additional files to distribute */
  var files: String = null

  /** Additional archives to distribute */
  var archives: String = null
}
```

**Usage Example:**

```scala
import org.apache.spark.deploy.yarn.ClientArguments

// Parse command line arguments
val args = Array(
  "--jar", "/path/to/myapp.jar",
  "--class", "com.example.MyMainClass",
  "--name", "MySparkApplication",
  "--files", "config.properties,data.txt",
  "--py-files", "utils.py,helpers.py",
  "--archives", "data.zip#data",
  "--verbose",
  "--", "app-arg1", "app-arg2" // Application arguments after --
)

val clientArgs = new ClientArguments(args)
println(s"JAR: ${clientArgs.userJar}")
println(s"Main class: ${clientArgs.userClass}")
println(s"App args: ${clientArgs.userArgs}")
```

### ApplicationMasterArguments

Argument parser for ApplicationMaster with comprehensive application metadata.

```scala { .api }
/**
 * Argument parser for the ApplicationMaster.
 * Handles ApplicationMaster startup arguments and configuration.
 * @param args Command line arguments array
 */
class ApplicationMasterArguments(args: Array[String]) {

  /** User application JAR file path */
  var userJar: String = null

  /** Main class name to execute */
  var userClass: String = null

  /** Primary Python file for PySpark */
  var primaryPyFile: String = null

  /** Primary R file for SparkR */
  var primaryRFile: String = null

  /** User application arguments sequence */
  var userArgs: Seq[String] = Nil

  /** Spark properties file path */
  var propertiesFile: String = null

  /** Distributed cache configuration */
  var distCacheConf: String = null

  /** Additional Python files */
  var pyFiles: String = null

  /** Additional files to distribute */
  var files: String = null

  /** Additional archives to distribute */
  var archives: String = null

  /** Executor memory setting */
  var executorMemory: String = "1g"

  /** Executor cores setting */
  var executorCores: Int = 1

  /** Number of executors */
  var numExecutors: Int = 2
}
```

**Usage Example:**

```scala
import org.apache.spark.deploy.yarn.ApplicationMasterArguments

// ApplicationMaster arguments (typically set by YARN)
val amArgs = Array(
  "--jar", "hdfs://cluster/apps/myapp.jar",
  "--class", "com.example.MyMainClass",
  "--properties-file", "__spark_conf__/spark-defaults.conf",
  "--dist-cache-conf", "cache-config.txt",
  "--executor-memory", "4g",
  "--executor-cores", "2",
  "--num-executors", "10",
  "--", "user-arg1", "user-arg2"
)

val amArguments = new ApplicationMasterArguments(amArgs)
println(s"Executor memory: ${amArguments.executorMemory}")
println(s"Executor cores: ${amArguments.executorCores}")
println(s"Number of executors: ${amArguments.numExecutors}")
```

## Configuration Patterns

### Environment-Specific Configuration

```scala
import org.apache.spark.SparkConf

def createYarnConfig(environment: String): SparkConf = {
  val baseConf = new SparkConf()
    .setMaster("yarn")
    .setAppName(s"MyApp-$environment")

  environment match {
    case "development" =>
      baseConf
        .set("spark.yarn.queue", "dev")
        .set("spark.executor.instances", "2")
        .set("spark.executor.memory", "2g")
        .set("spark.yarn.am.memory", "1g")

    case "staging" =>
      baseConf
        .set("spark.yarn.queue", "staging")
        .set("spark.executor.instances", "5")
        .set("spark.executor.memory", "4g")
        .set("spark.yarn.am.memory", "2g")
        .set("spark.yarn.max.executor.failures", "3")

    case "production" =>
      baseConf
        .set("spark.yarn.queue", "production")
        .set("spark.executor.instances", "20")
        .set("spark.executor.memory", "8g")
        .set("spark.yarn.am.memory", "4g")
        .set("spark.yarn.max.executor.failures", "10")
        .set("spark.yarn.keytab", "/etc/security/keytabs/spark.keytab")
        .set("spark.yarn.principal", "spark-user@PRODUCTION.COM")

    case _ =>
      throw new IllegalArgumentException(s"Unknown environment: $environment")
  }
}
```
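
The helper can then be driven by a deployment-time setting; for instance (the `DEPLOY_ENV` variable name here is just an example):

```scala
// Fall back to the development profile when no environment is specified
val conf = createYarnConfig(sys.env.getOrElse("DEPLOY_ENV", "development"))
```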

### Configuration Validation

```scala
import org.apache.spark.SparkConf

def validateYarnConfiguration(conf: SparkConf): Unit = {
  // Validate required settings
  require(conf.get("spark.master", "").startsWith("yarn"),
    "Master must be 'yarn' for YARN deployment")

  // Validate memory settings (parseMemory returns bytes)
  val amMemory = conf.get("spark.yarn.am.memory", "512m")
  val executorMemory = conf.get("spark.executor.memory", "1g")

  require(parseMemory(amMemory) >= 512L * 1024 * 1024,
    "ApplicationMaster memory must be at least 512MB")
  require(parseMemory(executorMemory) >= 1024L * 1024 * 1024,
    "Executor memory must be at least 1GB")

  // Validate core settings
  val amCores = conf.getInt("spark.yarn.am.cores", 1)
  val executorCores = conf.getInt("spark.executor.cores", 1)

  require(amCores >= 1 && amCores <= 8,
    "ApplicationMaster cores must be between 1 and 8")
  require(executorCores >= 1 && executorCores <= 32,
    "Executor cores must be between 1 and 32")

  // Validate security settings if enabled
  if (conf.getBoolean("spark.authenticate", false)) {
    require(conf.contains("spark.yarn.keytab") && conf.contains("spark.yarn.principal"),
      "Kerberos authentication requires both keytab and principal")
  }
}

/** Parse a memory string such as "512m" or "4g" into bytes. */
def parseMemory(memoryStr: String): Long = {
  val pattern = """(\d+)([gmk]?)""".r
  memoryStr.toLowerCase match {
    case pattern(amount, unit) =>
      val multiplier = unit match {
        case "g" => 1024L * 1024 * 1024
        case "m" => 1024L * 1024
        case "k" => 1024L
        case ""  => 1L
      }
      amount.toLong * multiplier
    case _ => throw new IllegalArgumentException(s"Invalid memory format: $memoryStr")
  }
}
```
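
Validation fits naturally between assembling the configuration and creating the context; for example, reusing the helpers defined above:

```scala
val conf = createYarnConfig("production")
validateYarnConfiguration(conf) // throws IllegalArgumentException on invalid settings
```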

### Dynamic Configuration Updates

```scala
import org.apache.spark.SparkContext

// Dynamic allocation settings (spark.dynamicAllocation.*) are read when the
// SparkContext starts, so they cannot be changed on a running application.
// The executor target can still be adjusted at runtime through the developer
// API on SparkContext.
def updateExecutorTarget(sc: SparkContext,
                         minExecutors: Int,
                         maxExecutors: Int): Unit = {

  // Estimate the current executor count (block managers minus the driver)
  val currentExecutors = sc.getExecutorMemoryStatus.size - 1

  // Clamp the current count to the requested bounds
  val targetExecutors = math.max(minExecutors, math.min(maxExecutors, currentExecutors))

  // Ask the cluster manager for the new executor total. To release specific
  // executors, sc.killExecutors(executorIds) can be used when their IDs are known.
  sc.requestTotalExecutors(targetExecutors, 0, Map.empty)
}
```

### Configuration Templates

```scala
import org.apache.spark.SparkConf

object YarnConfigTemplates {

  /** Template for batch processing workloads */
  def batchProcessingConfig(appName: String): SparkConf = {
    new SparkConf()
      .setMaster("yarn")
      .setAppName(appName)
      .set("spark.yarn.queue", "batch")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.sql.adaptive.enabled", "true")
      .set("spark.sql.adaptive.coalescePartitions.enabled", "true")
      .set("spark.executor.instances", "10")
      .set("spark.executor.memory", "6g")
      .set("spark.executor.cores", "3")
      .set("spark.executor.memoryOverhead", "1g")
      .set("spark.yarn.am.memory", "2g")
      .set("spark.yarn.am.cores", "2")
  }

  /** Template for streaming workloads */
  def streamingConfig(appName: String): SparkConf = {
    new SparkConf()
      .setMaster("yarn")
      .setAppName(appName)
      .set("spark.yarn.queue", "streaming")
      .set("spark.streaming.backpressure.enabled", "true")
      .set("spark.streaming.kafka.maxRatePerPartition", "1000")
      .set("spark.streaming.dynamicAllocation.enabled", "true")
      .set("spark.executor.instances", "5")
      .set("spark.executor.memory", "4g")
      .set("spark.executor.cores", "2")
      .set("spark.yarn.am.memory", "1g")
  }

  /** Template for machine learning workloads */
  def mlConfig(appName: String): SparkConf = {
    new SparkConf()
      .setMaster("yarn")
      .setAppName(appName)
      .set("spark.yarn.queue", "ml")
      .set("spark.executor.instances", "20")
      .set("spark.executor.memory", "8g")
      .set("spark.executor.cores", "4")
      .set("spark.executor.memoryOverhead", "2g")
      .set("spark.executor.resource.gpu.amount", "1")
      .set("spark.task.resource.gpu.amount", "0.25")
      .set("spark.yarn.am.memory", "4g")
      .set("spark.yarn.am.cores", "2")
  }
}
```
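
A template is then picked per workload and refined with job-specific overrides:

```scala
// Start from the batch template and layer job-specific settings on top
val conf = YarnConfigTemplates.batchProcessingConfig("NightlyETL")
  .set("spark.yarn.tags", "etl,nightly,batch")
```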

## Configuration Best Practices

### Resource Sizing Guidelines

```scala
import org.apache.spark.SparkConf

/**
 * Derive executor sizing from cluster dimensions.
 *
 * @param totalDataSize  total input size in bytes
 * @param availableNodes number of worker nodes available to the application
 * @param coresPerNode   usable cores per node
 * @param memoryPerNode  usable memory per node, in MB
 */
def calculateOptimalResources(
    totalDataSize: Long,
    availableNodes: Int,
    coresPerNode: Int,
    memoryPerNode: Long): SparkConf = {

  // Calculate optimal executor count and sizing
  val optimalExecutorsPerNode = Math.max(1, coresPerNode / 4) // target ~4 cores per executor
  val totalExecutors = availableNodes * optimalExecutorsPerNode
  val executorCores = Math.min(5, coresPerNode / optimalExecutorsPerNode) // max 5 cores
  val executorMemory = (memoryPerNode * 0.8 / optimalExecutorsPerNode).toLong // 80% of node memory
  val memoryOverhead = Math.max(384, (executorMemory * 0.1).toLong) // 10% overhead, min 384MB

  new SparkConf()
    .setMaster("yarn")
    .set("spark.executor.instances", totalExecutors.toString)
    .set("spark.executor.cores", executorCores.toString)
    .set("spark.executor.memory", s"${executorMemory}m")
    .set("spark.executor.memoryOverhead", s"${memoryOverhead}m")
    .set("spark.yarn.am.memory", "2g")
    .set("spark.yarn.am.cores", "2")
    // Cap partition size at 128MB; smaller inputs get proportionally smaller partitions
    .set("spark.sql.files.maxPartitionBytes",
      Math.min(134217728L, totalDataSize / (totalExecutors * executorCores)).toString)
}
```
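
For example, a 10-node cluster with 16 usable cores and 64 GB of usable memory per node (expressed in MB, matching the helper's units) would be sized roughly as follows:

```scala
// ~1 TB of input, 10 nodes, 16 cores and 65536 MB usable per node
val conf = calculateOptimalResources(
  totalDataSize = 1024L * 1024 * 1024 * 1024,
  availableNodes = 10,
  coresPerNode = 16,
  memoryPerNode = 65536L)

// 4 executors per node => 40 executors, 4 cores and ~13 GB heap each
println(conf.get("spark.executor.instances")) // 40
println(conf.get("spark.executor.memory"))    // 13107m
```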