Kafka 0.10+ Token Provider for Streaming - A specialized security module that handles delegation token management for Kafka integration in Apache Spark streaming applications.
npx @tessl/cli install tessl/maven-org-apache-spark--spark-token-provider-kafka-0-10_2-13@4.0.0
# Kafka 0.10+ Token Provider for Streaming

A specialized security module that handles delegation token management for Kafka integration in Apache Spark streaming applications, providing secure authentication and authorization capabilities when connecting to Kafka clusters using SASL-based security protocols.

## Package Information

- **Package Name**: spark-token-provider-kafka-0-10_2.13
- **Package Type**: Maven
- **Language**: Scala
- **Installation**: Add to your Maven `pom.xml`:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-token-provider-kafka-0-10_2.13</artifactId>
  <version>4.0.0</version>
</dependency>
```

## Core Imports

```scala
import org.apache.spark.kafka010.{KafkaDelegationTokenProvider, KafkaConfigUpdater, KafkaTokenUtil}
import org.apache.spark.kafka010.{KafkaTokenClusterConf, KafkaTokenSparkConf}
import org.apache.spark.kafka010.KafkaTokenUtil.KafkaDelegationTokenIdentifier
```

## Basic Usage

```scala
import org.apache.spark.SparkConf
import org.apache.spark.kafka010.{KafkaDelegationTokenProvider, KafkaTokenSparkConf}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

// Configure Spark for Kafka cluster authentication
val sparkConf = new SparkConf()
  .set("spark.kafka.clusters.cluster1.auth.bootstrap.servers", "kafka1:9092,kafka2:9092")
  .set("spark.kafka.clusters.cluster1.security.protocol", "SASL_SSL")
  .set("spark.kafka.clusters.cluster1.sasl.kerberos.service.name", "kafka")

// Create token provider
val tokenProvider = new KafkaDelegationTokenProvider()

// Check if tokens are required
val hadoopConf = new Configuration()
val credentials = new Credentials()

if (tokenProvider.delegationTokensRequired(sparkConf, hadoopConf)) {
  // Obtain delegation tokens
  val nextRenewalTime = tokenProvider.obtainDelegationTokens(hadoopConf, sparkConf, credentials)
  println(s"Tokens obtained, next renewal: $nextRenewalTime")
}
```
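
Because `obtainDelegationTokens` returns an `Option[Long]`, printing it directly yields `Some(...)` or `None`. The sketch below shows one way to turn that value into a readable message. It uses only the Scala and Java standard libraries; `RenewalTimeSketch` is a hypothetical helper, and it assumes the value is a Unix epoch timestamp in milliseconds.

```scala
import java.time.Instant

// Hypothetical helper, not part of the package.
object RenewalTimeSketch {
  // Assumption: the renewal time is a Unix epoch timestamp in milliseconds.
  def describe(nextRenewal: Option[Long]): String = nextRenewal match {
    case Some(millis) => s"next renewal at ${Instant.ofEpochMilli(millis)}"
    case None         => "no renewal time reported"
  }
}
```

With the values from the example above, `println(RenewalTimeSketch.describe(nextRenewalTime))` would replace the raw `Option` output.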

## Architecture

The Kafka Token Provider is built around several key components:

- **Token Provider**: Main `KafkaDelegationTokenProvider` class implementing Spark's `HadoopDelegationTokenProvider` interface
- **Configuration Management**: `KafkaTokenSparkConf` for parsing Spark configuration and `KafkaConfigUpdater` for runtime configuration updates
- **Token Utilities**: `KafkaTokenUtil` object providing low-level token operations and authentication helpers
- **Security Integration**: Integration with Kerberos, SSL, and SASL authentication mechanisms
- **Multi-Cluster Support**: Delegation token management across multiple Kafka clusters simultaneously
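
To make the multi-cluster idea concrete, the following sketch groups settings of the form `spark.kafka.clusters.<identifier>.<option>` (the key shape used in Basic Usage) by cluster identifier. It is a standard-library illustration over a plain `Map`, not the parsing logic of `KafkaTokenSparkConf`; `ClusterKeySketch` and `groupByCluster` are hypothetical names.

```scala
// Hypothetical illustration; the package's own parser is KafkaTokenSparkConf.
object ClusterKeySketch {
  private val Prefix = "spark.kafka.clusters."

  // Groups "spark.kafka.clusters.<id>.<option>" entries by <id>,
  // keeping "<option>" as the inner key. Other keys are ignored.
  def groupByCluster(conf: Map[String, String]): Map[String, Map[String, String]] =
    conf.toSeq
      .collect { case (k, v) if k.startsWith(Prefix) => (k.stripPrefix(Prefix), v) }
      .flatMap { case (rest, v) =>
        // Split once: the identifier is everything before the first dot.
        rest.split("\\.", 2) match {
          case Array(id, option) if id.nonEmpty && option.nonEmpty => Some((id, option, v))
          case _                                                   => None
        }
      }
      .groupBy(_._1)
      .map { case (id, entries) => id -> entries.map(e => e._2 -> e._3).toMap }
}
```

Each resulting inner map corresponds to one cluster, which is roughly the information a `KafkaTokenClusterConf` instance carries.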

## Capabilities

### Token Provider

Core delegation token provider that integrates with Apache Spark's security framework to obtain and manage Kafka delegation tokens for secure cluster authentication.

```scala { .api }
class KafkaDelegationTokenProvider extends HadoopDelegationTokenProvider {
  def serviceName: String
  def obtainDelegationTokens(
    hadoopConf: Configuration,
    sparkConf: SparkConf,
    creds: Credentials
  ): Option[Long]
  def delegationTokensRequired(
    sparkConf: SparkConf,
    hadoopConf: Configuration
  ): Boolean
}
```

[Token Provider](./token-provider.md)

### Configuration Management

Configuration utilities for parsing Spark configuration properties and managing Kafka client parameters with security and redaction support.

```scala { .api }
case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, Object]) {
  def set(key: String, value: Object): KafkaConfigUpdater.this.type
  def setIfUnset(key: String, value: Object): KafkaConfigUpdater.this.type
  def setAuthenticationConfigIfNeeded(): KafkaConfigUpdater.this.type
  def build(): java.util.Map[String, Object]
}

object KafkaTokenSparkConf {
  def getClusterConfig(sparkConf: SparkConf, identifier: String): KafkaTokenClusterConf
  def getAllClusterConfigs(sparkConf: SparkConf): Set[KafkaTokenClusterConf]
}
```

[Configuration Management](./configuration.md)
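
The `set` / `setIfUnset` / `build` signatures above suggest a fluent builder over Kafka client parameters. The sketch below illustrates that pattern with a hypothetical stand-in class, `ParamsBuilderSketch`, built only on the standard library. It assumes `set` always overwrites and `setIfUnset` keeps an existing value; the actual behavior of `KafkaConfigUpdater` should be checked against its documentation.

```scala
import java.{util => ju}

// Hypothetical stand-in that mirrors the method names of KafkaConfigUpdater.
final class ParamsBuilderSketch(initial: Map[String, Object]) {
  private val params = new ju.HashMap[String, Object]()
  initial.foreach { case (k, v) => params.put(k, v) }

  // Assumed semantics: always overwrite the current value.
  def set(key: String, value: Object): this.type = {
    params.put(key, value)
    this
  }

  // Assumed semantics: write only when the key has no value yet.
  def setIfUnset(key: String, value: Object): this.type = {
    params.putIfAbsent(key, value)
    this
  }

  def build(): ju.Map[String, Object] = params
}
```

Returning `this.type` from each setter is what allows calls to be chained, for example `new ParamsBuilderSketch(base).set(...).setIfUnset(...).build()`.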

### Token Utilities

Low-level token operations, authentication helpers, and Kafka admin client management for delegation token lifecycle management.

```scala { .api }
object KafkaTokenUtil {
  val TOKEN_KIND: Text
  def getTokenService(identifier: String): Text
  def obtainToken(
    sparkConf: SparkConf,
    clusterConf: KafkaTokenClusterConf
  ): (Token[KafkaDelegationTokenIdentifier], Long)
  def checkProxyUser(): Unit
  def createAdminClientProperties(
    sparkConf: SparkConf,
    clusterConf: KafkaTokenClusterConf
  ): java.util.Properties
  def isGlobalJaasConfigurationProvided: Boolean
  def findMatchingTokenClusterConfig(
    sparkConf: SparkConf,
    bootStrapServers: String
  ): Option[KafkaTokenClusterConf]
  def getTokenJaasParams(clusterConf: KafkaTokenClusterConf): String
  def needTokenUpdate(
    params: java.util.Map[String, Object],
    clusterConfig: Option[KafkaTokenClusterConf]
  ): Boolean
  def getKeytabJaasParams(
    keyTab: String,
    principal: String,
    kerberosServiceName: String
  ): String
}
```

[Token Utilities](./token-utilities.md)
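
`findMatchingTokenClusterConfig` takes a bootstrap server string and returns an optional cluster configuration, and `KafkaTokenClusterConf` carries a `targetServersRegex` field. One plausible reading is that servers are matched against that regex. The sketch below illustrates such a rule with hypothetical names (`TargetServerMatchSketch`, `ClusterRule`). It assumes a cluster matches when any listed server fully matches its regex and returns the first match; the real method may differ, for example in how it handles several matching clusters.

```scala
import java.util.regex.Pattern

// Hypothetical illustration of regex-based cluster selection.
object TargetServerMatchSketch {
  final case class ClusterRule(identifier: String, targetServersRegex: String)

  // Returns the first rule whose regex fully matches at least one server
  // in the comma-separated bootstrap server list.
  def findMatching(rules: Seq[ClusterRule], bootstrapServers: String): Option[ClusterRule] = {
    val servers = bootstrapServers.split(",").map(_.trim).filter(_.nonEmpty).toSeq
    rules.find { rule =>
      val pattern = Pattern.compile(rule.targetServersRegex)
      servers.exists(server => pattern.matcher(server).matches())
    }
  }
}
```

Full-string matching (`matches()` rather than `find()`) keeps a rule such as `kafka[12]:9092` from accidentally selecting `kafka12:90921`.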

### Utility Classes

Additional utility classes for configuration redaction and error handling.

```scala { .api }
object KafkaRedactionUtil {
  def redactParams(params: Seq[(String, Object)]): Seq[(String, String)]
  def redactJaasParam(param: String): String
}

object KafkaTokenProviderExceptions {
  def missingKafkaOption(option: String): SparkException
}
```

[Utilities](./utilities.md)
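
Redaction matters because JAAS parameter strings can embed secrets, and such strings often end up in logs. The sketch below shows the general idea with a hypothetical helper, `JaasRedactionSketch`, that masks the value of a `password="..."` entry. The regex and the `[redacted]` placeholder are assumptions for illustration; they are not the pattern or replacement text used by `KafkaRedactionUtil.redactJaasParam`.

```scala
// Hypothetical illustration of masking a secret inside a JAAS parameter string.
object JaasRedactionSketch {
  // Matches password = "<anything without a double quote>", case-insensitively,
  // and captures the part before the opening quote as group 1.
  private val PasswordPattern = "(?i)(password\\s*=\\s*)\"[^\"]*\"".r

  def redact(jaasParam: String): String =
    PasswordPattern.replaceAllIn(jaasParam, "$1\"[redacted]\"")
}
```

Only the quoted value is replaced, so the rest of the string (login module, username, flags) stays readable for debugging.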

## Types

```scala { .api }
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier

case class KafkaTokenClusterConf(
  identifier: String,
  authBootstrapServers: String,
  targetServersRegex: String,
  securityProtocol: String,
  kerberosServiceName: String,
  trustStoreType: Option[String],
  trustStoreLocation: Option[String],
  trustStorePassword: Option[String],
  keyStoreType: Option[String],
  keyStoreLocation: Option[String],
  keyStorePassword: Option[String],
  keyPassword: Option[String],
  tokenMechanism: String,
  specifiedKafkaParams: Map[String, String]
)

// Defined within the KafkaTokenUtil object
object KafkaTokenUtil {
  class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
    def getKind: Text
  }
}
```