
# Offset Management

Offset management provides precise control over Kafka message consumption and enables exactly-once processing semantics. The offset management system includes utilities for tracking offset ranges, committing processed offsets, and maintaining consumption state across streaming batches.

## Core Types

### OffsetRange

Represents a range of offsets from a single Kafka TopicPartition, defining exactly which messages to process.

```scala { .api }
final class OffsetRange(
  val topic: String,
  val partition: Int,
  val fromOffset: Long,
  val untilOffset: Long
) extends Serializable {
  def topicPartition(): TopicPartition
  def count(): Long
  override def equals(obj: Any): Boolean
  override def hashCode(): Int
  override def toString(): String
}
```

**Properties:**

- `topic`: Kafka topic name
- `partition`: Kafka partition ID
- `fromOffset`: Inclusive starting offset
- `untilOffset`: Exclusive ending offset

**Methods:**

- `topicPartition()`: Returns the Kafka `TopicPartition` for this range
- `count()`: Returns the number of messages in the range (`untilOffset - fromOffset`)

### OffsetRange Companion Object

Factory methods for creating OffsetRange instances.

```scala { .api }
object OffsetRange {
  def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange
  def create(topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long): OffsetRange
  def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange
  def apply(topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long): OffsetRange
}
```

## Core Interfaces

### HasOffsetRanges

Interface for objects that contain offset range information.

```scala { .api }
trait HasOffsetRanges {
  def offsetRanges: Array[OffsetRange]
}
```

**Implemented by:**

- KafkaRDD (batch processing)
- DirectKafkaInputDStream RDDs (streaming)
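
Note that the cast to `HasOffsetRanges` only succeeds on the RDD handed directly to `foreachRDD` by `createDirectStream`; RDDs derived through transformations no longer carry offset information. A minimal sketch, assuming a stream created as in the examples below:

```scala
stream.foreachRDD { rdd =>
  // Capture offset ranges first: derived RDDs (e.g. after map or
  // reduceByKey) do not implement HasOffsetRanges
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... transformations and processing can follow safely here
}
```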

### CanCommitOffsets

Interface for objects that can commit offset ranges to Kafka.

```scala { .api }
trait CanCommitOffsets {
  def commitAsync(offsetRanges: Array[OffsetRange]): Unit
  def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit
}
```

**Implemented by:**

- DirectKafkaInputDStream (streaming)

**Methods:**

- `commitAsync(offsetRanges)`: Queues offsets for commit without a callback
- `commitAsync(offsetRanges, callback)`: Queues offsets for commit with a completion callback
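
As with `HasOffsetRanges`, this cast only succeeds on the stream returned by `createDirectStream`, not on a transformed stream. A minimal sketch that captures the committer once, outside the per-batch closure:

```scala
// Cast the direct stream once; reuse the committer for every batch
val committer = stream.asInstanceOf[CanCommitOffsets]

stream.foreachRDD { rdd =>
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch ...
  committer.commitAsync(ranges) // queued; the driver commits on a later batch
}
```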

## Usage Examples

### Basic Offset Range Creation

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.TopicPartition

// Create offset ranges using factory methods
val range1 = OffsetRange.create("orders", 0, 100, 200)
val range2 = OffsetRange.create("payments", 1, 50, 150)

// Create using a TopicPartition
val topicPartition = new TopicPartition("users", 0)
val range3 = OffsetRange.create(topicPartition, 0, 100)

// Create using apply methods (Scala)
val range4 = OffsetRange("logs", 2, 1000, 2000)

// Check range properties
println(s"Range 1 covers ${range1.count()} messages")
println(s"Topic partition: ${range1.topicPartition()}")
```

### Streaming with Offset Tracking

```scala
import org.apache.spark.streaming.kafka010._

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

stream.foreachRDD { rdd =>
  // Get offset ranges from the RDD
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Log offset information
  offsetRanges.foreach { range =>
    println(s"Processing ${range.topic} partition ${range.partition}: " +
      s"${range.fromOffset} to ${range.untilOffset} (${range.count()} messages)")
  }

  // Process the data
  val results = rdd.map(record => processRecord(record)).collect()

  // Only commit offsets after successful processing
  if (results.nonEmpty) {
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    println("Committed offsets for successful batch")
  }
}
```

### Manual Offset Commit with Callback

```scala
import org.apache.kafka.clients.consumer.{OffsetCommitCallback, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Process data with error handling
  try {
    val processedData = rdd.map(record => processRecord(record)).collect()

    // Custom commit callback for monitoring
    val commitCallback = new OffsetCommitCallback {
      def onComplete(metadata: java.util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
        if (exception != null) {
          println(s"Offset commit failed: ${exception.getMessage}")
        } else {
          println(s"Successfully committed ${metadata.size()} partition offsets")
          metadata.asScala.foreach { case (tp, offsetMeta) =>
            println(s"  ${tp.topic()}-${tp.partition()}: ${offsetMeta.offset()}")
          }
        }
      }
    }

    // Commit with callback
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, commitCallback)

  } catch {
    case e: Exception =>
      println(s"Processing failed, not committing offsets: ${e.getMessage}")
  }
}
```

### Batch Processing with Offset Ranges

```scala
import org.apache.spark.streaming.kafka010._

// Define specific ranges to process
val offsetRanges = Array(
  OffsetRange("orders", 0, 1000, 1500),
  OffsetRange("orders", 1, 2000, 2500),
  OffsetRange("payments", 0, 500, 1000)
)

// Note: createRDD expects kafkaParams as a java.util.Map
// (convert a Scala Map with .asJava from JavaConverters)
val rdd = KafkaUtils.createRDD[String, String](
  sparkContext,
  kafkaParams,
  offsetRanges,
  LocationStrategies.PreferConsistent
)

// Process and track progress
val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val totalMessages = ranges.map(_.count()).sum
println(s"Processing $totalMessages messages across ${ranges.length} partitions")

val processedCount = rdd.count()
println(s"Successfully processed $processedCount messages")

// Verify ranges match expectations
ranges.zip(offsetRanges).foreach { case (actual, expected) =>
  assert(actual == expected, "Offset ranges don't match")
}
```

### Exactly-Once Processing Pattern

```scala
import org.apache.spark.streaming.kafka010._

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Begin transaction or mark processing start
  val transactionId = startTransaction(offsetRanges)

  try {
    // Process data; KafkaRDD partitions map 1:1 to offset ranges,
    // so offsetRanges(partitionId) is the range for this partition
    val results = rdd.mapPartitionsWithIndex { (partitionId, iterator) =>
      val range = offsetRanges(partitionId)
      iterator.map { record =>
        // Process with partition-specific logic
        processWithTransaction(record, transactionId, range)
      }
    }.collect()

    // Only commit Kafka offsets after successful transaction commit
    commitTransaction(transactionId)
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

    println(s"Successfully processed and committed batch $transactionId")

  } catch {
    case e: Exception =>
      rollbackTransaction(transactionId)
      println(s"Processing failed, rolled back transaction $transactionId: ${e.getMessage}")
      throw e // Re-throw to trigger stream retry
  }
}

def startTransaction(ranges: Array[OffsetRange]): String = {
  val transactionId = java.util.UUID.randomUUID().toString
  // Store offset ranges with transaction for recovery
  storeTransactionOffsets(transactionId, ranges)
  transactionId
}
```

### Offset Range Utilities

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.TopicPartition

// Merge ranges whose offsets line up end-to-start within a partition
def mergeContiguousRanges(ranges: Array[OffsetRange]): Array[OffsetRange] = {
  ranges.groupBy(r => (r.topic, r.partition))
    .values
    .flatMap { partitionRanges =>
      val sorted = partitionRanges.sortBy(_.fromOffset)
      sorted.foldLeft(List.empty[OffsetRange]) { (acc, range) =>
        acc match {
          case Nil => List(range)
          case head :: tail if head.untilOffset == range.fromOffset =>
            // Merge contiguous ranges
            OffsetRange(head.topic, head.partition, head.fromOffset, range.untilOffset) :: tail
          case _ => range :: acc
        }
      }.reverse
    }.toArray
}

// Lag per partition: latest available offset minus last processed offset
def calculateLag(currentRanges: Array[OffsetRange], latestOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
  currentRanges.map { range =>
    val tp = range.topicPartition()
    val lag = latestOffsets.get(tp).map(_ - range.untilOffset).getOrElse(0L)
    tp -> math.max(0L, lag)
  }.toMap
}
```
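
`calculateLag` needs the latest offset per partition. One way to obtain these is the consumer API's `endOffsets` call; the sketch below uses a short-lived consumer and a hypothetical `consumerProps` map assumed to hold the same bootstrap and deserializer settings shown in the configuration section:

```scala
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Hypothetical helper: fetch end offsets with a short-lived consumer
def fetchLatestOffsets(consumerProps: java.util.Map[String, Object],
                       partitions: Seq[TopicPartition]): Map[TopicPartition, Long] = {
  val consumer = new KafkaConsumer[String, String](consumerProps)
  try {
    consumer.endOffsets(partitions.asJava).asScala
      .map { case (tp, offset) => tp -> offset.longValue() }
      .toMap
  } finally {
    consumer.close()
  }
}
```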

## Configuration for Offset Management

### Kafka Parameters for Manual Offset Management

```scala
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "manual-offset-group",
  "enable.auto.commit" -> (false: java.lang.Boolean), // Disable auto-commit
  "auto.offset.reset" -> "earliest" // or "latest" based on requirements
)
```

### Offset Commit Configuration

The new consumer has no dedicated commit-timeout setting; commit behavior is governed by the general request settings.

```scala
val kafkaParams = Map[String, Object](
  // ... other parameters
  "request.timeout.ms" -> "30000", // Upper bound on each broker request, including commits
  "retry.backoff.ms" -> "100" // Backoff before retrying failed requests
)
```

## Error Handling

### Handling Commit Failures

```scala
import org.apache.kafka.clients.consumer.{OffsetCommitCallback, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

val commitCallback = new OffsetCommitCallback {
  def onComplete(metadata: java.util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
    if (exception != null) {
      exception match {
        case _: org.apache.kafka.clients.consumer.CommitFailedException =>
          println("Commit failed - likely due to consumer group rebalance")
        case _: org.apache.kafka.common.errors.TimeoutException =>
          println("Commit timed out - may succeed later")
        case _: org.apache.kafka.common.errors.AuthorizationException =>
          println("Not authorized to commit offsets")
        case _ =>
          println(s"Unexpected commit error: ${exception.getMessage}")
      }
    }
  }
}
```

### Validation and Bounds Checking

```scala
def validateOffsetRange(range: OffsetRange, latestOffset: Long): Boolean = {
  range.fromOffset >= 0 &&
    range.untilOffset > range.fromOffset &&
    range.untilOffset <= latestOffset
}

def safeCreateOffsetRange(topic: String, partition: Int, from: Long, until: Long): Option[OffsetRange] = {
  if (from >= 0 && until > from) {
    Some(OffsetRange(topic, partition, from, until))
  } else {
    println(s"Invalid offset range: $topic-$partition [$from, $until)")
    None
  }
}
```
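
A brief usage sketch of the helper above, with hypothetical candidate tuples:

```scala
// Only well-formed ranges survive; the inverted pair is dropped with a warning
val candidates = Seq(("orders", 0, 100L, 200L), ("orders", 1, 300L, 250L))
val validRanges = candidates.flatMap { case (topic, partition, from, until) =>
  safeCreateOffsetRange(topic, partition, from, until)
}
```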

## Important Notes

- All offset management classes are marked `@Experimental` in Spark 2.4.8
- OffsetRange uses an inclusive start (`fromOffset`) and an exclusive end (`untilOffset`)
- Offset commits are asynchronous and may complete after the calling method returns
- The HasOffsetRanges interface allows offset introspection on RDDs and DStream RDDs
- The CanCommitOffsets interface enables manual offset management for exactly-once semantics
- Retrieve offset ranges from the RDD exactly as delivered by createDirectStream: the cast to HasOffsetRanges fails on derived RDDs, and the one-to-one partition mapping is lost after shuffles or repartitioning
- Commit offsets only after successful data processing to preserve exactly-once semantics