# Consumer Strategies

Configuration strategies for creating and managing Kafka consumers with different subscription patterns. Consumer strategies encapsulate the setup required for Kafka 0.10 consumers and allow the configuration to be checkpointed with Spark Streaming applications.

## Capabilities

### Subscribe Strategy

Subscribe to a collection of specific topics. This is the most common pattern for consuming from known topics.

```scala { .api }
// Scala versions
def Subscribe[K, V](
  topics: Iterable[java.lang.String],
  kafkaParams: collection.Map[String, Object]
): ConsumerStrategy[K, V]

def Subscribe[K, V](
  topics: Iterable[java.lang.String],
  kafkaParams: collection.Map[String, Object],
  offsets: collection.Map[TopicPartition, Long]
): ConsumerStrategy[K, V]

// Java versions
def Subscribe[K, V](
  topics: java.util.Collection[java.lang.String],
  kafkaParams: java.util.Map[String, Object]
): ConsumerStrategy[K, V]

def Subscribe[K, V](
  topics: java.util.Collection[java.lang.String],
  kafkaParams: java.util.Map[String, Object],
  offsets: java.util.Map[TopicPartition, java.lang.Long]
): ConsumerStrategy[K, V]
```

**Parameters:**
- `topics`: Collection of topic names to subscribe to
- `kafkaParams`: Kafka configuration parameters (must include "bootstrap.servers")
- `offsets`: Optional starting offsets for specific partitions

**Usage Example (Scala):**

```scala
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.TopicPartition

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-consumer-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("orders", "payments", "inventory")

// Subscribe without specific starting offsets
val strategy1 = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)

// Subscribe with specific starting offsets
val startingOffsets = Map[TopicPartition, Long](
  new TopicPartition("orders", 0) -> 1000L,
  new TopicPartition("orders", 1) -> 2000L,
  new TopicPartition("payments", 0) -> 500L
)

val strategy2 = ConsumerStrategies.Subscribe[String, String](
  topics,
  kafkaParams,
  startingOffsets
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  strategy1
)
```

**Usage Example (Java):**

```java
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.ConsumerStrategy;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "my-consumer-group");
kafkaParams.put("auto.offset.reset", "latest");

Collection<String> topics = Arrays.asList("orders", "payments", "inventory");

ConsumerStrategy<String, String> strategy =
    ConsumerStrategies.Subscribe(topics, kafkaParams);

JavaInputDStream<ConsumerRecord<String, String>> stream =
    KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        strategy
    );
```

### SubscribePattern Strategy

Subscribe to all topics matching a specified regular expression. Useful for consuming from dynamically created topics that follow a naming convention.

```scala { .api }
// Scala versions
def SubscribePattern[K, V](
  pattern: java.util.regex.Pattern,
  kafkaParams: collection.Map[String, Object]
): ConsumerStrategy[K, V]

def SubscribePattern[K, V](
  pattern: java.util.regex.Pattern,
  kafkaParams: collection.Map[String, Object],
  offsets: collection.Map[TopicPartition, Long]
): ConsumerStrategy[K, V]

// Java versions
def SubscribePattern[K, V](
  pattern: java.util.regex.Pattern,
  kafkaParams: java.util.Map[String, Object]
): ConsumerStrategy[K, V]

def SubscribePattern[K, V](
  pattern: java.util.regex.Pattern,
  kafkaParams: java.util.Map[String, Object],
  offsets: java.util.Map[TopicPartition, java.lang.Long]
): ConsumerStrategy[K, V]
```

**Parameters:**
- `pattern`: Regular expression pattern to match topic names
- `kafkaParams`: Kafka configuration parameters
- `offsets`: Optional starting offsets for specific partitions

**Usage Example (Scala):**

```scala
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import java.util.regex.Pattern

// Subscribe to all topics starting with "user-events-"
// (kafkaParams as defined in the Subscribe example)
val topicPattern = Pattern.compile("user-events-.*")

val strategy = ConsumerStrategies.SubscribePattern[String, String](
  topicPattern,
  kafkaParams
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  strategy
)

// This will automatically consume from topics like:
// - user-events-clicks
// - user-events-purchases
// - user-events-registrations
// as they are created
```

**Usage Example (Java):**

```java
import java.util.regex.Pattern;

Pattern topicPattern = Pattern.compile("log-.*-\\d{4}-\\d{2}-\\d{2}");

ConsumerStrategy<String, String> strategy =
    ConsumerStrategies.SubscribePattern(topicPattern, kafkaParams);
```

**Dynamic Topic Discovery:**

Pattern matching is evaluated periodically against the topics that exist at the time of each check. New topics matching the pattern are automatically included in subsequent micro-batches.

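As a standalone illustration of this selection behavior (plain Scala, independent of Spark and Kafka), a pattern subscription effectively filters the broker's topic listing at each check. The topic names below are hypothetical:

```scala
import java.util.regex.Pattern

// Hypothetical topic listing, as a broker might report it.
val existingTopics = Seq(
  "user-events-clicks",
  "user-events-purchases",
  "orders",
  "user-events-registrations"
)

val topicPattern = Pattern.compile("user-events-.*")

// At each check, the subscription covers exactly the matching subset;
// a topic created later simply appears in a later check.
val matched = existingTopics.filter(t => topicPattern.matcher(t).matches())
// matched: Seq("user-events-clicks", "user-events-purchases", "user-events-registrations")
```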
### Assign Strategy

Assign a fixed collection of specific TopicPartitions. This gives complete control over which partitions are consumed, which is useful for advanced use cases requiring precise partition assignment.

```scala { .api }
// Scala versions
def Assign[K, V](
  topicPartitions: Iterable[TopicPartition],
  kafkaParams: collection.Map[String, Object]
): ConsumerStrategy[K, V]

def Assign[K, V](
  topicPartitions: Iterable[TopicPartition],
  kafkaParams: collection.Map[String, Object],
  offsets: collection.Map[TopicPartition, Long]
): ConsumerStrategy[K, V]

// Java versions
def Assign[K, V](
  topicPartitions: java.util.Collection[TopicPartition],
  kafkaParams: java.util.Map[String, Object]
): ConsumerStrategy[K, V]

def Assign[K, V](
  topicPartitions: java.util.Collection[TopicPartition],
  kafkaParams: java.util.Map[String, Object],
  offsets: java.util.Map[TopicPartition, java.lang.Long]
): ConsumerStrategy[K, V]
```

**Parameters:**
- `topicPartitions`: Specific collection of TopicPartitions to consume from
- `kafkaParams`: Kafka configuration parameters
- `offsets`: Optional starting offsets for the assigned partitions

**Usage Example (Scala):**

```scala
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.kafka.common.TopicPartition

// Assign specific partitions for precise control
val assignedPartitions = Array(
  new TopicPartition("high-priority", 0),
  new TopicPartition("high-priority", 1),
  new TopicPartition("medium-priority", 0),
  new TopicPartition("low-priority", 2)
)

val strategy = ConsumerStrategies.Assign[String, String](
  assignedPartitions,
  kafkaParams
)

// With specific starting offsets
val partitionOffsets = Map[TopicPartition, Long](
  new TopicPartition("high-priority", 0) -> 10000L,
  new TopicPartition("high-priority", 1) -> 15000L,
  new TopicPartition("medium-priority", 0) -> 5000L,
  new TopicPartition("low-priority", 2) -> 1000L
)

val strategyWithOffsets = ConsumerStrategies.Assign[String, String](
  assignedPartitions,
  kafkaParams,
  partitionOffsets
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  strategyWithOffsets
)
```

**Usage Example (Java):**

```java
import org.apache.kafka.common.TopicPartition;
import java.util.*;

Collection<TopicPartition> partitions = Arrays.asList(
    new TopicPartition("transactions", 0),
    new TopicPartition("transactions", 1),
    new TopicPartition("transactions", 2)
);

Map<TopicPartition, Long> offsets = new HashMap<>();
offsets.put(new TopicPartition("transactions", 0), 50000L);
offsets.put(new TopicPartition("transactions", 1), 75000L);
offsets.put(new TopicPartition("transactions", 2), 60000L);

ConsumerStrategy<String, String> strategy =
    ConsumerStrategies.Assign(partitions, kafkaParams, offsets);
```

## Advanced Configuration

### Security Configuration

Security-related parameters are passed through to the underlying Kafka consumer, so securing a strategy is a matter of supplying the usual Kafka client settings:

```scala
// Security parameters are forwarded to the Kafka consumer unchanged
val secureKafkaParams = Map[String, Object](
  "bootstrap.servers" -> "secure-broker:9093",
  "security.protocol" -> "SASL_SSL",
  "sasl.mechanism" -> "PLAIN",
  "sasl.jaas.config" -> "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";"
  // ... other parameters
)

val strategy = ConsumerStrategies.Subscribe[String, String](topics, secureKafkaParams)
```

### Custom Deserializers

```scala
import org.apache.kafka.common.serialization.StringDeserializer
// MyCustomDeserializer and MyCustomObject are placeholders for your own types
import com.example.MyCustomDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[MyCustomDeserializer],
  "group.id" -> "custom-deserializer-group"
)

val strategy = ConsumerStrategies.Subscribe[String, MyCustomObject](topics, kafkaParams)
```

### Offset Reset Strategies

```scala
// Start from earliest available offset
val earliestParams = kafkaParams + ("auto.offset.reset" -> "earliest")

// Start from latest offset (default)
val latestParams = kafkaParams + ("auto.offset.reset" -> "latest")

// Fail if no committed offset exists
val noneParams = kafkaParams + ("auto.offset.reset" -> "none")

val strategy = ConsumerStrategies.Subscribe[String, String](topics, earliestParams)
```

## Error Handling and Edge Cases

### KAFKA-3370 Workaround

Consumer strategies automatically work around the KAFKA-3370 issue when `auto.offset.reset=none`:

```scala
val strictParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "strict-group",
  "auto.offset.reset" -> "none" // This triggers the workaround
)

// The strategy will handle NoOffsetForPartitionException internally
val strategy = ConsumerStrategies.Subscribe[String, String](topics, strictParams)
```

### Automatic Parameter Fixing

Consumer strategies ensure parameters are properly configured for Spark executors:

- `enable.auto.commit` is automatically set to `false`
- `auto.offset.reset` is set to `"none"` for executors
- `group.id` is prefixed for executors to avoid conflicts with the driver's consumer group
- `receive.buffer.bytes` is increased to 65536 as a KAFKA-3135 workaround

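These adjustments can be sketched as a plain map transformation. This is illustrative only: Spark performs the rewriting internally, and the exact `spark-executor-` group-id prefix shown here is an assumption based on the kafka-010 integration's behavior.

```scala
// Sketch of the executor-side parameter fixing described above.
// Illustrative only: Spark applies these adjustments internally.
def fixForExecutor(params: Map[String, Object]): Map[String, Object] =
  params ++ Map[String, Object](
    "enable.auto.commit" -> (false: java.lang.Boolean),     // no auto-commit on executors
    "auto.offset.reset" -> "none",                          // the driver decides offsets
    "group.id" -> ("spark-executor-" + params("group.id")), // avoid group conflicts
    "receive.buffer.bytes" -> (65536: java.lang.Integer)    // KAFKA-3135 workaround
  )

val driverParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "group.id" -> "my-consumer-group",
  "enable.auto.commit" -> (true: java.lang.Boolean),
  "auto.offset.reset" -> "latest"
)

val executorParams = fixForExecutor(driverParams)
// executorParams("group.id") == "spark-executor-my-consumer-group"
```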
### Pattern Matching Edge Cases

For `SubscribePattern`, the strategy handles:

- **Empty matches**: Gracefully handles patterns that match no existing topics
- **Dynamic topic creation**: Automatically includes newly created topics in subsequent batches
- **Topic deletion**: Continues processing the remaining topics if some are deleted

## Best Practices

1. **Use Subscribe for known topics**: When you know the exact topic names, prefer `Subscribe` over pattern matching.

2. **Use SubscribePattern for dynamic topics**: When topics are created dynamically with a consistent naming pattern.

3. **Use Assign for advanced control**: When you need precise control over partition assignment or want to implement custom load balancing.

4. **Always specify group.id**: Required for offset management and consumer coordination.

5. **Disable auto-commit**: Set `enable.auto.commit=false` and manage offsets manually for exactly-once semantics.

6. **Handle starting offsets**: Specify starting offsets when resuming from checkpoints or from a specific point in time.

7. **Security first**: Include all necessary security parameters for production deployments.