Consumer strategies define how the Kafka connector subscribes to and reads from Kafka topics. The connector supports three different subscription patterns to accommodate various use cases.
Base interface for all consumer subscription strategies.
/**
* Base interface for different Kafka subscription strategies
* Handles consumer creation and topic partition discovery
*/
sealed trait ConsumerStrategy {
  /** Creates a Kafka consumer with the specified configuration */
  def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]

  /** Creates a Kafka admin client for metadata operations */
  def createAdmin(kafkaParams: ju.Map[String, Object]): Admin

  /** Discovers all topic partitions assigned to this strategy */
  def assignedTopicPartitions(admin: Admin): Set[TopicPartition]
}
Direct assignment of specific topic partitions to the consumer. Provides precise control over which partitions are consumed.
/**
* Assigns specific topic partitions to the consumer
* Provides direct control over partition assignment
*
* @param partitions Array of TopicPartition objects to assign
*/
case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStrategy
Configuration:
Use the assign option with a JSON specification of topic partitions:
.option("assign", """{"topic1":[0,1,2],"topic2":[0,1]}""")
Usage Examples:
// Assign specific partitions from multiple topics
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"orders":[0,1,2],"payments":[0,1]}""")
  .load()
// Assign single partition
val df2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"high-priority-topic":[0]}""")
  .load()
When to Use:
Choose assign when you need precise control over which partitions are consumed, for example to pin a query to a single high-priority partition or to divide one topic's partitions across separate queries.
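Writing the assign JSON by hand is error-prone for larger topologies. The following hypothetical helper (not part of the connector) sketches how the option value could be built from a topic-to-partitions map:

```scala
// Hypothetical helper that builds the JSON value expected by the "assign"
// option from a Map of topic name -> partition ids
def assignJson(partitions: Map[String, Seq[Int]]): String =
  partitions
    .map { case (topic, parts) => s""""$topic":[${parts.mkString(",")}]""" }
    .mkString("{", ",", "}")

val json = assignJson(Map("topic1" -> Seq(0, 1, 2), "topic2" -> Seq(0, 1)))
// json can then be passed directly to .option("assign", json)
```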
Subscribes to a fixed collection of topics by name. Kafka handles partition assignment automatically.
/**
* Subscribes to a fixed collection of topics
* Kafka handles partition assignment automatically
*
* @param topics Sequence of topic names to subscribe to
*/
case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy
Configuration:
Use the subscribe option with comma-delimited topic names:
.option("subscribe", "topic1,topic2,topic3")
Usage Examples:
// Subscribe to multiple topics
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "orders,payments,inventory")
  .load()
// Subscribe to single topic
val df2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "user-events")
  .load()
When to Use:
Choose subscribe when you have a fixed, known set of topics and want Kafka to manage partition assignment automatically.
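The comma-delimited subscribe value maps to a list of topic names. A hypothetical sketch of that parsing (the connector's actual implementation may differ):

```scala
// Split a comma-delimited subscribe value into topic names,
// trimming whitespace and dropping empty entries
def parseSubscribe(value: String): Seq[String] =
  value.split(",").map(_.trim).filter(_.nonEmpty).toSeq

parseSubscribe("orders,payments,inventory")
// An all-comma or empty value yields no topics, which the connector rejects
parseSubscribe(",,,")
```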
Uses a regular expression pattern to match topic names. Automatically discovers and subscribes to matching topics.
/**
* Uses regex pattern to specify topics of interest
* Automatically discovers matching topics
*
* @param topicPattern Regular expression pattern for topic matching
*/
case class SubscribePatternStrategy(topicPattern: String) extends ConsumerStrategy
Configuration:
Use the subscribePattern option with a regular expression:
.option("subscribePattern", "user-events-.*")
Usage Examples:
// Subscribe to all topics matching a pattern
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "logs-.*")
  .load()
// Subscribe to topics with specific prefix and suffix
val df2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "analytics-.*-events")
  .load()
// Subscribe to topics from specific environment
val df3 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "prod-.*")
  .load()
Pattern Compilation:
The topic pattern is compiled as a Java regex pattern using Pattern.compile(). The connector automatically discovers all topics matching the pattern at startup and during execution.
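The matching mechanism can be illustrated in plain Scala. The topic names below are hypothetical; the connector retrieves real ones through its admin client:

```scala
import java.util.regex.Pattern

// Hypothetical topic names standing in for cluster metadata
val topics = Seq("logs-app1", "logs-app2", "metrics-app1", "prod-orders")

// The subscribePattern value is compiled as a Java regex
val pattern = Pattern.compile("logs-.*")

// Only topics whose full name matches the pattern are subscribed
val matched = topics.filter(t => pattern.matcher(t).matches())
```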
When to Use:
Choose subscribePattern when topics are created dynamically and you want the connector to discover and subscribe to new matching topics automatically.
Strategy Validation:
The connector validates that exactly one strategy is specified:
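Before looking at concrete option combinations, here is a hypothetical, self-contained sketch of this mutual-exclusion check (the names and structure are illustrative, not the connector's actual code; option keys are assumed to be lower-cased, as Spark does for data source options):

```scala
// Illustrative stand-ins for the three strategies
sealed trait StrategyChoice
final case class Assign(json: String) extends StrategyChoice
final case class Subscribe(topics: String) extends StrategyChoice
final case class SubscribePattern(pattern: String) extends StrategyChoice

def chooseStrategy(options: Map[String, String]): StrategyChoice = {
  // Collect whichever of the three option keys were provided
  val specified = Seq("assign", "subscribe", "subscribepattern")
    .flatMap(key => options.get(key).map(value => key -> value))
  specified match {
    case Seq(("assign", v))           => Assign(v)
    case Seq(("subscribe", v))        => Subscribe(v)
    case Seq(("subscribepattern", v)) => SubscribePattern(v)
    case Seq() =>
      throw new IllegalArgumentException(
        "One of the following options must be specified for Kafka source: " +
          "subscribe, subscribePattern, assign")
    case _ =>
      throw new IllegalArgumentException(
        "Only one of the following options can be specified for Kafka source: " +
          "subscribe, subscribePattern, assign")
  }
}
```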
// Valid - exactly one strategy specified
.option("subscribe", "my-topic")
// Valid - exactly one strategy specified
.option("subscribePattern", "logs-.*")
// Valid - exactly one strategy specified
.option("assign", """{"topic1":[0,1]}""")
// Invalid - no strategy specified
// Will throw: "One of the following options must be specified for Kafka source: subscribe, subscribePattern, assign"
// Invalid - multiple strategies specified
.option("subscribe", "topic1")
.option("subscribePattern", "topic.*")
// Will throw: "Only one of the following options can be specified for Kafka source: subscribe, subscribePattern, assign"
Assign Validation:
// Must be valid JSON with topic-partition mapping
.option("assign", """{"topic1":[0,1,2],"topic2":[0]}""") // Valid
.option("assign", "topic1") // Invalid - not JSON
// Will throw: "No topicpartitions to assign as specified value for option 'assign'"
Subscribe Validation:
// Must contain at least one non-empty topic name
.option("subscribe", "topic1,topic2") // Valid
.option("subscribe", "topic1") // Valid
.option("subscribe", "") // Invalid - empty
.option("subscribe", ",,,") // Invalid - no valid topics
// Will throw: "No topics to subscribe to as specified value for option 'subscribe'"
SubscribePattern Validation:
// Must be non-empty pattern
.option("subscribePattern", "logs-.*") // Valid
.option("subscribePattern", ".*") // Valid
.option("subscribePattern", "") // Invalid - empty pattern
// Will throw: "Pattern to subscribe is empty as specified value for option 'subscribePattern'"
All strategies work with the Kafka TopicPartition type:
// From org.apache.kafka.common.TopicPartition
// org.apache.kafka.common.TopicPartition is a Java class, shown here schematically:
class TopicPartition(topic: String, partition: Int) {
  def topic(): String
  def partition(): Int
}
Each strategy handles consumer groups differently:
The connector automatically generates unique group IDs to prevent interference between queries:
// Default group ID pattern for streaming queries
s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
// Default group ID pattern for batch queries
s"spark-kafka-relation-${UUID.randomUUID}"
// Custom group ID prefix (optional)
.option("groupIdPrefix", "my-app")
// Results in: "my-app-${UUID.randomUUID}-${metadataPath.hashCode}"
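The patterns above can be exercised directly in plain Scala. The metadataPath value here is a stand-in; in a real query it comes from the checkpoint location:

```scala
import java.util.UUID

// Hypothetical metadata path standing in for the query's checkpoint location
val metadataPath = "/checkpoints/my-query/sources/0"

// Streaming queries: unique per query, stable hash suffix from the metadata path
val streamingGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"

// Batch queries: unique per relation, no metadata path component
val batchGroupId = s"spark-kafka-relation-${UUID.randomUUID}"

// With .option("groupIdPrefix", "my-app"), the generated prefix is replaced
val prefixedGroupId = s"my-app-${UUID.randomUUID}-${metadataPath.hashCode}"
```

Because each group ID embeds a random UUID, concurrent queries reading the same topics never share a consumer group and so never steal partitions from one another.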