Comprehensive offset control for both batch and streaming operations with support for earliest, latest, specific offsets, and timestamp-based positioning.
Control where to start reading from Kafka topics.
/**
* Starting offset specification using string values
* @param offsets - "earliest", "latest", or JSON partition map
*/
.option("startingOffsets", "earliest") // Start from earliest available
.option("startingOffsets", "latest") // Start from latest (streaming default)
.option("startingOffsets", """{"topic1":{"0":100,"1":200},"topic2":{"0":50}}""") // Specific offsetsUsage Examples:
// Start from earliest available messages
val df1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("startingOffsets", "earliest")
.load()
// Start from specific offsets per partition
val df2 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("startingOffsets", """{"events":{"0":1000,"1":2000,"2":500}}""")
.load()

/**
* Starting offset specification using timestamps
* @param timestamps - JSON mapping of topic partitions to timestamps (milliseconds since epoch)
*/
.option("startingOffsetsByTimestamp", """{"topic1":{"0":1640995200000,"1":1640995200000}}""")Usage Example:
// Start from messages after specific timestamp (Jan 1, 2022 00:00:00 UTC)
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("startingOffsetsByTimestamp", """{"events":{"0":1640995200000,"1":1640995200000}}""")
.load()
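The timestamp values are milliseconds since the Unix epoch. As a quick sanity check (plain java.time, nothing connector-specific), the value used above can be derived like this:

import java.time.Instant

// Jan 1, 2022 00:00:00 UTC as epoch milliseconds
val startMillis: Long = Instant.parse("2022-01-01T00:00:00Z").toEpochMilli // 1640995200000L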
Control where to stop reading for batch operations. Not applicable to streaming queries.

/**
* Ending offset specification for batch reads
* @param offsets - "latest" or JSON partition map
* Note: "earliest" is not valid for ending offsets
*/
.option("endingOffsets", "latest") // Read until latest available
.option("endingOffsets", """{"topic1":{"0":500,"1":600},"topic2":{"0":300}}""") // Specific end offsets/**
* Ending offset specification using timestamps for batch reads
* @param timestamps - JSON mapping of topic partitions to timestamps
*/
.option("endingOffsetsByTimestamp", """{"topic1":{"0":1640995300000,"1":1640995300000}}""")Usage Example:
// Batch read from earliest to latest
val batchDF1 = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
// Batch read with specific offset ranges
val batchDF2 = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("startingOffsets", """{"events":{"0":100,"1":200}}""")
.option("endingOffsets", """{"events":{"0":500,"1":600}}""")
.load()
// Batch read with time range
val batchDF3 = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("startingOffsetsByTimestamp", """{"events":{"0":1640995200000}}""") // Start time
.option("endingOffsetsByTimestamp", """{"events":{"0":1640995300000}}""") // End time
.load()

Core types for representing different offset specifications.
/**
* Base trait for offset range specifications
*/
sealed trait KafkaOffsetRangeLimit
/**
* Bind to earliest available offsets
*/
case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit
/**
* Bind to latest available offsets
*/
case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit
/**
* Bind to specific offsets per partition
* @param partitionOffsets - Map from TopicPartition to offset
*/
case class SpecificOffsetRangeLimit(
partitionOffsets: Map[TopicPartition, Long]
) extends KafkaOffsetRangeLimit
/**
* Bind to offsets based on timestamp per partition
* @param topicTimestamps - Map from TopicPartition to timestamp (ms since epoch)
*/
case class SpecificTimestampRangeLimit(
topicTimestamps: Map[TopicPartition, Long]
) extends KafkaOffsetRangeLimit
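To make the mapping concrete, here is a sketch (not part of the connector's public API surface) of how each string form of the options above corresponds to one of these types:

import org.apache.kafka.common.TopicPartition

// "earliest" and "latest" map to the case objects
val start: KafkaOffsetRangeLimit = EarliestOffsetRangeLimit
val end: KafkaOffsetRangeLimit = LatestOffsetRangeLimit

// A JSON partition map such as {"events":{"0":100,"1":200}} maps to
val specific = SpecificOffsetRangeLimit(Map(
  new TopicPartition("events", 0) -> 100L,
  new TopicPartition("events", 1) -> 200L
))

// A timestamp map such as {"events":{"0":1640995200000}} maps to
val byTimestamp = SpecificTimestampRangeLimit(Map(
  new TopicPartition("events", 0) -> 1640995200000L
))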
Track and serialize offset information for checkpointing and recovery.

/**
* Offset tracking for streaming sources
* @param partitionToOffsets - Current offset per partition
*/
case class KafkaSourceOffset(
  partitionToOffsets: Map[TopicPartition, Long]
) extends Offset {
  /** JSON representation of partition offsets */
  override val json: String
}
/**
* Companion object for KafkaSourceOffset creation and parsing
*/
object KafkaSourceOffset {
/**
* Extract partition offsets from various offset types
* @param offset - Offset instance to extract from
* @return Map of partition to offset
*/
def getPartitionOffsets(offset: Offset): Map[TopicPartition, Long]
/**
* Create KafkaSourceOffset from tuple sequence
* @param offsetTuples - Sequence of (topic, partition, offset) tuples
* @return KafkaSourceOffset instance
*/
def apply(offsetTuples: (String, Int, Long)*): KafkaSourceOffset
/**
* Create KafkaSourceOffset from serialized offset
* @param offset - SerializedOffset containing JSON
* @return KafkaSourceOffset instance
*/
def apply(offset: SerializedOffset): KafkaSourceOffset
}
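Usage sketch (built only from the constructors above; the exact JSON layout is illustrative):

// Build an offset from (topic, partition, offset) tuples
val offset = KafkaSourceOffset(("events", 0, 1000L), ("events", 1, 2000L))

// JSON form persisted to the checkpoint log, e.g. {"events":{"0":1000,"1":2000}}
val serialized: String = offset.json

// Recover the partition-to-offset map from a generic Offset instance
val recovered: Map[TopicPartition, Long] = KafkaSourceOffset.getPartitionOffsets(offset)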
Represent offset ranges for partition processing.

/**
* Represents an offset range for a specific partition
* @param topicPartition - The topic partition
* @param fromOffset - Starting offset (inclusive)
* @param untilOffset - Ending offset (exclusive)
* @param preferredLoc - Preferred processing location
*/
case class KafkaOffsetRange(
topicPartition: TopicPartition,
fromOffset: Long,
untilOffset: Long,
preferredLoc: Option[String]
)
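For example, a range covering offsets 100 (inclusive) through 500 (exclusive) of partition 0:

import org.apache.kafka.common.TopicPartition

val range = KafkaOffsetRange(
  topicPartition = new TopicPartition("events", 0),
  fromOffset = 100L,
  untilOffset = 500L,
  preferredLoc = None // no preferred executor
)
// Record count implied by the range, assuming no gaps from compaction or aborted transactions
val count = range.untilOffset - range.fromOffset // 400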
Utilities for serializing and deserializing offset information.

/**
* JSON utilities for Kafka metadata serialization
*/
object JsonUtils {
/**
* Parse partition assignments from JSON string
* @param str - JSON string representing partitions
* @return Array of TopicPartition objects
*/
def partitions(str: String): Array[TopicPartition]
/**
* Serialize partitions to JSON string
* @param partitions - Iterable of TopicPartition objects
* @return JSON string representation
*/
def partitions(partitions: Iterable[TopicPartition]): String
/**
* Parse partition offset map from JSON string
* @param str - JSON string representing partition offsets
* @return Map from TopicPartition to offset
*/
def partitionOffsets(str: String): Map[TopicPartition, Long]
/**
* Serialize partition offsets to JSON string
* @param partitionOffsets - Map from TopicPartition to offset
* @return JSON string representation
*/
def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String
/**
* Parse partition timestamp map from JSON string
* @param str - JSON string representing partition timestamps
* @return Map from TopicPartition to timestamp
*/
def partitionTimestamps(str: String): Map[TopicPartition, Long]
/**
* Serialize partition timestamps to JSON string
* @param partitionTimestamps - Map from TopicPartition to timestamp
* @return JSON string representation
*/
def partitionTimestamps(partitionTimestamps: Map[TopicPartition, Long]): String
}
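For example, a partition-offset map round-trips through its JSON form (a sketch; key ordering in the emitted JSON may differ):

import org.apache.kafka.common.TopicPartition

val offsets = Map(
  new TopicPartition("events", 0) -> 100L,
  new TopicPartition("events", 1) -> 200L
)
val json = JsonUtils.partitionOffsets(offsets)    // e.g. {"events":{"0":100,"1":200}}
val parsed = JsonUtils.partitionOffsets(json)     // back to Map[TopicPartition, Long]
assert(parsed == offsets)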
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
// Type alias for partition-to-offset mapping
type PartitionOffsetMap = Map[TopicPartition, Long]
// Offset range limit constants
object KafkaOffsetRangeLimit {
val LATEST: Long = -1L
val EARLIEST: Long = -2L
}
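These sentinels are the values that -1 and -2 stand for inside the JSON offset maps, so a specification can mix concrete offsets with per-partition earliest/latest:

// Partition 0 starts at offset 100; partition 1 starts from the earliest available offset (-2)
.option("startingOffsets", """{"events":{"0":100,"1":-2}}""")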