or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-management.md · configuration.md · index.md · resource-management.md · scheduler-integration.md

docs/resource-management.md

0

# Resource Management

1

2

Components responsible for allocating and managing YARN containers for Spark executors. These classes handle resource allocation strategies, placement policies, container management, and communication with YARN ResourceManager and NodeManagers.

3

4

## Capabilities

5

6

### YarnAllocator

7

8

Core component that manages resource allocation and executor lifecycle on YARN. Handles container requests, allocation monitoring, and executor management.

9

10

```scala { .api }

11

/**

12

* Manages resource allocation and executor lifecycle on YARN

13

* Handles container requests, allocation, and executor management

14

*/

15

class YarnAllocator {

16

17

/** Allocate containers from YARN ResourceManager */

18

def allocateResources(): Unit

19

20

/**

21

* Request executors with locality preferences

22

* @return true if request was successful

23

*/

24

def requestTotalExecutorsWithPreferredLocalities(): Boolean

25

26

/**

27

* Kill specific executor

28

* @param executorId ID of executor to kill

29

*/

30

def killExecutor(executorId: String): Unit

31

32

/** Get number of failed executors */

33

def getNumExecutorsFailed: Int

34

35

/** Stop allocator and cleanup resources */

36

def stop(): Unit

37

}

38

```

39

40

**Usage Pattern:**

41

42

```scala

43

// YarnAllocator is typically used internally by scheduler backends

44

// but understanding its role is important for configuration

45

46

import org.apache.spark.SparkConf

47

48

val conf = new SparkConf()

49

.setMaster("yarn")

50

.setAppName("ResourceAllocationExample")

51

// Configure executor resources

52

.set("spark.executor.instances", "10")

53

.set("spark.executor.cores", "2")

54

.set("spark.executor.memory", "4g")

55

.set("spark.executor.memoryOverhead", "512m")

56

// Configure allocation behavior

57

.set("spark.yarn.executor.failuresValidityInterval", "1h")

58

.set("spark.yarn.max.executor.failures", "3")

59

60

// YarnAllocator will use these settings for resource allocation

61

```

62

63

### YarnRMClient

64

65

Client for communicating with YARN ResourceManager. Handles ApplicationMaster registration and resource allocator creation.

66

67

```scala { .api }

68

/**

69

* Client for communicating with YARN ResourceManager

70

* Handles AM registration and resource allocation setup

71

*/

72

class YarnRMClient {

73

74

/** Register ApplicationMaster with ResourceManager */

75

def register(): Unit

76

77

/** Create resource allocator instance */

78

def createAllocator(): YarnAllocator

79

80

/**

81

* Unregister from ResourceManager

82

* @param status Final application status

83

* @param diagnostics Diagnostic message

84

*/

85

def unregister(status: FinalApplicationStatus, diagnostics: String): Unit

86

}

87

```

88

89

### ResourceRequestHelper

90

91

Helper object for creating YARN resource requests with proper resource profiles and constraints.

92

93

```scala { .api }

94

/**

95

* Helper for creating YARN resource requests

96

* Handles resource profiles, constraints, and locality preferences

97

*/

98

object ResourceRequestHelper {

99

100

/**

101

* Create resource request for executors

102

* @param resource Resource requirements

103

* @param nodes Preferred node list

104

* @param racks Preferred rack list

105

* @param numContainers Number of containers requested

106

* @return YARN ResourceRequest

107

*/

108

def createResourceRequest(

109

resource: Resource,

110

nodes: List[String],

111

racks: List[String],

112

numContainers: Int

113

): ResourceRequest

114

115

/**

116

* Build resource profile for container

117

* @param cores Number of CPU cores

118

* @param memory Memory in MB

119

* @param customResources Additional resource requirements

120

* @return YARN resource profile

121

*/

122

def buildResourceProfile(

123

cores: Int,

124

memory: Int,

125

customResources: Map[String, Long] = Map.empty

126

): Resource

127

}

128

```

129

130

**Usage Example:**

131

132

```scala

133

import org.apache.spark.deploy.yarn.ResourceRequestHelper

134

import org.apache.hadoop.yarn.api.records.Resource

135

136

// Create resource profile for executors

137

val executorResource = ResourceRequestHelper.buildResourceProfile(

138

cores = 2,

139

memory = 4096, // 4GB in MB

140

customResources = Map(

141

"yarn.io/gpu" -> 1L, // Request 1 GPU

142

"example.com/fpga" -> 2L // Request 2 FPGAs

143

)

144

)

145

146

// Create resource request with locality preferences

147

val resourceRequest = ResourceRequestHelper.createResourceRequest(

148

resource = executorResource,

149

nodes = List("worker1.example.com", "worker2.example.com"),

150

racks = List("/rack1", "/rack2"),

151

numContainers = 5

152

)

153

```

154

155

### Container Placement Strategies

156

157

#### LocalityPreferredContainerPlacementStrategy

158

159

Container placement strategy that optimizes for data locality and resource utilization.

