# Offset Management

Offset range management and commit operations for fault-tolerant stream processing. The offset management API gives precise control over which Kafka messages are consumed and provides reliable offset tracking. This is the basis for at-least-once delivery and, when combined with idempotent or transactional output, exactly-once semantics.

## Capabilities

### OffsetRange Class

Represents a range of offsets from a single Kafka TopicPartition, defining exactly which messages to consume.

```scala { .api }
final class OffsetRange {
  val topic: String        // Kafka topic name
  val partition: Int       // Kafka partition id
  val fromOffset: Long     // Inclusive starting offset
  val untilOffset: Long    // Exclusive ending offset

  def topicPartition(): TopicPartition  // Kafka TopicPartition object for convenience
  def count(): Long                     // Number of messages this OffsetRange refers to
}
```

**Properties:**
- `topic`: The name of the Kafka topic
- `partition`: The partition number within the topic
- `fromOffset`: Starting offset (inclusive), the first message to consume
- `untilOffset`: Ending offset (exclusive), the first message NOT to consume

**Methods:**
- `topicPartition()`: Returns a Kafka TopicPartition object
- `count()`: Returns the number of messages in this range (`untilOffset - fromOffset`)

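
Because `fromOffset` is inclusive and `untilOffset` is exclusive, consecutive ranges on the same partition line up without overlap when one range's `untilOffset` equals the next range's `fromOffset`. The following is a minimal sketch of that arithmetic; the topic name `events` and the offsets are illustrative assumptions, not values from any real cluster.

```scala
import org.apache.spark.streaming.kafka010.OffsetRange

// Hypothetical topic and offsets, chosen only to illustrate the half-open interval
val first  = OffsetRange("events", 0, 100L, 250L)
val second = OffsetRange("events", 0, first.untilOffset, 400L)

// count() is untilOffset - fromOffset
val firstCount = first.count()

// Adjacent ranges are contiguous when the end of one is the start of the next
val isContiguous = first.untilOffset == second.fromOffset

// No offset is counted twice across the two ranges
val combinedCount = first.count() + second.count()
```

Here `first` covers offsets 100 through 249, and `second` starts at 250.
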
### OffsetRange Factory Methods

The OffsetRange companion object provides factory methods for creating instances.

```scala { .api }
object OffsetRange {
  def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange
  def create(topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long): OffsetRange
  def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange
  def apply(topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long): OffsetRange
}
```

**Usage Examples:**

```scala
import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.kafka.common.TopicPartition

// Create using topic name and partition number
val range1 = OffsetRange.create("orders", 0, 1000L, 2000L)
val range2 = OffsetRange.apply("payments", 1, 500L, 1500L)

// Create using TopicPartition object
val topicPartition = new TopicPartition("inventory", 2)
val range3 = OffsetRange.create(topicPartition, 0L, 1000L)
val range4 = OffsetRange.apply(topicPartition, 2000L, 3000L)

// Access properties
println(s"Topic: ${range1.topic}, Partition: ${range1.partition}")
println(s"Range: ${range1.fromOffset} to ${range1.untilOffset}")
println(s"Message count: ${range1.count()}")
println(s"TopicPartition: ${range1.topicPartition()}")
```

### HasOffsetRanges Trait

Interface for objects that contain a collection of OffsetRanges, typically implemented by Kafka RDDs. The cast to `HasOffsetRanges` only succeeds on the RDD produced directly by `createDirectStream`, before any transformation such as `map` or a shuffle.

```scala { .api }
trait HasOffsetRanges {
  def offsetRanges: Array[OffsetRange]
}
```

**Usage Example:**

```scala
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}

// Assumes ssc (StreamingContext), topics, and kafkaParams are already defined
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

stream.foreachRDD { rdd =>
  // Cast RDD to HasOffsetRanges to access offset information
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Log each offset range (runs on the driver)
  offsetRanges.foreach { offsetRange =>
    println(s"Topic: ${offsetRange.topic}, " +
      s"Partition: ${offsetRange.partition}, " +
      s"From: ${offsetRange.fromOffset}, " +
      s"Until: ${offsetRange.untilOffset}, " +
      s"Count: ${offsetRange.count()}")
  }

  // Process the actual data (runs on the executors)
  rdd.foreach { record =>
    // Your processing logic here
    println(s"Processing: ${record.key} -> ${record.value}")
  }
}
```

### CanCommitOffsets Trait

Interface for objects that can commit offset ranges to Kafka for offset management.

```scala { .api }
trait CanCommitOffsets {
  def commitAsync(offsetRanges: Array[OffsetRange]): Unit
  def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit
}
```

**Methods:**
- `commitAsync(offsetRanges)`: Queues offset ranges to be committed to Kafka asynchronously, at a future time
- `commitAsync(offsetRanges, callback)`: Queues offset ranges and invokes the callback when the commit completes

**Usage Example:**

```scala
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.kafka.clients.consumer.{OffsetCommitCallback, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import java.util.{Map => JMap}

// Assumes ssc (StreamingContext), topics, and kafkaParams are already defined
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Process your data; fold (unlike reduce) also works on an empty batch
  val processedCount = rdd.map { record =>
    // Your processing logic (processMessage is a placeholder)
    processMessage(record)
    1L
  }.fold(0L)(_ + _)

  // Queue offsets for commit only after processing has succeeded
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

  println(s"Processed $processedCount messages and queued offsets for commit")
}
```

**Usage with Callback:**

```scala
val callback = new OffsetCommitCallback {
  override def onComplete(
    metadata: JMap[TopicPartition, OffsetAndMetadata],
    exception: Exception
  ): Unit = {
    if (exception != null) {
      println(s"Offset commit failed: ${exception.getMessage}")
      // Handle commit failure - maybe retry or alert
    } else {
      println("Offset commit successful")
      metadata.forEach { (tp, om) =>
        println(s"Committed ${tp.topic}-${tp.partition} at offset ${om.offset}")
      }
    }
  }
}

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Process data...
  processRDD(rdd)

  // Commit with callback
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, callback)
}
```

## Advanced Usage Patterns

### Manual Offset Management

For maximum control, you can manage offsets manually using external storage:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010._

// Custom offset storage (could be database, ZooKeeper, etc.).
// `database` and `getPartitionsForTopic` are placeholders for your own storage client.
object OffsetStorage {
  def saveOffsets(offsetRanges: Array[OffsetRange]): Unit = {
    // Save the exclusive end offset: it is the next offset to read after a restart
    offsetRanges.foreach { range =>
      database.saveOffset(range.topic, range.partition, range.untilOffset)
    }
  }

  def loadOffsets(topics: Array[String]): Map[TopicPartition, Long] = {
    // Load one starting offset per topic-partition from your storage system
    topics.flatMap { topic =>
      getPartitionsForTopic(topic).map { partition =>
        val offset = database.loadOffset(topic, partition)
        new TopicPartition(topic, partition) -> offset
      }
    }.toMap
  }
}

// Use stored offsets when creating consumer strategy
val storedOffsets = OffsetStorage.loadOffsets(topics)
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](
  topics,
  kafkaParams,
  storedOffsets
)

// Assumes ssc (StreamingContext) is already defined
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  consumerStrategy
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Process data
  rdd.foreach(processRecord)

  // Save offsets to external storage instead of Kafka
  OffsetStorage.saveOffsets(offsetRanges)

  // Don't use CanCommitOffsets.commitAsync in this case
}
```

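
When offsets live in external storage, it can help to check that each incoming batch starts exactly where the stored offsets end before saving the new ones. The sketch below shows one way to detect gaps or replays; `findOffsetMismatches` is a hypothetical helper (not part of the library), and the in-memory `Map` stands in for whatever your storage returns.

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange

// Hypothetical helper: returns the ranges whose fromOffset does not match the
// stored offset for the same topic-partition. Partitions with no stored offset
// are treated as new and are not reported.
def findOffsetMismatches(
    stored: Map[TopicPartition, Long],
    batch: Seq[OffsetRange]): Seq[OffsetRange] =
  batch.filter { range =>
    stored.get(range.topicPartition()).exists(_ != range.fromOffset)
  }

// Illustrative data: storage says the next offset to read on orders-0 is 100
val stored     = Map(new TopicPartition("orders", 0) -> 100L)
val contiguous = Seq(OffsetRange("orders", 0, 100L, 200L))
val withGap    = Seq(OffsetRange("orders", 0, 120L, 200L))

val okBatch  = findOffsetMismatches(stored, contiguous)
val badBatch = findOffsetMismatches(stored, withGap)
```

A non-empty result means the batch either skipped messages or overlaps data that was already recorded, so saving its offsets blindly would hide the problem.
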
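### Exactly-Once Processing Pattern

Kafka offset commits are not transactional with your output, so committing offsets after a database transaction gives at-least-once, not exactly-once, results. For exactly-once semantics, either make the output idempotent or store each partition's results and its ending offset in the same transaction, then resume from the stored offsets on restart (see Manual Offset Management):

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka010._

// `database`, `transaction.write`, and `transaction.saveOffset` are placeholders
// for your own transactional store; it must be reachable from the executors.
def processExactlyOnce(rdd: RDD[ConsumerRecord[String, String]]): Unit = {
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  rdd.foreachPartition { partition =>
    // Each partition of a Kafka RDD corresponds to one offset range
    val range = offsetRanges(TaskContext.get.partitionId)

    // Open the transaction on the executor; it cannot be shipped from the driver
    val transaction = database.beginTransaction()
    try {
      partition.foreach { record =>
        val result = processMessage(record)
        transaction.write(result)
      }

      // Store the ending offset in the same transaction as the results
      transaction.saveOffset(range.topic, range.partition, range.untilOffset)
      transaction.commit()
    } catch {
      case ex: Exception =>
        transaction.rollback()
        throw ex
    }
  }
}

stream.foreachRDD(processExactlyOnce _)
```
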
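
The idempotent alternative relies on every record having a stable identity, which the topic, partition, and offset provide. The following is a small sketch under that assumption; `idempotencyKey` and `upsert` are hypothetical names, and the mutable map is only an in-memory stand-in for a keyed sink that supports upserts.

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import scala.collection.mutable

// Hypothetical helper: a key that is identical every time the same record is replayed
def idempotencyKey(record: ConsumerRecord[String, String]): String =
  s"${record.topic}-${record.partition}-${record.offset}"

// In-memory stand-in for a sink with upsert semantics (for example, a keyed table)
val sink = mutable.Map.empty[String, String]

def upsert(record: ConsumerRecord[String, String]): Unit =
  sink(idempotencyKey(record)) = record.value

// Illustrative record: topic "orders", partition 0, offset 42
val record = new ConsumerRecord[String, String]("orders", 0, 42L, "order-1", "created")

upsert(record)
upsert(record) // a replay after failure overwrites the same key instead of adding a row

val key = idempotencyKey(record)
```

With a sink like this, replaying a batch after a failed offset commit leaves the stored results unchanged.
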
### Batch Processing with Specific Ranges

Use offset ranges for batch processing with precise control:

```scala
import org.apache.spark.streaming.kafka010._
import scala.collection.JavaConverters._

// Define specific ranges for batch processing
val batchRanges = Array(
  OffsetRange("transactions", 0, 0L, 10000L),     // Offsets 0 to 9999
  OffsetRange("transactions", 1, 5000L, 15000L),  // Offsets 5000 to 14999
  OffsetRange("transactions", 2, 0L, 8000L)       // Offsets 0 to 7999
)

// Assumes `spark` is a SparkSession and kafkaParams is a Scala Map[String, Object];
// createRDD expects a java.util.Map, hence the asJava conversion
val rdd = KafkaUtils.createRDD[String, String](
  spark.sparkContext,
  kafkaParams.asJava,
  batchRanges,
  LocationStrategies.PreferConsistent
)

// The RDD contains exactly the specified ranges
val totalMessages = batchRanges.map(_.count()).sum
println(s"Processing exactly $totalMessages messages")

rdd.foreach { record =>
  println(s"Processing: ${record.topic}-${record.partition} " +
    s"offset ${record.offset}: ${record.key} -> ${record.value}")
}
```

## Error Handling and Recovery

### Offset Commit Failure Handling

Failures of `commitAsync` are reported only through the callback. Because each commit carries the latest processed offsets, a later successful commit supersedes an earlier failed one.

```scala
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import java.util.{Map => JMap}

val resilientCallback = new OffsetCommitCallback {
  override def onComplete(
    metadata: JMap[TopicPartition, OffsetAndMetadata],
    exception: Exception
  ): Unit = {
    if (exception != null) {
      exception match {
        case _: org.apache.kafka.clients.consumer.CommitFailedException =>
          // Consumer group rebalanced, offsets may be stale
          println("Commit failed due to rebalance; the next batch's commit will cover these offsets")

        case _: org.apache.kafka.common.errors.TimeoutException =>
          // Network timeout; usually transient
          println("Commit timed out; the next batch's commit will cover these offsets")

        case other =>
          // Other errors might need different handling
          println(s"Unexpected commit error: ${other.getMessage}")
      }
    }
  }
}
```

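
Printing each failure is enough for debugging, but for alerting it may be more useful to track how many commits have failed in a row. The sketch below is one possible approach; `FailureCountingCallback` and its `alertThreshold` parameter are hypothetical names introduced for illustration, not part of the Kafka or Spark APIs.

```scala
import java.util.{Map => JMap}
import java.util.concurrent.atomic.AtomicInteger
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition

// Hypothetical helper: counts consecutive commit failures so the driver can alert
class FailureCountingCallback(alertThreshold: Int) extends OffsetCommitCallback {
  private val consecutiveFailures = new AtomicInteger(0)

  override def onComplete(
      offsets: JMap[TopicPartition, OffsetAndMetadata],
      exception: Exception): Unit = {
    if (exception == null) {
      // A successful commit resets the streak
      consecutiveFailures.set(0)
    } else {
      consecutiveFailures.incrementAndGet()
    }
  }

  def failures: Int = consecutiveFailures.get()

  // True once the number of consecutive failures reaches the threshold
  def shouldAlert: Boolean = failures >= alertThreshold
}
```

An instance can be passed as the second argument to `commitAsync`, and the driver can check `shouldAlert` between batches.
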
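### Duplicate Message Handling

After a failure, batches whose offsets were not yet committed can be replayed, so records may arrive more than once. Duplicate tracking has to live in storage that the executors can reach; a mutable collection on the driver is copied into each task and never sees the updates.

```scala
import org.apache.spark.TaskContext

// `offsetStore` is a placeholder for an external store reachable from the executors
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  rdd.foreachPartition { records =>
    // Each partition of a Kafka RDD corresponds to one offset range
    val range = offsetRanges(TaskContext.get.partitionId)
    val lastProcessed = offsetStore.lastProcessedOffset(range.topic, range.partition)

    records.foreach { record =>
      if (record.offset > lastProcessed) {
        processMessage(record)
      } else {
        println(s"Skipping duplicate message at ${record.topic}-${record.partition}-${record.offset}")
      }
    }

    // Record progress only if this batch advanced past the stored offset
    if (range.untilOffset - 1 > lastProcessed) {
      offsetStore.saveLastProcessedOffset(range.topic, range.partition, range.untilOffset - 1)
    }
  }

  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```
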
## Best Practices

1. **Always access offsets**: Use `HasOffsetRanges` to get offset information from RDDs for monitoring and debugging.

2. **Commit after processing**: Only commit offsets after successfully processing all messages in the batch.

3. **Use callbacks for monitoring**: Implement `OffsetCommitCallback` to monitor commit success/failure.

4. **Handle commit failures gracefully**: Don't fail the entire job on offset commit failures - implement retry logic.

5. **External offset storage for critical apps**: For applications requiring strict exactly-once semantics, consider storing offsets externally.

6. **Monitor offset lag**: Track the difference between latest available offsets and committed offsets.

7. **Partition-aware processing**: Remember that offset ranges are per-partition - design your processing accordingly.
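
The lag described in item 6 is a per-partition subtraction once both sets of offsets are available. The following is a minimal sketch of that calculation; `offsetLag` is a hypothetical helper, the input maps are assumed to be fetched elsewhere (for example from a Kafka consumer or your offset store), and treating a partition with no committed offset as starting from 0 is a simplifying assumption.

```scala
import org.apache.kafka.common.TopicPartition

// Hypothetical helper: lag per partition = latest available offset - committed offset.
// Partitions without a committed offset are assumed to start at 0.
def offsetLag(
    latest: Map[TopicPartition, Long],
    committed: Map[TopicPartition, Long]): Map[TopicPartition, Long] =
  latest.map { case (tp, endOffset) =>
    tp -> math.max(0L, endOffset - committed.getOrElse(tp, 0L))
  }

// Illustrative values only
val tp0 = new TopicPartition("orders", 0)
val tp1 = new TopicPartition("orders", 1)

val lag = offsetLag(
  latest    = Map(tp0 -> 1500L, tp1 -> 300L),
  committed = Map(tp0 -> 1200L)
)
```

A lag that keeps growing from batch to batch suggests that processing is not keeping up with the producers.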