# Configuration System

This module provides the YARN-specific configuration options that control resource allocation, security, and deployment behavior for Spark's YARN integration.

## Capabilities

### Core Configuration Entries

Key configuration entries for YARN integration, organized by functional area.

```scala { .api }
// Application metadata and submission
val APPLICATION_TAGS: ConfigEntry[Seq[String]]
val MAX_APP_ATTEMPTS: ConfigEntry[Int]
val QUEUE_NAME: ConfigEntry[String]
val PRIORITY: ConfigEntry[Int]

// Resource allocation
val EXECUTOR_INSTANCES: ConfigEntry[Int]
val EXECUTOR_CORES: ConfigEntry[Int]
val EXECUTOR_MEMORY: ConfigEntry[String]
val EXECUTOR_MEMORY_OVERHEAD: OptionalConfigEntry[Long]

// ApplicationMaster configuration
val AM_MEMORY: ConfigEntry[String]
val AM_CORES: ConfigEntry[Int]
val AM_MEMORY_OVERHEAD: ConfigEntry[Long]

// File distribution and staging
val SPARK_ARCHIVE: OptionalConfigEntry[String]
val ARCHIVES: OptionalConfigEntry[Seq[String]]
val FILES: OptionalConfigEntry[Seq[String]]
val JARS: OptionalConfigEntry[Seq[String]]

// Classpath and dependency management
val USER_CLASS_PATH_FIRST: ConfigEntry[Boolean]
val POPULATE_HADOOP_CLASSPATH: ConfigEntry[Boolean]
val CLASSPATH_PREP_TASK: ConfigEntry[Boolean]

// Security configuration
val KERBEROS_PRINCIPAL: OptionalConfigEntry[String]
val KERBEROS_KEYTAB: OptionalConfigEntry[String]
val ACCESS_NAMENODES: ConfigEntry[Seq[String]]
val CREDENTIALS_FILE: OptionalConfigEntry[String]

// Memory and resource constraints
val MEMORY_OVERHEAD_FACTOR: ConfigEntry[Double]
val MEMORY_OVERHEAD_MIN: ConfigEntry[Long]

// Staging and cleanup
val STAGING_DIR: ConfigEntry[String]
val PRESERVE_STAGING_FILES: ConfigEntry[Boolean]
val REPLACE_STAGING_DIR: ConfigEntry[Boolean]

// Container management
val CONTAINER_LAUNCHER_MAX_THREADS: ConfigEntry[Int]
val RM_HEARTBEAT_INTERVAL: ConfigEntry[Long]
val AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL: ConfigEntry[Long]

// Container and node management
val CONTAINER_PLACEMENT_STRATEGY: ConfigEntry[String]
val NODE_LABEL_EXPRESSION: OptionalConfigEntry[String]
val ROLLED_LOG_INCLUDE_PATTERN: OptionalConfigEntry[String]
val ROLLED_LOG_EXCLUDE_PATTERN: OptionalConfigEntry[String]

// Additional internal configuration entries
val APP_JAR: OptionalConfigEntry[String]
val SECONDARY_JARS: OptionalConfigEntry[Seq[String]]
val CACHED_FILES: ConfigEntry[Seq[String]]
val CACHED_FILES_SIZES: ConfigEntry[Seq[Long]]
val CACHED_FILES_TIMESTAMPS: ConfigEntry[Seq[Long]]
val CACHED_FILES_VISIBILITIES: ConfigEntry[Seq[String]]
val CACHED_FILES_TYPES: ConfigEntry[Seq[String]]
val CACHED_CONF_ARCHIVE: OptionalConfigEntry[String]

// Additional security configuration
val KERBEROS_RELOGIN_PERIOD: ConfigEntry[Long]
val NAMENODES_TO_ACCESS: ConfigEntry[Seq[String]]
val FILESYSTEMS_TO_ACCESS: ConfigEntry[Seq[String]]

// Executor node placement
val EXECUTOR_NODE_LABEL_EXPRESSION: OptionalConfigEntry[String]

// YARN allocator-level blacklisting
val YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED: ConfigEntry[Boolean]
```

## Application Configuration

### Basic Application Settings

```scala
// Application identification and metadata
val sparkConf = new SparkConf()
  .setAppName("MySparkApplication")
  .set("spark.yarn.tags", "batch,analytics,production") // APPLICATION_TAGS
  .set("spark.yarn.maxAppAttempts", "3") // MAX_APP_ATTEMPTS
  .set("spark.yarn.queue", "production") // QUEUE_NAME
  .set("spark.yarn.priority", "10") // PRIORITY
```

**Configuration Details:**

**`spark.yarn.tags`**
- Comma-separated list of tags for application categorization
- Used by the YARN UI and monitoring tools for filtering and organization
- Example: `"etl,daily,finance"`

**`spark.yarn.maxAppAttempts`**
- Maximum number of application attempts before the application is marked failed
- Default: YARN cluster default (typically 2)
- Range: 1 to the cluster maximum

**`spark.yarn.queue`**
- YARN queue name for resource allocation
- Must be an existing queue on which the user has submit permission
- Default: "default"

**`spark.yarn.priority`**
- Application priority within the queue (higher number = higher priority)
- Subject to queue and cluster priority policies
- Default: 0
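
Because `spark.yarn.tags` is a plain comma-separated string, it can help to assemble the value from a collection instead of concatenating by hand. A small illustrative sketch (the tag names are made up):

```scala
// Build the comma-separated value expected by spark.yarn.tags from a Seq,
// trimming whitespace and dropping empty entries. Tag names are illustrative.
val tags = Seq("etl", " daily", "finance", "")
val tagsValue = tags.map(_.trim).filter(_.nonEmpty).mkString(",")
// tagsValue == "etl,daily,finance"
```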

### Deploy Mode Configuration

```scala
// Client mode configuration
val clientConf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "client")
  .set("spark.yarn.am.waitTime", "100s") // Wait time for AM startup

// Cluster mode configuration
val clusterConf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "cluster")
  .set("spark.yarn.submit.waitAppCompletion", "true") // Wait for completion
```

## Resource Configuration

### Executor Configuration

```scala
val sparkConf = new SparkConf()
  // Executor resource allocation
  .set("spark.executor.instances", "20") // EXECUTOR_INSTANCES
  .set("spark.executor.cores", "4") // EXECUTOR_CORES
  .set("spark.executor.memory", "8g") // EXECUTOR_MEMORY
  .set("spark.yarn.executor.memoryOverhead", "1g") // EXECUTOR_MEMORY_OVERHEAD
  // Dynamic allocation (alternative to static instances)
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "5")
  .set("spark.dynamicAllocation.maxExecutors", "100")
  .set("spark.dynamicAllocation.initialExecutors", "20")
```

**Memory Configuration:**
- `spark.executor.memory`: JVM heap memory per executor
- `spark.yarn.executor.memoryOverhead`: off-heap memory (native libraries, JVM overhead)
- Total container memory = heap + overhead
- Overhead default: max(384MB, 10% of heap)
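
The default overhead rule can be sketched as a small helper (illustrative only; Spark derives the real value internally from entries like `MEMORY_OVERHEAD_FACTOR` and `MEMORY_OVERHEAD_MIN`):

```scala
// Sketch of the default executor container sizing described above:
// overhead = max(384 MB, 10% of heap); total = heap + overhead. Units are MB.
val minOverheadMb = 384L
val overheadFactor = 0.10

def containerMemoryMb(heapMb: Long): Long =
  heapMb + math.max((heapMb * overheadFactor).toLong, minOverheadMb)
```

For an 8 GB heap this yields 8192 + 819 = 9011 MB requested from YARN, while a 1 GB heap hits the 384 MB floor for a 1408 MB container.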

**Core Configuration:**
- `spark.executor.cores`: CPU cores per executor container
- Limited by the cores the YARN NodeManager has available
- Consider NUMA topology and hyperthreading when sizing

### ApplicationMaster Configuration

```scala
val sparkConf = new SparkConf()
  .set("spark.yarn.am.memory", "2g") // AM_MEMORY
  .set("spark.yarn.am.cores", "2") // AM_CORES
  .set("spark.yarn.am.memoryOverhead", "384m") // AM_MEMORY_OVERHEAD
  .set("spark.yarn.am.nodeLabelExpression", "compute") // NODE_LABEL_EXPRESSION
```

**ApplicationMaster Resources:**
- **Client Mode**: the AM only coordinates with the external driver
- **Cluster Mode**: the AM runs the driver and needs correspondingly more resources
- Default AM memory: 512MB (client), 1GB (cluster)
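
The mode-dependent defaults listed above can be summarized in a sketch (illustrative only; the actual values come from Spark's config entries, and in cluster mode the driver's memory settings govern the AM container):

```scala
// Illustrative only: default AM memory by deploy mode, per the text above.
// In cluster mode the AM hosts the driver, so spark.driver.memory applies.
def defaultAmMemory(deployMode: String): String = deployMode match {
  case "cluster" => "1g"
  case _         => "512m"
}
```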
183
184
### Container Placement
185
186
```scala
187
val sparkConf = new SparkConf()
188
.set("spark.yarn.containerLauncherMaxThreads", "25")
189
.set("spark.yarn.executor.failuresValidityInterval", "1h")
190
.set("spark.yarn.max.executor.failures", "3")
191
.set("spark.yarn.executor.launch.blacklist.enabled", "true")
192
```

## File Distribution

### Archive and File Management

```scala
val sparkConf = new SparkConf()
  // Spark distribution archive
  .set("spark.yarn.archive", "hdfs://namenode:8020/spark-libs/spark-2.4.8.tgz") // SPARK_ARCHIVE
  // Additional files and archives
  .set("spark.yarn.dist.files", "config.properties,data.txt") // FILES
  .set("spark.yarn.dist.archives", "libs.tar.gz,resources.zip") // ARCHIVES
  .set("spark.yarn.dist.jars", "external-lib.jar,utils.jar") // JARS
```

**File Distribution Types:**

**`spark.yarn.archive`**
- Pre-built Spark distribution archive on HDFS
- Avoids uploading the Spark JARs for each application
- Significant performance improvement for frequent submissions

**`spark.yarn.dist.files`**
- Additional files to distribute to executors
- Available in the executor working directory
- Comma-separated list of local or HDFS paths

**`spark.yarn.dist.archives`**
- Archive files (tar, zip, tgz) extracted on each executor
- Extracted into the executor working directory
- Useful for large dependency sets

**`spark.yarn.dist.jars`**
- Additional JAR files to distribute to executors
- Distributed but not automatically added to the classpath
- Use `spark.jars` for automatic classpath inclusion
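
All of the `spark.yarn.dist.*` properties accept comma-separated lists, so their values can be assembled from collections. A sketch (the file names are illustrative):

```scala
// Join paths into the comma-separated form the spark.yarn.dist.* properties
// expect, dropping blanks. File names below are illustrative.
def distList(paths: Seq[String]): String =
  paths.map(_.trim).filter(_.nonEmpty).mkString(",")

val files = distList(Seq("config.properties", "data.txt"))
// files == "config.properties,data.txt"
```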

### Classpath Management

```scala
val sparkConf = new SparkConf()
  .set("spark.yarn.user.classpath.first", "true") // USER_CLASS_PATH_FIRST
  .set("spark.yarn.populateHadoopClasspath", "true") // POPULATE_HADOOP_CLASSPATH
  .set("spark.yarn.classpath.prepTask", "true") // CLASSPATH_PREP_TASK
```

**Classpath Configuration:**

**`spark.yarn.user.classpath.first`**
- Places user JARs before Spark JARs on the classpath
- Useful for overriding Spark's bundled dependencies
- Can cause compatibility issues if not used carefully

**`spark.yarn.populateHadoopClasspath`**
- Includes the Hadoop classpath in the executor environment
- Necessary for some Hadoop integrations
- May increase container startup time

## Security Configuration

### Kerberos Authentication

```scala
val sparkConf = new SparkConf()
  .set("spark.yarn.principal", "spark/_HOST@REALM") // KERBEROS_PRINCIPAL
  .set("spark.yarn.keytab", "/etc/security/keytabs/spark.keytab") // KERBEROS_KEYTAB
  .set("spark.yarn.access.hadoopFileSystems",
    "hdfs://namenode1:8020,hdfs://namenode2:8020") // FILESYSTEMS_TO_ACCESS
```

**Kerberos Configuration:**

**`spark.yarn.principal`**
- Kerberos principal used for Spark service authentication
- Use the `_HOST` token for hostname substitution
- Example: `spark/hostname.domain.com@REALM`

**`spark.yarn.keytab`**
- Path to the keytab file containing the service credentials
- Must be readable by the Spark processes
- Should be protected with appropriate file permissions

**`spark.yarn.access.hadoopFileSystems`**
- Hadoop filesystems for which delegation tokens are required
- Comma-separated list of filesystem URIs
- Tokens are obtained automatically during submission
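
Because the keytab must be readable by the submitting process, a quick pre-flight check can save a failed submission. A minimal sketch (the path below is illustrative):

```scala
import java.nio.file.{Files, Paths}

// Sketch: verify a keytab path exists and is readable before submitting.
// Pass the value you intend to set as spark.yarn.keytab.
def keytabReadable(path: String): Boolean = {
  val p = Paths.get(path)
  Files.exists(p) && Files.isReadable(p)
}
```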

### Credential Management

```scala
val sparkConf = new SparkConf()
  .set("spark.yarn.credentials.file", "/tmp/hadoop-tokens") // CREDENTIALS_FILE
  .set("spark.yarn.credentials.renewalTime", "24h")
  .set("spark.yarn.credentials.updateTime", "1h")
  // Service-specific credential providers
  .set("spark.yarn.security.credentials.hive.enabled", "true")
  .set("spark.yarn.security.credentials.hbase.enabled", "true")
  .set("spark.yarn.security.credentials.hadoopfs.enabled", "true")
```
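
The time values above (`24h`, `1h`) use Spark's suffix duration format. A minimal parser for just the subset shown, to make the format concrete (illustrative; not Spark's actual duration parser, which accepts more forms):

```scala
// Parse the simple h/m/s suffix durations used above into milliseconds.
// Handles only the forms shown in this document.
def durationMs(s: String): Long = {
  val n = s.dropRight(1).trim.toLong
  s.last match {
    case 'h' => n * 3600000L
    case 'm' => n * 60000L
    case 's' => n * 1000L
  }
}
```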

## Advanced Configuration

### Performance Tuning

```scala
val sparkConf = new SparkConf()
  // Container launch optimization
  .set("spark.yarn.containerLauncherMaxThreads", "50")
  .set("spark.yarn.launchContainer.timeout", "120s")
  // Memory tuning
  .set("spark.yarn.executor.memoryOverheadFactor", "0.15") // 15% overhead
  .set("spark.yarn.am.memoryOverhead", "512m")
  // Network optimization
  .set("spark.yarn.network.timeout", "300s")
  .set("spark.yarn.nodemanager.address", "0.0.0.0:8034")
```

### Logging Configuration

```scala
val sparkConf = new SparkConf()
  .set("spark.yarn.submit.file.replication", "3") // HDFS replication for staged files
  .set("spark.yarn.preserve.staging.files", "false") // Clean up staging files
  .set("spark.yarn.rolling.strategy", "time") // Log rolling strategy
  .set("spark.yarn.rolling.time.interval", "daily") // Rolling interval
```

### Container Resource Constraints

```scala
val sparkConf = new SparkConf()
  // Resource limits
  .set("spark.yarn.executor.resource.memory", "8g")
  .set("spark.yarn.executor.resource.cpu", "4")
  // YARN node labels for placement
  .set("spark.yarn.executor.nodeLabelExpression", "compute")
  .set("spark.yarn.am.nodeLabelExpression", "management")
```

## Configuration Validation

### Required Configuration Checks

```scala
def validateYarnConfiguration(conf: SparkConf): Unit = {
  // Check master URL
  require(conf.get("spark.master") == "yarn", "Master must be 'yarn' for YARN mode")

  // Validate deploy mode
  val deployMode = conf.get("spark.submit.deployMode", "client")
  require(deployMode == "client" || deployMode == "cluster",
    s"Invalid deploy mode: $deployMode")

  // Check resource configuration
  val executorMemory = conf.getSizeAsBytes("spark.executor.memory", "1g")
  require(executorMemory >= 384L * 1024 * 1024, "Executor memory must be at least 384MB")

  // Validate queue name if specified
  conf.getOption("spark.yarn.queue").foreach { queue =>
    require(queue.nonEmpty, "YARN queue name cannot be empty")
  }
}
```

### Configuration Conflicts

```scala
// Assumes the enclosing class mixes in org.apache.spark.internal.Logging,
// which provides logWarning.
def checkConfigurationConflicts(conf: SparkConf): Unit = {
  // Dynamic allocation vs. static instances
  val dynamicEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false)
  val staticInstances = conf.getOption("spark.executor.instances")

  if (dynamicEnabled && staticInstances.isDefined) {
    logWarning("Both dynamic allocation and static executor instances configured. " +
      "Dynamic allocation will take precedence.")
  }

  // Security configuration consistency
  val principal = conf.getOption("spark.yarn.principal")
  val keytab = conf.getOption("spark.yarn.keytab")

  if (principal.isDefined != keytab.isDefined) {
    throw new IllegalArgumentException(
      "Both spark.yarn.principal and spark.yarn.keytab must be specified together")
  }
}
```

## Environment-Specific Configuration

### Development Environment

```scala
val devConf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "client")
  .set("spark.yarn.queue", "dev")
  .set("spark.executor.instances", "2")
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")
  .set("spark.yarn.preserve.staging.files", "true") // Keep files for debugging
```

### Production Environment

```scala
val prodConf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "cluster")
  .set("spark.yarn.queue", "production")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "10")
  .set("spark.dynamicAllocation.maxExecutors", "200")
  .set("spark.yarn.maxAppAttempts", "3")
  .set("spark.yarn.archive", "hdfs://namenode:8020/spark-libs/spark-2.4.8.tgz")
  .set("spark.yarn.principal", "spark-prod/_HOST@PROD.REALM")
  .set("spark.yarn.keytab", "/etc/security/keytabs/spark-prod.keytab")
```

### Testing Environment

```scala
val testConf = new SparkConf()
  .setMaster("yarn")
  .set("spark.submit.deployMode", "client")
  .set("spark.yarn.queue", "test")
  .set("spark.executor.instances", "1")
  .set("spark.executor.memory", "1g")
  .set("spark.executor.cores", "1")
  .set("spark.sql.adaptive.enabled", "false") // Disable adaptive query execution
  .set("spark.sql.adaptive.coalescePartitions.enabled", "false")
```

## Configuration Best Practices

### Resource Sizing Guidelines

```scala
// Memory sizing formula (all values in MB)
val nodeMemory = 64 * 1024 // 64GB node
val systemReserved = 8 * 1024 // 8GB reserved for the OS and services
val yarnMemory = nodeMemory - systemReserved // 56GB available to YARN

val executorMemory = 6 * 1024 // 6GB heap
val executorOverhead = math.max(executorMemory * 0.1, 384).toInt // 10% overhead, 384MB floor
val totalExecutorMemory = executorMemory + executorOverhead

val executorsPerNode = yarnMemory / totalExecutorMemory // ~8 executors per node
```

### Security Best Practices

```scala
val secureConf = new SparkConf()
  // Use service keytabs, not user tickets
  .set("spark.yarn.principal", "spark-service/_HOST@REALM")
  .set("spark.yarn.keytab", "/etc/security/keytabs/spark.headless.keytab")
  // Enable delegation token renewal
  .set("spark.yarn.credentials.renewalTime", "12h")
  // Limit application attempts in secure mode
  .set("spark.yarn.maxAppAttempts", "1")
  // Enable secure communication
  .set("spark.authenticate", "true")
  .set("spark.network.crypto.enabled", "true")
```

### Performance Optimization

```scala
val optimizedConf = new SparkConf()
  // Use a Spark archive for faster container startup
  .set("spark.yarn.archive", "hdfs://namenode:8020/spark-libs/spark-2.4.8.tgz")
  // Optimize container launching
  .set("spark.yarn.containerLauncherMaxThreads", "25")
  // Enable dynamic allocation for variable workloads
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "5")
  .set("spark.dynamicAllocation.maxExecutors", "500")
  .set("spark.dynamicAllocation.executorAllocationRatio", "0.8")
  // Optimize shuffle and serialization
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.sql.adaptive.enabled", "true")
  .set("spark.sql.adaptive.coalescePartitions.enabled", "true")
```