
# Offset Management


Offset range management and commit operations that support exactly-once processing semantics. These APIs provide precise control over which Kafka messages are consumed and enable reliable offset tracking for fault-tolerant stream processing.

## Capabilities

### OffsetRange Class

Represents a range of offsets from a single Kafka TopicPartition, defining exactly which messages to consume.

```scala { .api }
final class OffsetRange {
  val topic: String        // Kafka topic name
  val partition: Int       // Kafka partition id
  val fromOffset: Long     // Inclusive starting offset
  val untilOffset: Long    // Exclusive ending offset

  def topicPartition(): TopicPartition // Kafka TopicPartition object for convenience
  def count(): Long                    // Number of messages this OffsetRange refers to
}
```

**Properties:**

- `topic`: The name of the Kafka topic
- `partition`: The partition number within the topic
- `fromOffset`: Starting offset (inclusive) - first message to consume
- `untilOffset`: Ending offset (exclusive) - first message NOT to consume

**Methods:**

- `topicPartition()`: Returns a Kafka TopicPartition object
- `count()`: Returns the number of messages in this range (untilOffset - fromOffset)
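
Because `fromOffset` is inclusive and `untilOffset` is exclusive, consecutive ranges on one partition line up end to start with no gap or overlap. The following is a minimal, dependency-free sketch of that arithmetic; `SimpleRange` and `HalfOpenRangeDemo` are hypothetical stand-ins used for illustration, not the library's `OffsetRange`.

```scala
// Hypothetical stand-in that mirrors the fields of OffsetRange (not the Spark class)
final case class SimpleRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
  // Half-open interval [fromOffset, untilOffset): the size is a plain difference
  def count: Long = untilOffset - fromOffset

  // The last offset actually included in the range
  def lastOffset: Long = untilOffset - 1

  // True when `next` starts exactly where this range stops
  def isFollowedBy(next: SimpleRange): Boolean =
    topic == next.topic && partition == next.partition && untilOffset == next.fromOffset
}

object HalfOpenRangeDemo {
  val first = SimpleRange("orders", 0, 1000L, 2000L)
  val second = SimpleRange("orders", 0, 2000L, 2500L)

  def main(args: Array[String]): Unit = {
    println(first.count)                // 1000
    println(first.lastOffset)           // 1999
    println(first.isFollowedBy(second)) // true
  }
}
```

Storing `untilOffset` as the resume position works for the same reason: it is the first offset that has not yet been consumed.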

### OffsetRange Factory Methods

The OffsetRange companion object provides factory methods for creating instances.

```scala { .api }
object OffsetRange {
  def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange
  def create(topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long): OffsetRange
  def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange
  def apply(topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long): OffsetRange
}
```

**Usage Examples:**

```scala
import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.kafka.common.TopicPartition

// Create using topic name and partition number
val range1 = OffsetRange.create("orders", 0, 1000L, 2000L)
val range2 = OffsetRange.apply("payments", 1, 500L, 1500L)

// Create using TopicPartition object
val topicPartition = new TopicPartition("inventory", 2)
val range3 = OffsetRange.create(topicPartition, 0L, 1000L)
val range4 = OffsetRange.apply(topicPartition, 2000L, 3000L)

// Access properties
println(s"Topic: ${range1.topic}, Partition: ${range1.partition}")
println(s"Range: ${range1.fromOffset} to ${range1.untilOffset}")
println(s"Message count: ${range1.count()}")
println(s"TopicPartition: ${range1.topicPartition()}")
```

### HasOffsetRanges Trait

Interface for objects that contain a collection of OffsetRanges, typically implemented by Kafka RDDs.

```scala { .api }
trait HasOffsetRanges {
  def offsetRanges: Array[OffsetRange]
}
```

**Usage Example:**

```scala
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}

// Assumes ssc (StreamingContext), topics, and kafkaParams are defined elsewhere
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

stream.foreachRDD { rdd =>
  // Cast RDD to HasOffsetRanges to access offset information.
  // The cast only succeeds on the RDD produced directly by the stream,
  // before any transformation such as map or repartition.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Process each offset range
  offsetRanges.foreach { offsetRange =>
    println(s"Topic: ${offsetRange.topic}, " +
      s"Partition: ${offsetRange.partition}, " +
      s"From: ${offsetRange.fromOffset}, " +
      s"Until: ${offsetRange.untilOffset}, " +
      s"Count: ${offsetRange.count()}")
  }

  // Process the actual data (runs on executors, so output goes to executor logs)
  rdd.foreach { record =>
    // Your processing logic here
    println(s"Processing: ${record.key} -> ${record.value}")
  }
}
```

### CanCommitOffsets Trait

Interface for objects that can commit offset ranges to Kafka for offset management.

```scala { .api }
trait CanCommitOffsets {
  def commitAsync(offsetRanges: Array[OffsetRange]): Unit
  def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit
}
```

**Methods:**

- `commitAsync(offsetRanges)`: Queue offset ranges for commit to Kafka asynchronously
- `commitAsync(offsetRanges, callback)`: Queue offset ranges with a completion callback

**Usage Example:**

```scala
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.kafka.clients.consumer.{OffsetCommitCallback, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import java.util.{Map => JMap}

// Assumes ssc (StreamingContext), topics, and kafkaParams are defined elsewhere
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Process your data and count the records.
  // fold (unlike reduce) also works when the batch is empty.
  val processedCount = rdd.map { record =>
    // Your processing logic (processMessage is a placeholder)
    processMessage(record)
    1L
  }.fold(0L)(_ + _)

  // Commit offsets after successful processing
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

  // commitAsync only queues the commit; it completes asynchronously
  println(s"Processed $processedCount messages and queued offsets for commit")
}
```

**Usage with Callback:**

```scala
val callback = new OffsetCommitCallback {
  override def onComplete(
    metadata: JMap[TopicPartition, OffsetAndMetadata],
    exception: Exception
  ): Unit = {
    if (exception != null) {
      println(s"Offset commit failed: ${exception.getMessage}")
      // Handle commit failure - maybe retry or alert
    } else {
      println("Offset commit successful")
      metadata.forEach { (tp, om) =>
        println(s"Committed ${tp.topic}-${tp.partition} at offset ${om.offset}")
      }
    }
  }
}

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Process data...
  processRDD(rdd)

  // Commit with callback
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, callback)
}
```

## Advanced Usage Patterns

### Manual Offset Management

For maximum control, you can manage offsets manually using external storage:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010._

// Custom offset storage (could be database, ZooKeeper, etc.).
// database and getPartitionsForTopic are placeholders for your own code.
object OffsetStorage {
  def saveOffsets(offsetRanges: Array[OffsetRange]): Unit = {
    // Save to your preferred storage system
    offsetRanges.foreach { range =>
      // untilOffset is exclusive, so it is the next offset to read on restart
      database.saveOffset(range.topic, range.partition, range.untilOffset)
    }
  }

  def loadOffsets(topics: Array[String]): Map[TopicPartition, Long] = {
    // Load from your storage system
    topics.flatMap { topic =>
      getPartitionsForTopic(topic).map { partition =>
        val offset = database.loadOffset(topic, partition)
        new TopicPartition(topic, partition) -> offset
      }
    }.toMap
  }
}

// Use stored offsets when creating consumer strategy
val storedOffsets = OffsetStorage.loadOffsets(topics)
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](
  topics,
  kafkaParams,
  storedOffsets
)

// Create the stream from the strategy so it starts at the stored offsets
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  consumerStrategy
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Process data
  rdd.foreach(processRecord)

  // Save offsets to external storage instead of Kafka
  OffsetStorage.saveOffsets(offsetRanges)

  // Don't use CanCommitOffsets.commitAsync in this case
}
```
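
The `database` calls above are placeholders. As one possible illustration, the sketch below shows an in-memory store with the same save/load shape. `InMemoryOffsetStore` and its method names are hypothetical, it keys partitions by a plain `(topic, partition)` tuple to stay dependency-free, and a real deployment would need durable storage instead.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical in-memory stand-in for durable offset storage (illustration only)
object InMemoryOffsetStore {
  private val offsets = TrieMap.empty[(String, Int), Long]

  // Record the next offset to read for a partition (the exclusive untilOffset)
  def saveOffset(topic: String, partition: Int, untilOffset: Long): Unit =
    offsets.put((topic, partition), untilOffset)

  // Return the stored offset, or None when the partition has never been saved
  def loadOffset(topic: String, partition: Int): Option[Long] =
    offsets.get((topic, partition))

  // All stored offsets for the given topics, as an immutable copy
  def loadOffsets(topics: Set[String]): Map[(String, Int), Long] =
    offsets.toMap.filter { case ((topic, _), _) => topics.contains(topic) }
}
```

Returning `Option[Long]` lets the caller tell "never saved" apart from a stored offset of 0, which matters when deciding whether to fall back to the consumer's reset policy.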

### Exactly-Once Processing Pattern

Exactly-once output requires storing results and offsets in the same transaction (or making the output idempotent), because a separate Kafka offset commit is not atomic with your writes:

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka010._

// database, transaction.write, transaction.saveOffset, and processMessage
// are placeholders for your own transactional store and logic.
def processExactlyOnce(rdd: RDD[ConsumerRecord[String, String]]): Unit = {
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Compute results on the executors, then bring them to the driver.
  // Assumes each batch's results are small enough to collect.
  val results = rdd.map(record => processMessage(record)).collect()

  // Begin the transaction on the driver, where it is also committed
  val transaction = database.beginTransaction()

  try {
    results.foreach(result => transaction.write(result))

    // Store offsets in the same transaction so results and offsets commit atomically
    offsetRanges.foreach { range =>
      transaction.saveOffset(range.topic, range.partition, range.untilOffset)
    }

    transaction.commit()
  } catch {
    case ex: Exception =>
      transaction.rollback()
      throw ex
  }
}

stream.foreachRDD(rdd => processExactlyOnce(rdd))
```
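
One common safeguard when offsets are stored alongside results is to advance a partition's stored offset only when the batch starts exactly where the stored offset ends, so that a replayed batch is rejected rather than applied twice. A minimal, dependency-free sketch of that check follows; `OffsetAdvance`, `Batch`, and `advance` are hypothetical names, and treating a partition with no stored offset as starting at the batch's `fromOffset` is an assumption.

```scala
object OffsetAdvance {
  // Hypothetical description of one partition's slice of a batch
  final case class Batch(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

  type Stored = Map[(String, Int), Long]

  // Returns the updated offsets, or an error naming the first mismatched partition.
  // A partition with no stored offset is assumed to start at the batch's fromOffset.
  def advance(stored: Stored, batches: Seq[Batch]): Either[String, Stored] =
    batches.foldLeft[Either[String, Stored]](Right(stored)) {
      case (Right(acc), b) =>
        val key = (b.topic, b.partition)
        val expected = acc.getOrElse(key, b.fromOffset)
        if (expected == b.fromOffset) Right(acc.updated(key, b.untilOffset))
        else Left(s"${b.topic}-${b.partition}: expected fromOffset $expected but got ${b.fromOffset}")
      case (failure, _) => failure
    }
}
```

In a transactional store the same condition would typically be part of the update itself, with a failed check causing a rollback.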

### Batch Processing with Specific Ranges

Use offset ranges for batch processing with precise control:

```scala
import org.apache.spark.streaming.kafka010._

// Define specific ranges for batch processing (untilOffset is exclusive)
val batchRanges = Array(
  OffsetRange("transactions", 0, 0L, 10000L),    // Process first 10k messages
  OffsetRange("transactions", 1, 5000L, 15000L), // Process messages 5k-15k
  OffsetRange("transactions", 2, 0L, 8000L)      // Process first 8k messages
)

// Assumes spark is a SparkSession and kafkaParams is a
// java.util.Map[String, Object], the parameter type createRDD expects
val rdd = KafkaUtils.createRDD[String, String](
  spark.sparkContext,
  kafkaParams,
  batchRanges,
  LocationStrategies.PreferConsistent
)

// The RDD contains exactly the specified ranges
val totalMessages = batchRanges.map(_.count()).sum
println(s"Processing exactly $totalMessages messages")

rdd.foreach { record =>
  println(s"Processing: ${record.topic}-${record.partition} " +
    s"offset ${record.offset}: ${record.key} -> ${record.value}")
}
```

## Error Handling and Recovery

### Offset Commit Failure Handling

```scala
import org.apache.kafka.clients.consumer.{CommitFailedException, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.TimeoutException
import java.util.{Map => JMap}

val resilientCallback = new OffsetCommitCallback {
  override def onComplete(
    metadata: JMap[TopicPartition, OffsetAndMetadata],
    exception: Exception
  ): Unit = {
    if (exception != null) {
      exception match {
        case _: CommitFailedException =>
          // Consumer group rebalanced, offsets may be stale
          println("Commit failed due to rebalance, will retry on next batch")

        case _: TimeoutException =>
          // Network timeout, might retry
          println("Commit timeout, will retry")

        case other =>
          // Other errors might need different handling
          println(s"Unexpected commit error: ${other.getMessage}")
      }
    }
  }
}
```
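
The callback above only logs. One way to act on repeated failures without failing the job is to count consecutive commit failures and raise an alert once a threshold is reached. The sketch below is dependency-free; `CommitFailureTracker`, `record`, and the threshold are hypothetical, and the tracker is meant to be called from `onComplete` with the exception it receives.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: counts consecutive commit failures (illustration only)
final class CommitFailureTracker(alertThreshold: Int) {
  private val consecutiveFailures = new AtomicInteger(0)

  // Call with the exception passed to onComplete (null means success).
  // Returns true when the failure streak has reached the alert threshold.
  def record(exception: Exception): Boolean =
    if (exception == null) {
      consecutiveFailures.set(0) // a successful commit resets the streak
      false
    } else {
      consecutiveFailures.incrementAndGet() >= alertThreshold
    }

  // Current length of the failure streak
  def failures: Int = consecutiveFailures.get()
}
```

Since each batch commits its own `untilOffset` values, a later successful commit supersedes an earlier failed one, which is why a streak counter can be enough in place of explicit retries.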

### Duplicate Message Handling

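```scala
// Track the highest processed offset per partition on the driver.
// (A mutable set updated inside rdd.foreach would only change executor-side
// copies, so the driver would never see the updates.)
// This in-memory state is lost when the driver restarts.
val processedUntil = scala.collection.mutable.Map[(String, Int), Long]()

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Immutable snapshot that is shipped to the executors with the closure
  val alreadyProcessed = processedUntil.toMap

  rdd.foreach { record =>
    val offsetKey = s"${record.topic}-${record.partition}-${record.offset}"
    val highWaterMark = alreadyProcessed.getOrElse((record.topic, record.partition), 0L)

    if (record.offset >= highWaterMark) {
      processMessage(record)
    } else {
      println(s"Skipping duplicate message at $offsetKey")
    }
  }

  // Advance the high-water marks on the driver. There is one entry per
  // partition, so memory use stays bounded without periodic clearing.
  offsetRanges.foreach { range =>
    val key = (range.topic, range.partition)
    processedUntil(key) = math.max(processedUntil.getOrElse(key, 0L), range.untilOffset)
  }

  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```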

## Best Practices

1. **Always access offsets**: Use `HasOffsetRanges` to get offset information from RDDs for monitoring and debugging.

2. **Commit after processing**: Only commit offsets after successfully processing all messages in the batch.

3. **Use callbacks for monitoring**: Implement `OffsetCommitCallback` to monitor commit success/failure.

4. **Handle commit failures gracefully**: Don't fail the entire job on offset commit failures - implement retry logic.

5. **External offset storage for critical apps**: For applications requiring strict exactly-once semantics, consider storing offsets externally.

6. **Monitor offset lag**: Track the difference between latest available offsets and committed offsets.

7. **Partition-aware processing**: Remember that offset ranges are per-partition - design your processing accordingly.
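
For the lag tracking in item 6, lag per partition is the latest available offset minus the committed offset. A minimal, dependency-free sketch follows. `OffsetLag` and `lagPerPartition` are hypothetical names, partitions are keyed by a `(topic, partition)` tuple, and how the two offset maps are obtained (for example from a Kafka consumer or admin client) is left out.

```scala
object OffsetLag {
  type Partition = (String, Int)

  // Lag = latest available offset - committed offset, floored at zero.
  // Partitions without a committed offset are treated as committed at 0 (an assumption).
  def lagPerPartition(latest: Map[Partition, Long], committed: Map[Partition, Long]): Map[Partition, Long] =
    latest.map { case (tp, end) =>
      tp -> math.max(0L, end - committed.getOrElse(tp, 0L))
    }

  def main(args: Array[String]): Unit = {
    val latest = Map(("orders", 0) -> 2500L, ("orders", 1) -> 900L)
    val committed = Map(("orders", 0) -> 2000L)
    val lag = lagPerPartition(latest, committed)
    println(lag(("orders", 0))) // 500
    println(lag(("orders", 1))) // 900
    println(lag.values.sum)     // 1400
  }
}
```

Keeping the result per partition, in line with item 7, makes it possible to spot a single stalled partition that a topic-wide total would hide.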