Low-level token operations, authentication helpers, and Kafka admin client configuration for managing the lifecycle of Kafka delegation tokens.

KafkaTokenUtil is the object that provides these low-level token operations and utility functions.
object KafkaTokenUtil extends Logging {
/** Token kind identifier for Kafka delegation tokens */
val TOKEN_KIND: Text = new Text("KAFKA_DELEGATION_TOKEN")
/**
* Creates a token service identifier for the given cluster identifier
* @param identifier Cluster identifier
* @return Text object representing the token service
*/
def getTokenService(identifier: String): Text
/**
* Obtains a delegation token from a Kafka cluster
* @param sparkConf Spark configuration
* @param clusterConf Cluster configuration containing authentication details
* @return Tuple of (delegation token, next renewal time in milliseconds)
*/
def obtainToken(
sparkConf: SparkConf,
clusterConf: KafkaTokenClusterConf
): (Token[KafkaDelegationTokenIdentifier], Long)
/**
* Validates that the current user is not a proxy user
* Throws exception if proxy user is detected
*/
def checkProxyUser(): Unit
/**
* Creates Kafka admin client properties for the given cluster configuration
* @param sparkConf Spark configuration
* @param clusterConf Cluster configuration
* @return Properties object configured for Kafka admin client
*/
def createAdminClientProperties(
sparkConf: SparkConf,
clusterConf: KafkaTokenClusterConf
): java.util.Properties
/**
* Checks if global JAAS configuration is provided
* @return true if java.security.auth.login.config system property is set
*/
def isGlobalJaasConfigurationProvided: Boolean
/**
* Finds matching cluster configuration for given bootstrap servers
* @param sparkConf Spark configuration
* @param bootStrapServers Bootstrap servers to match against
* @return Optional cluster configuration that matches the servers
*/
def findMatchingTokenClusterConfig(
sparkConf: SparkConf,
bootStrapServers: String
): Option[KafkaTokenClusterConf]
/**
* Generates JAAS parameters for token authentication
* @param clusterConf Cluster configuration containing token details
* @return JAAS parameter string for token-based authentication
*/
def getTokenJaasParams(clusterConf: KafkaTokenClusterConf): String
/**
* Checks if token configuration needs updating in the given parameters
* @param params Current Kafka parameters
* @param clusterConfig Optional cluster configuration
* @return true if token configuration update is needed
*/
def needTokenUpdate(
params: java.util.Map[String, Object],
clusterConfig: Option[KafkaTokenClusterConf]
): Boolean
/**
* Generates JAAS parameters for keytab authentication
* @param keyTab Path to keytab file
* @param principal Kerberos principal
* @param kerberosServiceName Kerberos service name
* @return JAAS parameter string for keytab-based authentication
*/
def getKeytabJaasParams(
keyTab: String,
principal: String,
kerberosServiceName: String
): String
}

Usage Examples:
import org.apache.spark.SparkConf
import org.apache.spark.kafka010.{KafkaTokenUtil, KafkaTokenClusterConf}
// Setup cluster configuration
val clusterConf = KafkaTokenClusterConf(
identifier = "prod-cluster",
authBootstrapServers = "kafka1:9092,kafka2:9092",
targetServersRegex = "kafka.*:9092",
securityProtocol = "SASL_SSL",
kerberosServiceName = "kafka",
trustStoreLocation = Some("/etc/kafka/truststore.jks"),
trustStorePassword = Some("password"),
tokenMechanism = "SCRAM-SHA-512",
// ... other configuration
)
val sparkConf = new SparkConf()
// Check proxy user constraints
try {
KafkaTokenUtil.checkProxyUser()
println("User validation passed")
} catch {
case e: Exception => println(s"Proxy user validation failed: ${e.getMessage}")
}
// Create admin client properties
val adminProps = KafkaTokenUtil.createAdminClientProperties(sparkConf, clusterConf)
println(s"Admin client configured with ${adminProps.size()} properties")
// Obtain delegation token
try {
val (token, renewalTime) = KafkaTokenUtil.obtainToken(sparkConf, clusterConf)
println(s"Token obtained: ${token.getService}")
println(s"Next renewal: $renewalTime")
} catch {
case e: Exception => println(s"Token acquisition failed: ${e.getMessage}")
}

Creates and manages token service identifiers for Kafka clusters.
/**
* Creates a token service identifier for the given cluster identifier
* Service identifiers are used to uniquely identify tokens in the credential store
*
* @param identifier Cluster identifier from configuration
* @return Text object representing the token service in format: kafka.server.delegation.token.<identifier>
*/
def getTokenService(identifier: String): Text

Obtains delegation tokens from Kafka clusters using appropriate authentication methods.
/**
* Obtains a delegation token from a Kafka cluster
* Process:
* 1. Creates Kafka admin client with authentication credentials
* 2. Requests delegation token with specified parameters
* 3. Converts Kafka token to Hadoop token format
* 4. Calculates next renewal time based on token lifetime
*
* @param sparkConf Spark configuration for general settings
* @param clusterConf Cluster configuration containing authentication details
* @return Tuple of (Hadoop delegation token, next renewal time in milliseconds since epoch)
* @throws Exception if token acquisition fails due to authentication, network, or permission issues
*/
def obtainToken(
sparkConf: SparkConf,
clusterConf: KafkaTokenClusterConf
): (Token[KafkaDelegationTokenIdentifier], Long)

