# Spark Token Provider Kafka 0.10

Spark Token Provider Kafka 0.10 adds Hadoop delegation token support for Kafka authentication in Spark streaming applications. The library enables secure authentication with Kafka 0.10+ clusters through delegation tokens, automatically obtaining, managing, and renewing them for enterprise streaming environments that require Kerberos-based security.

## Package Information

- **Package Name**: spark-token-provider-kafka-0-10_2.12
- **Package Type**: Maven
- **Language**: Scala
- **Group ID**: org.apache.spark
- **Artifact ID**: spark-token-provider-kafka-0-10_2.12
- **Installation**: Include as a dependency in pom.xml or build.sbt

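The installation step above amounts to one dependency declaration. A hedged example for build.sbt (the version `3.5.0` is a placeholder, not taken from this document — match it to your Spark deployment; `%%` appends the Scala binary suffix such as `_2.12`):

```scala
// build.sbt -- the version is a placeholder; use the one matching your Spark cluster
libraryDependencies += "org.apache.spark" %% "spark-token-provider-kafka-0-10" % "3.5.0" % Provided
```

Marking the dependency `Provided` is common because Spark ships its own copy of this artifact on the cluster classpath.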
## Core Imports

```scala
import org.apache.spark.kafka010._
```

For specific components:

```scala
import org.apache.spark.kafka010.{KafkaDelegationTokenProvider, KafkaTokenUtil, KafkaConfigUpdater}
```

## Basic Usage

```scala
import org.apache.spark.SparkConf
import org.apache.spark.kafka010.{KafkaTokenSparkConf, KafkaConfigUpdater}
import org.apache.kafka.clients.CommonClientConfigs

// Configure Spark for Kafka token authentication
val sparkConf = new SparkConf()
  .set("spark.kafka.clusters.cluster1.auth.bootstrap.servers", "kafka1:9092,kafka2:9092")
  .set("spark.kafka.clusters.cluster1.security.protocol", "SASL_SSL")
  .set("spark.kafka.clusters.cluster1.sasl.kerberos.service.name", "kafka")

// Update Kafka consumer/producer configuration with token authentication
val kafkaParams = Map[String, Object](
  CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> "kafka1:9092,kafka2:9092"
)

val updatedParams = KafkaConfigUpdater("example-module", kafkaParams)
  .setAuthenticationConfigIfNeeded()
  .build()
```

## Architecture

The library provides several key components for token-based Kafka authentication:

- **Token Provider**: `KafkaDelegationTokenProvider` integrates with Spark's security framework to obtain delegation tokens
- **Configuration Management**: `KafkaTokenSparkConf` handles cluster-specific configuration parsing from SparkConf
- **Token Utilities**: `KafkaTokenUtil` provides core token operations, including token acquisition and JAAS configuration
- **Configuration Updater**: `KafkaConfigUpdater` applies authentication settings to Kafka client configurations
- **Security Utilities**: `KafkaRedactionUtil` ensures sensitive data is properly redacted from logs

## Capabilities

### Token Provider Integration

`KafkaDelegationTokenProvider` is the main entry point into Spark's delegation token framework; it automatically obtains tokens for every configured Kafka cluster.

```scala { .api }
class KafkaDelegationTokenProvider extends HadoopDelegationTokenProvider {
  def serviceName: String

  def obtainDelegationTokens(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long]

  def delegationTokensRequired(
      sparkConf: SparkConf,
      hadoopConf: Configuration): Boolean
}
```

### Cluster Configuration Management

Handles parsing and management of Kafka cluster configurations from Spark configuration properties.

```scala { .api }
case class KafkaTokenClusterConf(
    identifier: String,
    authBootstrapServers: String,
    targetServersRegex: String,
    securityProtocol: String,
    kerberosServiceName: String,
    trustStoreType: Option[String],
    trustStoreLocation: Option[String],
    trustStorePassword: Option[String],
    keyStoreType: Option[String],
    keyStoreLocation: Option[String],
    keyStorePassword: Option[String],
    keyPassword: Option[String],
    tokenMechanism: String,
    specifiedKafkaParams: Map[String, String])

object KafkaTokenSparkConf {
  def getClusterConfig(sparkConf: SparkConf, identifier: String): KafkaTokenClusterConf
  def getAllClusterConfigs(sparkConf: SparkConf): Set[KafkaTokenClusterConf]

  val CLUSTERS_CONFIG_PREFIX: String = "spark.kafka.clusters."
  val DEFAULT_TARGET_SERVERS_REGEX: String = ".*"
  val DEFAULT_SASL_KERBEROS_SERVICE_NAME: String = "kafka"
  val DEFAULT_SECURITY_PROTOCOL_CONFIG: String = "SASL_SSL"
  val DEFAULT_SASL_TOKEN_MECHANISM: String = "SCRAM-SHA-512"
}
```

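To illustrate how per-cluster properties fall back to the defaults listed above, here is a minimal, self-contained sketch. It is not the library's actual parser — `ClusterConfSketch` and `resolveClusterDefaults` are hypothetical names — it only mirrors the documented fallback behavior:

```scala
// Hypothetical sketch of the default-fallback logic applied when a cluster
// configuration is built from Spark properties; not the library's real code.
object ClusterConfSketch {
  val Prefix = "spark.kafka.clusters."

  // Resolve one cluster's settings from flat Spark properties, applying the
  // documented defaults whenever an optional key is absent.
  def resolveClusterDefaults(props: Map[String, String], id: String): Map[String, String] = {
    def get(key: String, default: String): String =
      props.getOrElse(s"$Prefix$id.$key", default)

    Map(
      // required -- throws if missing, like a mandatory setting would
      "auth.bootstrap.servers" -> props(s"$Prefix$id.auth.bootstrap.servers"),
      "target.bootstrap.servers.regex" -> get("target.bootstrap.servers.regex", ".*"),
      "security.protocol" -> get("security.protocol", "SASL_SSL"),
      "sasl.kerberos.service.name" -> get("sasl.kerberos.service.name", "kafka"),
      "sasl.token.mechanism" -> get("sasl.token.mechanism", "SCRAM-SHA-512")
    )
  }
}
```

For example, a cluster that only sets `auth.bootstrap.servers` resolves to `SASL_SSL`, service name `kafka`, and the `SCRAM-SHA-512` token mechanism.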
### Token Operations

Core utilities for obtaining delegation tokens and managing authentication configurations.

```scala { .api }
object KafkaTokenUtil {
  val TOKEN_KIND: Text

  def obtainToken(
      sparkConf: SparkConf,
      clusterConf: KafkaTokenClusterConf): (Token[KafkaDelegationTokenIdentifier], Long)

  def checkProxyUser(): Unit

  def createAdminClientProperties(
      sparkConf: SparkConf,
      clusterConf: KafkaTokenClusterConf): java.util.Properties

  def isGlobalJaasConfigurationProvided: Boolean

  def getKeytabJaasParams(
      keyTab: String,
      principal: String,
      kerberosServiceName: String): String

  def findMatchingTokenClusterConfig(
      sparkConf: SparkConf,
      bootStrapServers: String): Option[KafkaTokenClusterConf]

  def getTokenJaasParams(clusterConf: KafkaTokenClusterConf): String

  def needTokenUpdate(
      params: java.util.Map[String, Object],
      clusterConfig: Option[KafkaTokenClusterConf]): Boolean

  def getTokenService(identifier: String): Text
}

class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
  def getKind: Text
}
```

### Configuration Updating

Fluent interface for updating Kafka client configurations with authentication settings.

```scala { .api }
case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, Object]) {
  def set(key: String, value: Object): this.type
  def setIfUnset(key: String, value: Object): this.type
  def setAuthenticationConfigIfNeeded(): this.type
  def setAuthenticationConfigIfNeeded(
      clusterConfig: Option[KafkaTokenClusterConf]): this.type
  def build(): java.util.Map[String, Object]
}
```

### Security and Logging

Utilities for secure handling of sensitive configuration parameters in logs.

```scala { .api }
object KafkaRedactionUtil {
  def redactParams(params: Seq[(String, Object)]): Seq[(String, String)]
  def redactJaasParam(param: String): String
}
```

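To illustrate the kind of transformation such redaction performs, here is a minimal self-contained sketch — `redactSketch` is a hypothetical helper, not the library's implementation, which follows Spark's `spark.redaction.regex` setting:

```scala
// Hypothetical sketch of log redaction: values of keys that look sensitive
// (password, secret, token) are masked before logging. The real utility is
// driven by Spark's configurable spark.redaction.regex pattern.
def redactSketch(params: Seq[(String, Object)]): Seq[(String, String)] = {
  val sensitive = "(?i).*(password|secret|token).*".r
  params.map {
    case (k, _) if sensitive.pattern.matcher(k).matches() => (k, "*********(redacted)")
    case (k, v) => (k, String.valueOf(v))
  }
}
```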
## Configuration Properties

The library uses Spark configuration properties with the prefix `spark.kafka.clusters.<identifier>.` for cluster-specific settings:

### Required Configuration

```scala
// Bootstrap servers used for obtaining tokens (required)
"spark.kafka.clusters.<identifier>.auth.bootstrap.servers" -> "kafka1:9092,kafka2:9092"
```

### Optional Configuration

```scala
// Target servers regex pattern (default: ".*")
"spark.kafka.clusters.<identifier>.target.bootstrap.servers.regex" -> "kafka.*:9092"

// Security protocol (default: "SASL_SSL")
"spark.kafka.clusters.<identifier>.security.protocol" -> "SASL_SSL"

// Kerberos service name (default: "kafka")
"spark.kafka.clusters.<identifier>.sasl.kerberos.service.name" -> "kafka"

// Token mechanism (default: "SCRAM-SHA-512")
"spark.kafka.clusters.<identifier>.sasl.token.mechanism" -> "SCRAM-SHA-512"

// SSL truststore configuration
"spark.kafka.clusters.<identifier>.ssl.truststore.type" -> "JKS"
"spark.kafka.clusters.<identifier>.ssl.truststore.location" -> "/path/to/truststore.jks"
"spark.kafka.clusters.<identifier>.ssl.truststore.password" -> "truststore-password"

// SSL keystore configuration (for the SSL protocol)
"spark.kafka.clusters.<identifier>.ssl.keystore.type" -> "JKS"
"spark.kafka.clusters.<identifier>.ssl.keystore.location" -> "/path/to/keystore.jks"
"spark.kafka.clusters.<identifier>.ssl.keystore.password" -> "keystore-password"
"spark.kafka.clusters.<identifier>.ssl.key.password" -> "key-password"

// Additional Kafka client properties
"spark.kafka.clusters.<identifier>.kafka.<kafka-property>" -> "value"
```

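The `target.bootstrap.servers.regex` pattern determines which cluster configuration applies to a given client's bootstrap server list. A minimal sketch of that matching rule, assuming every server in the comma-separated list must match (the library's exact check may differ in detail):

```scala
// Sketch: a cluster config is considered a match only if ALL bootstrap
// servers in the client's list satisfy the cluster's target regex.
// Assumed behavior for illustration; not the library's actual code.
def matchesTargetRegex(bootstrapServers: String, targetRegex: String): Boolean =
  bootstrapServers.split(",").map(_.trim).forall(_.matches(targetRegex))
```

Under this rule, `"prod-kafka1:9092,prod-kafka2:9092"` matches `"prod-kafka.*:9092"`, while `"staging-kafka:9092"` does not; the default pattern `".*"` matches everything.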
## Authentication Methods

The library supports multiple authentication methods, applied in the following order of preference:

1. **Global JAAS Configuration**: Uses JVM-wide security settings (e.g., `java.security.auth.login.config`)
2. **Keytab Authentication**: Uses a Kerberos keytab file with a dynamically generated JAAS configuration
3. **Ticket Cache Authentication**: Uses the Kerberos ticket cache with a dynamically generated JAAS configuration
4. **Token Authentication**: Uses delegation tokens with a SCRAM mechanism

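For keytab authentication, the dynamically generated JAAS configuration is a standard `Krb5LoginModule` entry. A hedged sketch of the general shape such a string takes — this mirrors what a helper like `getKeytabJaasParams` conceptually produces, but the library's actual output may include additional options:

```scala
// Sketch of a keytab-based JAAS entry in standard Krb5LoginModule form.
// Illustrative only; the library's exact string may differ.
def keytabJaasSketch(keyTab: String, principal: String, serviceName: String): String =
  s"""com.sun.security.auth.module.Krb5LoginModule required
     | useKeyTab=true
     | serviceName="$serviceName"
     | keyTab="$keyTab"
     | principal="$principal";""".stripMargin
```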
## Security Protocols

### SASL_SSL (Recommended)

- SASL authentication over an SSL-encrypted connection
- Requires truststore configuration
- Default security protocol

### SSL

- SSL with client certificate authentication
- Requires both truststore and keystore configuration
- Logs a warning noting that 2-way authentication is required

### SASL_PLAINTEXT

- SASL authentication over an unencrypted connection
- Logs a security warning
- Not recommended for production

## Error Handling

The library handles several error conditions:

- **Proxy User Error**: Throws `IllegalArgumentException` when attempting to use proxy users (not yet supported)
- **Configuration Errors**: Logs warnings for missing or invalid cluster configurations
- **Token Acquisition Failures**: Logs warnings with cluster context for debugging
- **Multiple Token Matches**: Throws `IllegalArgumentException` when multiple tokens match the bootstrap servers

## Usage Examples

### Multi-Cluster Configuration

```scala
val sparkConf = new SparkConf()
  // Cluster 1 - Production
  .set("spark.kafka.clusters.prod.auth.bootstrap.servers", "prod-kafka1:9092,prod-kafka2:9092")
  .set("spark.kafka.clusters.prod.target.bootstrap.servers.regex", "prod-kafka.*:9092")
  .set("spark.kafka.clusters.prod.security.protocol", "SASL_SSL")
  .set("spark.kafka.clusters.prod.ssl.truststore.location", "/etc/kafka/truststore.jks")
  .set("spark.kafka.clusters.prod.ssl.truststore.password", "prod-truststore-pass")
  // Cluster 2 - Staging
  .set("spark.kafka.clusters.staging.auth.bootstrap.servers", "staging-kafka:9092")
  .set("spark.kafka.clusters.staging.target.bootstrap.servers.regex", "staging-kafka:9092")
  .set("spark.kafka.clusters.staging.security.protocol", "SASL_PLAINTEXT")
```

### Consumer Configuration Update

```scala
import org.apache.kafka.clients.consumer.ConsumerConfig

val baseConsumerConfig = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "kafka1:9092,kafka2:9092",
  ConsumerConfig.GROUP_ID_CONFIG -> "my-consumer-group",
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"
)

val authenticatedConfig = KafkaConfigUpdater("consumer", baseConsumerConfig)
  .setAuthenticationConfigIfNeeded()
  .build()
```

### Producer Configuration Update

```scala
import org.apache.kafka.clients.producer.ProducerConfig

val baseProducerConfig = Map[String, Object](
  ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "kafka1:9092,kafka2:9092",
  ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
  ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer"
)

val authenticatedConfig = KafkaConfigUpdater("producer", baseProducerConfig)
  .setAuthenticationConfigIfNeeded()
  .build()
```