# Location Strategies

Location strategies control how Kafka topic partitions are scheduled onto Spark executors, and therefore where Kafka consumers are created and cached. This matters for performance because Kafka 0.10 consumers prefetch messages into buffers. For that reason, the integration keeps consumers cached on executors across batches instead of recreating them, and it prefers to schedule each partition on the host that already holds the matching consumer.

## Capabilities

### PreferConsistent Strategy

Use this strategy in most cases. It distributes partitions consistently across all available executors, which balances load evenly.

```scala { .api }
def PreferConsistent: LocationStrategy
```

**Usage:**

```scala
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies}

// Assumes `ssc` (a StreamingContext) and `consumerStrategy`
// (for example, from ConsumerStrategies.Subscribe) are already defined
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent, // Recommended for most use cases
  consumerStrategy
)
```

**When to use:**

- Default choice for most applications
- When you want balanced load distribution across executors
- When you don't have specific locality requirements
- When your Kafka brokers are not co-located with Spark executors

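To make "consistent" concrete, here is a minimal sketch of one way to get a stable partition-to-executor mapping: sort the executors, then hash each (topic, partition) pair into that sorted list. The class, the method, and the choice of hash are hypothetical illustrations in plain Java, not Spark's API, and Spark's internal placement logic may differ in detail.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical illustration of consistent placement; this is not Spark's API.
final class ConsistentPlacement {

    // Deterministically picks one executor for a (topic, partition) pair.
    // Sorting first makes the result independent of executor registration order.
    static String chooseExecutor(String topic, int partition, List<String> executors) {
        if (executors.isEmpty()) {
            return null; // no executors available: no preference
        }
        List<String> sorted = new ArrayList<>(executors);
        Collections.sort(sorted); // natural (lexicographic) order
        int index = Math.floorMod(Objects.hash(topic, partition), sorted.size());
        return sorted.get(index);
    }

    public static void main(String[] args) {
        List<String> executors = List.of("exec-b", "exec-a", "exec-c");
        for (int batch = 0; batch < 3; batch++) {
            // The same partition maps to the same executor in every batch,
            // so a consumer cached on that executor can be reused.
            System.out.println("batch " + batch + ": orders-0 -> "
                + chooseExecutor("orders", 0, executors));
        }
    }
}
```

In this sketch the mapping only changes when the executor set changes, so adding or removing executors can move partitions to different executors, where their consumers must be created again.
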
### PreferBrokers Strategy

Use this strategy only when your Spark executors run on the same hosts as your Kafka brokers. It prefers to schedule each partition on the host of that partition's leader broker, which minimizes network traffic.

```scala { .api }
def PreferBrokers: LocationStrategy
```

**Usage:**

```scala
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies}

// Assumes `ssc` and `consumerStrategy` are already defined
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferBrokers, // Use only when executors are co-located with brokers
  consumerStrategy
)
```

**When to use:**

- When Spark executors run on the same physical machines as Kafka brokers
- To minimize network I/O by keeping reads local
- In containerized environments where Spark executor pods and Kafka broker pods are co-scheduled on the same nodes

**Note:** This strategy throws an `IllegalArgumentException` when used with `KafkaUtils.createRDD`, because a standalone RDD has no driver consumer with which to look up broker locations. Use `PreferFixed` with an explicit partition-to-host map instead.

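Before choosing PreferBrokers, it can help to check how many partition leaders actually share a host with an executor. The sketch below assumes you already have a partition-to-leader-host map (for example, gathered from your cluster's topic metadata) and the set of hosts running executors; both inputs and the `BrokerColocation` helper are hypothetical, not part of Spark.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Spark: estimates what fraction of partitions
// could be read locally if each ran on its leader broker's host.
final class BrokerColocation {

    // leaderHosts: partition name -> host of that partition's leader broker
    // executorHosts: hosts currently running Spark executors
    // Host strings are compared exactly, so both sides must use the same
    // naming form (for example, fully qualified names everywhere).
    static double localFraction(Map<String, String> leaderHosts, Set<String> executorHosts) {
        if (leaderHosts.isEmpty()) {
            return 0.0;
        }
        long local = leaderHosts.values().stream()
            .filter(executorHosts::contains)
            .count();
        return (double) local / leaderHosts.size();
    }

    public static void main(String[] args) {
        Map<String, String> leaders = Map.of(
            "events-0", "kafka-1.example.com",
            "events-1", "kafka-2.example.com",
            "events-2", "kafka-3.example.com",
            "events-3", "kafka-4.example.com");
        Set<String> executors = Set.of("kafka-1.example.com", "kafka-2.example.com");
        // Two of four leaders share a host with an executor: prints "local fraction: 0.5"
        System.out.println("local fraction: " + localFraction(leaders, executors));
    }
}
```

A low fraction suggests that PreferBrokers would rarely achieve locality in practice, and PreferConsistent is likely the better default.
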
### PreferFixed Strategy

Use this strategy to explicitly control which hosts handle specific topic partitions. It is useful when load is uneven across partitions.

```scala { .api }
def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy
def PreferFixed(hostMap: java.util.Map[TopicPartition, String]): LocationStrategy
```

**Parameters:**

- `hostMap`: Map from each `TopicPartition` to its preferred host. Any `TopicPartition` not in the map uses a consistent location, as with `PreferConsistent`.

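The fallback rule for `hostMap` can be modeled in a few lines: executors on the preferred host are the candidates, and if there are none, placement falls back to consistent hashing over all executors. This is a hypothetical sketch. `FixedPlacement`, `Executor`, and the `"topic-partition"` string keys are illustrative stand-ins, and treating "mapped host with no executor" the same as "unmapped" is an assumption of the model.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical model of PreferFixed-style placement; this is not Spark's API.
final class FixedPlacement {

    static final class Executor {
        final String host;
        final String id;

        Executor(String host, String id) {
            this.host = host;
            this.id = id;
        }

        @Override
        public String toString() {
            return host + "/" + id;
        }
    }

    // hostMap keys are "topic-partition" strings here for brevity.
    static Executor choose(String topic, int partition,
                           Map<String, String> hostMap, List<Executor> executors) {
        List<Executor> sorted = new ArrayList<>(executors);
        sorted.sort(Comparator.comparing((Executor e) -> e.host).thenComparing(e -> e.id));

        // Collect executors on the preferred host, if the partition has one.
        String preferredHost = hostMap.get(topic + "-" + partition);
        List<Executor> candidates = new ArrayList<>();
        if (preferredHost != null) {
            for (Executor e : sorted) {
                if (e.host.equals(preferredHost)) {
                    candidates.add(e);
                }
            }
        }
        // Unmapped partitions, and mapped hosts with no executor,
        // fall back to consistent placement over all executors.
        if (candidates.isEmpty()) {
            candidates = sorted;
        }
        if (candidates.isEmpty()) {
            return null;
        }
        int index = Math.floorMod(Objects.hash(topic, partition), candidates.size());
        return candidates.get(index);
    }

    public static void main(String[] args) {
        List<Executor> executors = List.of(
            new Executor("executor-host-1", "1"),
            new Executor("executor-host-2", "2"));
        Map<String, String> hostMap = Map.of("high-volume-topic-0", "executor-host-2");
        System.out.println(choose("high-volume-topic", 0, hostMap, executors)); // prints executor-host-2/2
        System.out.println(choose("low-volume-topic", 0, hostMap, executors));  // consistent fallback
    }
}
```

The first call can only land on `executor-host-2`, because that is the sole executor on the mapped host. The second partition is unmapped, so it falls back to the consistent hash.
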
**Usage (Scala):**

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies}
import scala.collection.mutable

// Map each partition to the host it should preferably run on
val hostMap = mutable.Map[TopicPartition, String]()
hostMap += new TopicPartition("high-volume-topic", 0) -> "executor-host-1"
hostMap += new TopicPartition("high-volume-topic", 1) -> "executor-host-2"
hostMap += new TopicPartition("low-volume-topic", 0) -> "executor-host-3"

// Assumes `ssc` and `consumerStrategy` are already defined;
// partitions missing from hostMap use consistent placement
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferFixed(hostMap),
  consumerStrategy
)
```

**Usage (Java):**

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import java.util.HashMap;
import java.util.Map;

// Map each partition to the host it should preferably run on
Map<TopicPartition, String> hostMap = new HashMap<>();
hostMap.put(new TopicPartition("high-volume-topic", 0), "executor-host-1");
hostMap.put(new TopicPartition("high-volume-topic", 1), "executor-host-2");

// Assumes `jssc` (a JavaStreamingContext) and `consumerStrategy`
// (a ConsumerStrategy<String, String>) are already defined
JavaInputDStream<ConsumerRecord<String, String>> stream =
    KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferFixed(hostMap),
        consumerStrategy
    );
```

**When to use:**

- When partition sizes are uneven and you want to balance load manually
- When certain partitions have different processing requirements
- When you want to isolate high-volume partitions on specific hosts
- For debugging or testing specific partition assignments

## Performance Considerations

### Consumer Caching

Location strategies directly affect how well executor-side consumer caching works:

- **Consistent placement** lets a cached consumer be reused for the same partition across micro-batches.
- **Inconsistent placement** forces consumers to be recreated, which hurts performance.
- **Cache configuration** can be tuned with these Spark configuration properties:
  - `spark.streaming.kafka.consumer.cache.enabled=true` (default)
  - `spark.streaming.kafka.consumer.cache.maxCapacity=64` (default, per executor)
  - `spark.streaming.kafka.consumer.cache.initialCapacity=16` (default)

The cache is keyed by topic partition and `group.id`, so use a separate `group.id` for each call to `createDirectStream`.

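To see why placement and capacity matter, consider a simplified model of one executor's consumer cache: a bounded map keyed by group ID, topic, and partition. The keying follows the description above. The least-recently-used eviction policy, the `ConsumerCacheModel` class, and the use of a plain `Object` as a stand-in consumer are assumptions made for illustration, not a description of Spark's internal classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of a per-executor consumer cache; not Spark's implementation.
final class ConsumerCacheModel {
    private final int maxCapacity;
    private final LinkedHashMap<String, Object> cache;
    private int hits = 0;
    private int misses = 0;

    ConsumerCacheModel(int initialCapacity, int maxCapacity) {
        this.maxCapacity = maxCapacity;
        // accessOrder = true orders entries from least to most recently used,
        // so removeEldestEntry evicts the LRU consumer once over capacity.
        this.cache = new LinkedHashMap<String, Object>(initialCapacity, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Object> eldest) {
                return size() > ConsumerCacheModel.this.maxCapacity;
            }
        };
    }

    // Returns the cached consumer for (groupId, topic, partition), creating one on a miss.
    Object get(String groupId, String topic, int partition) {
        String key = groupId + "|" + topic + "|" + partition;
        Object consumer = cache.get(key);
        if (consumer == null) {
            misses++;
            consumer = new Object(); // stands in for an expensive KafkaConsumer
            cache.put(key, consumer);
        } else {
            hits++;
        }
        return consumer;
    }

    int hits() { return hits; }
    int misses() { return misses; }

    public static void main(String[] args) {
        // Three batches that each read the same two partitions on this executor.
        ConsumerCacheModel stable = new ConsumerCacheModel(16, 64);
        for (int batch = 0; batch < 3; batch++) {
            stable.get("my-group", "orders", 0);
            stable.get("my-group", "orders", 1);
        }
        // prints "stable: hits=4 misses=2"
        System.out.println("stable: hits=" + stable.hits() + " misses=" + stable.misses());

        // Three partitions cycling through a cache that holds only two consumers.
        ConsumerCacheModel tooSmall = new ConsumerCacheModel(2, 2);
        for (int batch = 0; batch < 2; batch++) {
            for (int p = 0; p < 3; p++) {
                tooSmall.get("my-group", "orders", p);
            }
        }
        // prints "too small: hits=0 misses=6"
        System.out.println("too small: hits=" + tooSmall.hits() + " misses=" + tooSmall.misses());
    }
}
```

In the second run, each consumer is evicted before it can be reused, so every access misses. This mirrors the Spark guidance to raise `spark.streaming.kafka.consumer.cache.maxCapacity` if you expect to handle more than 64 × (number of executors) partitions.
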
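### Network Optimization

When executors share hosts with brokers, you can also use `PreferFixed` to pin each partition explicitly to its leader broker's host. A fixed map stays the same if partition leadership moves, but unlike `PreferBrokers` it can also be used with `KafkaUtils.createRDD`.

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies}

// Map each partition to its leader broker's host (which must also run executors).
// Update the map if partition leadership moves to another broker.
val brokerExecutorMap = Map[TopicPartition, String](
  new TopicPartition("topic1", 0) -> "broker1.example.com",
  new TopicPartition("topic1", 1) -> "broker2.example.com",
  new TopicPartition("topic2", 0) -> "broker3.example.com"
)

// Assumes `ssc` and `consumerStrategy` are already defined
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferFixed(brokerExecutorMap),
  consumerStrategy
)
```
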
### Load Balancing

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies}

// Example: balance high-volume partitions across dedicated hosts.
// Values are host names: PreferFixed matches on host, not executor ID.
val loadBalancedMap = Map[TopicPartition, String](
  // Spread high-volume partitions across separate high-memory hosts
  new TopicPartition("metrics", 0) -> "high-memory-host-1",
  new TopicPartition("metrics", 1) -> "high-memory-host-2",
  new TopicPartition("metrics", 2) -> "high-memory-host-3",

  // Group low-volume partitions on one standard host
  new TopicPartition("alerts", 0) -> "standard-host-1",
  new TopicPartition("alerts", 1) -> "standard-host-1"
)

// Assumes `ssc` and `consumerStrategy` are already defined
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferFixed(loadBalancedMap),
  consumerStrategy
)
```

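When load is skewed, you can compute a starting point for a PreferFixed map from per-partition load estimates instead of writing it by hand. The sketch below uses the greedy longest-processing-time heuristic: sort partitions by estimated load, then assign each one to the host with the least load so far. The load figures, host names, and `GreedyHostMap` helper are hypothetical. To use the result, convert each key to a `TopicPartition` and pass the map to `LocationStrategies.PreferFixed`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical helper, not part of Spark: builds a partition -> host map with
// the greedy "largest load first, onto the least-loaded host" heuristic.
final class GreedyHostMap {

    static Map<String, String> build(Map<String, Long> loadByPartition, List<String> hosts) {
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("at least one host is required");
        }
        // Min-heap of {assigned load, host index}; ties go to the lower index.
        PriorityQueue<long[]> heap = new PriorityQueue<>((a, b) ->
            a[0] != b[0] ? Long.compare(a[0], b[0]) : Long.compare(a[1], b[1]));
        for (int i = 0; i < hosts.size(); i++) {
            heap.add(new long[] {0L, i});
        }

        // Place the heaviest partitions first.
        List<Map.Entry<String, Long>> partitions = new ArrayList<>(loadByPartition.entrySet());
        partitions.sort(Map.Entry.<String, Long>comparingByValue().reversed());

        Map<String, String> hostMap = new HashMap<>();
        for (Map.Entry<String, Long> p : partitions) {
            long[] least = heap.poll(); // host with the smallest load so far
            hostMap.put(p.getKey(), hosts.get((int) least[1]));
            least[0] += p.getValue();
            heap.add(least);
        }
        return hostMap;
    }

    public static void main(String[] args) {
        // Hypothetical per-partition load estimates (for example, messages per second).
        Map<String, Long> load = Map.of(
            "metrics-0", 100L, "metrics-1", 90L, "metrics-2", 80L,
            "alerts-0", 10L, "alerts-1", 5L);
        List<String> hosts = List.of("host-1", "host-2", "host-3");
        System.out.println(build(load, hosts));
    }
}
```

With these estimates, the three `metrics` partitions land on three different hosts, and the small `alerts` partitions fill in on the hosts that carry the least load. Greedy assignment is a heuristic, not an optimal partition, which is usually acceptable for a hint like PreferFixed.
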
## Best Practices

1. **Start with PreferConsistent**: Use it as your default unless you have specific locality requirements.

2. **Use PreferBrokers carefully**: Use it only when executors are truly co-located with Kafka brokers and you have verified the performance benefit.

3. **Monitor cache effectiveness**: Check whether consumers are reused across batches and adjust cache settings if needed. If you expect to handle more partitions than `maxCapacity` × (number of executors), which is 64 × executors by default, raise `spark.streaming.kafka.consumer.cache.maxCapacity`.

4. **Profile before optimizing**: Measure the actual performance impact before building complex PreferFixed mappings.

5. **Consider partition count**: Placement matters more as the partition count grows, because each executor has more consumers to keep cached.

6. **Account for dynamic scaling**: PreferFixed mappings may need updating when hosts are added to or removed from the cluster.

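For the dynamic-scaling point, a periodic check can flag PreferFixed entries whose host no longer runs an executor, because those partitions no longer get their intended placement. The `StaleMappingCheck` helper and its inputs (a partition-to-host map and the set of hosts currently running executors) are hypothetical, and how you obtain the live host set depends on your cluster manager.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of Spark: lists PreferFixed entries whose
// preferred host is not among the hosts currently running executors.
final class StaleMappingCheck {

    static Set<String> stalePartitions(Map<String, String> hostMap, Set<String> liveHosts) {
        Set<String> stale = new TreeSet<>(); // sorted for stable, readable output
        for (Map.Entry<String, String> e : hostMap.entrySet()) {
            if (!liveHosts.contains(e.getValue())) {
                stale.add(e.getKey());
            }
        }
        return stale;
    }

    public static void main(String[] args) {
        Map<String, String> hostMap = Map.of(
            "metrics-0", "high-memory-host-1",
            "metrics-1", "high-memory-host-2",
            "alerts-0", "standard-host-1");
        // Suppose high-memory-host-2 was removed when the cluster scaled down.
        Set<String> live = Set.of("high-memory-host-1", "standard-host-1");
        System.out.println(stalePartitions(hostMap, live)); // prints [metrics-1]
    }
}
```
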
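## Error Handling

Location strategies handle several error and fallback cases:

- **PreferBrokers with `createRDD`**: Throws an `IllegalArgumentException` whose message points you to `PreferFixed`.
- **Unmapped partitions or unknown hosts**: Partitions missing from a `PreferFixed` map fall back to consistent placement, as do partitions mapped to a host with no running executor.
- **Executor unavailability**: Preferred locations are only scheduling hints computed from the executors currently available, so partitions are placed on executors that are actually running.
- **Cache misses**: A new consumer is created whenever no cached consumer is available.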