
# Resource Management

Apache Spark Core provides modern resource management capabilities, including resource profiles, executor and task resource requests, and fine-grained resource allocation for heterogeneous workloads.

## ResourceProfile

Defines resource requirements for executors and tasks, enabling heterogeneous resource allocation within a single Spark application.

```scala { .api }
class ResourceProfile(
  val executorResources: Map[String, ExecutorResourceRequest],
  val taskResources: Map[String, TaskResourceRequest]
) {
  def id: Int
  def equals(other: Any): Boolean
  def hashCode(): Int
  def toString: String
}

object ResourceProfile {
  val DEFAULT_RESOURCE_PROFILE_ID = 0
  def getOrCreateDefaultProfile(conf: SparkConf): ResourceProfile
}
```

## ResourceProfileBuilder

A builder with a fluent API for constructing resource profiles.

```scala { .api }
class ResourceProfileBuilder {
  def require(resourceRequest: ExecutorResourceRequest): ResourceProfileBuilder
  def require(resourceRequest: TaskResourceRequest): ResourceProfileBuilder
  def build(): ResourceProfile
}
```

## Resource Request Types

### ExecutorResourceRequest

```scala { .api }
class ExecutorResourceRequest(
  val resourceName: String,
  val amount: Long,
  val discoveryScript: String = "",
  val vendor: String = ""
) {
  def equals(other: Any): Boolean
  def hashCode(): Int
  def toString: String
}

object ExecutorResourceRequest {
  val CORES = "cores"
  val MEMORY = "memory"
  val OVERHEAD_MEM = "memoryOverhead"
  val PYSPARK_MEM = "pysparkMemory"
  val OFFHEAP_MEM = "offHeapMemory"
  val FPGA = "fpga"
  val GPU = "gpu"
}
```

### TaskResourceRequest

```scala { .api }
class TaskResourceRequest(
  val resourceName: String,
  val amount: Double
) {
  def equals(other: Any): Boolean
  def hashCode(): Int
  def toString: String
}

object TaskResourceRequest {
  val CPUS = "cpus"
  val FPGA = "fpga"
  val GPU = "gpu"
}
```

## Resource Information

Runtime resource information available to tasks and executors.

```scala { .api }
class ResourceInformation(
  val name: String,
  val addresses: Array[String]
) {
  def equals(other: Any): Boolean
  def hashCode(): Int
  def toString: String
}
```

## RDD Resource Profiles

Methods for associating RDDs with specific resource profiles.

```scala { .api }
// From RDD[T]
def withResources(profile: ResourceProfile): RDD[T]
def getResourceProfile(): ResourceProfile
```

## SparkContext Resource Methods

SparkContext methods for resource profile management.

```scala { .api }
// From SparkContext
def addResourceProfile(rp: ResourceProfile): Unit
def getResourceProfile(id: Int): Option[ResourceProfile]
def getResourceProfiles(): Map[Int, ResourceProfile]
def requestTotalExecutors(numExecutors: Int, localityAwareTasks: Int, hostToLocalTaskCount: Map[String, Int], nodeBlacklist: Set[String]): Boolean
def requestExecutors(numAdditionalExecutors: Int): Boolean
def killExecutors(executorIds: Seq[String]): Seq[String]
def killExecutor(executorId: String): Boolean
def getExecutorMemoryStatus: Map[String, (Long, Long)]
```

## Configuration Properties

Key resource management configuration options.

### Dynamic Allocation

- `spark.dynamicAllocation.enabled` - Enable dynamic executor allocation
- `spark.dynamicAllocation.minExecutors` - Minimum number of executors
- `spark.dynamicAllocation.maxExecutors` - Maximum number of executors
- `spark.dynamicAllocation.initialExecutors` - Initial number of executors
- `spark.dynamicAllocation.executorIdleTimeout` - Idle time after which an executor is removed
- `spark.dynamicAllocation.schedulerBacklogTimeout` - How long tasks may be backlogged before new executors are requested
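
These properties are typically set at submit time. A minimal sketch (the application jar name is illustrative); note that releasing executors safely also requires either the external shuffle service (`spark.shuffle.service.enabled=true`) or shuffle tracking, depending on the cluster manager:

```shell
# Sketch: enabling dynamic allocation when submitting an application.
spark-submit \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=2 \
  --conf spark.dynamicAllocation.maxExecutors=20 \
  --conf spark.dynamicAllocation.executorIdleTimeout=60s \
  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
  your-app.jar
```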

### Resource Discovery

- `spark.executor.resource.{resourceName}.amount` - Amount of the resource per executor
- `spark.executor.resource.{resourceName}.discoveryScript` - Script to discover resource addresses
- `spark.executor.resource.{resourceName}.vendor` - Resource vendor
- `spark.task.resource.{resourceName}.amount` - Amount of the resource per task
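
A discovery script runs on each executor and must print a single JSON object naming the resource and listing its device addresses. A minimal sketch, in which the `nvidia-smi` probe and the fallback addresses are illustrative:

```shell
#!/usr/bin/env bash
# Hypothetical discovery script for a "gpu" resource.
# Spark expects one JSON object on stdout, e.g.:
#   {"name": "gpu", "addresses": ["0","1"]}
# Probe device indices; fall back to a fixed list for illustration.
indices=$(nvidia-smi --query-gpu=index --format=csv,noheader 2>/dev/null || printf '0\n1\n')
# Quote each index and join with commas: 0,1 -> "0","1"
addresses=$(printf '%s\n' "$indices" | sed 's/.*/"&"/' | paste -sd, -)
echo "{\"name\": \"gpu\", \"addresses\": [${addresses}]}"
```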

### GPU Configuration

- `spark.executor.resource.gpu.amount` - Number of GPUs per executor
- `spark.executor.resource.gpu.discoveryScript` - GPU discovery script
- `spark.task.resource.gpu.amount` - GPU fraction per task
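
The three properties work together: the executor amount says how many GPUs each executor gets, and a fractional task amount lets several tasks share one device. A sketch (script path and amounts are illustrative):

```shell
# Sketch: one GPU per executor, shared by two concurrent tasks (0.5 each).
spark-submit \
  --conf spark.executor.resource.gpu.amount=1 \
  --conf spark.executor.resource.gpu.discoveryScript=/opt/spark/gpu-discovery.sh \
  --conf spark.task.resource.gpu.amount=0.5 \
  your-app.jar
```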

## Usage Examples

### Basic Resource Profile

```scala
import org.apache.spark.resource._

// Create resource profile with specific requirements
val resourceProfile = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequest("memory", 8, "", "")) // 8GB memory
  .require(new ExecutorResourceRequest("cores", 4, "", "")) // 4 cores
  .require(new TaskResourceRequest("cpus", 1.0)) // 1 CPU per task
  .build()

// Register profile with SparkContext
sc.addResourceProfile(resourceProfile)

// Use profile with RDD
val data = sc.parallelize(1 to 1000)
val resourcedRDD = data.withResources(resourceProfile)

val result = resourcedRDD.map(heavyComputation).collect()
```

### GPU Resource Profile

```scala
// GPU-enabled resource profile
val gpuProfile = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequest("memory", 16, "", ""))
  .require(new ExecutorResourceRequest("cores", 8, "", ""))
  .require(new ExecutorResourceRequest("gpu", 2, "/opt/spark/gpu-discovery.sh", "nvidia"))
  .require(new TaskResourceRequest("cpus", 2.0))
  .require(new TaskResourceRequest("gpu", 0.5)) // Half a GPU per task
  .build()

sc.addResourceProfile(gpuProfile)

// Use for GPU-intensive workloads
val gpuData = sc.parallelize(largeDataset)
val gpuProcessedData = gpuData
  .withResources(gpuProfile)
  .map(gpuAcceleratedProcessing)
  .collect()
```

### Mixed Workload Resource Management

```scala
// Light processing profile
val lightProfile = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequest("memory", 2, "", "")) // 2GB
  .require(new ExecutorResourceRequest("cores", 2, "", "")) // 2 cores
  .require(new TaskResourceRequest("cpus", 0.5)) // Half a CPU per task
  .build()

// Heavy processing profile
val heavyProfile = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequest("memory", 16, "", "")) // 16GB
  .require(new ExecutorResourceRequest("cores", 8, "", "")) // 8 cores
  .require(new TaskResourceRequest("cpus", 2.0)) // 2 CPUs per task
  .build()

sc.addResourceProfile(lightProfile)
sc.addResourceProfile(heavyProfile)

// Apply different profiles to different stages
val rawData = sc.textFile("input.txt")

// Light processing for data cleaning
val cleanedData = rawData
  .withResources(lightProfile)
  .map(simpleCleanup)
  .filter(isValid)

// Heavy processing for complex analytics
val analyticsResults = cleanedData
  .withResources(heavyProfile)
  .map(complexAnalytics)
  .reduce(combineResults)
```

### Dynamic Resource Allocation

```scala
// Configure dynamic allocation through SparkConf
val conf = new SparkConf()
  .setAppName("Dynamic Resource Example")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.dynamicAllocation.initialExecutors", "5")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
  .set("spark.dynamicAllocation.schedulerBacklogTimeout", "10s")

val sc = new SparkContext(conf)

// Request additional executors programmatically
val additionalExecutors = 5
val requested = sc.requestExecutors(additionalExecutors)
println(s"Requested $additionalExecutors additional executors: $requested")

// Monitor executor status
val memoryStatus = sc.getExecutorMemoryStatus
memoryStatus.foreach { case (executorId, (maxMem, remainingMem)) =>
  val usedMem = maxMem - remainingMem
  println(s"Executor $executorId: ${usedMem / (1024 * 1024)}MB used of ${maxMem / (1024 * 1024)}MB")
}

// Kill specific executors if needed
val executorsToKill = Seq("executor-1", "executor-2")
val killed = sc.killExecutors(executorsToKill)
println(s"Killed executors: ${killed.mkString(", ")}")
```

### Resource-Aware Task Scheduling

```scala
val data = sc.parallelize(1 to 10000, 100)

val processedData = data.mapPartitions { iter =>
  val context = TaskContext.get()

  // Get available resources for this task
  val resources = context.resources()
  val cpus = context.cpus()

  // Adapt processing based on available resources
  val processingStrategy = resources.get("gpu") match {
    case Some(gpuInfo) =>
      println(s"Using GPU acceleration: ${gpuInfo.addresses.mkString(", ")}")
      "gpu-accelerated"
    case None =>
      if (cpus >= 4) {
        println(s"Using multi-threaded CPU processing with $cpus cores")
        "multi-threaded"
      } else {
        println(s"Using single-threaded processing with $cpus cores")
        "single-threaded"
      }
  }

  // Process data according to available resources
  iter.map { value =>
    processingStrategy match {
      case "gpu-accelerated" => gpuProcess(value)
      case "multi-threaded" => parallelProcess(value, cpus)
      case "single-threaded" => serialProcess(value)
    }
  }
}

processedData.collect()
```

### FPGA Resource Management

```scala
// FPGA resource profile for specialized workloads
val fpgaProfile = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequest("memory", 32, "", ""))
  .require(new ExecutorResourceRequest("cores", 16, "", ""))
  .require(new ExecutorResourceRequest("fpga", 1, "/opt/spark/fpga-discovery.sh", "intel"))
  .require(new TaskResourceRequest("cpus", 4.0))
  .require(new TaskResourceRequest("fpga", 1.0)) // One FPGA per task
  .build()

sc.addResourceProfile(fpgaProfile)

// Use FPGA for specialized computation
val fpgaWorkload = sc.parallelize(specializedDataset)
val fpgaResults = fpgaWorkload
  .withResources(fpgaProfile)
  .mapPartitions { iter =>
    val context = TaskContext.get()
    val fpgaResources = context.resources().get("fpga")

    fpgaResources match {
      case Some(fpga) =>
        // Initialize FPGA with the available device, and close it when the
        // task completes (a try/finally around iter.map would close the
        // device before the lazy iterator is consumed)
        val fpgaDevice = initializeFPGA(fpga.addresses.head)
        context.addTaskCompletionListener[Unit](_ => fpgaDevice.close())
        iter.map(processOnFPGA(_, fpgaDevice))
      case None =>
        // Fall back to CPU processing
        iter.map(processByCPU)
    }
  }
  .collect()
```

### Resource Profile Monitoring

```scala
// Get all registered resource profiles
val profiles = sc.getResourceProfiles()
profiles.foreach { case (id, profile) =>
  println(s"Profile ID: $id")
  println(s"Executor Resources: ${profile.executorResources}")
  println(s"Task Resources: ${profile.taskResources}")
  println("---")
}

// Monitor RDD resource associations
val data = sc.parallelize(1 to 100)
val profiledRDD = data.withResources(heavyProfile)

println(s"RDD resource profile ID: ${profiledRDD.getResourceProfile().id}")
println(s"Default profile ID: ${ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID}")
```

## Best Practices

### Resource Profile Design

1. **Match workload characteristics**: Choose resources based on computation type
2. **Consider task parallelism**: Balance task resources with executor resources
3. **Plan for heterogeneity**: Use different profiles for different processing stages
4. **Monitor utilization**: Track resource usage to optimize allocations
5. **Test configurations**: Benchmark different resource combinations

### Dynamic Allocation

1. **Set appropriate bounds**: Configure min/max executors based on workload
2. **Tune timeouts**: Adjust idle and backlog timeouts for responsiveness
3. **Monitor scaling**: Watch for oscillation in executor counts
4. **Consider costs**: Balance performance with resource costs
5. **Plan for failures**: Ensure minimum executors for fault tolerance

### GPU/FPGA Usage

1. **Efficient utilization**: Ensure full utilization of accelerator resources
2. **Memory management**: Consider GPU/FPGA memory limitations
3. **Fallback strategies**: Provide CPU fallbacks when accelerators are unavailable
4. **Driver compatibility**: Ensure proper driver installation and discovery
5. **Task granularity**: Size tasks appropriately for accelerator efficiency