Configuration utilities for parsing Spark configuration properties and managing Kafka client parameters, with security and credential-redaction support.

KafkaConfigUpdater is a utility class for building and updating Kafka configuration parameters, with logging of changes and optional authentication setup.
/**
 * Builder-style helper to conveniently update Kafka config params, while logging each change.
 * Every mutator returns `this.type`, so calls can be chained fluently and finished with `build()`.
 * @param module Module name used as the logging context for recorded changes
 * @param kafkaParams Initial Kafka parameters map (config key -> value)
 */
case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, Object]) extends Logging {
/**
 * Sets a configuration parameter, overwriting any existing value for the key.
 * @param key Configuration parameter key
 * @param value Configuration parameter value
 * @return This updater, for chaining
 */
def set(key: String, value: Object): this.type
/**
 * Sets a configuration parameter only if the key is not already present,
 * so previously supplied values take precedence over defaults.
 * @param key Configuration parameter key
 * @param value Configuration parameter value (used only when the key is unset)
 * @return This updater, for chaining
 */
def setIfUnset(key: String, value: Object): this.type
/**
 * Configures authentication settings if needed based on the environment.
 * Uses the bootstrap servers already present in kafkaParams to locate a
 * matching cluster configuration.
 * @return This updater, for chaining
 */
def setAuthenticationConfigIfNeeded(): this.type
/**
 * Overload that configures authentication from an explicitly supplied cluster
 * configuration instead of resolving one from kafkaParams.
 * @param clusterConfig Optional cluster configuration for authentication
 *                      (NOTE(review): None presumably leaves auth settings untouched — confirm)
 * @return This updater, for chaining
 */
def setAuthenticationConfigIfNeeded(clusterConfig: Option[KafkaTokenClusterConf]): this.type
/**
 * Builds and returns the final configuration map.
 * @return Java Map containing all accumulated configuration parameters
 */
def build(): java.util.Map[String, Object]
}

Usage Examples:
import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaTokenClusterConf}
// Create a config updater seeded with initial parameters
val initialParams = Map(
"bootstrap.servers" -> "kafka1:9092,kafka2:9092",
"key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer"
)
val configUpdater = KafkaConfigUpdater("streaming", initialParams)
// Chain configuration updates; setIfUnset only applies when client.id is not already set
val finalConfig = configUpdater
.set("security.protocol", "SASL_SSL")
.set("sasl.mechanism", "SCRAM-SHA-512")
.setIfUnset("client.id", "spark-streaming-client")
.setAuthenticationConfigIfNeeded()
.build()
// build() returns a java.util.Map usable directly with Kafka clients
println(s"Final config size: ${finalConfig.size()}")

Authentication Configuration:
// Configure with cluster-specific authentication
val clusterConf = KafkaTokenClusterConf(
identifier = "prod-cluster",
authBootstrapServers = "kafka1:9092",
targetServersRegex = ".*",
securityProtocol = "SASL_SSL",
kerberosServiceName = "kafka",
// ... other SSL/auth settings
)
// Pass the cluster config explicitly instead of resolving it from the params
val authenticatedConfig = KafkaConfigUpdater("producer", Map.empty)
.set("bootstrap.servers", clusterConf.authBootstrapServers)
.setAuthenticationConfigIfNeeded(Some(clusterConf))
.build()

KafkaTokenSparkConf is an object providing utilities for parsing Kafka cluster configurations from a Spark configuration.
object KafkaTokenSparkConf {
/** Prefix under which per-cluster Kafka settings live: spark.kafka.clusters.<identifier>. */
val CLUSTERS_CONFIG_PREFIX: String = "spark.kafka.clusters."
/** Default regex for matching target bootstrap servers (matches everything). */
val DEFAULT_TARGET_SERVERS_REGEX: String = ".*"
/** Default Kerberos service name used for SASL authentication. */
val DEFAULT_SASL_KERBEROS_SERVICE_NAME: String = "kafka"
/** Default security protocol used when none is configured. */
val DEFAULT_SECURITY_PROTOCOL_CONFIG: String = "SASL_SSL"
/** Default SASL mechanism for token authentication. */
val DEFAULT_SASL_TOKEN_MECHANISM: String = "SCRAM-SHA-512"
/**
 * Parses the cluster configuration for a specific identifier from the Spark config,
 * using the DEFAULT_* constants above for options left unset.
 * @param sparkConf Spark configuration
 * @param identifier Cluster identifier (the <identifier> part of the config prefix)
 * @return KafkaTokenClusterConf containing the parsed cluster settings
 */
def getClusterConfig(sparkConf: SparkConf, identifier: String): KafkaTokenClusterConf
/**
 * Gets all configured cluster configurations from the Spark config by scanning
 * every property defined under the spark.kafka.clusters.* prefix.
 * @param sparkConf Spark configuration
 * @return Set of all configured cluster configurations
 */
def getAllClusterConfigs(sparkConf: SparkConf): Set[KafkaTokenClusterConf]
}