160

161

```scala { .api }

162

/**

163

* Container placement strategy with locality preferences

164

* Optimizes container placement based on data locality and cluster topology

165

*/

166

class LocalityPreferredContainerPlacementStrategy {

167

168

/**

169

* Determine optimal container placement

170

* @param availableNodes Available cluster nodes

171

* @param requestedContainers Number of containers to place

172

* @param localityPreferences Data locality preferences

173

* @return Placement recommendations

174

*/

175

def getContainerPlacement(

176

availableNodes: Seq[String],

177

requestedContainers: Int,

178

localityPreferences: Map[String, Seq[String]]

179

): Map[String, Int]

180

}

181

```

182

183

### YarnAllocatorNodeHealthTracker

184

185

Tracks node health and executor failure patterns to optimize resource allocation decisions.

186

187

```scala { .api }

188

/**

189

* Tracks node health and executor failure patterns

190

* Used by YarnAllocator for intelligent resource allocation

191

*/

192

class YarnAllocatorNodeHealthTracker {

193

194

/**

195

* Record executor failure on a node

196

* @param nodeId Node identifier

197

* @param executorId Failed executor ID

198

*/

199

def recordFailure(nodeId: String, executorId: String): Unit

200

201

/**

202

* Check if node should be avoided for new allocations

203

* @param nodeId Node to check

204

* @return true if node should be avoided

205

*/

206

def shouldAvoidNode(nodeId: String): Boolean

207

208

/**

209

* Get healthy nodes for allocation

210

* @param candidateNodes Candidate nodes to filter

211

* @return Filtered list of healthy nodes

212

*/

213

def getHealthyNodes(candidateNodes: Seq[String]): Seq[String]

214

}

215

```

216

217

## Advanced Resource Management

218

219

### Dynamic Allocation Integration

220

221

```scala

222

import org.apache.spark.SparkConf

223

224

// Configure dynamic allocation with YARN

225

val conf = new SparkConf()

226

.setMaster("yarn")

227

.setAppName("DynamicAllocationExample")

228

// Enable dynamic allocation

229

.set("spark.dynamicAllocation.enabled", "true")

230

.set("spark.dynamicAllocation.minExecutors", "2")

231

.set("spark.dynamicAllocation.maxExecutors", "50")

232

.set("spark.dynamicAllocation.initialExecutors", "10")

233

// Configure allocation behavior

234

.set("spark.dynamicAllocation.executorIdleTimeout", "60s")

235

.set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "300s")

236

.set("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")

237

.set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "5s")

238

// Configure with external shuffle service

239

.set("spark.shuffle.service.enabled", "true")

240

.set("spark.yarn.shuffle.stopOnFailure", "false")

241

```

242

243

### Custom Resource Types

244

245

```scala

246

import org.apache.spark.SparkConf

247

248

// Configure custom resource types (GPUs, FPGAs, etc.)

249

val conf = new SparkConf()

250

.setMaster("yarn")

251

.setAppName("CustomResourceExample")

252

// Configure GPU resources

253

.set("spark.executor.resource.gpu.amount", "1")

254

.set("spark.executor.resource.gpu.discoveryScript", "/opt/spark/scripts/gpu_discovery.sh")

255

.set("spark.executor.resource.gpu.vendor", "nvidia.com")

256

// Configure custom FPGA resources

257

.set("spark.executor.resource.fpga.amount", "2")

258

.set("spark.executor.resource.fpga.discoveryScript", "/opt/spark/scripts/fpga_discovery.sh")

259

.set("spark.executor.resource.fpga.vendor", "xilinx.com")

260

// Task-level resource requests

261

.set("spark.task.resource.gpu.amount", "0.5") // Tasks can share GPUs

262

.set("spark.task.resource.fpga.amount", "1") // Tasks use dedicated FPGAs

263

```

264

265

### Resource Allocation Patterns

266

267

#### Static Allocation

268

269

```scala

270

import org.apache.spark.{SparkConf, SparkContext}

271

272

// Static resource allocation - fixed number of executors

273

val conf = new SparkConf()

274

.setMaster("yarn")

275

.setAppName("StaticAllocationExample")

276

.set("spark.dynamicAllocation.enabled", "false")

277

.set("spark.executor.instances", "20")

278

.set("spark.executor.cores", "4")

279

.set("spark.executor.memory", "8g")

280

.set("spark.executor.memoryOverhead", "1g")

281

282

val sc = new SparkContext(conf)

283

```

284

285

#### Memory Optimization

286

287

```scala

288

import org.apache.spark.SparkConf

289

290

// Optimize memory allocation for different workloads

291

val conf = new SparkConf()

292

.setMaster("yarn")

293

.setAppName("MemoryOptimizationExample")

294

// Total executor memory breakdown

295

.set("spark.executor.memory", "12g")

296

.set("spark.executor.memoryOverhead", "2g") // Additional overhead for YARN container

297

// Memory fractions for different purposes

298

.set("spark.memory.fraction", "0.8") // Fraction of heap used for execution and storage

299

.set("spark.memory.storageFraction", "0.5") // Fraction of spark.memory.fraction reserved for storage (RDD cache)

300

// Off-heap memory (optional)

301

.set("spark.memory.offHeap.enabled", "true")

302

.set("spark.memory.offHeap.size", "4g")

303

```

