
# Consumer Strategies

Configuration strategies for creating and managing Kafka consumers with different subscription patterns. Consumer strategies encapsulate the complex setup required for Kafka 0.10 consumers and allow the configuration to be checkpointed with Spark Streaming applications.

## Capabilities

### Subscribe Strategy

Subscribe to a collection of specific topics. This is the most common pattern for consuming from known topics.

```scala { .api }
// Scala versions
def Subscribe[K, V](
  topics: Iterable[java.lang.String],
  kafkaParams: collection.Map[String, Object]
): ConsumerStrategy[K, V]

def Subscribe[K, V](
  topics: Iterable[java.lang.String],
  kafkaParams: collection.Map[String, Object],
  offsets: collection.Map[TopicPartition, Long]
): ConsumerStrategy[K, V]

// Java versions
def Subscribe[K, V](
  topics: java.util.Collection[java.lang.String],
  kafkaParams: java.util.Map[String, Object]
): ConsumerStrategy[K, V]

def Subscribe[K, V](
  topics: java.util.Collection[java.lang.String],
  kafkaParams: java.util.Map[String, Object],
  offsets: java.util.Map[TopicPartition, java.lang.Long]
): ConsumerStrategy[K, V]
```

**Parameters:**

- `topics`: Collection of topic names to subscribe to
- `kafkaParams`: Kafka configuration parameters (must include `"bootstrap.servers"`)
- `offsets`: Optional starting offsets for specific partitions

**Usage Example (Scala):**

```scala
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.TopicPartition

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-consumer-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("orders", "payments", "inventory")

// Subscribe without specific starting offsets
val strategy1 = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)

// Subscribe with specific starting offsets
val startingOffsets = Map[TopicPartition, Long](
  new TopicPartition("orders", 0) -> 1000L,
  new TopicPartition("orders", 1) -> 2000L,
  new TopicPartition("payments", 0) -> 500L
)

val strategy2 = ConsumerStrategies.Subscribe[String, String](
  topics,
  kafkaParams,
  startingOffsets
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  strategy1
)
```

**Usage Example (Java):**

```java
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "my-consumer-group");
kafkaParams.put("auto.offset.reset", "latest");

Collection<String> topics = Arrays.asList("orders", "payments", "inventory");

ConsumerStrategy<String, String> strategy =
    ConsumerStrategies.Subscribe(topics, kafkaParams);

JavaInputDStream<ConsumerRecord<String, String>> stream =
    KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        strategy
    );
```

### SubscribePattern Strategy

Subscribe to all topics matching a specified regular expression pattern. Useful for consuming from dynamically created topics that follow a naming convention.

```scala { .api }
// Scala versions
def SubscribePattern[K, V](
  pattern: java.util.regex.Pattern,
  kafkaParams: collection.Map[String, Object]
): ConsumerStrategy[K, V]

def SubscribePattern[K, V](
  pattern: java.util.regex.Pattern,
  kafkaParams: collection.Map[String, Object],
  offsets: collection.Map[TopicPartition, Long]
): ConsumerStrategy[K, V]

// Java versions
def SubscribePattern[K, V](
  pattern: java.util.regex.Pattern,
  kafkaParams: java.util.Map[String, Object]
): ConsumerStrategy[K, V]

def SubscribePattern[K, V](
  pattern: java.util.regex.Pattern,
  kafkaParams: java.util.Map[String, Object],
  offsets: java.util.Map[TopicPartition, java.lang.Long]
): ConsumerStrategy[K, V]
```

**Parameters:**

- `pattern`: Regular expression pattern to match topic names
- `kafkaParams`: Kafka configuration parameters
- `offsets`: Optional starting offsets for specific partitions

**Usage Example (Scala):**

```scala
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import java.util.regex.Pattern

// Subscribe to all topics starting with "user-events-"
val topicPattern = Pattern.compile("user-events-.*")

val strategy = ConsumerStrategies.SubscribePattern[String, String](
  topicPattern,
  kafkaParams
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  strategy
)

// This will automatically consume from topics like:
// - user-events-clicks
// - user-events-purchases
// - user-events-registrations
// as they are created
```

**Usage Example (Java):**

```java
import java.util.regex.Pattern;

Pattern topicPattern = Pattern.compile("log-.*-\\d{4}-\\d{2}-\\d{2}");

ConsumerStrategy<String, String> strategy =
    ConsumerStrategies.SubscribePattern(topicPattern, kafkaParams);
```

**Dynamic Topic Discovery:**

The pattern matching is performed periodically against the topics that exist at the time of the check. New topics matching the pattern are automatically included in subsequent micro-batches.
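As an illustration of this matching behavior, a topic-name regex can be exercised with plain `java.util.regex` and no Kafka at all (the topic names below are hypothetical):

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicPatternDemo {
    // Return the subset of topic names a SubscribePattern-style regex would select.
    static List<String> matchingTopics(Pattern pattern, List<String> existingTopics) {
        return existingTopics.stream()
            .filter(topic -> pattern.matcher(topic).matches())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("user-events-.*");
        List<String> topics = List.of(
            "user-events-clicks", "user-events-purchases", "billing", "user-eventsX");
        // Only the "user-events-" topics match; a newly created
        // "user-events-registrations" would match on a later periodic check.
        System.out.println(matchingTopics(pattern, topics));
        // prints [user-events-clicks, user-events-purchases]
    }
}
```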

### Assign Strategy

Assign a fixed collection of specific TopicPartitions. This gives you complete control over which partitions are consumed, which is useful for advanced use cases requiring precise partition assignment.

```scala { .api }
// Scala versions
def Assign[K, V](
  topicPartitions: Iterable[TopicPartition],
  kafkaParams: collection.Map[String, Object]
): ConsumerStrategy[K, V]

def Assign[K, V](
  topicPartitions: Iterable[TopicPartition],
  kafkaParams: collection.Map[String, Object],
  offsets: collection.Map[TopicPartition, Long]
): ConsumerStrategy[K, V]

// Java versions
def Assign[K, V](
  topicPartitions: java.util.Collection[TopicPartition],
  kafkaParams: java.util.Map[String, Object]
): ConsumerStrategy[K, V]

def Assign[K, V](
  topicPartitions: java.util.Collection[TopicPartition],
  kafkaParams: java.util.Map[String, Object],
  offsets: java.util.Map[TopicPartition, java.lang.Long]
): ConsumerStrategy[K, V]
```

**Parameters:**

- `topicPartitions`: Specific collection of TopicPartitions to consume from
- `kafkaParams`: Kafka configuration parameters
- `offsets`: Optional starting offsets for the assigned partitions

**Usage Example (Scala):**

```scala
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.kafka.common.TopicPartition

// Assign specific partitions for precise control
val assignedPartitions = Array(
  new TopicPartition("high-priority", 0),
  new TopicPartition("high-priority", 1),
  new TopicPartition("medium-priority", 0),
  new TopicPartition("low-priority", 2)
)

val strategy = ConsumerStrategies.Assign[String, String](
  assignedPartitions,
  kafkaParams
)

// With specific starting offsets
val partitionOffsets = Map[TopicPartition, Long](
  new TopicPartition("high-priority", 0) -> 10000L,
  new TopicPartition("high-priority", 1) -> 15000L,
  new TopicPartition("medium-priority", 0) -> 5000L,
  new TopicPartition("low-priority", 2) -> 1000L
)

val strategyWithOffsets = ConsumerStrategies.Assign[String, String](
  assignedPartitions,
  kafkaParams,
  partitionOffsets
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  strategyWithOffsets
)
```

**Usage Example (Java):**

```java
import org.apache.kafka.common.TopicPartition;
import java.util.*;

Collection<TopicPartition> partitions = Arrays.asList(
    new TopicPartition("transactions", 0),
    new TopicPartition("transactions", 1),
    new TopicPartition("transactions", 2)
);

Map<TopicPartition, Long> offsets = new HashMap<>();
offsets.put(new TopicPartition("transactions", 0), 50000L);
offsets.put(new TopicPartition("transactions", 1), 75000L);
offsets.put(new TopicPartition("transactions", 2), 60000L);

ConsumerStrategy<String, String> strategy =
    ConsumerStrategies.Assign(partitions, kafkaParams, offsets);
```

## Advanced Configuration

### Security Configuration

Consumer strategies pass security-related parameters through to the consumers they create:

```scala
// Security parameters are forwarded to the underlying Kafka consumer
val secureKafkaParams = Map[String, Object](
  "bootstrap.servers" -> "secure-broker:9093",
  "security.protocol" -> "SASL_SSL",
  "sasl.mechanism" -> "PLAIN",
  "sasl.jaas.config" -> "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";"
  // ... other parameters
)

val strategy = ConsumerStrategies.Subscribe[String, String](topics, secureKafkaParams)
```

### Custom Deserializers

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import com.example.MyCustomDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[MyCustomDeserializer],
  "group.id" -> "custom-deserializer-group"
)

val strategy = ConsumerStrategies.Subscribe[String, MyCustomObject](topics, kafkaParams)
```

### Offset Reset Strategies

```scala
// Start from the earliest available offset
val earliestParams = kafkaParams + ("auto.offset.reset" -> "earliest")

// Start from the latest offset (default)
val latestParams = kafkaParams + ("auto.offset.reset" -> "latest")

// Fail if no committed offset exists
val noneParams = kafkaParams + ("auto.offset.reset" -> "none")

val strategy = ConsumerStrategies.Subscribe[String, String](topics, earliestParams)
```

## Error Handling and Edge Cases

### KAFKA-3370 Workaround

Consumer strategies automatically handle the KAFKA-3370 issue when `auto.offset.reset=none`:

```scala
val strictParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "strict-group",
  "auto.offset.reset" -> "none" // This triggers the workaround
)

// The strategy will handle NoOffsetForPartitionException internally
val strategy = ConsumerStrategies.Subscribe[String, String](topics, strictParams)
```

### Automatic Parameter Fixing

Consumer strategies ensure parameters are properly configured for Spark executors:

- `enable.auto.commit` is automatically set to `false`
- `auto.offset.reset` is set to `"none"` for executors
- `group.id` is modified for executors to avoid conflicts
- `receive.buffer.bytes` is increased to 65536 as a KAFKA-3135 workaround
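The overrides above can be sketched as plain map manipulation. This is an illustration of the described behavior, not the actual Spark internals; `fixForExecutors` and the exact `group.id` prefix are assumptions for the sketch:

```java
import java.util.HashMap;
import java.util.Map;

public class ExecutorParamsDemo {
    // Sketch of the executor-side parameter overrides described above (illustrative only).
    static Map<String, Object> fixForExecutors(Map<String, Object> driverParams) {
        Map<String, Object> fixed = new HashMap<>(driverParams);
        fixed.put("enable.auto.commit", false);   // Spark manages offsets itself
        fixed.put("auto.offset.reset", "none");   // executors must not choose offsets
        fixed.put("group.id", "spark-executor-" + driverParams.get("group.id")); // avoid group conflicts
        fixed.put("receive.buffer.bytes", 65536); // KAFKA-3135 workaround
        return fixed;
    }

    public static void main(String[] args) {
        Map<String, Object> params = new HashMap<>();
        params.put("group.id", "my-consumer-group");
        params.put("enable.auto.commit", true);
        System.out.println(fixForExecutors(params).get("group.id"));
        // prints spark-executor-my-consumer-group
    }
}
```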

### Pattern Matching Edge Cases

For `SubscribePattern`, the strategy handles:

- **Empty matches**: Gracefully handles patterns that match no existing topics
- **Dynamic topic creation**: Automatically includes newly created topics in subsequent batches
- **Topic deletion**: Continues processing remaining topics if some are deleted

## Best Practices

1. **Use Subscribe for known topics**: When you know the exact topic names, prefer `Subscribe` over pattern matching.
2. **Use SubscribePattern for dynamic topics**: When topics are created dynamically with consistent naming patterns.
3. **Use Assign for advanced control**: When you need precise control over partition assignment or want to implement custom load balancing.
4. **Always specify group.id**: Required for offset management and consumer coordination.
5. **Disable auto-commit**: Set `enable.auto.commit=false` and manage offsets manually for exactly-once semantics.
6. **Handle starting offsets**: Specify starting offsets when resuming from checkpoints or specific points in time.
7. **Security first**: Include all necessary security parameters for production deployments.
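For practices 5 and 6, the offset bookkeeping behind "resume from where you left off" can be sketched without Kafka: merge offsets loaded from your own store with a default for partitions that have no saved position, and pass the result as the `offsets` argument of a strategy. Plain strings such as `"orders-0"` stand in for `TopicPartition` here, and the store itself is hypothetical:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class ResumeOffsetsDemo {
    // Build the starting-offset map for a consumer strategy:
    // saved positions win; partitions without a saved position start at a default.
    static Map<String, Long> startingOffsets(Set<String> currentPartitions,
                                             Map<String, Long> savedOffsets,
                                             long defaultOffset) {
        Map<String, Long> starting = new HashMap<>();
        for (String partition : currentPartitions) {
            starting.put(partition, savedOffsets.getOrDefault(partition, defaultOffset));
        }
        return starting;
    }

    public static void main(String[] args) {
        // "orders-2" is a newly added partition with no saved offset.
        Map<String, Long> saved = Map.of("orders-0", 1000L, "orders-1", 2000L);
        Map<String, Long> starting =
            startingOffsets(Set.of("orders-0", "orders-1", "orders-2"), saved, 0L);
        System.out.println(starting.get("orders-2")); // prints 0
    }
}
```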