Kafka 0.10+ Token Provider for Streaming - A specialized security module that handles delegation token management for Kafka integration in Apache Spark streaming applications.
npx @tessl/cli install tessl/maven-org-apache-spark--spark-token-provider-kafka-0-10_2-13@4.0.0

A specialized security module that handles delegation token management for Kafka integration in Apache Spark streaming applications, providing secure authentication and authorization capabilities when connecting to Kafka clusters using SASL-based security protocols.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-token-provider-kafka-0-10_2.13</artifactId>
<version>4.0.0</version>
</dependency>

import org.apache.spark.kafka010.{KafkaDelegationTokenProvider, KafkaConfigUpdater, KafkaTokenUtil}
import org.apache.spark.kafka010.{KafkaTokenClusterConf, KafkaTokenSparkConf}
import org.apache.spark.kafka010.KafkaTokenUtil.KafkaDelegationTokenIdentifier

import org.apache.spark.SparkConf
import org.apache.spark.kafka010.{KafkaDelegationTokenProvider, KafkaTokenSparkConf}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
// Configure Spark for Kafka cluster authentication. Each cluster is described by keys under
// `spark.kafka.clusters.<identifier>.`; this example uses the identifier `cluster1`.
val sparkConf = new SparkConf()
  .set("spark.kafka.clusters.cluster1.auth.bootstrap.servers", "kafka1:9092,kafka2:9092")
  .set("spark.kafka.clusters.cluster1.security.protocol", "SASL_SSL")
  .set("spark.kafka.clusters.cluster1.sasl.kerberos.service.name", "kafka")

// Create token provider
val tokenProvider = new KafkaDelegationTokenProvider()

// Check if tokens are required; only then ask the provider to obtain them into `credentials`.
val hadoopConf = new Configuration()
val credentials = new Credentials()
if (tokenProvider.delegationTokensRequired(sparkConf, hadoopConf)) {
  // Obtain delegation tokens. The result is an Option[Long]: unwrap it so the message shows the
  // renewal time itself rather than a raw `Some(...)` / `None` wrapper.
  val nextRenewalTime = tokenProvider.obtainDelegationTokens(hadoopConf, sparkConf, credentials)
  val renewalInfo =
    nextRenewalTime.fold("no next renewal time returned")(time => s"next renewal: $time")
  println(s"Tokens obtained, $renewalInfo")
}
The Kafka Token Provider is built around several key components:
- **Token provider**: the `KafkaDelegationTokenProvider` class, implementing Spark's `HadoopDelegationTokenProvider` interface (package `org.apache.spark.security`).
- **Configuration**: `KafkaTokenSparkConf` for parsing Spark configuration, and `KafkaConfigUpdater` for runtime configuration updates.
- **Utilities**: the `KafkaTokenUtil` object, providing low-level token operations and authentication helpers.

Core delegation token provider that integrates with Apache Spark's security framework to obtain and manage Kafka delegation tokens for secure cluster authentication.
/**
 * Core delegation token provider that integrates with Apache Spark's security framework to
 * obtain and manage Kafka delegation tokens for secure cluster authentication.
 *
 * Implements `HadoopDelegationTokenProvider`; only the member signatures are listed here.
 */
class KafkaDelegationTokenProvider extends HadoopDelegationTokenProvider {
  /**
   * Name of the service this provider handles.
   * NOTE(review): the concrete value is not shown in this listing — confirm against the source.
   */
  def serviceName: String

  /**
   * Obtains Kafka delegation tokens.
   *
   * @param hadoopConf Hadoop configuration
   * @param sparkConf  Spark configuration carrying the `spark.kafka.clusters.<identifier>.*` keys
   * @param creds      credentials object supplied by the caller; presumably receives the
   *                   obtained tokens — confirm
   * @return the next renewal time, if any. NOTE(review): the unit of the value and the exact
   *         meaning of `None` are not shown here — confirm
   */
  def obtainDelegationTokens(
    hadoopConf: Configuration,
    sparkConf: SparkConf,
    creds: Credentials
  ): Option[Long]

