# Offset Management

Offset management gives you precise control over which Kafka messages are consumed. It is the basis for at-least-once processing and, when outputs are idempotent or transactional, for exactly-once processing. The offset management system includes utilities for tracking offset ranges, committing processed offsets, and maintaining consumption state across streaming batches.

## Core Types

### OffsetRange

Represents a range of offsets from a single Kafka TopicPartition, defining exactly which messages to process.

```scala { .api }
// The constructor is private; create instances through the companion object.
final class OffsetRange private (
  val topic: String,
  val partition: Int,
  val fromOffset: Long,
  val untilOffset: Long
) extends Serializable {
  def topicPartition(): TopicPartition
  def count(): Long
  override def equals(obj: Any): Boolean
  override def hashCode(): Int
  override def toString(): String
}
```

**Properties:**
- `topic`: Kafka topic name
- `partition`: Kafka partition ID
- `fromOffset`: Inclusive starting offset
- `untilOffset`: Exclusive ending offset

**Methods:**
- `topicPartition()`: Returns the Kafka `TopicPartition` object, for convenience
- `count()`: Returns the number of messages in this range (`untilOffset - fromOffset`)
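
Because the start is inclusive and the end is exclusive, consecutive ranges can share a boundary without overlapping or skipping a message. The sketch below illustrates that arithmetic with a stand-in `SimpleRange` case class. Both `SimpleRange` and `HalfOpenRangeDemo` are hypothetical names introduced here so the example runs without Spark or Kafka on the classpath; they are not part of the library.

```scala
// Stand-in for OffsetRange (hypothetical); it mirrors only the offset arithmetic.
final case class SimpleRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
  // Same formula the Methods list gives for count(): untilOffset - fromOffset
  def count: Long = untilOffset - fromOffset

  // fromOffset is part of the range, untilOffset is not
  def contains(offset: Long): Boolean = offset >= fromOffset && offset < untilOffset
}

object HalfOpenRangeDemo {
  // Two back-to-back ranges on the same partition, sharing the boundary 200
  val first = SimpleRange("orders", 0, 100L, 200L)
  val second = SimpleRange("orders", 0, 200L, 300L)

  def main(args: Array[String]): Unit = {
    println(first.count)           // 100 (offsets 100 through 199)
    println(first.contains(199L))  // true
    println(first.contains(200L))  // false: offset 200 belongs to the next range
    println(second.contains(200L)) // true
  }
}
```

Under this convention, the `untilOffset` of one batch can be used directly as the `fromOffset` of the next.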

### OffsetRange Companion Object

Factory methods for creating `OffsetRange` instances.

```scala { .api }
object OffsetRange {
  def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange
  def create(topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long): OffsetRange
  def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange
  def apply(topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long): OffsetRange
}
```

## Core Interfaces

### HasOffsetRanges

Interface for objects that contain offset range information.

```scala { .api }
trait HasOffsetRanges {
  def offsetRanges: Array[OffsetRange]
}
```

**Implemented by:**
- KafkaRDD (batch processing)
- DirectKafkaInputDStream RDDs (streaming)

### CanCommitOffsets

Interface for objects that can commit offset ranges to Kafka.

```scala { .api }
trait CanCommitOffsets {
  def commitAsync(offsetRanges: Array[OffsetRange]): Unit
  def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit
}
```

**Implemented by:**
- DirectKafkaInputDStream (streaming)

**Methods:**
- `commitAsync(offsetRanges)`: Queue offsets for commit without a callback
- `commitAsync(offsetRanges, callback)`: Queue offsets for commit with a completion callback
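
"Queue" is the operative word: a `commitAsync` call returns before anything has been acknowledged by Kafka. The following is a minimal sketch of how a queue-then-flush design can behave, assuming queued ranges are drained later and reduced to the highest `untilOffset` per partition. `QueuedRange`, `CommitQueueModel`, `flush`, and `CommitQueueDemo` are hypothetical names, and the model is an illustration of the idea rather than the library's implementation.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable

// Hypothetical stand-in for an offset range; not the Spark class.
final case class QueuedRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

// Simplified model: commitAsync only enqueues, flush() decides what would be sent.
final class CommitQueueModel {
  private val queue = new ConcurrentLinkedQueue[QueuedRange]()

  // Returns immediately; nothing is sent to a broker at this point.
  def commitAsync(ranges: Array[QueuedRange]): Unit =
    ranges.foreach { r => queue.add(r) }

  // Drains the queue and keeps the highest untilOffset per (topic, partition).
  def flush(): Map[(String, Int), Long] = {
    val highest = mutable.Map.empty[(String, Int), Long]
    var next = queue.poll()
    while (next != null) {
      val key = (next.topic, next.partition)
      val current = highest.getOrElse(key, Long.MinValue)
      highest.update(key, math.max(current, next.untilOffset))
      next = queue.poll()
    }
    highest.toMap
  }
}

object CommitQueueDemo {
  // Queues two consecutive batches, then drains them in a single flush.
  def run(): Map[(String, Int), Long] = {
    val model = new CommitQueueModel
    model.commitAsync(Array(QueuedRange("orders", 0, 100L, 200L)))
    model.commitAsync(Array(
      QueuedRange("orders", 0, 200L, 300L),
      QueuedRange("orders", 1, 50L, 80L)
    ))
    model.flush()
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```

In this model, two batches queued before a flush collapse into one commit position per partition (300 for `orders-0`, 80 for `orders-1`), which is why a completion callback is the reliable place to observe what was actually committed.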

## Usage Examples

### Basic Offset Range Creation

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.TopicPartition

// Create offset ranges using factory methods
val range1 = OffsetRange.create("orders", 0, 100, 200)
val range2 = OffsetRange.create("payments", 1, 50, 150)

// Create using TopicPartition
val topicPartition = new TopicPartition("users", 0)
val range3 = OffsetRange.create(topicPartition, 0, 100)

// Create using apply methods (Scala)
val range4 = OffsetRange("logs", 2, 1000, 2000)

// Check range properties
println(s"Range 1 covers ${range1.count()} messages")
println(s"Topic partition: ${range1.topicPartition()}")
```

### Streaming with Offset Tracking

```scala
import org.apache.spark.streaming.kafka010._

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

stream.foreachRDD { rdd =>
  // Get offset ranges from the RDD (must happen before any other transformation)
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Log offset information
  offsetRanges.foreach { range =>
    println(s"Processing ${range.topic} partition ${range.partition}: " +
      s"${range.fromOffset} to ${range.untilOffset} (${range.count()} messages)")
  }

  // Process the data; collect() throws if a task fails, so the lines below
  // run only when processing succeeded
  val results = rdd.map(record => processRecord(record)).collect()

  // Only commit offsets after successful processing
  if (results.nonEmpty) {
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    println("Queued offset commit for successful batch")
  }
}
```

### Manual Offset Commit with Callback

```scala
import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010._

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Process data with error handling
  try {
    val processedData = rdd.map(record => processRecord(record)).collect()

    // Custom commit callback for monitoring
    val commitCallback = new OffsetCommitCallback {
      def onComplete(metadata: java.util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
        if (exception != null) {
          println(s"Offset commit failed: ${exception.getMessage}")
        } else {
          println(s"Successfully committed ${metadata.size()} partition offsets")
          // Convert to a Scala map so the (key, value) pattern match compiles
          metadata.asScala.foreach { case (tp, offsetMeta) =>
            println(s"  ${tp.topic()}-${tp.partition()}: ${offsetMeta.offset()}")
          }
        }
      }
    }

    // Commit with callback
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, commitCallback)

  } catch {
    case e: Exception =>
      println(s"Processing failed, not committing offsets: ${e.getMessage}")
  }
}
```

### Batch Processing with Offset Ranges

```scala
import scala.collection.JavaConverters._

import org.apache.spark.streaming.kafka010._

// Define specific ranges to process
val offsetRanges = Array(
  OffsetRange("orders", 0, 1000, 1500),
  OffsetRange("orders", 1, 2000, 2500),
  OffsetRange("payments", 0, 500, 1000)
)

// createRDD takes a java.util.Map; this assumes kafkaParams is a Scala Map[String, Object]
val rdd = KafkaUtils.createRDD[String, String](
  sparkContext,
  kafkaParams.asJava,
  offsetRanges,
  LocationStrategies.PreferConsistent
)

// Process and track progress
val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val totalMessages = ranges.map(_.count()).sum
println(s"Processing ${totalMessages} messages across ${ranges.length} partitions")

val processedCount = rdd.count()
println(s"Successfully processed ${processedCount} messages")

// Verify ranges match expectations
ranges.zip(offsetRanges).foreach { case (actual, expected) =>
  assert(actual == expected, "Offset ranges don't match")
}
```

### Exactly-Once Processing Pattern

```scala
import org.apache.spark.streaming.kafka010._

// processWithTransaction, commitTransaction, rollbackTransaction and
// storeTransactionOffsets are application-specific helpers not shown here.
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Begin transaction or mark processing start
  val transactionId = startTransaction(offsetRanges)

  try {
    // Process data
    val results = rdd.mapPartitionsWithIndex { (partitionId, iterator) =>
      // RDD partition i corresponds to offsetRanges(i) as long as no shuffle has occurred
      val range = offsetRanges(partitionId)
      iterator.map { record =>
        // Process with partition-specific logic
        processWithTransaction(record, transactionId, range)
      }
    }.collect()

    // Only commit Kafka offsets after successful transaction commit
    commitTransaction(transactionId)
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

    println(s"Successfully processed and committed batch ${transactionId}")

  } catch {
    case e: Exception =>
      rollbackTransaction(transactionId)
      println(s"Processing failed, rolled back transaction ${transactionId}: ${e.getMessage}")
      throw e // Re-throw so the failure propagates instead of being swallowed
  }
}

def startTransaction(ranges: Array[OffsetRange]): String = {
  val transactionId = java.util.UUID.randomUUID().toString
  // Store offset ranges with transaction for recovery
  storeTransactionOffsets(transactionId, ranges)
  transactionId
}
```

### Offset Range Utilities

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010._

// Utility functions for working with offset ranges
def mergeContiguousRanges(ranges: Array[OffsetRange]): Array[OffsetRange] = {
  ranges.groupBy(r => (r.topic, r.partition))
    .values
    .flatMap { partitionRanges =>
      val sorted = partitionRanges.sortBy(_.fromOffset)
      // acc is built newest-first, so its head is the most recent (possibly merged) range
      sorted.foldLeft(List.empty[OffsetRange]) { (acc, range) =>
        acc match {
          case Nil => List(range)
          case head :: tail if head.untilOffset == range.fromOffset =>
            // Merge contiguous ranges
            OffsetRange(head.topic, head.partition, head.fromOffset, range.untilOffset) :: tail
          case _ => range :: acc
        }
      }.reverse
    }.toArray
}

def calculateLag(currentRanges: Array[OffsetRange], latestOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
  currentRanges.map { range =>
    val tp = range.topicPartition()
    // Partitions missing from latestOffsets are reported with zero lag
    val lag = latestOffsets.get(tp).map(_ - range.untilOffset).getOrElse(0L)
    tp -> math.max(0L, lag)
  }.toMap
}
```

## Configuration for Offset Management

### Kafka Parameters for Manual Offset Management

```scala
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "manual-offset-group",
  "enable.auto.commit" -> (false: java.lang.Boolean), // Disable auto-commit
  "auto.offset.reset" -> "earliest" // or "latest" based on requirements
)
```

### Offset Commit Configuration

```scala
val kafkaParams = Map[String, Object](
  // ... other parameters
  // "offset.commit.timeout.ms" is not a Kafka consumer setting (the similarly
  // named "offsets.commit.timeout.ms" is a broker setting), so it is left out here.
  "retry.backoff.ms" -> "100",    // Wait between retries of a failed request
  "request.timeout.ms" -> "30000" // Maximum wait for a response to a request
)
```

## Error Handling

### Handling Commit Failures

```scala
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition

val commitCallback = new OffsetCommitCallback {
  def onComplete(metadata: java.util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
    // exception is null when the commit succeeded
    if (exception != null) {
      exception match {
        case _: org.apache.kafka.clients.consumer.CommitFailedException =>
          println("Commit failed - likely due to consumer group rebalance")
        case _: org.apache.kafka.common.errors.TimeoutException =>
          println("Commit timed out - may succeed later")
        case _: org.apache.kafka.common.errors.AuthorizationException =>
          println("Not authorized to commit offsets")
        case _ =>
          println(s"Unexpected commit error: ${exception.getMessage}")
      }
    }
  }
}
```

### Validation and Bounds Checking

```scala
import org.apache.spark.streaming.kafka010.OffsetRange

// Both helpers treat an empty range (untilOffset == fromOffset) as invalid.
def validateOffsetRange(range: OffsetRange, latestOffset: Long): Boolean = {
  range.fromOffset >= 0 &&
    range.untilOffset > range.fromOffset &&
    range.untilOffset <= latestOffset
}

def safeCreateOffsetRange(topic: String, partition: Int, from: Long, until: Long): Option[OffsetRange] = {
  if (from >= 0 && until > from) {
    Some(OffsetRange(topic, partition, from, until))
  } else {
    println(s"Invalid offset range: $topic-$partition [$from, $until)")
    None
  }
}
```

## Important Notes

- All offset management classes are marked as `@Experimental` in Spark 2.4.8
- `OffsetRange` uses an inclusive start (`fromOffset`) and an exclusive end (`untilOffset`)
- Offset commits are asynchronous: `commitAsync` only queues the request, so the commit may complete after the calling method returns
- `HasOffsetRanges` allows offset introspection on RDDs created by `KafkaUtils.createRDD` and on the RDDs of a direct stream
- `CanCommitOffsets` enables manual offset commits to Kafka from the direct stream
- The cast to `HasOffsetRanges` succeeds only on the RDD as it comes from the stream, before other transformations; the one-to-one mapping between RDD partitions and Kafka partitions is lost after a shuffle or repartition
- Commit offsets only after the batch's output has succeeded. On its own this gives at-least-once delivery; exactly-once results additionally require idempotent or transactional output
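
Committing offsets after the output step does not by itself prevent a batch from being replayed, for example when a failure occurs between writing the output and the commit taking effect. One common way to make such replays harmless is to key every output row by its source coordinates. The sketch below assumes a hypothetical in-memory `IdempotentSink` and `SourcedRecord`; in practice the same idea would typically be a database table with a unique key on (topic, partition, offset).

```scala
import scala.collection.mutable

// Hypothetical record shape: the payload plus the Kafka coordinates it came from.
final case class SourcedRecord(topic: String, partition: Int, offset: Long, value: String)

// Hypothetical in-memory sink: writing the same (topic, partition, offset)
// a second time overwrites the existing row instead of adding a new one.
final class IdempotentSink {
  private val rows = mutable.Map.empty[(String, Int, Long), String]

  def write(batch: Seq[SourcedRecord]): Unit =
    batch.foreach { r => rows.update((r.topic, r.partition, r.offset), r.value) }

  def size: Int = rows.size
}

object IdempotentSinkDemo {
  // Writes a batch, then replays it as if the first attempt's commit had been lost.
  def run(): Int = {
    val sink = new IdempotentSink
    val batch = Seq(
      SourcedRecord("orders", 0, 100L, "a"),
      SourcedRecord("orders", 0, 101L, "b")
    )
    sink.write(batch) // first attempt
    sink.write(batch) // replay of the same batch
    sink.size
  }

  def main(args: Array[String]): Unit =
    println(run()) // 2: the replay did not create duplicate rows
}
```

With a sink of this kind, at-least-once delivery from Kafka still produces each output row exactly once.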