# Location Strategies

Location strategies control how Kafka consumers are scheduled for TopicPartitions on Spark executors. Because Kafka 0.10+ consumers prefetch messages into buffers, performance depends on keeping cached consumers on the executors that read each partition, rather than recreating consumers for every batch. The chosen location is a preference, not an absolute requirement: partitions may be scheduled elsewhere if needed.
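To see why stable placement matters for caching, here is a toy simulation in plain Scala, with no Spark or Kafka dependency. It assumes a hypothetical per-executor cache: a consumer is created the first time an executor reads a partition and is reused afterwards. `ConsumerCacheSim`, `Partition`, and both placement functions are illustrative names, not Spark APIs.

```scala
object ConsumerCacheSim {
  // Hypothetical stand-in for a Kafka TopicPartition (toy model, not a Kafka class).
  final case class Partition(topic: String, index: Int)

  // A placement decides which executor reads a partition in a given batch.
  type Placement = (Partition, Int) => String

  // Counts consumers created over `batches` batches, assuming each executor
  // keeps one cached consumer per partition it has already read.
  def consumersCreated(parts: Seq[Partition], batches: Int, place: Placement): Int = {
    val cache = scala.collection.mutable.Set.empty[(String, Partition)]
    var created = 0
    for (batch <- 0 until batches; p <- parts) {
      val key = (place(p, batch), p)
      if (cache.add(key)) created += 1 // add returns true only for a new entry
    }
    created
  }

  // Stable placement: a partition always lands on the same executor.
  def stable(executors: IndexedSeq[String]): Placement =
    (p, _) => executors(Math.floorMod(p.index, executors.size))

  // Rotating placement: the executor shifts every batch, defeating the cache.
  def rotating(executors: IndexedSeq[String]): Placement =
    (p, batch) => executors(Math.floorMod(p.index + batch, executors.size))
}
```

With four partitions, two executors, and three batches, the stable placement creates 4 consumers and the rotating one creates 8. In the rotating case, every partition is read by both executors, so each executor builds and prefetches into its own consumer.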

## Core Strategies

### PreferConsistent

The recommended default strategy that consistently distributes partitions across all available executors.

```scala { .api }
def PreferConsistent: LocationStrategy
```

**Use when:** In most cases; it provides good load balancing across executors.

**Characteristics:**

- Distributes partitions evenly across all executors
- Maintains consistent assignment across batches while the set of executors stays the same
- No special host requirements
- Best general-purpose strategy
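The following is a minimal sketch of one way to achieve consistent placement. It assumes executors are ordered deterministically (here by host, then executor id) and that each partition picks an executor from a stable hash of its topic and partition number. This is a toy model of the idea, and Spark's actual scheduling code may differ in detail. `ConsistentPlacement`, `Exec`, `TopicPart`, and `preferredExecutor` are hypothetical names.

```scala
object ConsistentPlacement {
  // Hypothetical stand-ins for a Spark executor and a Kafka TopicPartition.
  final case class Exec(host: String, id: String)
  final case class TopicPart(topic: String, partition: Int) {
    // Deterministic across JVM runs: String.hashCode is fixed by the Java spec.
    def stableHash: Int = 31 * topic.hashCode + partition
  }

  // Sort executors so every batch sees them in the same order, then pick one
  // by hash. Returns None when no executors are available.
  def preferredExecutor(tp: TopicPart, executors: Seq[Exec]): Option[Exec] = {
    val sorted = executors.sortBy(e => (e.host, e.id)).toIndexedSeq
    if (sorted.isEmpty) None
    else Some(sorted(Math.floorMod(tp.stableHash, sorted.size)))
  }
}
```

Neither the sort order nor the hash depends on the batch, so in this model a partition keeps its executor until the executor set changes. Adding or losing an executor changes `sorted.size` and can move many partitions at once. That is exactly when cached consumers get recreated.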

### PreferBrokers

Strategy for when your Spark executors are co-located on the same nodes as your Kafka brokers.

```scala { .api }
def PreferBrokers: LocationStrategy
```

**Use when:** Your executors run on the same physical nodes as Kafka brokers.

**Characteristics:**

- Prefers to schedule each partition on the host of its Kafka leader broker
- Reduces network overhead by keeping reads local
- Only helps when executors actually run on the broker hosts
- Cannot be used with `createRDD` (a single KafkaRDD has no driver consumer to look up brokers)
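Before choosing PreferBrokers, it can help to check how many partitions actually have an executor on their leader's host. The sketch below is plain Scala. It assumes you have already collected two inputs:

- the leader host of each partition (for example, from a Kafka consumer's `partitionsFor`, via `PartitionInfo.leader().host()`)
- the host names your executors run on (for example, from the Executors tab of the Spark UI)

`BrokerColocation.colocatedFraction` is a hypothetical helper. Host names are compared as exact strings, so a mix of IP addresses and hostnames would be counted as not co-located.

```scala
object BrokerColocation {
  // Fraction of partitions whose leader host also runs at least one executor.
  // leaderHosts: (topic, partition) -> leader host; executorHosts: executor host names.
  def colocatedFraction(leaderHosts: Map[(String, Int), String],
                        executorHosts: Set[String]): Double =
    if (leaderHosts.isEmpty) 0.0
    else leaderHosts.values.count(executorHosts.contains).toDouble / leaderHosts.size
}
```

For example, with four partitions led by `kafka-1`, `kafka-2`, `kafka-3`, and `kafka-1`, and executors only on `kafka-1` and `kafka-2`, the fraction is 0.75. A value well below 1.0 suggests PreferBrokers would bring little locality benefit over PreferConsistent.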

### PreferFixed (Scala Collection)

Strategy for custom partition-to-host mapping when you have uneven load distribution or specific placement requirements.

```scala { .api }
def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy
```

**Parameters:**

- `hostMap`: `collection.Map[TopicPartition, String]`, a mapping from each TopicPartition to its preferred host

**Use when:** You have specific knowledge about partition load or host capabilities.
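When you know or can estimate per-partition load, one simple way to build a `hostMap` is a greedy "largest load first" assignment. Sort the partitions by load, then give each one to the host with the smallest total so far. This is a minimal sketch in plain Scala, and the load numbers and host names are hypothetical. It keys the map by `(topic, partition)` pairs so it runs without Kafka on the classpath. In real code you would build the keys with `new TopicPartition(topic, partition)` before passing the map to `LocationStrategies.PreferFixed`.

```scala
object FixedHostPlanner {
  // Greedy "largest load first": heaviest partitions are placed first, each on
  // the host with the smallest total load so far. Keys are (topic, partition).
  def planHosts(loads: Map[(String, Int), Long],
                hosts: Seq[String]): Map[(String, Int), String] = {
    require(hosts.nonEmpty, "need at least one host")
    val totals = scala.collection.mutable.Map.empty[String, Long]
    hosts.foreach(h => totals(h) = 0L)
    // Descending load; ties broken by topic and partition for a deterministic order.
    val ordered = loads.toSeq.sortBy { case ((topic, part), load) => (-load, topic, part) }
    ordered.map { case (tp, load) =>
      val host = hosts.minBy(h => totals(h)) // least-loaded host so far
      totals(host) += load
      tp -> host
    }.toMap
  }
}
```

For example, take loads of 10 and 6 for two `hot` partitions and 3 and 2 for two `cold` partitions, spread over two hosts. The two hot partitions end up on different hosts, and the host totals come out as 10 and 11.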

### PreferFixed (Java Collection)

Java version of the fixed mapping strategy.

```java { .api }
public static LocationStrategy PreferFixed(java.util.Map<TopicPartition, String> hostMap)
```

**Parameters:**

- `hostMap`: `java.util.Map<TopicPartition, String>`, a mapping from each TopicPartition to its preferred host

## Usage Examples

### Default Strategy (Recommended)

```scala
import org.apache.spark.streaming.kafka010._

// Assumes streamingContext, topics and kafkaParams are already defined
// Use PreferConsistent in most cases
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
```
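One way to define the `topics` and `kafkaParams` values these examples use, with the usual Kafka consumer settings, is sketched below. The broker address, group id, and topic names are placeholders. The deserializers are given as class-name strings, which Kafka accepts for these settings, so the snippet compiles without Kafka on the classpath. `ExampleKafkaSettings` is a hypothetical holder object.

```scala
object ExampleKafkaSettings {
  // Placeholder values: replace the broker list, group id and topics with your own.
  val kafkaParams: Map[String, Object] = Map(
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "group.id" -> "location-strategy-example",
    "auto.offset.reset" -> "latest",
    // Disable Kafka auto-commit so offsets can be committed explicitly.
    "enable.auto.commit" -> java.lang.Boolean.FALSE
  )

  val topics: Seq[String] = Seq("topicA", "topicB")
}
```

With `import ExampleKafkaSettings._` in scope, the `Subscribe[String, String](topics, kafkaParams)` calls above type-check as written. The Scala `Subscribe` takes an iterable of topic names and a map of consumer parameters.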

### Broker Co-location Strategy

```scala
import org.apache.spark.streaming.kafka010._

// Use when executors run on the same nodes as the Kafka brokers
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferBrokers,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

// Note: PreferBrokers cannot be used with createRDD.
// This would throw an AssertionError:
// val rdd = KafkaUtils.createRDD[String, String](
//   sparkContext, kafkaParams, offsetRanges, LocationStrategies.PreferBrokers
// )
```

### Custom Fixed Mapping (Scala)

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.TopicPartition

// Define custom partition-to-host mapping.
// Host names must match the hosts your executors report.
val hostMap = Map(
  new TopicPartition("high-volume-topic", 0) -> "executor-host-1",
  new TopicPartition("high-volume-topic", 1) -> "executor-host-2",
  new TopicPartition("low-volume-topic", 0) -> "executor-host-3"
)

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferFixed(hostMap),
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
```

### Custom Fixed Mapping (Java)

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.*;

Map<TopicPartition, String> hostMap = new HashMap<>();
hostMap.put(new TopicPartition("topic1", 0), "host1");
hostMap.put(new TopicPartition("topic1", 1), "host2");
hostMap.put(new TopicPartition("topic2", 0), "host3");

// topics (Collection<String>) and kafkaParams (Map<String, Object>) are assumed to be defined
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
    javaStreamingContext,
    LocationStrategies.PreferFixed(hostMap),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
```

### Fallback Behavior with PreferFixed

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.TopicPartition

// Only specify mapping for some partitions
val partialHostMap = Map(
  new TopicPartition("critical-topic", 0) -> "high-performance-host"
  // Other partitions not specified will use consistent location
)

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferFixed(partialHostMap),
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

// Any TopicPartition not in the map will use consistent location strategy
```
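Unmapped partitions fall back to consistent placement without any warning, so it can be worth checking which partitions a partial map actually pins. This plain-Scala sketch takes three inputs: the partition count of each subscribed topic, the host map, and the hosts that currently run executors. The host map is keyed by `(topic, partition)` pairs instead of `TopicPartition`, so the sketch runs without Kafka. It returns the partitions whose preference cannot be honored, either because they are unmapped or because no executor runs on the mapped host. `FixedMapCheck.unpinnedPartitions` is a hypothetical helper, not part of Spark.

```scala
object FixedMapCheck {
  // Partitions that will not follow the fixed mapping: absent from hostMap, or
  // mapped to a host with no running executor. Sorted for stable output.
  def unpinnedPartitions(partitionCounts: Map[String, Int],
                         hostMap: Map[(String, Int), String],
                         executorHosts: Set[String]): Seq[(String, Int)] = {
    val all = for {
      (topic, count) <- partitionCounts.toSeq
      p <- 0 until count
    } yield (topic, p)
    all.filter(tp => hostMap.get(tp).forall(h => !executorHosts.contains(h))).sorted
  }
}
```

Take two `critical-topic` partitions and one `other` partition, with only `("critical-topic", 0)` mapped to `high-performance-host`, and an executor running on that host. The check returns `("critical-topic", 1)` and `("other", 0)`.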

## Strategy Selection Guidelines

### Use PreferConsistent When:

- You don't have specific host requirements
- You want simple, balanced partition distribution
- You're unsure which strategy to use (default choice)
- Your Kafka brokers and Spark executors are on different nodes

### Use PreferBrokers When:

- Your Spark executors run on the same nodes as Kafka brokers
- You want to minimize network I/O
- You create streams with `createDirectStream` rather than batch RDDs with `createRDD`
- You have control over both Kafka and Spark cluster deployment

### Use PreferFixed When:

- You have uneven partition loads and want specific placement
- Some partitions require more processing power than others
- You have detailed knowledge of your cluster topology
- You need to isolate certain partitions on specific hosts

## Performance Considerations

### Consumer Caching

- Kafka consumers are cached per executor to avoid recreation overhead
- Location preferences help maintain cache effectiveness
- Consistent placement improves consumer reuse across batches

### Network Locality

- PreferBrokers reduces network traffic when possible
- PreferFixed allows fine-tuned placement for optimal network usage
- PreferConsistent provides balanced load without network optimization

### Load Balancing

- PreferConsistent ensures even distribution across executors
- PreferFixed allows custom load balancing based on partition characteristics
- PreferBrokers may create uneven load if broker hosts have different capabilities

## Error Handling

### PreferBrokers with createRDD

```scala
import org.apache.spark.streaming.kafka010._

// This will throw AssertionError: a single KafkaRDD has no driver consumer to look up brokers.
// Note that createRDD takes kafkaParams as a java.util.Map[String, Object].
try {
  val rdd = KafkaUtils.createRDD[String, String](
    sparkContext, kafkaParams, offsetRanges, LocationStrategies.PreferBrokers
  )
} catch {
  case _: AssertionError =>
    println("PreferBrokers cannot be used with createRDD; use PreferConsistent or PreferFixed")
}
```

## Important Notes

- All location strategies are marked as `@Experimental` in Spark 2.4.8
- Location preferences are hints, not guarantees; Spark may place partitions elsewhere
- Consumer instances are automatically managed and cached for performance
- PreferBrokers needs a driver consumer to look up brokers, so it cannot be used with `createRDD`
- PreferFixed falls back to consistent location for unmapped TopicPartitions