Additional utility classes for configuration redaction and error handling.
Object providing utilities for redacting sensitive configuration parameters in logging and debugging output.
object KafkaRedactionUtil extends Logging {
/**
* Redacts sensitive parameters in configuration sequences
* Applies redaction patterns to sensitive keys like passwords, tokens, and credentials
* Special handling for SASL JAAS configuration parameters
*
* @param params Sequence of key-value pairs to redact
* @return Sequence of redacted key-value pairs with sensitive values replaced
*/
def redactParams(params: Seq[(String, Object)]): Seq[(String, String)]
/**
* Redacts JAAS configuration passwords specifically
* Replaces password values in JAAS parameter strings with redaction placeholder
*
* @param param JAAS parameter string potentially containing passwords
* @return JAAS parameter string with passwords redacted
*/
def redactJaasParam(param: String): String
}Usage Examples:
import org.apache.spark.kafka010.KafkaRedactionUtil
// Redact sensitive configuration parameters
val sensitiveParams = Seq(
("bootstrap.servers", "kafka1:9092,kafka2:9092"),
("sasl.jaas.config", "ScramLoginModule required username=\"user\" password=\"secret123\";"),
("ssl.keystore.password", "keystore-password"),
("security.protocol", "SASL_SSL")
)
val redactedParams = KafkaRedactionUtil.redactParams(sensitiveParams)
redactedParams.foreach { case (key, value) =>
println(s"$key = $value")
}
// Output:
// bootstrap.servers = kafka1:9092,kafka2:9092
// sasl.jaas.config = ScramLoginModule required username="user" password="[REDACTED]";
// ssl.keystore.password = [REDACTED]
// security.protocol = SASL_SSL
// Redact JAAS parameters specifically
val jaasConfig = """ScramLoginModule required
|username="myuser"
|password="mypassword123";""".stripMargin.replace("\n", " ")
val redactedJaas = KafkaRedactionUtil.redactJaasParam(jaasConfig)
println(redactedJaas)
// Output: ScramLoginModule required username="myuser" password="[REDACTED]";Redacts sensitive parameters in configuration sequences using Spark's built-in redaction patterns.
/**
* Redacts sensitive parameters in configuration sequences
* Uses Spark's SECRET_REDACTION_PATTERN configuration to identify sensitive keys
* Applies special handling for SASL JAAS configuration parameters
*
* @param params Sequence of key-value pairs where values may contain sensitive information
* @return Sequence of key-value pairs with sensitive values replaced by redaction placeholder
*/
def redactParams(params: Seq[(String, Object)]): Seq[(String, String)]Specifically redacts password fields in JAAS configuration strings.
/**
* Redacts JAAS configuration passwords specifically
* Uses regex pattern matching to find and replace password values in JAAS parameter strings
* Preserves the structure of JAAS configuration while hiding sensitive password values
*
* @param param JAAS parameter string potentially containing password="value" patterns
* @return JAAS parameter string with password values replaced by redaction placeholder
*/
def redactJaasParam(param: String): StringObject providing factory methods for creating standardized exceptions related to Kafka token provider operations.
object KafkaTokenProviderExceptions {
/**
* Creates exception for missing Kafka configuration options
* Generates standardized SparkException with appropriate error class and message
*
* @param option Name of the missing Kafka configuration option
* @return SparkException with error class MISSING_KAFKA_OPTION
*/
def missingKafkaOption(option: String): SparkException
}Usage Examples:
import org.apache.spark.kafka010.KafkaTokenProviderExceptions
import org.apache.kafka.clients.CommonClientConfigs
// Validate required configuration options
val bootstrapServers = kafkaParams.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
if (bootstrapServers.isEmpty) {
throw KafkaTokenProviderExceptions.missingKafkaOption(
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG
)
}
// This creates a SparkException with:
// - Error class: "MISSING_KAFKA_OPTION"
// - Message: contextual error message about the missing option
// - Parameters: Map containing the missing option nameCreates standardized exceptions for missing Kafka configuration options.
/**
* Creates exception for missing Kafka configuration options
* Generates a SparkException with:
* - Error class: "MISSING_KAFKA_OPTION"
* - Descriptive error message loaded from error conditions JSON
* - Message parameters including the missing option name
*
* @param option Name of the missing Kafka configuration option (e.g., "bootstrap.servers")
* @return SparkException configured with appropriate error class and parameters
*/
def missingKafkaOption(option: String): SparkExceptionThese utility classes are designed to support secure logging practices:
Security Considerations:
// Always use redaction utilities when logging configuration
val configToLog = KafkaRedactionUtil.redactParams(kafkaConfig.toSeq)
logger.info(s"Kafka configuration: $configToLog")
// Never log raw JAAS configurations - always redact passwords
val safeJaasConfig = KafkaRedactionUtil.redactJaasParam(jaasConfig)
logger.debug(s"JAAS configuration: $safeJaasConfig")