Kafka 0.10+ Token Provider for Streaming - A specialized security module that handles delegation token management for Kafka integration in Apache Spark streaming applications.
Additional utility classes for configuration redaction and error handling.
Object providing utilities for redacting sensitive configuration parameters in logging and debugging output.
object KafkaRedactionUtil extends Logging {
/**
* Redacts sensitive parameters in configuration sequences
* Applies redaction patterns to sensitive keys like passwords, tokens, and credentials
* Special handling for SASL JAAS configuration parameters
*
* @param params Sequence of key-value pairs to redact
* @return Sequence of redacted key-value pairs with sensitive values replaced
*/
def redactParams(params: Seq[(String, Object)]): Seq[(String, String)]
/**
* Redacts JAAS configuration passwords specifically
* Replaces password values in JAAS parameter strings with redaction placeholder
*
* @param param JAAS parameter string potentially containing passwords
* @return JAAS parameter string with passwords redacted
*/
def redactJaasParam(param: String): String
}
Usage Examples:
import org.apache.spark.kafka010.KafkaRedactionUtil
// Redact sensitive configuration parameters
val sensitiveParams = Seq(
("bootstrap.servers", "kafka1:9092,kafka2:9092"),
("sasl.jaas.config", "ScramLoginModule required username=\"user\" password=\"secret123\";"),
("ssl.keystore.password", "keystore-password"),
("security.protocol", "SASL_SSL")
)
val redactedParams = KafkaRedactionUtil.redactParams(sensitiveParams)
redactedParams.foreach { case (key, value) =>
println(s"$key = $value")
}
// Output:
// bootstrap.servers = kafka1:9092,kafka2:9092
// sasl.jaas.config = ScramLoginModule required username="user" password="*********(redacted)";
// ssl.keystore.password = *********(redacted)
// security.protocol = SASL_SSL
// Redact JAAS parameters specifically
val jaasConfig = """ScramLoginModule required
|username="myuser"
|password="mypassword123";""".stripMargin.replace("\n", " ")
val redactedJaas = KafkaRedactionUtil.redactJaasParam(jaasConfig)
println(redactedJaas)
// Output: ScramLoginModule required username="myuser" password="*********(redacted)";
Redacts sensitive parameters in configuration sequences using Spark's built-in redaction patterns.
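As a rough illustration of key-based redaction, the sketch below replaces the value of every pair whose key matches a pattern. It does not depend on Spark. `RedactionSketch`, `SensitiveKey`, `Placeholder`, and `redactByKey` are hypothetical names, and both the pattern and the placeholder text are assumptions of this sketch rather than values read from Spark's configuration.

```scala
import scala.util.matching.Regex

object RedactionSketch {
  // Placeholder text chosen for this sketch (an assumption, not read from Spark).
  val Placeholder: String = "*********(redacted)"

  // Hypothetical key pattern; in Spark the pattern comes from configuration.
  val SensitiveKey: Regex = "(?i)secret|password|token".r

  // Replace the value of every pair whose key matches the pattern;
  // all other pairs pass through unchanged.
  def redactByKey(params: Seq[(String, String)]): Seq[(String, String)] =
    params.map { case (key, value) =>
      if (SensitiveKey.findFirstIn(key).isDefined) (key, Placeholder)
      else (key, value)
    }
}
```

Matching on the key rather than the value keeps the check cheap and means a secret is hidden whatever its contents are, as long as the option name gives it away.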
/**
* Redacts sensitive parameters in configuration sequences
* Uses Spark's SECRET_REDACTION_PATTERN configuration to identify sensitive keys
* Applies special handling for SASL JAAS configuration parameters
*
* @param params Sequence of key-value pairs where values may contain sensitive information
* @return Sequence of key-value pairs with sensitive values replaced by redaction placeholder
*/
def redactParams(params: Seq[(String, Object)]): Seq[(String, String)]
Specifically redacts password fields in JAAS configuration strings.
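The regex-based replacement can be sketched in plain Scala without any Spark dependency. `JaasRedactionSketch`, `PasswordField`, and `redactPassword` are hypothetical names. The pattern used here, which stops at the next double quote, is a choice made for this sketch and is not necessarily the expression Spark itself uses.

```scala
import scala.util.matching.Regex

object JaasRedactionSketch {
  // Placeholder text chosen for this sketch (an assumption).
  val Placeholder: String = "*********(redacted)"

  // Matches password="..." where the quoted value contains no double quote.
  private val PasswordField: Regex = "password=\"[^\"]*\"".r

  // Null or empty input is returned as-is; otherwise every password field
  // is rewritten and the rest of the JAAS string is left untouched.
  def redactPassword(jaas: String): String =
    if (jaas == null || jaas.isEmpty) jaas
    else {
      val replacement = "password=\"" + Placeholder + "\""
      // quoteReplacement guards against '$' or '\' in the replacement text.
      PasswordField.replaceAllIn(jaas, Regex.quoteReplacement(replacement))
    }
}
```

Only the quoted password value changes, so the login module name, the `username` field, and the trailing semicolon all survive in the logged string.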
/**
* Redacts JAAS configuration passwords specifically
* Uses regex pattern matching to find and replace password values in JAAS parameter strings
* Preserves the structure of JAAS configuration while hiding sensitive password values
*
* @param param JAAS parameter string potentially containing password="value" patterns
* @return JAAS parameter string with password values replaced by redaction placeholder
*/
def redactJaasParam(param: String): String
Object providing factory methods for creating standardized exceptions related to Kafka token provider operations.
object KafkaTokenProviderExceptions {
/**
* Creates exception for missing Kafka configuration options
* Generates standardized SparkException with appropriate error class and message
*
* @param option Name of the missing Kafka configuration option
* @return SparkException with error class MISSING_KAFKA_OPTION
*/
def missingKafkaOption(option: String): SparkException
}
Usage Examples:
import org.apache.spark.kafka010.KafkaTokenProviderExceptions
import org.apache.kafka.clients.CommonClientConfigs
// Validate required configuration options
// (kafkaParams is assumed to be a Scala Map of Kafka client settings defined elsewhere)
val bootstrapServers = kafkaParams.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
if (bootstrapServers.isEmpty) {
throw KafkaTokenProviderExceptions.missingKafkaOption(
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG
)
}
// This creates a SparkException with:
// - Error class: "MISSING_KAFKA_OPTION"
// - Message: contextual error message about the missing option
// - Parameters: Map containing the missing option name
Creates standardized exceptions for missing Kafka configuration options.
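The factory pattern can be sketched without Spark on the classpath. In the sketch below, `ConfigException` is a hypothetical stand-in for `SparkException`, `requireOptions` is an illustrative helper that is not part of the module, and the `"option"` parameter key is an assumption.

```scala
object MissingOptionSketch {
  // Hypothetical stand-in for SparkException: carries an error class
  // and a map of message parameters.
  final case class ConfigException(
      errorClass: String,
      messageParameters: Map[String, String])
    extends RuntimeException(errorClass + ": " + messageParameters.mkString(", "))

  // Single place where the error class and parameter key are defined.
  def missingOption(option: String): ConfigException =
    ConfigException("MISSING_KAFKA_OPTION", Map("option" -> option))

  // Returns Left with an exception for the first missing required key,
  // or Right with the unchanged parameters when all keys are present.
  def requireOptions(
      params: Map[String, String],
      required: Seq[String]): Either[ConfigException, Map[String, String]] =
    required.find(key => !params.contains(key)) match {
      case Some(key) => Left(missingOption(key))
      case None      => Right(params)
    }
}
```

Routing every missing-option failure through one factory keeps the error class and its parameters consistent across call sites.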
/**
* Creates exception for missing Kafka configuration options
* Generates a SparkException with:
* - Error class: "MISSING_KAFKA_OPTION"
* - Descriptive error message loaded from error conditions JSON
* - Message parameters including the missing option name
*
* @param option Name of the missing Kafka configuration option (e.g., "bootstrap.servers")
* @return SparkException configured with appropriate error class and parameters
*/
def missingKafkaOption(option: String): SparkException
These utility classes are designed to support secure logging practices:
Security Considerations:
// Always use redaction utilities when logging configuration
val configToLog = KafkaRedactionUtil.redactParams(kafkaConfig.toSeq)
logger.info(s"Kafka configuration: $configToLog")
// Never log raw JAAS configurations - always redact passwords
val safeJaasConfig = KafkaRedactionUtil.redactJaasParam(jaasConfig)
logger.debug(s"JAAS configuration: $safeJaasConfig")