or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-master.md · client.md · hadoop-utils.md · index.md · schedulers.md

docs/hadoop-utils.md

0

# Hadoop Utilities

1

2

The YARN module provides specialized Hadoop utilities that extend Spark's base Hadoop integration with YARN-specific functionality. These utilities handle security, configuration, environment management, and various operational tasks required for running Spark on YARN clusters.

3

4

## Imports

5

6

```scala
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.yarn.api.records.ApplicationAccessType
import scala.collection.mutable.HashMap
```

16

17

## Capabilities

18

19

### YarnSparkHadoopUtil Class

20

21

Main utility class that extends Spark's base Hadoop utilities with YARN-specific functionality.

22

23

```scala { .api }

24

/**

25

* YARN-specific Hadoop utilities for Spark

26

* Extends base SparkHadoopUtil with YARN functionality

27

*/

28

class YarnSparkHadoopUtil extends SparkHadoopUtil {

29

30

/**

31

* Transfer credentials between user group information objects

32

* @param source Source user group information

33

* @param dest Destination user group information

34

*/

35

override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation): Unit

36

37

/**

38

* Returns true indicating YARN mode is enabled

39

* @return Always true for YARN mode

40

*/

41

override def isYarnMode(): Boolean

42

43

/**

44

* Creates a new Hadoop configuration with YARN-specific settings

45

* @param conf Spark configuration

46

* @return YarnConfiguration instance

47

*/

48

override def newConfiguration(conf: SparkConf): Configuration

49

50

/**

51

* Adds user credentials to job configuration for secure clusters

52

* @param conf Job configuration to modify

53

*/

54

override def addCredentials(conf: JobConf): Unit

55

56

/**

57

* Gets current user's security credentials

58

* @return Current user's credentials

59

*/

60

override def getCurrentUserCredentials(): Credentials

61

62

/**

63

* Adds credentials to current user's credential store

64

* @param creds Credentials to add

65

*/

66

override def addCurrentUserCredentials(creds: Credentials): Unit

67

68

/**

69

* Adds a secret key to current user's credentials

70

* @param key Secret key name

71

* @param secret Secret value as string

72

*/

73

override def addSecretKeyToUserCredentials(key: String, secret: String): Unit

74

75

/**

76

* Retrieves a secret key from current user's credentials

77

* @param key Secret key name

78

* @return Secret value as byte array, or null if not found

79

*/

80

override def getSecretKeyFromUserCredentials(key: String): Array[Byte]

81

}

82

```

83

84

**Usage Example:**

85

86

```scala

87

import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

88

89

// Create YARN-specific Hadoop utilities

90

val yarnUtils = new YarnSparkHadoopUtil()

91

92

// Check if running in YARN mode

93

val isYarn = yarnUtils.isYarnMode() // Always true

94

95

// Create YARN configuration

96

val sparkConf = new SparkConf()

97

val yarnConf = yarnUtils.newConfiguration(sparkConf)

98

99

// Manage credentials for secure clusters

100

val currentCreds = yarnUtils.getCurrentUserCredentials()

101

yarnUtils.addSecretKeyToUserCredentials("myapp.secret", "secretValue")

102

```

103

104

### YarnSparkHadoopUtil Object

105

106

Companion object providing constants, utility methods, and shared functionality for YARN operations.

107

108

```scala { .api }

109

/**

110

* Companion object with YARN utility constants and methods

111

*/

112

object YarnSparkHadoopUtil {

113

114

// Memory overhead configuration

115

val MEMORY_OVERHEAD_FACTOR: Double = 0.07 // 7% memory overhead

116

val MEMORY_OVERHEAD_MIN: Int = 384 // Minimum 384MB overhead

117

118

// Host and resource constants

119

val ANY_HOST: String = "*" // Wildcard for any host

120

val DEFAULT_NUMBER_EXECUTORS: Int = 2 // Default executor count

121

val RM_REQUEST_PRIORITY: Int = 1 // ResourceManager request priority

122

123

/**

124

* Adds a path variable to environment map

125

* Appends to existing value if key already exists

126

* @param env Environment map to modify

127

* @param key Environment variable name

128

* @param value Path value to add

129

*/

130

def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit

131

132

/**

133

* Sets environment variables from input string

134

* Input format: "KEY1=VAL1,KEY2=VAL2,KEY3=VAL3"

135

* @param env Environment map to modify

136

* @param inputString Comma-separated key=value pairs

137

*/

138

def setEnvFromInputString(env: HashMap[String, String], inputString: String): Unit

139

140

/**

141

* Escapes argument for shell execution in YARN

142

* Handles special characters for bash execution

143

* @param arg Argument to escape

144

* @return Shell-escaped argument

145

*/

146

def escapeForShell(arg: String): String

147

148

/**

149

* Looks up rack information for a host

150

* Uses cached rack topology information

151

* @param conf Hadoop configuration

152

* @param host Hostname to look up

153

* @return Rack name for the host

154

*/

155

def lookupRack(conf: Configuration, host: String): String

156

157

/**

158

* Populates rack information cache for a hostname

159

* Resolves rack topology and caches results

160

* @param conf Hadoop configuration

161

* @param hostname Hostname to resolve

162

*/

163

def populateRackInfo(conf: Configuration, hostname: String): Unit

164

165

/**

166

* Gets application ACLs formatted for YARN

167

* Converts Spark security manager ACLs to YARN format

168

* @param securityMgr Spark security manager

169

* @return Map of YARN application access types to ACL strings

170

*/

171

def getApplicationAclsForYarn(securityMgr: SecurityManager): Map[ApplicationAccessType, String]

172

}

173

```

