# Token Provider

Core delegation token provider that integrates with Apache Spark's security framework to obtain and manage Kafka delegation tokens for secure cluster authentication.

## Capabilities

### KafkaDelegationTokenProvider

Main token provider class. It implements Spark's `HadoopDelegationTokenProvider` interface for Kafka clusters.

```scala { .api }
/**
 * Kafka delegation token provider for Apache Spark
 * Implements HadoopDelegationTokenProvider to integrate with Spark's security framework
 */
class KafkaDelegationTokenProvider extends HadoopDelegationTokenProvider with Logging {

  /**
   * Service name identifier for this token provider
   * @return "kafka"
   */
  def serviceName: String

  /**
   * Obtains delegation tokens for all configured Kafka clusters
   * @param hadoopConf Hadoop configuration
   * @param sparkConf Spark configuration containing cluster settings
   * @param creds Credentials object to store obtained tokens
   * @return Optional next renewal time (earliest across all clusters)
   */
  def obtainDelegationTokens(
    hadoopConf: Configuration,
    sparkConf: SparkConf,
    creds: Credentials
  ): Option[Long]

  /**
   * Checks if delegation tokens are required for any configured Kafka cluster
   * @param sparkConf Spark configuration
   * @param hadoopConf Hadoop configuration
   * @return true if tokens are required, false otherwise
   */
  def delegationTokensRequired(
    sparkConf: SparkConf,
    hadoopConf: Configuration
  ): Boolean
}
```

**Usage Examples:**

```scala
import org.apache.spark.SparkConf
import org.apache.spark.kafka010.KafkaDelegationTokenProvider
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

// Setup Spark configuration with Kafka cluster details
val sparkConf = new SparkConf()
  .set("spark.kafka.clusters.prod.auth.bootstrap.servers", "kafka1:9092,kafka2:9092")
  .set("spark.kafka.clusters.prod.security.protocol", "SASL_SSL")
  .set("spark.kafka.clusters.prod.sasl.kerberos.service.name", "kafka")
  .set("spark.kafka.clusters.prod.ssl.truststore.location", "/path/to/truststore.jks")
  .set("spark.kafka.clusters.prod.ssl.truststore.password", "password")

// Create provider and check requirements
val tokenProvider = new KafkaDelegationTokenProvider()
val hadoopConf = new Configuration()

// Check if tokens are needed
if (tokenProvider.delegationTokensRequired(sparkConf, hadoopConf)) {
  println("Delegation tokens are required for configured Kafka clusters")

  // Obtain tokens
  val credentials = new Credentials()
  val nextRenewal = tokenProvider.obtainDelegationTokens(hadoopConf, sparkConf, credentials)

  nextRenewal match {
    case Some(renewalTime) =>
      println(s"Tokens obtained successfully. Next renewal: $renewalTime")
    case None =>
      println("No tokens obtained or renewal not required")
  }
} else {
  println("No delegation tokens required")
}
```
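
The example above prints the raw `Long` renewal value. As a minimal sketch, assuming that value is a timestamp in milliseconds since the Unix epoch (an assumption, not something this page states), it could be rendered in a readable form as follows. `RenewalTimeSketch` and `describe` are hypothetical names used only for illustration.

```scala
import java.time.Instant

// Hypothetical helper, not part of the provider's API.
object RenewalTimeSketch {
  // Assumption: the renewal value is milliseconds since the Unix epoch.
  def describe(nextRenewal: Option[Long]): String = nextRenewal match {
    case Some(millis) => s"Next renewal at ${Instant.ofEpochMilli(millis)}"
    case None         => "No renewal scheduled"
  }
}
```

Passing the result of `obtainDelegationTokens` to `RenewalTimeSketch.describe` would then yield an ISO-8601 timestamp instead of a bare number.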

**Multi-Cluster Configuration:**

```scala
import org.apache.spark.SparkConf
import org.apache.spark.kafka010.KafkaDelegationTokenProvider
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

// Configure multiple Kafka clusters
val sparkConf = new SparkConf()
  // Production cluster
  .set("spark.kafka.clusters.prod.auth.bootstrap.servers", "prod-kafka1:9092")
  .set("spark.kafka.clusters.prod.security.protocol", "SASL_SSL")
  .set("spark.kafka.clusters.prod.target.bootstrap.servers.regex", "prod-kafka.*:9092")
  // Development cluster
  .set("spark.kafka.clusters.dev.auth.bootstrap.servers", "dev-kafka1:9092")
  .set("spark.kafka.clusters.dev.security.protocol", "SASL_PLAINTEXT")
  .set("spark.kafka.clusters.dev.target.bootstrap.servers.regex", "dev-kafka.*:9092")

val tokenProvider = new KafkaDelegationTokenProvider()
val credentials = new Credentials()

// This will obtain tokens for both clusters
tokenProvider.obtainDelegationTokens(new Configuration(), sparkConf, credentials)
```
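
To illustrate what the `target.bootstrap.servers.regex` settings above select, here is a small sketch in plain Scala. It assumes the pattern is searched for within a bootstrap servers string; the exact matching rule used internally is not documented on this page. `ClusterMatchSketch` and `matchingClusters` are hypothetical names.

```scala
// Hypothetical illustration, not part of the provider's API.
object ClusterMatchSketch {
  // Regex values copied from the multi-cluster configuration above.
  val targetRegexByCluster: Map[String, String] = Map(
    "prod" -> "prod-kafka.*:9092",
    "dev"  -> "dev-kafka.*:9092"
  )

  // Returns the identifiers of clusters whose regex is found in the given
  // bootstrap servers string (assumed matching rule).
  def matchingClusters(bootstrapServers: String): Set[String] =
    targetRegexByCluster.collect {
      case (cluster, regex) if regex.r.findFirstIn(bootstrapServers).isDefined => cluster
    }.toSet
}
```

Under this assumption, `"prod-kafka1:9092"` maps only to the `prod` cluster, and a server name that matches neither pattern maps to no cluster.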

### Service Name

Returns the service name identifier for this token provider.

```scala { .api }
/**
 * Service name identifier for this token provider
 * @return "kafka" - the service name used by Hadoop security framework
 */
def serviceName: String
```

### Token Requirements Check

Determines if delegation tokens are required based on the current configuration and security settings.

```scala { .api }
/**
 * Checks if delegation tokens are required for any configured Kafka cluster
 * Returns true if any cluster requires delegation tokens based on:
 * - Security protocol (SASL_SSL, SASL_PLAINTEXT, SSL)
 * - Authentication method availability
 * - Current user credentials
 *
 * @param sparkConf Spark configuration containing cluster settings
 * @param hadoopConf Hadoop configuration
 * @return true if delegation tokens are required, false otherwise
 */
def delegationTokensRequired(sparkConf: SparkConf, hadoopConf: Configuration): Boolean
```
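
The protocol part of this check can be sketched in plain Scala. This is a simplified illustration covering only the security protocol criterion listed above; it does not model authentication method availability or current user credentials, and it is not the provider's actual implementation. `TokenRequirementSketch` and its methods are hypothetical names.

```scala
// Hypothetical helper, not part of the provider's API.
object TokenRequirementSketch {
  // Protocols listed in the documentation above.
  private val protocolsNeedingTokens = Set("SASL_SSL", "SASL_PLAINTEXT", "SSL")

  // True if the given security protocol is one of the listed protocols.
  def requiresToken(securityProtocol: String): Boolean =
    protocolsNeedingTokens.contains(securityProtocol)

  // True if at least one cluster (keyed by identifier) uses a listed protocol.
  def anyClusterRequiresToken(protocolByCluster: Map[String, String]): Boolean =
    protocolByCluster.values.exists(requiresToken)
}
```

For example, a configuration with `prod` on `SASL_SSL` and `dev` on `PLAINTEXT` would still report that tokens are required, because a single qualifying cluster is enough.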

### Token Obtainment

Obtains delegation tokens for all configured Kafka clusters and stores them in the provided credentials.

```scala { .api }
/**
 * Obtains delegation tokens for all configured Kafka clusters
 * Processes each cluster configuration and:
 * - Validates security requirements
 * - Connects to Kafka cluster using admin client
 * - Requests delegation token with appropriate credentials
 * - Stores token in provided Credentials object
 * - Tracks renewal times across all clusters
 *
 * @param hadoopConf Hadoop configuration
 * @param sparkConf Spark configuration containing cluster settings under spark.kafka.clusters.*
 * @param creds Credentials object to store obtained tokens
 * @return Optional next renewal time (earliest across all clusters), None if no tokens obtained
 */
def obtainDelegationTokens(
  hadoopConf: Configuration,
  sparkConf: SparkConf,
  creds: Credentials
): Option[Long]
```
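
The return value is described as the earliest renewal time across all clusters, or `None` if no tokens were obtained. One way to express that aggregation, as a sketch rather than the provider's actual code, is shown below. `RenewalSketch` and `earliestRenewal` are hypothetical names.

```scala
// Hypothetical helper, not part of the provider's API.
object RenewalSketch {
  // Each element is the renewal time reported for one cluster, or None if
  // no token was obtained for that cluster.
  def earliestRenewal(perCluster: Seq[Option[Long]]): Option[Long] =
    perCluster.flatten.reduceOption(_ min _)
}
```

Using `reduceOption` keeps the empty case explicit: when no cluster produced a token, the result is `None` rather than a sentinel value.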

## Error Handling

The token provider handles various error conditions:

- **Authentication failures**: When Kerberos or SSL authentication fails
- **Network connectivity issues**: When Kafka clusters are unreachable
- **Configuration errors**: When required configuration parameters are missing
- **Token creation failures**: When Kafka cluster rejects token requests
- **Permission issues**: When user lacks required permissions for token operations

All errors are logged. Non-fatal errors are handled per cluster, so a failure for one cluster does not prevent token acquisition for the others.
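
The per-cluster isolation described above can be sketched with `scala.util.control.NonFatal`. This is an illustrative pattern under stated assumptions, not the provider's source: `ErrorIsolationSketch`, `obtainForAll`, and the `obtain` callback are hypothetical, and logging is reduced to a line on standard error.

```scala
import scala.util.control.NonFatal

// Hypothetical illustration, not part of the provider's API.
object ErrorIsolationSketch {
  // Calls `obtain` once per cluster. A non-fatal failure for one cluster is
  // reported and skipped, so the remaining clusters are still processed.
  def obtainForAll(clusters: Seq[String])(obtain: String => Long): Map[String, Long] =
    clusters.flatMap { cluster =>
      try {
        Some(cluster -> obtain(cluster))
      } catch {
        case NonFatal(e) =>
          Console.err.println(s"Failed to obtain token for $cluster: ${e.getMessage}")
          None
      }
    }.toMap
}
```

Fatal errors such as `OutOfMemoryError` are deliberately not caught by `NonFatal`, so they still propagate to the caller.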