# Resource Management

Components responsible for allocating and managing YARN containers for Spark executors. These classes handle resource allocation strategies, placement policies, container management, and communication with YARN ResourceManager and NodeManagers.

## Capabilities

### YarnAllocator

Core component that manages resource allocation and executor lifecycle on YARN. Handles container requests, allocation monitoring, and executor management.

```scala { .api }
/**
 * Manages resource allocation and executor lifecycle on YARN.
 * Handles container requests, allocation, and executor management.
 */
class YarnAllocator {

  /** Allocate containers from the YARN ResourceManager. */
  def allocateResources(): Unit

  /**
   * Request executors with locality preferences.
   * @return true if the request was successful
   */
  def requestTotalExecutorsWithPreferredLocalities(): Boolean

  /**
   * Kill a specific executor.
   * @param executorId ID of the executor to kill
   */
  def killExecutor(executorId: String): Unit

  /** Get the number of failed executors. */
  def getNumExecutorsFailed: Int

  /** Stop the allocator and clean up resources. */
  def stop(): Unit
}
```

**Usage Pattern:**

```scala
// YarnAllocator is typically used internally by scheduler backends,
// but understanding its role is important for configuration.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("ResourceAllocationExample")
  // Configure executor resources
  .set("spark.executor.instances", "10")
  .set("spark.executor.cores", "2")
  .set("spark.executor.memory", "4g")
  .set("spark.executor.memoryOverhead", "512m")
  // Configure allocation behavior
  .set("spark.yarn.executor.failuresValidityInterval", "1h")
  .set("spark.yarn.max.executor.failures", "3")

// YarnAllocator will use these settings for resource allocation
```

### YarnRMClient

Client for communicating with YARN ResourceManager. Handles ApplicationMaster registration and resource allocator creation.

```scala { .api }
/**
 * Client for communicating with the YARN ResourceManager.
 * Handles AM registration and resource allocation setup.
 */
class YarnRMClient {

  /** Register ApplicationMaster with ResourceManager. */
  def register(): Unit

  /** Create a resource allocator instance. */
  def createAllocator(): YarnAllocator

  /**
   * Unregister from ResourceManager.
   * @param status      Final application status
   * @param diagnostics Diagnostic message
   */
  def unregister(status: FinalApplicationStatus, diagnostics: String): Unit
}
```
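
The client is driven by the ApplicationMaster. The following is a minimal sketch of that lifecycle against the simplified signatures above (the real methods take registration and configuration arguments; `done` and the heartbeat interval are illustrative):

```scala
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus

// Sketch of the ApplicationMaster's use of YarnRMClient, against the
// simplified signatures above; the real methods take more arguments.
val rmClient = new YarnRMClient()
rmClient.register()                        // announce this AM to the ResourceManager
val allocator = rmClient.createAllocator() // allocator reuses the AM's RM connection

val heartbeatIntervalMs = 3000L // mirrors spark.yarn.scheduler.heartbeat.interval-ms
var done = false                // set when the application finishes

try {
  while (!done) {
    allocator.allocateResources() // heartbeat: send pending requests, receive containers
    Thread.sleep(heartbeatIntervalMs)
  }
  rmClient.unregister(FinalApplicationStatus.SUCCEEDED, "Application finished")
} finally {
  allocator.stop()
}
```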

### ResourceRequestHelper

Helper object for creating YARN resource requests with proper resource profiles and constraints.

```scala { .api }
/**
 * Helper for creating YARN resource requests.
 * Handles resource profiles, constraints, and locality preferences.
 */
object ResourceRequestHelper {

  /**
   * Create a resource request for executors.
   * @param resource      Resource requirements
   * @param nodes         Preferred node list
   * @param racks         Preferred rack list
   * @param numContainers Number of containers requested
   * @return YARN ResourceRequest
   */
  def createResourceRequest(
      resource: Resource,
      nodes: List[String],
      racks: List[String],
      numContainers: Int): ResourceRequest

  /**
   * Build a resource profile for a container.
   * @param cores           Number of CPU cores
   * @param memory          Memory in MB
   * @param customResources Additional resource requirements
   * @return YARN resource profile
   */
  def buildResourceProfile(
      cores: Int,
      memory: Int,
      customResources: Map[String, Long] = Map.empty): Resource
}
```

**Usage Example:**

```scala
import org.apache.spark.deploy.yarn.ResourceRequestHelper
import org.apache.hadoop.yarn.api.records.Resource

// Create a resource profile for executors
val executorResource = ResourceRequestHelper.buildResourceProfile(
  cores = 2,
  memory = 4096, // 4 GB in MB
  customResources = Map(
    "yarn.io/gpu" -> 1L,     // Request 1 GPU
    "example.com/fpga" -> 2L // Request 2 FPGAs
  )
)

// Create a resource request with locality preferences
val resourceRequest = ResourceRequestHelper.createResourceRequest(
  resource = executorResource,
  nodes = List("worker1.example.com", "worker2.example.com"),
  racks = List("/rack1", "/rack2"),
  numContainers = 5
)
```

### Container Placement Strategies

#### LocalityPreferredContainerPlacementStrategy

Container placement strategy that optimizes for data locality and resource utilization.

```scala { .api }
/**
 * Container placement strategy with locality preferences.
 * Optimizes container placement based on data locality and cluster topology.
 */
class LocalityPreferredContainerPlacementStrategy {

  /**
   * Determine optimal container placement.
   * @param availableNodes      Available cluster nodes
   * @param requestedContainers Number of containers to place
   * @param localityPreferences Data locality preferences
   * @return Placement recommendations
   */
  def getContainerPlacement(
      availableNodes: Seq[String],
      requestedContainers: Int,
      localityPreferences: Map[String, Seq[String]]): Map[String, Int]
}
```
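
A sketch of consulting the strategy, against the simplified signature above; the node names and the stage-to-preferred-hosts reading of the preference map are assumptions for illustration:

```scala
// Illustrative sketch against the simplified signature above; node names
// and the stage -> preferred-hosts interpretation are assumptions.
val strategy = new LocalityPreferredContainerPlacementStrategy()

val placement: Map[String, Int] = strategy.getContainerPlacement(
  availableNodes = Seq("worker1", "worker2", "worker3"),
  requestedContainers = 4,
  localityPreferences = Map(
    "stage-0" -> Seq("worker1", "worker2"), // stage-0's input lives on these hosts
    "stage-1" -> Seq("worker3")
  )
)

// Result maps host -> number of containers to request there,
// e.g. Map("worker1" -> 2, "worker2" -> 1, "worker3" -> 1)
placement.foreach { case (host, n) => println(s"$host -> $n containers") }
```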

### YarnAllocatorNodeHealthTracker

Tracks node health and executor failure patterns to optimize resource allocation decisions.

```scala { .api }
/**
 * Tracks node health and executor failure patterns.
 * Used by YarnAllocator for intelligent resource allocation.
 */
class YarnAllocatorNodeHealthTracker {

  /**
   * Record an executor failure on a node.
   * @param nodeId     Node identifier
   * @param executorId Failed executor ID
   */
  def recordFailure(nodeId: String, executorId: String): Unit

  /**
   * Check if a node should be avoided for new allocations.
   * @param nodeId Node to check
   * @return true if the node should be avoided
   */
  def shouldAvoidNode(nodeId: String): Boolean

  /**
   * Get healthy nodes for allocation.
   * @param candidateNodes Candidate nodes to filter
   * @return Filtered list of healthy nodes
   */
  def getHealthyNodes(candidateNodes: Seq[String]): Seq[String]
}
```
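
The tracker is consulted on each allocation round. A sketch of the flow, against the simplified signatures above (node and executor IDs are illustrative):

```scala
// Sketch against the simplified signatures above; node and executor IDs
// are illustrative.
val tracker = new YarnAllocatorNodeHealthTracker()

// Report each container failure as the allocator observes it
tracker.recordFailure(nodeId = "worker2", executorId = "exec-17")
tracker.recordFailure(nodeId = "worker2", executorId = "exec-23")

// Before the next request round, filter out unhealthy nodes
val healthy = tracker.getHealthyNodes(Seq("worker1", "worker2", "worker3"))

if (tracker.shouldAvoidNode("worker2")) {
  println("worker2 excluded from new container requests")
}
```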

## Advanced Resource Management

### Dynamic Allocation Integration

```scala
import org.apache.spark.SparkConf

// Configure dynamic allocation with YARN
val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("DynamicAllocationExample")
  // Enable dynamic allocation
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "50")
  .set("spark.dynamicAllocation.initialExecutors", "10")
  // Configure allocation behavior
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
  .set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "300s")
  .set("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
  .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "5s")
  // Shuffle data must outlive released executors; enable the external
  // shuffle service (the YARN shuffle service itself, including
  // spark.yarn.shuffle.stopOnFailure, is configured in yarn-site.xml
  // on each NodeManager, not in SparkConf)
  .set("spark.shuffle.service.enabled", "true")
```
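
While tasks remain backlogged, executor requests ramp up exponentially: the first round fires after `schedulerBacklogTimeout`, subsequent rounds after each `sustainedSchedulerBacklogTimeout`, with the number requested doubling per round until the backlog clears or `maxExecutors` is reached. Idle executors are released after `executorIdleTimeout`, or `cachedExecutorIdleTimeout` if they hold cached blocks.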

### Custom Resource Types

```scala
import org.apache.spark.SparkConf

// Configure custom resource types (GPUs, FPGAs, etc.)
val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("CustomResourceExample")
  // Configure GPU resources
  .set("spark.executor.resource.gpu.amount", "1")
  .set("spark.executor.resource.gpu.discoveryScript", "/opt/spark/scripts/gpu_discovery.sh")
  .set("spark.executor.resource.gpu.vendor", "nvidia.com")
  // Configure custom FPGA resources
  .set("spark.executor.resource.fpga.amount", "2")
  .set("spark.executor.resource.fpga.discoveryScript", "/opt/spark/scripts/fpga_discovery.sh")
  .set("spark.executor.resource.fpga.vendor", "xilinx.com")
  // Task-level resource requests
  .set("spark.task.resource.gpu.amount", "0.5") // Tasks can share GPUs
  .set("spark.task.resource.fpga.amount", "1")  // Tasks use dedicated FPGAs
```
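
With the task amounts above, each task can look up the concrete resource addresses it was assigned. A minimal sketch using the public `TaskContext.resources()` API, assuming a `SparkContext` built from the conf above (the RDD and its contents are illustrative):

```scala
import org.apache.spark.{SparkContext, TaskContext}

// Assumes the conf defined above
val sc = SparkContext.getOrCreate(conf)

val assignments = sc.parallelize(1 to 4, numSlices = 4).map { _ =>
  val ctx = TaskContext.get()
  // Map from resource name ("gpu", "fpga") to the addresses this task holds
  val gpus = ctx.resources().get("gpu").map(_.addresses.mkString(",")).getOrElse("none")
  s"partition ${ctx.partitionId()} -> gpu(s): $gpus"
}.collect()

assignments.foreach(println)
```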

### Resource Allocation Patterns

#### Static Allocation

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Static resource allocation - fixed number of executors
val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("StaticAllocationExample")
  .set("spark.dynamicAllocation.enabled", "false")
  .set("spark.executor.instances", "20")
  .set("spark.executor.cores", "4")
  .set("spark.executor.memory", "8g")
  .set("spark.executor.memoryOverhead", "1g")

val sc = new SparkContext(conf)
```
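
These settings hold a fixed footprint for the lifetime of the application: 20 containers of 9 GB each (8 GB heap + 1 GB overhead), i.e. 180 GB of container memory and 80 cores in total.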

#### Memory Optimization

```scala
import org.apache.spark.SparkConf

// Optimize memory allocation for different workloads
val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("MemoryOptimizationExample")
  // Total executor memory breakdown
  .set("spark.executor.memory", "12g")
  .set("spark.executor.memoryOverhead", "2g") // Additional overhead for the YARN container
  // Memory fractions for different purposes
  .set("spark.memory.fraction", "0.8")        // Fraction of (heap - 300 MB) for execution and storage
  .set("spark.memory.storageFraction", "0.5") // Share of that region protected for cached data
  // Off-heap memory (optional)
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "4g")
```
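
Note that the YARN container request covers more than the heap: on recent Spark releases it is roughly heap + overhead + off-heap, here 12g + 2g + 4g = 18g per executor, so queue capacity should be sized against that figure.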

#### Locality-Aware Allocation

```scala
import org.apache.spark.SparkConf

// Configure locality preferences for optimal data access
val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("LocalityAwareExample")
  // How long the scheduler waits at each locality level
  // before falling back to the next one
  .set("spark.locality.wait", "3s")        // Default wait for all levels
  .set("spark.locality.wait.process", "0") // Don't hold tasks for process-local slots
  .set("spark.locality.wait.node", "1s")   // Wait up to 1s for node-local placement
  .set("spark.locality.wait.rack", "2s")   // Wait up to 2s for rack-local placement
  // Node labeling for placement
  .set("spark.yarn.executor.nodeLabelExpression", "compute-intensive")
  .set("spark.yarn.am.nodeLabelExpression", "management")
```

### Container Lifecycle Management

#### Preemption Handling

```scala
import org.apache.spark.SparkConf

// Configure preemption handling for spot instances or preemptible queues
val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("PreemptionHandlingExample")
  // Enable preemption-aware scheduling
  .set("spark.yarn.executor.failuresValidityInterval", "1h")
  .set("spark.yarn.max.executor.failures", "10")       // Higher tolerance for preemptions
  .set("spark.yarn.submit.waitAppCompletion", "false") // Don't wait if using spot instances
  // Configure checkpointing for fault tolerance
  .set("spark.sql.streaming.checkpointLocation", "hdfs://cluster/checkpoints/myapp")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // Faster recovery
```
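
YARN reports preempted containers with a dedicated exit status (`ContainerExitStatus.PREEMPTED`), and the allocator does not count such losses toward the executor failure limit, so preemption by itself should not terminate the application; the settings above add headroom for genuine failures that accompany preemption.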

#### Container Resource Monitoring

```scala
// Monitor resource usage and allocation effectiveness
import org.apache.spark.SparkContext

val sc = SparkContext.getOrCreate()

// Access YARN-specific information.
// NOTE: schedulerBackend is private[spark], so this pattern only compiles
// from code inside the org.apache.spark package (e.g. an internal plugin).
val yarnBackend = sc.schedulerBackend match {
  case yarn: org.apache.spark.scheduler.cluster.YarnSchedulerBackend => Some(yarn)
  case _ => None
}

yarnBackend.foreach { backend =>
  println(s"Application ID: ${backend.applicationId()}")
  println(s"Application Attempt: ${backend.applicationAttemptId()}")

  // For cluster mode, get additional information
  backend match {
    case clusterBackend: org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend =>
      val driverLogs = clusterBackend.getDriverLogUrls
      val driverAttrs = clusterBackend.getDriverAttributes
      println(s"Driver Container Logs: $driverLogs")
      println(s"Driver Container Attributes: $driverAttrs")
    case _ => // Client mode
  }
}
```

## Error Handling and Diagnostics

### Resource Allocation Failures

```scala
import org.apache.spark.SparkConf

// Configure retry and failure handling for resource allocation
val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("AllocationFailureHandling")
  // Retry configuration
  .set("spark.yarn.maxAppAttempts", "3")
  .set("spark.yarn.am.attemptFailuresValidityInterval", "1h")
  .set("spark.yarn.executor.failuresValidityInterval", "1h")
  .set("spark.yarn.max.executor.failures", "5")
  // Resource allocation timeouts
  .set("spark.yarn.scheduler.heartbeat.interval-ms", "3000")
  .set("spark.yarn.scheduler.initial-allocation.interval", "200ms")
  .set("spark.yarn.containerLauncherMaxThreads", "25")
  // Enable detailed GC logging (executors use the generic
  // spark.executor.extraJavaOptions; there is no YARN-specific variant)
  .set("spark.yarn.am.extraJavaOptions", "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
  .set("spark.executor.extraJavaOptions", "-XX:+PrintGCDetails")
```

### Resource Request Debugging

```scala
// Enable detailed logging for resource allocation debugging
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("ResourceDebuggingExample")
  // Enable debug logging
  .set("spark.yarn.am.extraJavaOptions",
    "-Dlog4j.configuration=yarn-log4j.properties " +
    "-Dyarn.app.container.log.dir=/tmp/yarn-logs " +
    "-Dyarn.app.container.log.filesize=100MB")
  // Resource request diagnostics
  .set("spark.yarn.preserve.staging.files", "true") // Keep staging files for debugging
  .set("spark.yarn.submit.file.replication", "3")   // Ensure file availability
```