or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md index.md token-provider.md token-utilities.md utilities.md

token-utilities.md docs/

0

# Token Utilities

1

2

Low-level token operations, authentication helpers, and Kafka admin client configuration for managing the delegation token lifecycle.

3

4

## Capabilities

5

6

### KafkaTokenUtil

7

8

Object providing low-level token operations and utility functions for Kafka delegation token management.

9

10

```scala { .api }

11

object KafkaTokenUtil extends Logging {

12

13

/** Token kind identifier for Kafka delegation tokens */

14

val TOKEN_KIND: Text = new Text("KAFKA_DELEGATION_TOKEN")

15

16

/**

17

* Creates a token service identifier for the given cluster identifier

18

* @param identifier Cluster identifier

19

* @return Text object representing the token service

20

*/

21

def getTokenService(identifier: String): Text

22

23

/**

24

* Obtains a delegation token from a Kafka cluster

25

* @param sparkConf Spark configuration

26

* @param clusterConf Cluster configuration containing authentication details

27

* @return Tuple of (Hadoop delegation token, next renewal time in milliseconds since epoch)

28

*/

29

def obtainToken(

30

sparkConf: SparkConf,

31

clusterConf: KafkaTokenClusterConf

32

): (Token[KafkaDelegationTokenIdentifier], Long)

33

34

/**

35

* Validates that the current user is not a proxy user

36

* @throws IllegalArgumentException if the current user is a proxy user

37

*/

38

def checkProxyUser(): Unit

39

40

/**

41

* Creates Kafka admin client properties for the given cluster configuration

42

* @param sparkConf Spark configuration

43

* @param clusterConf Cluster configuration

44

* @return Properties object configured for Kafka admin client

45

*/

46

def createAdminClientProperties(

47

sparkConf: SparkConf,

48

clusterConf: KafkaTokenClusterConf

49

): java.util.Properties

50

51

/**

52

* Checks if global JAAS configuration is provided

53

* @return true if a global JAAS configuration (typically supplied via the java.security.auth.login.config system property) can be loaded

54

*/

55

def isGlobalJaasConfigurationProvided: Boolean

56

57

/**

58

* Finds matching cluster configuration for given bootstrap servers

59

* @param sparkConf Spark configuration

60

* @param bootStrapServers Bootstrap servers to match against

61

* @return Optional cluster configuration that matches the servers

62

*/

63

def findMatchingTokenClusterConfig(

64

sparkConf: SparkConf,

65

bootStrapServers: String

66

): Option[KafkaTokenClusterConf]

67

68

/**

69

* Generates JAAS parameters for token authentication

70

* @param clusterConf Cluster configuration containing token details

71

* @return JAAS parameter string for token-based authentication

72

*/

73

def getTokenJaasParams(clusterConf: KafkaTokenClusterConf): String

74

75

/**

76

* Checks if token configuration needs updating in the given parameters

77

* @param params Current Kafka parameters

78

* @param clusterConfig Optional cluster configuration

79

* @return true if token configuration update is needed

80

*/

81

def needTokenUpdate(

82

params: java.util.Map[String, Object],

83

clusterConfig: Option[KafkaTokenClusterConf]

84

): Boolean

85

86

/**

87

* Generates JAAS parameters for keytab authentication

88

* @param keyTab Path to keytab file

89

* @param principal Kerberos principal

90

* @param kerberosServiceName Kerberos service name

91

* @return JAAS parameter string for keytab-based authentication

92

*/

93

def getKeytabJaasParams(

94

keyTab: String,

95

principal: String,

96

kerberosServiceName: String

97

): String

98

}

99

```

100

101

**Usage Examples:**

102

103

