or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

configuration.mdindex.mdtoken-provider.mdtoken-utilities.mdutilities.md
tile.json

token-utilities.mddocs/

Token Utilities

Low-level token operations, authentication helpers, and Kafka admin client management for delegation token lifecycle management.

Capabilities

KafkaTokenUtil

Object providing low-level token operations and utility functions for Kafka delegation token management.

object KafkaTokenUtil extends Logging {
  
  /** Token kind identifier for Kafka delegation tokens */
  val TOKEN_KIND: Text = new Text("KAFKA_DELEGATION_TOKEN")
  
  /**
   * Creates a token service identifier for the given cluster identifier
   * @param identifier Cluster identifier
   * @return Text object representing the token service
   */
  def getTokenService(identifier: String): Text
  
  /**
   * Obtains a delegation token from a Kafka cluster
   * @param sparkConf Spark configuration
   * @param clusterConf Cluster configuration containing authentication details
   * @return Tuple of (delegation token, next renewal time in milliseconds)
   */
  def obtainToken(
    sparkConf: SparkConf,
    clusterConf: KafkaTokenClusterConf
  ): (Token[KafkaDelegationTokenIdentifier], Long)
  
  /**
   * Validates that the current user is not a proxy user
   * Throws exception if proxy user is detected
   */
  def checkProxyUser(): Unit
  
  /**
   * Creates Kafka admin client properties for the given cluster configuration
   * @param sparkConf Spark configuration
   * @param clusterConf Cluster configuration
   * @return Properties object configured for Kafka admin client
   */
  def createAdminClientProperties(
    sparkConf: SparkConf,
    clusterConf: KafkaTokenClusterConf
  ): java.util.Properties
  
  /**
   * Checks if global JAAS configuration is provided
   * @return true if java.security.auth.login.config system property is set
   */
  def isGlobalJaasConfigurationProvided: Boolean
  
  /**
   * Finds matching cluster configuration for given bootstrap servers
   * @param sparkConf Spark configuration
   * @param bootStrapServers Bootstrap servers to match against
   * @return Optional cluster configuration that matches the servers
   */
  def findMatchingTokenClusterConfig(
    sparkConf: SparkConf,
    bootStrapServers: String
  ): Option[KafkaTokenClusterConf]
  
  /**
   * Generates JAAS parameters for token authentication
   * @param clusterConf Cluster configuration containing token details
   * @return JAAS parameter string for token-based authentication
   */
  def getTokenJaasParams(clusterConf: KafkaTokenClusterConf): String
  
  /**
   * Checks if token configuration needs updating in the given parameters
   * @param params Current Kafka parameters
   * @param clusterConfig Optional cluster configuration
   * @return true if token configuration update is needed
   */
  def needTokenUpdate(
    params: java.util.Map[String, Object],
    clusterConfig: Option[KafkaTokenClusterConf]
  ): Boolean
  
  /**
   * Generates JAAS parameters for keytab authentication
   * @param keyTab Path to keytab file
   * @param principal Kerberos principal
   * @param kerberosServiceName Kerberos service name
   * @return JAAS parameter string for keytab-based authentication
   */
  def getKeytabJaasParams(
    keyTab: String,
    principal: String,
    kerberosServiceName: String
  ): String
}

Usage Examples:

import org.apache.spark.SparkConf
import org.apache.spark.kafka010.{KafkaTokenUtil, KafkaTokenClusterConf}

// Setup cluster configuration
val clusterConf = KafkaTokenClusterConf(
  identifier = "prod-cluster",
  authBootstrapServers = "kafka1:9092,kafka2:9092",
  targetServersRegex = "kafka.*:9092",
  securityProtocol = "SASL_SSL",
  kerberosServiceName = "kafka",
  trustStoreLocation = Some("/etc/kafka/truststore.jks"),
  trustStorePassword = Some("password"),
  tokenMechanism = "SCRAM-SHA-512",
  // ... other configuration
)

val sparkConf = new SparkConf()

// Check proxy user constraints
try {
  KafkaTokenUtil.checkProxyUser()
  println("User validation passed")
} catch {
  case e: Exception => println(s"Proxy user validation failed: ${e.getMessage}")
}

// Create admin client properties
val adminProps = KafkaTokenUtil.createAdminClientProperties(sparkConf, clusterConf)
println(s"Admin client configured with ${adminProps.size()} properties")

// Obtain delegation token
try {
  val (token, renewalTime) = KafkaTokenUtil.obtainToken(sparkConf, clusterConf)
  println(s"Token obtained: ${token.getService}")
  println(s"Next renewal: $renewalTime")
} catch {
  case e: Exception => println(s"Token acquisition failed: ${e.getMessage}")
}

Token Service Management

Creates and manages token service identifiers for Kafka clusters.

/**
 * Creates a token service identifier for the given cluster identifier
 * Service identifiers are used to uniquely identify tokens in the credential store
 * 
 * @param identifier Cluster identifier from configuration
 * @return Text object representing the token service in format: kafka.server.delegation.token.<identifier>
 */
def getTokenService(identifier: String): Text

Token Acquisition

Obtains delegation tokens from Kafka clusters using appropriate authentication methods.

