Core delegation token provider that integrates with Apache Spark's security framework to obtain and manage Kafka delegation tokens for secure cluster authentication.
The main provider class implements Hadoop's delegation token provider interface (HadoopDelegationTokenProvider) on behalf of Kafka clusters.
/**
 * Kafka delegation token provider for Apache Spark.
 *
 * Implements [[HadoopDelegationTokenProvider]] to integrate with Spark's
 * security framework. Only the public interface is shown here; method
 * bodies are omitted in this summary.
 */
class KafkaDelegationTokenProvider extends HadoopDelegationTokenProvider with Logging {

  /**
   * Service name identifier for this token provider.
   * @return "kafka"
   */
  def serviceName: String

  /**
   * Obtains delegation tokens for all configured Kafka clusters.
   *
   * @param hadoopConf Hadoop configuration
   * @param sparkConf  Spark configuration containing cluster settings
   * @param creds      Credentials object to store obtained tokens
   * @return optional next renewal time (earliest across all clusters)
   */
  def obtainDelegationTokens(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long]

  /**
   * Checks if delegation tokens are required for any configured Kafka cluster.
   *
   * @param sparkConf  Spark configuration
   * @param hadoopConf Hadoop configuration
   * @return true if tokens are required, false otherwise
   */
  def delegationTokensRequired(
      sparkConf: SparkConf,
      hadoopConf: Configuration): Boolean
}
Usage Examples:
import org.apache.spark.SparkConf
import org.apache.spark.kafka010.KafkaDelegationTokenProvider
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

// Set up the Spark configuration with the "prod" Kafka cluster details.
val sparkConf = new SparkConf()
  .set("spark.kafka.clusters.prod.auth.bootstrap.servers", "kafka1:9092,kafka2:9092")
  .set("spark.kafka.clusters.prod.security.protocol", "SASL_SSL")
  .set("spark.kafka.clusters.prod.sasl.kerberos.service.name", "kafka")
  .set("spark.kafka.clusters.prod.ssl.truststore.location", "/path/to/truststore.jks")
  .set("spark.kafka.clusters.prod.ssl.truststore.password", "password")

// Create the provider and check whether tokens are needed.
val tokenProvider = new KafkaDelegationTokenProvider()
val hadoopConf = new Configuration()

if (tokenProvider.delegationTokensRequired(sparkConf, hadoopConf)) {
  println("Delegation tokens are required for configured Kafka clusters")

  // Obtain tokens; the result is the earliest renewal time across clusters.
  val credentials = new Credentials()
  val nextRenewal = tokenProvider.obtainDelegationTokens(hadoopConf, sparkConf, credentials)
  nextRenewal match {
    case Some(renewalTime) =>
      println(s"Tokens obtained successfully. Next renewal: $renewalTime")
    case None =>
      println("No tokens obtained or renewal not required")
  }
} else {
  println("No delegation tokens required")
}
Multi-Cluster Configuration:
// Configure multiple Kafka clusters in a single SparkConf.
val sparkConf = new SparkConf()
  // Production cluster
  .set("spark.kafka.clusters.prod.auth.bootstrap.servers", "prod-kafka1:9092")
  .set("spark.kafka.clusters.prod.security.protocol", "SASL_SSL")
  .set("spark.kafka.clusters.prod.target.bootstrap.servers.regex", "prod-kafka.*:9092")
  // Development cluster
  .set("spark.kafka.clusters.dev.auth.bootstrap.servers", "dev-kafka1:9092")
  .set("spark.kafka.clusters.dev.security.protocol", "SASL_PLAINTEXT")
  .set("spark.kafka.clusters.dev.target.bootstrap.servers.regex", "dev-kafka.*:9092")

val tokenProvider = new KafkaDelegationTokenProvider()
val credentials = new Credentials()

// One call obtains tokens for every configured cluster (both prod and dev).
tokenProvider.obtainDelegationTokens(new Configuration(), sparkConf, credentials)
Returns the service name identifier for this token provider.
/**
 * Service name identifier for this token provider.
 * @return "kafka" - the service name used by the Hadoop security framework
 */
def serviceName: String
Determines if delegation tokens are required based on the current configuration and security settings.
/**
 * Checks if delegation tokens are required for any configured Kafka cluster.
 * Returns true if any cluster requires delegation tokens based on:
 *  - security protocol (SASL_SSL, SASL_PLAINTEXT, SSL)
 *  - authentication method availability
 *  - current user credentials
 *
 * @param sparkConf  Spark configuration containing cluster settings
 * @param hadoopConf Hadoop configuration
 * @return true if delegation tokens are required, false otherwise
 */
def delegationTokensRequired(sparkConf: SparkConf, hadoopConf: Configuration): Boolean
Obtains delegation tokens for all configured Kafka clusters and stores them in the provided credentials.
/**
 * Obtains delegation tokens for all configured Kafka clusters.
 * For each cluster configuration it:
 *  - validates security requirements
 *  - connects to the Kafka cluster using an admin client
 *  - requests a delegation token with the appropriate credentials
 *  - stores the token in the provided Credentials object
 *  - tracks renewal times across all clusters
 *
 * @param hadoopConf Hadoop configuration
 * @param sparkConf  Spark configuration with cluster settings under spark.kafka.clusters.*
 * @param creds      Credentials object to store obtained tokens
 * @return optional next renewal time (earliest across all clusters); None if no tokens obtained
 */
def obtainDelegationTokens(
    hadoopConf: Configuration,
    sparkConf: SparkConf,
    creds: Credentials): Option[Long]
The token provider handles various error conditions:
All errors are logged, and non-fatal errors in one cluster are handled gracefully so that token acquisition for the remaining clusters can still succeed.