# Utilities and Configuration

Utility classes and configuration management for YARN-specific operations, distributed cache management, and executor container handling. These components provide essential support functionality for YARN integration.

## Capabilities

### YarnSparkHadoopUtil

YARN-specific Hadoop utilities extending Spark's core Hadoop integration with YARN-specific functionality.

```scala { .api }
/**
 * YARN-specific Hadoop utilities.
 * Extends SparkHadoopUtil with YARN-specific operations and configurations.
 */
class YarnSparkHadoopUtil extends SparkHadoopUtil {
  // YARN-specific security token handling
  // Hadoop configuration management for YARN
  // YARN service discovery and integration
  // Kerberos authentication support for YARN clusters
}

/**
 * Companion object with YARN-specific constants and utility methods.
 */
object YarnSparkHadoopUtil {
  /** Memory overhead factor for containers (7% of container memory) */
  val MEMORY_OVERHEAD_FACTOR = 0.07

  /** Minimum memory overhead for containers in MB */
  val MEMORY_OVERHEAD_MIN = 384

  /** Wildcard host for resource requests */
  val ANY_HOST = "*"

  /** Default number of executors when not specified */
  val DEFAULT_NUMBER_EXECUTORS = 2

  /** Resource manager request priority */
  val RM_REQUEST_PRIORITY = 1

  /** Get the YarnSparkHadoopUtil singleton instance */
  def get: YarnSparkHadoopUtil
}
```

**Usage Examples:**

```scala
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

// Access YARN-specific Hadoop utilities
val yarnUtil = YarnSparkHadoopUtil.get

// Used for YARN-specific operations such as:
// - Security token management
// - YARN service discovery
// - YARN-specific Hadoop configuration handling
// - Kerberos integration for secure clusters
```
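The overhead constants above determine how much memory YARN reserves on top of the requested executor heap: 7% of the request, but never less than 384 MB. A minimal sketch of that calculation (the helper `executorMemoryWithOverhead` is illustrative, not part of the actual API):

```scala
// Constants mirrored from YarnSparkHadoopUtil for illustration.
val MEMORY_OVERHEAD_FACTOR = 0.07
val MEMORY_OVERHEAD_MIN = 384

// Total container memory = requested heap + overhead, where overhead is
// 7% of the heap but never less than 384 MB.
def executorMemoryWithOverhead(executorMemoryMb: Int): Int = {
  val overhead =
    math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMb).toInt, MEMORY_OVERHEAD_MIN)
  executorMemoryMb + overhead
}

// For small executors the 384 MB floor dominates:
// executorMemoryWithOverhead(1024) == 1024 + 384 == 1408
// For large executors the 7% factor dominates:
// executorMemoryWithOverhead(8192) == 8192 + 573 == 8765
```

This is why a 1 GB executor request actually occupies roughly 1.4 GB of a container's allocation on the cluster.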
60
61
### ClientDistributedCacheManager
62
63
Manages distributed cache functionality for YARN applications, handling file and archive distribution to executor nodes.
64
65
```scala { .api }
66
/**
67
* Manages distributed cache for YARN applications
68
* Handles distribution of files, JARs, and archives to executor containers
69
*/
70
private[spark] class ClientDistributedCacheManager {
71
// Distributed cache file management
72
// Archive and JAR distribution coordination
73
// Local resource preparation for container launch
74
// Cache cleanup and resource lifecycle management
75
}
76
```
77
78
**Usage Examples:**
79
80
```scala
81
import org.apache.spark.deploy.yarn.ClientDistributedCacheManager
82
83
// Created internally by Client for managing distributed resources
84
val cacheManager = new ClientDistributedCacheManager()
85
86
// Handles:
87
// - Files specified via --files argument
88
// - Archives specified via --archives argument
89
// - Additional JARs from --addJars argument
90
// - Proper cleanup when application completes
91
```
### ExecutorRunnableUtil

Utility trait providing common functionality for executor container management across different YARN API versions.

```scala { .api }
/**
 * Utility trait for executor container management.
 * Provides common executor launch and management functionality.
 */
trait ExecutorRunnableUtil {
  // Common executor container launch logic
  // Environment variable setup for executors
  // Classpath configuration and JAR distribution
  // Resource allocation and container configuration
}
```

**Usage Examples:**

```scala
import org.apache.spark.deploy.yarn.ExecutorRunnableUtil

// Mixed into ExecutorRunnable implementations
class ExecutorRunnable extends ExecutorRunnableUtil {
  // Inherits common executor management functionality
  // Used for launching executor containers on YARN nodes
  // Handles environment setup and resource configuration
}
```
122
123
### ExecutorRunnable
124
125
Version-specific executor container implementation classes for managing executor containers on YARN.
126
127
```scala { .api }
128
/**
129
* Version-specific executor container implementation
130
* Available in both alpha (deprecated) and stable modules
131
* Manages executor containers on YARN NodeManager
132
*/
133
class ExecutorRunnable extends ExecutorRunnableUtil {
134
// Executor container launch and lifecycle management
135
// Integration with specific YARN API versions
136
// Resource allocation and environment setup
137
// Container monitoring and cleanup
138
}
139
```
140
141
**Usage Examples:**
142
143
```scala
144
import org.apache.spark.deploy.yarn.ExecutorRunnable
145
146
// Created by YarnAllocator for each executor container
147
val executorRunnable = new ExecutorRunnable(
148
container = yarnContainer,
149
conf = sparkConf,
150
sparkJar = distributedSparkJar
151
)
152
153
// Handles complete executor container lifecycle:
154
// - Container launch on NodeManager
155
// - Executor JVM startup and configuration
156
// - Resource monitoring and cleanup
157
```
158
159
## Distributed Cache Management
160
161
### File Distribution
162
163
```scala
164
// Distributed cache handles various file types
165
class ClientDistributedCacheManager {
166
// Regular files (--files argument)
167
private def distributeFiles(files: String): Map[String, LocalResource] = {
168
// Upload files to HDFS staging directory
169
// Create LocalResource entries for YARN
170
// Configure file permissions and visibility
171
// Return resource map for container launch context
172
}
173
174
// Archive files (--archives argument)
175
private def distributeArchives(archives: String): Map[String, LocalResource] = {
176
// Handle tar.gz, zip, and other archive formats
177
// Configure automatic extraction in containers
178
// Set up proper working directory structure
179
}
180
181
// JAR files (--addJars argument)
182
private def distributeJars(jars: String): Map[String, LocalResource] = {
183
// Add JARs to executor classpath
184
// Handle both local and HDFS JAR locations
185
// Optimize JAR distribution across cluster nodes
186
}
187
}
188
```
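The three resource kinds above are localized differently on the executor side: JARs join the classpath, archives are extracted, and plain files are copied as-is. A hedged sketch of that distinction (the helper `classifyResource` and its category names are illustrative; the real manager receives each kind through a separate argument rather than inferring it):

```scala
// Illustrative only: classify a resource path by extension the way the
// distributed cache treats it in the container.
def classifyResource(path: String): String = {
  val lower = path.toLowerCase
  if (lower.endsWith(".jar")) "jar"          // added to the executor classpath
  else if (lower.endsWith(".zip") || lower.endsWith(".tar.gz") ||
           lower.endsWith(".tgz")) "archive" // auto-extracted in the container
  else "file"                                // localized as a plain file
}
```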
189
190
### Resource Lifecycle
191
192
```scala
193
// Complete resource lifecycle management
194
class ClientDistributedCacheManager {
195
def setupDistributedCache(): Map[String, LocalResource] = {
196
// 1. Analyze resource requirements from arguments
197
// 2. Upload local resources to HDFS staging area
198
// 3. Create YARN LocalResource descriptors
199
// 4. Return resource map for container launch
200
}
201
202
def cleanupStagingDir(): Unit = {
203
// Clean up HDFS staging directory after application completion
204
// Remove temporary files and directories
205
// Handle cleanup failures gracefully
206
}
207
}
208
```
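The setup/cleanup pairing above is a bracket pattern: resources are staged for the application's lifetime and removed afterwards, with cleanup failures tolerated rather than propagated. A self-contained sketch of that shape, using a local temp directory in place of the HDFS staging area (`withStagingDir` is a hypothetical helper, not part of the actual API):

```scala
import java.nio.file.{Files, Path}
import scala.util.Try

// Run `body` with a freshly created staging directory, then delete the
// directory and its contents, swallowing cleanup failures so they cannot
// mask the application's own result.
def withStagingDir[T](body: Path => T): T = {
  val stagingDir = Files.createTempDirectory(".sparkStaging")
  try {
    body(stagingDir)
  } finally {
    Try {
      // Delete children before parents by walking in reverse order.
      Files.walk(stagingDir)
        .sorted(java.util.Comparator.reverseOrder[Path]())
        .forEach((p: Path) => { Files.deleteIfExists(p); () })
    }
  }
}
```

The real manager applies the same discipline against HDFS paths, where a leaked staging directory would otherwise accumulate uploaded JARs across application runs.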
209
210
## Executor Container Management
211
212
### Container Launch Context
213
214
```scala
215
// ExecutorRunnable creates complete launch context
216
class ExecutorRunnable {
217
private def createContainerLaunchContext(): ContainerLaunchContext = {
218
// 1. Set up executor command line
219
val executorCommand = buildExecutorCommand()
220
221
// 2. Configure environment variables
222
val environment = setupExecutorEnvironment()
223
224
// 3. Set up local resources (JARs, files, archives)
225
val localResources = setupLocalResources()
226
227
// 4. Configure security tokens
228
val tokens = setupSecurityTokens()
229
230
// 5. Build complete launch context
231
ContainerLaunchContext.newInstance(
232
localResources, environment, executorCommand, tokens
233
)
234
}
235
}
236
```
237
238
### Executor Environment Setup
239
240
```scala
241
// Environment configuration for executor containers
242
trait ExecutorRunnableUtil {
243
protected def setupExecutorEnvironment(): Map[String, String] = {
244
Map(
245
"SPARK_HOME" -> sparkHome,
246
"CLASSPATH" -> buildExecutorClasspath(),
247
"JAVA_HOME" -> javaHome,
248
"PYTHONPATH" -> pythonPath,
249
"HADOOP_CONF_DIR" -> hadoopConfDir,
250
// YARN-specific environment variables
251
"CONTAINER_ID" -> containerId,
252
"APPLICATION_WEB_PROXY_BASE" -> webProxyBase
253
)
254
}
255
256
protected def buildExecutorClasspath(): String = {
257
// Spark JARs and dependencies
258
// User application JARs (--addJars)
259
// Hadoop and YARN libraries
260
// Custom classpaths from configuration
261
}
262
}
263
```
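The classpath built above is just an ordered, separator-joined list of entries, where order encodes precedence. A minimal sketch of that mechanic (the `buildClasspath` helper and the example entry names are assumptions, not the actual resolved paths):

```scala
import java.io.File

// Join classpath entries in precedence order using the platform path
// separator (":" on Unix, ";" on Windows), dropping blanks and duplicates.
def buildClasspath(entries: Seq[String]): String =
  entries.filter(_.nonEmpty).distinct.mkString(File.pathSeparator)

val classpath = buildClasspath(Seq(
  "$PWD",                // container working directory first
  "spark-assembly.jar",  // Spark JARs and dependencies
  "app.jar",             // user application JARs (--addJars)
  "$HADOOP_CONF_DIR"     // Hadoop and YARN configuration
))
```

Putting the container working directory first is what lets localized resources shadow cluster-provided JARs of the same name.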
264
265
## Configuration Utilities
266
267
### YARN Configuration Integration
268
269
```scala
270
// YarnSparkHadoopUtil handles configuration integration
271
class YarnSparkHadoopUtil {
272
def getYarnConfiguration(sparkConf: SparkConf): Configuration = {
273
// Load base Hadoop configuration
274
val hadoopConf = new Configuration()
275
276
// Apply Spark-specific YARN overrides
277
applySparkYarnConfiguration(hadoopConf, sparkConf)
278
279
// Handle security configuration for Kerberos clusters
280
setupSecurityConfiguration(hadoopConf)
281
282
hadoopConf
283
}
284
285
private def applySparkYarnConfiguration(
286
hadoopConf: Configuration,
287
sparkConf: SparkConf
288
): Unit = {
289
// Map Spark configuration to Hadoop configuration
290
// Set YARN-specific properties
291
// Configure resource manager addresses
292
// Set up queue and priority configurations
293
}
294
}
295
```
296
297
### Security Token Management
298
299
```scala
300
// Security integration for Kerberos-enabled clusters
301
object YarnSparkHadoopUtil {
302
def obtainTokensForNamenodes(
303
paths: Set[Path],
304
conf: Configuration
305
): Map[String, Token[_]] = {
306
// Obtain delegation tokens for HDFS access
307
// Handle multiple namenode configurations
308
// Support for secure cluster authentication
309
}
310
311
def obtainTokensForHBase(conf: Configuration): Map[String, Token[_]] = {
312
// Obtain tokens for HBase integration
313
// Support for secure HBase clusters
314
}
315
316
def obtainTokensForHive(conf: Configuration): Map[String, Token[_]] = {
317
// Obtain tokens for Hive metastore access
318
// Support for secure Hive integration
319
}
320
}
321
```
## Performance Optimizations

### Resource Caching

```scala
// Efficient resource distribution and caching
class ClientDistributedCacheManager {
  private def optimizeResourceDistribution(): Unit = {
    // Cache frequently used files in HDFS
    // Use symbolic links for shared resources
    // Minimize network transfer overhead
    // Leverage YARN's distributed cache capabilities
  }
}
```
338
339
### Container Launch Optimization
340
341
```scala
342
// Optimized executor container launch
343
class ExecutorRunnable {
344
private def launchContainerAsync(): Future[Unit] = {
345
// Parallel container launch to reduce startup time
346
// Pre-warm executor JVMs when possible
347
// Optimize classpath and resource loading
348
// Monitor launch success and retry on failures
349
}
350
}
351
```
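The parallelism above can be sketched with plain Scala futures: each launch runs on the execution context, so one slow NodeManager does not serialize the startup of every other executor. In this hedged sketch, `launchContainer` is a stand-in for the real NodeManager call, not the actual API:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Stand-in for the real container launch RPC.
def launchContainer(id: Int): String = s"container_$id launched"

// Fire all launches concurrently, then gather results in request order.
def launchAll(ids: Seq[Int]): Seq[String] = {
  val launches = ids.map(id => Future(launchContainer(id)))
  Await.result(Future.sequence(launches), 30.seconds)
}
```

`Future.sequence` preserves the order of the input futures, so results stay aligned with the requested container IDs even though the launches complete in arbitrary order.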
## Error Handling and Diagnostics

### Container Launch Failures

```scala
// Robust error handling for container operations
class ExecutorRunnable {
  private def handleLaunchFailure(exception: Exception): Unit = {
    // Log detailed failure information
    // Classify failure types (resource, network, configuration)
    // Implement retry logic for transient failures
    // Report failures to the ApplicationMaster for resource reallocation
  }
}
```
### Distributed Cache Failures

```scala
// Error handling for resource distribution
class ClientDistributedCacheManager {
  private def handleDistributionFailure(resource: String, error: Throwable): Unit = {
    // Log resource distribution failures
    // Attempt alternative distribution strategies
    // Fail fast for critical resources
    // Provide detailed error messages for troubleshooting
  }
}
```
## Integration Points

### Spark Core Integration

The utilities integrate deeply with Spark core components:

- **SparkContext**: Configuration and environment setup
- **SparkConf**: YARN-specific configuration properties
- **SecurityManager**: Token management and authentication
- **SparkHadoopUtil**: Base Hadoop utility functionality

### YARN Service Integration

Integration with YARN cluster services:

- **ResourceManager**: Application submission and monitoring
- **NodeManager**: Container launch and management
- **Timeline Service**: Application history and metrics
- **Web Application Proxy**: Secure application web UI access

### Hadoop Ecosystem Integration

Support for the broader Hadoop ecosystem:

- **HDFS**: Distributed file system access and security
- **HBase**: NoSQL database integration with delegation token support
- **Hive**: Data warehouse integration and metastore access
- **Security**: Kerberos authentication and delegation tokens