174

175

## Environment Management

176

177

### Path Variable Handling

178

179

Managing environment variables and paths for YARN containers:

180

181

```scala

182

import scala.collection.mutable.HashMap

183

184

val env = HashMap[String, String]()

185

186

// Add path to existing environment variable

187

YarnSparkHadoopUtil.addPathToEnvironment(env, "CLASSPATH", "/path/to/jar")

188

YarnSparkHadoopUtil.addPathToEnvironment(env, "CLASSPATH", "/another/path")

189

// Result: env("CLASSPATH") = "/path/to/jar:/another/path"

190

191

// Set multiple environment variables from string

192

YarnSparkHadoopUtil.setEnvFromInputString(env, "JAVA_HOME=/usr/lib/jvm/java-8,SPARK_HOME=/opt/spark")

193

```

194

195

### Shell Argument Escaping

196

197

Properly escape arguments for YARN's bash command execution:

198

199

```scala

200

val unsafeArg = "my file with spaces and $special chars"

201

val safeArg = YarnSparkHadoopUtil.escapeForShell(unsafeArg)

202

// Result: 'my file with spaces and \$special chars'

203

204

// Use in YARN container commands

205

val command = s"java -cp ${YarnSparkHadoopUtil.escapeForShell(classpath)} MyClass"

206

```

207

208

## Security and Credentials

209

210

### Credential Management

211

212

Handle security credentials for secure Hadoop clusters:

213

214

```scala

215

import org.apache.hadoop.security.{Credentials, UserGroupInformation}

216

217

val yarnUtils = new YarnSparkHadoopUtil()

218

219

// Get current user credentials

220

val currentCreds = yarnUtils.getCurrentUserCredentials()

221

222

// Add secret keys for application security

223

yarnUtils.addSecretKeyToUserCredentials("spark.authenticate.secret", "mySecretKey")

224

yarnUtils.addSecretKeyToUserCredentials("app.custom.token", "applicationToken")

225

226

// Retrieve secret keys

227

val authSecret = yarnUtils.getSecretKeyFromUserCredentials("spark.authenticate.secret")

228

val appToken = yarnUtils.getSecretKeyFromUserCredentials("app.custom.token")

229

230

// Transfer credentials between users

231

val sourceUGI = UserGroupInformation.getCurrentUser()

232

val destUGI = UserGroupInformation.createProxyUser("appuser", sourceUGI)

233

yarnUtils.transferCredentials(sourceUGI, destUGI)

234

```

235

236

### Kerberos Integration

237

238

Integration with Kerberos authentication:

239

240

```scala

241

import org.apache.hadoop.mapred.JobConf

242

243

val jobConf = new JobConf()

244

val yarnUtils = new YarnSparkHadoopUtil()

245

246

// Add current user's Kerberos credentials to job configuration

247

yarnUtils.addCredentials(jobConf)

248

249

// Credentials include:

250

// - Kerberos tickets

251

// - HDFS delegation tokens

252

// - Other service tokens (HBase, Hive, etc.)

253

```

254

255

### Application ACLs

256

257

Convert Spark security settings to YARN application ACLs:

258

259

```scala

260

import org.apache.spark.SecurityManager

261

import org.apache.hadoop.yarn.api.records.ApplicationAccessType

262

263

val sparkConf = new SparkConf()

264

.set("spark.ui.view.acls", "user1,user2")

265

.set("spark.modify.acls", "admin1,admin2")

266

267

val securityMgr = new SecurityManager(sparkConf)

268

val yarnAcls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)

269

270

// Result map:

271

// ApplicationAccessType.VIEW_APP -> "user1,user2"

272

// ApplicationAccessType.MODIFY_APP -> "admin1,admin2"

273

```

274

275

## Network Topology and Rack Awareness

276

277

### Rack Information Management

278

279

Efficient rack topology resolution and caching:

280

281

```scala

282

import org.apache.hadoop.conf.Configuration

283

284

val hadoopConf = new Configuration()

285

286

// Look up rack for a host (with caching)

287

val rack1 = YarnSparkHadoopUtil.lookupRack(hadoopConf, "node1.example.com")

288

val rack2 = YarnSparkHadoopUtil.lookupRack(hadoopConf, "node2.example.com")

289

290

// Explicitly populate rack information cache

291

YarnSparkHadoopUtil.populateRackInfo(hadoopConf, "node3.example.com")

292

293

// Benefits:

294

// - Cached lookups for performance

295

// - Data locality optimization

296

// - Cross-rack communication minimization

297

```

298

299