```scala

104

import org.apache.spark.SparkConf

105

import org.apache.spark.kafka010.{KafkaTokenUtil, KafkaTokenClusterConf}

106

107

// Setup cluster configuration

108

val clusterConf = KafkaTokenClusterConf(

109

identifier = "prod-cluster",

110

authBootstrapServers = "kafka1:9092,kafka2:9092",

111

targetServersRegex = "kafka.*:9092",

112

securityProtocol = "SASL_SSL",

113

kerberosServiceName = "kafka",

114

trustStoreLocation = Some("/etc/kafka/truststore.jks"),

115

trustStorePassword = Some("password"),

116

tokenMechanism = "SCRAM-SHA-512",

117

// ... other configuration

118

)

119

120

val sparkConf = new SparkConf()

121

122

// Check proxy user constraints

123

try {

124

KafkaTokenUtil.checkProxyUser()

125

println("User validation passed")

126

} catch {

127

case e: Exception => println(s"Proxy user validation failed: ${e.getMessage}")

128

}

129

130

// Create admin client properties

131

val adminProps = KafkaTokenUtil.createAdminClientProperties(sparkConf, clusterConf)

132

println(s"Admin client configured with ${adminProps.size()} properties")

133

134

// Obtain delegation token

135

try {

136

val (token, renewalTime) = KafkaTokenUtil.obtainToken(sparkConf, clusterConf)

137

println(s"Token obtained: ${token.getService}")

138

println(s"Next renewal: $renewalTime")

139

} catch {

140

case e: Exception => println(s"Token acquisition failed: ${e.getMessage}")

141

}

142

```

143

144

### Token Service Management

145

146

Creates and manages token service identifiers for Kafka clusters.

147

148

```scala { .api }

149

/**

150

* Creates a token service identifier for the given cluster identifier

151

* Service identifiers are used to uniquely identify tokens in the credential store

152

*

153

* @param identifier Cluster identifier from configuration

154

* @return Text object representing the token service in format: kafka.server.delegation.token.<identifier>

155

*/

156

def getTokenService(identifier: String): Text

157

```

158

159

### Token Acquisition

160

161

Obtains delegation tokens from Kafka clusters using appropriate authentication methods.

162

163

```scala { .api }

164

/**

165

* Obtains a delegation token from a Kafka cluster

166

* Process:

167

* 1. Creates Kafka admin client with authentication credentials

168

* 2. Requests delegation token with specified parameters

169

* 3. Converts Kafka token to Hadoop token format

170

* 4. Calculates next renewal time based on token lifetime

171

*

172

* @param sparkConf Spark configuration for general settings

173

* @param clusterConf Cluster configuration containing authentication details

174

* @return Tuple of (Hadoop delegation token, next renewal time in milliseconds since epoch)

175

* @throws Exception if token acquisition fails due to authentication, network, or permission issues

176

*/

177

def obtainToken(

178

sparkConf: SparkConf,

179

clusterConf: KafkaTokenClusterConf

180

): (Token[KafkaDelegationTokenIdentifier], Long)

181

```

182

183

### Authentication Helpers

184

185

Utility functions for managing different authentication methods and configurations.

186

187

```scala { .api }

188

/**

189

* Validates that the current user is not a proxy user

190

* Proxy users are not supported for delegation token operations

191

* @throws IllegalArgumentException if current user is a proxy user

192

*/

193

def checkProxyUser(): Unit

194

195

/**

196

* Checks if global JAAS configuration is provided via system property

197

* @return true if a global JAAS configuration (typically supplied via the java.security.auth.login.config system property) can be loaded

198

*/

199

def isGlobalJaasConfigurationProvided: Boolean

200

201

/**

202

* Generates JAAS parameters for token authentication

203

* Creates ScramLoginModule configuration with delegation token parameters

204

*

205

* @param clusterConf Cluster configuration containing token mechanism and other settings

206

* @return JAAS parameter string formatted for ScramLoginModule with token authentication

207

*/

208

def getTokenJaasParams(clusterConf: KafkaTokenClusterConf): String

209

210

/**

211

* Generates JAAS parameters for keytab authentication

212

* Creates Krb5LoginModule configuration for keytab-based Kerberos authentication

213

*

214

* @param keyTab Path to Kerberos keytab file

215

* @param principal Kerberos principal name

216

* @param kerberosServiceName Service name for Kerberos authentication

217

* @return JAAS parameter string formatted for Krb5LoginModule with keytab authentication

218

*/

219

def getKeytabJaasParams(

220

keyTab: String,

221

principal: String,

222

kerberosServiceName: String

223

): String

224

```

