# Consumer Strategies

Consumer strategies encapsulate how Kafka consumers are created and configured on the driver and executors. They handle the complex setup required for Kafka 0.10+ consumers, including subscription management, offset initialization, and parameter validation. The strategy pattern allows for flexible consumer configuration while maintaining checkpoint compatibility.

## Core Strategies

### Subscribe

Subscribe to a collection of specific topics for dynamic partition assignment.

```scala { .api }
// Scala versions
def Subscribe[K, V](
  topics: Iterable[String],
  kafkaParams: collection.Map[String, Object]
): ConsumerStrategy[K, V]

def Subscribe[K, V](
  topics: Iterable[String],
  kafkaParams: collection.Map[String, Object],
  offsets: collection.Map[TopicPartition, Long]
): ConsumerStrategy[K, V]

// Java versions
def Subscribe[K, V](
  topics: java.util.Collection[String],
  kafkaParams: java.util.Map[String, Object]
): ConsumerStrategy[K, V]

def Subscribe[K, V](
  topics: java.util.Collection[String],
  kafkaParams: java.util.Map[String, Object],
  offsets: java.util.Map[TopicPartition, java.lang.Long]
): ConsumerStrategy[K, V]
```

**Parameters:**

- `topics`: Collection of topic names to subscribe to
- `kafkaParams`: Kafka consumer configuration parameters
- `offsets`: Optional initial offsets for specific partitions

**Use when:** You want to consume from specific known topics with automatic partition discovery.

### SubscribePattern

Subscribe to all topics matching a regex pattern for dynamic topic and partition discovery.

```scala { .api }
// Scala versions
def SubscribePattern[K, V](
  pattern: java.util.regex.Pattern,
  kafkaParams: collection.Map[String, Object]
): ConsumerStrategy[K, V]

def SubscribePattern[K, V](
  pattern: java.util.regex.Pattern,
  kafkaParams: collection.Map[String, Object],
  offsets: collection.Map[TopicPartition, Long]
): ConsumerStrategy[K, V]

// Java versions
def SubscribePattern[K, V](
  pattern: java.util.regex.Pattern,
  kafkaParams: java.util.Map[String, Object]
): ConsumerStrategy[K, V]

def SubscribePattern[K, V](
  pattern: java.util.regex.Pattern,
  kafkaParams: java.util.Map[String, Object],
  offsets: java.util.Map[TopicPartition, java.lang.Long]
): ConsumerStrategy[K, V]
```

**Parameters:**

- `pattern`: Regex pattern to match topic names
- `kafkaParams`: Kafka consumer configuration parameters
- `offsets`: Optional initial offsets for specific partitions

**Use when:** You want to dynamically discover and consume from topics matching a pattern.
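
A quick way to check what a pattern will capture — plain Java with no Spark or Kafka dependencies, using hypothetical topic names — is to apply it the way pattern subscription does: against the full topic name rather than a substring.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicPatternDemo {
    // Filter topic names the way pattern subscription would:
    // the pattern must match the entire topic name (Matcher.matches).
    static List<String> matchingTopics(Pattern pattern, List<String> topics) {
        return topics.stream()
                .filter(t -> pattern.matcher(t).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("events-.*");
        List<String> topics = List.of("events-clicks", "events-views", "orders", "all-events-clicks");
        // "all-events-clicks" is excluded: the match is anchored to the whole name
        System.out.println(matchingTopics(pattern, topics)); // [events-clicks, events-views]
    }
}
```

Because matching is against the full name, `events-.*` does not pick up `all-events-clicks`; a substring match would need a pattern like `.*events-.*`.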

### Assign

Assign a fixed collection of specific TopicPartitions for static partition assignment.

```scala { .api }
// Scala versions
def Assign[K, V](
  topicPartitions: Iterable[TopicPartition],
  kafkaParams: collection.Map[String, Object]
): ConsumerStrategy[K, V]

def Assign[K, V](
  topicPartitions: Iterable[TopicPartition],
  kafkaParams: collection.Map[String, Object],
  offsets: collection.Map[TopicPartition, Long]
): ConsumerStrategy[K, V]

// Java versions
def Assign[K, V](
  topicPartitions: java.util.Collection[TopicPartition],
  kafkaParams: java.util.Map[String, Object]
): ConsumerStrategy[K, V]

def Assign[K, V](
  topicPartitions: java.util.Collection[TopicPartition],
  kafkaParams: java.util.Map[String, Object],
  offsets: java.util.Map[TopicPartition, java.lang.Long]
): ConsumerStrategy[K, V]
```

**Parameters:**

- `topicPartitions`: Collection of specific TopicPartitions to assign
- `kafkaParams`: Kafka consumer configuration parameters
- `offsets`: Optional initial offsets for specific partitions

**Use when:** You want precise control over which partitions to consume from.

## Usage Examples

### Subscribe to Specific Topics

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-consumer-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("orders", "payments", "users")
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  consumerStrategy
)
```

### Subscribe with Initial Offsets

```scala
import org.apache.kafka.common.TopicPartition

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-consumer-group",
  "auto.offset.reset" -> "none" // Will use provided offsets
)

val topics = Array("orders", "payments")
val offsets = Map(
  new TopicPartition("orders", 0) -> 100L,
  new TopicPartition("orders", 1) -> 200L,
  new TopicPartition("payments", 0) -> 50L
)

val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
```

### Pattern-based Topic Subscription

```scala
import java.util.regex.Pattern

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "pattern-consumer-group"
)

// Subscribe to all topics starting with "events-"
val pattern = Pattern.compile("events-.*")
val consumerStrategy = ConsumerStrategies.SubscribePattern[String, String](pattern, kafkaParams)

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  consumerStrategy
)
```

### Fixed Partition Assignment

```scala
import org.apache.kafka.common.TopicPartition

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "assigned-consumer-group"
)

// Assign specific partitions
val topicPartitions = Array(
  new TopicPartition("orders", 0),
  new TopicPartition("orders", 2),
  new TopicPartition("payments", 1)
)

val consumerStrategy = ConsumerStrategies.Assign[String, String](topicPartitions, kafkaParams)

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  consumerStrategy
)
```

### Java API Examples

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

// Subscribe strategy
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "java-consumer-group");

Collection<String> topics = Arrays.asList("topic1", "topic2");
ConsumerStrategy<String, String> strategy = ConsumerStrategies.Subscribe(topics, kafkaParams);

JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
    javaStreamingContext,
    LocationStrategies.PreferConsistent(),
    strategy
);
```

### Pattern Strategy with Java

```java
import java.util.regex.Pattern;

Pattern pattern = Pattern.compile("logs-\\d{4}-\\d{2}-\\d{2}");
ConsumerStrategy<String, String> patternStrategy =
    ConsumerStrategies.SubscribePattern(pattern, kafkaParams);

JavaInputDStream<ConsumerRecord<String, String>> patternStream = KafkaUtils.createDirectStream(
    javaStreamingContext,
    LocationStrategies.PreferConsistent(),
    patternStrategy
);
```

## Configuration Details

### Required Kafka Parameters

All strategies require these parameters:

- `bootstrap.servers`: Kafka broker addresses
- `key.deserializer`: Key deserializer class
- `value.deserializer`: Value deserializer class

### Important Kafka Parameters

- `group.id`: Consumer group ID (required for Subscribe and SubscribePattern)
- `auto.offset.reset`: What to do when there is no initial offset (`"earliest"`, `"latest"`, or `"none"`)
- `enable.auto.commit`: Set to `false` for manual offset management
- `session.timeout.ms`: Session timeout for consumer group management
- `heartbeat.interval.ms`: Heartbeat interval for consumer liveness

### Offset Initialization

When providing initial offsets, the starting position is resolved as follows:

1. **With checkpoint offsets (restart)**: current offsets recovered from the checkpoint take precedence
2. **Without checkpoint offsets (fresh start)**: the provided initial offsets are used
3. **No offsets provided**: Uses `auto.offset.reset` configuration or committed offsets
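
The resolution order can be sketched as a plain function. This is an illustrative sketch only, not the library's internal code — partitions are modeled as simple strings instead of `TopicPartition`:

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetResolutionSketch {
    // Returns the starting offset for one partition, or null to mean
    // "defer to committed offsets / auto.offset.reset".
    static Long startingOffset(Map<String, Long> checkpointOffsets,
                               Map<String, Long> providedOffsets,
                               String partition) {
        if (checkpointOffsets.containsKey(partition)) {
            return checkpointOffsets.get(partition);   // 1. checkpoint wins on restart
        }
        if (providedOffsets.containsKey(partition)) {
            return providedOffsets.get(partition);     // 2. provided offsets on fresh start
        }
        return null;                                   // 3. let Kafka resolve the position
    }

    public static void main(String[] args) {
        Map<String, Long> checkpoint = new HashMap<>();
        checkpoint.put("orders-0", 500L);

        Map<String, Long> provided = new HashMap<>();
        provided.put("orders-0", 100L);
        provided.put("orders-1", 200L);

        System.out.println(startingOffset(checkpoint, provided, "orders-0")); // 500
        System.out.println(startingOffset(checkpoint, provided, "orders-1")); // 200
        System.out.println(startingOffset(checkpoint, provided, "orders-2")); // null
    }
}
```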

## Strategy Selection Guidelines

### Use Subscribe When:

- You know the specific topic names to consume from
- You want automatic partition discovery as topics scale
- You want consumer group management and rebalancing
- You need dynamic partition assignment

### Use SubscribePattern When:

- Topics are created dynamically with predictable naming patterns
- You want to automatically consume from new matching topics
- You have time-based or categorized topic naming schemes
- You need the most flexible topic discovery

### Use Assign When:

- You need precise control over partition assignment
- You want to avoid consumer group coordination
- You're implementing custom partition assignment logic
- You're doing partition-specific processing

## Advanced Configuration

### KAFKA-3370 Workaround

The library automatically handles the KAFKA-3370 issue when `auto.offset.reset` is `"none"`:

```scala
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "auto.offset.reset" -> "none", // Will trigger the automatic workaround
  // ... other params
)

// The strategy automatically handles NoOffsetForPartitionException
val strategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
```

### Consumer Lifecycle Management

All strategies handle:

- Consumer creation and configuration
- Subscription or assignment setup
- Offset seeking for initial positions
- Consumer pause/resume for rate limiting
- Proper cleanup and resource management

## Error Handling

### Common Exceptions

- **NoOffsetForPartitionException**: Automatically handled when `auto.offset.reset` is `"none"`
- **InvalidTopicException**: Thrown for invalid topic names
- **TimeoutException**: Thrown for broker connectivity issues
- **AuthenticationException**: Thrown for authentication failures

### Configuration Validation

```scala
// Invalid configuration will be caught at consumer creation time
val invalidParams = Map[String, Object](
  "bootstrap.servers" -> "", // Empty - will cause an error
  "key.deserializer" -> "invalid.class.name" // Invalid class - will cause an error
)

try {
  val strategy = ConsumerStrategies.Subscribe[String, String](topics, invalidParams)
  // The error occurs when the consumer is created, not when the strategy is created
} catch {
  case e: Exception => println(s"Configuration error: ${e.getMessage}")
}
```

## Important Notes

- All consumer strategies are marked `@Experimental` in Spark 2.4.8
- Consumer strategies are serializable and checkpoint-compatible
- The same Kafka parameters are used on the driver and executors, with automatic modifications
- Consumer instances are automatically cached and managed per executor
- Pattern-based subscription checks for new topics periodically
- Fixed assignment (Assign) doesn't require consumer group coordination