or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-master.md · index.md · resource-management.md · scheduler-backends.md · utilities.md · yarn-client.md

docs/resource-management.md

0

# Resource Management

1

2

Resource allocation and management components for negotiating and monitoring YARN cluster resources for Spark executors. These components handle the complex interactions with YARN's ResourceManager to obtain, monitor, and release container resources.

3

4

## Capabilities

5

6

### YarnRMClient Interface

7

8

Core interface for ResourceManager client operations, providing abstraction over YARN ResourceManager interactions.

9

10

```scala { .api }

11

/**

12

* Trait defining ResourceManager client interface

13

* Provides abstraction for YARN ResourceManager communication

14

*/

15

trait YarnRMClient {

16

// ResourceManager connection and authentication

17

// Container resource requests and releases

18

// Application status monitoring and reporting

19

// Resource allocation callbacks and event handling

20

}

21

```

22

23

**Usage Examples:**

24

25

```scala

26

import org.apache.spark.deploy.yarn.YarnRMClient

27

28

// YarnRMClient implementations are version-specific

29

// Created internally by ApplicationMaster and Client components

30

val rmClient: YarnRMClient = ??? // Placeholder: the implementation instance is supplied internally

31

// Used for ResourceManager communication throughout application lifecycle

32

```

33

34

### YarnRMClientImpl

35

36

Version-specific implementations of the YarnRMClient trait for different Hadoop YARN API versions.

37

38

```scala { .api }

39

/**

40

* Implementation of YarnRMClient for specific YARN API versions

41

* Available in both alpha (deprecated) and stable modules

42

*/

43

class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMClient {

44

/**

45

* Register ApplicationMaster with ResourceManager

46

* @param host ApplicationMaster host

47

* @param port ApplicationMaster port

48

* @param trackingUrl Application tracking URL

49

* @return Registration response from ResourceManager

50

*/

51

def register(host: String, port: Int, trackingUrl: String): RegisterApplicationMasterResponse

52

53

/**

54

* Allocate resources from ResourceManager

55

* @param progressIndicator Application progress (0.0 to 1.0)

56

* @return Allocation response with assigned containers

57

*/

58

def allocate(progressIndicator: Float): AllocateResponse

59

60

/**

61

* Unregister ApplicationMaster from ResourceManager

62

* @param status Final application status

63

* @param appMessage Final message to ResourceManager

64

* @param trackingUrl Final tracking URL

65

*/

66

def unregister(status: FinalApplicationStatus, appMessage: String, trackingUrl: String): Unit

67

}

68

```

69

70

### YarnAllocator

71

72

Abstract base class providing core resource allocation logic for YARN containers.

73

74

```scala { .api }

75

/**

76

* Abstract base class for YARN resource allocation logic

77

* Manages executor container requests and lifecycle

78

*/

79

private[yarn] abstract class YarnAllocator(

80

conf: Configuration,

81

sparkConf: SparkConf,

82

appAttemptId: ApplicationAttemptId,

83

args: ApplicationMasterArguments,

84

preferredNodes: collection.Map[String, collection.Set[SplitInfo]],

85

securityMgr: SecurityManager

86

) extends Logging {

87

88

/** Abstract method to allocate containers from ResourceManager */

89

protected def allocateContainers(resourceRequests: JList[ResourceRequest]): Unit

90

91

/** Abstract method to release a specific container */

92

protected def releaseContainer(container: Container): Unit

93

94

/** Request total number of executors from cluster */

95

def requestTotalExecutors(requestedTotal: Int): Unit

96

97

/** Kill a specific executor by ID */

98

def killExecutor(executorId: String): Unit

99

100

/** Allocate resources and handle responses from ResourceManager */

101

def allocateResources(): Unit

102

103

/** Update resource requests for executor containers */

104

def updateResourceRequests(): Unit

105

}

106

107

/**

108

* Companion object with constants and utilities

109

*/

110

object YarnAllocator {

111

// Internal constants for resource allocation

112

}

113

```

114

115

**Usage Examples:**

116

117

```scala

118

import org.apache.spark.deploy.yarn.YarnAllocator

119

120

// YarnAllocator is extended by version-specific implementations

121

// Handles the core logic for:

122

// - Requesting executor containers from ResourceManager

123

// - Launching executor processes in allocated containers

124

// - Monitoring executor health and handling failures

125

// - Releasing containers when executors complete

126

```

127

128

### AllocationType Enumeration

129

130

Enumeration defining allocation strategies for YARN resource requests.