225

226

### Admin Client Management

227

228

Builds the configuration properties used to create Kafka admin clients for token operations.

229

230

```scala { .api }

231

/**

232

* Creates Kafka admin client properties for the given cluster configuration

233

* Configures all necessary properties including:

234

* - Bootstrap servers

235

* - Security protocol and SASL mechanism

236

* - SSL keystore and truststore settings

237

* - Authentication parameters (JAAS config)

238

* - Client identification

239

*

240

* @param sparkConf Spark configuration for general settings

241

* @param clusterConf Cluster-specific configuration

242

* @return Properties object ready for AdminClient.create()

243

*/

244

def createAdminClientProperties(

245

sparkConf: SparkConf,

246

clusterConf: KafkaTokenClusterConf

247

): java.util.Properties

248

```

249

250

### Configuration Matching

251

252

Utilities for matching configurations and determining update requirements.

253

254

```scala { .api }

255

/**

256

* Finds matching cluster configuration for given bootstrap servers

257

* Matches bootstrap servers against configured target server regex patterns

258

*

259

* @param sparkConf Spark configuration containing cluster definitions

260

* @param bootStrapServers Comma-separated list of bootstrap servers to match

261

* @return Optional cluster configuration whose target regex matches the bootstrap servers

262

*/

263

def findMatchingTokenClusterConfig(

264

sparkConf: SparkConf,

265

bootStrapServers: String

266

): Option[KafkaTokenClusterConf]

267

268

/**

269

* Checks if token configuration needs updating in the given parameters

270

* Examines current Kafka parameters to determine if token-based authentication

271

* configuration needs to be applied or updated

272

*

273

* @param params Current Kafka client parameters

274

* @param clusterConfig Optional cluster configuration for comparison

275

* @return true if token authentication configuration should be applied/updated

276

*/

277

def needTokenUpdate(

278

params: java.util.Map[String, Object],

279

clusterConfig: Option[KafkaTokenClusterConf]

280

): Boolean

281

```

282

283

### KafkaDelegationTokenIdentifier

284

285

Token identifier implementation for Kafka delegation tokens.

286

287

```scala { .api }

288

/**

289

* Hadoop token identifier implementation for Kafka delegation tokens

290

* Extends AbstractDelegationTokenIdentifier to integrate with Hadoop security framework

291

* Defined as a nested class inside the KafkaTokenUtil object

292

*/

293

class KafkaTokenUtil.KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {

294

295

/**

296

* Returns the token kind for this identifier

297

* @return TOKEN_KIND constant ("KAFKA_DELEGATION_TOKEN")

298

*/

299

def getKind: Text

300

}

301

```

302

303

## Security Protocols

304

305

The token utilities support multiple security protocols:

306

307

### SASL_SSL

308

- SASL authentication over SSL-encrypted connections

309

- Supports SCRAM-SHA-256 and SCRAM-SHA-512 mechanisms

310

- Requires SSL truststore configuration

311

312

### SASL_PLAINTEXT

313

- SASL authentication over plaintext connections

314

- Not recommended for production use

315

- Supports same SASL mechanisms as SASL_SSL

316

317

### SSL

318

- SSL with client certificate authentication

319

- Requires both truststore and keystore configuration

320

- Uses client certificates for authentication

321

322

**Authentication Methods:**

323

324

```scala

325

// Check authentication method availability

326

if (KafkaTokenUtil.isGlobalJaasConfigurationProvided) {

327

println("Using global JAAS configuration")

328

} else {

329

// Generate JAAS config programmatically

330

val tokenJaas = KafkaTokenUtil.getTokenJaasParams(clusterConf)

331

println(s"Generated token JAAS: $tokenJaas")

332

333

// Or for keytab authentication

334

val keytabJaas = KafkaTokenUtil.getKeytabJaasParams(

335

"/path/to/user.keytab",

336

"user@REALM",

337

"kafka"

338

)

339

println(s"Generated keytab JAAS: $keytabJaas")

340

}

341

```