docs/

- configuration.md
- index.md
- token-provider.md
- token-utilities.md
- utilities.md

# Configuration Management

Configuration utilities for parsing Spark configuration properties and managing Kafka client parameters with security and redaction support.

## Capabilities

### KafkaConfigUpdater

Utility class for building and updating Kafka configuration parameters with logging and authentication support.

```scala { .api }

/**
 * Class to conveniently update Kafka config params, while logging the changes
 * @param module Module name for logging context
 * @param kafkaParams Initial Kafka parameters map
 */
case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, Object]) extends Logging {

  /**
   * Sets a configuration parameter
   * @param key Configuration parameter key
   * @param value Configuration parameter value
   * @return Updated KafkaConfigUpdater instance for chaining
   */
  def set(key: String, value: Object): this.type

  /**
   * Sets a configuration parameter only if it's not already set
   * @param key Configuration parameter key
   * @param value Configuration parameter value
   * @return Updated KafkaConfigUpdater instance for chaining
   */
  def setIfUnset(key: String, value: Object): this.type

  /**
   * Configures authentication settings if needed based on environment.
   * Uses bootstrap servers from kafkaParams to find matching cluster configuration.
   * @return Updated KafkaConfigUpdater instance for chaining
   */
  def setAuthenticationConfigIfNeeded(): this.type

  /**
   * Configures authentication settings with specific cluster configuration
   * @param clusterConfig Optional cluster configuration for authentication
   * @return Updated KafkaConfigUpdater instance for chaining
   */
  def setAuthenticationConfigIfNeeded(clusterConfig: Option[KafkaTokenClusterConf]): this.type

  /**
   * Builds and returns the final configuration map
   * @return Java Map containing all configuration parameters
   */
  def build(): java.util.Map[String, Object]
}
```

**Usage Examples:**

```scala
import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaTokenClusterConf}

// Create config updater with initial parameters
val initialParams = Map(
  "bootstrap.servers" -> "kafka1:9092,kafka2:9092",
  "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer"
)

val configUpdater = KafkaConfigUpdater("streaming", initialParams)

// Chain configuration updates
val finalConfig = configUpdater
  .set("security.protocol", "SASL_SSL")
  .set("sasl.mechanism", "SCRAM-SHA-512")
  .setIfUnset("client.id", "spark-streaming-client")
  .setAuthenticationConfigIfNeeded()
  .build()

// Use the configuration with Kafka clients
println(s"Final config size: ${finalConfig.size()}")
```

**Authentication Configuration:**

```scala
// Configure with cluster-specific authentication
val clusterConf = KafkaTokenClusterConf(
  identifier = "prod-cluster",
  authBootstrapServers = "kafka1:9092",
  targetServersRegex = ".*",
  securityProtocol = "SASL_SSL",
  kerberosServiceName = "kafka",
  // ... other SSL/auth settings
)

val authenticatedConfig = KafkaConfigUpdater("producer", Map.empty)
  .set("bootstrap.servers", clusterConf.authBootstrapServers)
  .setAuthenticationConfigIfNeeded(Some(clusterConf))
  .build()
```

### KafkaTokenSparkConf

Object providing utilities for parsing Kafka cluster configurations from Spark configuration.

```scala { .api }
object KafkaTokenSparkConf {

  /** Configuration prefix for Kafka clusters */
  val CLUSTERS_CONFIG_PREFIX: String = "spark.kafka.clusters."

  /** Default target servers regex pattern */
  val DEFAULT_TARGET_SERVERS_REGEX: String = ".*"

  /** Default Kerberos service name */
  val DEFAULT_SASL_KERBEROS_SERVICE_NAME: String = "kafka"

  /** Default security protocol */
  val DEFAULT_SECURITY_PROTOCOL_CONFIG: String = "SASL_SSL"

  /** Default SASL token mechanism */
  val DEFAULT_SASL_TOKEN_MECHANISM: String = "SCRAM-SHA-512"

  /**
   * Parses cluster configuration from Spark config for specific identifier
   * @param sparkConf Spark configuration
   * @param identifier Cluster identifier
   * @return KafkaTokenClusterConf containing parsed cluster settings
   */
  def getClusterConfig(sparkConf: SparkConf, identifier: String): KafkaTokenClusterConf

  /**
   * Gets all configured cluster configurations from Spark config.
   * Parses all clusters defined under spark.kafka.clusters.*
   * @param sparkConf Spark configuration
   * @return Set of all configured cluster configurations
   */
  def getAllClusterConfigs(sparkConf: SparkConf): Set[KafkaTokenClusterConf]
}
```

**Configuration Examples:**