Configuration Examples:
import org.apache.spark.SparkConf
import org.apache.spark.kafka010.KafkaTokenSparkConf
// Set up a Spark configuration describing two Kafka clusters
val sparkConf = new SparkConf()
// Production cluster: SASL_SSL with Kerberos and a truststore
.set("spark.kafka.clusters.prod.auth.bootstrap.servers", "prod-kafka1:9092,prod-kafka2:9092")
.set("spark.kafka.clusters.prod.target.bootstrap.servers.regex", "prod-kafka.*:9092")
.set("spark.kafka.clusters.prod.security.protocol", "SASL_SSL")
.set("spark.kafka.clusters.prod.sasl.kerberos.service.name", "kafka")
.set("spark.kafka.clusters.prod.ssl.truststore.location", "/etc/kafka/truststore.jks")
.set("spark.kafka.clusters.prod.ssl.truststore.password", "truststore-password")
// Development cluster: plaintext SASL with a weaker token mechanism
.set("spark.kafka.clusters.dev.auth.bootstrap.servers", "dev-kafka1:9092")
.set("spark.kafka.clusters.dev.security.protocol", "SASL_PLAINTEXT")
.set("spark.kafka.clusters.dev.sasl.token.mechanism", "SCRAM-SHA-256")
// Look up a single cluster's configuration by its identifier
val prodClusterConf = KafkaTokenSparkConf.getClusterConfig(sparkConf, "prod")
println(s"Production cluster: ${prodClusterConf.authBootstrapServers}")
// Enumerate every cluster configured under spark.kafka.clusters.*
val allClusters = KafkaTokenSparkConf.getAllClusterConfigs(sparkConf)
println(s"Found ${allClusters.size} configured clusters")
allClusters.foreach { cluster =>
println(s"Cluster ${cluster.identifier}: ${cluster.securityProtocol}")
}

KafkaTokenClusterConf is a configuration data class representing a single Kafka cluster's settings.
/**
 * Immutable configuration data for a single Kafka cluster, as parsed from
 * spark.kafka.clusters.<identifier>.* properties.
 * @param identifier Unique cluster identifier (the <identifier> in the config prefix)
 * @param authBootstrapServers Bootstrap servers used for authentication / token acquisition
 * @param targetServersRegex Regex pattern selecting the target servers this config applies to
 * @param securityProtocol Security protocol (SASL_SSL, SSL, SASL_PLAINTEXT)
 * @param kerberosServiceName Kerberos service name for SASL authentication
 * @param trustStoreType Optional SSL truststore type
 * @param trustStoreLocation Optional SSL truststore file location
 * @param trustStorePassword Optional SSL truststore password (redacted in toString)
 * @param keyStoreType Optional SSL keystore type
 * @param keyStoreLocation Optional SSL keystore file location
 * @param keyStorePassword Optional SSL keystore password (redacted in toString)
 * @param keyPassword Optional SSL key password (redacted in toString)
 * @param tokenMechanism SASL token mechanism (default: SCRAM-SHA-512)
 * @param specifiedKafkaParams Additional Kafka client parameters (from kafka.* properties)
 */
case class KafkaTokenClusterConf(
identifier: String,
authBootstrapServers: String,
targetServersRegex: String,
securityProtocol: String,
kerberosServiceName: String,
trustStoreType: Option[String],
trustStoreLocation: Option[String],
trustStorePassword: Option[String],
keyStoreType: Option[String],
keyStoreLocation: Option[String],
keyStorePassword: Option[String],
keyPassword: Option[String],
tokenMechanism: String,
specifiedKafkaParams: Map[String, String]
) {
/**
 * String representation with sensitive fields (e.g. passwords) redacted,
 * making it safe to emit in logs.
 * @return String representation suitable for logging
 */
override def toString: String
}

The library uses Spark configuration properties with the prefix spark.kafka.clusters.<identifier> for cluster-specific settings:
- auth.bootstrap.servers - Bootstrap servers for token acquisition
- security.protocol - Security protocol (SASL_SSL, SSL, SASL_PLAINTEXT)
- target.bootstrap.servers.regex - Target servers regex pattern (default: ".*")
- sasl.kerberos.service.name - Kerberos service name (default: "kafka")
- sasl.token.mechanism - Token mechanism (default: "SCRAM-SHA-512")
- ssl.truststore.type - Truststore type
- ssl.truststore.location - Truststore file location
- ssl.truststore.password - Truststore password
- ssl.keystore.type - Keystore type
- ssl.keystore.location - Keystore file location
- ssl.keystore.password - Keystore password
- ssl.key.password - Key password
- kafka.* - Additional Kafka client properties (prefixed with kafka.)

Configuration Example:
# Production Kafka cluster
spark.kafka.clusters.prod.auth.bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
spark.kafka.clusters.prod.target.bootstrap.servers.regex=kafka[1-3]:9092
spark.kafka.clusters.prod.security.protocol=SASL_SSL
spark.kafka.clusters.prod.sasl.kerberos.service.name=kafka
spark.kafka.clusters.prod.ssl.truststore.location=/etc/kafka/client.truststore.jks
spark.kafka.clusters.prod.ssl.truststore.password=changeit
spark.kafka.clusters.prod.sasl.token.mechanism=SCRAM-SHA-512
# Additional Kafka client properties
spark.kafka.clusters.prod.kafka.client.id=spark-streaming-app
spark.kafka.clusters.prod.kafka.session.timeout.ms=30000