131

132

```scala { .api }

133

/**

134

* Enumeration for YARN allocation types

135

* Defines resource allocation strategies for container placement

136

*/

137

object AllocationType extends Enumeration {

138

type AllocationType = Value

139

140

/** Host-specific allocation - request container on specific host */

141

val HOST = Value

142

143

/** Rack-specific allocation - request container within specific rack */

144

val RACK = Value

145

146

/** Any allocation - allow ResourceManager to place container anywhere */

147

val ANY = Value

148

}

149

```

150

151

### YarnAllocationHandler

152

153

Version-specific resource allocation implementations that extend the base YarnAllocator functionality.

154

155

```scala { .api }

156

/**

157

* Version-specific resource allocation implementation

158

* Available in both alpha and stable API versions

159

*/

160

private[yarn] class YarnAllocationHandler extends YarnAllocator {

161

// Concrete implementation of resource allocation logic

162

// YARN API version-specific container operations

163

// Executor container launch and monitoring

164

// Resource optimization and failure recovery

165

}

166

```

167

168

### AllocationType Enumeration

169

170

Enumeration defining different types of YARN resource allocations.

171

172

```scala { .api }

173

/**

174

* Enumeration for YARN allocation types

175

* Defines different categories of resource requests

176

*/

177

object AllocationType extends Enumeration {

178

// Different allocation type values for resource categorization

179

// Used for tracking and managing different types of resource requests

180

}

181

```

182

183

**Usage Examples:**

184

185

```scala

186

import org.apache.spark.deploy.yarn.AllocationType

187

188

// Used internally for categorizing resource allocation requests

189

val allocationType = AllocationType.HOST // one of HOST, RACK, or ANY

190

// Helps track and manage different types of container requests

191

```

192

193

## Resource Allocation Workflow

194

195

### Container Request Process

196

197

```scala

198

// Typical resource allocation workflow

199

abstract class YarnAllocator {

200

// 1. Calculate resource requirements based on Spark configuration

201

protected def calculateResourceRequirements(): ContainerRequest

202

203

// 2. Submit resource requests to ResourceManager

204

protected def requestContainers(requests: List[ContainerRequest]): Unit

205

206

// 3. Handle container allocation responses from ResourceManager

207

protected def onContainersAllocated(containers: List[Container]): Unit

208

209

// 4. Launch executor processes in allocated containers

210

protected def launchExecutors(containers: List[Container]): Unit

211

212

// 5. Monitor executor health and handle failures

213

protected def monitorExecutors(): Unit

214

215

// 6. Release containers when executors complete or fail

216

protected def releaseContainers(containers: List[Container]): Unit

217

}

218

```

219

220

### Resource Requirements Calculation

221

222

```scala

223

// Resource calculation based on Spark configuration

224

class YarnAllocator {

225

private def buildContainerRequest(): ContainerRequest = {

226

val executorMemory = sparkConf.get("spark.executor.memory", "1024m")

227

val executorCores = sparkConf.getInt("spark.executor.cores", 1)

228

val priority = sparkConf.getInt("spark.yarn.priority", 0)

229

230

// Build YARN ContainerRequest with calculated resources

231

// Include locality preferences and resource constraints

232

// Set appropriate priority and capability requirements

233

}

234

}

235

```

236

237

## Container Lifecycle Management

238

239

### Executor Container Launch

240

241

```scala

242

// Container launch process for executor deployment

243

class YarnAllocationHandler {

244

private def launchExecutorContainer(container: Container): Unit = {

245

// 1. Prepare executor launch context

246

val launchContext = createExecutorLaunchContext()

247

248

// 2. Set up environment variables and classpath

249

setupExecutorEnvironment(launchContext)

250

251

// 3. Configure executor JVM parameters

252

configureExecutorJVM(launchContext)

253

254

// 4. Launch container with NodeManager

255

nodeManager.startContainer(container, launchContext)

256

257

// 5. Track container for monitoring

258

trackExecutorContainer(container.getId)

259

}

260

}

261

```

262

263

### Container Monitoring

264

265

```scala

266

// Continuous monitoring of executor containers

267

abstract class YarnAllocator {

268

private def monitorContainerHealth(): Unit = {

269

// Monitor executor heartbeats and health status

270

// Detect container failures and exits

271

// Handle node failures and blacklisting

272

// Report container status to ApplicationMaster

273

}

274

}

275

```

276

277

## Resource Optimization Strategies

278

279

