# Token Provider

Core delegation token provider that integrates with Apache Spark's security framework to obtain and manage Kafka delegation tokens for secure cluster authentication.

## Capabilities

### KafkaDelegationTokenProvider

Main token provider class. It implements Spark's `HadoopDelegationTokenProvider` interface (package `org.apache.spark.security`) for Kafka clusters.

```scala { .api }
/**
 * Kafka delegation token provider for Apache Spark.
 * Implements HadoopDelegationTokenProvider to integrate with Spark's security framework.
 */
class KafkaDelegationTokenProvider extends HadoopDelegationTokenProvider with Logging {

  /**
   * Service name identifier for this token provider.
   * @return "kafka"
   */
  def serviceName: String

  /**
   * Obtains delegation tokens for all configured Kafka clusters.
   * @param hadoopConf Hadoop configuration
   * @param sparkConf Spark configuration containing cluster settings
   * @param creds Credentials object to store obtained tokens
   * @return Next renewal time as an epoch timestamp in milliseconds (earliest
   *         across all clusters), or None if no token was obtained
   */
  def obtainDelegationTokens(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long]

  /**
   * Checks whether delegation tokens are required for any configured Kafka cluster.
   * @param sparkConf Spark configuration
   * @param hadoopConf Hadoop configuration
   * @return true if tokens are required, false otherwise
   */
  def delegationTokensRequired(
      sparkConf: SparkConf,
      hadoopConf: Configuration): Boolean
}
```
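
**Usage Examples:**

In a running application, Spark's delegation token manager discovers this provider and calls it automatically, so direct calls like the ones below are mainly useful for testing or debugging. Depending on the Spark version, the class may be declared `private[spark]`, in which case it can only be instantiated from code inside the `org.apache.spark` package.
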
```scala
import org.apache.spark.SparkConf
import org.apache.spark.kafka010.KafkaDelegationTokenProvider
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

// Set up the Spark configuration with Kafka cluster details.
// SASL_SSL with Kerberos assumes the current user has a valid Kerberos login
// (ticket cache or keytab).
val sparkConf = new SparkConf()
  .set("spark.kafka.clusters.prod.auth.bootstrap.servers", "kafka1:9092,kafka2:9092")
  .set("spark.kafka.clusters.prod.security.protocol", "SASL_SSL")
  .set("spark.kafka.clusters.prod.sasl.kerberos.service.name", "kafka")
  .set("spark.kafka.clusters.prod.ssl.truststore.location", "/path/to/truststore.jks")
  .set("spark.kafka.clusters.prod.ssl.truststore.password", "password")

// Create the provider and a Hadoop configuration
val tokenProvider = new KafkaDelegationTokenProvider()
val hadoopConf = new Configuration()

// Check whether tokens are needed
if (tokenProvider.delegationTokensRequired(sparkConf, hadoopConf)) {
  println("Delegation tokens are required for configured Kafka clusters")

  // Obtain tokens; they are added to `credentials`
  val credentials = new Credentials()
  val nextRenewal = tokenProvider.obtainDelegationTokens(hadoopConf, sparkConf, credentials)

  nextRenewal match {
    case Some(renewalTime) =>
      // renewalTime is an epoch timestamp in milliseconds
      println(s"Tokens obtained. Next renewal: ${java.time.Instant.ofEpochMilli(renewalTime)}")
    case None =>
      println("No tokens were obtained")
  }
} else {
  println("No delegation tokens required")
}
```
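
The value inside `Some(renewalTime)` is a timestamp in milliseconds since the epoch; Spark's token manager treats it that way when it schedules the next renewal. The sketch below is a hypothetical helper, not part of Spark, that turns the returned `Option[Long]` into a readable instant and a remaining delay. `RenewalInfo`, `describeRenewal`, and the `nowMillis` parameter are illustrative names.

```scala
import java.time.{Duration, Instant}

// Hypothetical helper (not part of Spark): describes the Option[Long] returned by
// obtainDelegationTokens, assuming the value is epoch milliseconds.
object RenewalInfo {
  def describeRenewal(nextRenewal: Option[Long], nowMillis: Long): String =
    nextRenewal match {
      case Some(renewalMillis) =>
        // Clamp at zero so a renewal time that has already passed reads as "in 0 min"
        val remaining = Duration.ofMillis(math.max(0L, renewalMillis - nowMillis))
        s"next renewal at ${Instant.ofEpochMilli(renewalMillis)} (in ${remaining.toMinutes} min)"
      case None =>
        "no token obtained; nothing to renew"
    }
}

// Example: a renewal time one hour after "now"
val now = 1700000000000L
println(RenewalInfo.describeRenewal(Some(now + 3600000L), now))
println(RenewalInfo.describeRenewal(None, now))
```

Passing `nowMillis` explicitly, instead of reading the clock inside the helper, keeps the function deterministic and easy to test.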

**Multi-Cluster Configuration:**

```scala
import org.apache.spark.SparkConf
import org.apache.spark.kafka010.KafkaDelegationTokenProvider
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

// Configure multiple Kafka clusters; "prod" and "dev" are arbitrary cluster identifiers
val sparkConf = new SparkConf()
  // Production cluster
  .set("spark.kafka.clusters.prod.auth.bootstrap.servers", "prod-kafka1:9092")
  .set("spark.kafka.clusters.prod.security.protocol", "SASL_SSL")
  .set("spark.kafka.clusters.prod.target.bootstrap.servers.regex", "prod-kafka.*:9092")
  // Development cluster
  .set("spark.kafka.clusters.dev.auth.bootstrap.servers", "dev-kafka1:9092")
  .set("spark.kafka.clusters.dev.security.protocol", "SASL_PLAINTEXT")
  .set("spark.kafka.clusters.dev.target.bootstrap.servers.regex", "dev-kafka.*:9092")

val tokenProvider = new KafkaDelegationTokenProvider()
val credentials = new Credentials()

// Attempts to obtain a token from each configured cluster; a non-fatal failure
// on one cluster is logged and does not prevent a token from the other
tokenProvider.obtainDelegationTokens(new Configuration(), sparkConf, credentials)
```
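
The provider reads one configuration group per cluster identifier under `spark.kafka.clusters.<id>.`. Each group's `target.bootstrap.servers.regex` decides which token a Kafka source or sink uses: the regex is matched against that source's or sink's `bootstrap.servers`. Spark documents `.*` as the default regex and rejects addresses that match more than one cluster. The sketch below is a simplified, standard-library-only model of that lookup over a plain `Map[String, String]`. `ClusterConfigs`, `groupByCluster`, and `matchingCluster` are hypothetical names, not Spark's API, and full-string regex matching is an assumption.

```scala
// Hypothetical, simplified model (not Spark's API) of per-cluster configuration lookup.
object ClusterConfigs {
  private val Prefix = "spark.kafka.clusters."

  // Groups "spark.kafka.clusters.<id>.<key>" entries into Map(id -> Map(key -> value));
  // keys outside the prefix are ignored.
  def groupByCluster(conf: Map[String, String]): Map[String, Map[String, String]] =
    conf.toSeq
      .collect {
        case (k, v) if k.startsWith(Prefix) && k.indexOf('.', Prefix.length) > Prefix.length =>
          val rest = k.substring(Prefix.length)
          val dot = rest.indexOf('.')
          (rest.substring(0, dot), rest.substring(dot + 1), v)
      }
      .groupBy(_._1)
      .map { case (id, entries) => id -> entries.map(e => e._2 -> e._3).toMap }

  // Assumed default when a cluster has no target regex configured
  private def targetRegex(settings: Map[String, String]): String =
    settings.getOrElse("target.bootstrap.servers.regex", ".*")

  // Returns the cluster whose target regex fully matches the bootstrap servers
  // string; throws IllegalArgumentException if more than one cluster matches.
  def matchingCluster(
      clusters: Map[String, Map[String, String]],
      bootstrapServers: String): Option[String] = {
    val matches = clusters.toSeq.collect {
      case (id, settings) if bootstrapServers.matches(targetRegex(settings)) => id
    }
    require(matches.size < 2,
      s"More than one cluster matches $bootstrapServers: ${matches.sorted.mkString(", ")}")
    matches.headOption
  }
}

val conf = Map(
  "spark.kafka.clusters.prod.auth.bootstrap.servers" -> "prod-kafka1:9092",
  "spark.kafka.clusters.prod.target.bootstrap.servers.regex" -> "prod-kafka.*:9092",
  "spark.kafka.clusters.dev.auth.bootstrap.servers" -> "dev-kafka1:9092",
  "spark.kafka.clusters.dev.target.bootstrap.servers.regex" -> "dev-kafka.*:9092",
  "spark.app.name" -> "demo" // not a cluster setting, so it is ignored
)
val clusters = ClusterConfigs.groupByCluster(conf)
println(ClusterConfigs.matchingCluster(clusters, "dev-kafka2:9092")) // Some(dev)
```

Because two clusters that both fall back to `.*` would match every address, non-overlapping regexes like the ones in the example above are what make multi-cluster setups unambiguous.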
109
110
### Service Name
111
112
Returns the service name identifier for this token provider.
113
114
```scala { .api }
115
/**
116
* Service name identifier for this token provider
117
* @return "kafka" - the service name used by Hadoop security framework
118
*/
119
def serviceName: String
120
```
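
Spark's delegation token manager builds a per-service switch from the service name, `spark.security.credentials.<serviceName>.enabled` (default `true`). Setting `spark.security.credentials.kafka.enabled=false` therefore turns this provider off. The sketch below models that lookup over a plain `Map`. `ServiceToggle`, `enabledKey`, and `isServiceEnabled` are hypothetical names, and the model ignores any deprecated key variants that Spark may also check.

```scala
// Hypothetical model (not Spark's API) of the per-service enable switch.
object ServiceToggle {
  def enabledKey(serviceName: String): String =
    s"spark.security.credentials.$serviceName.enabled"

  // Defaults to true when the key is absent; otherwise the value is parsed as a boolean
  def isServiceEnabled(conf: Map[String, String], serviceName: String): Boolean =
    conf.get(enabledKey(serviceName)).forall(_.trim.toBoolean)
}

val conf = Map("spark.security.credentials.kafka.enabled" -> "false")
println(ServiceToggle.isServiceEnabled(conf, "kafka"))      // false
println(ServiceToggle.isServiceEnabled(Map.empty, "kafka")) // true
```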

### Token Requirements Check

Determines if delegation tokens are required based on the current configuration and security settings.

```scala { .api }
/**
 * Checks if delegation tokens are required for any configured Kafka cluster.
 * Returns true if any cluster requires delegation tokens based on:
 * - Security protocol (SASL_SSL, SASL_PLAINTEXT, SSL)
 * - Authentication method availability
 * - Current user credentials
 *
 * @param sparkConf Spark configuration containing cluster settings
 * @param hadoopConf Hadoop configuration
 * @return true if delegation tokens are required, false otherwise
 */
def delegationTokensRequired(sparkConf: SparkConf, hadoopConf: Configuration): Boolean
```
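
The protocol part of this check can be modeled as follows: tokens are needed when at least one configured cluster uses one of the listed protocols. This is a simplified sketch over plain Scala values, not Spark's code. `KafkaCluster`, `TokenRequirement`, and `tokensRequired` are hypothetical names. `SASL_SSL` is assumed as the protocol when none is set, which matches Spark's documented default for `spark.kafka.clusters.<id>.security.protocol`. The authentication-method and user-credential checks are left out.

```scala
// Hypothetical model (not Spark's API): a cluster's identifier and optional security.protocol
final case class KafkaCluster(id: String, securityProtocol: Option[String])

object TokenRequirement {
  // Protocols that, per the list above, can require delegation tokens
  val ProtocolsNeedingTokens: Set[String] = Set("SASL_SSL", "SASL_PLAINTEXT", "SSL")

  // Assumed protocol when security.protocol is not set
  val DefaultProtocol = "SASL_SSL"

  // True if any configured cluster uses a protocol that needs tokens
  def tokensRequired(clusters: Seq[KafkaCluster]): Boolean =
    clusters.exists { c =>
      ProtocolsNeedingTokens.contains(c.securityProtocol.getOrElse(DefaultProtocol))
    }
}

val clusters = Seq(
  KafkaCluster("dev", Some("PLAINTEXT")),
  KafkaCluster("prod", None) // falls back to SASL_SSL
)
println(TokenRequirement.tokensRequired(clusters))                                    // true
println(TokenRequirement.tokensRequired(Seq(KafkaCluster("dev", Some("PLAINTEXT"))))) // false
println(TokenRequirement.tokensRequired(Seq.empty))                                   // false
```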

### Token Obtainment

Obtains delegation tokens for all configured Kafka clusters and stores them in the provided credentials.

```scala { .api }
/**
 * Obtains delegation tokens for all configured Kafka clusters.
 * Processes each cluster configuration and:
 * - Validates security requirements
 * - Connects to the Kafka cluster using an admin client
 * - Requests a delegation token with the appropriate credentials
 * - Stores the token in the provided Credentials object
 * - Tracks renewal times across all clusters
 *
 * @param hadoopConf Hadoop configuration
 * @param sparkConf Spark configuration containing cluster settings under spark.kafka.clusters.*
 * @param creds Credentials object to store obtained tokens
 * @return Next renewal time as an epoch timestamp in milliseconds (earliest across
 *         all clusters), or None if no tokens were obtained
 */
def obtainDelegationTokens(
    hadoopConf: Configuration,
    sparkConf: SparkConf,
    creds: Credentials): Option[Long]
```
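
The return value combines per-cluster results: it is the earliest renewal time among the clusters that produced a token, or `None` when no cluster did. A minimal sketch of that reduction over plain values follows. `RenewalAggregation` and `earliestRenewal` are hypothetical names, not Spark's API.

```scala
// Hypothetical helper (not Spark's API): combines per-cluster renewal times
// (epoch millis) into the single Option[Long] that obtainDelegationTokens returns.
object RenewalAggregation {
  // A None entry stands for a cluster that produced no token
  def earliestRenewal(perCluster: Seq[Option[Long]]): Option[Long] =
    perCluster.flatten.reduceOption(_ min _)
}

println(RenewalAggregation.earliestRenewal(Seq(Some(3000L), None, Some(1000L)))) // Some(1000)
println(RenewalAggregation.earliestRenewal(Seq(None, None)))                     // None
```

Taking the minimum means a single renewal cycle refreshes every cluster's token before the soonest one expires.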

## Error Handling

The token provider handles various error conditions:

- **Authentication failures**: When Kerberos or SSL authentication fails
- **Network connectivity issues**: When Kafka clusters are unreachable
- **Configuration errors**: When required configuration parameters are missing
- **Token creation failures**: When a Kafka cluster rejects a token request
- **Permission issues**: When the user lacks the permissions required for token operations

Non-fatal errors raised while obtaining a token from one cluster are caught and logged as warnings. That cluster contributes no token, and the provider continues with the remaining clusters so they can still succeed. If no token is obtained at all, `obtainDelegationTokens` returns `None`.
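
The per-cluster isolation described above can be sketched with `scala.util.control.NonFatal`. This is a minimal model that assumes failures surface as ordinary exceptions. `TokenFetchIsolation`, `obtainFromAll`, and the `warn` callback are hypothetical stand-ins for Spark's internal token request and logging. Fatal errors such as `OutOfMemoryError` are not matched by `NonFatal` and would still propagate.

```scala
import scala.util.control.NonFatal

// Hypothetical model (not Spark's API) of per-cluster error isolation:
// each cluster is attempted independently, non-fatal failures are reported
// through `warn` and skipped, and successful results are kept.
object TokenFetchIsolation {
  def obtainFromAll[T](
      clusterIds: Seq[String],
      obtain: String => T,
      warn: String => Unit): Map[String, T] =
    clusterIds.flatMap { id =>
      try {
        Some(id -> obtain(id))
      } catch {
        case NonFatal(e) =>
          warn(s"Failed to get token from cluster $id: ${e.getMessage}")
          None
      }
    }.toMap
}

val warnings = scala.collection.mutable.ArrayBuffer.empty[String]
val tokens = TokenFetchIsolation.obtainFromAll[String](
  Seq("prod", "dev"),
  id => if (id == "dev") throw new RuntimeException("broker unreachable") else s"token-$id",
  msg => warnings += msg
)
println(tokens)   // Map(prod -> token-prod)
println(warnings) // ArrayBuffer(Failed to get token from cluster dev: broker unreachable)
```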