# Security Integration

Security credential management and delegation token handling for secure YARN clusters with Kerberos authentication. This module provides extensible security integration patterns and automatic credential renewal for long-running applications.

## Capabilities

### ServiceCredentialProvider

Main extension point for implementing custom secure service credential providers. Enables integration with various secure services beyond the built-in Hadoop ecosystem.

```scala { .api }
trait ServiceCredentialProvider {
  def serviceName: String
  def credentialsRequired(hadoopConf: Configuration): Boolean
  def obtainCredentials(hadoopConf: Configuration, sparkConf: SparkConf, creds: Credentials): Option[Long]
}
```

**Core Methods:**

**`serviceName: String`**

- Returns a unique identifier for the service
- Used for service discovery and configuration
- Should be lowercase and descriptive (e.g., "hdfs", "hive", "hbase")

**`credentialsRequired(hadoopConf: Configuration): Boolean`**

- Determines whether credentials are needed for this service
- Examines the Hadoop configuration for service-specific settings
- Returns true if delegation tokens should be obtained

**`obtainCredentials(hadoopConf, sparkConf, creds): Option[Long]`**

- Obtains delegation tokens for the service
- Adds the tokens to the provided Credentials object
- Returns the next token renewal time in epoch milliseconds, or None if no renewal is needed

**Implementation Example:**

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider

class MyServiceCredentialProvider extends ServiceCredentialProvider {
  override def serviceName: String = "myservice"

  override def credentialsRequired(hadoopConf: Configuration): Boolean = {
    // Check that the service is enabled and the cluster uses Kerberos
    hadoopConf.getBoolean("myservice.security.enabled", false) &&
      hadoopConf.get("hadoop.security.authentication", "simple") == "kerberos"
  }

  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    if (credentialsRequired(hadoopConf)) {
      // Connect to the service and obtain a delegation token
      val serviceClient = new MyServiceClient(hadoopConf)
      val token = serviceClient.getDelegationToken("spark-user")

      // Add the token to the shared credentials
      creds.addToken(token.getService, token)

      // Return the renewal time (e.g., 24 hours from now)
      Some(System.currentTimeMillis() + 24 * 60 * 60 * 1000)
    } else {
      None
    }
  }
}
```

**Service Registration:**

Providers are discovered through the JDK `ServiceLoader` mechanism: list the implementation class in a provider-configuration file named after the trait.

```
# META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
com.example.MyServiceCredentialProvider
```

### AMCredentialRenewer

Manages automatic credential renewal for long-running applications in secure clusters.

```scala { .api }
class AMCredentialRenewer(
    sparkConf: SparkConf,
    hadoopConf: Configuration,
    amClient: AMRMClient[_]) {
  def start(): Unit
  def stop(): Unit
  def isRunning: Boolean
}
```

**Credential Renewal Process:**

1. Scans for renewable delegation tokens
2. Schedules renewal based on token expiration times
3. Obtains fresh tokens before expiration
4. Updates credentials in running executors
5. Handles renewal failures gracefully
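Step 2 above amounts to scheduling the next attempt a safe fraction of the way through each token's lifetime. A minimal, dependency-free sketch of that calculation (the `RenewalSchedule` helper and the 0.75 ratio are illustrative, not the actual Spark internals):

```scala
object RenewalSchedule {
  // Fraction of a token's remaining lifetime to wait before renewing.
  // Renewing at 75% leaves headroom for retries before the token expires.
  val RenewalRatio = 0.75

  /** Delay in milliseconds from `now` until the next renewal attempt for a
    * token that expires at `expiryTime` (epoch millis). Never negative. */
  def renewalDelay(now: Long, expiryTime: Long): Long = {
    val remaining = expiryTime - now
    math.max(0L, (remaining * RenewalRatio).toLong)
  }
}

// A token issued at t=0 with a 24h lifetime is renewed after ~18h
println(RenewalSchedule.renewalDelay(0L, 24L * 60 * 60 * 1000)) // 64800000
```

An already-expired token yields a delay of zero, i.e., renewal is attempted immediately.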

**Usage Example:**

```scala
// AMCredentialRenewer is automatically managed by the ApplicationMaster;
// configuration controls the renewal behavior
val sparkConf = new SparkConf()
  .set("spark.yarn.credentials.file", "/path/to/credentials")
  .set("spark.yarn.credentials.renewalTime", "24h")
  .set("spark.yarn.credentials.updateTime", "1h")

// Renewal then happens automatically in secure clusters
```

**Configuration Options:**

- `spark.yarn.credentials.file`: Path to the delegation token file
- `spark.yarn.credentials.renewalTime`: How often tokens are renewed
- `spark.yarn.credentials.updateTime`: How often executor credentials are updated
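The time values above (`24h`, `1h`) are duration strings. As a rough sketch of how such strings map to milliseconds (Spark parses these internally; this standalone `durationToMillis` helper is hypothetical and handles only the `s`/`m`/`h`/`d` suffixes):

```scala
// Hypothetical stand-in for duration-string parsing; Spark's own
// configuration parsing supports more units and forms.
def durationToMillis(s: String): Long = {
  val pattern = "([0-9]+)([smhd])".r
  s.trim match {
    case pattern(num, unit) =>
      val seconds = unit match {
        case "s" => 1L
        case "m" => 60L
        case "h" => 3600L
        case "d" => 86400L
      }
      num.toLong * seconds * 1000L
    case other =>
      throw new IllegalArgumentException(s"Cannot parse duration: $other")
  }
}

println(durationToMillis("24h")) // 86400000
println(durationToMillis("1h"))  // 3600000
```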

### YARNHadoopDelegationTokenManager

Manages Hadoop delegation tokens specifically for YARN applications, coordinating with registered ServiceCredentialProvider implementations.

```scala { .api }
class YARNHadoopDelegationTokenManager(
    sparkConf: SparkConf,
    hadoopConf: Configuration,
    scheduler: TaskScheduler) {
  def obtainTokensForNamenodes(paths: Set[Path]): Unit
  def renewTokens(): Unit
  def stop(): Unit
}
```

**Token Management:**

- Obtains tokens for HDFS NameNodes based on input/output paths
- Coordinates with all registered ServiceCredentialProvider instances
- Handles token renewal scheduling and execution
- Distributes updated tokens to running executors
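The first bullet implies one token per distinct NameNode rather than one per path. A dependency-free sketch of that de-duplication step, using `java.net.URI` in place of Hadoop's `Path` (the helper name is illustrative):

```scala
import java.net.URI

// Collapse a set of paths down to the distinct filesystems
// (scheme + authority) that delegation tokens must be obtained for.
def distinctFileSystems(paths: Set[String]): Set[String] =
  paths.map { p =>
    val uri = new URI(p)
    s"${uri.getScheme}://${uri.getAuthority}"
  }

val paths = Set(
  "hdfs://namenode1:8020/data/input",
  "hdfs://namenode1:8020/data/output",
  "hdfs://namenode2:8020/warehouse")

// Three paths, but only two distinct filesystems → two tokens
println(distinctFileSystems(paths).size) // 2
```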

## Built-in Security Providers

### HDFS Integration

```scala
// HDFS tokens are obtained automatically for the configured filesystems:
val sparkConf = new SparkConf()
  .set("spark.yarn.access.hadoopFileSystems", "hdfs://namenode1:8020,hdfs://namenode2:8020")
  .set("spark.hadoop.fs.defaultFS", "hdfs://namenode1:8020")
```

### Hive Integration

```scala
// Hive metastore tokens are obtained automatically when using Spark SQL
// against a secured (SASL-enabled) metastore:
val sparkConf = new SparkConf()
  .set("spark.sql.hive.metastore.version", "2.3.0")
  .set("spark.hadoop.hive.metastore.uris", "thrift://hive-metastore:9083")
  .set("spark.hadoop.hive.metastore.sasl.enabled", "true")
```

### HBase Integration

```scala
// HBase delegation tokens for secure HBase clusters:
val sparkConf = new SparkConf()
  .set("spark.hadoop.hbase.security.authentication", "kerberos")
  .set("spark.hadoop.hbase.master.kerberos.principal", "hbase/_HOST@REALM")
  .set("spark.hadoop.hbase.regionserver.kerberos.principal", "hbase/_HOST@REALM")

// A custom HBase credential provider can also be implemented
```

## Kerberos Integration

### Principal and Keytab Configuration

```scala
val sparkConf = new SparkConf()
  .set("spark.yarn.principal", "spark/hostname@REALM")
  .set("spark.yarn.keytab", "/path/to/spark.keytab")
  .set("spark.yarn.access.hadoopFileSystems", "hdfs://namenode:8020")
```

**Authentication Flow:**

1. The ApplicationMaster authenticates using the principal/keytab
2. Obtains delegation tokens for configured services
3. Distributes tokens to executor containers
4. Renews tokens before expiration
5. Updates executors with fresh tokens

### Proxy User Support

```scala
// Running as a proxy user in a secure cluster: the Spark service
// principal can impersonate end users
val sparkConf = new SparkConf()
  .set("spark.yarn.principal", "spark/hostname@REALM")
  .set("spark.yarn.keytab", "/path/to/spark.keytab")
  .set("spark.sql.hive.hiveserver2.jdbc.url.principal", "hive/_HOST@REALM")
```

## Credential Distribution

### Token File Management

```scala
// Credential file lifecycle
val credentialFile = "/tmp/spark-credentials-" + UUID.randomUUID()

// Tokens are written to a file for distribution to executors
val creds = new Credentials()
// ... populate credentials
creds.writeTokenStorageFile(new Path(credentialFile), hadoopConf)

// The file is distributed to executors via a YARN LocalResource
val localResource = LocalResource.newInstance(
  ConverterUtils.getYarnUrlFromPath(new Path(credentialFile)),
  LocalResourceType.FILE,
  LocalResourceVisibility.PRIVATE,
  fileStatus.getLen,
  fileStatus.getModificationTime)
```

### Dynamic Token Updates

```scala
// Executors receive updated tokens through RPC
case class UpdateDelegationTokens(tokens: Array[Byte])

// The ApplicationMaster broadcasts token updates
def updateExecutorCredentials(newTokens: Credentials): Unit = {
  val tokenBytes = SparkHadoopUtil.get.serialize(newTokens)
  val message = UpdateDelegationTokens(tokenBytes)

  // Send to all executors
  scheduler.executorIds.foreach { executorId =>
    scheduler.executorEndpointRef(executorId).send(message)
  }
}
```

## Security Configuration

### Core Security Settings

```scala
// Enable security in YARN mode
val sparkConf = new SparkConf()
  .set("spark.authenticate", "true")
  .set("spark.authenticate.secret", "shared-secret")
  .set("spark.network.crypto.enabled", "true")
  .set("spark.io.encryption.enabled", "true")
```

### YARN-Specific Security

```scala
// YARN security configuration
val sparkConf = new SparkConf()
  .set("spark.yarn.security.credentials.hadoopfs.enabled", "true")
  .set("spark.yarn.security.credentials.hive.enabled", "true")
  .set("spark.yarn.security.credentials.hbase.enabled", "true")
  .set("spark.yarn.maxAppAttempts", "1") // Reduce retry attempts in secure mode
```

### SSL/TLS Configuration

```scala
// SSL configuration for secure communication
val sparkConf = new SparkConf()
  .set("spark.ssl.enabled", "true")
  .set("spark.ssl.keyStore", "/path/to/keystore.jks")
  .set("spark.ssl.keyStorePassword", "keystore-password")
  .set("spark.ssl.trustStore", "/path/to/truststore.jks")
  .set("spark.ssl.trustStorePassword", "truststore-password")
```

## Error Handling

### Authentication Failures

```scala
// Common authentication errors surfaced by secure clusters:
throw new IOException("Failed to authenticate with Kerberos KDC")
throw new AccessControlException("User not authorized for queue: production")
throw new SecretManager.InvalidToken("Delegation token has expired")
```

### Token Renewal Failures

```scala
// Token renewal error handling
try {
  credentialRenewer.renewTokens()
} catch {
  case e: IOException =>
    logError("Failed to renew delegation tokens", e)
    // Attempt re-authentication with the keytab
    authenticateWithKeytab()
  case e: InterruptedException =>
    logWarning("Token renewal interrupted")
    Thread.currentThread().interrupt()
}
```

### Security Policy Violations

```scala
// Security policy enforcement
def validateSecureAccess(user: String, resource: String): Unit = {
  if (!securityManager.checkAccess(user, resource)) {
    throw new AccessControlException(s"User $user denied access to $resource")
  }
}
```

## Advanced Security Patterns

### Custom Authentication

```scala
class CustomAuthenticationProvider extends ServiceCredentialProvider {
  override def serviceName: String = "custom-auth"

  override def credentialsRequired(hadoopConf: Configuration): Boolean = true

  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // Custom authentication logic
    val authToken = performCustomAuth(hadoopConf, sparkConf)
    creds.addToken(new Text("custom-service"), authToken)

    // Return the next renewal time
    Some(System.currentTimeMillis() + renewalIntervalMs)
  }

  private def performCustomAuth(
      hadoopConf: Configuration,
      sparkConf: SparkConf): Token[_ <: TokenIdentifier] = {
    // Implement the custom authentication protocol and
    // return a delegation token for the service
    ???
  }
}
```

### Multi-Cluster Security

```scala
// Security configuration for multi-cluster access
val sparkConf = new SparkConf()
  .set("spark.yarn.access.hadoopFileSystems",
    "hdfs://cluster1:8020,hdfs://cluster2:8020,hdfs://cluster3:8020")
  .set("spark.hadoop.fs.hdfs.impl.disable.cache", "true") // Avoid stale cached connections

// Tokens are obtained for all configured clusters
```

### Security Monitoring

```scala
// Security event monitoring
class SecurityEventListener extends SparkListener {
  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
    logInfo("Application started in secure mode")
    auditSecurityConfiguration()
  }

  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
    logInfo(s"Executor ${executorAdded.executorId} added with security context")
    validateExecutorSecurity(executorAdded.executorId)
  }
}
```

## Troubleshooting Security Issues

### Common Issues

**Token Expiration:**

```scala
// Symptom: applications fail after running for extended periods
// Solutions: configure automatic renewal and use a long-lived
// keytab instead of a ticket cache
val conf = new SparkConf()
  .set("spark.yarn.credentials.renewalTime", "12h")
  .set("spark.yarn.principal", "spark/_HOST@REALM")
  .set("spark.yarn.keytab", "/etc/security/keytabs/spark.headless.keytab")
```

**Cross-Realm Authentication:**

```scala
// Configure cross-realm trust: map REALM2 principals to REALM1
val sparkConf = new SparkConf()
  .set("spark.hadoop.hadoop.security.auth_to_local",
    "RULE:[2:$1@$0](.*@REALM2)s/@REALM2/@REALM1/")
  .set("spark.yarn.principal", "spark/_HOST@REALM1")
```
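The `s/@REALM2/@REALM1/` suffix of that rule is a sed-style substitution. A tiny standalone illustration of the rewrite it performs (actual evaluation is done by Hadoop's `auth_to_local` machinery, not by this hypothetical helper):

```scala
// Illustrates only the substitution step of the rule above:
// principals from REALM2 are mapped onto REALM1.
def rewriteRealm(principal: String): String =
  principal.replaceAll("@REALM2$", "@REALM1")

println(rewriteRealm("spark/host.example.com@REALM2")) // spark/host.example.com@REALM1
println(rewriteRealm("alice@REALM1"))                  // unchanged: alice@REALM1
```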

**Service Discovery Issues:**

```scala
// Ensure service credential providers are on the classpath and
// verify the ServiceLoader registration:
val providers = ServiceLoader.load(classOf[ServiceCredentialProvider])
providers.forEach(p => logInfo(s"Found provider: ${p.serviceName}"))
```