To initialize, run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.md
tile.json

docs/index.md

Spark Token Provider Kafka 0.10

Spark Token Provider Kafka 0.10 provides Hadoop delegation token support for Kafka authentication in Spark streaming applications. This library enables secure authentication with Kafka 0.10+ clusters through delegation tokens, supporting automatic token obtainment, management, and renewal for enterprise streaming environments requiring Kerberos-based security.

Package Information

  • Package Name: spark-token-provider-kafka-0-10_2.12
  • Package Type: maven
  • Language: Scala
  • Group ID: org.apache.spark
  • Artifact ID: spark-token-provider-kafka-0-10_2.12
  • Installation: Include as dependency in pom.xml or build.sbt

Core Imports

import org.apache.spark.kafka010._

For specific components:

import org.apache.spark.kafka010.{KafkaDelegationTokenProvider, KafkaTokenUtil, KafkaConfigUpdater}

Basic Usage

import org.apache.spark.SparkConf
import org.apache.spark.kafka010.{KafkaTokenSparkConf, KafkaConfigUpdater}
import org.apache.kafka.clients.CommonClientConfigs

// Configure Spark for Kafka token authentication
val sparkConf = new SparkConf()
  .set("spark.kafka.clusters.cluster1.auth.bootstrap.servers", "kafka1:9092,kafka2:9092")
  .set("spark.kafka.clusters.cluster1.security.protocol", "SASL_SSL")
  .set("spark.kafka.clusters.cluster1.sasl.kerberos.service.name", "kafka")

// Update Kafka consumer/producer configuration with token authentication
val kafkaParams = Map[String, Object](
  CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> "kafka1:9092,kafka2:9092"
)

val updatedParams = KafkaConfigUpdater("example-module", kafkaParams)
  .setAuthenticationConfigIfNeeded()
  .build()

Architecture

The library provides several key components for token-based Kafka authentication:

  • Token Provider: KafkaDelegationTokenProvider integrates with Spark's security framework to obtain delegation tokens
  • Configuration Management: KafkaTokenSparkConf handles cluster-specific configuration parsing from SparkConf
  • Token Utilities: KafkaTokenUtil provides core token operations including obtainment and JAAS configuration
  • Configuration Updater: KafkaConfigUpdater applies authentication settings to Kafka client configurations
  • Security Utilities: KafkaRedactionUtil ensures sensitive data is properly redacted from logs

Capabilities

Token Provider Integration

Main entry point for Spark's delegation token framework, automatically obtaining tokens for configured Kafka clusters.

class KafkaDelegationTokenProvider extends HadoopDelegationTokenProvider {
  def serviceName: String
  def obtainDelegationTokens(
    hadoopConf: Configuration,
    sparkConf: SparkConf,
    creds: Credentials
  ): Option[Long]
  def delegationTokensRequired(
    sparkConf: SparkConf,
    hadoopConf: Configuration
  ): Boolean
}

Cluster Configuration Management

Handles parsing and management of Kafka cluster configurations from Spark configuration properties.

case class KafkaTokenClusterConf(
  identifier: String,
  authBootstrapServers: String,
  targetServersRegex: String,
  securityProtocol: String,
  kerberosServiceName: String,
  trustStoreType: Option[String],
  trustStoreLocation: Option[String],
  trustStorePassword: Option[String],
  keyStoreType: Option[String],
  keyStoreLocation: Option[String],
  keyStorePassword: Option[String],
  keyPassword: Option[String],
  tokenMechanism: String,
  specifiedKafkaParams: Map[String, String]
)

object KafkaTokenSparkConf {
  def getClusterConfig(sparkConf: SparkConf, identifier: String): KafkaTokenClusterConf
  def getAllClusterConfigs(sparkConf: SparkConf): Set[KafkaTokenClusterConf]
  
  val CLUSTERS_CONFIG_PREFIX: String = "spark.kafka.clusters."
  val DEFAULT_TARGET_SERVERS_REGEX: String = ".*"
  val DEFAULT_SASL_KERBEROS_SERVICE_NAME: String = "kafka"
  val DEFAULT_SECURITY_PROTOCOL_CONFIG: String = "SASL_SSL"
  val DEFAULT_SASL_TOKEN_MECHANISM: String = "SCRAM-SHA-512"
}

Token Operations

Core utilities for obtaining delegation tokens and managing authentication configurations.

object KafkaTokenUtil {
  val TOKEN_KIND: Text
  
  def obtainToken(
    sparkConf: SparkConf,
    clusterConf: KafkaTokenClusterConf
  ): (Token[KafkaDelegationTokenIdentifier], Long)
  
  def checkProxyUser(): Unit
  
  def createAdminClientProperties(
    sparkConf: SparkConf,
    clusterConf: KafkaTokenClusterConf
  ): java.util.Properties
  
  def isGlobalJaasConfigurationProvided: Boolean
  
  def getKeytabJaasParams(
    keyTab: String,
    principal: String,
    kerberosServiceName: String
  ): String
  
  def findMatchingTokenClusterConfig(
    sparkConf: SparkConf,
    bootStrapServers: String
  ): Option[KafkaTokenClusterConf]
  
  def getTokenJaasParams(clusterConf: KafkaTokenClusterConf): String
  
  def needTokenUpdate(
    params: java.util.Map[String, Object],
    clusterConfig: Option[KafkaTokenClusterConf]
  ): Boolean
  
  def getTokenService(identifier: String): Text
}

class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
  def getKind: Text
}

Configuration Updating

Fluent interface for updating Kafka client configurations with authentication settings.

