Offset Management

Comprehensive offset control for both batch and streaming operations with support for earliest, latest, specific offsets, and timestamp-based positioning.

Capabilities

Starting Offsets

Control where to start reading from Kafka topics.

String-based Offset Specification

/**
 * Starting offset specification using string values
 * @param offsets - "earliest", "latest", or JSON partition map
 */
.option("startingOffsets", "earliest")          // Start from earliest available
.option("startingOffsets", "latest")            // Start from latest (streaming default)
.option("startingOffsets", """{"topic1":{"0":100,"1":200},"topic2":{"0":50}}""")  // Specific offsets

Usage Examples:

// Start from earliest available messages
val df1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "earliest")
  .load()

// Start from specific offsets per partition
val df2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", """{"events":{"0":1000,"1":2000,"2":500}}""")
  .load()

Timestamp-based Offset Specification

/**
 * Starting offset specification using timestamps
 * @param timestamps - JSON mapping of topic partitions to timestamps (milliseconds since epoch)
 */
.option("startingOffsetsByTimestamp", """{"topic1":{"0":1640995200000,"1":1640995200000}}""")

Usage Example:

// Start from the first offsets with timestamps at or after Jan 1, 2022 00:00:00 UTC
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsetsByTimestamp", """{"events":{"0":1640995200000,"1":1640995200000}}""")
  .load()

Ending Offsets (Batch Only)

Control where to stop reading for batch operations. Not applicable to streaming queries.

String-based Ending Offsets

/**
 * Ending offset specification for batch reads
 * @param offsets - "latest" or JSON partition map
 * Note: "earliest" is not valid for ending offsets
 */
.option("endingOffsets", "latest")              // Read until latest available
.option("endingOffsets", """{"topic1":{"0":500,"1":600},"topic2":{"0":300}}""")  // Specific end offsets

Timestamp-based Ending Offsets

/**
 * Ending offset specification using timestamps for batch reads
 * @param timestamps - JSON mapping of topic partitions to timestamps
 */
.option("endingOffsetsByTimestamp", """{"topic1":{"0":1640995300000,"1":1640995300000}}""")

Usage Example:

// Batch read from earliest to latest
val batchDF1 = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

// Batch read with specific offset ranges
val batchDF2 = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", """{"events":{"0":100,"1":200}}""")
  .option("endingOffsets", """{"events":{"0":500,"1":600}}""")
  .load()

// Batch read with time range
val batchDF3 = spark
  .read
  .format("kafka")  
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsetsByTimestamp", """{"events":{"0":1640995200000}}""")  // Start time
  .option("endingOffsetsByTimestamp", """{"events":{"0":1640995300000}}""")    // End time  
  .load()

Offset Range Limits

Core types for representing different offset specifications.

/**
 * Base trait for offset range specifications
 */
sealed trait KafkaOffsetRangeLimit

/**
 * Bind to earliest available offsets
 */
case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit

/**
 * Bind to latest available offsets  
 */
case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit

/**
 * Bind to specific offsets per partition
 * @param partitionOffsets - Map from TopicPartition to offset
 */
case class SpecificOffsetRangeLimit(
  partitionOffsets: Map[TopicPartition, Long]
) extends KafkaOffsetRangeLimit

/**
 * Bind to offsets based on timestamp per partition
 * @param topicTimestamps - Map from TopicPartition to timestamp (ms since epoch)
 */
case class SpecificTimestampRangeLimit(
  topicTimestamps: Map[TopicPartition, Long]  
) extends KafkaOffsetRangeLimit
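
These range limit types are internal to the Kafka connector, so application code normally drives them indirectly through the startingOffsets/endingOffsets options shown above. The sketch below only illustrates how the option strings map onto the types, assuming code that can see the connector internals (or local equivalents of these case classes):

import org.apache.kafka.common.TopicPartition

// Illustration only: option strings and their corresponding range limits
val earliest = EarliestOffsetRangeLimit                      // "earliest"
val latest   = LatestOffsetRangeLimit                        // "latest"
val specific = SpecificOffsetRangeLimit(Map(                 // JSON partition map
  new TopicPartition("events", 0) -> 1000L,
  new TopicPartition("events", 1) -> 2000L
))
val byTime   = SpecificTimestampRangeLimit(Map(              // timestamp JSON map
  new TopicPartition("events", 0) -> 1640995200000L
))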

Offset Tracking and Serialization

Track and serialize offset information for checkpointing and recovery.

/**
 * Offset tracking for streaming sources
 * @param partitionToOffsets - Current offset per partition
 */
case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) {
  /** JSON representation of partition offsets */
  val json: String
}

/**
 * Companion object for KafkaSourceOffset creation and parsing
 */
object KafkaSourceOffset {
  /**
   * Extract partition offsets from various offset types
   * @param offset - Offset instance to extract from
   * @return Map of partition to offset
   */
  def getPartitionOffsets(offset: Offset): Map[TopicPartition, Long]
  
  /**
   * Create KafkaSourceOffset from tuple sequence
   * @param offsetTuples - Sequence of (topic, partition, offset) tuples
   * @return KafkaSourceOffset instance
   */
  def apply(offsetTuples: (String, Int, Long)*): KafkaSourceOffset
  
  /**
   * Create KafkaSourceOffset from serialized offset
   * @param offset - SerializedOffset containing JSON
   * @return KafkaSourceOffset instance
   */
  def apply(offset: SerializedOffset): KafkaSourceOffset
}
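
A minimal sketch of how these constructors fit together, again assuming access to the connector's internal API; the JSON in the comment is indicative of the checkpoint format rather than an exact guarantee:

import org.apache.kafka.common.TopicPartition

// Build an offset from (topic, partition, offset) tuples and inspect its JSON form
val sourceOffset = KafkaSourceOffset(("events", 0, 1000L), ("events", 1, 2000L))
sourceOffset.json  // e.g. {"events":{"0":1000,"1":2000}}

// Recover the partition-to-offset map from a generic Offset instance
val recovered: Map[TopicPartition, Long] =
  KafkaSourceOffset.getPartitionOffsets(sourceOffset)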

Offset Range Calculation

Represent offset ranges for partition processing.

/**
 * Represents an offset range for a specific partition
 * @param topicPartition - The topic partition  
 * @param fromOffset - Starting offset (inclusive)
 * @param untilOffset - Ending offset (exclusive)
 * @param preferredLoc - Preferred processing location
 */
case class KafkaOffsetRange(
  topicPartition: TopicPartition,
  fromOffset: Long,
  untilOffset: Long, 
  preferredLoc: Option[String]
)
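
For illustration, a single-partition range and the record count it implies (assuming contiguous offsets, i.e. no gaps from compaction or transaction markers):

import org.apache.kafka.common.TopicPartition

// Half-open range [1000, 1500) on partition 0 of "events", no preferred executor
val range = KafkaOffsetRange(
  topicPartition = new TopicPartition("events", 0),
  fromOffset     = 1000L,
  untilOffset    = 1500L,
  preferredLoc   = None
)
val recordCount = range.untilOffset - range.fromOffset  // 500, assuming no offset gaps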

JSON Utilities for Offsets

Utilities for serializing and deserializing offset information.

/**
 * JSON utilities for Kafka metadata serialization
 */
object JsonUtils {
  /**
   * Parse partition assignments from JSON string
   * @param str - JSON string representing partitions
   * @return Array of TopicPartition objects
   */
  def partitions(str: String): Array[TopicPartition]
  
  /**
   * Serialize partitions to JSON string
   * @param partitions - Iterable of TopicPartition objects
   * @return JSON string representation
   */
  def partitions(partitions: Iterable[TopicPartition]): String
  
  /**
   * Parse partition offset map from JSON string
   * @param str - JSON string representing partition offsets
   * @return Map from TopicPartition to offset
   */
  def partitionOffsets(str: String): Map[TopicPartition, Long]
  
  /**
   * Serialize partition offsets to JSON string
   * @param partitionOffsets - Map from TopicPartition to offset
   * @return JSON string representation
   */
  def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String
  
  /**
   * Parse partition timestamp map from JSON string
   * @param str - JSON string representing partition timestamps
   * @return Map from TopicPartition to timestamp
   */
  def partitionTimestamps(str: String): Map[TopicPartition, Long]
  
  /**
   * Serialize partition timestamps to JSON string
   * @param partitionTimestamps - Map from TopicPartition to timestamp
   * @return JSON string representation
   */
  def partitionTimestamps(partitionTimestamps: Map[TopicPartition, Long]): String
}
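
These helpers are likewise internal to the connector; the sketch below mainly documents the JSON shape they read and write, which is the same shape accepted by the startingOffsets and endingOffsets options:

import org.apache.kafka.common.TopicPartition

// Round-trip a partition-offset map through JSON
val offsets = Map(
  new TopicPartition("events", 0) -> 1000L,
  new TopicPartition("events", 1) -> 2000L
)
val json     = JsonUtils.partitionOffsets(offsets)  // e.g. {"events":{"0":1000,"1":2000}}
val restored = JsonUtils.partitionOffsets(json)     // parses back to the original map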

Types

import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}

// Type alias for partition-to-offset mapping
type PartitionOffsetMap = Map[TopicPartition, Long]

// Offset range limit constants
object KafkaOffsetRangeLimit {
  val LATEST: Long = -1L
  val EARLIEST: Long = -2L
}