# Resource Management

Resource allocation and management components for negotiating and monitoring YARN cluster resources for Spark executors. These components handle the complex interactions with YARN's ResourceManager to obtain, monitor, and release container resources.

## Capabilities

### YarnRMClient Interface

Core interface for ResourceManager client operations, providing an abstraction over YARN ResourceManager interactions.
```scala { .api }
/**
 * Trait defining the ResourceManager client interface.
 * Provides an abstraction for YARN ResourceManager communication.
 */
trait YarnRMClient {
  // ResourceManager connection and authentication
  // Container resource requests and releases
  // Application status monitoring and reporting
  // Resource allocation callbacks and event handling
}
```
**Usage Examples:**

```scala
import org.apache.spark.deploy.yarn.YarnRMClient

// YarnRMClient implementations are version-specific and are created
// internally by the ApplicationMaster and Client components; user code
// does not instantiate one directly. The client is used for
// ResourceManager communication throughout the application lifecycle.
```
### YarnRMClientImpl

Version-specific implementations of the YarnRMClient trait for different Hadoop YARN API versions.

```scala { .api }
/**
 * Implementation of YarnRMClient for a specific YARN API version.
 * Available in both the alpha (deprecated) and stable modules.
 */
class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMClient {

  /**
   * Register the ApplicationMaster with the ResourceManager.
   * @param host ApplicationMaster host
   * @param port ApplicationMaster port
   * @param trackingUrl Application tracking URL
   * @return Registration response from the ResourceManager
   */
  def register(host: String, port: Int, trackingUrl: String): RegisterApplicationMasterResponse

  /**
   * Allocate resources from the ResourceManager.
   * @param progressIndicator Application progress (0.0 to 1.0)
   * @return Allocation response with assigned containers
   */
  def allocate(progressIndicator: Float): AllocateResponse

  /**
   * Unregister the ApplicationMaster from the ResourceManager.
   * @param status Final application status
   * @param appMessage Final message to the ResourceManager
   * @param trackingUrl Final tracking URL
   */
  def unregister(status: FinalApplicationStatus, appMessage: String, trackingUrl: String): Unit
}
```
### YarnAllocator

Abstract base class providing core resource allocation logic for YARN containers.

```scala { .api }
/**
 * Abstract base class for YARN resource allocation logic.
 * Manages executor container requests and lifecycle.
 */
private[yarn] abstract class YarnAllocator(
    conf: Configuration,
    sparkConf: SparkConf,
    appAttemptId: ApplicationAttemptId,
    args: ApplicationMasterArguments,
    preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
    securityMgr: SecurityManager)
  extends Logging {

  /** Abstract method to allocate containers from the ResourceManager */
  protected def allocateContainers(resourceRequests: JList[ResourceRequest]): Unit

  /** Abstract method to release a specific container */
  protected def releaseContainer(container: Container): Unit

  /** Request a total number of executors from the cluster */
  def requestTotalExecutors(requestedTotal: Int): Unit

  /** Kill a specific executor by ID */
  def killExecutor(executorId: String): Unit

  /** Allocate resources and handle responses from the ResourceManager */
  def allocateResources(): Unit

  /** Update resource requests for executor containers */
  def updateResourceRequests(): Unit
}

/**
 * Companion object with constants and utilities
 */
object YarnAllocator {
  // Internal constants for resource allocation
}
```
**Usage Examples:**

```scala
import org.apache.spark.deploy.yarn.YarnAllocator

// YarnAllocator is extended by version-specific implementations.
// It handles the core logic for:
// - Requesting executor containers from the ResourceManager
// - Launching executor processes in allocated containers
// - Monitoring executor health and handling failures
// - Releasing containers when executors complete
```
### AllocationType Enumeration

Enumeration defining allocation strategies for YARN resource requests.

```scala { .api }
/**
 * Enumeration for YARN allocation types.
 * Defines resource allocation strategies for container placement.
 */
object AllocationType extends Enumeration {
  type AllocationType = Value

  /** Host-specific allocation - request a container on a specific host */
  val HOST = Value

  /** Rack-specific allocation - request a container within a specific rack */
  val RACK = Value

  /** Any allocation - allow the ResourceManager to place the container anywhere */
  val ANY = Value
}
```
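In YARN's resource-request model, these placement levels correspond to the resource name carried by a request: a specific hostname, a rack name, or the wildcard `"*"` for "anywhere". The self-contained sketch below illustrates that mapping with its own copy of the enumeration; the `resourceName` helper and the example host/rack values are illustrative, not part of Spark's API.

```scala
// Self-contained sketch: mapping an allocation type to the resource
// name used by a YARN ResourceRequest. YARN uses "*" for "any node".
object AllocationType extends Enumeration {
  type AllocationType = Value
  val HOST, RACK, ANY = Value
}

// Hypothetical helper; `target` is a hostname or rack name.
def resourceName(allocType: AllocationType.Value, target: String): String =
  allocType match {
    case AllocationType.HOST => target  // e.g. "node-17.example.com"
    case AllocationType.RACK => target  // e.g. "/rack-03"
    case AllocationType.ANY  => "*"     // let the scheduler choose
  }

println(resourceName(AllocationType.ANY, ""))         // *
println(resourceName(AllocationType.HOST, "node-17")) // node-17
```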
### YarnAllocationHandler

Version-specific resource allocation implementations that extend the base YarnAllocator functionality.

```scala { .api }
/**
 * Version-specific resource allocation implementation.
 * Available in both the alpha and stable API versions.
 */
private[yarn] class YarnAllocationHandler extends YarnAllocator {
  // Concrete implementation of resource allocation logic
  // YARN API version-specific container operations
  // Executor container launch and monitoring
  // Resource optimization and failure recovery
}
```
**Usage Examples:**

```scala
import org.apache.spark.deploy.yarn.AllocationType

// AllocationType values are used internally to categorize resource
// allocation requests by placement preference.
val allocationType = AllocationType.HOST
// Helps track and manage the different types of container requests
```
## Resource Allocation Workflow

### Container Request Process

```scala
// Illustrative sketch of the typical resource allocation workflow
// (method names below are descriptive, not the actual API)
abstract class YarnAllocator {
  // 1. Calculate resource requirements based on Spark configuration
  protected def calculateResourceRequirements(): ContainerRequest

  // 2. Submit resource requests to the ResourceManager
  protected def requestContainers(requests: List[ContainerRequest]): Unit

  // 3. Handle container allocation responses from the ResourceManager
  protected def onContainersAllocated(containers: List[Container]): Unit

  // 4. Launch executor processes in allocated containers
  protected def launchExecutors(containers: List[Container]): Unit

  // 5. Monitor executor health and handle failures
  protected def monitorExecutors(): Unit

  // 6. Release containers when executors complete or fail
  protected def releaseContainers(containers: List[Container]): Unit
}
```
### Resource Requirements Calculation

```scala
// Sketch: resource calculation based on Spark configuration
class YarnAllocator {
  private def buildContainerRequest(): ContainerRequest = {
    val executorMemory = sparkConf.get("spark.executor.memory", "1024m")
    val executorCores = sparkConf.getInt("spark.executor.cores", 1)
    val priority = sparkConf.getInt("spark.yarn.priority", 0)

    // Build a YARN ContainerRequest from the calculated resources:
    // include locality preferences and resource constraints, and set
    // the appropriate priority and capability requirements.
    ???
  }
}
```
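On top of the requested executor memory, Spark on YARN sizes each container with an additional memory overhead, which defaults to the larger of 384 MB and 10% of the executor memory. A minimal, self-contained sketch of that sizing (the function name is illustrative; the constants match Spark's documented defaults):

```scala
// Sketch: container size = executor memory + overhead, where the
// default overhead is max(384 MB, 10% of executor memory).
val MinOverheadMb = 384
val OverheadFraction = 0.10

def containerMemoryMb(executorMemoryMb: Int): Int = {
  val overhead = math.max(MinOverheadMb, (executorMemoryMb * OverheadFraction).toInt)
  executorMemoryMb + overhead
}

println(containerMemoryMb(1024)) // 1408 (1024 + the 384 MB minimum overhead)
println(containerMemoryMb(8192)) // 9011 (8192 + 819, i.e. 10%)
```

This is why a `spark.executor.memory=2g` request actually consumes more than 2 GB of the YARN queue's capacity.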
## Container Lifecycle Management

### Executor Container Launch

```scala
// Sketch: container launch process for executor deployment
class YarnAllocationHandler {
  private def launchExecutorContainer(container: Container): Unit = {
    // 1. Prepare the executor launch context
    val launchContext = createExecutorLaunchContext()

    // 2. Set up environment variables and the classpath
    setupExecutorEnvironment(launchContext)

    // 3. Configure executor JVM parameters
    configureExecutorJVM(launchContext)

    // 4. Launch the container via the NodeManager
    nodeManager.startContainer(container, launchContext)

    // 5. Track the container for monitoring
    trackExecutorContainer(container.getId)
  }
}
```
### Container Monitoring

```scala
// Sketch: continuous monitoring of executor containers
abstract class YarnAllocator {
  private def monitorContainerHealth(): Unit = {
    // Monitor executor heartbeats and health status
    // Detect container failures and exits
    // Handle node failures and blacklisting
    // Report container status to the ApplicationMaster
  }
}
```
## Resource Optimization Strategies

### Locality Preferences

```scala
// Sketch: optimizing container allocation for data locality
class YarnAllocator {
  private def buildLocalityRequest(
      preferredNodes: List[String],
      preferredRacks: List[String]): ContainerRequest = {
    // Request containers on preferred nodes (highest priority)
    // Fall back to preferred racks (medium priority)
    // Accept any available nodes (lowest priority)
    // Balance locality with resource availability
    ???
  }
}
```
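The host → rack → any fallback can be sketched as emitting one candidate request per locality level, ordered so the most specific placement is tried first. The types and the numeric ordering below are illustrative, not Spark's actual request model:

```scala
// Sketch: one candidate request per locality level, most specific first.
// LocalityLevel and CandidateRequest are illustrative types.
sealed trait LocalityLevel { def priority: Int }
case object HostLocal extends LocalityLevel { val priority = 1 } // highest
case object RackLocal extends LocalityLevel { val priority = 2 }
case object AnyHost   extends LocalityLevel { val priority = 3 } // lowest

case class CandidateRequest(level: LocalityLevel, resourceName: String)

def localityRequests(hosts: List[String], racks: List[String]): List[CandidateRequest] = {
  val candidates =
    hosts.map(h => CandidateRequest(HostLocal, h)) ++
    racks.map(r => CandidateRequest(RackLocal, r)) :+
    CandidateRequest(AnyHost, "*")
  candidates.sortBy(_.level.priority)
}

val reqs = localityRequests(List("node-1"), List("/rack-a"))
println(reqs.map(_.resourceName)) // List(node-1, /rack-a, *)
```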
### Dynamic Resource Adjustment

```scala
// Sketch: dynamic executor allocation integration
abstract class YarnAllocator {
  def requestAdditionalExecutors(numExecutors: Int): Unit = {
    // Support Spark's dynamic allocation feature
    // Scale executor count based on workload demands
    // Integrate with cluster resource availability
  }

  def removeExecutors(executorIds: Set[String]): Unit = {
    // Gracefully shut down and release executor containers
    // Ensure running tasks complete before container release
    // Update resource tracking and allocation state
  }
}
```
## Configuration Integration

### Resource Configuration

```properties
# Key configuration properties affecting resource management
spark.executor.memory=2g                # Executor container memory
spark.executor.cores=2                  # Executor CPU cores
spark.memory.fraction=0.6               # Heap fraction for execution and storage
spark.memory.offHeap.enabled=true       # Off-heap memory usage

# YARN-specific resource configuration
spark.yarn.executor.memoryOverhead=384  # Additional memory overhead (MB)
spark.yarn.executor.nodeLabelExpression # Node label constraints
spark.yarn.priority=0                   # Application priority
spark.yarn.queue=production             # YARN queue assignment
```
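A size string such as `2g` has to be normalized to megabytes before a container capability can be built. Spark's real implementation lives in its internal byte-string utilities; the self-contained sketch below only handles the `m`/`g` suffixes used above:

```scala
// Sketch: normalize a memory size string ("512m", "2g") to megabytes.
// Simplified stand-in for Spark's internal byte-string parsing.
def memoryStringToMb(s: String): Int = {
  val lower = s.trim.toLowerCase
  if (lower.endsWith("g")) lower.dropRight(1).toInt * 1024
  else if (lower.endsWith("m")) lower.dropRight(1).toInt
  else lower.toInt // assume a plain megabyte count
}

println(memoryStringToMb("2g"))   // 2048
println(memoryStringToMb("512m")) // 512
```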
### Container Configuration

```properties
# Container-level configuration options
spark.yarn.containerLauncherMaxThreads=25       # Parallel container launch limit
spark.yarn.executor.failuresValidityInterval=1h # Failure tracking window
spark.yarn.max.executor.failures=3              # Max executor failures before the application fails

# YARN-side setting (yarn-site.xml), not a Spark property
yarn.nodemanager.resource.memory-mb             # NodeManager memory limits
```
## Error Handling and Recovery

### Container Failure Recovery

```scala
// Sketch: error handling for container failures
class YarnAllocator {
  private def handleContainerFailure(containerId: ContainerId, exitStatus: Int): Unit = {
    // Log container failure details
    // Determine if the failure should trigger node blacklisting
    // Request a replacement container if within failure limits
    // Report the failure to the Spark scheduler for task rescheduling
  }

  private def blacklistNode(nodeId: String, reason: String): Unit = {
    // Add the node to the blacklist to avoid future allocations
    // Implement exponential backoff for the blacklist duration
    // Provide mechanisms for node rehabilitation
  }
}
```
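The exponential backoff mentioned above can be sketched as a blacklist duration that doubles with each recorded failure, up to a cap. The constants and the function are illustrative, not Spark's actual blacklist policy:

```scala
// Sketch: blacklist duration doubles per failure, capped at a maximum.
// Constants are illustrative, not Spark's actual policy.
val BaseBlacklistMs = 60000L      // 1 minute for the first failure
val MaxBlacklistMs  = 3600000L    // cap at 1 hour

def blacklistDurationMs(failureCount: Int): Long = {
  val backoff = BaseBlacklistMs * (1L << math.min(failureCount - 1, 30))
  math.min(backoff, MaxBlacklistMs)
}

println(blacklistDurationMs(1))  // 60000   (1 minute)
println(blacklistDurationMs(3))  // 240000  (4 minutes)
println(blacklistDurationMs(10)) // 3600000 (capped at 1 hour)
```

Capping the shift and the duration keeps a flapping node from being blacklisted forever while still backing off quickly.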
### ResourceManager Connection Handling

```scala
// Sketch: handling ResourceManager failures and reconnection
trait YarnRMClient {
  private def handleRMFailure(): Unit = {
    // Detect ResourceManager connection loss
    // Implement reconnection with exponential backoff
    // Recover application state after reconnection
    // Resubmit pending resource requests
  }
}
```
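The reconnect-with-backoff step can be sketched as a generic retry loop; the operation, attempt limit, and delays below are placeholders, not Spark's actual reconnection logic:

```scala
// Sketch: generic retry with exponential backoff, as used when
// re-establishing a ResourceManager connection. Limits are placeholders.
def retryWithBackoff[T](maxAttempts: Int, initialDelayMs: Long)(op: () => T): T = {
  var attempt = 0
  var delay = initialDelayMs
  while (true) {
    attempt += 1
    try {
      return op()
    } catch {
      case _: Exception if attempt < maxAttempts =>
        Thread.sleep(delay)
        delay *= 2 // exponential backoff between attempts
    }
  }
  throw new IllegalStateException("unreachable")
}

// Usage: a hypothetical registration that succeeds on the third attempt.
var calls = 0
val result = retryWithBackoff(maxAttempts = 5, initialDelayMs = 1) { () =>
  calls += 1
  if (calls < 3) throw new RuntimeException("connection refused")
  "registered"
}
println(s"$result after $calls attempts") // registered after 3 attempts
```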
## Performance Monitoring

### Resource Utilization Metrics

```scala
// Sketch: integration with the Spark metrics system
abstract class YarnAllocator {
  // Track container allocation success rates
  private val containerAllocationRate = metrics.meter("containerAllocationRate")

  // Monitor resource request fulfillment times
  private val resourceRequestLatency = metrics.histogram("resourceRequestLatency")

  // Track executor launch success and failure rates
  private val executorLaunchSuccess = metrics.counter("executorLaunchSuccess")
  private val executorLaunchFailure = metrics.counter("executorLaunchFailure")
}
```

### Allocation Efficiency

The resource management components provide detailed metrics for:

- Container allocation latency and success rates
- Resource utilization efficiency across cluster nodes
- Executor failure patterns and recovery times
- Queue wait times and priority effectiveness
- Node blacklisting and rehabilitation statistics

These metrics enable optimization of resource allocation strategies and troubleshooting of cluster resource issues.