  /**
   * Reports whether delegation tokens must be obtained for the given configuration.
   * The usage example on this page checks it before calling `obtainDelegationTokens`.
   *
   * @param sparkConf  Spark configuration to inspect
   * @param hadoopConf Hadoop configuration
   * @return `true` when delegation tokens are required
   */
  def delegationTokensRequired(
    sparkConf: SparkConf,
    hadoopConf: Configuration
  ): Boolean
}
Configuration utilities for parsing Spark configuration properties and managing Kafka client parameters with security and redaction support.
/**
 * Runtime updater for a set of Kafka client parameters.
 *
 * The mutators return this same instance (their result type is `KafkaConfigUpdater.this.type`),
 * so calls can be chained; `build()` produces the resulting parameter map.
 *
 * @param module      name of the calling module. NOTE(review): presumably only used for
 *                    logging — confirm
 * @param kafkaParams initial Kafka client parameters
 */
case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, Object]) {
  /** Sets `key` to `value`; returns this updater. */
  def set(key: String, value: Object): KafkaConfigUpdater.this.type

  /**
   * Sets `key` to `value`; returns this updater.
   * NOTE(review): presumably leaves an already-present `key` untouched — confirm.
   */
  def setIfUnset(key: String, value: Object): KafkaConfigUpdater.this.type

  /**
   * Adds authentication-related parameters when they are needed; returns this updater.
   * NOTE(review): the conditions and the parameters added are not shown here — confirm.
   */
  def setAuthenticationConfigIfNeeded(): KafkaConfigUpdater.this.type

  /** Returns the resulting Kafka client parameters as a Java map. */
  def build(): java.util.Map[String, Object]
}
/** Parses Kafka delegation-token cluster settings out of a `SparkConf`. */
object KafkaTokenSparkConf {
  /**
   * Returns the configuration of a single cluster.
   *
   * @param sparkConf  Spark configuration to read
   * @param identifier cluster identifier; presumably the `<identifier>` segment of
   *                   `spark.kafka.clusters.<identifier>.*` keys (e.g. `cluster1`) — confirm
   */
  def getClusterConfig(sparkConf: SparkConf, identifier: String): KafkaTokenClusterConf

  /** Returns the configurations of all clusters found in `sparkConf`. */
  def getAllClusterConfigs(sparkConf: SparkConf): Set[KafkaTokenClusterConf]
}
Low-level token operations, authentication helpers, and Kafka admin client management for delegation token lifecycle management.
/**
 * Low-level token operations, authentication helpers and Kafka admin client support for the
 * delegation token lifecycle.
 */
object KafkaTokenUtil {
  /** Token kind used for Kafka delegation tokens. */
  val TOKEN_KIND: Text

  /**
   * Returns the token service name for a cluster.
   *
   * @param identifier cluster identifier (presumably `KafkaTokenClusterConf.identifier` — confirm)
   */
  def getTokenService(identifier: String): Text

  /**
   * Obtains a delegation token for a single cluster.
   *
   * @param sparkConf   Spark configuration
   * @param clusterConf settings of the cluster to obtain the token from
   * @return the token paired with a `Long`. NOTE(review): the `Long` is presumably the
   *         token's next renewal time — confirm
   */
  def obtainToken(
    sparkConf: SparkConf,
    clusterConf: KafkaTokenClusterConf
  ): (Token[KafkaDelegationTokenIdentifier], Long)

  /**
   * Checks the current user with respect to proxy users.
   * NOTE(review): what is checked and how a failure is reported are not shown here — confirm.
   */
  def checkProxyUser(): Unit

  /**
   * Builds the `java.util.Properties` for a Kafka admin client that connects to the cluster
   * described by `clusterConf`.
   */
  def createAdminClientProperties(
    sparkConf: SparkConf,
    clusterConf: KafkaTokenClusterConf
  ): java.util.Properties

  /** Whether a global JAAS configuration is provided. */
  def isGlobalJaasConfigurationProvided: Boolean

  /**
   * Finds the cluster configuration that applies to the given bootstrap servers.
   * NOTE(review): matching presumably uses `KafkaTokenClusterConf.targetServersRegex` — confirm.
   *
   * @param sparkConf        Spark configuration holding the cluster settings
   * @param bootStrapServers bootstrap servers of the Kafka client being configured
   * @return the matching cluster configuration, or `None` when no cluster applies
   */
  def findMatchingTokenClusterConfig(
    sparkConf: SparkConf,
    bootStrapServers: String
  ): Option[KafkaTokenClusterConf]

