Offset Management

Offset management provides precise control over Kafka message consumption and, combined with idempotent or transactional output, enables exactly-once processing semantics. The offset management system includes utilities for tracking offset ranges, committing processed offsets, and maintaining consumption state across streaming batches.

Core Types

OffsetRange

Represents a range of offsets from a single Kafka TopicPartition, defining exactly which messages to process.

final class OffsetRange(
  val topic: String,
  val partition: Int,  
  val fromOffset: Long,
  val untilOffset: Long
) extends Serializable {
  def topicPartition(): TopicPartition
  def count(): Long
  override def equals(obj: Any): Boolean
  override def hashCode(): Int
  override def toString(): String
}

Properties:

  • topic: Kafka topic name
  • partition: Kafka partition ID
  • fromOffset: Inclusive starting offset
  • untilOffset: Exclusive ending offset

Methods:

  • topicPartition(): Returns Kafka TopicPartition object for convenience
  • count(): Returns number of messages in this range (untilOffset - fromOffset)
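
For instance, a range from offset 100 (inclusive) to 200 (exclusive) covers exactly 100 messages; the topic name below is illustrative:

import org.apache.spark.streaming.kafka010.OffsetRange

val range = OffsetRange("orders", 0, 100, 200)
range.count()           // 100 (untilOffset - fromOffset)
range.topicPartition()  // TopicPartition for orders-0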

OffsetRange Companion Object

Factory methods for creating OffsetRange instances.

object OffsetRange {
  def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange
  def create(topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long): OffsetRange
  def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange
  def apply(topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long): OffsetRange
}

Core Interfaces

HasOffsetRanges

Interface for objects that contain offset range information.

trait HasOffsetRanges {
  def offsetRanges: Array[OffsetRange]
}

Implemented by:

  • KafkaRDD (batch processing)
  • DirectKafkaInputDStream RDDs (streaming)
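
Note that the cast to HasOffsetRanges only succeeds on the RDD produced directly by the Kafka source; RDDs returned by subsequent transformations no longer carry offset information. A minimal sketch, assuming stream was created with KafkaUtils.createDirectStream as in the examples below:

stream.foreachRDD { rdd =>
  // Succeeds: this RDD is the KafkaRDD created by the direct stream
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Would fail at runtime with a ClassCastException:
  // rdd.map(identity).asInstanceOf[HasOffsetRanges].offsetRanges
}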

CanCommitOffsets

Interface for objects that can commit offset ranges to Kafka.

trait CanCommitOffsets {
  def commitAsync(offsetRanges: Array[OffsetRange]): Unit
  def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit
}

Implemented by:

  • DirectKafkaInputDStream (streaming)

Methods:

  • commitAsync(offsetRanges): Queue offsets for commit without callback
  • commitAsync(offsetRanges, callback): Queue offsets for commit with completion callback
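
As with HasOffsetRanges, the cast to CanCommitOffsets only succeeds on the DStream returned by createDirectStream, so keep a reference to the original stream for committing. A minimal sketch (output writing elided):

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Write the batch output durably first, then queue the commit;
  // the underlying consumer performs the actual commit asynchronously later
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}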

Usage Examples

Basic Offset Range Creation

import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.TopicPartition

// Create offset ranges using factory methods
val range1 = OffsetRange.create("orders", 0, 100, 200)
val range2 = OffsetRange.create("payments", 1, 50, 150)

// Create using TopicPartition
val topicPartition = new TopicPartition("users", 0)
val range3 = OffsetRange.create(topicPartition, 0, 100)

// Create using apply methods (Scala)
val range4 = OffsetRange("logs", 2, 1000, 2000)

// Check range properties
println(s"Range 1 covers ${range1.count()} messages")
println(s"Topic partition: ${range1.topicPartition()}")

Streaming with Offset Tracking

import org.apache.spark.streaming.kafka010._

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

stream.foreachRDD { rdd =>
  // Get offset ranges from the RDD
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  
  // Log offset information
  offsetRanges.foreach { range =>
    println(s"Processing ${range.topic} partition ${range.partition}: " +
            s"${range.fromOffset} to ${range.untilOffset} (${range.count()} messages)")
  }
  
  // Process the data
  val results = rdd.map(record => processRecord(record)).collect()
  
  // Only commit offsets after successful processing
  if (results.nonEmpty) {
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    println("Committed offsets for successful batch")
  }
}

Manual Offset Commit with Callback

import org.apache.kafka.clients.consumer.{OffsetCommitCallback, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  
  // Process data with error handling
  try {
    val processedData = rdd.map(record => processRecord(record)).collect()
    
    // Custom commit callback for monitoring
    val commitCallback = new OffsetCommitCallback {
      def onComplete(metadata: java.util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
        if (exception != null) {
          println(s"Offset commit failed: ${exception.getMessage}")
        } else {
          println(s"Successfully committed ${metadata.size()} partition offsets")
          metadata.asScala.foreach { case (tp, offsetMeta) =>
            println(s"  ${tp.topic()}-${tp.partition()}: ${offsetMeta.offset()}")
          }
        }
      }
    }
    
    // Commit with callback
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, commitCallback)
    
  } catch {
    case e: Exception =>
      println(s"Processing failed, not committing offsets: ${e.getMessage}")
  }
}

Batch Processing with Offset Ranges

import org.apache.spark.streaming.kafka010._
import scala.collection.JavaConverters._

// Define specific ranges to process
val offsetRanges = Array(
  OffsetRange("orders", 0, 1000, 1500),
  OffsetRange("orders", 1, 2000, 2500),
  OffsetRange("payments", 0, 500, 1000)
)

val rdd = KafkaUtils.createRDD[String, String](
  sparkContext,
  kafkaParams.asJava,  // createRDD expects a java.util.Map of Kafka parameters
  offsetRanges,
  LocationStrategies.PreferConsistent
)

// Process and track progress
val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val totalMessages = ranges.map(_.count()).sum
println(s"Processing {totalMessages} messages across ${ranges.length} partitions")

val processedCount = rdd.count()
println(s"Successfully processed ${processedCount} messages")

// Verify ranges match expectations
ranges.zip(offsetRanges).foreach { case (actual, expected) =>
  assert(actual == expected, "Offset ranges don't match")
}

Exactly-Once Processing Pattern

import org.apache.spark.streaming.kafka010._

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  
  // Begin transaction or mark processing start
  val transactionId = startTransaction(offsetRanges)
  
  try {
    // Process data
    val results = rdd.mapPartitionsWithIndex { (partitionId, iterator) =>
      val range = offsetRanges(partitionId)
      iterator.map { record =>
        // Process with partition-specific logic
        processWithTransaction(record, transactionId, range)
      }
    }.collect()
    
    // Only commit Kafka offsets after successful transaction commit
    commitTransaction(transactionId)
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    
    println(s"Successfully processed and committed batch ${transactionId}")
    
  } catch {
    case e: Exception =>
      rollbackTransaction(transactionId)
      println(s"Processing failed, rolled back transaction ${transactionId}: ${e.getMessage}")
      throw e // Re-throw to trigger stream retry
  }
}

def startTransaction(ranges: Array[OffsetRange]): String = {
  val transactionId = java.util.UUID.randomUUID().toString
  // Store offset ranges with transaction for recovery
  storeTransactionOffsets(transactionId, ranges)
  transactionId
}

Offset Range Utilities

import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.TopicPartition

// Utility functions for working with offset ranges
def mergeContiguousRanges(ranges: Array[OffsetRange]): Array[OffsetRange] = {
  ranges.groupBy(r => (r.topic, r.partition))
    .values
    .flatMap { partitionRanges =>
      val sorted = partitionRanges.sortBy(_.fromOffset)
      sorted.foldLeft(List.empty[OffsetRange]) { (acc, range) =>
        acc match {
          case Nil => List(range)
          case head :: tail if head.untilOffset == range.fromOffset =>
            // Merge contiguous ranges
            OffsetRange(head.topic, head.partition, head.fromOffset, range.untilOffset) :: tail
          case _ => range :: acc
        }
      }.reverse
    }.toArray
}

def calculateLag(currentRanges: Array[OffsetRange], latestOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
  currentRanges.map { range =>
    val tp = range.topicPartition()
    val lag = latestOffsets.get(tp).map(_ - range.untilOffset).getOrElse(0L)
    tp -> math.max(0L, lag)
  }.toMap
}
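
A quick check of both helpers; the topic name and offsets are illustrative:

// Two back-to-back ranges on the same partition collapse into one range [0, 250)
val merged = mergeContiguousRanges(Array(
  OffsetRange("orders", 0, 0, 100),
  OffsetRange("orders", 0, 100, 250)
))

// Lag is the distance from each range's end to the partition's latest offset:
// here 400 - 250 = 150 for orders-0
val lag = calculateLag(merged, Map(new TopicPartition("orders", 0) -> 400L))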

Configuration for Offset Management

Kafka Parameters for Manual Offset Management

import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "manual-offset-group",
  "enable.auto.commit" -> (false: java.lang.Boolean), // Disable auto-commit
  "auto.offset.reset" -> "earliest" // or "latest" based on requirements
)

Offset Commit Configuration

val kafkaParams = Map[String, Object](
  // ... other parameters
  "retry.backoff.ms" -> "100",     // Backoff before retrying failed requests
  "request.timeout.ms" -> "30000"  // Upper bound on each request, including offset commits
)

Error Handling

Handling Commit Failures

import org.apache.kafka.clients.consumer.{OffsetCommitCallback, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

val commitCallback = new OffsetCommitCallback {
  def onComplete(metadata: java.util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
    if (exception != null) {
      exception match {
        case _: org.apache.kafka.clients.consumer.CommitFailedException =>
          println("Commit failed - likely due to consumer group rebalance")
        case _: org.apache.kafka.common.errors.TimeoutException =>
          println("Commit timed out - may succeed later")
        case _: org.apache.kafka.common.errors.AuthorizationException =>
          println("Not authorized to commit offsets")
        case _ =>
          println(s"Unexpected commit error: ${exception.getMessage}")
      }
    }
  }
}

Validation and Bounds Checking

def validateOffsetRange(range: OffsetRange, latestOffset: Long): Boolean = {
  range.fromOffset >= 0 && 
  range.untilOffset > range.fromOffset && 
  range.untilOffset <= latestOffset
}

def safeCreateOffsetRange(topic: String, partition: Int, from: Long, until: Long): Option[OffsetRange] = {
  if (from >= 0 && until > from) {
    Some(OffsetRange(topic, partition, from, until))
  } else {
    println(s"Invalid offset range: $topic-$partition [$from, $until)")
    None
  }
}

Important Notes

  • All offset management classes are marked as @Experimental in Spark 2.4.8
  • OffsetRange uses inclusive start (fromOffset) and exclusive end (untilOffset)
  • Offset commits are asynchronous and may complete after the calling method returns
  • HasOffsetRanges interface allows offset introspection on RDDs and DStream RDDs
  • CanCommitOffsets interface enables manual offset management for exactly-once semantics
  • Offset ranges are preserved across RDD transformations that maintain partitioning (see the sketch below)
  • Use offset commits only after successful data processing to ensure exactly-once semantics
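
For example, because map keeps the same partition indices, offsetRanges(i) still describes the records in partition i of the mapped RDD; a sketch assuming the stream from the earlier examples:

import org.apache.spark.TaskContext

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  rdd.map(_.value).foreachPartition { records =>
    // The partition index still lines up with the offset range array
    val range = offsetRanges(TaskContext.get.partitionId())
    println(s"Partition ${range.topic}-${range.partition}: " +
            s"[${range.fromOffset}, ${range.untilOffset})")
    records.foreach(value => ()) // process this partition's values here
  }
}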