# Location Strategies

Location strategies control how Kafka consumers are scheduled for TopicPartitions on Spark executors. Because the Kafka 0.10+ consumer prefetches messages, it is crucial for performance to keep cached consumers on the appropriate executors rather than recreate them for every partition. A location strategy expresses a preference, not an absolute requirement - partitions may be scheduled elsewhere if needed.

## Core Strategies
### PreferConsistent

The recommended default strategy; it consistently distributes partitions across all available executors.

```scala { .api }
def PreferConsistent: LocationStrategy
```

**Use when:** In most cases - it provides good load balancing across executors.

**Characteristics:**
- Distributes partitions evenly across all executors
- Maintains consistent assignment across batches
- No special host requirements
- Best general-purpose strategy
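The "consistent" part means the placement is deterministic: given the same partitions and executors, every batch produces the same assignment, which keeps cached consumers warm. A minimal, hypothetical Java sketch of that idea (not Spark's actual implementation; the `Partition` record and `assign` method are illustrative stand-ins):

```java
import java.util.*;

public class ConsistentAssignmentSketch {
    // Hypothetical stand-in for a Kafka TopicPartition.
    record Partition(String topic, int partition) {}

    // Assign partitions round-robin over a sorted executor list, so the
    // same inputs always yield the same placement across batches.
    static Map<Partition, String> assign(List<Partition> partitions, List<String> executors) {
        List<String> sorted = new ArrayList<>(executors);
        Collections.sort(sorted);  // deterministic executor order
        Map<Partition, String> placement = new LinkedHashMap<>();
        for (int i = 0; i < partitions.size(); i++) {
            placement.put(partitions.get(i), sorted.get(i % sorted.size()));
        }
        return placement;
    }

    public static void main(String[] args) {
        List<Partition> parts = List.of(
            new Partition("events", 0),
            new Partition("events", 1),
            new Partition("events", 2));
        List<String> execs = List.of("exec-b", "exec-a");
        // Same inputs twice -> same placement: the "consistent" property.
        System.out.println(assign(parts, execs).equals(assign(parts, execs)));
    }
}
```

The key design point is sorting the executor list before assigning: without a deterministic order, placement would shuffle between batches and defeat consumer caching.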
### PreferBrokers

Strategy for deployments where your Spark executors are co-located on the same nodes as your Kafka brokers.

```scala { .api }
def PreferBrokers: LocationStrategy
```

**Use when:** Your executors run on the same physical nodes as Kafka brokers.

**Characteristics:**
- Attempts to place consumers on the same hosts as the Kafka brokers
- Reduces network overhead by keeping data local
- Requires executors to be on broker nodes
- Cannot be used with `createRDD` (no driver consumer available)
### PreferFixed (Scala Collection)

Strategy for custom partition-to-host mapping when you have uneven load distribution or specific placement requirements.

```scala { .api }
def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy
```

**Parameters:**
- `hostMap`: `collection.Map[TopicPartition, String]` - mapping from TopicPartition to preferred host

**Use when:** You have specific knowledge about partition load or host capabilities.
### PreferFixed (Java Collection)

Java version of the fixed mapping strategy.

```java { .api }
public static LocationStrategy PreferFixed(java.util.Map<TopicPartition, String> hostMap)
```

**Parameters:**
- `hostMap`: `java.util.Map<TopicPartition, String>` - mapping from TopicPartition to preferred host
## Usage Examples

### Default Strategy (Recommended)

```scala
import org.apache.spark.streaming.kafka010._

// Use PreferConsistent in most cases
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
```
### Broker Co-location Strategy

```scala
import org.apache.spark.streaming.kafka010._

// Use when executors run on the same nodes as the Kafka brokers
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferBrokers,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

// Note: PreferBrokers cannot be used with createRDD.
// This will throw an AssertionError:
// val rdd = KafkaUtils.createRDD[String, String](
//   sparkContext, kafkaParams, offsetRanges, LocationStrategies.PreferBrokers
// )
```
### Custom Fixed Mapping (Scala)

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.TopicPartition

// Define a custom partition-to-host mapping
val hostMap = Map(
  new TopicPartition("high-volume-topic", 0) -> "executor-host-1",
  new TopicPartition("high-volume-topic", 1) -> "executor-host-2",
  new TopicPartition("low-volume-topic", 0) -> "executor-host-3"
)

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferFixed(hostMap),
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
```
### Custom Fixed Mapping (Java)

```java
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.common.TopicPartition;

Map<TopicPartition, String> hostMap = new HashMap<>();
hostMap.put(new TopicPartition("topic1", 0), "host1");
hostMap.put(new TopicPartition("topic1", 1), "host2");
hostMap.put(new TopicPartition("topic2", 0), "host3");

JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
  javaStreamingContext,
  LocationStrategies.PreferFixed(hostMap),
  ConsumerStrategies.Subscribe(topics, kafkaParams)
);
```
### Fallback Behavior with PreferFixed

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.TopicPartition

// Only specify a mapping for some partitions
val partialHostMap = Map(
  new TopicPartition("critical-topic", 0) -> "high-performance-host"
  // Partitions not listed here fall back to consistent placement
)

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferFixed(partialHostMap),
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

// Any TopicPartition not in the map will use the consistent location strategy
```
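The fallback semantics amount to a map lookup with a deterministic default. A hypothetical Java sketch of that logic (the `Partition` record, `preferredHost` method, and round-robin default are illustrative stand-ins, not Spark's implementation):

```java
import java.util.*;

public class FixedWithFallbackSketch {
    // Hypothetical stand-in for a Kafka TopicPartition.
    record Partition(String topic, int partition) {}

    // Return the pinned host if the partition is in the map; otherwise fall
    // back to a deterministic round-robin choice over sorted executors,
    // mirroring PreferFixed's documented fallback to consistent placement.
    static String preferredHost(Partition p, Map<Partition, String> pinned,
                                List<String> executors, int index) {
        String host = pinned.get(p);
        if (host != null) return host;
        List<String> sorted = new ArrayList<>(executors);
        Collections.sort(sorted);
        return sorted.get(index % sorted.size());
    }

    public static void main(String[] args) {
        Map<Partition, String> pinned =
            Map.of(new Partition("critical-topic", 0), "high-performance-host");
        // Mapped partition -> pinned host
        System.out.println(preferredHost(new Partition("critical-topic", 0), pinned, List.of("e1", "e2"), 0));
        // Unmapped partition -> deterministic fallback
        System.out.println(preferredHost(new Partition("other-topic", 0), pinned, List.of("e2", "e1"), 0));
    }
}
```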
## Strategy Selection Guidelines

### Use PreferConsistent When:
- You don't have specific host requirements
- You want simple, balanced partition distribution
- You're unsure which strategy to use (it is the default choice)
- Your Kafka brokers and Spark executors are on different nodes

### Use PreferBrokers When:
- Your Spark executors run on the same nodes as Kafka brokers
- You want to minimize network I/O
- You're only using streaming (not batch RDD operations)
- You have control over both the Kafka and Spark cluster deployment

### Use PreferFixed When:
- You have uneven partition loads and want specific placement
- Some partitions require more processing power than others
- You have detailed knowledge of your cluster topology
- You need to isolate certain partitions on specific hosts
## Performance Considerations

### Consumer Caching
- Kafka consumers are cached per executor to avoid recreation overhead
- Location preferences help maintain cache effectiveness
- Consistent placement improves consumer reuse across batches
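The consumer cache itself can be tuned through Spark configuration. A minimal `spark-defaults.conf` sketch, assuming the `spark-streaming-kafka-0-10` integration; the capacity values shown are illustrative, not recommendations:

```properties
# Disable only for debugging; cached consumers are key to performance
spark.streaming.kafka.consumer.cache.enabled true
# Initial and maximum cached consumers per executor (illustrative values)
spark.streaming.kafka.consumer.cache.initialCapacity 16
spark.streaming.kafka.consumer.cache.maxCapacity 64
```

If more distinct TopicPartitions are processed on an executor than `maxCapacity` allows, raising it avoids evicting and recreating consumers between batches.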
### Network Locality
- PreferBrokers reduces network traffic when possible
- PreferFixed allows fine-tuned placement for optimal network usage
- PreferConsistent provides balanced load without network optimization

### Load Balancing
- PreferConsistent ensures even distribution across executors
- PreferFixed allows custom load balancing based on partition characteristics
- PreferBrokers may create uneven load if broker hosts have different capabilities
## Error Handling

### PreferBrokers with createRDD

```scala
// This will throw an AssertionError
try {
  val rdd = KafkaUtils.createRDD[String, String](
    sparkContext, kafkaParams, offsetRanges, LocationStrategies.PreferBrokers
  )
} catch {
  case e: AssertionError =>
    println("PreferBrokers cannot be used with createRDD - use PreferConsistent or PreferFixed")
}
```
## Important Notes

- All location strategies are marked `@Experimental` in Spark 2.4.8
- Location preferences are hints, not guarantees - Spark may place partitions elsewhere
- Consumer instances are automatically managed and cached for performance
- PreferBrokers requires driver consumer access and cannot be used with batch RDD operations
- PreferFixed falls back to the consistent location strategy for unmapped TopicPartitions