# Resource Management

Apache Spark Core provides resource management capabilities including resource profiles, executor and task resource requests, and fine-grained resource allocation for heterogeneous workloads.

## ResourceProfile

Defines resource requirements for executors and tasks, enabling heterogeneous resource allocation within a single Spark application.

```scala { .api }
class ResourceProfile(
  val executorResources: Map[String, ExecutorResourceRequest],
  val taskResources: Map[String, TaskResourceRequest]
) {
  def id: Int
  def equals(other: Any): Boolean
  def hashCode(): Int
  def toString: String
}

object ResourceProfile {
  val DEFAULT_RESOURCE_PROFILE_ID = 0
  def getOrCreateDefaultProfile(conf: SparkConf): ResourceProfile
}
```

## ResourceProfileBuilder

Builder for constructing resource profiles through a fluent API.

```scala { .api }
class ResourceProfileBuilder {
  def require(resourceRequest: ExecutorResourceRequest): ResourceProfileBuilder
  def require(resourceRequest: TaskResourceRequest): ResourceProfileBuilder
  def build(): ResourceProfile
}
```

## Resource Request Types

### ExecutorResourceRequest

For the memory-based resources (`memory`, `memoryOverhead`, `pysparkMemory`, `offHeapMemory`) the `amount` is specified in MiB.

```scala { .api }
class ExecutorResourceRequest(
  val resourceName: String,
  val amount: Long,
  val discoveryScript: String = "",
  val vendor: String = ""
) {
  def equals(other: Any): Boolean
  def hashCode(): Int
  def toString: String
}

object ExecutorResourceRequest {
  val CORES = "cores"
  val MEMORY = "memory"
  val OVERHEAD_MEM = "memoryOverhead"
  val PYSPARK_MEM = "pysparkMemory"
  val OFFHEAP_MEM = "offHeapMemory"
  val FPGA = "fpga"
  val GPU = "gpu"
}
```

### TaskResourceRequest

The `amount` may be fractional for accelerator resources (e.g. `0.5` to let two tasks share one GPU); `cpus` must be a whole number.

```scala { .api }
class TaskResourceRequest(
  val resourceName: String,
  val amount: Double
) {
  def equals(other: Any): Boolean
  def hashCode(): Int
  def toString: String
}

object TaskResourceRequest {
  val CPUS = "cpus"
  val FPGA = "fpga"
  val GPU = "gpu"
}
```

## Resource Information

Runtime resource information available to tasks and executors. The `addresses` array holds the device addresses (for example GPU indices) reported by the resource discovery script.

```scala { .api }
class ResourceInformation(
  val name: String,
  val addresses: Array[String]
) {
  def equals(other: Any): Boolean
  def hashCode(): Int
  def toString: String
}
```

## RDD Resource Profiles

Methods for associating RDDs with specific resource profiles.

```scala { .api }
// From RDD[T]
def withResources(profile: ResourceProfile): RDD[T]
def getResourceProfile(): ResourceProfile
```

## SparkContext Resource Methods

SparkContext methods for resource profile management and executor control.

```scala { .api }
// From SparkContext
def addResourceProfile(rp: ResourceProfile): Unit
def getResourceProfile(id: Int): Option[ResourceProfile]
def getResourceProfiles(): Map[Int, ResourceProfile]
def requestTotalExecutors(numExecutors: Int, localityAwareTasks: Int, hostToLocalTaskCount: Map[String, Int]): Boolean
def requestExecutors(numAdditionalExecutors: Int): Boolean
def killExecutors(executorIds: Seq[String]): Seq[String]
def killExecutor(executorId: String): Boolean
def getExecutorMemoryStatus: Map[String, (Long, Long)]
```

## Configuration Properties

Key resource management configuration options.

### Dynamic Allocation
- `spark.dynamicAllocation.enabled` - Enable dynamic executor allocation (also requires either the external shuffle service or `spark.dynamicAllocation.shuffleTracking.enabled`)
- `spark.dynamicAllocation.minExecutors` - Minimum number of executors
- `spark.dynamicAllocation.maxExecutors` - Maximum number of executors
- `spark.dynamicAllocation.initialExecutors` - Initial number of executors
- `spark.dynamicAllocation.executorIdleTimeout` - Timeout before an idle executor is removed
- `spark.dynamicAllocation.schedulerBacklogTimeout` - Timeout before backlogged pending tasks trigger new executor requests

### Resource Discovery
- `spark.executor.resource.{resourceName}.amount` - Amount of the resource per executor
- `spark.executor.resource.{resourceName}.discoveryScript` - Script that discovers the resource addresses
- `spark.executor.resource.{resourceName}.vendor` - Resource vendor (e.g. `nvidia.com`; used on Kubernetes)
- `spark.task.resource.{resourceName}.amount` - Amount of the resource per task
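
Each discovery script referenced above must print a single JSON object to stdout describing the resource, with the same shape as `ResourceInformation` (a `name` plus an `addresses` array). A minimal sketch with hard-coded addresses (a real script would query the hardware instead, e.g. via `nvidia-smi`):

```shell
#!/usr/bin/env bash
# Hypothetical discovery script: Spark invokes it on executor startup and
# parses the single JSON object it prints to learn the device addresses.
# The two addresses below are hard-coded placeholders for illustration.
echo '{"name": "gpu", "addresses": ["0", "1"]}'
```

Point the executor configuration at it, e.g. `spark.executor.resource.gpu.discoveryScript=/path/to/script.sh`, and make the script executable.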

### GPU Configuration
- `spark.executor.resource.gpu.amount` - Number of GPUs per executor
- `spark.executor.resource.gpu.discoveryScript` - GPU discovery script
- `spark.task.resource.gpu.amount` - GPUs per task (may be fractional to share a device)
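
These properties are typically set at submit time. A sketch of a `spark-submit` invocation (the application jar, script path, and amounts are illustrative placeholders, not defaults):

```shell
# Illustrative only: request 2 GPUs per executor and half a GPU per task.
spark-submit \
  --conf spark.executor.resource.gpu.amount=2 \
  --conf spark.executor.resource.gpu.discoveryScript=/opt/spark/gpu-discovery.sh \
  --conf spark.task.resource.gpu.amount=0.5 \
  my-app.jar
```

With 2 GPUs per executor and 0.5 GPU per task, up to four tasks can hold GPU addresses concurrently on each executor.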

## Usage Examples

### Basic Resource Profile
```scala
import org.apache.spark.resource._

// Create a resource profile with specific requirements
// (memory amounts are in MiB)
val resourceProfile = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequest("memory", 8192)) // 8 GiB memory
  .require(new ExecutorResourceRequest("cores", 4))     // 4 cores
  .require(new TaskResourceRequest("cpus", 1.0))        // 1 CPU per task
  .build()

// Register the profile with the SparkContext
sc.addResourceProfile(resourceProfile)

// Use the profile with an RDD
val data = sc.parallelize(1 to 1000)
val resourcedRDD = data.withResources(resourceProfile)

val result = resourcedRDD.map(heavyComputation).collect()
```

### GPU Resource Profile
```scala
// GPU-enabled resource profile (memory in MiB)
val gpuProfile = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequest("memory", 16384)) // 16 GiB memory
  .require(new ExecutorResourceRequest("cores", 8))
  .require(new ExecutorResourceRequest("gpu", 2, "/opt/spark/gpu-discovery.sh", "nvidia.com"))
  .require(new TaskResourceRequest("cpus", 2.0))
  .require(new TaskResourceRequest("gpu", 0.5)) // Half a GPU per task
  .build()

sc.addResourceProfile(gpuProfile)

// Use for GPU-intensive workloads
val gpuData = sc.parallelize(largeDataset)
val gpuProcessedData = gpuData
  .withResources(gpuProfile)
  .map(gpuAcceleratedProcessing)
  .collect()
```

### Mixed Workload Resource Management
```scala
// Light processing profile (memory in MiB)
val lightProfile = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequest("memory", 2048)) // 2 GiB
  .require(new ExecutorResourceRequest("cores", 2))     // 2 cores
  .require(new TaskResourceRequest("cpus", 1.0))        // 1 CPU per task (cpus must be whole)
  .build()

// Heavy processing profile
val heavyProfile = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequest("memory", 16384)) // 16 GiB
  .require(new ExecutorResourceRequest("cores", 8))      // 8 cores
  .require(new TaskResourceRequest("cpus", 2.0))         // 2 CPUs per task
  .build()

sc.addResourceProfile(lightProfile)
sc.addResourceProfile(heavyProfile)

// Apply different profiles to different stages
val rawData = sc.textFile("input.txt")

// Light processing for data cleaning
val cleanedData = rawData
  .withResources(lightProfile)
  .map(simpleCleanup)
  .filter(isValid)

// Heavy processing for complex analytics
val analyticsResults = cleanedData
  .withResources(heavyProfile)
  .map(complexAnalytics)
  .reduce(combineResults)
```

### Dynamic Resource Allocation
```scala
// Configure dynamic allocation through SparkConf
val conf = new SparkConf()
  .setAppName("Dynamic Resource Example")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.dynamicAllocation.initialExecutors", "5")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
  .set("spark.dynamicAllocation.schedulerBacklogTimeout", "10s")

val sc = new SparkContext(conf)

// Request additional executors programmatically
val additionalExecutors = 5
val requested = sc.requestExecutors(additionalExecutors)
println(s"Requested $additionalExecutors additional executors: $requested")

// Monitor executor status
val memoryStatus = sc.getExecutorMemoryStatus
memoryStatus.foreach { case (executorId, (maxMem, remainingMem)) =>
  val usedMem = maxMem - remainingMem
  println(s"Executor $executorId: ${usedMem / (1024 * 1024)}MB used of ${maxMem / (1024 * 1024)}MB")
}

// Kill specific executors if needed
val executorsToKill = Seq("1", "2") // executor IDs as reported by the cluster manager
val killed = sc.killExecutors(executorsToKill)
println(s"Killed executors: ${killed.mkString(", ")}")
```

### Resource-Aware Task Scheduling
```scala
val data = sc.parallelize(1 to 10000, 100)

val processedData = data.mapPartitions { iter =>
  val context = TaskContext.get()

  // Get the resources assigned to this task
  val resources = context.resources()
  val cpus = context.cpus()

  // Adapt processing based on the available resources
  val processingStrategy = resources.get("gpu") match {
    case Some(gpuInfo) =>
      println(s"Using GPU acceleration: ${gpuInfo.addresses.mkString(", ")}")
      "gpu-accelerated"
    case None =>
      if (cpus >= 4) {
        println(s"Using multi-threaded CPU processing with $cpus cores")
        "multi-threaded"
      } else {
        println(s"Using single-threaded processing with $cpus cores")
        "single-threaded"
      }
  }

  // Process data according to the chosen strategy
  iter.map { value =>
    processingStrategy match {
      case "gpu-accelerated" => gpuProcess(value)
      case "multi-threaded"  => parallelProcess(value, cpus)
      case "single-threaded" => serialProcess(value)
    }
  }
}

processedData.collect()
```

### FPGA Resource Management
```scala
// FPGA resource profile for specialized workloads (memory in MiB)
val fpgaProfile = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequest("memory", 32768)) // 32 GiB
  .require(new ExecutorResourceRequest("cores", 16))
  .require(new ExecutorResourceRequest("fpga", 1, "/opt/spark/fpga-discovery.sh", "intel"))
  .require(new TaskResourceRequest("cpus", 4.0))
  .require(new TaskResourceRequest("fpga", 1.0)) // One FPGA per task
  .build()

sc.addResourceProfile(fpgaProfile)

// Use FPGA for specialized computation
val fpgaWorkload = sc.parallelize(specializedDataset)
val fpgaResults = fpgaWorkload
  .withResources(fpgaProfile)
  .mapPartitions { iter =>
    val context = TaskContext.get()
    val fpgaResources = context.resources().get("fpga")

    fpgaResources match {
      case Some(fpga) =>
        // Initialize the FPGA with an assigned device address
        val fpgaDevice = initializeFPGA(fpga.addresses.head)
        try {
          // Materialize before closing the device: iter.map is lazy,
          // so returning it unevaluated would use a closed device
          iter.map(processOnFPGA(_, fpgaDevice)).toList.iterator
        } finally {
          fpgaDevice.close()
        }
      case None =>
        // Fall back to CPU processing
        iter.map(processByCPU)
    }
  }
  .collect()
```

### Resource Profile Monitoring
```scala
// Get all registered resource profiles
val profiles = sc.getResourceProfiles()
profiles.foreach { case (id, profile) =>
  println(s"Profile ID: $id")
  println(s"Executor Resources: ${profile.executorResources}")
  println(s"Task Resources: ${profile.taskResources}")
  println("---")
}

// Inspect an RDD's resource profile association
val data = sc.parallelize(1 to 100)
val profiledRDD = data.withResources(heavyProfile)

println(s"RDD resource profile ID: ${profiledRDD.getResourceProfile().id}")
println(s"Default profile ID: ${ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID}")
```

## Best Practices

### Resource Profile Design
1. **Match workload characteristics**: Choose resources based on computation type
2. **Consider task parallelism**: Balance task resources with executor resources
3. **Plan for heterogeneity**: Use different profiles for different processing stages
4. **Monitor utilization**: Track resource usage to optimize allocations
5. **Test configurations**: Benchmark different resource combinations

### Dynamic Allocation
1. **Set appropriate bounds**: Configure min/max executors based on workload
2. **Tune timeouts**: Adjust idle and backlog timeouts for responsiveness
3. **Monitor scaling**: Watch for oscillation in executor counts
4. **Consider costs**: Balance performance with resource costs
5. **Plan for failures**: Ensure minimum executors for fault tolerance

### GPU/FPGA Usage
1. **Efficient utilization**: Ensure full utilization of accelerator resources
2. **Memory management**: Consider GPU/FPGA memory limitations
3. **Fallback strategies**: Provide CPU fallbacks for resource unavailability
4. **Driver compatibility**: Ensure proper driver installation and discovery
5. **Task granularity**: Size tasks appropriately for accelerator efficiency