tessl/maven-org-apache-spark--spark-token-provider-kafka-0-10_2-13

Kafka 0.10+ Token Provider for Streaming - A specialized security module that handles delegation token management for Kafka integration in Apache Spark streaming applications.

- Workspace: tessl
- Visibility: Public
- Describes: mavenpkg:maven/org.apache.spark/spark-token-provider-kafka-0-10_2.13@4.0.x

To install, run:

npx @tessl/cli install tessl/maven-org-apache-spark--spark-token-provider-kafka-0-10_2-13@4.0.0

# Kafka 0.10+ Token Provider for Streaming

A specialized security module that handles delegation token management for Kafka integration in Apache Spark streaming applications, providing secure authentication and authorization capabilities when connecting to Kafka clusters using SASL-based security protocols.

## Package Information

- **Package Name**: spark-token-provider-kafka-0-10_2.13
- **Package Type**: Maven
- **Language**: Scala
- **Installation**: Add to your Maven pom.xml:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-token-provider-kafka-0-10_2.13</artifactId>
  <version>4.0.0</version>
</dependency>
```

## Core Imports

```scala
import org.apache.spark.kafka010.{KafkaDelegationTokenProvider, KafkaConfigUpdater, KafkaTokenUtil}
import org.apache.spark.kafka010.{KafkaTokenClusterConf, KafkaTokenSparkConf}
import org.apache.spark.kafka010.KafkaTokenUtil.KafkaDelegationTokenIdentifier
```

## Basic Usage

```scala
import org.apache.spark.SparkConf
import org.apache.spark.kafka010.{KafkaDelegationTokenProvider, KafkaTokenSparkConf}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

// Configure Spark for Kafka cluster authentication
val sparkConf = new SparkConf()
  .set("spark.kafka.clusters.cluster1.auth.bootstrap.servers", "kafka1:9092,kafka2:9092")
  .set("spark.kafka.clusters.cluster1.security.protocol", "SASL_SSL")
  .set("spark.kafka.clusters.cluster1.sasl.kerberos.service.name", "kafka")

// Create token provider
val tokenProvider = new KafkaDelegationTokenProvider()

// Check if tokens are required
val hadoopConf = new Configuration()
val credentials = new Credentials()

if (tokenProvider.delegationTokensRequired(sparkConf, hadoopConf)) {
  // Obtain delegation tokens
  val nextRenewalTime = tokenProvider.obtainDelegationTokens(hadoopConf, sparkConf, credentials)
  println(s"Tokens obtained, next renewal: $nextRenewalTime")
}
```

## Architecture

The Kafka Token Provider is built around several key components:

- **Token Provider**: Main `KafkaDelegationTokenProvider` class implementing Spark's `HadoopDelegationTokenProvider` interface
- **Configuration Management**: `KafkaTokenSparkConf` for parsing Spark configuration and `KafkaConfigUpdater` for runtime configuration updates
- **Token Utilities**: `KafkaTokenUtil` object providing low-level token operations and authentication helpers
- **Security Integration**: Works with Kerberos, SSL, and SASL authentication mechanisms
- **Multi-Cluster Support**: Manages delegation tokens for multiple Kafka clusters simultaneously
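
To make the multi-cluster idea concrete, the sketch below groups flat `spark.kafka.clusters.<identifier>.<property>` settings (the key shape used in Basic Usage) into one map per cluster identifier. It is a minimal, standalone illustration: `ClusterConfigSketch` and `groupByCluster` are hypothetical names, and this is not how `KafkaTokenSparkConf` is necessarily implemented.

```scala
object ClusterConfigSketch {
  // Prefix shared by the per-cluster keys shown in Basic Usage.
  val Prefix = "spark.kafka.clusters."

  /** Groups flat settings into one property map per cluster identifier (hypothetical helper). */
  def groupByCluster(settings: Map[String, String]): Map[String, Map[String, String]] =
    settings.toSeq
      // Keep only per-cluster keys and drop the common prefix.
      .collect { case (key, value) if key.startsWith(Prefix) => (key.stripPrefix(Prefix), value) }
      // Split "<identifier>.<property>" at the first dot; skip keys without a property part.
      .flatMap { case (rest, value) =>
        rest.split("\\.", 2) match {
          case Array(id, prop) => Some((id, prop -> value))
          case _               => None
        }
      }
      .groupMap(_._1)(_._2)
      .view.mapValues(_.toMap).toMap
}
```

Each resulting entry corresponds to one cluster, so tokens for several clusters can be configured side by side under different identifiers.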

## Capabilities

### Token Provider

Core delegation token provider that integrates with Apache Spark's security framework to obtain and manage Kafka delegation tokens for secure cluster authentication.

```scala { .api }
class KafkaDelegationTokenProvider extends HadoopDelegationTokenProvider {
  def serviceName: String
  def obtainDelegationTokens(
    hadoopConf: Configuration,
    sparkConf: SparkConf,
    creds: Credentials
  ): Option[Long]
  def delegationTokensRequired(
    sparkConf: SparkConf,
    hadoopConf: Configuration
  ): Boolean
}
```

[Token Provider](./token-provider.md)
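
Because `obtainDelegationTokens` returns an `Option[Long]`, callers have to handle the case where no renewal time is reported. The sketch below turns such a value into a refresh delay. It assumes the value is an epoch-millisecond timestamp and that refreshing after a fraction of the remaining lifetime is acceptable; `RenewalSketch`, `renewalDelayMs`, and the `ratio` default are hypothetical and not part of this package.

```scala
object RenewalSketch {
  /**
   * Delay in milliseconds before the next token refresh,
   * or None when no renewal time was reported.
   */
  def renewalDelayMs(
      nextRenewal: Option[Long],
      nowMs: Long,
      ratio: Double = 0.75): Option[Long] =
    nextRenewal.map { renewalTime =>
      // Refresh after `ratio` of the remaining time; never return a negative delay.
      math.max(0L, ((renewalTime - nowMs) * ratio).toLong)
    }
}
```

In practice Spark's own credential management drives renewal, so a helper like this is mainly useful for logging or for reasoning about when the next refresh should happen.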

### Configuration Management

Configuration utilities for parsing Spark configuration properties and managing Kafka client parameters with security and redaction support.

```scala { .api }
case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, Object]) {
  def set(key: String, value: Object): KafkaConfigUpdater.this.type
  def setIfUnset(key: String, value: Object): KafkaConfigUpdater.this.type
  def setAuthenticationConfigIfNeeded(): KafkaConfigUpdater.this.type
  def build(): java.util.Map[String, Object]
}

object KafkaTokenSparkConf {
  def getClusterConfig(sparkConf: SparkConf, identifier: String): KafkaTokenClusterConf
  def getAllClusterConfigs(sparkConf: SparkConf): Set[KafkaTokenClusterConf]
}
```

[Configuration Management](./configuration.md)
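
The `set` / `setIfUnset` / `build` signatures suggest a chainable builder over the Kafka parameter map. The standalone sketch below models the difference between the two setters, assuming `set` always overwrites and `setIfUnset` only fills in missing keys. `ParamsSketch` is a hypothetical stand-in, not the library class, and it leaves out the authentication step.

```scala
import java.{util => ju}

/** Hypothetical stand-in that mimics the chainable setter style of the API above. */
final class ParamsSketch(initial: Map[String, Object]) {
  private val params = new ju.HashMap[String, Object]()
  initial.foreach { case (key, value) => params.put(key, value) }

  /** Always overwrites any existing value for the key. */
  def set(key: String, value: Object): this.type = {
    params.put(key, value)
    this
  }

  /** Writes the value only when the key has no value yet. */
  def setIfUnset(key: String, value: Object): this.type = {
    params.putIfAbsent(key, value)
    this
  }

  /** Returns the accumulated parameters as a Java map, the shape Kafka clients expect. */
  def build(): ju.Map[String, Object] = params
}
```

A chain such as `new ParamsSketch(userParams).setIfUnset("group.id", "fallback").set("enable.auto.commit", "false").build()` keeps a user-supplied `group.id` while forcing the second setting.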

### Token Utilities

Low-level token operations, authentication helpers, and Kafka admin client management for delegation token lifecycle management.

```scala { .api }
object KafkaTokenUtil {
  val TOKEN_KIND: Text
  def getTokenService(identifier: String): Text
  def obtainToken(
    sparkConf: SparkConf,
    clusterConf: KafkaTokenClusterConf
  ): (Token[KafkaDelegationTokenIdentifier], Long)
  def checkProxyUser(): Unit
  def createAdminClientProperties(
    sparkConf: SparkConf,
    clusterConf: KafkaTokenClusterConf
  ): java.util.Properties
  def isGlobalJaasConfigurationProvided: Boolean
  def findMatchingTokenClusterConfig(
    sparkConf: SparkConf,
    bootStrapServers: String
  ): Option[KafkaTokenClusterConf]
  def getTokenJaasParams(clusterConf: KafkaTokenClusterConf): String
  def needTokenUpdate(
    params: java.util.Map[String, Object],
    clusterConfig: Option[KafkaTokenClusterConf]
  ): Boolean
  def getKeytabJaasParams(
    keyTab: String,
    principal: String,
    kerberosServiceName: String
  ): String
}
```

[Token Utilities](./token-utilities.md)
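
`findMatchingTokenClusterConfig` takes a bootstrap-server string and returns at most one cluster configuration, which implies some matching against each cluster's `targetServersRegex`. The sketch below shows one plausible rule: a cluster matches when at least one listed server fully matches its regex, and more than one matching cluster is treated as an error. Both the rule and the names `ClusterTarget` and `MatchSketch` are assumptions for illustration; the library's exact behavior may differ.

```scala
/** Hypothetical, reduced view of a cluster configuration. */
final case class ClusterTarget(identifier: String, targetServersRegex: String)

object MatchSketch {
  /** Returns the single cluster whose regex matches one of the bootstrap servers, if any. */
  def findMatching(targets: Seq[ClusterTarget], bootstrapServers: String): Option[ClusterTarget] = {
    // Bootstrap servers are given as a comma-separated list.
    val servers = bootstrapServers.split(",").map(_.trim).filter(_.nonEmpty)
    val matching = targets.filter { target =>
      val pattern = target.targetServersRegex.r.pattern
      servers.exists(server => pattern.matcher(server).matches())
    }
    // An ambiguous match would make token selection unpredictable, so reject it.
    require(
      matching.size <= 1,
      s"More than one cluster matches: ${matching.map(_.identifier).mkString(", ")}")
    matching.headOption
  }
}
```

Under this rule, keeping the regexes of different clusters disjoint avoids the ambiguous case.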

### Utility Classes

Additional utility classes for configuration redaction and error handling.

```scala { .api }
object KafkaRedactionUtil {
  def redactParams(params: Seq[(String, Object)]): Seq[(String, String)]
  def redactJaasParam(param: String): String
}

object KafkaTokenProviderExceptions {
  def missingKafkaOption(option: String): SparkException
}
```

[Utilities](./utilities.md)
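
Redaction matters because JAAS strings and Kafka parameters can carry passwords that would otherwise end up in logs. The sketch below masks `password="..."` fields in a JAAS-style string. It is a simplified illustration: `RedactionSketch`, `redactJaas`, and the `[redacted]` marker are hypothetical, and the replacement text used by `KafkaRedactionUtil` may be different.

```scala
object RedactionSketch {
  // Matches a quoted password field such as password="secret".
  private val PasswordField = "password=\"[^\"]*\"".r

  /** Masks password values; null or empty input is returned unchanged. */
  def redactJaas(param: String): String =
    if (param == null || param.isEmpty) param
    else PasswordField.replaceAllIn(param, "password=\"[redacted]\"")
}
```

Applying such a function before logging keeps the rest of the JAAS string readable for debugging while hiding the secret.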

## Types

```scala { .api }
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier

case class KafkaTokenClusterConf(
  identifier: String,
  authBootstrapServers: String,
  targetServersRegex: String,
  securityProtocol: String,
  kerberosServiceName: String,
  trustStoreType: Option[String],
  trustStoreLocation: Option[String],
  trustStorePassword: Option[String],
  keyStoreType: Option[String],
  keyStoreLocation: Option[String],
  keyStorePassword: Option[String],
  keyPassword: Option[String],
  tokenMechanism: String,
  specifiedKafkaParams: Map[String, String]
)

// Defined within KafkaTokenUtil object
class KafkaTokenUtil.KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
  def getKind: Text
}
```