
# Location Strategies

Location strategies control where Kafka consumers are scheduled, created, and cached on Spark executors. Placement matters for performance and network locality because the Kafka 0.10 consumer prefetches messages into buffers, so reusing a cached consumer on the same executor is cheaper than recreating it for each batch.

## Capabilities


### PreferConsistent Strategy

Use this strategy in most cases. It distributes partitions evenly and consistently across the available executors, which balances load.

```scala { .api }
def PreferConsistent: LocationStrategy
```


**Usage:**

```scala
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies}

// `ssc` (StreamingContext) and `consumerStrategy` are assumed to be defined elsewhere
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent, // Recommended for most use cases
  consumerStrategy
)
```


**When to use:**

- Default choice for most applications
- When you want balanced load distribution across executors
- When you don't have specific locality requirements
- When your Kafka brokers are not co-located with Spark executors

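The usage snippets in this document refer to `ssc` and `consumerStrategy` without defining them. The following is a minimal sketch of one way to set them up, assuming a local Spark master, a broker at `localhost:9092`, and a topic named `example-topic`. All of these names and values are placeholders chosen for illustration.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// Hypothetical application name; local master is for illustration only
// (omit setMaster when launching with spark-submit).
val conf = new SparkConf()
  .setAppName("location-strategy-example")
  .setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))

// Hypothetical broker address and consumer group.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// Subscribe to a fixed list of topics (hypothetical topic name).
val consumerStrategy =
  ConsumerStrategies.Subscribe[String, String](Seq("example-topic"), kafkaParams)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  consumerStrategy
)

// Register an output operation: print a few (key, value) pairs per batch.
stream.map(record => (record.key, record.value)).print()
```

Nothing is consumed until the context is started; a real application would then call `ssc.start()` followed by `ssc.awaitTermination()`.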

### PreferBrokers Strategy

Use this strategy only when your Spark executors run on the same hosts as your Kafka brokers. It prefers to schedule each partition on the host of that partition's Kafka leader, which minimizes network traffic.

```scala { .api }
def PreferBrokers: LocationStrategy
```


**Usage:**

```scala
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies}

// `ssc` (StreamingContext) and `consumerStrategy` are assumed to be defined elsewhere
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferBrokers, // Use only when executors are co-located with brokers
  consumerStrategy
)
```


**When to use:**

- When Spark executors run on the same physical machines as Kafka brokers
- To minimize network I/O by keeping data local
- In containerized environments where Spark and Kafka pods are co-scheduled


**Note:** `KafkaUtils.createRDD` rejects this strategy and throws an `AssertionError`, because a single `KafkaRDD` has no driver consumer to look up broker locations. Provide an explicit mapping with `PreferFixed` instead.

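Because `createRDD` cannot look up brokers itself, a batch job that wants broker locality has to supply the mapping. The sketch below shows one way this might look with `PreferFixed`. The topic, offsets, broker address, and host name are hypothetical, and a local master is assumed only so the snippet is self-contained.

```scala
import java.{util => ju}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, OffsetRange}

// Local master is for illustration only.
val sc = new SparkContext(
  new SparkConf().setAppName("create-rdd-example").setMaster("local[2]"))

// createRDD takes a java.util.Map of Kafka parameters (hypothetical values).
val kafkaParams = new ju.HashMap[String, Object]()
kafkaParams.put("bootstrap.servers", "localhost:9092")
kafkaParams.put("key.deserializer", classOf[StringDeserializer])
kafkaParams.put("value.deserializer", classOf[StringDeserializer])
kafkaParams.put("group.id", "example-group")

// Read offsets [0, 100) of partition 0 of a hypothetical topic.
val offsetRanges = Array(OffsetRange("example-topic", 0, 0L, 100L))

// An explicit partition -> host mapping stands in for the broker lookup
// that PreferBrokers would otherwise need a driver consumer for.
val hostMap = Map(new TopicPartition("example-topic", 0) -> "broker1.example.com")

val rdd = KafkaUtils.createRDD[String, String](
  sc,
  kafkaParams,
  offsetRanges,
  LocationStrategies.PreferFixed(hostMap)
)
```

If locality does not matter for the batch job, `LocationStrategies.PreferConsistent` can be passed in place of the fixed mapping.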

### PreferFixed Strategy

Use this strategy to control explicitly which hosts handle specific topic partitions. It is useful for balancing load manually when partitions are significantly skewed in size or volume.

```scala { .api }
def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy
def PreferFixed(hostMap: java.util.Map[TopicPartition, String]): LocationStrategy
```


**Parameters:**

- `hostMap`: Map from `TopicPartition` to the preferred host name. Any `TopicPartition` not in the map uses a consistent location.

**Usage (Scala):**

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies}
import scala.collection.mutable

// Create a map of preferred locations (partition -> host name)
val hostMap = mutable.Map[TopicPartition, String]()
hostMap += new TopicPartition("high-volume-topic", 0) -> "executor-host-1"
hostMap += new TopicPartition("high-volume-topic", 1) -> "executor-host-2"
hostMap += new TopicPartition("low-volume-topic", 0) -> "executor-host-3"

// `ssc` (StreamingContext) and `consumerStrategy` are assumed to be defined elsewhere
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferFixed(hostMap),
  consumerStrategy
)
```


**Usage (Java):**

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import java.util.HashMap;
import java.util.Map;

// Map of preferred locations (partition -> host name)
Map<TopicPartition, String> hostMap = new HashMap<>();
hostMap.put(new TopicPartition("high-volume-topic", 0), "executor-host-1");
hostMap.put(new TopicPartition("high-volume-topic", 1), "executor-host-2");

// `jssc` (JavaStreamingContext) and `consumerStrategy` are assumed to be defined elsewhere
JavaInputDStream<ConsumerRecord<String, String>> stream =
    KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferFixed(hostMap),
        consumerStrategy
    );
```


**When to use:**

- When you have uneven partition sizes and want to balance load manually
- When certain partitions have different processing requirements
- When you want to isolate high-volume partitions to specific executors
- For debugging or testing specific partition assignments

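Writing a host map by hand becomes tedious as the number of partitions grows. One option is to generate it. The sketch below assigns partitions to hosts in round-robin order. The helper `roundRobinHostMap` is hypothetical (not part of the Spark API), and the type parameter `P` stands in for `TopicPartition` so that the snippet runs without Kafka on the classpath.

```scala
// Assign each partition to a host in round-robin order.
// `P` stands in for org.apache.kafka.common.TopicPartition.
def roundRobinHostMap[P](partitions: Seq[P], hosts: Seq[String]): Map[P, String] = {
  require(hosts.nonEmpty, "at least one host is required")
  partitions.zipWithIndex.map { case (partition, i) =>
    partition -> hosts(i % hosts.length)
  }.toMap
}

// Hypothetical (topic, partition) pairs and host names.
val partitions = Seq(
  ("high-volume-topic", 0),
  ("high-volume-topic", 1),
  ("low-volume-topic", 0)
)
val hosts = Seq("executor-host-1", "executor-host-2")

// Indices 0, 1, 2 map to hosts 0, 1, 0 respectively.
val generatedHostMap = roundRobinHostMap(partitions, hosts)
```

With real `TopicPartition` keys, the resulting map could be passed directly to `LocationStrategies.PreferFixed`, and regenerated whenever the list of hosts changes.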

## Performance Considerations


### Consumer Caching


Location strategies directly impact consumer caching performance:

- **Consistent placement** allows consumers to be reused across micro-batches
- **Inconsistent placement** forces consumer recreation, hurting performance
- **Cache configuration** can be tuned via Spark configuration:
  - `spark.streaming.kafka.consumer.cache.enabled=true` (default)
  - `spark.streaming.kafka.consumer.cache.maxCapacity=64` (default)
  - `spark.streaming.kafka.consumer.cache.initialCapacity=16` (default)

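The cache settings above are ordinary Spark configuration keys, so one way to tune them is on the `SparkConf` before the streaming context is created. The values below (32 and 128) are illustrative assumptions, not recommendations. Spark's integration guide suggests raising `maxCapacity` when an application handles more than 64 Kafka partitions per executor.

```scala
import org.apache.spark.SparkConf

// Hypothetical application name and capacity values.
val cacheConf = new SparkConf()
  .setAppName("kafka-cache-tuning")
  .set("spark.streaming.kafka.consumer.cache.enabled", "true")
  .set("spark.streaming.kafka.consumer.cache.initialCapacity", "32")
  .set("spark.streaming.kafka.consumer.cache.maxCapacity", "128")
```

The same keys can also be supplied at launch time with `--conf key=value` on `spark-submit`.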

### Network Optimization

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies}

// Example: Optimize for network locality by pinning each partition to the
// host of its broker (assumes executors also run on these hosts)
val brokerExecutorMap = Map[TopicPartition, String](
  new TopicPartition("topic1", 0) -> "broker1.example.com",
  new TopicPartition("topic1", 1) -> "broker2.example.com",
  new TopicPartition("topic2", 0) -> "broker3.example.com"
)

// `ssc` and `consumerStrategy` are assumed to be defined elsewhere
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferFixed(brokerExecutorMap),
  consumerStrategy
)
```


### Load Balancing

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies}

// Example: Balance high-volume partitions across dedicated executors
val loadBalancedMap = Map[TopicPartition, String](
  // Distribute high-volume partitions
  new TopicPartition("metrics", 0) -> "high-memory-executor-1",
  new TopicPartition("metrics", 1) -> "high-memory-executor-2",
  new TopicPartition("metrics", 2) -> "high-memory-executor-3",

  // Group low-volume partitions
  new TopicPartition("alerts", 0) -> "standard-executor-1",
  new TopicPartition("alerts", 1) -> "standard-executor-1"
)

// `ssc` and `consumerStrategy` are assumed to be defined elsewhere
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferFixed(loadBalancedMap),
  consumerStrategy
)
```

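When per-partition volume is known, the mapping can be derived rather than hand-written. The sketch below uses a simple greedy heuristic: it takes partitions from heaviest to lightest and gives each one to the host with the smallest accumulated load. The helper `balanceByLoad`, the load figures, and the host names are all hypothetical, and `P` again stands in for `TopicPartition`.

```scala
// Greedy assignment: heaviest partition first, always to the least-loaded host.
// `P` stands in for org.apache.kafka.common.TopicPartition.
def balanceByLoad[P](loads: Map[P, Long], hosts: Seq[String]): Map[P, String] = {
  require(hosts.nonEmpty, "at least one host is required")
  val initial = (Map.empty[P, String], hosts.map(_ -> 0L).toMap)
  val (assignment, _) = loads.toSeq
    .sortBy { case (_, load) => -load }
    .foldLeft(initial) { case ((assigned, totals), (partition, load)) =>
      // On ties, the host that appears first in `hosts` is chosen.
      val host = hosts.minBy(h => totals(h))
      (assigned + (partition -> host), totals.updated(host, totals(host) + load))
    }
  assignment
}

// Hypothetical messages-per-second figures for each (topic, partition).
val partitionLoads = Map(
  ("metrics", 0) -> 900L,
  ("metrics", 1) -> 700L,
  ("metrics", 2) -> 500L,
  ("alerts", 0) -> 50L,
  ("alerts", 1) -> 40L
)

val balanced = balanceByLoad(partitionLoads, Seq("executor-host-1", "executor-host-2"))
```

This is only one possible heuristic. It ignores memory and CPU differences between hosts, so measured results should decide whether a generated mapping is worth using.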

## Best Practices

1. **Start with PreferConsistent**: Use this as your default choice unless you have specific locality requirements.

2. **Use PreferBrokers carefully**: Only when executors are truly co-located with Kafka brokers and you've verified the performance benefit.

3. **Monitor cache effectiveness**: Check consumer cache hit rates and adjust cache settings if needed.

4. **Profile before optimizing**: Measure actual performance impact before implementing complex PreferFixed strategies.

5. **Consider partition count**: Location strategies become more important with higher partition counts.

6. **Account for dynamic scaling**: PreferFixed strategies may need adjustment when cluster size changes.

## Error Handling


Location strategies include built-in error handling:

- **PreferBrokers with `createRDD`**: Throws an `AssertionError` whose message directs you to `PreferFixed`
- **Incomplete host mapping**: Partitions missing from a `PreferFixed` map fall back to consistent placement
- **Executor unavailability**: Automatically reassigns to available executors
- **Cache misses**: Gracefully creates new consumers when cache entries are unavailable
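
The fallback behaviors listed above can be pictured with a small model. The sketch below is a simplified illustration of how a preferred host might be chosen, not Spark's actual implementation. The function `chooseHost` is hypothetical, `P` stands in for `TopicPartition`, and `executorHosts` is assumed to be a sorted list of hosts with live executors.

```scala
// Simplified model of preferred-location selection (not Spark's code).
def chooseHost[P](
    partition: P,
    hostMap: Map[P, String],
    executorHosts: Seq[String]): Option[String] = {
  // Honor the mapped host only if an executor is currently available there.
  val preferred = hostMap.get(partition).filter(h => executorHosts.contains(h))
  preferred.orElse {
    if (executorHosts.isEmpty) {
      None
    } else {
      // The hash depends only on the partition, so the same partition lands on
      // the same host for as long as the executor list is unchanged.
      val index = Math.floorMod(partition.hashCode, executorHosts.length)
      Some(executorHosts(index))
    }
  }
}

// Hypothetical hosts and partition keys.
val liveHosts = Seq("host-a", "host-b")
val mapped = chooseHost("topic-0", Map("topic-0" -> "host-b"), liveHosts)
val hostGone = chooseHost("topic-0", Map("topic-0" -> "host-gone"), liveHosts)
val unmapped = chooseHost("topic-1", Map.empty[String, String], liveHosts)
```

In this model `mapped` is `Some("host-b")`, while `hostGone` and `unmapped` both fall back to a hash-based choice among the live hosts.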