Consumer strategies define how Spark connects to and consumes data from Kafka topics. The module provides three patterns for topic assignment and subscription: explicit partition assignment, topic-list subscription, and pattern-based subscription.
ConsumerStrategy is the base trait for all consumer strategies; it defines how Kafka consumers are created and configured:

```scala
/**
 * Base trait for Kafka consumer strategies
 */
sealed trait ConsumerStrategy {
  /**
   * Creates a Kafka consumer with strategy-specific configuration
   * @param kafkaParams Kafka consumer configuration parameters
   * @return Configured Kafka consumer
   */
  def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
}
```
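For illustration, here is a minimal sketch of how a concrete strategy might implement createConsumer, assuming only the standard Kafka client API; the connector's actual implementation may differ in its details:

```scala
import java.{util => ju}
import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer}

// Hypothetical sketch of a subscribe-based strategy. It builds a raw
// byte-array consumer from the supplied parameters and subscribes to the
// requested topics, letting Kafka handle partition assignment.
case class SubscribeStrategySketch(topics: Seq[String]) {
  def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = {
    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
    consumer.subscribe(ju.Arrays.asList(topics: _*))
    consumer
  }
}
```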
AssignStrategy assigns specific topic partitions to consume from, providing precise control over which partitions are consumed.

```scala
/**
 * Strategy for assigning specific topic partitions
 * @param partitions Array of TopicPartition objects to assign
 */
case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStrategy
```

Usage Examples:
```scala
import org.apache.kafka.common.TopicPartition

// The specific partitions to consume, expressed by the "assign" JSON below
val partitions = Array(
  new TopicPartition("topic1", 0),
  new TopicPartition("topic1", 1),
  new TopicPartition("topic2", 0)
)

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"topic1":[0,1],"topic2":[0]}""")
  .option("startingOffsets", "earliest")
  .load()
```
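The JSON passed to the assign option mirrors the TopicPartition objects above. If you already hold TopicPartition instances, a small hypothetical helper can render them into the expected JSON form:

```scala
import org.apache.kafka.common.TopicPartition

// Hypothetical helper: render TopicPartitions as the JSON the "assign"
// option expects, e.g. {"topic1":[0,1],"topic2":[0]} (topic order may vary).
def toAssignJson(partitions: Seq[TopicPartition]): String =
  partitions
    .groupBy(_.topic)
    .map { case (topic, tps) =>
      s""""$topic":[${tps.map(_.partition).sorted.mkString(",")}]"""
    }
    .mkString("{", ",", "}")
```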
SubscribeStrategy subscribes to specific topic names; partition assignment and rebalancing are handled automatically.

```scala
/**
 * Strategy for subscribing to specific topic names
 * @param topics Sequence of topic names to subscribe to
 */
case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy
```

Usage Examples:
```scala
// Subscribe to specific topics
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1,topic2,topic3")
  .option("startingOffsets", "latest")
  .load()

// Subscribe to a single topic
val singleTopicDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()
```
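Regardless of strategy, the connector delivers keys and values as binary columns (the deserializers are fixed to ByteArrayDeserializer, as noted below), so a typical follow-up step casts them to strings:

```scala
// Kafka records arrive as binary key/value columns; cast them for processing.
val events = singleTopicDF.selectExpr(
  "CAST(key AS STRING)",
  "CAST(value AS STRING)",
  "topic",
  "partition",
  "offset"
)
```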
SubscribePatternStrategy subscribes to topics matching a regex pattern and dynamically discovers new topics that match it.

```scala
/**
 * Strategy for subscribing to topics matching a regex pattern
 * @param topicPattern Regular expression pattern for topic names
 */
case class SubscribePatternStrategy(topicPattern: String) extends ConsumerStrategy
```

Usage Examples:
```scala
// Subscribe to topics matching a pattern
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "events_.*")
  .option("startingOffsets", "earliest")
  .load()

// Build the pattern per environment, e.g. "prod_.*" or "staging_.*"
val environment = "prod"
val envTopics = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", s"${environment}_.*")
  .load()
```

Consumer strategies are integrated with Spark's DataSource options:
```scala
// Strategy options (exactly one must be specified)
.option("assign", """{"topic1":[0,1],"topic2":[0]}""")  // AssignStrategy
.option("subscribe", "topic1,topic2")                   // SubscribeStrategy
.option("subscribePattern", "events_.*")                // SubscribePatternStrategy

// Additional Kafka consumer parameters
.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
```

The module validates consumer strategy configuration at query startup:
- IllegalArgumentException if more than one strategy is specified
- IllegalArgumentException if no strategy is specified
- IllegalArgumentException for malformed partition assignments
- IllegalArgumentException for empty topic lists
- IllegalArgumentException for empty regex patterns
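For example, a configuration that names two strategies at once fails this validation:

```scala
// Invalid: both "subscribe" and "assign" are specified, so query startup
// fails with an IllegalArgumentException.
val invalid = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .option("assign", """{"topic1":[0]}""")
  .load()
```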
.option("kafka.session.timeout.ms", "30000")
.option("kafka.heartbeat.interval.ms", "3000")
.option("kafka.max.poll.records", "500")
.option("kafka.fetch.min.bytes", "1024")
.option("kafka.fetch.max.wait.ms", "500")Certain Kafka consumer parameters are managed internally and cannot be overridden:
- group.id - Automatically generated unique group IDs per query
- auto.offset.reset - Controlled via the startingOffsets option (see below)
- key.deserializer - Fixed to ByteArrayDeserializer
- value.deserializer - Fixed to ByteArrayDeserializer
- enable.auto.commit - Disabled so that Spark manages offsets itself
- interceptor.classes - Not supported for safety
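For example, instead of setting kafka.auto.offset.reset, starting positions are expressed through the startingOffsets option, either symbolically or as per-partition JSON:

```scala
// Symbolic starting position (replaces auto.offset.reset)
.option("startingOffsets", "earliest")

// Per-partition starting offsets: -2 means earliest, -1 means latest
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-1}}""")
```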