Offset management provides precise control over Kafka message consumption and enables exactly-once processing semantics. The offset management system includes utilities for tracking offset ranges, committing processed offsets, and maintaining consumption state across streaming batches.
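At a glance, this is the pattern the rest of the section elaborates: read the offset ranges a batch covers, process the data, then commit exactly those offsets back to Kafka. A minimal sketch, assuming stream is a direct stream created with KafkaUtils.createDirectStream as shown later in this section:
stream.foreachRDD { rdd =>
  // Which offsets does this batch cover?
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch ...

  // Commit exactly those offsets once processing has succeeded
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
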
Represents a range of offsets from a single Kafka TopicPartition, defining exactly which messages to process.
final class OffsetRange(
val topic: String,
val partition: Int,
val fromOffset: Long,
val untilOffset: Long
) extends Serializable {
def topicPartition(): TopicPartition
def count(): Long
override def equals(obj: Any): Boolean
override def hashCode(): Int
override def toString(): String
}

Properties:
- topic: Kafka topic name
- partition: Kafka partition ID
- fromOffset: Inclusive starting offset
- untilOffset: Exclusive ending offset

Methods:
- topicPartition(): Returns the Kafka TopicPartition object for convenience
- count(): Returns the number of messages in this range (untilOffset - fromOffset)

Factory methods for creating OffsetRange instances.
object OffsetRange {
def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange
def create(topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long): OffsetRange
def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange
def apply(topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long): OffsetRange
}

Interface for objects that contain offset range information.
trait HasOffsetRanges {
def offsetRanges: Array[OffsetRange]
}

Implemented by: KafkaRDD, the RDD type returned by KafkaUtils.createDirectStream and KafkaUtils.createRDD, which is why the examples below cast the RDD to HasOffsetRanges.

Interface for objects that can commit offset ranges to Kafka.
trait CanCommitOffsets {
def commitAsync(offsetRanges: Array[OffsetRange]): Unit
def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit
}

Implemented by: DirectKafkaInputDStream, the stream returned by KafkaUtils.createDirectStream, which is why the examples below cast the stream to CanCommitOffsets.

Methods:
- commitAsync(offsetRanges): Queues the given offset ranges for commit, without a callback
- commitAsync(offsetRanges, callback): Queues the given offset ranges for commit and invokes the callback when the commit completes

Basic usage: creating and inspecting offset ranges.

import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.TopicPartition
// Create offset ranges using factory methods
val range1 = OffsetRange.create("orders", 0, 100, 200)
val range2 = OffsetRange.create("payments", 1, 50, 150)
// Create using TopicPartition
val topicPartition = new TopicPartition("users", 0)
val range3 = OffsetRange.create(topicPartition, 0, 100)
// Create using apply methods (Scala)
val range4 = OffsetRange("logs", 2, 1000, 2000)
// Check range properties
println(s"Range 1 covers ${range1.count()} messages")
println(s"Topic partition: ${range1.topicPartition()}")import org.apache.spark.streaming.kafka010._
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
stream.foreachRDD { rdd =>
// Get offset ranges from the RDD
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// Log offset information
offsetRanges.foreach { range =>
println(s"Processing ${range.topic} partition ${range.partition}: " +
s"${range.fromOffset} to ${range.untilOffset} (${range.count()} messages)")
}
// Process the data
val results = rdd.map(record => processRecord(record)).collect()
// Only commit offsets after successful processing
if (results.nonEmpty) {
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
println("Committed offsets for successful batch")
}
}

Committing with a callback makes commit success or failure observable, which is useful for monitoring and error handling.

import org.apache.kafka.clients.consumer.{OffsetCommitCallback, OffsetAndMetadata}
import scala.collection.JavaConverters._
import org.apache.kafka.common.TopicPartition
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// Process data with error handling
try {
val processedData = rdd.map(record => processRecord(record)).collect()
// Custom commit callback for monitoring
val commitCallback = new OffsetCommitCallback {
def onComplete(metadata: java.util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
if (exception != null) {
println(s"Offset commit failed: ${exception.getMessage}")
} else {
println(s"Successfully committed ${metadata.size()} partition offsets")
metadata.asScala.foreach { case (tp, offsetMeta) =>
println(s" ${tp.topic()}-${tp.partition()}: ${offsetMeta.offset()}")
}
}
}
}
// Commit with callback
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, commitCallback)
} catch {
case e: Exception =>
println(s"Processing failed, not committing offsets: ${e.getMessage}")
}
}

Offset ranges can also be processed in batch (non-streaming) jobs by passing them directly to KafkaUtils.createRDD.

import org.apache.spark.streaming.kafka010._
// Define specific ranges to process
val offsetRanges = Array(
OffsetRange("orders", 0, 1000, 1500),
OffsetRange("orders", 1, 2000, 2500),
OffsetRange("payments", 0, 500, 1000)
)
val rdd = KafkaUtils.createRDD[String, String](
sparkContext,
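// Note: this overload of createRDD expects kafkaParams as a java.util.Map; a Scala Map may need .asJava (scala.collection.JavaConverters)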
kafkaParams,
offsetRanges,
LocationStrategies.PreferConsistent
)
// Process and track progress
val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val totalMessages = ranges.map(_.count()).sum
println(s"Processing {totalMessages} messages across ${ranges.length} partitions")
val processedCount = rdd.count()
println(s"Successfully processed ${processedCount} messages")
// Verify ranges match expectations
ranges.zip(offsetRanges).foreach { case (actual, expected) =>
assert(actual == expected, "Offset ranges don't match")
}

For exactly-once semantics, Kafka offset commits can be coordinated with an external transaction; commitTransaction, rollbackTransaction, and storeTransactionOffsets below are application-specific helpers.

import org.apache.spark.streaming.kafka010._
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// Begin transaction or mark processing start
val transactionId = startTransaction(offsetRanges)
try {
// Process data
val results = rdd.mapPartitionsWithIndex { (partitionId, iterator) =>
val range = offsetRanges(partitionId)
iterator.map { record =>
// Process with partition-specific logic
processWithTransaction(record, transactionId, range)
}
}.collect()
// Only commit Kafka offsets after successful transaction commit
commitTransaction(transactionId)
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
println(s"Successfully processed and committed batch ${transactionId}")
} catch {
case e: Exception =>
rollbackTransaction(transactionId)
println(s"Processing failed, rolled back transaction ${transactionId}: ${e.getMessage}")
throw e // Re-throw to trigger stream retry
}
}
def startTransaction(ranges: Array[OffsetRange]): String = {
val transactionId = java.util.UUID.randomUUID().toString
// Store offset ranges with transaction for recovery
storeTransactionOffsets(transactionId, ranges)
transactionId
}

import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.TopicPartition
// Utility functions for working with offset ranges
def mergeContiguousRanges(ranges: Array[OffsetRange]): Array[OffsetRange] = {
ranges.groupBy(r => (r.topic, r.partition))
.values
.flatMap { partitionRanges =>
val sorted = partitionRanges.sortBy(_.fromOffset)
sorted.foldLeft(List.empty[OffsetRange]) { (acc, range) =>
acc match {
case Nil => List(range)
case head :: tail if head.untilOffset == range.fromOffset =>
// Merge contiguous ranges
OffsetRange(head.topic, head.partition, head.fromOffset, range.untilOffset) :: tail
case _ => range :: acc
}
}.reverse
}.toArray
}
def calculateLag(currentRanges: Array[OffsetRange], latestOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
currentRanges.map { range =>
val tp = range.topicPartition()
val lag = latestOffsets.get(tp).map(_ - range.untilOffset).getOrElse(0L)
tp -> math.max(0L, lag)
}.toMap
}
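
A small sketch showing the helpers above in use; the ranges and latest offsets are made-up values (in practice the latest offsets could come from the broker, for example via a consumer's endOffsets call).

// Two contiguous ranges for orders-0 plus one range for payments-1 (illustrative values)
val sampleRanges = Array(
  OffsetRange("orders", 0, 100, 200),
  OffsetRange("orders", 0, 200, 300),
  OffsetRange("payments", 1, 50, 150)
)

// orders-0 collapses into a single [100, 300) range
val merged = mergeContiguousRanges(sampleRanges)
merged.foreach(r => println(s"${r.topic}-${r.partition}: [${r.fromOffset}, ${r.untilOffset})"))

// Latest offsets per partition, hard-coded to keep the sketch self-contained
val latestOffsets = Map(
  new TopicPartition("orders", 0) -> 500L,
  new TopicPartition("payments", 1) -> 150L
)

// orders-0 lag = 500 - 300 = 200; payments-1 lag = 0
val lag = calculateLag(merged, latestOffsets)
lag.foreach { case (tp, partitionLag) => println(s"$tp lag: $partitionLag") }
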
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "manual-offset-group",
"enable.auto.commit" -> (false: java.lang.Boolean), // Disable auto-commit
"auto.offset.reset" -> "earliest" // or "latest" based on requirements
)

Additional parameters that affect commit and request timing.

val kafkaParams = Map[String, Object](
// ... other parameters
"offset.commit.timeout.ms" -> "5000", // Commit timeout
"retry.backoff.ms" -> "100", // Retry backoff
"request.timeout.ms" -> "30000" // Request timeout
)

Commit failures reported to the callback can be inspected and handled by error type.

val commitCallback = new OffsetCommitCallback {
def onComplete(metadata: java.util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
if (exception != null) {
exception match {
case _: org.apache.kafka.clients.consumer.CommitFailedException =>
println("Commit failed - likely due to consumer group rebalance")
case _: org.apache.kafka.common.errors.TimeoutException =>
println("Commit timed out - may succeed later")
case _: org.apache.kafka.common.errors.AuthorizationException =>
println("Not authorized to commit offsets")
case _ =>
println(s"Unexpected commit error: ${exception.getMessage}")
}
}
}
}

Offset ranges can be validated before use so that a job never requests offsets outside a partition's known bounds.

def validateOffsetRange(range: OffsetRange, latestOffset: Long): Boolean = {
range.fromOffset >= 0 &&
range.untilOffset > range.fromOffset &&
range.untilOffset <= latestOffset
}
def safeCreateOffsetRange(topic: String, partition: Int, from: Long, until: Long): Option[OffsetRange] = {
if (from >= 0 && until > from) {
Some(OffsetRange(topic, partition, from, until))
} else {
println(s"Invalid offset range: $topic-$partition [$from, $until)")
None
}
}

Note: these offset management APIs are annotated @Experimental as of Spark 2.4.8.
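
A brief sketch of the validation helpers in use; latestOffset is a made-up value standing in for the partition's actual end offset, which would normally be obtained from the broker.

// Hypothetical end offset for orders-0
val latestOffset = 5000L

val candidate = OffsetRange("orders", 0, 1000, 1500)
if (validateOffsetRange(candidate, latestOffset)) {
  println(s"Range is valid: $candidate")
}

// safeCreateOffsetRange returns None rather than building an invalid range
val rejected = safeCreateOffsetRange("orders", 0, 2000, 1500) // untilOffset <= fromOffset
println(s"Constructed range: $rejected") // prints: Constructed range: None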