# Consumer Strategies

Consumer strategies define how Spark connects to and consumes data from Kafka topics. The module provides three patterns for topic assignment and subscription.
## Capabilities

### Consumer Strategy Base

Base trait for all consumer strategies that defines how Kafka consumers are created and configured.

```scala { .api }
/**
 * Base trait for Kafka consumer strategies
 */
sealed trait ConsumerStrategy {
  /**
   * Creates a Kafka consumer with strategy-specific configuration
   * @param kafkaParams Kafka consumer configuration parameters
   * @return Configured Kafka consumer
   */
  def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
}
```
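To make the contract concrete, the sketch below shows how a strategy might implement `createConsumer` by manually assigning partitions. This is an illustrative sketch, not Spark's actual implementation; it assumes the `kafka-clients` library is on the classpath, and the `ExampleAssignStrategy` name is hypothetical.

```scala
import java.{util => ju}

import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

// Hypothetical sketch of a concrete strategy: create a consumer and
// manually assign the requested partitions, bypassing the consumer
// group's automatic assignment
case class ExampleAssignStrategy(partitions: Array[TopicPartition]) {
  def createConsumer(
      kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = {
    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
    consumer.assign(ju.Arrays.asList(partitions: _*))
    consumer
  }
}
```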
### Assign Strategy

Strategy for assigning specific topic partitions to consume from. Provides precise control over which partitions are consumed.

```scala { .api }
/**
 * Strategy for assigning specific topic partitions
 * @param partitions Array of TopicPartition objects to assign
 */
case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStrategy
```
**Usage Examples:**

```scala
import org.apache.kafka.common.TopicPartition

// The partitions to consume; the "assign" option below expresses the
// same set as a JSON map of topic names to partition numbers
val partitions = Array(
  new TopicPartition("topic1", 0),
  new TopicPartition("topic1", 1),
  new TopicPartition("topic2", 0)
)

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"topic1":[0,1],"topic2":[0]}""")
  .option("startingOffsets", "earliest")
  .load()
```
### Subscribe Strategy

Strategy for subscribing to specific topic names. Automatically handles partition assignment and rebalancing.

```scala { .api }
/**
 * Strategy for subscribing to specific topic names
 * @param topics Sequence of topic names to subscribe to
 */
case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy
```
**Usage Examples:**

```scala
// Subscribe to a comma-separated list of topics
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1,topic2,topic3")
  .option("startingOffsets", "latest")
  .load()

// Subscribe to a single topic
val singleTopicDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()
```
### Subscribe Pattern Strategy

Strategy for subscribing to topics matching a regex pattern. Dynamically discovers new topics that match the pattern.

```scala { .api }
/**
 * Strategy for subscribing to topics matching a regex pattern
 * @param topicPattern Regular expression pattern for topic names
 */
case class SubscribePatternStrategy(topicPattern: String) extends ConsumerStrategy
```
**Usage Examples:**

```scala
// Subscribe to topics matching a pattern
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "events_.*")
  .option("startingOffsets", "earliest")
  .load()

// Build the pattern from a variable, e.g. per deployment environment
val environment = "prod" // illustrative
val envTopics = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", s"${environment}_.*")
  .load()
```
## Strategy Selection Guidelines

### Use AssignStrategy when:
- You need precise control over partition assignment
- You are working with a static set of partitions
- You are implementing custom partition assignment logic
- You have partition-specific processing requirements

### Use SubscribeStrategy when:
- You are working with a known set of topic names
- You want automatic partition assignment and rebalancing
- Topics may gain or lose partitions dynamically
- You want standard consumer group behavior

### Use SubscribePatternStrategy when:
- Topics are created dynamically and follow naming patterns
- You are working with a multi-tenant system that has one topic per tenant
- You need to consume from topics that may not exist at startup
- Topic names follow predictable regex patterns
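The guidelines above map one-to-one onto the three DataSource options, which a small helper can make explicit. This is an illustrative sketch: the `SourceSpec` types and `strategyOption` helper are hypothetical, not part of the module.

```scala
// Hypothetical helper: pick the DataSource option key/value that
// corresponds to each consumer strategy
sealed trait SourceSpec
case class FixedPartitions(assignJson: String) extends SourceSpec // AssignStrategy
case class NamedTopics(topics: Seq[String]) extends SourceSpec    // SubscribeStrategy
case class TopicPattern(regex: String) extends SourceSpec         // SubscribePatternStrategy

def strategyOption(spec: SourceSpec): (String, String) = spec match {
  case FixedPartitions(json) => "assign" -> json
  case NamedTopics(topics)   => "subscribe" -> topics.mkString(",")
  case TopicPattern(regex)   => "subscribePattern" -> regex
}
```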
## Configuration Integration

Consumer strategies are configured through Spark's DataSource options:

```scala
// Strategy options (exactly one must be specified)
.option("assign", """{"topic1":[0,1],"topic2":[0]}""") // AssignStrategy
.option("subscribe", "topic1,topic2")                  // SubscribeStrategy
.option("subscribePattern", "events_.*")               // SubscribePatternStrategy

// Additional Kafka consumer parameters
.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
```
## Error Handling

The module validates consumer strategy configuration at query startup:

- **Multiple strategies**: Throws `IllegalArgumentException` if more than one strategy is specified
- **No strategy**: Throws `IllegalArgumentException` if no strategy is specified
- **Invalid assign JSON**: Throws `IllegalArgumentException` for malformed partition assignments
- **Empty subscribe list**: Throws `IllegalArgumentException` for empty topic lists
- **Empty pattern**: Throws `IllegalArgumentException` for empty regex patterns
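For example, specifying two strategies at once violates the exactly-one rule and is rejected at query startup. A sketch of the failure mode (the exact error message may differ):

```scala
// Setting both "assign" and "subscribe" specifies two strategies,
// so query startup fails with IllegalArgumentException
val invalid = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"topic1":[0]}""")
  .option("subscribe", "topic1")
  .load() // throws IllegalArgumentException
```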
## Advanced Configuration

### Kafka Consumer Parameters

All consumer strategies support the full range of Kafka consumer configuration through `kafka.`-prefixed options:

```scala
.option("kafka.session.timeout.ms", "30000")
.option("kafka.heartbeat.interval.ms", "3000")
.option("kafka.max.poll.records", "500")
.option("kafka.fetch.min.bytes", "1024")
.option("kafka.fetch.max.wait.ms", "500")
```
### Unsupported Parameters

Certain Kafka consumer parameters are managed internally and cannot be overridden:

- `group.id` - A unique group ID is generated automatically per query
- `auto.offset.reset` - Controlled via the `startingOffsets` option
- `key.deserializer` - Fixed to `ByteArrayDeserializer`
- `value.deserializer` - Fixed to `ByteArrayDeserializer`
- `enable.auto.commit` - Always disabled; Spark manages offsets itself
- `interceptor.classes` - Not supported for safety
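One practical consequence of the fixed `ByteArrayDeserializer`: the source exposes `key` and `value` as binary columns, so deserialization happens in the query itself. Assuming `df` was loaded as in the earlier examples:

```scala
// key and value arrive as binary because the deserializers are fixed;
// cast (or otherwise decode) them explicitly in the query
val decoded = df.selectExpr(
  "CAST(key AS STRING) AS key",
  "CAST(value AS STRING) AS value"
)
```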