  /** Builds the JAAS parameter string for delegation-token authentication against `clusterConf`. */
  def getTokenJaasParams(clusterConf: KafkaTokenClusterConf): String

  /**
   * Whether the Kafka client parameters `params` need a token update.
   * NOTE(review): the exact criteria are not shown here — confirm.
   *
   * @param params        Kafka client parameters
   * @param clusterConfig configuration of the matching cluster, if any
   */
  def needTokenUpdate(
    params: java.util.Map[String, Object],
    clusterConfig: Option[KafkaTokenClusterConf]
  ): Boolean

  /**
   * Builds a JAAS parameter string from a keytab, a principal and a Kerberos service name.
   *
   * @param keyTab              keytab to log in with (presumably a file path — confirm)
   * @param principal           principal to log in as
   * @param kerberosServiceName Kerberos service name
   */
  def getKeytabJaasParams(
    keyTab: String,
    principal: String,
    kerberosServiceName: String
  ): String
}
Additional utility classes for configuration redaction and error handling.
/**
 * Redaction helpers for Kafka configuration values.
 * NOTE(review): presumably applied before parameters are logged or displayed — confirm.
 */
object KafkaRedactionUtil {
  /**
   * Returns `params` as (key, string value) pairs with sensitive values redacted.
   * NOTE(review): which keys count as sensitive is not shown here — confirm.
   */
  def redactParams(params: Seq[(String, Object)]): Seq[(String, String)]

  /**
   * Returns the JAAS parameter string `param` with its sensitive parts redacted.
   * NOTE(review): which parts are redacted is not shown here — confirm.
   */
  def redactJaasParam(param: String): String
}
/** Error helpers for the Kafka token provider. */
object KafkaTokenProviderExceptions {
  /**
   * Builds the `SparkException` reporting that the Kafka option `option` is missing.
   * Per its result type the exception is returned, not thrown; callers presumably throw it.
   */
  def missingKafkaOption(option: String): SparkException
}
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
/**
 * Delegation-token settings of a single Kafka cluster, as returned by
 * `KafkaTokenSparkConf.getClusterConfig` and `KafkaTokenSparkConf.getAllClusterConfigs`.
 *
 * @param identifier           cluster identifier (the example on this page uses `cluster1`)
 * @param authBootstrapServers bootstrap servers; presumably read from
 *                             `spark.kafka.clusters.<identifier>.auth.bootstrap.servers` — confirm
 * @param targetServersRegex   regular expression for target servers; presumably used to match
 *                             clients to this cluster — confirm
 * @param securityProtocol     security protocol (the example sets `SASL_SSL`)
 * @param kerberosServiceName  Kerberos service name (the example sets `kafka`)
 * @param trustStoreType       optional trust store type
 * @param trustStoreLocation   optional trust store location
 * @param trustStorePassword   optional trust store password (sensitive)
 * @param keyStoreType         optional key store type
 * @param keyStoreLocation     optional key store location
 * @param keyStorePassword     optional key store password (sensitive)
 * @param keyPassword          optional key password (sensitive)
 * @param tokenMechanism       mechanism used with the delegation token; presumably a SASL
 *                             mechanism name — confirm
 * @param specifiedKafkaParams additional Kafka parameters specified for this cluster; their
 *                             source keys are not shown here — confirm
 */
case class KafkaTokenClusterConf(
  identifier: String,
  authBootstrapServers: String,
  targetServersRegex: String,
  securityProtocol: String,
  kerberosServiceName: String,
  trustStoreType: Option[String],
  trustStoreLocation: Option[String],
  trustStorePassword: Option[String],
  keyStoreType: Option[String],
  keyStoreLocation: Option[String],
  keyStorePassword: Option[String],
  keyPassword: Option[String],
  tokenMechanism: String,
  specifiedKafkaParams: Map[String, String]
)
// Defined within the KafkaTokenUtil object and referenced from outside as
// `KafkaTokenUtil.KafkaDelegationTokenIdentifier` (the name imported at the top of this page).
// A class declaration cannot use a qualified name, so it is declared by its simple name here.
/** Delegation token identifier for Kafka delegation tokens. */
class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
  /**
   * Kind of this token identifier.
   * NOTE(review): presumably `KafkaTokenUtil.TOKEN_KIND` — confirm against the source.
   */
  def getKind: Text
}