# Configuration Management

Configuration utilities for parsing Spark configuration properties and managing Kafka client parameters, with security and redaction support.

## Capabilities

### KafkaConfigUpdater

Utility class for building and updating Kafka configuration parameters, with logging and authentication support.

```scala { .api }
/**
 * Class to conveniently update Kafka config params, while logging the changes
 * @param module Module name for logging context
 * @param kafkaParams Initial Kafka parameters map
 */
case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, Object]) extends Logging {

  /**
   * Sets a configuration parameter
   * @param key Configuration parameter key
   * @param value Configuration parameter value
   * @return Updated KafkaConfigUpdater instance for chaining
   */
  def set(key: String, value: Object): this.type

  /**
   * Sets a configuration parameter only if it's not already set
   * @param key Configuration parameter key
   * @param value Configuration parameter value
   * @return Updated KafkaConfigUpdater instance for chaining
   */
  def setIfUnset(key: String, value: Object): this.type

  /**
   * Configures authentication settings if needed, based on the environment.
   * Uses the bootstrap servers from kafkaParams to find a matching cluster configuration.
   * @return Updated KafkaConfigUpdater instance for chaining
   */
  def setAuthenticationConfigIfNeeded(): this.type

  /**
   * Configures authentication settings with a specific cluster configuration
   * @param clusterConfig Optional cluster configuration for authentication
   * @return Updated KafkaConfigUpdater instance for chaining
   */
  def setAuthenticationConfigIfNeeded(clusterConfig: Option[KafkaTokenClusterConf]): this.type

  /**
   * Builds and returns the final configuration map
   * @return Java Map containing all configuration parameters
   */
  def build(): java.util.Map[String, Object]
}
```

**Usage Examples:**

```scala
import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaTokenClusterConf}

// Create a config updater with initial parameters
val initialParams: Map[String, Object] = Map(
  "bootstrap.servers" -> "kafka1:9092,kafka2:9092",
  "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer"
)

val configUpdater = KafkaConfigUpdater("streaming", initialParams)

// Chain configuration updates
val finalConfig = configUpdater
  .set("security.protocol", "SASL_SSL")
  .set("sasl.mechanism", "SCRAM-SHA-512")
  .setIfUnset("client.id", "spark-streaming-client")
  .setAuthenticationConfigIfNeeded()
  .build()

// Use the configuration with Kafka clients
println(s"Final config size: ${finalConfig.size()}")
```

**Authentication Configuration:**

```scala
// Configure with cluster-specific authentication
val clusterConf = KafkaTokenClusterConf(
  identifier = "prod-cluster",
  authBootstrapServers = "kafka1:9092",
  targetServersRegex = ".*",
  securityProtocol = "SASL_SSL",
  kerberosServiceName = "kafka",
  // ... other SSL/auth settings
)

val authenticatedConfig = KafkaConfigUpdater("producer", Map.empty)
  .set("bootstrap.servers", clusterConf.authBootstrapServers)
  .setAuthenticationConfigIfNeeded(Some(clusterConf))
  .build()
```

### KafkaTokenSparkConf

Object providing utilities for parsing Kafka cluster configurations from the Spark configuration.

```scala { .api }
object KafkaTokenSparkConf {

  /** Configuration prefix for Kafka clusters */
  val CLUSTERS_CONFIG_PREFIX: String = "spark.kafka.clusters."

  /** Default target servers regex pattern */
  val DEFAULT_TARGET_SERVERS_REGEX: String = ".*"

  /** Default Kerberos service name */
  val DEFAULT_SASL_KERBEROS_SERVICE_NAME: String = "kafka"

  /** Default security protocol */
  val DEFAULT_SECURITY_PROTOCOL_CONFIG: String = "SASL_SSL"

  /** Default SASL token mechanism */
  val DEFAULT_SASL_TOKEN_MECHANISM: String = "SCRAM-SHA-512"

  /**
   * Parses the cluster configuration from the Spark config for a specific identifier
   * @param sparkConf Spark configuration
   * @param identifier Cluster identifier
   * @return KafkaTokenClusterConf containing the parsed cluster settings
   */
  def getClusterConfig(sparkConf: SparkConf, identifier: String): KafkaTokenClusterConf

  /**
   * Gets all configured cluster configurations from the Spark config.
   * Parses every cluster defined under spark.kafka.clusters.*
   * @param sparkConf Spark configuration
   * @return Set of all configured cluster configurations
   */
  def getAllClusterConfigs(sparkConf: SparkConf): Set[KafkaTokenClusterConf]
}
```

**Configuration Examples:**

```scala
import org.apache.spark.SparkConf
import org.apache.spark.kafka010.KafkaTokenSparkConf

// Set up a Spark configuration with multiple clusters
val sparkConf = new SparkConf()
  // Production cluster configuration
  .set("spark.kafka.clusters.prod.auth.bootstrap.servers", "prod-kafka1:9092,prod-kafka2:9092")
  .set("spark.kafka.clusters.prod.target.bootstrap.servers.regex", "prod-kafka.*:9092")
  .set("spark.kafka.clusters.prod.security.protocol", "SASL_SSL")
  .set("spark.kafka.clusters.prod.sasl.kerberos.service.name", "kafka")
  .set("spark.kafka.clusters.prod.ssl.truststore.location", "/etc/kafka/truststore.jks")
  .set("spark.kafka.clusters.prod.ssl.truststore.password", "truststore-password")
  // Development cluster configuration
  .set("spark.kafka.clusters.dev.auth.bootstrap.servers", "dev-kafka1:9092")
  .set("spark.kafka.clusters.dev.security.protocol", "SASL_PLAINTEXT")
  .set("spark.kafka.clusters.dev.sasl.token.mechanism", "SCRAM-SHA-256")

// Get a specific cluster configuration
val prodClusterConf = KafkaTokenSparkConf.getClusterConfig(sparkConf, "prod")
println(s"Production cluster: ${prodClusterConf.authBootstrapServers}")

// Get all cluster configurations
val allClusters = KafkaTokenSparkConf.getAllClusterConfigs(sparkConf)
println(s"Found ${allClusters.size} configured clusters")

allClusters.foreach { cluster =>
  println(s"Cluster ${cluster.identifier}: ${cluster.securityProtocol}")
}
```

### KafkaTokenClusterConf

Configuration data class representing a single Kafka cluster's settings.

```scala { .api }
/**
 * Configuration data for a Kafka cluster
 * @param identifier Unique cluster identifier
 * @param authBootstrapServers Bootstrap servers used for authentication
 * @param targetServersRegex Regex pattern matching target servers
 * @param securityProtocol Security protocol (SASL_SSL, SSL, SASL_PLAINTEXT)
 * @param kerberosServiceName Kerberos service name for SASL authentication
 * @param trustStoreType Optional SSL truststore type
 * @param trustStoreLocation Optional SSL truststore location
 * @param trustStorePassword Optional SSL truststore password
 * @param keyStoreType Optional SSL keystore type
 * @param keyStoreLocation Optional SSL keystore location
 * @param keyStorePassword Optional SSL keystore password
 * @param keyPassword Optional SSL key password
 * @param tokenMechanism SASL token mechanism (default: SCRAM-SHA-512)
 * @param specifiedKafkaParams Additional Kafka parameters
 */
case class KafkaTokenClusterConf(
    identifier: String,
    authBootstrapServers: String,
    targetServersRegex: String,
    securityProtocol: String,
    kerberosServiceName: String,
    trustStoreType: Option[String],
    trustStoreLocation: Option[String],
    trustStorePassword: Option[String],
    keyStoreType: Option[String],
    keyStoreLocation: Option[String],
    keyStorePassword: Option[String],
    keyPassword: Option[String],
    tokenMechanism: String,
    specifiedKafkaParams: Map[String, String]) {

  /**
   * String representation with sensitive fields redacted
   * @return String representation suitable for logging
   */
  override def toString: String
}
```
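
Since `KafkaTokenClusterConf` carries passwords, its overridden `toString` is what makes instances safe to log. A sketch of how this plays out (the field values are illustrative, and the exact redaction marker is an implementation detail of Spark):

```scala
import org.apache.spark.kafka010.KafkaTokenClusterConf

val clusterConf = KafkaTokenClusterConf(
  identifier = "prod",
  authBootstrapServers = "kafka1:9092",
  targetServersRegex = ".*",
  securityProtocol = "SASL_SSL",
  kerberosServiceName = "kafka",
  trustStoreType = None,
  trustStoreLocation = Some("/etc/kafka/truststore.jks"),
  trustStorePassword = Some("truststore-password"),
  keyStoreType = None,
  keyStoreLocation = None,
  keyStorePassword = None,
  keyPassword = None,
  tokenMechanism = "SCRAM-SHA-512",
  specifiedKafkaParams = Map("client.id" -> "spark-app")
)

// String interpolation goes through the redacting toString override,
// so the truststore password does not appear in plain text in log output
println(s"Using cluster config: $clusterConf")
```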

## Configuration Properties

The library reads Spark configuration properties with the prefix `spark.kafka.clusters.<identifier>.` for cluster-specific settings:

### Required Properties
- `auth.bootstrap.servers` - Bootstrap servers used for token acquisition

### Optional Properties
- `security.protocol` - Security protocol: SASL_SSL, SSL, or SASL_PLAINTEXT (default: "SASL_SSL")
- `target.bootstrap.servers.regex` - Target servers regex pattern (default: ".*")
- `sasl.kerberos.service.name` - Kerberos service name (default: "kafka")
- `sasl.token.mechanism` - Token mechanism (default: "SCRAM-SHA-512")

### SSL Properties
- `ssl.truststore.type` - Truststore type
- `ssl.truststore.location` - Truststore file location
- `ssl.truststore.password` - Truststore password
- `ssl.keystore.type` - Keystore type
- `ssl.keystore.location` - Keystore file location
- `ssl.keystore.password` - Keystore password
- `ssl.key.password` - Key password

### Additional Kafka Properties
- `kafka.*` - Additional Kafka client properties; the `kafka.` prefix is stripped before the property is passed to the client

**Configuration Example:**

```properties
# Production Kafka cluster
spark.kafka.clusters.prod.auth.bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
spark.kafka.clusters.prod.target.bootstrap.servers.regex=kafka[1-3]:9092
spark.kafka.clusters.prod.security.protocol=SASL_SSL
spark.kafka.clusters.prod.sasl.kerberos.service.name=kafka
spark.kafka.clusters.prod.ssl.truststore.location=/etc/kafka/client.truststore.jks
spark.kafka.clusters.prod.ssl.truststore.password=changeit
spark.kafka.clusters.prod.sasl.token.mechanism=SCRAM-SHA-512

# Additional Kafka client properties
spark.kafka.clusters.prod.kafka.client.id=spark-streaming-app
spark.kafka.clusters.prod.kafka.session.timeout.ms=30000
```
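
Assuming properties like the ones above are loaded into a `SparkConf`, the `kafka.`-prefixed entries should surface through `specifiedKafkaParams` with the prefix stripped; a sketch:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.kafka010.KafkaTokenSparkConf

val sparkConf = new SparkConf()
  .set("spark.kafka.clusters.prod.auth.bootstrap.servers", "kafka1:9092")
  .set("spark.kafka.clusters.prod.kafka.client.id", "spark-streaming-app")
  .set("spark.kafka.clusters.prod.kafka.session.timeout.ms", "30000")

val prodConf = KafkaTokenSparkConf.getClusterConfig(sparkConf, "prod")

// Expected to contain "client.id" and "session.timeout.ms"
// (without the spark.kafka.clusters.prod.kafka. prefix)
prodConf.specifiedKafkaParams.foreach { case (key, value) =>
  println(s"$key = $value")
}
```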