# Consumer Strategies

Consumer strategies encapsulate how Kafka consumers are created and configured on the driver and executors. They handle the setup required for Kafka 0.10+ consumers, including subscription management, offset initialization, and parameter validation. The strategy pattern allows flexible consumer configuration while maintaining checkpoint compatibility.

## Core Strategies

### Subscribe

Subscribe to a collection of specific topics for dynamic partition assignment.

```scala { .api }
// Scala versions
def Subscribe[K, V](
  topics: Iterable[String],
  kafkaParams: collection.Map[String, Object]
): ConsumerStrategy[K, V]

def Subscribe[K, V](
  topics: Iterable[String],
  kafkaParams: collection.Map[String, Object],
  offsets: collection.Map[TopicPartition, Long]
): ConsumerStrategy[K, V]

// Java versions
def Subscribe[K, V](
  topics: java.util.Collection[String],
  kafkaParams: java.util.Map[String, Object]
): ConsumerStrategy[K, V]

def Subscribe[K, V](
  topics: java.util.Collection[String],
  kafkaParams: java.util.Map[String, Object],
  offsets: java.util.Map[TopicPartition, java.lang.Long]
): ConsumerStrategy[K, V]
```

**Parameters:**
- `topics`: Collection of topic names to subscribe to
- `kafkaParams`: Kafka consumer configuration parameters
- `offsets`: Optional initial offsets for specific partitions

**Use when:** You want to consume from specific known topics with automatic partition discovery.
### SubscribePattern

Subscribe to all topics matching a regex pattern for dynamic topic and partition discovery.

```scala { .api }
// Scala versions
def SubscribePattern[K, V](
  pattern: java.util.regex.Pattern,
  kafkaParams: collection.Map[String, Object]
): ConsumerStrategy[K, V]

def SubscribePattern[K, V](
  pattern: java.util.regex.Pattern,
  kafkaParams: collection.Map[String, Object],
  offsets: collection.Map[TopicPartition, Long]
): ConsumerStrategy[K, V]

// Java versions
def SubscribePattern[K, V](
  pattern: java.util.regex.Pattern,
  kafkaParams: java.util.Map[String, Object]
): ConsumerStrategy[K, V]

def SubscribePattern[K, V](
  pattern: java.util.regex.Pattern,
  kafkaParams: java.util.Map[String, Object],
  offsets: java.util.Map[TopicPartition, java.lang.Long]
): ConsumerStrategy[K, V]
```

**Parameters:**
- `pattern`: Regex pattern to match topic names
- `kafkaParams`: Kafka consumer configuration parameters
- `offsets`: Optional initial offsets for specific partitions

**Use when:** You want to dynamically discover and consume from topics matching a pattern.
### Assign

Assign a fixed collection of specific TopicPartitions for static partition assignment.

```scala { .api }
// Scala versions
def Assign[K, V](
  topicPartitions: Iterable[TopicPartition],
  kafkaParams: collection.Map[String, Object]
): ConsumerStrategy[K, V]

def Assign[K, V](
  topicPartitions: Iterable[TopicPartition],
  kafkaParams: collection.Map[String, Object],
  offsets: collection.Map[TopicPartition, Long]
): ConsumerStrategy[K, V]

// Java versions
def Assign[K, V](
  topicPartitions: java.util.Collection[TopicPartition],
  kafkaParams: java.util.Map[String, Object]
): ConsumerStrategy[K, V]

def Assign[K, V](
  topicPartitions: java.util.Collection[TopicPartition],
  kafkaParams: java.util.Map[String, Object],
  offsets: java.util.Map[TopicPartition, java.lang.Long]
): ConsumerStrategy[K, V]
```

**Parameters:**
- `topicPartitions`: Collection of specific TopicPartitions to assign
- `kafkaParams`: Kafka consumer configuration parameters
- `offsets`: Optional initial offsets for specific partitions

**Use when:** You want precise control over which partitions to consume from.
## Usage Examples

### Subscribe to Specific Topics

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-consumer-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("orders", "payments", "users")
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  consumerStrategy
)
```
### Subscribe with Initial Offsets

```scala
import org.apache.kafka.common.TopicPartition

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-consumer-group",
  "auto.offset.reset" -> "none" // Will use provided offsets
)

val topics = Array("orders", "payments")
val offsets = Map(
  new TopicPartition("orders", 0) -> 100L,
  new TopicPartition("orders", 1) -> 200L,
  new TopicPartition("payments", 0) -> 50L
)

val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
```
### Pattern-based Topic Subscription

```scala
import java.util.regex.Pattern

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "pattern-consumer-group"
)

// Subscribe to all topics starting with "events-"
val pattern = Pattern.compile("events-.*")
val consumerStrategy = ConsumerStrategies.SubscribePattern[String, String](pattern, kafkaParams)

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  consumerStrategy
)
```
### Fixed Partition Assignment

```scala
import org.apache.kafka.common.TopicPartition

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "assigned-consumer-group"
)

// Assign specific partitions
val topicPartitions = Array(
  new TopicPartition("orders", 0),
  new TopicPartition("orders", 2),
  new TopicPartition("payments", 1)
)

val consumerStrategy = ConsumerStrategies.Assign[String, String](topicPartitions, kafkaParams)

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  consumerStrategy
)
```
### Java API Examples

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

// Subscribe strategy
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "java-consumer-group");

Collection<String> topics = Arrays.asList("topic1", "topic2");
ConsumerStrategy<String, String> strategy = ConsumerStrategies.Subscribe(topics, kafkaParams);

JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
    javaStreamingContext,
    LocationStrategies.PreferConsistent(),
    strategy
);
```
### Pattern Strategy with Java

```java
import java.util.regex.Pattern;

Pattern pattern = Pattern.compile("logs-\\d{4}-\\d{2}-\\d{2}");
ConsumerStrategy<String, String> patternStrategy =
    ConsumerStrategies.SubscribePattern(pattern, kafkaParams);

JavaInputDStream<ConsumerRecord<String, String>> patternStream = KafkaUtils.createDirectStream(
    javaStreamingContext,
    LocationStrategies.PreferConsistent(),
    patternStrategy
);
```
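### Assign Strategy with Java

`Assign` follows the same shape as the Java examples above. The following sketch reuses the `kafkaParams` map from the Subscribe example; the topic names and partition numbers are illustrative:

```java
import java.util.Arrays;
import java.util.Collection;

// Statically assign specific partitions -- no consumer group rebalancing
Collection<TopicPartition> partitions = Arrays.asList(
    new TopicPartition("orders", 0),
    new TopicPartition("payments", 1)
);

ConsumerStrategy<String, String> assignStrategy =
    ConsumerStrategies.Assign(partitions, kafkaParams);

JavaInputDStream<ConsumerRecord<String, String>> assignedStream = KafkaUtils.createDirectStream(
    javaStreamingContext,
    LocationStrategies.PreferConsistent(),
    assignStrategy
);
```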
## Configuration Details

### Required Kafka Parameters

All strategies require these parameters:

- `bootstrap.servers`: Kafka broker addresses
- `key.deserializer`: Key deserializer class
- `value.deserializer`: Value deserializer class
### Important Kafka Parameters

- `group.id`: Consumer group ID (required for Subscribe and SubscribePattern)
- `auto.offset.reset`: What to do when there is no initial offset ("earliest", "latest", or "none")
- `enable.auto.commit`: Set to `false` for manual offset management
- `session.timeout.ms`: Session timeout for consumer group management
- `heartbeat.interval.ms`: Heartbeat interval for consumer liveness
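A parameter map combining these settings might look like the following sketch; the timeout values are illustrative placeholders, not tuned recommendations:

```scala
import org.apache.kafka.common.serialization.StringDeserializer

// Timeout values below are illustrative; heartbeat.interval.ms should be
// well below session.timeout.ms (Kafka recommends no more than one third).
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-consumer-group",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "session.timeout.ms" -> (30000: java.lang.Integer),
  "heartbeat.interval.ms" -> (10000: java.lang.Integer)
)
```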
### Offset Initialization

When a stream starts, offsets are resolved in this order of precedence:

1. **Checkpoint offsets (restart)**: When restarting from a checkpoint, the offsets recorded in the checkpoint are used
2. **Provided initial offsets (fresh start)**: On a fresh start, the `offsets` map passed to the strategy is used
3. **No offsets provided**: Falls back to the group's committed offsets, or the `auto.offset.reset` policy
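As a sketch of the fresh-start case: partitions listed in the `offsets` map begin at the given positions, while unlisted partitions of the subscribed topics fall back to `auto.offset.reset`. The topic name and offset below are illustrative, and `kafkaParams` is assumed to be a map like the ones above:

```scala
import org.apache.kafka.common.TopicPartition

// Partition 0 of "orders" starts at offset 500 on a fresh start;
// any other partitions of "orders" fall back to auto.offset.reset.
val offsets = Map(new TopicPartition("orders", 0) -> 500L)

val strategy = ConsumerStrategies.Subscribe[String, String](
  List("orders"),
  kafkaParams + ("auto.offset.reset" -> "earliest"),
  offsets
)
```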
## Strategy Selection Guidelines

### Use Subscribe When:
- You know the specific topic names to consume from
- You want automatic partition discovery as topics scale
- You want consumer group management and rebalancing
- You need dynamic partition assignment

### Use SubscribePattern When:
- Topics are created dynamically with predictable naming patterns
- You want to automatically consume from new matching topics
- You have time-based or categorized topic naming schemes
- You need the most flexible topic discovery

### Use Assign When:
- You need precise control over partition assignment
- You want to avoid consumer group coordination
- You're implementing custom partition assignment logic
- You're doing partition-specific processing
## Advanced Configuration

### KAFKA-3370 Workaround

The library automatically handles the KAFKA-3370 issue when `auto.offset.reset` is "none":

```scala
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "auto.offset.reset" -> "none", // Will trigger automatic workaround
  // ... other params
)

// The strategy automatically handles NoOffsetForPartitionException
val strategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
```
### Consumer Lifecycle Management

All strategies handle:
- Consumer creation and configuration
- Subscription or assignment setup
- Offset seeking for initial positions
- Consumer pause/resume for rate limiting
- Proper cleanup and resource management
## Error Handling

### Common Exceptions

- **NoOffsetForPartitionException**: Automatically handled when `auto.offset.reset` is "none"
- **InvalidTopicException**: Thrown for invalid topic names
- **TimeoutException**: Thrown for broker connectivity issues
- **AuthenticationException**: Thrown for authentication failures
### Configuration Validation

Strategy construction does not validate Kafka parameters; errors only surface when the underlying consumer is created.

```scala
val invalidParams = Map[String, Object](
  "bootstrap.servers" -> "", // Empty - will cause an error
  "key.deserializer" -> "invalid.class.name" // Invalid class - will cause an error
)

// This call succeeds: the strategy only stores the parameters
val strategy = ConsumerStrategies.Subscribe[String, String](topics, invalidParams)

// The error occurs later, when the consumer is created (e.g. when the
// stream starts), so wrap stream startup - not strategy creation - in
// error handling if you need to report configuration problems.
```
## Important Notes

- All consumer strategies are marked as `@Experimental` in Spark 2.4.8
- Consumer strategies are serializable and checkpoint-compatible
- The same Kafka parameters are used on the driver and executors, with automatic modifications
- Consumer instances are automatically cached and managed per executor
- Pattern-based subscription checks for new topics periodically
- Fixed assignment (Assign) doesn't require consumer group coordination