case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, Object]) {
  def set(key: String, value: Object): KafkaConfigUpdater.this.type
  def setIfUnset(key: String, value: Object): KafkaConfigUpdater.this.type
  def setAuthenticationConfigIfNeeded(): KafkaConfigUpdater.this.type
  def setAuthenticationConfigIfNeeded(
    clusterConfig: Option[KafkaTokenClusterConf]
  ): KafkaConfigUpdater.this.type
  def build(): java.util.Map[String, Object]
}

Security and Logging

Utilities for secure handling of sensitive configuration parameters in logs.

object KafkaRedactionUtil {
  def redactParams(params: Seq[(String, Object)]): Seq[(String, String)]
  def redactJaasParam(param: String): String
}

Configuration Properties

The library uses Spark configuration properties with the prefix spark.kafka.clusters.<identifier>. (note the trailing dot) for cluster-specific settings:

Required Configuration

// Bootstrap servers for token obtainment (required)
"spark.kafka.clusters.<identifier>.auth.bootstrap.servers" -> "kafka1:9092,kafka2:9092"

Optional Configuration

// Target servers regex pattern (default: ".*")
"spark.kafka.clusters.<identifier>.target.bootstrap.servers.regex" -> "kafka.*:9092"

// Security protocol (default: "SASL_SSL")
"spark.kafka.clusters.<identifier>.security.protocol" -> "SASL_SSL"

// Kerberos service name (default: "kafka")
"spark.kafka.clusters.<identifier>.sasl.kerberos.service.name" -> "kafka"

// Token mechanism (default: "SCRAM-SHA-512")
"spark.kafka.clusters.<identifier>.sasl.token.mechanism" -> "SCRAM-SHA-512"

// SSL truststore configuration
"spark.kafka.clusters.<identifier>.ssl.truststore.type" -> "JKS"
"spark.kafka.clusters.<identifier>.ssl.truststore.location" -> "/path/to/truststore.jks"
"spark.kafka.clusters.<identifier>.ssl.truststore.password" -> "truststore-password"

// SSL keystore configuration (for SSL protocol)
"spark.kafka.clusters.<identifier>.ssl.keystore.type" -> "JKS"
"spark.kafka.clusters.<identifier>.ssl.keystore.location" -> "/path/to/keystore.jks"
"spark.kafka.clusters.<identifier>.ssl.keystore.password" -> "keystore-password"
"spark.kafka.clusters.<identifier>.ssl.key.password" -> "key-password"

// Additional Kafka client properties
"spark.kafka.clusters.<identifier>.kafka.<kafka-property>" -> "value"

Authentication Methods

The library supports multiple authentication methods applied in the following order of preference:

  1. Global JAAS Configuration: Uses JVM-wide security settings (e.g., java.security.auth.login.config)
  2. Keytab Authentication: Uses Kerberos keytab file with dynamic JAAS configuration
  3. Ticket Cache Authentication: Uses Kerberos ticket cache with dynamic JAAS configuration
  4. Token Authentication: Uses delegation tokens with SCRAM mechanism

Security Protocols

SASL_SSL (Recommended)

  • SASL authentication over SSL-encrypted connection
  • Requires truststore configuration
  • Default security protocol

SSL

  • SSL with client certificate authentication
  • Requires both truststore and keystore configuration
  • Generates warning about 2-way authentication requirement

SASL_PLAINTEXT

  • SASL authentication over unencrypted connection
  • Generates security warning
  • Not recommended for production

Error Handling

The library handles several error conditions:

  • Proxy User Error: Throws IllegalArgumentException when attempting to use proxy users (not yet supported)
  • Configuration Errors: Logs warnings for missing or invalid cluster configurations
  • Token Obtainment Failures: Logs warnings with cluster context for debugging
  • Multiple Token Matches: Throws IllegalArgumentException when multiple tokens match bootstrap servers

Usage Examples

Multi-Cluster Configuration

val sparkConf = new SparkConf()
  // Cluster 1 - Production
  .set("spark.kafka.clusters.prod.auth.bootstrap.servers", "prod-kafka1:9092,prod-kafka2:9092")
  .set("spark.kafka.clusters.prod.target.bootstrap.servers.regex", "prod-kafka.*:9092")
  .set("spark.kafka.clusters.prod.security.protocol", "SASL_SSL")
  .set("spark.kafka.clusters.prod.ssl.truststore.location", "/etc/kafka/truststore.jks")
  .set("spark.kafka.clusters.prod.ssl.truststore.password", "prod-truststore-pass")
  
  // Cluster 2 - Staging
  .set("spark.kafka.clusters.staging.auth.bootstrap.servers", "staging-kafka:9092")
  .set("spark.kafka.clusters.staging.target.bootstrap.servers.regex", "staging-kafka:9092")
  .set("spark.kafka.clusters.staging.security.protocol", "SASL_PLAINTEXT")

Consumer Configuration Update

import org.apache.kafka.clients.consumer.ConsumerConfig

val baseConsumerConfig = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "kafka1:9092,kafka2:9092",
  ConsumerConfig.GROUP_ID_CONFIG -> "my-consumer-group",
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"
)

val authenticatedConfig = KafkaConfigUpdater("consumer", baseConsumerConfig)
  .setAuthenticationConfigIfNeeded()
  .build()

Producer Configuration Update

import org.apache.kafka.clients.producer.ProducerConfig

val baseProducerConfig = Map[String, Object](
  ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "kafka1:9092,kafka2:9092",
  ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
  ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer"
)

val authenticatedConfig = KafkaConfigUpdater("producer", baseProducerConfig)
  .setAuthenticationConfigIfNeeded()
  .build()