304

305

#### Locality-Aware Allocation

306

307

```scala

308

import org.apache.spark.SparkConf

309

310

// Configure locality preferences for optimal data access

311

val conf = new SparkConf()

312

.setMaster("yarn")

313

.setAppName("LocalityAwareExample")

314

// Locality wait times

315

.set("spark.locality.wait", "3s") // Generic locality wait

316

.set("spark.locality.wait.process", "0") // Wait for process-local data

317

.set("spark.locality.wait.node", "1s") // Wait for node-local data

318

.set("spark.locality.wait.rack", "2s") // Wait for rack-local data

319

// Node labeling for placement

320

.set("spark.yarn.executor.nodeLabelExpression", "compute-intensive")

321

.set("spark.yarn.am.nodeLabelExpression", "management")

322

```

323

324

### Container Lifecycle Management

325

326

#### Preemption Handling

327

328

```scala

329

import org.apache.spark.SparkConf

330

331

// Configure preemption handling for spot instances or preemptible queues

332

val conf = new SparkConf()

333

.setMaster("yarn")

334

.setAppName("PreemptionHandlingExample")

335

// Enable preemption-aware scheduling

336

.set("spark.yarn.executor.failuresValidityInterval", "1h")

337

.set("spark.yarn.max.executor.failures", "10") // Higher tolerance for preemptions

338

.set("spark.yarn.submit.waitAppCompletion", "false") // Don't wait if using spot instances

339

// Configure checkpointing for fault tolerance

340

.set("spark.sql.streaming.checkpointLocation", "hdfs://cluster/checkpoints/myapp")

341

.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // Faster recovery

342

```

343

344

#### Container Resource Monitoring

345

346

```scala

347

// Monitor resource usage and allocation effectiveness

348

import org.apache.spark.SparkContext

349

350

val sc = SparkContext.getOrCreate()

351

352

// Access YARN-specific information

353

val yarnBackend = sc.schedulerBackend match {

354

case yarn: org.apache.spark.scheduler.cluster.YarnSchedulerBackend =>

355

Some(yarn)

356

case _ => None

357

}

358

359

yarnBackend.foreach { backend =>

360

println(s"Application ID: ${backend.applicationId()}")

361

println(s"Application Attempt: ${backend.applicationAttemptId()}")

362

363

// For cluster mode, get additional information

364

backend match {

365

case clusterBackend: org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend =>

366

val driverLogs = clusterBackend.getDriverLogUrls()

367

val driverAttrs = clusterBackend.getDriverAttributes()

368

println(s"Driver Container Logs: $driverLogs")

369

println(s"Driver Container Attributes: $driverAttrs")

370

case _ => // Client mode

371

}

372

}

373

```

374

375

## Error Handling and Diagnostics

376

377

### Resource Allocation Failures

378

379

```scala

380

import org.apache.spark.SparkConf

381

382

// Configure retry and failure handling for resource allocation

383

val conf = new SparkConf()

384

.setMaster("yarn")

385

.setAppName("AllocationFailureHandling")

386

// Retry configuration

387

.set("spark.yarn.maxAppAttempts", "3")

388

.set("spark.yarn.am.attemptFailuresValidityInterval", "1h")

389

.set("spark.yarn.executor.failuresValidityInterval", "1h")

390

.set("spark.yarn.max.executor.failures", "5")

391

// Resource allocation timeouts

392

.set("spark.yarn.scheduler.heartbeat.interval-ms", "3000")

393

.set("spark.yarn.scheduler.initial-allocation.interval", "200ms")

394

.set("spark.yarn.containerLauncherMaxThreads", "25")

395

// Enable detailed logging

396

.set("spark.yarn.am.extraJavaOptions", "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps")

397

.set("spark.yarn.executor.extraJavaOptions", "-XX:+PrintGCDetails")

398

```

399

400

### Resource Request Debugging

401

402

```scala

403

// Enable detailed logging for resource allocation debugging

404

import org.apache.spark.SparkConf

405

406

val conf = new SparkConf()

407

.setMaster("yarn")

408

.setAppName("ResourceDebuggingExample")

409

// Enable debug logging

410

.set("spark.yarn.am.extraJavaOptions",

411

"-Dlog4j.configuration=yarn-log4j.properties " +

412

"-Dyarn.app.container.log.dir=/tmp/yarn-logs " +

413

"-Dyarn.app.container.log.filesize=100MB")

414

// Resource request diagnostics

415

.set("spark.yarn.preserve.staging.files", "true") // Keep staging files for debugging

416

.set("spark.yarn.submit.file.replication", "3") // Ensure file availability

417

```