```scala
import org.apache.spark.SparkConf
import org.apache.spark.kafka010.KafkaTokenSparkConf

// Setup Spark configuration with multiple clusters
val sparkConf = new SparkConf()
  // Production cluster configuration
  .set("spark.kafka.clusters.prod.auth.bootstrap.servers", "prod-kafka1:9092,prod-kafka2:9092")
  .set("spark.kafka.clusters.prod.target.bootstrap.servers.regex", "prod-kafka.*:9092")
  .set("spark.kafka.clusters.prod.security.protocol", "SASL_SSL")
  .set("spark.kafka.clusters.prod.sasl.kerberos.service.name", "kafka")
  .set("spark.kafka.clusters.prod.ssl.truststore.location", "/etc/kafka/truststore.jks")
  .set("spark.kafka.clusters.prod.ssl.truststore.password", "truststore-password")

  // Development cluster configuration
  .set("spark.kafka.clusters.dev.auth.bootstrap.servers", "dev-kafka1:9092")
  .set("spark.kafka.clusters.dev.security.protocol", "SASL_PLAINTEXT")
  .set("spark.kafka.clusters.dev.sasl.token.mechanism", "SCRAM-SHA-256")

// Get specific cluster configuration
val prodClusterConf = KafkaTokenSparkConf.getClusterConfig(sparkConf, "prod")
println(s"Production cluster: ${prodClusterConf.authBootstrapServers}")

// Get all cluster configurations
val allClusters = KafkaTokenSparkConf.getAllClusterConfigs(sparkConf)
println(s"Found ${allClusters.size} configured clusters")

allClusters.foreach { cluster =>
  println(s"Cluster ${cluster.identifier}: ${cluster.securityProtocol}")
}
```

### KafkaTokenClusterConf

Configuration data class representing a single Kafka cluster's settings.

```scala { .api }
/**
 * Configuration data for a Kafka cluster
 * @param identifier Unique cluster identifier
 * @param authBootstrapServers Bootstrap servers for authentication
 * @param targetServersRegex Regex pattern for target servers
 * @param securityProtocol Security protocol (SASL_SSL, SSL, SASL_PLAINTEXT)
 * @param kerberosServiceName Kerberos service name for SASL authentication
 * @param trustStoreType Optional SSL truststore type
 * @param trustStoreLocation Optional SSL truststore location
 * @param trustStorePassword Optional SSL truststore password
 * @param keyStoreType Optional SSL keystore type
 * @param keyStoreLocation Optional SSL keystore location
 * @param keyStorePassword Optional SSL keystore password
 * @param keyPassword Optional SSL key password
 * @param tokenMechanism SASL token mechanism (default: SCRAM-SHA-512)
 * @param specifiedKafkaParams Additional Kafka parameters
 */
case class KafkaTokenClusterConf(
  identifier: String,
  authBootstrapServers: String,
  targetServersRegex: String,
  securityProtocol: String,
  kerberosServiceName: String,
  trustStoreType: Option[String],
  trustStoreLocation: Option[String],
  trustStorePassword: Option[String],
  keyStoreType: Option[String],
  keyStoreLocation: Option[String],
  keyStorePassword: Option[String],
  keyPassword: Option[String],
  tokenMechanism: String,
  specifiedKafkaParams: Map[String, String]
) {
  /**
   * String representation with redacted sensitive fields
   * @return String representation suitable for logging
   */
  override def toString: String
}
```

## Configuration Properties

The library uses Spark configuration properties with the prefix `spark.kafka.clusters.<identifier>` for cluster-specific settings:

### Required Properties

- `auth.bootstrap.servers` - Bootstrap servers for token acquisition
- `security.protocol` - Security protocol (SASL_SSL, SSL, SASL_PLAINTEXT)

### Optional Properties

- `target.bootstrap.servers.regex` - Target servers regex pattern (default: ".*")
- `sasl.kerberos.service.name` - Kerberos service name (default: "kafka")
- `sasl.token.mechanism` - Token mechanism (default: "SCRAM-SHA-512")

### SSL Properties

- `ssl.truststore.type` - Truststore type
- `ssl.truststore.location` - Truststore file location
- `ssl.truststore.password` - Truststore password
- `ssl.keystore.type` - Keystore type
- `ssl.keystore.location` - Keystore file location
- `ssl.keystore.password` - Keystore password
- `ssl.key.password` - Key password

### Additional Kafka Properties

- `kafka.*` - Additional Kafka client properties (prefixed with `kafka.`)

**Configuration Example:**

```properties
# Production Kafka cluster
spark.kafka.clusters.prod.auth.bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
spark.kafka.clusters.prod.target.bootstrap.servers.regex=kafka[1-3]:9092
spark.kafka.clusters.prod.security.protocol=SASL_SSL
spark.kafka.clusters.prod.sasl.kerberos.service.name=kafka
spark.kafka.clusters.prod.ssl.truststore.location=/etc/kafka/client.truststore.jks
spark.kafka.clusters.prod.ssl.truststore.password=changeit
spark.kafka.clusters.prod.sasl.token.mechanism=SCRAM-SHA-512

# Additional Kafka client properties
spark.kafka.clusters.prod.kafka.client.id=spark-streaming-app
spark.kafka.clusters.prod.kafka.session.timeout.ms=30000
```