
# Resource Management

Container allocation and executor lifecycle management within YARN resource constraints and scheduling policies. This module handles the coordination between Spark's resource needs and YARN's resource management system.

## Capabilities

### YarnAllocator

Core resource allocator that manages container requests, allocation tracking, and executor lifecycle within YARN clusters.

```scala { .api }
class YarnAllocator(
  driverUrl: String,
  driverRef: RpcEndpointRef,
  conf: YarnConfiguration,
  sparkConf: SparkConf,
  amClient: AMRMClient[ContainerRequest],
  appAttemptId: ApplicationAttemptId,
  securityMgr: SecurityManager,
  localResources: Map[String, LocalResource],
  resolver: SparkRackResolver,
  clock: Clock = new SystemClock()
) {
  def getNumExecutorsRunning: Int
  def getNumExecutorsFailed: Int
  def numContainersPendingAllocate: Int
  def allocateResources(): Unit
  def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit
  def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit
  def updateResourceRequests(): Unit
  def killExecutor(executorId: String): Unit
  def stop(): Unit
}
```

**Constructor Parameters:**

- `driverUrl`: RPC URL for driver communication
- `driverRef`: RPC endpoint reference for the driver
- `conf`: YARN configuration from the cluster
- `sparkConf`: Spark configuration with resource settings
- `amClient`: YARN ApplicationMaster-to-ResourceManager client
- `appAttemptId`: YARN application attempt identifier
- `securityMgr`: Spark security manager instance
- `localResources`: Staged resources for executor containers
- `resolver`: Rack resolver for locality awareness
- `clock`: Clock instance for time-based operations (defaults to `SystemClock`)

**Status Query Methods:**

**`getNumExecutorsRunning: Int`**
- Returns the count of currently running executors
- Includes only executors in the RUNNING state

**`getNumExecutorsFailed: Int`**
- Returns the count of failed executor attempts
- Includes permanent failures and exceeded retry attempts

**`numContainersPendingAllocate: Int`**
- Returns the count of outstanding container requests
- Covers requests submitted but not yet allocated by YARN

**Resource Management Methods:**

**`allocateResources(): Unit`**
- Requests containers from the YARN ResourceManager based on current needs
- Considers executor targets, pending requests, and failed containers
- Updates container allocation state

**`handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit`**
- Processes newly allocated containers from YARN
- Launches executor processes in allocated containers
- Updates tracking state for launched executors

**`processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit`**
- Handles container completion events from YARN
- Distinguishes between normal completion and failures
- Triggers replacement requests for failed executors

**`updateResourceRequests(): Unit`**
- Updates container resource requests with the YARN ResourceManager
- Adjusts requested resources based on current executor targets
- Called periodically to sync allocation state

**`killExecutor(executorId: String): Unit`**
- Kills a specific executor by terminating its container
- Updates internal tracking state
- Used for explicit executor removal

**`stop(): Unit`**
- Stops the allocator and releases all resources
- Cancels pending container requests
- Cleans up internal state

**Usage Example:**

```scala
import org.apache.spark.deploy.yarn.YarnAllocator

// YarnAllocator is typically used internally by the ApplicationMaster;
// its state is managed automatically, and status can be observed
// through the scheduler backend.
val backend = sc.schedulerBackend.asInstanceOf[YarnSchedulerBackend]

// Monitor executor status through SparkContext
val statusTracker = sc.statusTracker
val execInfo = statusTracker.getExecutorInfos

execInfo.foreach { exec =>
  println(s"Executor at ${exec.host}:${exec.port}, running tasks: ${exec.numRunningTasks}")
}
```


### YarnRMClient

Handles ResourceManager registration and communication for the ApplicationMaster.

```scala { .api }
class YarnRMClient(amClient: AMRMClient[ContainerRequest]) {
  def register(
    driverHost: String,
    driverPort: Int,
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    uiAddress: Option[String],
    uiHistoryAddress: String
  ): Unit
  def createAllocator(
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    driverUrl: String,
    driverRef: RpcEndpointRef,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource]
  ): YarnAllocator
  def unregister(status: FinalApplicationStatus, diagnostics: String = ""): Unit
  def getAttemptId(): ApplicationAttemptId
  def getMaxRegAttempts(sparkConf: SparkConf, yarnConf: YarnConfiguration): Int
}
```

**ResourceManager Integration:**

**`register(...): Unit`**
- Registers the ApplicationMaster with the YARN ResourceManager
- Provides driver connection details and web UI URLs
- Establishes the communication channel for resource requests

**`createAllocator(...): YarnAllocator`**
- Creates and configures a YarnAllocator instance
- Sets up container allocation and executor lifecycle management
- Integrates with the security manager and local resources

**`unregister(status, diagnostics): Unit`**
- Unregisters the ApplicationMaster from the ResourceManager
- Reports the final application status and diagnostic information
- Cleans up ResourceManager communication

**`getAttemptId(): ApplicationAttemptId`**
- Returns the current YARN application attempt identifier
- Used for tracking and logging

**`getMaxRegAttempts(sparkConf, yarnConf): Int`**
- Determines the maximum number of ApplicationMaster registration attempts
- Based on Spark and YARN configuration settings
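As a rough illustration of `getMaxRegAttempts`, the resolution can be sketched as below; this is a hypothetical standalone helper (names and structure assumed, not the actual Spark implementation):

```scala
// Hypothetical sketch: prefer spark.yarn.maxAppAttempts when set,
// but never exceed YARN's own yarn.resourcemanager.am.max-attempts.
object MaxAttemptsSketch {
  def maxRegAttempts(sparkMaxAppAttempts: Option[Int], yarnMaxAttempts: Int): Int =
    sparkMaxAppAttempts match {
      case Some(sparkAttempts) => math.min(sparkAttempts, yarnMaxAttempts)
      case None => yarnMaxAttempts
    }
}
```

For example, with `spark.yarn.maxAppAttempts=3` on a cluster capped at 2 attempts, the effective limit is 2.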

### ExecutorRunnable

Manages executor container launch and configuration within YARN containers. This class handles the low-level details of starting Spark executors in YARN containers.

```scala { .api }
class ExecutorRunnable(
  container: Container,
  conf: Configuration,
  sparkConf: SparkConf,
  masterAddress: String,
  executorId: String,
  hostname: String,
  executorMemory: Int,
  executorCores: Int,
  appId: String,
  securityMgr: SecurityManager,
  localResources: Map[String, LocalResource]
) {
  def run(): Unit
  def launchContextDebugInfo(): String
  def startContainer(): java.util.Map[String, ByteBuffer]
}
```

**Container Launch Management:**

**`run(): Unit`**
- Main execution method that launches the executor in a YARN container
- Sets up the container environment and security context
- Submits the container launch request to the NodeManager
- Handles launch failures and error reporting

**`launchContextDebugInfo(): String`**
- Returns detailed debug information about the container launch context
- Includes environment variables, the command line, and resource information
- Used for troubleshooting container launch issues

**`startContainer(): java.util.Map[String, ByteBuffer]`**
- Initiates the container startup process
- Returns container tokens and security information
- Manages authentication and authorization setup

**Container Launch Process:**
1. **Environment Setup**: Configures PATH, JAVA_HOME, and other environment variables
2. **Command Construction**: Builds the executor command line with JVM options and classpath
3. **Security Context**: Sets up Kerberos credentials and security tokens
4. **Resource Configuration**: Configures local resources and file permissions
5. **Container Submission**: Submits the launch request to the NodeManager
6. **Monitoring**: Tracks container launch status and handles failures

**Usage Example:**

```scala
// ExecutorRunnable is typically used internally by YarnAllocator
// when launching new executor containers.

def launchExecutorContainer(
    container: Container,
    executorId: String): Unit = {

  val runnable = new ExecutorRunnable(
    container = container,
    conf = yarnConf,
    sparkConf = sparkConf,
    masterAddress = driverUrl,
    executorId = executorId,
    hostname = container.getNodeId.getHost,
    executorMemory = executorMemoryMB,
    executorCores = executorCores,
    appId = appId,
    securityMgr = securityManager,
    localResources = localResourceMap
  )

  // Launch in a separate thread so allocation is not blocked
  launcherPool.execute(() => runnable.run())
}
```

## Resource Allocation Strategies

### Dynamic Allocation

Integration with Spark's dynamic allocation for elastic resource scaling.

```scala
// Configuration for dynamic allocation
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "100")
  .set("spark.dynamicAllocation.initialExecutors", "10")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
```

**Scaling Behavior:**
- Requests additional containers when the task queue grows
- Releases idle containers after a configured timeout
- Respects YARN queue and cluster resource limits
- Coordinates with YARN fair/capacity schedulers
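The request-when-backlogged behavior boils down to simple arithmetic; a minimal standalone sketch (not Spark's actual ExecutorAllocationManager — the `tasksPerExecutor` parameter and clamping structure are assumptions):

```scala
// Illustrative: derive a target executor count from the task backlog,
// clamped to the configured min/max bounds.
object ScalingSketch {
  def targetExecutors(pendingTasks: Int, runningTasks: Int,
                      tasksPerExecutor: Int,
                      minExecutors: Int, maxExecutors: Int): Int = {
    // Executors needed to run every pending and running task at once
    val needed = math.ceil((pendingTasks + runningTasks).toDouble / tasksPerExecutor).toInt
    math.min(maxExecutors, math.max(minExecutors, needed))
  }
}
```

With the configuration above (min 2, max 100) and 400 outstanding tasks at 4 tasks per executor, the target is 100 executors; an idle application falls back to the minimum of 2.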

### Locality Preferences

Leverages YARN's rack awareness for optimal data locality.

```scala { .api }
class SparkRackResolver(conf: SparkConf, yarnConf: YarnConfiguration) {
  def resolve(hostName: String): String
}
```

**Locality Levels:**
1. **NODE_LOCAL**: Same node as the data
2. **RACK_LOCAL**: Same rack as the data
3. **ANY**: Any available node

**Container Request Strategy:**
- Requests containers with locality preferences
- Falls back to lower locality levels when needed
- Balances locality with resource availability
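The fallback strategy can be pictured with a small sketch; the `Locality` type and capacity checks here are illustrative, not Spark's internal scheduling types:

```scala
object LocalitySketch {
  sealed trait Locality
  case object NodeLocal extends Locality
  case object RackLocal extends Locality
  case object AnyHost extends Locality

  // Walk the preference order and take the first level with available capacity.
  def choose(nodeHasCapacity: Boolean, rackHasCapacity: Boolean): Locality =
    if (nodeHasCapacity) NodeLocal
    else if (rackHasCapacity) RackLocal
    else AnyHost
}
```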

### Resource Constraints

YARN-specific resource constraint handling.

**Memory Management:**
```scala
// Memory overhead calculation (values in MiB)
val executorMemoryMb = conf.getSizeAsMb("spark.executor.memory", "1g")
val memoryOverheadMb = conf.getLong("spark.yarn.executor.memoryOverhead",
  math.max((executorMemoryMb * 0.1).toLong, 384L))
val totalMemoryMb = executorMemoryMb + memoryOverheadMb
```

**CPU Allocation:**
```scala
// Core allocation with YARN constraints
val executorCores = conf.getInt("spark.executor.cores", 1)
val maxCores = yarnConf.getInt("yarn.scheduler.maximum-allocation-vcores", Int.MaxValue)
val requestCores = math.min(executorCores, maxCores)
```
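As a worked example of the overhead formula, here is the arithmetic in isolation (a standalone sketch, not Spark's internal code):

```scala
object MemorySketch {
  // Overhead = max(10% of executor memory, 384 MiB), per the formula above
  def overheadMb(executorMemoryMb: Long): Long =
    math.max((executorMemoryMb * 0.1).toLong, 384L)

  // Total container request = executor heap + overhead
  def totalContainerMb(executorMemoryMb: Long): Long =
    executorMemoryMb + overheadMb(executorMemoryMb)
}
```

A 1 GiB executor is floored to the 384 MiB minimum (1408 MiB total), while an 8 GiB executor gets an 819 MiB overhead (9011 MiB total).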

## Container Lifecycle Management

### Container States

**Container Lifecycle:**
1. **REQUESTED**: Container request submitted to YARN
2. **ALLOCATED**: Container allocated by the ResourceManager
3. **LAUNCHING**: Container being launched on a NodeManager
4. **RUNNING**: Executor process running successfully
5. **COMPLETED**: Container finished (success or failure)

### Failure Handling

**Failure Categories:**
- **Preemption**: Container killed by YARN for resource rebalancing
- **Node Failure**: NodeManager or hardware failure
- **Application Error**: Executor process crashed or failed
- **Resource Exhaustion**: Out of memory or disk space

**Recovery Strategies:**
```scala
// Retry configuration
val maxExecutorFailures = conf.getInt("spark.yarn.max.executor.failures",
  math.max(2 * numExecutors, 3))

// Blacklist configuration
val nodeBlacklistEnabled = conf.getBoolean("spark.blacklist.enabled", false)
val maxNodeBlacklist = conf.getInt("spark.blacklist.application.maxFailedTasksPerNode", 2)
```
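One detail worth noting: preempted containers are typically not counted against the executor-failure limit, since preemption is YARN's decision rather than an application error. A hedged sketch of that accounting (exit-code values follow Hadoop's `ContainerExitStatus`; the helper names are assumptions):

```scala
object FailureCountSketch {
  // Hadoop ContainerExitStatus values used here
  val Success = 0
  val Preempted = -102

  // Only genuine failures count toward spark.yarn.max.executor.failures
  def countsAsFailure(exitStatus: Int): Boolean =
    exitStatus != Success && exitStatus != Preempted

  def shouldAbortApplication(failedSoFar: Int, maxExecutorFailures: Int): Boolean =
    failedSoFar >= maxExecutorFailures
}
```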

### Container Monitoring

**Health Monitoring:**
- Container resource usage tracking
- Executor heartbeat monitoring
- Node failure detection
- Container exit code analysis

**Metrics Integration:**
```scala
// Monitoring through ApplicationMaster metrics
val source = new ApplicationMasterSource()
source.registerGauge("numExecutorsRunning", () => numExecutorsRunning)
source.registerGauge("numExecutorsFailed", () => numExecutorsFailed)
source.registerGauge("numPendingContainers", () => numPendingAllocate)
```

### Cleanup and Termination

**Graceful Shutdown:**
1. Stop accepting new container requests
2. Complete running tasks where possible
3. Release allocated but unused containers
4. Clean up staged resources
5. Unregister from the ResourceManager

**Emergency Shutdown:**
1. Immediately kill all executor containers
2. Force cleanup of staged resources
3. Report failure status to the ResourceManager

## Configuration Integration

### Memory Configuration

```scala
// Executor memory settings
"spark.executor.memory" -> "4g"              // Heap memory
"spark.yarn.executor.memoryOverhead" -> "1g" // Off-heap overhead
"spark.memory.fraction" -> "0.8"             // Storage/execution split

// ApplicationMaster memory
"spark.yarn.am.memory" -> "2g"           // AM heap memory
"spark.yarn.am.memoryOverhead" -> "384m" // AM overhead
```

### CPU Configuration

```scala
// Executor CPU settings
"spark.executor.cores" -> "4"      // Cores per executor
"spark.executor.instances" -> "10" // Static executor count

// ApplicationMaster CPU
"spark.yarn.am.cores" -> "2" // AM CPU cores
```

### Resource Queue Configuration

```scala
// YARN queue and scheduling
"spark.yarn.queue" -> "production" // YARN queue name
"spark.yarn.priority" -> "1"       // Application priority
"spark.yarn.maxAppAttempts" -> "3" // Max application attempts
```

## Error Handling and Diagnostics

### Common Resource Errors

**Insufficient Resources:**
```scala
throw new YarnException("Could not allocate container within timeout")
throw new IllegalStateException("Requested memory exceeds queue maximum")
```

**Container Launch Failures:**
```scala
throw new IOException("Failed to launch container on node")
throw new SecurityException("Container launch denied due to security policy")
```

**Resource Constraint Violations:**
```scala
throw new IllegalArgumentException("Executor memory must be at least 384MB")
throw new SparkException("Total requested cores exceed cluster capacity")
```

### Diagnostic Information

**Resource Status Reporting:**
```scala
def getResourceStatus: String =
  s"""
     |Executors Running: $getNumExecutorsRunning
     |Executors Failed: $getNumExecutorsFailed
     |Containers Pending: $numContainersPendingAllocate
   """.stripMargin
```

**Container Exit Analysis:**
```scala
import org.apache.hadoop.yarn.api.records.{ContainerExitStatus, ContainerStatus}

def analyzeContainerExit(status: ContainerStatus): String =
  status.getExitStatus match {
    case ContainerExitStatus.SUCCESS => "Container completed successfully"
    case ContainerExitStatus.PREEMPTED => "Container killed by YARN (preemption)"
    case ContainerExitStatus.KILLED_EXCEEDED_PMEM => "Container killed for exceeding physical memory limits"
    case other => s"Container failed with exit code: $other"
  }
```

## Performance Optimization

### Resource Tuning Guidelines

**Memory Optimization:**
- Set executor memory to 80-90% of container memory
- Reserve sufficient overhead for off-heap operations
- Consider GC tuning for large heap sizes

**CPU Optimization:**
- Balance executor cores with parallelism needs
- Avoid over-subscribing CPU resources
- Consider NUMA topology on large nodes

**Container Sizing:**
- Larger containers reduce scheduling overhead
- Smaller containers provide finer-grained resource utilization
- Balance based on workload characteristics

### Locality Optimization

**Data Locality:**
- Co-locate executors with data when possible
- Use rack-aware placement for distributed data
- Consider storage system topology

**Network Locality:**
- Minimize cross-rack network traffic
- Optimize for cluster network topology
- Consider bandwidth constraints
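The container-sizing trade-off can be quantified per node; a sketch assuming memory and vcores are the only binding constraints:

```scala
object SizingSketch {
  // Executors that fit on one node, limited by whichever
  // resource (memory or cores) runs out first.
  def executorsPerNode(nodeMemoryMb: Long, nodeCores: Int,
                       containerMemoryMb: Long, executorCores: Int): Int = {
    val byMemory = (nodeMemoryMb / containerMemoryMb).toInt
    val byCores = nodeCores / executorCores
    math.min(byMemory, byCores)
  }
}
```

For a 64 GiB / 16-core node and 9011 MiB, 4-core containers, the node is core-bound at 4 executors; shrinking containers to 2 cores makes it memory-bound at 7.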