
tessl/maven-org-apache-spark--spark-token-provider-kafka-0-10_2-13

Kafka 0.10+ Token Provider for Streaming - A specialized security module that handles delegation token management for Kafka integration in Apache Spark streaming applications.

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.spark/spark-token-provider-kafka-0-10_2.13@4.0.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-token-provider-kafka-0-10_2-13@4.0.0


Kafka 0.10+ Token Provider for Streaming

A security module that manages delegation tokens for Kafka integration in Apache Spark streaming applications. It handles authentication when Spark connects to Kafka clusters secured with SASL-based security protocols.

Package Information

  • Package Name: spark-token-provider-kafka-0-10_2.13
  • Package Type: Maven
  • Language: Scala
  • Installation: Add to your Maven pom.xml:
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-token-provider-kafka-0-10_2.13</artifactId>
      <version>4.0.0</version>
    </dependency>

Core Imports

import org.apache.spark.kafka010.{KafkaDelegationTokenProvider, KafkaConfigUpdater, KafkaTokenUtil}
import org.apache.spark.kafka010.{KafkaTokenClusterConf, KafkaTokenSparkConf}
import org.apache.spark.kafka010.KafkaTokenUtil.KafkaDelegationTokenIdentifier

Basic Usage

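import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.spark.SparkConf
import org.apache.spark.kafka010.KafkaDelegationTokenProvider

// Note: KafkaDelegationTokenProvider is declared private[spark] in Spark's sources,
// so this snippet compiles only from code placed under the org.apache.spark package.
// Spark normally loads the provider itself; calling it directly is mainly useful in tests.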

// Configure Spark for Kafka cluster authentication
val sparkConf = new SparkConf()
  .set("spark.kafka.clusters.cluster1.auth.bootstrap.servers", "kafka1:9092,kafka2:9092")
  .set("spark.kafka.clusters.cluster1.security.protocol", "SASL_SSL")
  .set("spark.kafka.clusters.cluster1.sasl.kerberos.service.name", "kafka")

// Create token provider
val tokenProvider = new KafkaDelegationTokenProvider()

// Check if tokens are required
val hadoopConf = new Configuration()
val credentials = new Credentials()

if (tokenProvider.delegationTokensRequired(sparkConf, hadoopConf)) {
  // Obtain delegation tokens
  val nextRenewalTime = tokenProvider.obtainDelegationTokens(hadoopConf, sparkConf, credentials)
  println(s"Tokens obtained, next renewal: $nextRenewalTime")
}
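
The renewal value returned above is an Option[Long]. A minimal standalone sketch of rendering it for logs follows, assuming the number is a timestamp in epoch milliseconds (an assumption here, not something this page states). RenewalTimeSketch and describe are hypothetical names and are not part of the package.

```scala
import java.time.Instant

// Hypothetical helper, not part of spark-token-provider-kafka-0-10.
object RenewalTimeSketch {
  // Assumption: the Long is a point in time expressed in epoch milliseconds.
  def describe(nextRenewal: Option[Long]): String = nextRenewal match {
    case Some(millis) => s"next renewal at ${Instant.ofEpochMilli(millis)}"
    case None         => "no renewal time reported"
  }
}
```

Pattern matching on the Option keeps the "no renewal" case explicit instead of printing a raw Some(...) or None.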

Architecture

The Kafka Token Provider is built around several key components:

  • Token Provider: Main KafkaDelegationTokenProvider class implementing Spark's HadoopDelegationTokenProvider interface (org.apache.spark.security)
  • Configuration Management: KafkaTokenSparkConf for parsing Spark configuration and KafkaConfigUpdater for runtime configuration updates
  • Token Utilities: KafkaTokenUtil object providing low-level token operations and authentication helpers
  • Security Integration: Works with Kerberos keytab login, SSL trust and key stores, and SASL token mechanisms
  • Multi-Cluster Support: Supports delegation token management across multiple Kafka clusters simultaneously
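
Multi-cluster support rests on per-cluster keys of the form spark.kafka.clusters.<identifier>.<option>, as used in Basic Usage. The following standalone sketch shows one way such keys could be grouped by cluster identifier. It does not use Spark; ClusterConfSketch, ClusterConf, groupByCluster and toClusterConfs are hypothetical names, and the fallback values "SASL_SSL" and "kafka" are assumptions rather than values taken from this page.

```scala
// Standalone sketch: plain Scala 2.13, no Spark dependency.
object ClusterConfSketch {
  val Prefix = "spark.kafka.clusters."

  // Hypothetical, trimmed-down stand-in for KafkaTokenClusterConf.
  final case class ClusterConf(
      identifier: String,
      authBootstrapServers: Option[String],
      securityProtocol: String,
      kerberosServiceName: String)

  // Turn "spark.kafka.clusters.<identifier>.<option>" entries into
  // identifier -> (option -> value); unrelated keys are ignored.
  def groupByCluster(conf: Map[String, String]): Map[String, Map[String, String]] =
    conf.toSeq
      .collect { case (k, v) if k.startsWith(Prefix) => (k.stripPrefix(Prefix), v) }
      .flatMap { case (rest, v) =>
        rest.split("\\.", 2) match {
          case Array(id, option) if id.nonEmpty && option.nonEmpty => Some((id, option -> v))
          case _                                                   => None
        }
      }
      .groupMap(_._1)(_._2)
      .view.mapValues(_.toMap).toMap

  // Build one ClusterConf per identifier; the defaults below are assumptions.
  def toClusterConfs(conf: Map[String, String]): Set[ClusterConf] =
    groupByCluster(conf).map { case (id, opts) =>
      ClusterConf(
        id,
        opts.get("auth.bootstrap.servers"),
        opts.getOrElse("security.protocol", "SASL_SSL"),
        opts.getOrElse("sasl.kerberos.service.name", "kafka"))
    }.toSet
}
```

Splitting on the first dot only keeps dotted option names such as auth.bootstrap.servers intact, which is why each identifier can carry its own full set of options.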

Capabilities

Token Provider

Core delegation token provider that integrates with Apache Spark's security framework to obtain and manage Kafka delegation tokens for secure cluster authentication.

class KafkaDelegationTokenProvider extends HadoopDelegationTokenProvider {
  def serviceName: String
  def obtainDelegationTokens(
    hadoopConf: Configuration, 
    sparkConf: SparkConf, 
    creds: Credentials
  ): Option[Long]
  def delegationTokensRequired(
    sparkConf: SparkConf, 
    hadoopConf: Configuration
  ): Boolean
}


Configuration Management

Configuration utilities for parsing Spark configuration properties and managing Kafka client parameters with security and redaction support.

case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, Object]) {
  def set(key: String, value: Object): this.type
  def setIfUnset(key: String, value: Object): this.type
  def setAuthenticationConfigIfNeeded(): this.type
  def build(): java.util.Map[String, Object]
}

object KafkaTokenSparkConf {
  def getClusterConfig(sparkConf: SparkConf, identifier: String): KafkaTokenClusterConf
  def getAllClusterConfigs(sparkConf: SparkConf): Set[KafkaTokenClusterConf]
}
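
The set / setIfUnset / build shape above reads as a fluent builder over Kafka client parameters. The sketch below is a hypothetical stand-in (ParamUpdaterSketch is not the library class) that illustrates the likely difference between the two setters, assuming set always overwrites and setIfUnset leaves caller-supplied values alone.

```scala
import java.{util => ju}
import scala.collection.mutable

// Hypothetical stand-in that mimics the set / setIfUnset / build shape.
final class ParamUpdaterSketch(initial: Map[String, Object]) {
  private val params = mutable.LinkedHashMap.from(initial)

  // Always overwrite the key.
  def set(key: String, value: Object): this.type = {
    params.update(key, value)
    this
  }

  // Write the key only when it is not already present.
  def setIfUnset(key: String, value: Object): this.type = {
    if (!params.contains(key)) params.update(key, value)
    this
  }

  // Copy into a Java map, the form Kafka clients accept.
  def build(): ju.Map[String, Object] = {
    val out = new ju.HashMap[String, Object]()
    params.foreach { case (k, v) => out.put(k, v) }
    out
  }
}
```

Returning this.type lets calls chain, for example new ParamUpdaterSketch(userParams).setIfUnset("security.protocol", "SASL_SSL").build(), where userParams is whatever map the caller already holds.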


Token Utilities

Low-level helpers for obtaining tokens, building JAAS parameters, and configuring the Kafka admin client used during the delegation token lifecycle.

object KafkaTokenUtil {
  val TOKEN_KIND: Text
  def getTokenService(identifier: String): Text
  def obtainToken(
    sparkConf: SparkConf, 
    clusterConf: KafkaTokenClusterConf
  ): (Token[KafkaDelegationTokenIdentifier], Long)
  def checkProxyUser(): Unit
  def createAdminClientProperties(
    sparkConf: SparkConf, 
    clusterConf: KafkaTokenClusterConf
  ): java.util.Properties
  def isGlobalJaasConfigurationProvided: Boolean
  def findMatchingTokenClusterConfig(
    sparkConf: SparkConf, 
    bootStrapServers: String
  ): Option[KafkaTokenClusterConf]
  def getTokenJaasParams(clusterConf: KafkaTokenClusterConf): String
  def needTokenUpdate(
    params: java.util.Map[String, Object], 
    clusterConfig: Option[KafkaTokenClusterConf]
  ): Boolean
  def getKeytabJaasParams(
    keyTab: String, 
    principal: String, 
    kerberosServiceName: String
  ): String
}
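
The findMatchingTokenClusterConfig signature pairs a bootstrap server string with the targetServersRegex field of KafkaTokenClusterConf. One plausible reading, sketched below without Spark, is that a cluster configuration applies when its regex matches one of the listed servers, and that more than one match is treated as an error. ClusterMatchSketch, ClusterConf and findMatching are hypothetical names, and the exact matching rules are an assumption.

```scala
import java.util.regex.Pattern

object ClusterMatchSketch {
  // Hypothetical, trimmed-down stand-in for KafkaTokenClusterConf.
  final case class ClusterConf(identifier: String, targetServersRegex: String)

  // Return the single cluster whose regex fully matches any listed server.
  def findMatching(confs: Seq[ClusterConf], bootstrapServers: String): Option[ClusterConf] = {
    val servers = bootstrapServers.split(",").map(_.trim).filter(_.nonEmpty)
    val matches = confs.filter { conf =>
      val pattern = Pattern.compile(conf.targetServersRegex)
      servers.exists(s => pattern.matcher(s).matches())
    }
    // Assumption: ambiguous matches are rejected rather than silently resolved.
    require(
      matches.size <= 1,
      s"More than one cluster matches $bootstrapServers: ${matches.map(_.identifier).mkString(", ")}")
    matches.headOption
  }
}
```

Failing on ambiguity avoids attaching the wrong cluster's token to a connection when two regexes overlap.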


Utility Classes

Additional utility classes for configuration redaction and error handling.

object KafkaRedactionUtil {
  def redactParams(params: Seq[(String, Object)]): Seq[(String, String)]
  def redactJaasParam(param: String): String
}

object KafkaTokenProviderExceptions {
  def missingKafkaOption(option: String): SparkException
}
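
To make the redaction signatures above concrete, here is a standalone sketch of what redacting a JAAS string and a parameter list could look like. JaasRedactionSketch is a hypothetical name, the "[redacted]" placeholder and the rule "any key containing password" are assumptions, and the library's actual rules and output text may differ.

```scala
import scala.util.matching.Regex

object JaasRedactionSketch {
  // Placeholder text is an assumption; the library may print something else.
  val Redacted = "[redacted]"

  // Matches password="..." (case-insensitive) and captures the key part.
  private val PasswordPattern: Regex = "(?i)(password\\s*=\\s*)\"[^\"]*\"".r

  // Keep the JAAS structure but replace the quoted password value.
  def redactJaas(param: String): String =
    PasswordPattern.replaceAllIn(
      param,
      m => Regex.quoteReplacement(m.group(1) + "\"" + Redacted + "\""))

  // Stringify every value, hiding the ones that look sensitive.
  def redactParams(params: Seq[(String, Object)]): Seq[(String, String)] =
    params.map { case (key, value) =>
      val text = String.valueOf(value)
      if (key == "sasl.jaas.config") key -> redactJaas(text)
      else if (key.toLowerCase.contains("password")) key -> Redacted
      else key -> text
    }
}
```

Redacting before logging means parameter maps can be printed for debugging without exposing keystore passwords or JAAS credentials.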


Types

import org.apache.hadoop.io.Text
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier

case class KafkaTokenClusterConf(
  identifier: String,
  authBootstrapServers: String,
  targetServersRegex: String,
  securityProtocol: String,
  kerberosServiceName: String,
  trustStoreType: Option[String],
  trustStoreLocation: Option[String],
  trustStorePassword: Option[String],
  keyStoreType: Option[String],
  keyStoreLocation: Option[String],
  keyStorePassword: Option[String],
  keyPassword: Option[String],
  tokenMechanism: String,
  specifiedKafkaParams: Map[String, String]
)

// Nested inside the KafkaTokenUtil object; referenced as KafkaTokenUtil.KafkaDelegationTokenIdentifier
class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
  def getKind: Text
}