Configuration options for connecting to Kafka clusters and managing consumer/producer behavior.
Required configuration for connecting to Kafka brokers.
/**
* Kafka bootstrap servers configuration (required)
* Comma-separated list of Kafka broker addresses
*/
.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")Usage Example:
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()
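The same options also apply to one-shot batch queries via spark.read; a brief sketch, assuming the same local broker, where startingOffsets and endingOffsets bound the offset range to read:
val batchDf = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()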
Different strategies for subscribing to Kafka topics. You must specify exactly one strategy.
Subscribe to a fixed collection of topics.
/**
* Subscribe to specific topics by name
* @param topics - Comma-separated list of topic names
*/
.option("subscribe", "topic1,topic2,topic3")Usage Example:
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "orders,payments,inventory")
  .load()
Subscribe to topics using a regular expression pattern.
/**
* Subscribe to topics matching a regex pattern
* @param pattern - Regular expression for topic names
*/
.option("subscribePattern", "logs-.*")Usage Example:
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "events-\\d{4}-\\d{2}") // matches events-2023-01, etc.
  .load()
Assign specific topic partitions directly.
/**
* Assign specific topic partitions
* @param partitions - JSON object mapping topics to partition arrays
*/
.option("assign", """{"topic1":[0,1,2],"topic2":[0]}""")Usage Example:
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"high-priority":[0,1],"normal":[0]}""")
  .load()
Control consumer group behavior for streaming queries.
/**
* Prefix for automatically generated consumer group IDs
* Default: "spark-kafka-source" for streaming, "spark-kafka-relation" for batch
*/
.option("groupIdPrefix", "my-app")
/**
* Custom group ID (not recommended for concurrent queries)
* Warning: Multiple queries with same group ID will interfere
*/
.option("kafka.group.id", "custom-group")Usage Example:
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("groupIdPrefix", "analytics-pipeline")
  .load()
Pass-through configuration for the underlying Kafka clients using the "kafka." prefix.
/**
* Consumer configuration options
* All options prefixed with "kafka." are passed to KafkaConsumer
*/
.option("kafka.session.timeout.ms", "30000")
.option("kafka.request.timeout.ms", "40000")
.option("kafka.max.poll.records", "1000")
.option("kafka.fetch.min.bytes", "1024")
.option("kafka.receive.buffer.bytes", "65536")
/**
* Producer configuration options (for writing)
* All options prefixed with "kafka." are passed to KafkaProducer
*/
.option("kafka.acks", "all")
.option("kafka.retries", "5")
.option("kafka.batch.size", "16384")
.option("kafka.linger.ms", "5")Restricted Options:
Restricted Options:
The following Kafka options are restricted because Spark manages them itself; setting them causes errors (kafka.group.id is the exception: settable, but discouraged):
kafka.group.id - Discouraged; prefer groupIdPrefix (see the consumer group section above)
kafka.auto.offset.reset - Use startingOffsets instead
kafka.key.deserializer - Fixed to ByteArrayDeserializer
kafka.value.deserializer - Fixed to ByteArrayDeserializer
kafka.key.serializer - Fixed to ByteArraySerializer (writing)
kafka.value.serializer - Fixed to ByteArraySerializer (writing)
kafka.enable.auto.commit - Managed by Spark
kafka.interceptor.classes - Not supported
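Because the deserializers are fixed to ByteArrayDeserializer, key and value always arrive as binary columns; a typical read casts them on the way in, and startingOffsets stands in for kafka.auto.offset.reset (topic name here is illustrative):
val parsed = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "raw-events")
  .option("startingOffsets", "earliest") // supported replacement for kafka.auto.offset.reset
  .load()
  // key/value are binary; cast to strings to work with them
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "partition", "offset")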
Usage Example:
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "high-volume-topic")
  .option("kafka.session.timeout.ms", "10000")
  .option("kafka.max.poll.records", "500")
  .option("kafka.fetch.min.bytes", "50000")
  .load()
Authentication and encryption settings for secure Kafka clusters.
/**
* Security protocol configuration
*/
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";")
/**
* SSL configuration
*/
.option("kafka.ssl.truststore.location", "/path/to/truststore.jks")
.option("kafka.ssl.truststore.password", "truststore-password")
.option("kafka.ssl.keystore.location", "/path/to/keystore.jks")
.option("kafka.ssl.keystore.password", "keystore-password")Usage Example:
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "secure-broker:9093")
  .option("subscribe", "secure-topic")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
  .option("kafka.sasl.jaas.config",
    "org.apache.kafka.common.security.scram.ScramLoginModule required " +
    "username=\"myuser\" password=\"mypassword\";")
  .load()
Internally, each subscription option maps to a consumer strategy; below is a simplified sketch of Spark's internal API, not the exact implementation.
import java.{util => ju}
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

// Consumer strategy types: one per subscription option
sealed trait ConsumerStrategy {
  def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
  // Shared helper: raw byte-array consumer (deserializers are fixed by Spark)
  protected def newConsumer(p: ju.Map[String, Object]) =
    new KafkaConsumer[Array[Byte], Array[Byte]](p)
}

// "assign": consume a fixed set of partitions, no group rebalancing
case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStrategy {
  def createConsumer(p: ju.Map[String, Object]) = {
    val c = newConsumer(p); c.assign(partitions.toSeq.asJava); c
  }
}

// "subscribe": consume a fixed list of topic names
case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy {
  def createConsumer(p: ju.Map[String, Object]) = {
    val c = newConsumer(p); c.subscribe(topics.asJava); c
  }
}

// "subscribePattern": consume topics matching a regex
case class SubscribePatternStrategy(topicPattern: String) extends ConsumerStrategy {
  def createConsumer(p: ju.Map[String, Object]) = {
    val c = newConsumer(p); c.subscribe(ju.regex.Pattern.compile(topicPattern)); c
  }
}
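A hypothetical illustration of how the three source options would map onto these strategies (all values are illustrative):
// Hypothetical mapping from source options to strategies
val byTopics  = SubscribeStrategy(Seq("orders", "payments"))           // subscribe
val byPattern = SubscribePatternStrategy("logs-.*")                    // subscribePattern
val byAssign  = AssignStrategy(Array(new TopicPartition("topic1", 0))) // assign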