
# Utilities and Configuration

Utility classes and configuration management for YARN-specific operations, distributed cache management, and executor container handling. These components provide essential support functionality for YARN integration.

## Capabilities

### YarnSparkHadoopUtil

YARN-specific Hadoop utilities that extend Spark's core Hadoop integration.

```scala { .api }
/**
 * YARN-specific Hadoop utilities.
 * Extends SparkHadoopUtil with YARN-specific operations and configurations.
 */
class YarnSparkHadoopUtil extends SparkHadoopUtil {
  // YARN-specific security token handling
  // Hadoop configuration management for YARN
  // YARN service discovery and integration
  // Kerberos authentication support for YARN clusters
}

/**
 * Companion object with YARN-specific constants and utility methods.
 */
object YarnSparkHadoopUtil {
  /** Memory overhead factor for containers (7% of container memory) */
  val MEMORY_OVERHEAD_FACTOR = 0.07

  /** Minimum memory overhead for containers in MB */
  val MEMORY_OVERHEAD_MIN = 384

  /** Wildcard host for resource requests */
  val ANY_HOST = "*"

  /** Default number of executors when not specified */
  val DEFAULT_NUMBER_EXECUTORS = 2

  /** Resource manager request priority */
  val RM_REQUEST_PRIORITY = 1

  /** Get the YarnSparkHadoopUtil singleton instance */
  def get: YarnSparkHadoopUtil
}
```
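To make the overhead constants concrete, here is a minimal, Spark-independent sketch of the rule they imply: overhead is 7% of the requested executor memory, floored at 384 MB. The object and method names below are local to this example, not Spark's internals.

```scala
// Sketch of the container memory-overhead rule implied by the constants above.
object MemoryOverheadExample {
  val MEMORY_OVERHEAD_FACTOR = 0.07
  val MEMORY_OVERHEAD_MIN = 384

  // 7% of the requested executor memory, but never below 384 MB
  def memoryOverhead(executorMemoryMb: Int): Int =
    math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMb).toInt, MEMORY_OVERHEAD_MIN)
}

// memoryOverhead(2048) == 384  (small executor: the 384 MB floor applies)
// memoryOverhead(8192) == 573  (larger executor: 7% exceeds the floor)
```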

**Usage Examples:**

```scala
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

// Access YARN-specific Hadoop utilities
val yarnUtil = YarnSparkHadoopUtil.get

// Use for YARN-specific operations such as:
// - Security token management
// - YARN service discovery
// - Hadoop configuration handling specific to YARN
// - Kerberos integration for secure clusters
```

### ClientDistributedCacheManager

Manages distributed cache functionality for YARN applications, handling file and archive distribution to executor nodes.

```scala { .api }
/**
 * Manages the distributed cache for YARN applications.
 * Handles distribution of files, JARs, and archives to executor containers.
 */
private[spark] class ClientDistributedCacheManager {
  // Distributed cache file management
  // Archive and JAR distribution coordination
  // Local resource preparation for container launch
  // Cache cleanup and resource lifecycle management
}
```

**Usage Examples:**

```scala
import org.apache.spark.deploy.yarn.ClientDistributedCacheManager

// Created internally by Client for managing distributed resources
val cacheManager = new ClientDistributedCacheManager()

// Handles:
// - Files specified via the --files argument
// - Archives specified via the --archives argument
// - Additional JARs from the --addJars argument
// - Proper cleanup when the application completes
```

### ExecutorRunnableUtil

Utility trait providing common functionality for executor container management across different YARN API versions.

```scala { .api }
/**
 * Utility trait for executor container management.
 * Provides common executor launch and management functionality.
 */
trait ExecutorRunnableUtil {
  // Common executor container launch logic
  // Environment variable setup for executors
  // Classpath configuration and JAR distribution
  // Resource allocation and container configuration
}
```

**Usage Examples:**

```scala
import org.apache.spark.deploy.yarn.ExecutorRunnableUtil

// Mixed into ExecutorRunnable implementations
class ExecutorRunnable extends ExecutorRunnableUtil {
  // Inherits common executor management functionality
  // Used for launching executor containers on YARN nodes
  // Handles environment setup and resource configuration
}
```

### ExecutorRunnable

Version-specific implementation classes for launching and managing executor containers on YARN.

```scala { .api }
/**
 * Version-specific executor container implementation.
 * Available in both the alpha (deprecated) and stable modules.
 * Manages executor containers on the YARN NodeManager.
 */
class ExecutorRunnable extends ExecutorRunnableUtil {
  // Executor container launch and lifecycle management
  // Integration with specific YARN API versions
  // Resource allocation and environment setup
  // Container monitoring and cleanup
}
```

**Usage Examples:**

```scala
import org.apache.spark.deploy.yarn.ExecutorRunnable

// Created by YarnAllocator for each executor container
val executorRunnable = new ExecutorRunnable(
  container = yarnContainer,
  conf = sparkConf,
  sparkJar = distributedSparkJar
)

// Handles the complete executor container lifecycle:
// - Container launch on the NodeManager
// - Executor JVM startup and configuration
// - Resource monitoring and cleanup
```

## Distributed Cache Management

### File Distribution

```scala
// The distributed cache handles several file types
class ClientDistributedCacheManager {
  // Regular files (--files argument)
  private def distributeFiles(files: String): Map[String, LocalResource] = {
    // Upload files to the HDFS staging directory
    // Create LocalResource entries for YARN
    // Configure file permissions and visibility
    // Return the resource map for the container launch context
  }

  // Archive files (--archives argument)
  private def distributeArchives(archives: String): Map[String, LocalResource] = {
    // Handle tar.gz, zip, and other archive formats
    // Configure automatic extraction in containers
    // Set up the proper working directory structure
  }

  // JAR files (--addJars argument)
  private def distributeJars(jars: String): Map[String, LocalResource] = {
    // Add JARs to the executor classpath
    // Handle both local and HDFS JAR locations
    // Optimize JAR distribution across cluster nodes
  }
}
```
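The arguments above arrive as comma-separated lists. A hypothetical, Spark-independent sketch of the first parsing step, turning a `--files` string into a link-name-to-path map (the shape the cache manager's resource map takes; `LocalResource` itself is a YARN type, so a plain `String` stands in for it here):

```scala
// Hypothetical sketch: parse a comma-separated --files argument into a
// name -> path map. Names and structure are illustrative, not Spark's API.
object FileDistributionSketch {
  def parseFileList(files: String): Map[String, String] =
    files.split(",").map(_.trim).filter(_.nonEmpty).map { path =>
      // The link name visible in the container is the file's base name
      val name = path.substring(path.lastIndexOf('/') + 1)
      name -> path
    }.toMap
}
```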

### Resource Lifecycle

```scala
// Complete resource lifecycle management
class ClientDistributedCacheManager {
  def setupDistributedCache(): Map[String, LocalResource] = {
    // 1. Analyze resource requirements from arguments
    // 2. Upload local resources to the HDFS staging area
    // 3. Create YARN LocalResource descriptors
    // 4. Return the resource map for container launch
  }

  def cleanupStagingDir(): Unit = {
    // Clean up the HDFS staging directory after application completion
    // Remove temporary files and directories
    // Handle cleanup failures gracefully
  }
}
```
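Spark conventionally stages uploads under a per-application `.sparkStaging` directory in the submitting user's home directory, which is what makes the cleanup step a single recursive delete. A minimal sketch of that path convention (the helper name is illustrative):

```scala
// Sketch of the staging-directory convention: resources are uploaded under a
// per-application directory, then the whole directory is removed on completion.
object StagingDirSketch {
  // e.g. hdfs://nn/user/alice/.sparkStaging/application_1_0001
  def stagingDir(userHome: String, appId: String): String =
    s"$userHome/.sparkStaging/$appId"
}
```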

## Executor Container Management

### Container Launch Context

```scala
// ExecutorRunnable creates the complete launch context
class ExecutorRunnable {
  private def createContainerLaunchContext(): ContainerLaunchContext = {
    // 1. Set up the executor command line
    val executorCommand = buildExecutorCommand()

    // 2. Configure environment variables
    val environment = setupExecutorEnvironment()

    // 3. Set up local resources (JARs, files, archives)
    val localResources = setupLocalResources()

    // 4. Configure security tokens
    val tokens = setupSecurityTokens()

    // 5. Build the complete launch context
    ContainerLaunchContext.newInstance(
      localResources, environment, executorCommand, tokens
    )
  }
}
```

### Executor Environment Setup

```scala
// Environment configuration for executor containers
trait ExecutorRunnableUtil {
  protected def setupExecutorEnvironment(): Map[String, String] = {
    Map(
      "SPARK_HOME" -> sparkHome,
      "CLASSPATH" -> buildExecutorClasspath(),
      "JAVA_HOME" -> javaHome,
      "PYTHONPATH" -> pythonPath,
      "HADOOP_CONF_DIR" -> hadoopConfDir,
      // YARN-specific environment variables
      "CONTAINER_ID" -> containerId,
      "APPLICATION_WEB_PROXY_BASE" -> webProxyBase
    )
  }

  protected def buildExecutorClasspath(): String = {
    // Spark JARs and dependencies
    // User application JARs (--addJars)
    // Hadoop and YARN libraries
    // Custom classpath entries from configuration
  }
}
```
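A plain-Scala sketch of what classpath assembly amounts to: concatenate the source lists in precedence order, drop duplicates, and join with the path separator. The function and parameter names are illustrative, not Spark's; a Unix-style `:` separator is assumed for clarity.

```scala
// Hypothetical sketch of assembling the executor CLASSPATH string from the
// source lists described above. Ordering determines class-loading precedence.
object ClasspathSketch {
  def buildClasspath(sparkJars: Seq[String],
                     userJars: Seq[String],
                     extraEntries: Seq[String]): String =
    (sparkJars ++ userJars ++ extraEntries).distinct.mkString(":")
}
```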

## Configuration Utilities

### YARN Configuration Integration

```scala
// YarnSparkHadoopUtil handles configuration integration
class YarnSparkHadoopUtil {
  def getYarnConfiguration(sparkConf: SparkConf): Configuration = {
    // Load the base Hadoop configuration
    val hadoopConf = new Configuration()

    // Apply Spark-specific YARN overrides
    applySparkYarnConfiguration(hadoopConf, sparkConf)

    // Handle security configuration for Kerberos clusters
    setupSecurityConfiguration(hadoopConf)

    hadoopConf
  }

  private def applySparkYarnConfiguration(
      hadoopConf: Configuration,
      sparkConf: SparkConf
  ): Unit = {
    // Map Spark configuration onto Hadoop configuration
    // Set YARN-specific properties
    // Configure ResourceManager addresses
    // Set up queue and priority configurations
  }
}
```
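Spark's convention is to forward any `spark.hadoop.*` property into the Hadoop configuration with the prefix stripped. A plain-`Map` sketch of that override step (the object and function names are illustrative):

```scala
// Hypothetical sketch of the "apply Spark overrides" step: copy
// spark.hadoop.* keys into the Hadoop configuration, prefix removed.
object ConfOverrideSketch {
  def applyOverrides(hadoopConf: Map[String, String],
                     sparkConf: Map[String, String]): Map[String, String] = {
    val prefix = "spark.hadoop."
    val overrides = sparkConf.collect {
      case (k, v) if k.startsWith(prefix) => k.stripPrefix(prefix) -> v
    }
    // Spark-supplied values win over the base Hadoop configuration
    hadoopConf ++ overrides
  }
}
```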

### Security Token Management

```scala
// Security integration for Kerberos-enabled clusters
object YarnSparkHadoopUtil {
  def obtainTokensForNamenodes(
      paths: Set[Path],
      conf: Configuration
  ): Map[String, Token[_]] = {
    // Obtain delegation tokens for HDFS access
    // Handle multiple namenode configurations
    // Support secure cluster authentication
  }

  def obtainTokensForHBase(conf: Configuration): Map[String, Token[_]] = {
    // Obtain tokens for HBase integration
    // Support secure HBase clusters
  }

  def obtainTokensForHive(conf: Configuration): Map[String, Token[_]] = {
    // Obtain tokens for Hive metastore access
    // Support secure Hive integration
  }
}
```

## Performance Optimizations

### Resource Caching

```scala
// Efficient resource distribution and caching
class ClientDistributedCacheManager {
  private def optimizeResourceDistribution(): Unit = {
    // Cache frequently used files in HDFS
    // Use symbolic links for shared resources
    // Minimize network transfer overhead
    // Leverage YARN's distributed cache capabilities
  }
}
```

338

339

### Container Launch Optimization

340

341

```scala
// Optimized executor container launch
class ExecutorRunnable {
  private def launchContainerAsync(): Future[Unit] = {
    // Launch containers in parallel to reduce startup time
    // Pre-warm executor JVMs when possible
    // Optimize classpath and resource loading
    // Monitor launch success and retry on failure
  }
}
```

352

353

## Error Handling and Diagnostics

354

355

### Container Launch Failures

356

357

```scala
// Robust error handling for container operations
class ExecutorRunnable {
  private def handleLaunchFailure(exception: Exception): Unit = {
    // Log detailed failure information
    // Classify failure types (resource, network, configuration)
    // Implement retry logic for transient failures
    // Report failures to the ApplicationMaster for resource reallocation
  }
}
```
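The "retry logic for transient failures" step can be sketched generically, independent of YARN: attempt the operation, and on a recoverable exception, try again up to a bounded number of attempts. This is a minimal illustration, not Spark's actual retry implementation.

```scala
// Hypothetical sketch of bounded retry for transient failures.
object RetrySketch {
  def retry[T](maxAttempts: Int)(op: () => T): T =
    try op() catch {
      case e: Exception if maxAttempts > 1 =>
        // Treat the failure as transient and try again with one fewer attempt
        retry(maxAttempts - 1)(op)
    }
}
```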

368

369

### Distributed Cache Failures

370

371

```scala
// Error handling for resource distribution
class ClientDistributedCacheManager {
  private def handleDistributionFailure(resource: String, error: Throwable): Unit = {
    // Log resource distribution failures
    // Attempt alternative distribution strategies
    // Fail fast for critical resources
    // Provide detailed error messages for troubleshooting
  }
}
```

382

383

## Integration Points

384

385

### Spark Core Integration

386

387

The utilities integrate deeply with Spark core components:

388

389

- **SparkContext**: Configuration and environment setup

390

- **SparkConf**: YARN-specific configuration properties

391

- **SecurityManager**: Token management and authentication

392

- **SparkHadoopUtil**: Base Hadoop utility functionality

393

394

### YARN Service Integration

395

396

Integration with YARN cluster services:

397

398

- **ResourceManager**: Application submission and monitoring

399

- **NodeManager**: Container launch and management

400

- **Timeline Service**: Application history and metrics

401

- **Web Application Proxy**: Secure application web UI access

402

403

### Hadoop Ecosystem Integration

404

405

Support for broader Hadoop ecosystem:

406

407

- **HDFS**: Distributed file system access and security

408

- **HBase**: NoSQL database integration with token support

409

- **Hive**: Data warehouse integration and metastore access

410

- **Security**: Kerberos authentication and delegation tokens