Token Provider

Core delegation token provider that integrates with Apache Spark's security framework to obtain and manage Kafka delegation tokens for secure cluster authentication.

Capabilities

KafkaDelegationTokenProvider

Main token provider class implementing Spark's HadoopDelegationTokenProvider interface to obtain delegation tokens for Kafka clusters.

/**
 * Kafka delegation token provider for Apache Spark
 * Implements HadoopDelegationTokenProvider to integrate with Spark's security framework
 */
class KafkaDelegationTokenProvider extends HadoopDelegationTokenProvider with Logging {
  
  /**
   * Service name identifier for this token provider
   * @return "kafka"
   */
  def serviceName: String
  
  /**
   * Obtains delegation tokens for all configured Kafka clusters
   * @param hadoopConf Hadoop configuration
   * @param sparkConf Spark configuration containing cluster settings
   * @param creds Credentials object to store obtained tokens
   * @return Optional next renewal time (earliest across all clusters)
   */
  def obtainDelegationTokens(
    hadoopConf: Configuration,
    sparkConf: SparkConf,
    creds: Credentials
  ): Option[Long]
  
  /**
   * Checks if delegation tokens are required for any configured Kafka cluster
   * @param sparkConf Spark configuration
   * @param hadoopConf Hadoop configuration  
   * @return true if tokens are required, false otherwise
   */
  def delegationTokensRequired(
    sparkConf: SparkConf,
    hadoopConf: Configuration
  ): Boolean
}

Usage Examples:

import org.apache.spark.SparkConf
import org.apache.spark.kafka010.KafkaDelegationTokenProvider
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

// Setup Spark configuration with Kafka cluster details
val sparkConf = new SparkConf()
  .set("spark.kafka.clusters.prod.auth.bootstrap.servers", "kafka1:9092,kafka2:9092")
  .set("spark.kafka.clusters.prod.security.protocol", "SASL_SSL")
  .set("spark.kafka.clusters.prod.sasl.kerberos.service.name", "kafka")
  .set("spark.kafka.clusters.prod.ssl.truststore.location", "/path/to/truststore.jks")
  .set("spark.kafka.clusters.prod.ssl.truststore.password", "password")

// Create provider and check requirements
val tokenProvider = new KafkaDelegationTokenProvider()
val hadoopConf = new Configuration()

// Check if tokens are needed
if (tokenProvider.delegationTokensRequired(sparkConf, hadoopConf)) {
  println("Delegation tokens are required for configured Kafka clusters")
  
  // Obtain tokens
  val credentials = new Credentials()
  val nextRenewal = tokenProvider.obtainDelegationTokens(hadoopConf, sparkConf, credentials)
  
  nextRenewal match {
    case Some(renewalTime) => 
      println(s"Tokens obtained successfully. Next renewal: $renewalTime")
    case None => 
      println("No tokens obtained or renewal not required")
  }
} else {
  println("No delegation tokens required")
}

Multi-Cluster Configuration:

// Configure multiple Kafka clusters
val sparkConf = new SparkConf()
  // Production cluster
  .set("spark.kafka.clusters.prod.auth.bootstrap.servers", "prod-kafka1:9092")
  .set("spark.kafka.clusters.prod.security.protocol", "SASL_SSL")
  .set("spark.kafka.clusters.prod.target.bootstrap.servers.regex", "prod-kafka.*:9092")
  
  // Development cluster  
  .set("spark.kafka.clusters.dev.auth.bootstrap.servers", "dev-kafka1:9092")
  .set("spark.kafka.clusters.dev.security.protocol", "SASL_PLAINTEXT")
  .set("spark.kafka.clusters.dev.target.bootstrap.servers.regex", "dev-kafka.*:9092")

val tokenProvider = new KafkaDelegationTokenProvider()
val credentials = new Credentials()

// This will obtain tokens for both clusters
tokenProvider.obtainDelegationTokens(new Configuration(), sparkConf, credentials)
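
To verify what was stored, the tokens in the Credentials object can be inspected through the standard Hadoop API. A minimal sketch, assuming Scala 2.13 for the Java collection converters:

import scala.jdk.CollectionConverters._

// Each cluster that required a token contributes one entry,
// keyed by a per-cluster service name
credentials.getAllTokens.asScala.foreach { token =>
  println(s"Obtained token for service: ${token.getService}")
}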

Service Name

Returns the service name identifier for this token provider.

/**
 * Service name identifier for this token provider
 * @return "kafka" - the service name used by Hadoop security framework
 */
def serviceName: String
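
The returned name also drives Spark's generic per-service credential toggle, spark.security.credentials.{service}.enabled, so the provider can be switched off through configuration alone:

import org.apache.spark.SparkConf

// Disable the Kafka delegation token provider without code changes
val conf = new SparkConf()
  .set("spark.security.credentials.kafka.enabled", "false")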

Token Requirements Check

Determines if delegation tokens are required based on the current configuration and security settings.

/**
 * Checks if delegation tokens are required for any configured Kafka cluster
 * Returns true if any cluster requires delegation tokens based on:
 * - Security protocol (SASL_SSL, SASL_PLAINTEXT, SSL)
 * - Authentication method availability
 * - Current user credentials
 * 
 * @param sparkConf Spark configuration containing cluster settings
 * @param hadoopConf Hadoop configuration
 * @return true if delegation tokens are required, false otherwise
 */
def delegationTokensRequired(sparkConf: SparkConf, hadoopConf: Configuration): Boolean
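
As a rough illustration of the decision (a simplified sketch with hypothetical names, not the actual Spark implementation), the check amounts to asking whether any configured cluster uses a security protocol other than PLAINTEXT:

// Simplified sketch: tokens are needed whenever any configured cluster
// uses a SASL or SSL protocol; values mirror Kafka's SecurityProtocol
def tokensNeeded(clusterProtocols: Seq[String]): Boolean =
  clusterProtocols.exists(_ != "PLAINTEXT")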

Token Obtainment

Obtains delegation tokens for all configured Kafka clusters and stores them in the provided credentials.

/**
 * Obtains delegation tokens for all configured Kafka clusters
 * Processes each cluster configuration and:
 * - Validates security requirements
 * - Connects to Kafka cluster using admin client
 * - Requests delegation token with appropriate credentials
 * - Stores token in provided Credentials object
 * - Tracks renewal times across all clusters
 * 
 * @param hadoopConf Hadoop configuration
 * @param sparkConf Spark configuration containing cluster settings under spark.kafka.clusters.*
 * @param creds Credentials object to store obtained tokens
 * @return Optional next renewal time (earliest across all clusters), None if no tokens obtained
 */
def obtainDelegationTokens(
  hadoopConf: Configuration,
  sparkConf: SparkConf,
  creds: Credentials
): Option[Long]
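
The returned value is an absolute timestamp in milliseconds. Spark's credential renewer normally consumes it internally, but a caller managing its own renewal could turn it into a delay; this sketch assumes nextRenewal holds the Option[Long] returned above:

// Convert the absolute renewal timestamp into a delay from now,
// clamped at zero in case the deadline has already passed
nextRenewal.foreach { renewalTime =>
  val delayMs = math.max(0L, renewalTime - System.currentTimeMillis())
  println(s"Schedule token renewal in ${delayMs / 1000} seconds")
}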

Error Handling

The token provider handles various error conditions:

  • Authentication failures: When Kerberos or SSL authentication fails
  • Network connectivity issues: When Kafka clusters are unreachable
  • Configuration errors: When required configuration parameters are missing
  • Token creation failures: When Kafka cluster rejects token requests
  • Permission issues: When user lacks required permissions for token operations

All errors are logged, and a non-fatal failure in one cluster is handled gracefully so that token obtainment for the remaining clusters can still succeed, as sketched below.
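
The per-cluster isolation follows the usual Scala pattern of catching only non-fatal throwables. A minimal sketch of that pattern (clusterConfigs and obtainTokenFor are hypothetical names, not the provider's actual code):

import scala.util.control.NonFatal

// Illustrative pattern: a failure in one cluster is logged and skipped
// so the remaining clusters can still obtain their tokens
clusterConfigs.foreach { cluster =>
  try {
    obtainTokenFor(cluster) // hypothetical per-cluster token fetch
  } catch {
    case NonFatal(e) =>
      println(s"Failed to get token from cluster $cluster: $e")
  }
}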