### Locality Preferences

280

281

```scala

282

// Optimizing container allocation for data locality

283

class YarnAllocator {

284

private def buildLocalityRequest(

285

preferredNodes: List[String],

286

preferredRacks: List[String]

287

): ContainerRequest = {

288

// Request containers on preferred nodes (highest priority)

289

// Fall back to preferred racks (medium priority)

290

// Accept any available nodes (lowest priority)

291

// Balance locality with resource availability

292

}

293

}

294

```

295

296

### Dynamic Resource Adjustment

297

298

```scala

299

// Dynamic executor allocation integration

300

abstract class YarnAllocator {

301

def requestAdditionalExecutors(numExecutors: Int): Unit = {

302

// Support for Spark's dynamic allocation feature

303

// Scale executor count based on workload demands

304

// Integrate with cluster resource availability

305

}

306

307

def removeExecutors(executorIds: Set[String]): Unit = {

308

// Gracefully shutdown and release executor containers

309

// Ensure running tasks complete before container release

310

// Update resource tracking and allocation state

311

}

312

}

313

```

314

315

## Configuration Integration

316

317

### Resource Configuration

318

319

```scala

320

// Key configuration properties affecting resource management

321

spark.executor.memory=2g // Executor container memory

322

spark.executor.cores=2 // Executor CPU cores

323

spark.memory.fraction=0.8 // Fraction of executor heap shared by execution and storage

324

spark.memory.offHeap.enabled=true // Off-heap memory usage (requires spark.memory.offHeap.size > 0)

325

326

// YARN-specific resource configuration

327

spark.yarn.executor.memoryOverhead=384 // Additional memory overhead

328

spark.yarn.executor.nodeLabelExpression // Node label constraints

329

spark.yarn.priority=0 // Application priority

330

spark.yarn.queue=production // YARN queue assignment

331

```

332

333

### Container Configuration

334

335

```scala

336

// Container-level configuration options

337

spark.yarn.containerLauncherMaxThreads=25 // Parallel container launch limit

338

spark.yarn.executor.failuresValidityInterval=1h // Failure tracking window

339

spark.yarn.max.executor.failures=3 // Max executor failures before the application is failed

340

yarn.nodemanager.resource.memory-mb // NodeManager memory limit (YARN setting in yarn-site.xml, not a Spark property)

341

```

342

343

## Error Handling and Recovery

344

345

### Container Failure Recovery

346

347

```scala

348

// Robust error handling for container failures

349

class YarnAllocator {

350

private def handleContainerFailure(containerId: ContainerId, exitStatus: Int): Unit = {

351

// Log container failure details

352

// Determine if failure should trigger node blacklisting

353

// Request replacement container if within failure limits

354

// Report failure to Spark scheduler for task rescheduling

355

}

356

357

private def blacklistNode(nodeId: String, reason: String): Unit = {

358

// Add node to blacklist to avoid future allocations

359

// Implement exponential backoff for blacklist duration

360

// Provide mechanisms for node rehabilitation

361

}

362

}

363

```

364

365

### ResourceManager Connection Handling

366

367

```scala

368

// Handling ResourceManager failures and reconnection

369

trait YarnRMClient {

370

private def handleRMFailure(): Unit = {

371

// Detect ResourceManager connection loss

372

// Implement reconnection with exponential backoff

373

// Recover application state after reconnection

374

// Resubmit pending resource requests

375

}

376

}

377

```

378

379

## Performance Monitoring

380

381

### Resource Utilization Metrics

382

383

```scala

384

// Integration with Spark metrics system

385

abstract class YarnAllocator {

386

// Track container allocation success rates

387

private val containerAllocationRate = metrics.meter("containerAllocationRate")

388

389

// Monitor resource request fulfillment times

390

private val resourceRequestLatency = metrics.histogram("resourceRequestLatency")

391

392

// Track executor launch success and failure rates

393

private val executorLaunchSuccess = metrics.counter("executorLaunchSuccess")

394

private val executorLaunchFailure = metrics.counter("executorLaunchFailure")

395

}

396

```

397

398

### Allocation Efficiency

399

400

The resource management components provide detailed metrics for:

401

402

- Container allocation latency and success rates

403

- Resource utilization efficiency across cluster nodes

404

- Executor failure patterns and recovery times

405

- Queue wait times and priority effectiveness

406

- Node blacklisting and rehabilitation statistics

407

408

These metrics enable optimization of resource allocation strategies and troubleshooting of cluster resource issues.