### Topology-Aware Scheduling

300

301

Integration with Spark's scheduler for optimal task placement:

302

303

```scala

304

// Used internally by schedulers for rack-aware task placement

305

class YarnClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {

306

override def getRackForHost(hostPort: String): Option[String] = {

307

val host = Utils.parseHostPort(hostPort)._1

308

Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))

309

}

310

}

311

```

312

313

## Resource Configuration

314

315

### Memory Overhead Calculation

316

317

Automatic memory overhead calculation for YARN containers:

318

319

```scala

320

// Constants for memory overhead

321

val overheadFactor = YarnSparkHadoopUtil.MEMORY_OVERHEAD_FACTOR // 0.07 (7%)

322

val minOverhead = YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN // 384 MB

323

324

// Calculate memory overhead for executor

325

def calculateMemoryOverhead(executorMemoryMB: Int): Int = {

326

val calculatedOverhead = (executorMemoryMB * overheadFactor).toInt

327

math.max(calculatedOverhead, minOverhead)

328

}

329

330

// Example: 2GB executor memory

331

val executorMem = 2048 // 2GB

332

val overhead = calculateMemoryOverhead(executorMem) // max(143, 384) = 384 MB

333

val totalMemory = executorMem + overhead // 2432 MB requested from YARN

334

```

335

336

### Resource Request Configuration

337

338

Default values and constants for resource requests:

339

340

```scala

341

// Default configuration values

342

val defaultExecutors = YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS // 2

343

val requestPriority = YarnSparkHadoopUtil.RM_REQUEST_PRIORITY // 1

344

val anyHost = YarnSparkHadoopUtil.ANY_HOST // "*"

345

346

// Use in resource requests to YARN ResourceManager

347

val resourceRequest = Records.newRecord(classOf[ResourceRequest])

348

resourceRequest.setPriority(Priority.newInstance(requestPriority))

349

resourceRequest.setResourceName(anyHost) // No locality preference

350

```

351

352

## Configuration Integration

353

354

### YARN Configuration Creation

355

356

Create properly configured YarnConfiguration instances:

357

358

```scala

359

val sparkConf = new SparkConf()

360

.set("spark.yarn.queue", "production")

361

.set("spark.yarn.am.memory", "1g")

362

363

val yarnUtils = new YarnSparkHadoopUtil()

364

val yarnConf = yarnUtils.newConfiguration(sparkConf).asInstanceOf[YarnConfiguration]

365

366

// YarnConfiguration includes:

367

// - Spark configuration properties

368

// - Hadoop configuration from classpath

369

// - YARN-specific configuration

370

// - Security settings and credentials

371

```

372

373

### Mode Detection

374

375

Reliable detection of YARN execution mode:

376

377

```scala

378

val yarnUtils = new YarnSparkHadoopUtil()

379

380

// Always returns true in YARN module

381

val isYarnMode = yarnUtils.isYarnMode()

382

383

// Used by Spark core to enable YARN-specific behavior:

384

// - Different security handling

385

// - YARN-specific UI integration

386

// - Special configuration validation

387

// - YARN-aware error messages

388

```

389

390

## Error Handling and Diagnostics

391

392

### Robust Network Resolution

393

394

Reliable hostname and rack resolution with error handling:

395

396

```scala

397

// Handles various failure scenarios:

398

// - DNS resolution failures

399

// - Network connectivity issues

400

// - Rack resolver configuration problems

401

// - Invalid hostname formats

402

403

try {

404

YarnSparkHadoopUtil.populateRackInfo(hadoopConf, hostname)

405

val rack = YarnSparkHadoopUtil.lookupRack(hadoopConf, hostname)

406

} catch {

407

case e: Exception =>

408

// Graceful degradation - continue without rack information

409

logWarning(s"Failed to resolve rack for $hostname", e)

410

}

411

```

412

413

### Security Error Handling

414

415

Comprehensive error handling for security operations:

416

417

```scala

418

try {

419

yarnUtils.addSecretKeyToUserCredentials(key, secret)

420

} catch {

421

case e: SecurityException =>

422

logError("Failed to add secret key - insufficient permissions", e)

423

case e: IOException =>

424

logError("Failed to add secret key - credential store error", e)

425

}

426

```

427

428

## Integration with Spark Components

429

430

### SparkContext Integration

431

432

Automatic integration with SparkContext initialization:

433

434

```scala

435

// Automatic registration as Hadoop util implementation

436

SparkHadoopUtil.get // Returns YarnSparkHadoopUtil instance in YARN mode

437

438

// Used throughout Spark for:

439

// - Configuration management

440

// - Security operations

441

// - File system access

442

// - Credential handling

443

```

444

445

### FileSystem Integration

446

447

Enhanced file system integration for YARN environments:

448

449

```scala

450

val yarnConf = yarnUtils.newConfiguration(sparkConf)

451

val fs = FileSystem.get(yarnConf)

452

453

// FileSystem configured with:

454

// - YARN-specific authentication

455

// - Proper delegation tokens

456

// - Security credentials

457

// - Optimal configuration for YARN clusters

458

```