/**
 * Obtains a delegation token from a Kafka cluster
 * Process:
 * 1. Creates Kafka admin client with authentication credentials
 * 2. Requests delegation token with specified parameters
 * 3. Converts Kafka token to Hadoop token format
 * 4. Calculates next renewal time based on token lifetime
 * 
 * @param sparkConf Spark configuration for general settings
 * @param clusterConf Cluster configuration containing authentication details
 * @return Tuple of (Hadoop delegation token, next renewal time in milliseconds since epoch)
 * @throws Exception if token acquisition fails due to authentication, network, or permission issues
 */
def obtainToken(
  sparkConf: SparkConf,
  clusterConf: KafkaTokenClusterConf
): (Token[KafkaDelegationTokenIdentifier], Long)

Authentication Helpers

Utility functions for managing different authentication methods and configurations.

/**
 * Validates that the current user is not a proxy user
 * Proxy users are not supported for delegation token operations
 * @throws IllegalArgumentException if current user is a proxy user
 */
def checkProxyUser(): Unit

/**
 * Checks if global JAAS configuration is provided via system property
 * @return true if java.security.auth.login.config system property is set and points to valid file
 */
def isGlobalJaasConfigurationProvided: Boolean

/**
 * Generates JAAS parameters for token authentication
 * Creates ScramLoginModule configuration with delegation token parameters
 * 
 * @param clusterConf Cluster configuration containing token mechanism and other settings
 * @return JAAS parameter string formatted for ScramLoginModule with token authentication
 */
def getTokenJaasParams(clusterConf: KafkaTokenClusterConf): String

/**
 * Generates JAAS parameters for keytab authentication
 * Creates Krb5LoginModule configuration for keytab-based Kerberos authentication
 * 
 * @param keyTab Path to Kerberos keytab file
 * @param principal Kerberos principal name
 * @param kerberosServiceName Service name for Kerberos authentication
 * @return JAAS parameter string formatted for Krb5LoginModule with keytab authentication
 */
def getKeytabJaasParams(
  keyTab: String,
  principal: String,
  kerberosServiceName: String
): String

Admin Client Management

Creates and configures Kafka admin clients for token operations.

/**
 * Creates Kafka admin client properties for the given cluster configuration
 * Configures all necessary properties including:
 * - Bootstrap servers
 * - Security protocol and SASL mechanism
 * - SSL keystore and truststore settings
 * - Authentication parameters (JAAS config)
 * - Client identification
 * 
 * @param sparkConf Spark configuration for general settings
 * @param clusterConf Cluster-specific configuration
 * @return Properties object ready for AdminClient.create()
 */
def createAdminClientProperties(
  sparkConf: SparkConf,
  clusterConf: KafkaTokenClusterConf
): java.util.Properties

Configuration Matching

Utilities for matching configurations and determining update requirements.

/**
 * Finds matching cluster configuration for given bootstrap servers
 * Matches bootstrap servers against configured target server regex patterns
 * 
 * @param sparkConf Spark configuration containing cluster definitions
 * @param bootStrapServers Comma-separated list of bootstrap servers to match
 * @return Optional cluster configuration whose target regex matches the bootstrap servers
 */
def findMatchingTokenClusterConfig(
  sparkConf: SparkConf,
  bootStrapServers: String
): Option[KafkaTokenClusterConf]

/**
 * Checks if token configuration needs updating in the given parameters
 * Examines current Kafka parameters to determine if token-based authentication
 * configuration needs to be applied or updated
 * 
 * @param params Current Kafka client parameters
 * @param clusterConfig Optional cluster configuration for comparison
 * @return true if token authentication configuration should be applied/updated
 */
def needTokenUpdate(
  params: java.util.Map[String, Object],
  clusterConfig: Option[KafkaTokenClusterConf]
): Boolean

KafkaDelegationTokenIdentifier

Token identifier implementation for Kafka delegation tokens.

/**
 * Hadoop token identifier implementation for Kafka delegation tokens
 * Extends AbstractDelegationTokenIdentifier to integrate with Hadoop security framework
 * Defined as inner class within KafkaTokenUtil object
 */
class KafkaTokenUtil.KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
  
  /**
   * Returns the token kind for this identifier
   * @return TOKEN_KIND constant ("KAFKA_DELEGATION_TOKEN")
   */
  def getKind: Text
}

Security Protocols

The token utilities support multiple security protocols:

SASL_SSL

  • SASL authentication over SSL-encrypted connections
  • Supports SCRAM-SHA-256 and SCRAM-SHA-512 mechanisms
  • Requires SSL truststore configuration

SASL_PLAINTEXT

  • SASL authentication over plaintext connections
  • Not recommended for production use
  • Supports same SASL mechanisms as SASL_SSL

SSL

  • SSL with client certificate authentication
  • Requires both truststore and keystore configuration
  • Uses client certificates for authentication

Authentication Methods:

// Check authentication method availability
if (KafkaTokenUtil.isGlobalJaasConfigurationProvided) {
  println("Using global JAAS configuration")
} else {
  // Generate JAAS config programmatically
  val tokenJaas = KafkaTokenUtil.getTokenJaasParams(clusterConf)
  println(s"Generated token JAAS: $tokenJaas")
  
  // Or for keytab authentication
  val keytabJaas = KafkaTokenUtil.getKeytabJaasParams(
    "/path/to/user.keytab",
    "user@REALM",
    "kafka"
  )
  println(s"Generated keytab JAAS: $keytabJaas")
}