Utility functions for managing different authentication methods and configurations.
/**
* Validates that the current user is not a proxy user.
* Proxy users are not supported for delegation token operations.
* Returns normally when the current user is not a proxy user.
* @throws IllegalArgumentException if current user is a proxy user
*/
def checkProxyUser(): Unit
/**
* Checks if a global JAAS configuration is provided
* (conventionally supplied through the java.security.auth.login.config system property)
* NOTE(review): whether the referenced configuration is also loaded/validated is not visible
* from this signature - confirm against the implementation before relying on it
* @return true if a global JAAS configuration is provided, false otherwise
*/
def isGlobalJaasConfigurationProvided: Boolean
/**
* Generates JAAS parameters for token authentication
* Creates ScramLoginModule configuration with delegation token parameters
* NOTE(review): the returned string presumably embeds the delegation token credentials,
* so it should be treated as sensitive (avoid logging it verbatim) - confirm
*
* @param clusterConf Cluster configuration containing token mechanism and other settings
* @return JAAS parameter string formatted for ScramLoginModule with token authentication
*/
def getTokenJaasParams(clusterConf: KafkaTokenClusterConf): String
/**
* Generates JAAS parameters for keytab authentication
* Creates Krb5LoginModule configuration for keytab-based Kerberos authentication
*
* @param keyTab Path to Kerberos keytab file
* @param principal Kerberos principal name
* @param kerberosServiceName Service name for Kerberos authentication
* @return JAAS parameter string formatted for Krb5LoginModule with keytab authentication
*/
def getKeytabJaasParams(
keyTab: String,
principal: String,
kerberosServiceName: String
): String

Creates and configures Kafka admin client properties for token operations.
/**
* Creates Kafka admin client properties for the given cluster configuration
* Configures all necessary properties including:
* - Bootstrap servers
* - Security protocol and SASL mechanism
* - SSL keystore and truststore settings
* - Authentication parameters (JAAS config)
* - Client identification
*
* @param sparkConf Spark configuration for general settings
* @param clusterConf Cluster-specific configuration
* @return Properties object ready for AdminClient.create()
*/
def createAdminClientProperties(
sparkConf: SparkConf,
clusterConf: KafkaTokenClusterConf
): java.util.Properties

Utilities for matching configurations and determining update requirements.
/**
* Finds matching cluster configuration for given bootstrap servers
* Matches bootstrap servers against configured target server regex patterns
* NOTE(review): the outcome when more than one configured cluster matches is not
* stated here - confirm against the implementation
*
* @param sparkConf Spark configuration containing cluster definitions
* @param bootStrapServers Comma-separated list of bootstrap servers to match
* @return Some(cluster configuration) whose target regex matches the bootstrap servers,
*         None when no configured cluster matches
*/
def findMatchingTokenClusterConfig(
sparkConf: SparkConf,
bootStrapServers: String
): Option[KafkaTokenClusterConf]
/**
* Checks if token configuration needs updating in the given parameters
* Examines current Kafka parameters to determine if token-based authentication
* configuration needs to be applied or updated
*
* @param params Current Kafka client parameters
* @param clusterConfig Optional cluster configuration for comparison
* @return true if token authentication configuration should be applied/updated
*/
def needTokenUpdate(
params: java.util.Map[String, Object],
clusterConfig: Option[KafkaTokenClusterConf]
): Boolean

Token identifier implementation for Kafka delegation tokens.
/**
* Hadoop token identifier implementation for Kafka delegation tokens
* Extends AbstractDelegationTokenIdentifier to integrate with Hadoop security framework
* Defined as inner class within KafkaTokenUtil object
*/
class KafkaTokenUtil.KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
/**
* Returns the token kind for this identifier
* @return TOKEN_KIND constant ("KAFKA_DELEGATION_TOKEN")
*/
def getKind: Text
}

The token utilities support multiple security protocols:
Authentication Methods:
// Decide how the client authenticates: use the global JAAS configuration when one is
// provided, otherwise build the JAAS parameter strings programmatically.
if (!KafkaTokenUtil.isGlobalJaasConfigurationProvided) {
  // Token-based authentication parameters for the cluster configuration defined earlier.
  // NOTE(review): the generated string presumably contains token credentials; printing it
  // is for demonstration only - confirm before copying this into real code.
  val tokenJaasParams = KafkaTokenUtil.getTokenJaasParams(clusterConf)
  println(s"Generated token JAAS: ${tokenJaasParams}")

  // Alternative: keytab-based Kerberos authentication parameters
  val keytabJaasParams = KafkaTokenUtil.getKeytabJaasParams(
    keyTab = "/path/to/user.keytab",
    principal = "user@REALM",
    kerberosServiceName = "kafka"
  )
  println(s"Generated keytab JAAS: ${keytabJaasParams}")
} else {
  println("Using global JAAS configuration")
}