# Security Integration

Security credential management and delegation token handling for secure YARN clusters with Kerberos authentication. This module provides extensible security integration patterns and automatic credential renewal for long-running applications.

## Capabilities

### ServiceCredentialProvider

Main extension point for implementing custom secure service credential providers. Enables integration with secure services beyond the built-in Hadoop ecosystem.

```scala { .api }
trait ServiceCredentialProvider {
  def serviceName: String
  def credentialsRequired(hadoopConf: Configuration): Boolean
  def obtainCredentials(hadoopConf: Configuration, sparkConf: SparkConf, creds: Credentials): Option[Long]
}
```

**Core Methods:**

**`serviceName: String`**
- Returns a unique identifier for the service
- Used for service discovery and configuration
- Should be lowercase and descriptive (e.g., "hdfs", "hive", "hbase")

**`credentialsRequired(hadoopConf: Configuration): Boolean`**
- Determines whether credentials are needed for this service
- Examines the Hadoop configuration for service-specific settings
- Returns true if delegation tokens should be obtained

**`obtainCredentials(hadoopConf, sparkConf, creds): Option[Long]`**
- Obtains delegation tokens for the service
- Adds the tokens to the provided Credentials object
- Returns the epoch time in milliseconds at which the tokens should next be renewed, or None if no renewal is needed

**Implementation Example:**

The provider below sketches the pattern; `MyServiceClient` stands in for your service's own client API.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider

class MyServiceCredentialProvider extends ServiceCredentialProvider {

  override def serviceName: String = "myservice"

  override def credentialsRequired(hadoopConf: Configuration): Boolean = {
    // Only request tokens when the service is enabled and the cluster is Kerberized
    hadoopConf.getBoolean("myservice.security.enabled", false) &&
      hadoopConf.get("hadoop.security.authentication", "simple") == "kerberos"
  }

  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    if (credentialsRequired(hadoopConf)) {
      // Connect to the service and obtain a delegation token
      val serviceClient = new MyServiceClient(hadoopConf)
      val token = serviceClient.getDelegationToken("spark-user")

      // Add the token to the credentials passed in by Spark
      creds.addToken(token.getService, token)

      // Return the time at which the token should be renewed (e.g., 24 hours from now)
      Some(System.currentTimeMillis() + 24 * 60 * 60 * 1000)
    } else {
      None
    }
  }
}
```

**Service Registration:**

Register the provider through the JDK ServiceLoader mechanism by listing its class name in
`META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider`:

```
com.example.MyServiceCredentialProvider
```

### AMCredentialRenewer

Manages automatic credential renewal for long-running applications in secure clusters.

```scala { .api }
class AMCredentialRenewer(
    sparkConf: SparkConf,
    hadoopConf: Configuration,
    amClient: AMRMClient[_]) {

  def start(): Unit
  def stop(): Unit
  def isRunning: Boolean
}
```

**Credential Renewal Process:**

1. Scans for renewable delegation tokens
2. Schedules renewal based on token expiration times
3. Obtains fresh tokens before expiration
4. Updates credentials in running executors
5. Handles renewal failures gracefully

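Step 2 above amounts to a small scheduling calculation. The sketch below is illustrative; the 0.75 ratio is an assumption of this example, not a Spark constant. Renewing well before expiry leaves headroom for retries.

```scala
object RenewalSchedule {
  // Pick the next renewal time as a fraction of the token's lifetime,
  // so renewal fires comfortably before the token expires.
  def nextRenewalTime(issueTimeMs: Long, expiryTimeMs: Long, ratio: Double = 0.75): Long = {
    require(expiryTimeMs > issueTimeMs, "expiry must be after issue time")
    issueTimeMs + ((expiryTimeMs - issueTimeMs) * ratio).toLong
  }
}
```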
**Usage Example:**

```scala
// AMCredentialRenewer is managed automatically by the ApplicationMaster;
// configuration controls the renewal behavior
val sparkConf = new SparkConf()
  .set("spark.yarn.credentials.file", "/path/to/credentials")
  .set("spark.yarn.credentials.renewalTime", "24h")
  .set("spark.yarn.credentials.updateTime", "1h")

// Renewal happens automatically in secure clusters
```

**Configuration Options:**
- `spark.yarn.credentials.file`: Path to the delegation token file
- `spark.yarn.credentials.renewalTime`: How often tokens are renewed
- `spark.yarn.credentials.updateTime`: How often executor credentials are updated

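Duration-style values such as "24h" and "1h" are parsed by Spark itself; the sketch below is a simplified illustration of that format, not Spark's actual parser.

```scala
// Minimal sketch of parsing time strings like "24h" or "30m" into milliseconds.
// Suffixes are checked longest-first so "ms" is not mistaken for "s".
def timeStringToMs(s: String): Long = {
  val units = List("ms" -> 1L, "s" -> 1000L, "m" -> 60000L, "h" -> 3600000L, "d" -> 86400000L)
  units.collectFirst {
    case (suffix, factor) if s.endsWith(suffix) =>
      s.dropRight(suffix.length).trim.toLong * factor
  }.getOrElse(s.trim.toLong) // bare numbers are treated as milliseconds
}
```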
### YARNHadoopDelegationTokenManager

Manages Hadoop delegation tokens specifically for YARN applications, coordinating with registered ServiceCredentialProvider implementations.

```scala { .api }
class YARNHadoopDelegationTokenManager(
    sparkConf: SparkConf,
    hadoopConf: Configuration,
    scheduler: TaskScheduler) {

  def obtainTokensForNamenodes(paths: Set[Path]): Unit
  def renewTokens(): Unit
  def stop(): Unit
}
```

**Token Management:**
- Obtains tokens for HDFS NameNodes based on input/output paths
- Coordinates with all registered ServiceCredentialProvider instances
- Handles token renewal scheduling and execution
- Distributes updated tokens to running executors

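The first bullet reduces a set of paths to the distinct filesystem endpoints that need tokens, so one token is requested per NameNode rather than per path. A self-contained sketch (pure string handling for illustration; the real implementation resolves Hadoop FileSystem instances):

```scala
import java.net.URI

// Collect the distinct scheme://host:port endpoints referenced by a set of paths.
def namenodesFor(paths: Set[String]): Set[String] =
  paths.flatMap { p =>
    val uri = new URI(p)
    Option(uri.getHost).map(host => s"${uri.getScheme}://$host:${uri.getPort}")
  }
```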
## Built-in Security Providers

### HDFS Integration

```scala
// HDFS tokens are acquired automatically for configured filesystems:
val sparkConf = new SparkConf()
  .set("spark.yarn.access.hadoopFileSystems", "hdfs://namenode1:8020,hdfs://namenode2:8020")
  .set("spark.hadoop.fs.defaultFS", "hdfs://namenode1:8020")
```

### Hive Integration

```scala
// Hive metastore tokens are obtained automatically when Spark SQL is
// configured against a SASL-enabled metastore:
val sparkConf = new SparkConf()
  .set("spark.sql.hive.metastore.version", "2.3.0")
  .set("spark.hadoop.hive.metastore.uris", "thrift://hive-metastore:9083")
  .set("spark.hadoop.hive.metastore.sasl.enabled", "true")
```

### HBase Integration

```scala
// HBase delegation tokens for secure HBase clusters:
val sparkConf = new SparkConf()
  .set("spark.hadoop.hbase.security.authentication", "kerberos")
  .set("spark.hadoop.hbase.master.kerberos.principal", "hbase/_HOST@REALM")
  .set("spark.hadoop.hbase.regionserver.kerberos.principal", "hbase/_HOST@REALM")

// A custom HBase credential provider can also be implemented
```

## Kerberos Integration

### Principal and Keytab Configuration

```scala
val sparkConf = new SparkConf()
  .set("spark.yarn.principal", "spark/hostname@REALM")
  .set("spark.yarn.keytab", "/path/to/spark.keytab")
  .set("spark.yarn.access.hadoopFileSystems", "hdfs://namenode:8020")
```

**Authentication Flow:**

1. ApplicationMaster authenticates using the principal/keytab
2. Obtains delegation tokens for configured services
3. Distributes tokens to executor containers
4. Renews tokens before expiration
5. Updates executors with fresh tokens

### Proxy User Support

```scala
// Running as a proxy user in a secure cluster: the Spark service principal
// can impersonate end users (typically requested via spark-submit --proxy-user)
val sparkConf = new SparkConf()
  .set("spark.yarn.principal", "spark/hostname@REALM")
  .set("spark.yarn.keytab", "/path/to/spark.keytab")
  .set("spark.sql.hive.hiveserver2.jdbc.url.principal", "hive/_HOST@REALM")
```

## Credential Distribution

### Token File Management

```scala
import java.util.UUID
import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.Credentials

// Credential file lifecycle
val credentialFile = new Path("/tmp/spark-credentials-" + UUID.randomUUID())

// Tokens are written to a file for executor distribution
val creds = new Credentials()
// ... populate credentials
creds.writeTokenStorageFile(credentialFile, hadoopConf)

// The file is distributed to executors via a YARN LocalResource
// (fileStatus obtained from FileSystem.getFileStatus(credentialFile))
val localResource = LocalResource.newInstance(
  ConverterUtils.getYarnUrlFromPath(credentialFile),
  LocalResourceType.FILE,
  LocalResourceVisibility.PRIVATE,
  fileStatus.getLen,
  fileStatus.getModificationTime)
```

### Dynamic Token Updates

```scala
// Executors receive updated tokens through RPC
case class UpdateDelegationTokens(tokens: Array[Byte])

// The ApplicationMaster broadcasts token updates
def updateExecutorCredentials(newTokens: Credentials): Unit = {
  val tokenBytes = SparkHadoopUtil.get.serialize(newTokens)
  val message = UpdateDelegationTokens(tokenBytes)

  // Send to all executors
  scheduler.executorIds.foreach { executorId =>
    scheduler.executorEndpointRef(executorId).send(message)
  }
}
```

## Security Configuration

### Core Security Settings

```scala
// Enable security in YARN mode
val sparkConf = new SparkConf()
  .set("spark.authenticate", "true")
  // On YARN the shared secret is generated automatically; an explicit
  // secret is only needed in other deployment modes
  .set("spark.authenticate.secret", "shared-secret")
  .set("spark.network.crypto.enabled", "true")
  .set("spark.io.encryption.enabled", "true")
```

### YARN-Specific Security

```scala
// YARN security configuration: per-service token providers can be toggled
val sparkConf = new SparkConf()
  .set("spark.yarn.security.credentials.hadoopfs.enabled", "true")
  .set("spark.yarn.security.credentials.hive.enabled", "true")
  .set("spark.yarn.security.credentials.hbase.enabled", "true")
  .set("spark.yarn.maxAppAttempts", "1") // Reduce attempts in secure mode
```

### SSL/TLS Configuration

```scala
// SSL configuration for secure communication
val sparkConf = new SparkConf()
  .set("spark.ssl.enabled", "true")
  .set("spark.ssl.keyStore", "/path/to/keystore.jks")
  .set("spark.ssl.keyStorePassword", "keystore-password")
  .set("spark.ssl.trustStore", "/path/to/truststore.jks")
  .set("spark.ssl.trustStorePassword", "truststore-password")
```

## Error Handling

### Authentication Failures

```scala
// Representative authentication failures (illustrative messages)
throw new IOException("Failed to authenticate with Kerberos KDC")
throw new AccessControlException("User not authorized for queue: production")
throw new SecretManager.InvalidToken("Delegation token has expired")
```

### Token Renewal Failures

```scala
// Token renewal error handling
try {
  credentialRenewer.renewTokens()
} catch {
  case e: IOException =>
    logError("Failed to renew delegation tokens", e)
    // Attempt re-authentication with the keytab
    authenticateWithKeytab()
  case e: InterruptedException =>
    logWarning("Token renewal interrupted")
    Thread.currentThread().interrupt()
}
```

### Security Policy Violations

```scala
// Security policy enforcement
def validateSecureAccess(user: String, resource: String): Unit = {
  if (!securityManager.checkAccess(user, resource)) {
    throw new AccessControlException(s"User $user denied access to $resource")
  }
}
```

## Advanced Security Patterns

### Custom Authentication

```scala
class CustomAuthenticationProvider extends ServiceCredentialProvider {

  private val renewalIntervalMs = 12 * 60 * 60 * 1000L // e.g., renew every 12 hours

  override def serviceName: String = "custom-auth"

  override def credentialsRequired(hadoopConf: Configuration): Boolean = true

  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // Custom authentication logic
    val authToken = performCustomAuth(hadoopConf, sparkConf)
    creds.addToken(new Text("custom-service"), authToken)

    // Return the next renewal time
    Some(System.currentTimeMillis() + renewalIntervalMs)
  }

  private def performCustomAuth(hadoopConf: Configuration, sparkConf: SparkConf): Token[_] = {
    // Implement the custom authentication protocol and
    // return a delegation token for the service
    ???
  }
}
```

### Multi-Cluster Security

```scala
// Security configuration for multi-cluster access
val sparkConf = new SparkConf()
  .set("spark.yarn.access.hadoopFileSystems",
    "hdfs://cluster1:8020,hdfs://cluster2:8020,hdfs://cluster3:8020")
  .set("spark.hadoop.fs.hdfs.impl.disable.cache", "true") // Avoid connection caching

// Tokens are obtained for all configured clusters
```

### Security Monitoring

```scala
// Security event monitoring; auditSecurityConfiguration and
// validateExecutorSecurity are application-defined hooks
class SecurityEventListener extends SparkListener {

  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
    logInfo("Application started in secure mode")
    auditSecurityConfiguration()
  }

  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
    logInfo(s"Executor ${executorAdded.executorId} added with security context")
    validateExecutorSecurity(executorAdded.executorId)
  }
}
```

## Troubleshooting Security Issues

### Common Issues

**Token Expiration:**

```scala
// Symptom: applications fail after running for extended periods.
val conf = new SparkConf()
  // Solution 1: configure automatic renewal
  .set("spark.yarn.credentials.renewalTime", "12h")
  // Solution 2: use a long-lived keytab instead of ticket-cache credentials
  .set("spark.yarn.principal", "spark/_HOST@REALM")
  .set("spark.yarn.keytab", "/etc/security/keytabs/spark.headless.keytab")
```

**Cross-Realm Authentication:**

```scala
// Configure cross-realm trust
val sparkConf = new SparkConf()
  .set("spark.hadoop.hadoop.security.auth_to_local",
    "RULE:[2:$1@$0](.*@REALM2)s/@REALM2/@REALM1/")
  .set("spark.yarn.principal", "spark/_HOST@REALM1")
```

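What the rule above does can be modeled with a plain regex substitution. This is a deliberate simplification of Hadoop's auth_to_local engine, shown only to make the sed-style `s/@REALM2/@REALM1/` part concrete:

```scala
// Simplified model of the rewrite: principals matching the rule's
// (.*@REALM2) pattern have their realm substituted; others pass through.
def mapPrincipal(principal: String): String =
  if (principal.matches(".*@REALM2")) principal.replaceAll("@REALM2$", "@REALM1")
  else principal
```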
**Service Discovery Issues:**

```scala
import java.util.ServiceLoader

// Ensure service credential providers are on the classpath and verify
// their ServiceLoader registration
val providers = ServiceLoader.load(classOf[ServiceCredentialProvider])
providers.forEach(p => logInfo(s"Found provider: ${p.serviceName}"))
```