# Consumer Strategies

Consumer strategies define how the Kafka connector subscribes to and reads from Kafka topics. The connector supports three different subscription patterns to accommodate various use cases.

## Capabilities

### ConsumerStrategy Interface

Base interface for all consumer subscription strategies.

```scala { .api }
/**
 * Base interface for different Kafka subscription strategies
 * Handles consumer creation and topic partition discovery
 */
sealed trait ConsumerStrategy {

  /** Creates a Kafka consumer with the specified configuration */
  def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]

  /** Creates a Kafka admin client for metadata operations */
  def createAdmin(kafkaParams: ju.Map[String, Object]): Admin

  /** Discovers all topic partitions assigned to this strategy */
  def assignedTopicPartitions(admin: Admin): Set[TopicPartition]
}
```

### AssignStrategy

Direct assignment of specific topic partitions to the consumer. Provides precise control over which partitions are consumed.

```scala { .api }
/**
 * Assigns specific topic partitions to the consumer
 * Provides direct control over partition assignment
 *
 * @param partitions Array of TopicPartition objects to assign
 */
case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStrategy
```

**Configuration:**

Use the `assign` option with a JSON specification of topic partitions:

```scala
.option("assign", """{"topic1":[0,1,2],"topic2":[0,1]}""")
```

**Usage Examples:**

```scala
// Assign specific partitions from multiple topics
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"orders":[0,1,2],"payments":[0,1]}""")
  .load()

// Assign a single partition
val df2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"high-priority-topic":[0]}""")
  .load()
```

**When to Use:**

- When you need precise control over partition assignment
- For consuming specific partitions in multi-consumer scenarios
- When implementing custom partitioning strategies
- For testing with specific partition data
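
When partition assignments are computed at runtime, the JSON value for `assign` can be rendered from a map. A minimal sketch, assuming a hand-rolled helper (`assignJson` is illustrative, not part of the connector):

```scala
// Hypothetical helper: render Map("orders" -> Seq(0, 1, 2)) as the
// JSON string expected by the `assign` option
def assignJson(assignments: Map[String, Seq[Int]]): String =
  assignments
    .map { case (topic, parts) => s""""$topic":[${parts.mkString(",")}]""" }
    .mkString("{", ",", "}")

val json = assignJson(Map("orders" -> Seq(0, 1, 2), "payments" -> Seq(0, 1)))
// json == """{"orders":[0,1,2],"payments":[0,1]}"""
```

This keeps the option value consistent when the partition list comes from configuration rather than a string literal.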

### SubscribeStrategy

Subscribes to a fixed collection of topics by name. Kafka handles partition assignment automatically.

```scala { .api }
/**
 * Subscribes to a fixed collection of topics
 * Kafka handles partition assignment automatically
 *
 * @param topics Sequence of topic names to subscribe to
 */
case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy
```

**Configuration:**

Use the `subscribe` option with comma-delimited topic names:

```scala
.option("subscribe", "topic1,topic2,topic3")
```

**Usage Examples:**

```scala
// Subscribe to multiple topics
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "orders,payments,inventory")
  .load()

// Subscribe to a single topic
val df2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "user-events")
  .load()
```

**When to Use:**

- When you want to consume all partitions from specific topics
- For simple topic subscription scenarios
- When Kafka's built-in partition assignment is suitable
- The most common choice for topic consumption
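
Conceptually, the comma-delimited value resolves to a list of topic names. A sketch (whether surrounding whitespace is trimmed is an assumption here, not confirmed connector behavior):

```scala
// Sketch: split a comma-delimited `subscribe` value into topic names
def parseTopics(value: String): Seq[String] =
  value.split(",").map(_.trim).filter(_.nonEmpty).toSeq

val topics = parseTopics("orders,payments,inventory")
// topics == Seq("orders", "payments", "inventory")
```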

### SubscribePatternStrategy

Uses a regular expression pattern to match topic names. Automatically discovers and subscribes to matching topics.

```scala { .api }
/**
 * Uses regex pattern to specify topics of interest
 * Automatically discovers matching topics
 *
 * @param topicPattern Regular expression pattern for topic matching
 */
case class SubscribePatternStrategy(topicPattern: String) extends ConsumerStrategy
```

**Configuration:**

Use the `subscribePattern` option with a regular expression:

```scala
.option("subscribePattern", "user-events-.*")
```

**Usage Examples:**

```scala
// Subscribe to all topics matching a pattern
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "logs-.*")
  .load()

// Subscribe to topics with a specific prefix and suffix
val df2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "analytics-.*-events")
  .load()

// Subscribe to topics from a specific environment
val df3 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "prod-.*")
  .load()
```

**Pattern Compilation:**

The topic pattern is compiled as a Java regex using `Pattern.compile()`. The connector automatically discovers all topics matching the pattern at startup and during execution.
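
The matching itself can be reproduced with plain `java.util.regex`; the filtering below is illustrative of what the connector does with discovered topic names:

```scala
import java.util.regex.Pattern

// Compile the subscribe pattern and filter discovered topic names
val pattern = Pattern.compile("logs-.*")
val discovered = Seq("logs-app", "logs-db", "metrics-app")
val matched = discovered.filter(t => pattern.matcher(t).matches())
// matched == Seq("logs-app", "logs-db")
```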

**When to Use:**

- When topic names follow a predictable pattern
- For dynamic topic discovery scenarios
- When new topics matching the pattern should be automatically included
- For environment-specific topic consumption (dev-, staging-, prod-)

## Strategy Selection Logic

The connector validates that exactly one strategy is specified:

```scala
// Valid - exactly one strategy specified
.option("subscribe", "my-topic")

// Valid - exactly one strategy specified
.option("subscribePattern", "logs-.*")

// Valid - exactly one strategy specified
.option("assign", """{"topic1":[0,1]}""")

// Invalid - no strategy specified
// Will throw: "One of the following options must be specified for Kafka source: subscribe, subscribePattern, assign"

// Invalid - multiple strategies specified
.option("subscribe", "topic1")
.option("subscribePattern", "topic.*")
// Will throw: "Only one of the following options can be specified for Kafka source: subscribe, subscribePattern, assign"
```
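
The exactly-one requirement can be sketched as a small check (illustrative code, not the connector's actual implementation):

```scala
// Illustrative sketch of the exactly-one-strategy validation
val strategyOptions = Seq("subscribe", "subscribePattern", "assign")

def validateStrategy(options: Map[String, String]): Unit = {
  val specified = strategyOptions.filter(options.contains)
  require(specified.size == 1,
    s"Exactly one of ${strategyOptions.mkString(", ")} must be specified, found: ${specified.size}")
}

validateStrategy(Map("subscribe" -> "my-topic")) // ok
// validateStrategy(Map.empty)                   // throws IllegalArgumentException
```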

## Validation Rules

### AssignStrategy Validation

```scala
// Must be valid JSON with a topic-partition mapping
.option("assign", """{"topic1":[0,1,2],"topic2":[0]}""") // Valid

.option("assign", "topic1") // Invalid - not JSON
// Will throw: "No topicpartitions to assign as specified value for option 'assign'"
```

### SubscribeStrategy Validation

```scala
// Must contain at least one non-empty topic name
.option("subscribe", "topic1,topic2") // Valid
.option("subscribe", "topic1") // Valid

.option("subscribe", "") // Invalid - empty
.option("subscribe", ",,,") // Invalid - no valid topics
// Will throw: "No topics to subscribe to as specified value for option 'subscribe'"
```

### SubscribePatternStrategy Validation

```scala
// Must be a non-empty pattern
.option("subscribePattern", "logs-.*") // Valid
.option("subscribePattern", ".*") // Valid

.option("subscribePattern", "") // Invalid - empty pattern
// Will throw: "Pattern to subscribe is empty as specified value for option 'subscribePattern'"
```

## TopicPartition Type

All strategies work with the Kafka `TopicPartition` type:

```scala { .api }
// From org.apache.kafka.common.TopicPartition (a Java class, shown here
// with Scala-style signatures; it is not a case class)
class TopicPartition(topic: String, partition: Int) {
  def topic(): String
  def partition(): Int
}
```

## Consumer Group Behavior

Each strategy handles consumer groups differently:

- **AssignStrategy**: Uses a generated unique group ID, no rebalancing
- **SubscribeStrategy**: Uses a generated unique group ID, supports rebalancing
- **SubscribePatternStrategy**: Uses a generated unique group ID, supports rebalancing and topic discovery

The connector automatically generates unique group IDs to prevent interference between queries:

```scala
// Default group ID pattern for streaming queries
s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"

// Default group ID pattern for batch queries
s"spark-kafka-relation-${UUID.randomUUID}"

// Custom group ID prefix (optional)
.option("groupIdPrefix", "my-app")
// Results in: "my-app-${UUID.randomUUID}-${metadataPath.hashCode}"
```
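
The patterns above can be exercised directly; a sketch (the helper name and metadata path value are illustrative):

```scala
import java.util.UUID

// Sketch: unique group ID construction mirroring the streaming pattern above
def streamingGroupId(prefix: String, metadataPath: String): String =
  s"$prefix-${UUID.randomUUID}-${metadataPath.hashCode}"

val id1 = streamingGroupId("spark-kafka-source", "/checkpoints/query1")
val id2 = streamingGroupId("spark-kafka-source", "/checkpoints/query1")
// id1 != id2: the random UUID component keeps concurrent queries from sharing a group
```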