
To install, run:

```
npx @tessl/cli install tessl/maven-org-apache-spark--spark-token-provider-kafka-0-10_2.12@3.5.0
```

# Spark Token Provider Kafka 0.10

Spark Token Provider Kafka 0.10 provides Hadoop delegation token support for Kafka authentication in Spark streaming applications. The library enables secure authentication with Kafka 0.10+ clusters through delegation tokens, with automatic token acquisition, management, and renewal for enterprise streaming environments that require Kerberos-based security.

## Package Information

- **Package Name**: spark-token-provider-kafka-0-10_2.12
- **Package Type**: maven
- **Language**: Scala
- **Group ID**: org.apache.spark
- **Artifact ID**: spark-token-provider-kafka-0-10_2.12
- **Installation**: Include as a dependency in pom.xml or build.sbt
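The package information above maps directly onto a build definition. A minimal sbt sketch, assuming Spark 3.5.0 (substitute the version matching your Spark deployment):

```scala
// build.sbt -- coordinates taken from the package metadata above;
// %% appends the Scala binary version suffix (_2.12 / _2.13) automatically
libraryDependencies += "org.apache.spark" %% "spark-token-provider-kafka-0-10" % "3.5.0"
```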

## Core Imports

```scala
import org.apache.spark.kafka010._
```

For specific components:

```scala
import org.apache.spark.kafka010.{KafkaDelegationTokenProvider, KafkaTokenUtil, KafkaConfigUpdater}
```

## Basic Usage

```scala
import org.apache.spark.SparkConf
import org.apache.spark.kafka010.{KafkaTokenSparkConf, KafkaConfigUpdater}
import org.apache.kafka.clients.CommonClientConfigs

// Configure Spark for Kafka token authentication
val sparkConf = new SparkConf()
  .set("spark.kafka.clusters.cluster1.auth.bootstrap.servers", "kafka1:9092,kafka2:9092")
  .set("spark.kafka.clusters.cluster1.security.protocol", "SASL_SSL")
  .set("spark.kafka.clusters.cluster1.sasl.kerberos.service.name", "kafka")

// Update Kafka consumer/producer configuration with token authentication
val kafkaParams = Map[String, Object](
  CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> "kafka1:9092,kafka2:9092"
)

val updatedParams = KafkaConfigUpdater("example-module", kafkaParams)
  .setAuthenticationConfigIfNeeded()
  .build()
```

## Architecture

The library provides several key components for token-based Kafka authentication:

- **Token Provider**: `KafkaDelegationTokenProvider` integrates with Spark's security framework to obtain delegation tokens
- **Configuration Management**: `KafkaTokenSparkConf` handles cluster-specific configuration parsing from SparkConf
- **Token Utilities**: `KafkaTokenUtil` provides core token operations, including token acquisition and JAAS configuration
- **Configuration Updater**: `KafkaConfigUpdater` applies authentication settings to Kafka client configurations
- **Security Utilities**: `KafkaRedactionUtil` ensures sensitive data is properly redacted from logs

## Capabilities

### Token Provider Integration

Main entry point for Spark's delegation token framework, automatically obtaining tokens for configured Kafka clusters.

```scala { .api }
class KafkaDelegationTokenProvider extends HadoopDelegationTokenProvider {
  def serviceName: String
  def obtainDelegationTokens(
    hadoopConf: Configuration,
    sparkConf: SparkConf,
    creds: Credentials
  ): Option[Long]
  def delegationTokensRequired(
    sparkConf: SparkConf,
    hadoopConf: Configuration
  ): Boolean
}
```

### Cluster Configuration Management

Handles parsing and management of Kafka cluster configurations from Spark configuration properties.

```scala { .api }
case class KafkaTokenClusterConf(
  identifier: String,
  authBootstrapServers: String,
  targetServersRegex: String,
  securityProtocol: String,
  kerberosServiceName: String,
  trustStoreType: Option[String],
  trustStoreLocation: Option[String],
  trustStorePassword: Option[String],
  keyStoreType: Option[String],
  keyStoreLocation: Option[String],
  keyStorePassword: Option[String],
  keyPassword: Option[String],
  tokenMechanism: String,
  specifiedKafkaParams: Map[String, String]
)

object KafkaTokenSparkConf {
  def getClusterConfig(sparkConf: SparkConf, identifier: String): KafkaTokenClusterConf
  def getAllClusterConfigs(sparkConf: SparkConf): Set[KafkaTokenClusterConf]

  val CLUSTERS_CONFIG_PREFIX: String = "spark.kafka.clusters."
  val DEFAULT_TARGET_SERVERS_REGEX: String = ".*"
  val DEFAULT_SASL_KERBEROS_SERVICE_NAME: String = "kafka"
  val DEFAULT_SECURITY_PROTOCOL_CONFIG: String = "SASL_SSL"
  val DEFAULT_SASL_TOKEN_MECHANISM: String = "SCRAM-SHA-512"
}
```
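A sketch of how the defaults above surface in practice. Applications rarely call this directly (Spark resolves cluster configurations internally), so this is illustrative only:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.kafka010.KafkaTokenSparkConf

val conf = new SparkConf()
  .set("spark.kafka.clusters.cluster1.auth.bootstrap.servers", "kafka1:9092")

// Fields not set explicitly fall back to the defaults listed above,
// e.g. securityProtocol -> "SASL_SSL", tokenMechanism -> "SCRAM-SHA-512"
val clusterConf = KafkaTokenSparkConf.getClusterConfig(conf, "cluster1")
```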

### Token Operations

Core utilities for obtaining delegation tokens and managing authentication configurations.

```scala { .api }
object KafkaTokenUtil {
  val TOKEN_KIND: Text

  def obtainToken(
    sparkConf: SparkConf,
    clusterConf: KafkaTokenClusterConf
  ): (Token[KafkaDelegationTokenIdentifier], Long)

  def checkProxyUser(): Unit

  def createAdminClientProperties(
    sparkConf: SparkConf,
    clusterConf: KafkaTokenClusterConf
  ): java.util.Properties

  def isGlobalJaasConfigurationProvided: Boolean

  def getKeytabJaasParams(
    keyTab: String,
    principal: String,
    kerberosServiceName: String
  ): String

  def findMatchingTokenClusterConfig(
    sparkConf: SparkConf,
    bootStrapServers: String
  ): Option[KafkaTokenClusterConf]

  def getTokenJaasParams(clusterConf: KafkaTokenClusterConf): String

  def needTokenUpdate(
    params: java.util.Map[String, Object],
    clusterConfig: Option[KafkaTokenClusterConf]
  ): Boolean

  def getTokenService(identifier: String): Text
}

class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
  def getKind: Text
}
```
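These utilities are normally driven by `KafkaDelegationTokenProvider` rather than by application code. A hedged sketch of the flow, assuming a valid Kerberos login is already in place:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.kafka010.KafkaTokenUtil

val sparkConf = new SparkConf()
  .set("spark.kafka.clusters.cluster1.auth.bootstrap.servers", "kafka1:9092")

// Find the cluster config whose target-servers regex matches these bootstrap servers...
KafkaTokenUtil.findMatchingTokenClusterConfig(sparkConf, "kafka1:9092").foreach { clusterConf =>
  // ...then obtain a delegation token along with its next-renewal timestamp
  val (token, nextRenewalMillis) = KafkaTokenUtil.obtainToken(sparkConf, clusterConf)
}
```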

### Configuration Updating

Fluent interface for updating Kafka client configurations with authentication settings.

```scala { .api }
case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, Object]) {
  def set(key: String, value: Object): KafkaConfigUpdater.this.type
  def setIfUnset(key: String, value: Object): KafkaConfigUpdater.this.type
  def setAuthenticationConfigIfNeeded(): KafkaConfigUpdater.this.type
  def setAuthenticationConfigIfNeeded(
    clusterConfig: Option[KafkaTokenClusterConf]
  ): KafkaConfigUpdater.this.type
  def build(): java.util.Map[String, Object]
}
```
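A short sketch of the fluent interface; the `request.timeout.ms` and `client.id` values here are illustrative, not recommendations:

```scala
import org.apache.spark.kafka010.KafkaConfigUpdater

val params = KafkaConfigUpdater("example-source", Map[String, Object](
    "bootstrap.servers" -> "kafka1:9092"
  ))
  .set("request.timeout.ms", "30000")    // always overwrites the key
  .setIfUnset("client.id", "my-client")  // applied only if the key is absent
  .setAuthenticationConfigIfNeeded()     // injects token/JAAS settings when a cluster matches
  .build()                               // returns java.util.Map[String, Object]
```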

### Security and Logging

Utilities for secure handling of sensitive configuration parameters in logs.

```scala { .api }
object KafkaRedactionUtil {
  def redactParams(params: Seq[(String, Object)]): Seq[(String, String)]
  def redactJaasParam(param: String): String
}
```
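These helpers are applied internally before parameters reach the logs. A sketch of the intended effect, with an illustrative parameter sequence:

```scala
import org.apache.spark.kafka010.KafkaRedactionUtil

val params: Seq[(String, Object)] = Seq(
  "bootstrap.servers" -> "kafka1:9092",
  "ssl.truststore.password" -> "secret"
)

// Sensitive values are replaced before logging; non-sensitive values pass through
val redacted: Seq[(String, String)] = KafkaRedactionUtil.redactParams(params)
```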

## Configuration Properties

The library uses Spark configuration properties with the prefix `spark.kafka.clusters.<identifier>.` for cluster-specific settings:

### Required Configuration

```scala
// Bootstrap servers used to obtain tokens (required)
"spark.kafka.clusters.<identifier>.auth.bootstrap.servers" -> "kafka1:9092,kafka2:9092"
```

### Optional Configuration

```scala
// Target servers regex pattern (default: ".*")
"spark.kafka.clusters.<identifier>.target.bootstrap.servers.regex" -> "kafka.*:9092"

// Security protocol (default: "SASL_SSL")
"spark.kafka.clusters.<identifier>.security.protocol" -> "SASL_SSL"

// Kerberos service name (default: "kafka")
"spark.kafka.clusters.<identifier>.sasl.kerberos.service.name" -> "kafka"

// Token mechanism (default: "SCRAM-SHA-512")
"spark.kafka.clusters.<identifier>.sasl.token.mechanism" -> "SCRAM-SHA-512"

// SSL truststore configuration
"spark.kafka.clusters.<identifier>.ssl.truststore.type" -> "JKS"
"spark.kafka.clusters.<identifier>.ssl.truststore.location" -> "/path/to/truststore.jks"
"spark.kafka.clusters.<identifier>.ssl.truststore.password" -> "truststore-password"

// SSL keystore configuration (for SSL protocol)
"spark.kafka.clusters.<identifier>.ssl.keystore.type" -> "JKS"
"spark.kafka.clusters.<identifier>.ssl.keystore.location" -> "/path/to/keystore.jks"
"spark.kafka.clusters.<identifier>.ssl.keystore.password" -> "keystore-password"
"spark.kafka.clusters.<identifier>.ssl.key.password" -> "key-password"

// Additional Kafka client properties
"spark.kafka.clusters.<identifier>.kafka.<kafka-property>" -> "value"
```

## Authentication Methods

The library supports multiple authentication methods, applied in the following order of preference:

1. **Global JAAS Configuration**: Uses JVM-wide security settings (e.g., `java.security.auth.login.config`)
2. **Keytab Authentication**: Uses a Kerberos keytab file with a dynamically generated JAAS configuration
3. **Ticket Cache Authentication**: Uses the Kerberos ticket cache with a dynamically generated JAAS configuration
4. **Token Authentication**: Uses delegation tokens with a SCRAM mechanism
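For keytab authentication (method 2), the relevant settings are Spark's standard Kerberos options rather than anything specific to this library. A hedged sketch, assuming the standard `spark.kerberos.*` properties; paths and principal are placeholders:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Standard Spark Kerberos settings; the token provider builds a JAAS config from them
  .set("spark.kerberos.keytab", "/etc/security/keytabs/app.keytab")
  .set("spark.kerberos.principal", "app@EXAMPLE.COM")
  .set("spark.kafka.clusters.cluster1.auth.bootstrap.servers", "kafka1:9092")
```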

## Security Protocols

### SASL_SSL (Recommended)

- SASL authentication over an SSL-encrypted connection
- Requires truststore configuration
- Default security protocol

### SSL

- SSL with client certificate authentication
- Requires both truststore and keystore configuration
- Logs a warning that 2-way (mutual) authentication is required

### SASL_PLAINTEXT

- SASL authentication over an unencrypted connection
- Logs a security warning
- Not recommended for production

## Error Handling

The library handles several error conditions:

- **Proxy User Error**: Throws `IllegalArgumentException` when attempting to use proxy users (not yet supported)
- **Configuration Errors**: Logs warnings for missing or invalid cluster configurations
- **Token Acquisition Failures**: Logs warnings with cluster context for debugging
- **Multiple Token Matches**: Throws `IllegalArgumentException` when multiple tokens match the bootstrap servers

## Usage Examples

### Multi-Cluster Configuration

```scala
val sparkConf = new SparkConf()
  // Cluster 1 - Production
  .set("spark.kafka.clusters.prod.auth.bootstrap.servers", "prod-kafka1:9092,prod-kafka2:9092")
  .set("spark.kafka.clusters.prod.target.bootstrap.servers.regex", "prod-kafka.*:9092")
  .set("spark.kafka.clusters.prod.security.protocol", "SASL_SSL")
  .set("spark.kafka.clusters.prod.ssl.truststore.location", "/etc/kafka/truststore.jks")
  .set("spark.kafka.clusters.prod.ssl.truststore.password", "prod-truststore-pass")
  // Cluster 2 - Staging
  .set("spark.kafka.clusters.staging.auth.bootstrap.servers", "staging-kafka:9092")
  .set("spark.kafka.clusters.staging.target.bootstrap.servers.regex", "staging-kafka:9092")
  .set("spark.kafka.clusters.staging.security.protocol", "SASL_PLAINTEXT")
```

### Consumer Configuration Update

```scala
import org.apache.kafka.clients.consumer.ConsumerConfig

val baseConsumerConfig = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "kafka1:9092,kafka2:9092",
  ConsumerConfig.GROUP_ID_CONFIG -> "my-consumer-group",
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"
)

val authenticatedConfig = KafkaConfigUpdater("consumer", baseConsumerConfig)
  .setAuthenticationConfigIfNeeded()
  .build()
```

### Producer Configuration Update

```scala
import org.apache.kafka.clients.producer.ProducerConfig

val baseProducerConfig = Map[String, Object](
  ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "kafka1:9092,kafka2:9092",
  ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
  ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer"
)

val authenticatedConfig = KafkaConfigUpdater("producer", baseProducerConfig)
  .setAuthenticationConfigIfNeeded()
  .build()
```