Hadoop delegation token support for Kafka authentication in Spark streaming applications
```
npx @tessl/cli install tessl/maven-org-apache-spark--spark-token-provider-kafka-0-10-2-12@3.5.0
```

Spark Token Provider Kafka 0.10 provides Hadoop delegation token support for Kafka authentication in Spark streaming applications. The library enables secure authentication against Kafka 0.10+ clusters through delegation tokens, covering automatic token acquisition, management, and renewal for enterprise streaming environments that require Kerberos-based security.
```scala
import org.apache.spark.kafka010._
```

For specific components:

```scala
import org.apache.spark.kafka010.{KafkaDelegationTokenProvider, KafkaTokenUtil, KafkaConfigUpdater}
```

```scala
import org.apache.spark.SparkConf
import org.apache.spark.kafka010.{KafkaTokenSparkConf, KafkaConfigUpdater}
import org.apache.kafka.clients.CommonClientConfigs
// Configure Spark for Kafka token authentication
val sparkConf = new SparkConf()
.set("spark.kafka.clusters.cluster1.auth.bootstrap.servers", "kafka1:9092,kafka2:9092")
.set("spark.kafka.clusters.cluster1.security.protocol", "SASL_SSL")
.set("spark.kafka.clusters.cluster1.sasl.kerberos.service.name", "kafka")
// Update Kafka consumer/producer configuration with token authentication
val kafkaParams = Map[String, Object](
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> "kafka1:9092,kafka2:9092"
)
val updatedParams = KafkaConfigUpdater("example-module", kafkaParams)
.setAuthenticationConfigIfNeeded()
.build()
```

The library provides several key components for token-based Kafka authentication:
- `KafkaDelegationTokenProvider` integrates with Spark's security framework to obtain delegation tokens
- `KafkaTokenSparkConf` handles cluster-specific configuration parsing from `SparkConf`
- `KafkaTokenUtil` provides core token operations, including token acquisition and JAAS configuration
- `KafkaConfigUpdater` applies authentication settings to Kafka client configurations
- `KafkaRedactionUtil` ensures sensitive data is properly redacted from logs

`KafkaDelegationTokenProvider` is the main entry point for Spark's delegation token framework, automatically obtaining tokens for configured Kafka clusters.

```scala
class KafkaDelegationTokenProvider extends HadoopDelegationTokenProvider {
def serviceName: String
def obtainDelegationTokens(
hadoopConf: Configuration,
sparkConf: SparkConf,
creds: Credentials
): Option[Long]
def delegationTokensRequired(
sparkConf: SparkConf,
hadoopConf: Configuration
): Boolean
}
```
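Spark's delegation token framework normally discovers and drives this provider itself, but its methods can also be exercised directly. A minimal sketch, assuming an empty Hadoop `Configuration` and `SparkConf` (in a real application Spark supplies both):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.spark.SparkConf
import org.apache.spark.kafka010.KafkaDelegationTokenProvider

val provider = new KafkaDelegationTokenProvider()
val sparkConf = new SparkConf()
val hadoopConf = new Configuration()

// Only obtain tokens when at least one configured cluster requires them
if (provider.delegationTokensRequired(sparkConf, hadoopConf)) {
  val creds = new Credentials()
  // Tokens are added to creds; the return value is the next renewal time, if any
  val nextRenewal = provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
}
```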
`KafkaTokenSparkConf` handles parsing and management of Kafka cluster configurations from Spark configuration properties.

```scala
case class KafkaTokenClusterConf(
identifier: String,
authBootstrapServers: String,
targetServersRegex: String,
securityProtocol: String,
kerberosServiceName: String,
trustStoreType: Option[String],
trustStoreLocation: Option[String],
trustStorePassword: Option[String],
keyStoreType: Option[String],
keyStoreLocation: Option[String],
keyStorePassword: Option[String],
keyPassword: Option[String],
tokenMechanism: String,
specifiedKafkaParams: Map[String, String]
)
object KafkaTokenSparkConf {
def getClusterConfig(sparkConf: SparkConf, identifier: String): KafkaTokenClusterConf
def getAllClusterConfigs(sparkConf: SparkConf): Set[KafkaTokenClusterConf]
val CLUSTERS_CONFIG_PREFIX: String = "spark.kafka.clusters."
val DEFAULT_TARGET_SERVERS_REGEX: String = ".*"
val DEFAULT_SASL_KERBEROS_SERVICE_NAME: String = "kafka"
val DEFAULT_SECURITY_PROTOCOL_CONFIG: String = "SASL_SSL"
val DEFAULT_SASL_TOKEN_MECHANISM: String = "SCRAM-SHA-512"
}
```
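A minimal sketch of reading these configurations back, reusing the `sparkConf` from the quick-start example above (so the identifier "cluster1" is assumed to be registered):

```scala
import org.apache.spark.kafka010.KafkaTokenSparkConf

// Parse the settings registered under spark.kafka.clusters.cluster1.*
val clusterConf = KafkaTokenSparkConf.getClusterConfig(sparkConf, "cluster1")
println(clusterConf.authBootstrapServers)  // kafka1:9092,kafka2:9092
println(clusterConf.securityProtocol)      // SASL_SSL

// Or enumerate every configured cluster
val allClusters = KafkaTokenSparkConf.getAllClusterConfigs(sparkConf)
```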
`KafkaTokenUtil` supplies core utilities for obtaining delegation tokens and managing authentication configurations.

```scala
object KafkaTokenUtil {
val TOKEN_KIND: Text
def obtainToken(
sparkConf: SparkConf,
clusterConf: KafkaTokenClusterConf
): (Token[KafkaDelegationTokenIdentifier], Long)
def checkProxyUser(): Unit
def createAdminClientProperties(
sparkConf: SparkConf,
clusterConf: KafkaTokenClusterConf
): java.util.Properties
def isGlobalJaasConfigurationProvided: Boolean
def getKeytabJaasParams(
keyTab: String,
principal: String,
kerberosServiceName: String
): String
def findMatchingTokenClusterConfig(
sparkConf: SparkConf,
bootStrapServers: String
): Option[KafkaTokenClusterConf]
def getTokenJaasParams(clusterConf: KafkaTokenClusterConf): String
def needTokenUpdate(
params: java.util.Map[String, Object],
clusterConfig: Option[KafkaTokenClusterConf]
): Boolean
def getTokenService(identifier: String): Text
}
class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
def getKind: Text
}
```
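A minimal sketch of the token-lookup helpers: resolve the cluster configuration whose target-servers regex matches a given bootstrap list, then derive the JAAS parameters a client would use (again reusing `sparkConf` from above):

```scala
import org.apache.spark.kafka010.KafkaTokenUtil

KafkaTokenUtil.findMatchingTokenClusterConfig(sparkConf, "kafka1:9092,kafka2:9092")
  .foreach { clusterConf =>
    // Suitable as the value of the client's sasl.jaas.config property
    val jaasParams = KafkaTokenUtil.getTokenJaasParams(clusterConf)
  }
```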
`KafkaConfigUpdater` offers a fluent interface for updating Kafka client configurations with authentication settings.

```scala
case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, Object]) {
def set(key: String, value: Object): KafkaConfigUpdater.this.type
def setIfUnset(key: String, value: Object): KafkaConfigUpdater.this.type
def setAuthenticationConfigIfNeeded(): KafkaConfigUpdater.this.type
def setAuthenticationConfigIfNeeded(
clusterConfig: Option[KafkaTokenClusterConf]
): KafkaConfigUpdater.this.type
def build(): java.util.Map[String, Object]
}
```
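A minimal sketch of the fluent interface, with hypothetical parameter values: `set` always overwrites, `setIfUnset` only fills gaps, and `setAuthenticationConfigIfNeeded` applies token authentication when a matching cluster configuration exists:

```scala
import org.apache.spark.kafka010.KafkaConfigUpdater

val params = KafkaConfigUpdater("example-module", Map.empty[String, Object])
  .set("bootstrap.servers", "kafka1:9092")
  .setIfUnset("client.id", "spark-client")  // applied only because client.id has no value yet
  .setAuthenticationConfigIfNeeded()
  .build()                                  // yields a java.util.Map[String, Object]
```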
`KafkaRedactionUtil` provides utilities for secure handling of sensitive configuration parameters in logs.

```scala
object KafkaRedactionUtil {
def redactParams(params: Seq[(String, Object)]): Seq[(String, String)]
def redactJaasParam(param: String): String
}
```
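A minimal sketch of redacting parameters before they are logged; which keys count as sensitive is governed by Spark's redaction settings (`spark.redaction.regex`), so the exact output depends on the running configuration:

```scala
import org.apache.spark.kafka010.KafkaRedactionUtil

val safeParams = KafkaRedactionUtil.redactParams(
  Seq("ssl.truststore.password" -> "truststore-password")
)
safeParams.foreach { case (key, value) => println(s"$key=$value") }
```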
The library uses Spark configuration properties with the prefix `spark.kafka.clusters.<identifier>.` for cluster-specific settings:

```scala
// Bootstrap servers for token acquisition (required)
"spark.kafka.clusters.<identifier>.auth.bootstrap.servers" -> "kafka1:9092,kafka2:9092"// Target servers regex pattern (default: ".*")
"spark.kafka.clusters.<identifier>.target.bootstrap.servers.regex" -> "kafka.*:9092"
// Security protocol (default: "SASL_SSL")
"spark.kafka.clusters.<identifier>.security.protocol" -> "SASL_SSL"
// Kerberos service name (default: "kafka")
"spark.kafka.clusters.<identifier>.sasl.kerberos.service.name" -> "kafka"
// Token mechanism (default: "SCRAM-SHA-512")
"spark.kafka.clusters.<identifier>.sasl.token.mechanism" -> "SCRAM-SHA-512"
// SSL truststore configuration
"spark.kafka.clusters.<identifier>.ssl.truststore.type" -> "JKS"
"spark.kafka.clusters.<identifier>.ssl.truststore.location" -> "/path/to/truststore.jks"
"spark.kafka.clusters.<identifier>.ssl.truststore.password" -> "truststore-password"
// SSL keystore configuration (for SSL protocol)
"spark.kafka.clusters.<identifier>.ssl.keystore.type" -> "JKS"
"spark.kafka.clusters.<identifier>.ssl.keystore.location" -> "/path/to/keystore.jks"
"spark.kafka.clusters.<identifier>.ssl.keystore.password" -> "keystore-password"
"spark.kafka.clusters.<identifier>.ssl.key.password" -> "key-password"
// Additional Kafka client properties
"spark.kafka.clusters.<identifier>.kafka.<kafka-property>" -> "value"The library supports multiple authentication methods applied in the following order of preference:
1. An obtained delegation token whose cluster configuration matches the client's bootstrap servers
2. Keytab-based Kerberos login (keytab and principal)
3. A global JAAS configuration (`java.security.auth.login.config`)
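A minimal sketch of the related `KafkaTokenUtil` helpers, with hypothetical keytab path, principal, and service name:

```scala
import org.apache.spark.kafka010.KafkaTokenUtil

// True when the JVM was started with a global JAAS file
// (-Djava.security.auth.login.config=...)
if (!KafkaTokenUtil.isGlobalJaasConfigurationProvided) {
  val jaasParams = KafkaTokenUtil.getKeytabJaasParams(
    "/path/to/user.keytab", "user@EXAMPLE.COM", "kafka")
}
```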
The library handles several error conditions:

- `IllegalArgumentException` when attempting to use proxy users (not yet supported)
- `IllegalArgumentException` when multiple delegation tokens match the same bootstrap servers

Multiple Kafka clusters can be configured side by side in a single application:

```scala
val sparkConf = new SparkConf()
// Cluster 1 - Production
.set("spark.kafka.clusters.prod.auth.bootstrap.servers", "prod-kafka1:9092,prod-kafka2:9092")
.set("spark.kafka.clusters.prod.target.bootstrap.servers.regex", "prod-kafka.*:9092")
.set("spark.kafka.clusters.prod.security.protocol", "SASL_SSL")
.set("spark.kafka.clusters.prod.ssl.truststore.location", "/etc/kafka/truststore.jks")
.set("spark.kafka.clusters.prod.ssl.truststore.password", "prod-truststore-pass")
// Cluster 2 - Staging
.set("spark.kafka.clusters.staging.auth.bootstrap.servers", "staging-kafka:9092")
.set("spark.kafka.clusters.staging.target.bootstrap.servers.regex", "staging-kafka:9092")
.set("spark.kafka.clusters.staging.security.protocol", "SASL_PLAINTEXT")import org.apache.kafka.clients.consumer.ConsumerConfig
val baseConsumerConfig = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "kafka1:9092,kafka2:9092",
ConsumerConfig.GROUP_ID_CONFIG -> "my-consumer-group",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"
)
val authenticatedConfig = KafkaConfigUpdater("consumer", baseConsumerConfig)
.setAuthenticationConfigIfNeeded()
.build()
```

The same pattern applies to producer configurations:

```scala
import org.apache.kafka.clients.producer.ProducerConfig
val baseProducerConfig = Map[String, Object](
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "kafka1:9092,kafka2:9092",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer"
)
val authenticatedConfig = KafkaConfigUpdater("producer", baseProducerConfig)
.setAuthenticationConfigIfNeeded()
.build()
```
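A minimal sketch of putting the authenticated configuration to use; the topic name "events" is hypothetical:

```scala
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// build() returns a java.util.Map, which the producer constructor accepts directly
val producer = new KafkaProducer[String, String](authenticatedConfig)
try {
  producer.send(new ProducerRecord[String, String]("events", "key", "value"))
} finally {
  producer.close()
}
```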