# Hadoop Utilities

The YARN module provides specialized Hadoop utilities that extend Spark's base Hadoop integration with YARN-specific functionality. These utilities handle security, configuration, environment management, and various operational tasks required for running Spark on YARN clusters.

## Imports

```scala
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.yarn.api.records.ApplicationAccessType
import scala.collection.mutable.HashMap
```
## Capabilities

### YarnSparkHadoopUtil Class

Main utility class that extends Spark's base Hadoop utilities with YARN-specific functionality.

```scala { .api }
/**
 * YARN-specific Hadoop utilities for Spark.
 * Extends base SparkHadoopUtil with YARN functionality.
 */
class YarnSparkHadoopUtil extends SparkHadoopUtil {

  /**
   * Transfers credentials between user group information objects.
   * @param source Source user group information
   * @param dest Destination user group information
   */
  override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation): Unit

  /**
   * Indicates that YARN mode is enabled.
   * @return Always true in the YARN module
   */
  override def isYarnMode(): Boolean

  /**
   * Creates a new Hadoop configuration with YARN-specific settings.
   * @param conf Spark configuration
   * @return YarnConfiguration instance
   */
  override def newConfiguration(conf: SparkConf): Configuration

  /**
   * Adds user credentials to a job configuration for secure clusters.
   * @param conf Job configuration to modify
   */
  override def addCredentials(conf: JobConf): Unit

  /**
   * Gets the current user's security credentials.
   * @return Current user's credentials
   */
  override def getCurrentUserCredentials(): Credentials

  /**
   * Adds credentials to the current user's credential store.
   * @param creds Credentials to add
   */
  override def addCurrentUserCredentials(creds: Credentials): Unit

  /**
   * Adds a secret key to the current user's credentials.
   * @param key Secret key name
   * @param secret Secret value as a string
   */
  override def addSecretKeyToUserCredentials(key: String, secret: String): Unit

  /**
   * Retrieves a secret key from the current user's credentials.
   * @param key Secret key name
   * @return Secret value as a byte array, or null if not found
   */
  override def getSecretKeyFromUserCredentials(key: String): Array[Byte]
}
```
**Usage Example:**

```scala
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.spark.SparkConf

// Create YARN-specific Hadoop utilities
val yarnUtils = new YarnSparkHadoopUtil()

// Check if running in YARN mode
val isYarn = yarnUtils.isYarnMode() // Always true

// Create YARN configuration
val sparkConf = new SparkConf()
val yarnConf = yarnUtils.newConfiguration(sparkConf)

// Manage credentials for secure clusters
val currentCreds = yarnUtils.getCurrentUserCredentials()
yarnUtils.addSecretKeyToUserCredentials("myapp.secret", "secretValue")
```
### YarnSparkHadoopUtil Object

Companion object providing constants, utility methods, and shared functionality for YARN operations.

```scala { .api }
/**
 * Companion object with YARN utility constants and methods
 */
object YarnSparkHadoopUtil {

  // Memory overhead configuration
  val MEMORY_OVERHEAD_FACTOR: Double = 0.07 // 7% memory overhead
  val MEMORY_OVERHEAD_MIN: Int = 384        // Minimum overhead of 384 MB

  // Host and resource constants
  val ANY_HOST: String = "*"                // Wildcard for any host
  val DEFAULT_NUMBER_EXECUTORS: Int = 2     // Default executor count
  val RM_REQUEST_PRIORITY: Int = 1          // ResourceManager request priority

  /**
   * Adds a path variable to an environment map.
   * Appends to the existing value if the key already exists.
   * @param env Environment map to modify
   * @param key Environment variable name
   * @param value Path value to add
   */
  def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit

  /**
   * Sets environment variables from an input string.
   * Input format: "KEY1=VAL1,KEY2=VAL2,KEY3=VAL3"
   * @param env Environment map to modify
   * @param inputString Comma-separated key=value pairs
   */
  def setEnvFromInputString(env: HashMap[String, String], inputString: String): Unit

  /**
   * Escapes an argument for shell execution in YARN.
   * Handles special characters for bash execution.
   * @param arg Argument to escape
   * @return Shell-escaped argument
   */
  def escapeForShell(arg: String): String

  /**
   * Looks up rack information for a host.
   * Uses cached rack topology information.
   * @param conf Hadoop configuration
   * @param host Hostname to look up
   * @return Rack name for the host
   */
  def lookupRack(conf: Configuration, host: String): String

  /**
   * Populates the rack information cache for a hostname.
   * Resolves rack topology and caches the results.
   * @param conf Hadoop configuration
   * @param hostname Hostname to resolve
   */
  def populateRackInfo(conf: Configuration, hostname: String): Unit

  /**
   * Gets application ACLs formatted for YARN.
   * Converts Spark security manager ACLs to YARN format.
   * @param securityMgr Spark security manager
   * @return Map of YARN application access types to ACL strings
   */
  def getApplicationAclsForYarn(securityMgr: SecurityManager): Map[ApplicationAccessType, String]
}
```
## Environment Management

### Path Variable Handling

Managing environment variables and paths for YARN containers:

```scala
import scala.collection.mutable.HashMap

val env = HashMap[String, String]()

// Add a path to an existing environment variable
YarnSparkHadoopUtil.addPathToEnvironment(env, "CLASSPATH", "/path/to/jar")
YarnSparkHadoopUtil.addPathToEnvironment(env, "CLASSPATH", "/another/path")
// Result: env("CLASSPATH") = "/path/to/jar:/another/path"

// Set multiple environment variables from a single string
YarnSparkHadoopUtil.setEnvFromInputString(env, "JAVA_HOME=/usr/lib/jvm/java-8,SPARK_HOME=/opt/spark")
```
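The append behavior can be sketched in plain Scala. This is a minimal illustration of the semantics described above, not the actual implementation (`addPath` is a hypothetical stand-in; the real method also accounts for YARN's cross-platform classpath separator):

```scala
import scala.collection.mutable.HashMap

// Hypothetical re-implementation of the append semantics: a new path is
// joined to any existing value with the platform path separator.
def addPath(env: HashMap[String, String], key: String, value: String): Unit = {
  val sep = java.io.File.pathSeparator
  env.put(key, env.get(key).map(_ + sep + value).getOrElse(value))
}

val env = HashMap[String, String]()
addPath(env, "CLASSPATH", "/path/to/jar")
addPath(env, "CLASSPATH", "/another/path")
println(env("CLASSPATH")) // "/path/to/jar:/another/path" on Unix
```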
### Shell Argument Escaping

Properly escape arguments for YARN's bash command execution:

```scala
val unsafeArg = "my file with spaces and $special chars"
val safeArg = YarnSparkHadoopUtil.escapeForShell(unsafeArg)
// Result: 'my file with spaces and \$special chars'

// Use in YARN container commands
val command = s"java -cp ${YarnSparkHadoopUtil.escapeForShell(classpath)} MyClass"
```
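The escaping rules can be approximated with a small helper. This is an illustrative sketch of the behavior shown in the comment above (single-quote wrapping plus backslash escapes); the real `escapeForShell` may differ in detail:

```scala
// Hypothetical sketch: wrap the argument in single quotes and escape the
// characters bash still treats specially in YARN's generated launch scripts.
def escapeArg(arg: String): String = {
  val sb = new StringBuilder("'")
  arg.foreach {
    case '$'  => sb.append("\\$")
    case '"'  => sb.append("\\\"")
    case '\'' => sb.append("'\\''")
    case c    => sb.append(c)
  }
  sb.append("'").toString
}

println(escapeArg("my file with spaces and $special chars"))
// 'my file with spaces and \$special chars'
```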
## Security and Credentials

### Credential Management

Handle security credentials for secure Hadoop clusters:

```scala
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

val yarnUtils = new YarnSparkHadoopUtil()

// Get current user credentials
val currentCreds = yarnUtils.getCurrentUserCredentials()

// Add secret keys for application security
yarnUtils.addSecretKeyToUserCredentials("spark.authenticate.secret", "mySecretKey")
yarnUtils.addSecretKeyToUserCredentials("app.custom.token", "applicationToken")

// Retrieve secret keys
val authSecret = yarnUtils.getSecretKeyFromUserCredentials("spark.authenticate.secret")
val appToken = yarnUtils.getSecretKeyFromUserCredentials("app.custom.token")

// Transfer credentials between users
val sourceUGI = UserGroupInformation.getCurrentUser()
val destUGI = UserGroupInformation.createProxyUser("appuser", sourceUGI)
yarnUtils.transferCredentials(sourceUGI, destUGI)
```
### Kerberos Integration

Integration with Kerberos authentication:

```scala
import org.apache.hadoop.mapred.JobConf

val jobConf = new JobConf()
val yarnUtils = new YarnSparkHadoopUtil()

// Add the current user's Kerberos credentials to the job configuration
yarnUtils.addCredentials(jobConf)

// Credentials include:
// - Kerberos tickets
// - HDFS delegation tokens
// - Other service tokens (HBase, Hive, etc.)
```
### Application ACLs

Convert Spark security settings to YARN application ACLs:

```scala
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.hadoop.yarn.api.records.ApplicationAccessType

val sparkConf = new SparkConf()
  .set("spark.ui.view.acls", "user1,user2")
  .set("spark.modify.acls", "admin1,admin2")

val securityMgr = new SecurityManager(sparkConf)
val yarnAcls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)

// Result map:
// ApplicationAccessType.VIEW_APP -> "user1,user2"
// ApplicationAccessType.MODIFY_APP -> "admin1,admin2"
```
## Network Topology and Rack Awareness

### Rack Information Management

Efficient rack topology resolution and caching:

```scala
import org.apache.hadoop.conf.Configuration

val hadoopConf = new Configuration()

// Look up the rack for a host (with caching)
val rack1 = YarnSparkHadoopUtil.lookupRack(hadoopConf, "node1.example.com")
val rack2 = YarnSparkHadoopUtil.lookupRack(hadoopConf, "node2.example.com")

// Explicitly populate the rack information cache
YarnSparkHadoopUtil.populateRackInfo(hadoopConf, "node3.example.com")

// Benefits:
// - Cached lookups for performance
// - Data locality optimization
// - Minimized cross-rack communication
```
### Topology-Aware Scheduling

Integration with Spark's scheduler for optimal task placement:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.Utils

// Used internally by schedulers for rack-aware task placement
class YarnClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
  override def getRackForHost(hostPort: String): Option[String] = {
    val host = Utils.parseHostPort(hostPort)._1
    Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
  }
}
```
## Resource Configuration

### Memory Overhead Calculation

Automatic memory overhead calculation for YARN containers:

```scala
// Constants for memory overhead
val overheadFactor = YarnSparkHadoopUtil.MEMORY_OVERHEAD_FACTOR // 0.07 (7%)
val minOverhead = YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN       // 384 MB

// Calculate the memory overhead for an executor
def calculateMemoryOverhead(executorMemoryMB: Int): Int = {
  val calculatedOverhead = (executorMemoryMB * overheadFactor).toInt
  math.max(calculatedOverhead, minOverhead)
}

// Example: 2 GB of executor memory
val executorMem = 2048 // 2 GB
val overhead = calculateMemoryOverhead(executorMem) // max(143, 384) = 384 MB
val totalMemory = executorMem + overhead // 2432 MB requested from YARN
```
### Resource Request Configuration

Default values and constants for resource requests:

```scala
import org.apache.hadoop.yarn.api.records.{Priority, ResourceRequest}
import org.apache.hadoop.yarn.util.Records

// Default configuration values
val defaultExecutors = YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS // 2
val requestPriority = YarnSparkHadoopUtil.RM_REQUEST_PRIORITY       // 1
val anyHost = YarnSparkHadoopUtil.ANY_HOST                          // "*"

// Use in resource requests to the YARN ResourceManager
val resourceRequest = Records.newRecord(classOf[ResourceRequest])
resourceRequest.setPriority(Priority.newInstance(requestPriority))
resourceRequest.setResourceName(anyHost) // No locality preference
```
## Configuration Integration

### YARN Configuration Creation

Create properly configured YarnConfiguration instances:

```scala
import org.apache.hadoop.yarn.conf.YarnConfiguration

val sparkConf = new SparkConf()
  .set("spark.yarn.queue", "production")
  .set("spark.yarn.am.memory", "1g")

val yarnUtils = new YarnSparkHadoopUtil()
val yarnConf = yarnUtils.newConfiguration(sparkConf).asInstanceOf[YarnConfiguration]

// The YarnConfiguration includes:
// - Spark configuration properties
// - Hadoop configuration from the classpath
// - YARN-specific configuration
// - Security settings and credentials
```
### Mode Detection

Reliable detection of YARN execution mode:

```scala
val yarnUtils = new YarnSparkHadoopUtil()

// Always returns true in the YARN module
val isYarnMode = yarnUtils.isYarnMode()

// Used by Spark core to enable YARN-specific behavior:
// - Different security handling
// - YARN-specific UI integration
// - Special configuration validation
// - YARN-aware error messages
```
## Error Handling and Diagnostics

### Robust Network Resolution

Reliable hostname and rack resolution with error handling:

```scala
// Handles various failure scenarios:
// - DNS resolution failures
// - Network connectivity issues
// - Rack resolver configuration problems
// - Invalid hostname formats

// logWarning comes from Spark's Logging trait
try {
  YarnSparkHadoopUtil.populateRackInfo(hadoopConf, hostname)
  val rack = YarnSparkHadoopUtil.lookupRack(hadoopConf, hostname)
} catch {
  case e: Exception =>
    // Graceful degradation: continue without rack information
    logWarning(s"Failed to resolve rack for $hostname", e)
}
```
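The graceful-degradation pattern generalizes: wrapping a lookup that may throw in `scala.util.Try` yields an `Option`, where `None` means "no rack preference". This is a generic sketch with a hypothetical `resolver`, not Spark's actual code:

```scala
import scala.util.Try

// Hypothetical helper: convert a lookup that may throw into an Option so
// callers can fall back to rack-unaware placement.
def safeLookup(lookup: String => String)(host: String): Option[String] =
  Try(lookup(host)).toOption

// A toy resolver standing in for the Hadoop rack resolver
val resolver: String => String = {
  case "node1.example.com" => "/rack1"
  case other               => throw new RuntimeException(s"cannot resolve $other")
}

println(safeLookup(resolver)("node1.example.com")) // Some(/rack1)
println(safeLookup(resolver)("badhost"))           // None
```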
### Security Error Handling

Comprehensive error handling for security operations:

```scala
import java.io.IOException

try {
  yarnUtils.addSecretKeyToUserCredentials(key, secret)
} catch {
  case e: SecurityException =>
    logError("Failed to add secret key: insufficient permissions", e)
  case e: IOException =>
    logError("Failed to add secret key: credential store error", e)
}
```
## Integration with Spark Components

### SparkContext Integration

Automatic integration with SparkContext initialization:

```scala
// Automatic registration as the Hadoop util implementation
SparkHadoopUtil.get // Returns a YarnSparkHadoopUtil instance in YARN mode

// Used throughout Spark for:
// - Configuration management
// - Security operations
// - File system access
// - Credential handling
```

### FileSystem Integration

Enhanced file system integration for YARN environments:

```scala
import org.apache.hadoop.fs.FileSystem

val yarnConf = yarnUtils.newConfiguration(sparkConf)
val fs = FileSystem.get(yarnConf)

// The FileSystem is configured with:
// - YARN-specific authentication
// - Proper delegation tokens
// - Security credentials
// - Optimal configuration for YARN clusters
```