# Resource Management

Container allocation and executor lifecycle management within YARN resource constraints and scheduling policies. This module handles the complex coordination between Spark's resource needs and YARN's resource management system.

## Capabilities

### YarnAllocator

Core resource allocator that manages container requests, allocation tracking, and executor lifecycle within YARN clusters.

```scala { .api }
class YarnAllocator(
    driverUrl: String,
    driverRef: RpcEndpointRef,
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    amClient: AMRMClient[ContainerRequest],
    appAttemptId: ApplicationAttemptId,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource],
    resolver: SparkRackResolver,
    clock: Clock = new SystemClock()) {
  def getNumExecutorsRunning: Int
  def getNumExecutorsFailed: Int
  def numContainersPendingAllocate: Int
  def allocateResources(): Unit
  def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit
  def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit
  def updateResourceRequests(): Unit
  def killExecutor(executorId: String): Unit
  def stop(): Unit
}
```

**Constructor Parameters:**

- `driverUrl`: RPC URL for driver communication
- `driverRef`: RPC endpoint reference for the driver
- `conf`: YARN configuration from the cluster
- `sparkConf`: Spark configuration with resource settings
- `amClient`: YARN ApplicationMaster-to-ResourceManager client
- `appAttemptId`: YARN application attempt identifier
- `securityMgr`: Spark security manager instance
- `localResources`: Staged resources for executor containers
- `resolver`: Rack resolver for locality awareness
- `clock`: Clock instance for time-based operations (defaults to `SystemClock`)

**Status Query Methods:**

**`getNumExecutorsRunning: Int`**
- Returns count of currently running executors
- Includes executors in RUNNING state only

**`getNumExecutorsFailed: Int`**
- Returns count of failed executor attempts
- Includes permanent failures and exceeded retry attempts

**`numContainersPendingAllocate: Int`**
- Returns count of outstanding container requests
- Requests submitted but not yet allocated by YARN

**Resource Management Methods:**

**`allocateResources(): Unit`**
- Requests containers from the YARN ResourceManager based on current needs
- Considers executor targets, pending requests, and failed containers
- Updates container allocation state

**`handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit`**
- Processes newly allocated containers from YARN
- Launches executor processes in allocated containers
- Updates tracking state for launched executors

**`processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit`**
- Handles container completion events from YARN
- Distinguishes between normal completion and failures
- Triggers replacement requests for failed executors

**`updateResourceRequests(): Unit`**
- Updates container resource requests with the YARN ResourceManager
- Adjusts requested resources based on current executor targets
- Called periodically to sync allocation state

**`killExecutor(executorId: String): Unit`**
- Kills a specific executor by terminating its container
- Updates internal tracking state
- Used for explicit executor removal

**`stop(): Unit`**
- Stops the allocator and releases all resources
- Cancels pending container requests
- Performs cleanup of internal state

**Usage Example:**

```scala
// YarnAllocator is used internally by the ApplicationMaster and is not
// part of the public API; allocator state is managed automatically.
// Executor status can instead be monitored through the SparkContext
// status tracker.
val statusTracker = sc.statusTracker
val execInfo = statusTracker.getExecutorInfos

execInfo.foreach { exec =>
  println(s"Executor at ${exec.host}:${exec.port}: ${exec.numRunningTasks} running tasks")
}
```

### YarnRMClient

Handles ResourceManager registration and communication for the ApplicationMaster.

```scala { .api }
class YarnRMClient(amClient: AMRMClient[ContainerRequest]) {
  def register(
      driverHost: String,
      driverPort: Int,
      conf: YarnConfiguration,
      sparkConf: SparkConf,
      uiAddress: Option[String],
      uiHistoryAddress: String): Unit
  def createAllocator(
      conf: YarnConfiguration,
      sparkConf: SparkConf,
      driverUrl: String,
      driverRef: RpcEndpointRef,
      securityMgr: SecurityManager,
      localResources: Map[String, LocalResource]): YarnAllocator
  def unregister(status: FinalApplicationStatus, diagnostics: String = ""): Unit
  def getAttemptId(): ApplicationAttemptId
  def getMaxRegAttempts(sparkConf: SparkConf, yarnConf: YarnConfiguration): Int
}
```

**ResourceManager Integration:**

**`register(...): Unit`**
- Registers the ApplicationMaster with the YARN ResourceManager
- Provides driver connection details and web UI URLs
- Establishes the communication channel for resource requests

**`createAllocator(...): YarnAllocator`**
- Creates and configures a YarnAllocator instance
- Sets up container allocation and executor lifecycle management
- Integrates with the security manager and local resources

**`unregister(status, diagnostics): Unit`**
- Unregisters the ApplicationMaster from the ResourceManager
- Reports final application status and diagnostic information
- Performs cleanup of ResourceManager communication

**`getAttemptId(): ApplicationAttemptId`**
- Returns the current YARN application attempt identifier
- Used for tracking and logging purposes

**`getMaxRegAttempts(sparkConf, yarnConf): Int`**
- Determines maximum ApplicationMaster registration retry attempts
- Based on Spark and YARN configuration settings
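
The interaction between the two settings can be sketched in plain Scala. This is an illustrative model, not the Spark source; the helper name and parameters are hypothetical, modeled on `spark.yarn.maxAppAttempts` and YARN's `yarn.resourcemanager.am.max-attempts`.

```scala
// Hypothetical sketch: a Spark-side limit, when set, is capped by the
// YARN-side maximum; otherwise YARN's cluster-wide maximum applies.
def maxRegAttempts(sparkMaxAppAttempts: Option[Int], yarnMaxAmAttempts: Int): Int =
  sparkMaxAppAttempts match {
    case Some(sparkMax) => math.min(sparkMax, yarnMaxAmAttempts)
    case None           => yarnMaxAmAttempts
  }
```

For example, `maxRegAttempts(Some(3), 2)` yields 2: the cluster-wide YARN cap wins over a larger application-side setting.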

### ExecutorRunnable

Manages executor container launch and configuration within YARN containers. This class handles the low-level details of starting Spark executors in YARN containers.

```scala { .api }
class ExecutorRunnable(
    container: Container,
    conf: Configuration,
    sparkConf: SparkConf,
    masterAddress: String,
    executorId: String,
    hostname: String,
    executorMemory: Int,
    executorCores: Int,
    appId: String,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource]) {
  def run(): Unit
  def launchContextDebugInfo(): String
  def startContainer(): java.util.Map[String, ByteBuffer]
}
```

**Container Launch Management:**

**`run(): Unit`**
- Main execution method that launches the executor in a YARN container
- Sets up the container environment and security context
- Submits the container launch request to the NodeManager
- Handles launch failures and error reporting

**`launchContextDebugInfo(): String`**
- Returns detailed debug information about the container launch context
- Includes environment variables, command line, and resource information
- Used for troubleshooting container launch issues

**`startContainer(): java.util.Map[String, ByteBuffer]`**
- Initiates the container startup process
- Returns container tokens and security information
- Manages authentication and authorization setup

**Container Launch Process:**
1. **Environment Setup**: Configures PATH, JAVA_HOME, and other environment variables
2. **Command Construction**: Builds the executor command line with JVM options and classpath
3. **Security Context**: Sets up Kerberos credentials and security tokens
4. **Resource Configuration**: Configures local resources and file permissions
5. **Container Submission**: Submits the launch request to the NodeManager
6. **Monitoring**: Tracks container launch status and handles failures
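
Step 2 can be sketched as a small command builder. This is a hedged illustration only: the real command line is assembled inside `ExecutorRunnable`, and the backend class name and flags below reflect common Spark executor arguments rather than a guaranteed contract.

```scala
// Hypothetical sketch of executor command construction; the class name
// and flags are illustrative, not the exact contract built by
// ExecutorRunnable
def buildExecutorCommand(
    executorMemoryMb: Int,
    driverUrl: String,
    executorId: String,
    hostname: String,
    executorCores: Int,
    appId: String): Seq[String] =
  Seq(
    "{{JAVA_HOME}}/bin/java",          // expanded by the NodeManager
    "-server",
    s"-Xmx${executorMemoryMb}m",       // heap bounded by container memory
    "org.apache.spark.executor.CoarseGrainedExecutorBackend",
    "--driver-url", driverUrl,
    "--executor-id", executorId,
    "--hostname", hostname,
    "--cores", executorCores.toString,
    "--app-id", appId)
```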

**Usage Example:**

```scala
// ExecutorRunnable is typically used internally by YarnAllocator
// when launching new executor containers

def launchExecutorContainer(
    container: Container,
    executorId: String): Unit = {

  val executorRunnable = new ExecutorRunnable(
    container = container,
    conf = yarnConf,
    sparkConf = sparkConf,
    masterAddress = driverUrl,
    executorId = executorId,
    hostname = container.getNodeId.getHost,
    executorMemory = executorMemoryMB,
    executorCores = executorCores,
    appId = appId,
    securityMgr = securityManager,
    localResources = localResourceMap)

  // ExecutorRunnable does not extend Runnable, so wrap the launch
  // call before handing it to the launcher thread pool
  launcherPool.execute(new Runnable {
    override def run(): Unit = executorRunnable.run()
  })
}
```

## Resource Allocation Strategies

### Dynamic Allocation

Integration with Spark's dynamic allocation for elastic resource scaling.

```scala
// Configuration for dynamic allocation
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true") // required for dynamic allocation on YARN
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "100")
  .set("spark.dynamicAllocation.initialExecutors", "10")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
```

**Scaling Behavior:**
- Requests additional containers when the task queue grows
- Releases idle containers after a configured timeout
- Respects YARN queue and cluster resource limits
- Coordinates with YARN fair/capacity schedulers
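
The first two bullets can be illustrated with a simplified target calculation. This sketch is not Spark's exact policy (which also factors in backlog timeouts and currently running tasks); it only shows how a backlog-derived target is clamped to the configured bounds.

```scala
// Illustrative sketch: derive a target executor count from the task
// backlog, then clamp it to the configured min/max bounds
def targetExecutors(
    pendingTasks: Int,
    tasksPerExecutor: Int,
    minExecutors: Int,
    maxExecutors: Int): Int = {
  val needed = math.ceil(pendingTasks.toDouble / tasksPerExecutor).toInt
  math.min(maxExecutors, math.max(minExecutors, needed))
}
```

With the configuration above (min 2, max 100) and 250 pending tasks at 4 tasks per executor, the target is 63 executors; an empty backlog falls back to the minimum of 2.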

### Locality Preferences

Leverages YARN's rack awareness for optimal data locality.

```scala { .api }
class SparkRackResolver(conf: SparkConf, yarnConf: YarnConfiguration) {
  def resolve(hostName: String): String
}
```

**Locality Levels:**
1. **NODE_LOCAL**: Same node as data
2. **RACK_LOCAL**: Same rack as data
3. **ANY**: Any available node

**Container Request Strategy:**
- Requests containers with locality preferences
- Falls back to lower locality levels if needed
- Balances locality with resource availability
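
The fallback order can be sketched as a classification of a container offer against preferred hosts and racks. Names here are hypothetical; Spark's actual matching is spread across the allocator and YARN's scheduler.

```scala
// Illustrative sketch: classify a container offer by the best locality
// level it satisfies, falling back NODE_LOCAL -> RACK_LOCAL -> ANY
def localityLevel(
    offeredHost: String,
    offeredRack: String,
    preferredHosts: Set[String],
    preferredRacks: Set[String]): String =
  if (preferredHosts.contains(offeredHost)) "NODE_LOCAL"
  else if (preferredRacks.contains(offeredRack)) "RACK_LOCAL"
  else "ANY"
```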

### Resource Constraints

YARN-specific resource constraint handling.

**Memory Management:**
```scala
// Memory overhead: max(10% of executor memory, 384 MiB); values in MiB
val executorMemoryMb = sparkConf.getSizeAsMb("spark.executor.memory", "1g")
val memoryOverheadMb = sparkConf.getLong("spark.yarn.executor.memoryOverhead",
  math.max((executorMemoryMb * 0.1).toLong, 384L))
val totalMemoryMb = executorMemoryMb + memoryOverheadMb
```

**CPU Allocation:**
```scala
// Core allocation with YARN constraints
val executorCores = conf.getInt("spark.executor.cores", 1)
val maxCores = yarnConf.getInt("yarn.scheduler.maximum-allocation-vcores", Int.MaxValue)
val requestCores = math.min(executorCores, maxCores)
```
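
Putting the two snippets together: every requested container spec must fit under YARN's scheduler maximums. A minimal sketch, with a hypothetical helper and values in MiB and vcores:

```scala
// Hypothetical helper: clamp a requested container spec to the
// scheduler's maximum-allocation limits
case class ContainerSpec(memoryMb: Long, vcores: Int)

def clampToYarnLimits(
    requested: ContainerSpec,
    maxAllocMb: Long,
    maxAllocVcores: Int): ContainerSpec =
  ContainerSpec(
    memoryMb = math.min(requested.memoryMb, maxAllocMb),
    vcores = math.min(requested.vcores, maxAllocVcores))
```

In practice, a request exceeding `yarn.scheduler.maximum-allocation-mb` is rejected by YARN rather than silently clamped, which is why Spark validates executor sizing against these limits at submission time.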

## Container Lifecycle Management

### Container States

**Container Lifecycle:**
1. **REQUESTED**: Container request submitted to YARN
2. **ALLOCATED**: Container allocated by the ResourceManager
3. **LAUNCHING**: Container being launched on a NodeManager
4. **RUNNING**: Executor process running successfully
5. **COMPLETED**: Container finished (success or failure)
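
The lifecycle above can be modeled as a small state machine. This is an illustrative model, not a Spark API; the transition table encodes the listed states plus the failure shortcuts (an allocation can be released unused, and a launch can fail).

```scala
// Illustrative state-machine model of the container lifecycle
sealed trait ContainerState
case object Requested extends ContainerState
case object Allocated extends ContainerState
case object Launching extends ContainerState
case object Running extends ContainerState
case object Completed extends ContainerState

val validTransitions: Map[ContainerState, Set[ContainerState]] = Map(
  Requested -> Set(Allocated),
  Allocated -> Set(Launching, Completed), // allocation can be released unused
  Launching -> Set(Running, Completed),   // launch may fail
  Running   -> Set(Completed),
  Completed -> Set.empty[ContainerState])

def canTransition(from: ContainerState, to: ContainerState): Boolean =
  validTransitions(from).contains(to)
```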

### Failure Handling

**Failure Categories:**
- **Preemption**: Container killed by YARN for resource rebalancing
- **Node Failure**: NodeManager or hardware failure
- **Application Error**: Executor process crashed or failed
- **Resource Exhaustion**: Out of memory or disk space

**Recovery Strategies:**
```scala
// Retry configuration
val maxExecutorFailures = conf.getInt("spark.yarn.max.executor.failures",
  math.max(2 * numExecutors, 3))

// Blacklist configuration
val nodeBlacklistEnabled = conf.getBoolean("spark.blacklist.enabled", false)
val maxNodeBlacklist = conf.getInt("spark.blacklist.application.maxFailedExecutorsPerNode", 2)
```
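
The retry budget can be sketched as a minimal failure tracker. The class below is hypothetical; the real allocator's bookkeeping is richer (for example, failures may expire after a validity interval).

```scala
// Hypothetical failure tracker: abort once the executor-failure
// budget is exhausted
final class FailureTracker(maxExecutorFailures: Int) {
  private var failures = 0
  def recordFailure(): Unit = failures += 1
  def numFailures: Int = failures
  def shouldAbort: Boolean = failures > maxExecutorFailures
}
```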

### Container Monitoring

**Health Monitoring:**
- Container resource usage tracking
- Executor heartbeat monitoring
- Node failure detection
- Container exit code analysis

**Metrics Integration:**
```scala
// Illustrative metrics wiring (helper names hypothetical); the real
// ApplicationMasterSource registers gauges backed by allocator state
val source = new ApplicationMasterSource()
source.registerGauge("numExecutorsRunning", () => numExecutorsRunning)
source.registerGauge("numExecutorsFailed", () => numExecutorsFailed)
source.registerGauge("numPendingContainers", () => numContainersPendingAllocate)
```

### Cleanup and Termination

**Graceful Shutdown:**
1. Stop accepting new container requests
2. Complete running tasks where possible
3. Release allocated but unused containers
4. Clean up staged resources
5. Unregister from the ResourceManager

**Emergency Shutdown:**
1. Immediately kill all executor containers
2. Force cleanup of staged resources
3. Report failure status to the ResourceManager
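
The ordering of the graceful path matters: new requests stop first, unregistration happens last. A sketch with hypothetical helper names that records the sequence (in the real ApplicationMaster these steps are spread across its shutdown and unregister logic):

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helpers that record execution order for illustration
val steps = ArrayBuffer.empty[String]
def stopAcceptingRequests(): Unit = steps += "stop-requests"
def drainRunningTasks(): Unit = steps += "drain-tasks"
def releaseUnusedContainers(): Unit = steps += "release-containers"
def cleanupStagedResources(): Unit = steps += "cleanup-staging"
def unregisterFromResourceManager(): Unit = steps += "unregister"

def gracefulShutdown(): Unit = {
  stopAcceptingRequests()          // 1. no new container requests
  drainRunningTasks()              // 2. let running work finish where possible
  releaseUnusedContainers()        // 3. hand unused allocations back to YARN
  cleanupStagedResources()         // 4. delete the staging directory
  unregisterFromResourceManager()  // 5. report final status to the RM
}

gracefulShutdown()
```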

## Configuration Integration

### Memory Configuration

```scala
// Executor memory settings
"spark.executor.memory" -> "4g"              // Heap memory
"spark.yarn.executor.memoryOverhead" -> "1g" // Off-heap overhead
"spark.memory.fraction" -> "0.8"             // Fraction of heap for execution and storage

// ApplicationMaster memory
"spark.yarn.am.memory" -> "2g"           // AM heap memory
"spark.yarn.am.memoryOverhead" -> "384m" // AM overhead
```

### CPU Configuration

```scala
// Executor CPU settings
"spark.executor.cores" -> "4"      // Cores per executor
"spark.executor.instances" -> "10" // Static executor count

// ApplicationMaster CPU
"spark.yarn.am.cores" -> "2" // AM CPU cores
```

### Resource Queue Configuration

```scala
// YARN queue and scheduling
"spark.yarn.queue" -> "production" // YARN queue name
"spark.yarn.priority" -> "1"       // Application priority
"spark.yarn.maxAppAttempts" -> "3" // Max application attempts
```

## Error Handling and Diagnostics

### Common Resource Errors

**Insufficient Resources:**
```scala
throw new YarnException("Could not allocate container within timeout")
throw new IllegalStateException("Requested memory exceeds queue maximum")
```

**Container Launch Failures:**
```scala
throw new IOException("Failed to launch container on node")
throw new SecurityException("Container launch denied due to security policy")
```

**Resource Constraint Violations:**
```scala
throw new IllegalArgumentException("Executor memory must be at least 384MB")
throw new SparkException("Total requested cores exceed cluster capacity")
```

### Diagnostic Information

**Resource Status Reporting:**
```scala
// Illustrative summary built from the allocator's query methods
def getResourceStatus: String = {
  s"""
     |Executors Running: $getNumExecutorsRunning
     |Executors Failed: $getNumExecutorsFailed
     |Containers Pending: $numContainersPendingAllocate
   """.stripMargin
}
```

**Container Exit Analysis:**
```scala
import org.apache.hadoop.yarn.api.records.{ContainerExitStatus, ContainerStatus}

def analyzeContainerExit(status: ContainerStatus): String = {
  status.getExitStatus match {
    case ContainerExitStatus.SUCCESS => "Container completed successfully"            // 0
    case ContainerExitStatus.PREEMPTED => "Container killed by YARN (preemption)"     // -102
    case ContainerExitStatus.KILLED_EXCEEDED_PMEM =>
      "Container killed for exceeding physical memory limits"                         // -104
    case ContainerExitStatus.KILLED_EXCEEDED_VMEM =>
      "Container killed for exceeding virtual memory limits"                          // -103
    case other => s"Container failed with exit code: $other"
  }
}
```

## Performance Optimization

### Resource Tuning Guidelines

**Memory Optimization:**
- Set executor memory to 80-90% of container memory
- Reserve sufficient overhead for off-heap operations
- Consider GC tuning for large heap sizes

**CPU Optimization:**
- Balance executor cores with parallelism needs
- Avoid over-subscribing CPU resources
- Consider NUMA topology on large nodes

**Container Sizing:**
- Larger containers reduce scheduling overhead
- Smaller containers provide better resource utilization
- Balance based on workload characteristics
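
A worked sizing example makes the trade-off concrete: how many executors of a given size fit on one node, accounting for memory overhead. The helper and numbers are illustrative.

```scala
// Illustrative sizing arithmetic: executors per node is the minimum of
// the memory-bound and core-bound counts (memory values in MiB)
def executorsPerNode(
    nodeMemoryMb: Long,
    nodeVcores: Int,
    executorMemoryMb: Long,
    overheadMb: Long,
    executorCores: Int): Int = {
  val byMemory = nodeMemoryMb / (executorMemoryMb + overheadMb)
  val byCores = (nodeVcores / executorCores).toLong
  math.min(byMemory, byCores).toInt
}
```

On a 64 GiB / 16-core node, 8 GiB executors with 1 GiB overhead and 4 cores each yield 4 executors per node (core-bound); memory would have allowed 7.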

### Locality Optimization

**Data Locality:**
- Co-locate executors with data when possible
- Use rack-aware placement for distributed data
- Consider storage system topology

**Network Locality:**
- Minimize cross-rack network traffic
- Optimize for cluster network topology
- Consider bandwidth constraints