# Token Utilities

Low-level token operations, authentication helpers, and Kafka admin client configuration for managing the delegation token lifecycle.

## Capabilities

### KafkaTokenUtil

Object providing low-level operations and utility functions for managing Kafka delegation tokens.

```scala { .api }
object KafkaTokenUtil extends Logging {

  /** Token kind identifier for Kafka delegation tokens */
  val TOKEN_KIND: Text = new Text("KAFKA_DELEGATION_TOKEN")

  /**
   * Creates a token service identifier for the given cluster identifier
   * @param identifier Cluster identifier
   * @return Text object representing the token service
   */
  def getTokenService(identifier: String): Text

  /**
   * Obtains a delegation token from a Kafka cluster
   * @param sparkConf Spark configuration
   * @param clusterConf Cluster configuration containing authentication details
   * @return Tuple of (delegation token, next renewal time in milliseconds)
   */
  def obtainToken(
    sparkConf: SparkConf,
    clusterConf: KafkaTokenClusterConf
  ): (Token[KafkaDelegationTokenIdentifier], Long)

  /**
   * Validates that the current user is not a proxy user
   * Throws exception if proxy user is detected
   */
  def checkProxyUser(): Unit

  /**
   * Creates Kafka admin client properties for the given cluster configuration
   * @param sparkConf Spark configuration
   * @param clusterConf Cluster configuration
   * @return Properties object configured for Kafka admin client
   */
  def createAdminClientProperties(
    sparkConf: SparkConf,
    clusterConf: KafkaTokenClusterConf
  ): java.util.Properties

  /**
   * Checks if global JAAS configuration is provided
   * @return true if java.security.auth.login.config system property is set
   */
  def isGlobalJaasConfigurationProvided: Boolean

  /**
   * Finds matching cluster configuration for given bootstrap servers
   * @param sparkConf Spark configuration
   * @param bootStrapServers Bootstrap servers to match against
   * @return Optional cluster configuration that matches the servers
   */
  def findMatchingTokenClusterConfig(
    sparkConf: SparkConf,
    bootStrapServers: String
  ): Option[KafkaTokenClusterConf]

  /**
   * Generates JAAS parameters for token authentication
   * @param clusterConf Cluster configuration containing token details
   * @return JAAS parameter string for token-based authentication
   */
  def getTokenJaasParams(clusterConf: KafkaTokenClusterConf): String

  /**
   * Checks if token configuration needs updating in the given parameters
   * @param params Current Kafka parameters
   * @param clusterConfig Optional cluster configuration
   * @return true if token configuration update is needed
   */
  def needTokenUpdate(
    params: java.util.Map[String, Object],
    clusterConfig: Option[KafkaTokenClusterConf]
  ): Boolean

  /**
   * Generates JAAS parameters for keytab authentication
   * @param keyTab Path to keytab file
   * @param principal Kerberos principal
   * @param kerberosServiceName Kerberos service name
   * @return JAAS parameter string for keytab-based authentication
   */
  def getKeytabJaasParams(
    keyTab: String,
    principal: String,
    kerberosServiceName: String
  ): String
}
```

**Usage Examples:**

```scala
import org.apache.spark.SparkConf
import org.apache.spark.kafka010.{KafkaTokenUtil, KafkaTokenClusterConf}

// Setup cluster configuration
val clusterConf = KafkaTokenClusterConf(
  identifier = "prod-cluster",
  authBootstrapServers = "kafka1:9092,kafka2:9092",
  targetServersRegex = "kafka.*:9092",
  securityProtocol = "SASL_SSL",
  kerberosServiceName = "kafka",
  trustStoreLocation = Some("/etc/kafka/truststore.jks"),
  trustStorePassword = Some("password"),
  tokenMechanism = "SCRAM-SHA-512",
  // ... other configuration
)

val sparkConf = new SparkConf()

// Check proxy user constraints
try {
  KafkaTokenUtil.checkProxyUser()
  println("User validation passed")
} catch {
  case e: Exception => println(s"Proxy user validation failed: ${e.getMessage}")
}

// Create admin client properties
val adminProps = KafkaTokenUtil.createAdminClientProperties(sparkConf, clusterConf)
println(s"Admin client configured with ${adminProps.size()} properties")

// Obtain delegation token
try {
  val (token, renewalTime) = KafkaTokenUtil.obtainToken(sparkConf, clusterConf)
  println(s"Token obtained: ${token.getService}")
  println(s"Next renewal: $renewalTime")
} catch {
  case e: Exception => println(s"Token acquisition failed: ${e.getMessage}")
}
```

### Token Service Management

Creates and manages token service identifiers for Kafka clusters.

```scala { .api }
/**
 * Creates a token service identifier for the given cluster identifier
 * Service identifiers are used to uniquely identify tokens in the credential store
 *
 * @param identifier Cluster identifier from configuration
 * @return Text object representing the token service in format: kafka.server.delegation.token.<identifier>
 */
def getTokenService(identifier: String): Text
```
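
The naming scheme documented above can be sketched without any Spark or Hadoop dependency. This is an illustration only: `TokenServiceSketch` and `tokenServiceName` are hypothetical names, and the function returns a plain `String` where the documented method returns a Hadoop `Text`.

```scala
// Illustrative sketch, not Spark's implementation.
object TokenServiceSketch {
  // Prefix taken from the format documented for getTokenService.
  private val TokenServicePrefix = "kafka.server.delegation.token"

  // Builds the service name for a cluster identifier.
  def tokenServiceName(identifier: String): String = {
    require(identifier.nonEmpty, "identifier must not be empty")
    s"$TokenServicePrefix.$identifier"
  }
}
```

For example, `TokenServiceSketch.tokenServiceName("prod-cluster")` yields `kafka.server.delegation.token.prod-cluster`, so tokens for different clusters get distinct service names.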

### Token Acquisition

Obtains delegation tokens from Kafka clusters using appropriate authentication methods.

```scala { .api }
/**
 * Obtains a delegation token from a Kafka cluster
 * Process:
 * 1. Creates Kafka admin client with authentication credentials
 * 2. Requests delegation token with specified parameters
 * 3. Converts Kafka token to Hadoop token format
 * 4. Calculates next renewal time based on token lifetime
 *
 * @param sparkConf Spark configuration for general settings
 * @param clusterConf Cluster configuration containing authentication details
 * @return Tuple of (Hadoop delegation token, next renewal time in milliseconds since epoch)
 * @throws Exception if token acquisition fails due to authentication, network, or permission issues
 */
def obtainToken(
  sparkConf: SparkConf,
  clusterConf: KafkaTokenClusterConf
): (Token[KafkaDelegationTokenIdentifier], Long)
```
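
Step 4 is the only purely arithmetic part of the process. The documentation does not state the formula, so the sketch below assumes one plausible approach: renew once a fixed fraction of the remaining token lifetime has elapsed. `RenewalSketch`, `nextRenewalMs`, and the `ratio` parameter are hypothetical and are not part of `KafkaTokenUtil`.

```scala
// Illustrative sketch under the stated assumption, not Spark's implementation.
object RenewalSketch {
  // nowMs and expiryMs are milliseconds since epoch.
  // ratio is the assumed fraction of the remaining lifetime to wait before renewing.
  def nextRenewalMs(nowMs: Long, expiryMs: Long, ratio: Double): Long = {
    require(expiryMs >= nowMs, "token is already expired")
    require(ratio > 0.0 && ratio <= 1.0, "ratio must be in (0, 1]")
    nowMs + ((expiryMs - nowMs) * ratio).toLong
  }
}
```

With `ratio = 1.0` the renewal time equals the expiry time; smaller values leave a safety margin before the token expires.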

### Authentication Helpers

Utility functions for managing different authentication methods and configurations.

```scala { .api }
/**
 * Validates that the current user is not a proxy user
 * Proxy users are not supported for delegation token operations
 * @throws IllegalArgumentException if current user is a proxy user
 */
def checkProxyUser(): Unit

/**
 * Checks if global JAAS configuration is provided via system property
 * @return true if java.security.auth.login.config system property is set and points to a valid file
 */
def isGlobalJaasConfigurationProvided: Boolean

/**
 * Generates JAAS parameters for token authentication
 * Creates ScramLoginModule configuration with delegation token parameters
 *
 * @param clusterConf Cluster configuration containing token mechanism and other settings
 * @return JAAS parameter string formatted for ScramLoginModule with token authentication
 */
def getTokenJaasParams(clusterConf: KafkaTokenClusterConf): String

/**
 * Generates JAAS parameters for keytab authentication
 * Creates Krb5LoginModule configuration for keytab-based Kerberos authentication
 *
 * @param keyTab Path to Kerberos keytab file
 * @param principal Kerberos principal name
 * @param kerberosServiceName Service name for Kerberos authentication
 * @return JAAS parameter string formatted for Krb5LoginModule with keytab authentication
 */
def getKeytabJaasParams(
  keyTab: String,
  principal: String,
  kerberosServiceName: String
): String
```
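
To make the shape of these JAAS parameter strings concrete, here is a minimal sketch that renders a login module name plus options into the `<module> required key=value ...;` form that Kafka accepts in `sasl.jaas.config`. `JaasSketch` and its methods are hypothetical; the exact option set and ordering produced by `KafkaTokenUtil` may differ, and this version takes the token ID and HMAC as explicit arguments rather than reading them from a cluster configuration. Values containing double quotes are not escaped here.

```scala
// Illustrative sketch, not Spark's implementation.
object JaasSketch {
  // Renders "<module> required k1=v1 k2=v2;".
  def jaasParams(loginModule: String, options: Seq[(String, String)]): String = {
    val rendered = options.map { case (k, v) => s"$k=$v" }.mkString(" ")
    s"$loginModule required $rendered;"
  }

  // Wraps a value in double quotes (no escaping of embedded quotes).
  private def quoted(value: String): String = "\"" + value + "\""

  // Keytab-based Kerberos login via the JDK's Krb5LoginModule.
  def keytabJaasParams(keyTab: String, principal: String, serviceName: String): String =
    jaasParams(
      "com.sun.security.auth.module.Krb5LoginModule",
      Seq(
        "useKeyTab" -> "true",
        "serviceName" -> quoted(serviceName),
        "keyTab" -> quoted(keyTab),
        "principal" -> quoted(principal)))

  // Delegation token login via Kafka's ScramLoginModule with tokenauth enabled.
  def tokenJaasParams(tokenId: String, hmac: String, serviceName: String): String =
    jaasParams(
      "org.apache.kafka.common.security.scram.ScramLoginModule",
      Seq(
        "tokenauth" -> "true",
        "serviceName" -> quoted(serviceName),
        "username" -> quoted(tokenId),
        "password" -> quoted(hmac)))
}
```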

### Admin Client Management

Creates and configures Kafka admin clients for token operations.

```scala { .api }
/**
 * Creates Kafka admin client properties for the given cluster configuration
 * Configures all necessary properties including:
 * - Bootstrap servers
 * - Security protocol and SASL mechanism
 * - SSL keystore and truststore settings
 * - Authentication parameters (JAAS config)
 * - Client identification
 *
 * @param sparkConf Spark configuration for general settings
 * @param clusterConf Cluster-specific configuration
 * @return Properties object ready for AdminClient.create()
 */
def createAdminClientProperties(
  sparkConf: SparkConf,
  clusterConf: KafkaTokenClusterConf
): java.util.Properties
```
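
As a rough picture of what such a `Properties` object might contain, the sketch below fills in standard Kafka client configuration keys from a trimmed-down settings class. `AdminPropsSketch`, `ClusterSettings`, and `adminClientProperties` are hypothetical stand-ins; the real method takes a `SparkConf` and a `KafkaTokenClusterConf` and may set additional or different keys.

```scala
import java.util.Properties

// Illustrative sketch, not Spark's implementation.
object AdminPropsSketch {
  // Hypothetical, reduced stand-in for KafkaTokenClusterConf.
  final case class ClusterSettings(
      bootstrapServers: String,
      securityProtocol: String,
      saslMechanism: String,
      trustStoreLocation: Option[String],
      trustStorePassword: Option[String],
      jaasConfig: Option[String])

  def adminClientProperties(settings: ClusterSettings): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", settings.bootstrapServers)
    props.setProperty("security.protocol", settings.securityProtocol)
    // SASL settings only apply to the SASL_* protocols.
    if (settings.securityProtocol.startsWith("SASL")) {
      props.setProperty("sasl.mechanism", settings.saslMechanism)
      settings.jaasConfig.foreach(v => props.setProperty("sasl.jaas.config", v))
    }
    // Truststore settings only apply when the transport is SSL.
    if (settings.securityProtocol.endsWith("SSL")) {
      settings.trustStoreLocation.foreach(v => props.setProperty("ssl.truststore.location", v))
      settings.trustStorePassword.foreach(v => props.setProperty("ssl.truststore.password", v))
    }
    props
  }
}
```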

### Configuration Matching

Utilities for matching configurations and determining update requirements.

```scala { .api }
/**
 * Finds matching cluster configuration for given bootstrap servers
 * Matches bootstrap servers against configured target server regex patterns
 *
 * @param sparkConf Spark configuration containing cluster definitions
 * @param bootStrapServers Comma-separated list of bootstrap servers to match
 * @return Optional cluster configuration whose target regex matches the bootstrap servers
 */
def findMatchingTokenClusterConfig(
  sparkConf: SparkConf,
  bootStrapServers: String
): Option[KafkaTokenClusterConf]

/**
 * Checks if token configuration needs updating in the given parameters
 * Examines current Kafka parameters to determine if token-based authentication
 * configuration needs to be applied or updated
 *
 * @param params Current Kafka client parameters
 * @param clusterConfig Optional cluster configuration for comparison
 * @return true if token authentication configuration should be applied/updated
 */
def needTokenUpdate(
  params: java.util.Map[String, Object],
  clusterConfig: Option[KafkaTokenClusterConf]
): Boolean
```
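
The regex matching described above can be sketched with the standard library alone. Two assumptions are made here and are not confirmed by this documentation: a configuration counts as a match when its regex fully matches at least one server in the comma-separated list, and an update is needed when a JAAS string is already present but differs from the latest one. `MatchingSketch`, `ClusterEntry`, `findMatching`, and `needsUpdate` are hypothetical names.

```scala
import java.util.regex.Pattern

// Illustrative sketch under the stated assumptions, not Spark's implementation.
object MatchingSketch {
  // Hypothetical, reduced stand-in for KafkaTokenClusterConf.
  final case class ClusterEntry(identifier: String, targetServersRegex: String)

  // Returns the single entry whose regex fully matches at least one bootstrap server.
  def findMatching(entries: Seq[ClusterEntry], bootstrapServers: String): Option[ClusterEntry] = {
    val servers = bootstrapServers.split(",").map(_.trim).filter(_.nonEmpty).toSeq
    val matches = entries.filter { entry =>
      val pattern = Pattern.compile(entry.targetServersRegex)
      servers.exists(server => pattern.matcher(server).matches())
    }
    // Ambiguous matches are rejected rather than resolved silently.
    require(matches.size <= 1, s"More than one cluster configuration matches $bootstrapServers")
    matches.headOption
  }

  // Assumed rule: update only when both JAAS strings exist and they differ.
  def needsUpdate(currentJaas: Option[String], latestJaas: Option[String]): Boolean =
    (currentJaas, latestJaas) match {
      case (Some(current), Some(latest)) => current != latest
      case _ => false
    }
}
```

Rejecting ambiguous matches is a design choice in this sketch: picking one of several matching clusters arbitrarily could attach the wrong token to a connection.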

### KafkaDelegationTokenIdentifier

Token identifier implementation for Kafka delegation tokens.

```scala { .api }
/**
 * Hadoop token identifier implementation for Kafka delegation tokens
 * Extends AbstractDelegationTokenIdentifier to integrate with the Hadoop security framework
 * Defined as a nested class within the KafkaTokenUtil object
 */
class KafkaTokenUtil.KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {

  /**
   * Returns the token kind for this identifier
   * @return TOKEN_KIND constant ("KAFKA_DELEGATION_TOKEN")
   */
  def getKind: Text
}
```

## Security Protocols

The token utilities support multiple security protocols:

### SASL_SSL
- SASL authentication over SSL-encrypted connections
- Supports SCRAM-SHA-256 and SCRAM-SHA-512 mechanisms
- Requires SSL truststore configuration

### SASL_PLAINTEXT
- SASL authentication over plaintext connections
- Not recommended for production use because traffic is not encrypted
- Supports the same SASL mechanisms as SASL_SSL

### SSL
- SSL with client certificate authentication
- Requires both truststore and keystore configuration
- Uses client certificates for authentication
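
The per-protocol requirements listed above can be expressed as a small pre-flight check. This is a sketch that encodes only what the lists state; `ProtocolSketch`, `StoreSettings`, and `missingSettings` are hypothetical and do not exist in the library.

```scala
// Illustrative sketch, not part of KafkaTokenUtil.
object ProtocolSketch {
  // Hypothetical holder for the store locations relevant to the check.
  final case class StoreSettings(
      trustStoreLocation: Option[String],
      keyStoreLocation: Option[String])

  // Returns the names of store settings that the given protocol needs but that are absent.
  def missingSettings(securityProtocol: String, stores: StoreSettings): Seq[String] = {
    val needsTrustStore = securityProtocol == "SASL_SSL" || securityProtocol == "SSL"
    val needsKeyStore = securityProtocol == "SSL"
    val missingTrust =
      if (needsTrustStore && stores.trustStoreLocation.isEmpty) Seq("truststore") else Seq.empty
    val missingKey =
      if (needsKeyStore && stores.keyStoreLocation.isEmpty) Seq("keystore") else Seq.empty
    missingTrust ++ missingKey
  }
}
```

An empty result means the configuration satisfies the listed requirements for that protocol.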

**Authentication Methods:**

```scala
// Check authentication method availability
if (KafkaTokenUtil.isGlobalJaasConfigurationProvided) {
  println("Using global JAAS configuration")
} else {
  // Generate JAAS config programmatically
  val tokenJaas = KafkaTokenUtil.getTokenJaasParams(clusterConf)
  println(s"Generated token JAAS: $tokenJaas")

  // Or for keytab authentication
  val keytabJaas = KafkaTokenUtil.getKeytabJaasParams(
    "/path/to/user.keytab",
    "user@REALM",
    "kafka"
  )
  println(s"Generated keytab JAAS: $keytabJaas")
}
```