Kafka 0.10+ Source for Structured Streaming
The Kafka connector provides comprehensive offset management capabilities for controlling where to start and stop reading data, including support for specific offsets, timestamps, and automatic offset tracking.
Base interface for defining offset boundaries and reading ranges.
/**
* Represents desired offset range limits for starting, ending, and specific offsets
* Used to control the boundaries of Kafka data consumption
*/
sealed trait KafkaOffsetRangeLimit

Binds to the earliest available offsets in Kafka topics.
/**
* Binds to earliest available offsets in Kafka
* Starts reading from the beginning of each partition
*/
case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit

Configuration:
// For streaming queries
.option("startingOffsets", "earliest")
// For batch queries (default for startingOffsets)
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest") // Cannot use "earliest" for ending

Usage Examples:
// Stream from beginning of topics
val stream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "my-topic")
.option("startingOffsets", "earliest") // Read from beginning
.load()
// Batch read from earliest to latest
val batch = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "my-topic")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()

Binds to the latest available offsets in Kafka topics.
/**
* Binds to latest available offsets in Kafka
* Starts reading from the current end of each partition
*/
case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit

Configuration:
// For streaming queries (default for startingOffsets)
.option("startingOffsets", "latest")
// For batch queries (default for endingOffsets)
.option("endingOffsets", "latest") // Cannot use "latest" for starting in batch

Usage Examples:
// Stream from current position (default behavior)
val stream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "my-topic")
.option("startingOffsets", "latest") // Start from current end
.load()
// Batch read up to current position
val batch = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "my-topic")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest") // Read up to current end
.load()

Binds to specific offsets for precise control over reading positions.
/**
* Binds to specific offsets per partition
* Provides precise control over starting/ending positions
*
* @param partitionOffsets Map of TopicPartition to offset values
* -1 = latest offset, -2 = earliest offset
*/
case class SpecificOffsetRangeLimit(partitionOffsets: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit

Configuration:
// JSON specification of partition offsets
.option("startingOffsets", """{"topic1":{"0":23,"1":345},"topic2":{"0":0}}""")
.option("endingOffsets", """{"topic1":{"0":100,"1":500},"topic2":{"0":50}}""")

Usage Examples:
// Start from specific offsets
val stream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "orders,payments")
.option("startingOffsets", """{"orders":{"0":1000,"1":2000},"payments":{"0":500}}""")
.load()
// Batch read with specific ranges
val batch = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "my-topic")
.option("startingOffsets", """{"my-topic":{"0":100,"1":200}}""")
.option("endingOffsets", """{"my-topic":{"0":1000,"1":2000}}""")
.load()
// Mix specific offsets with latest/earliest using special values
val mixed = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "my-topic")
.option("startingOffsets", """{"my-topic":{"0":-2,"1":500}}""") // -2 = earliest, 500 = specific
.option("endingOffsets", """{"my-topic":{"0":-1,"1":1000}}""") // -1 = latest, 1000 = specific
.load()

Special Offset Values:
object KafkaOffsetRangeLimit {
val LATEST = -1L // Use latest available offset
val EARLIEST = -2L // Use earliest available offset
}

Binds to the earliest offset with timestamp greater than or equal to the specified timestamp per partition.
/**
* Binds to earliest offset with timestamp >= specified timestamp per partition
* Enables time-based offset resolution
*
* @param topicTimestamps Map of TopicPartition to timestamp values (Unix milliseconds)
* @param strategyOnNoMatchingStartingOffset Strategy when no matching timestamp found
*/
case class SpecificTimestampRangeLimit(
topicTimestamps: Map[TopicPartition, Long],
strategyOnNoMatchingStartingOffset: StrategyOnNoMatchStartingOffset.Value
) extends KafkaOffsetRangeLimit

Configuration:
// JSON specification of partition timestamps
.option("startingOffsetsByTimestamp", """{"topic1":{"0":1609459200000,"1":1609459200000}}""")
.option("endingOffsetsByTimestamp", """{"topic1":{"0":1609545600000,"1":1609545600000}}""")
// Strategy for when no matching timestamp is found
.option("startingOffsetsByTimestampStrategy", "error") // Default: throw error
.option("startingOffsetsByTimestampStrategy", "latest") // Use latest offset

Usage Examples:
// Start from specific timestamp (January 1, 2021 00:00:00 UTC)
val stream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("startingOffsetsByTimestamp", """{"events":{"0":1609459200000,"1":1609459200000}}""")
.load()
// Batch read between timestamps
val batch = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "logs")
.option("startingOffsetsByTimestamp", """{"logs":{"0":1609459200000}}""") // Jan 1, 2021
.option("endingOffsetsByTimestamp", """{"logs":{"0":1609545600000}}""") // Jan 2, 2021
.load()

Applies timestamp-based offset resolution to all partitions using a single timestamp.
/**
* Applies timestamp-based offset resolution to all partitions
* Uses single timestamp for all discovered partitions
*
* @param timestamp Unix timestamp in milliseconds
* @param strategyOnNoMatchingStartingOffset Strategy when no matching timestamp found
*/
case class GlobalTimestampRangeLimit(
timestamp: Long,
strategyOnNoMatchingStartingOffset: StrategyOnNoMatchStartingOffset.Value
) extends KafkaOffsetRangeLimit

Configuration:
// Single timestamp applied to all partitions
.option("startingTimestamp", "1609459200000") // January 1, 2021 00:00:00 UTC
.option("endingTimestamp", "1609545600000") // January 2, 2021 00:00:00 UTC

Usage Examples:
// Start all partitions from same timestamp
val stream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "user-events")
.option("startingTimestamp", "1609459200000") // Applies to all partitions
.load()
// Batch read time range across all partitions
val batch = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribePattern", "logs-.*")
.option("startingTimestamp", "1609459200000")
.option("endingTimestamp", "1609545600000")
.load()

Custom offset implementation for tracking partition positions in streaming queries.
/**
* Custom Offset implementation tracking all partitions and their offsets
* Used internally by streaming sources for checkpoint management
*
* @param partitionToOffsets Map of TopicPartition to current offset
*/
case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends streaming.Offset

Companion Object Methods:
object KafkaSourceOffset {
/** Extract partition offsets from generic Offset */
def getPartitionOffsets(offset: Offset): Map[TopicPartition, Long]
/** Create offset from offset tuples */
def apply(offsetTuples: (String, Int, Long)*): KafkaSourceOffset
/** Create from serialized offset */
def apply(offset: SerializedOffset): KafkaSourceOffset
/** Create from streaming offset */
def apply(offset: streaming.Offset): KafkaSourceOffset
}

Represents the offset for a specific partition in the V2 DataSource API.
/**
* Represents offset for a specific partition in V2 DataSource API
* Used for continuous streaming offset management
*
* @param topicPartition The topic partition
* @param partitionOffset Current offset in the partition
*/
case class KafkaSourcePartitionOffset(topicPartition: TopicPartition, partitionOffset: Long) extends PartitionOffset

Interface for fetching and managing offsets from Kafka.
/**
* Base interface for fetching offsets from Kafka
* Handles interaction with Kafka brokers for offset management
*/
trait KafkaOffsetReader {
/** Close resources and connections */
def close(): Unit
/** Fetch partition offsets based on range limit */
def fetchPartitionOffsets(offsetRangeLimit: KafkaOffsetRangeLimit, isStartingOffsets: Boolean): Map[TopicPartition, Long]
/** Fetch specific offsets with validation */
def fetchSpecificOffsets(partitionOffsets: Map[TopicPartition, Long], reportDataLoss: String => Unit): KafkaSourceOffset
/** Fetch timestamp-based offsets for specific partitions */
def fetchSpecificTimestampBasedOffsets(
topicTimestamps: Map[TopicPartition, Long],
isStartingOffsets: Boolean,
strategyOnNoMatchStartingOffset: StrategyOnNoMatchStartingOffset.Value
): KafkaSourceOffset
/** Fetch timestamp-based offsets globally */
def fetchGlobalTimestampBasedOffsets(
timestamp: Long,
isStartingOffsets: Boolean,
strategyOnNoMatchStartingOffset: StrategyOnNoMatchStartingOffset.Value
): KafkaSourceOffset
/** Fetch earliest available offsets */
def fetchEarliestOffsets(): Map[TopicPartition, Long]
/** Fetch latest available offsets */
def fetchLatestOffsets(knownOffsets: Option[PartitionOffsetMap]): PartitionOffsetMap
}

When using timestamp-based offsets, configure the strategy for handling missing timestamps:
object StrategyOnNoMatchStartingOffset extends Enumeration {
val ERROR = Value // Throw exception when no matching timestamp found (default)
val LATEST = Value // Use latest offset when no matching timestamp found
}

Configuration:
// Default behavior - throw error if timestamp not found
.option("startingOffsetsByTimestampStrategy", "error")
// Fallback to latest offset if timestamp not found
.option("startingOffsetsByTimestampStrategy", "latest")

When multiple offset options are specified, they are processed in priority order:
1. startingTimestamp / endingTimestamp
2. startingOffsetsByTimestamp / endingOffsetsByTimestamp
3. startingOffsets / endingOffsets
4. Default: LatestOffsetRangeLimit for streaming, EarliestOffsetRangeLimit for batch

// These options are invalid for streaming queries
.option("endingOffsets", "...") // Not supported
.option("endingOffsetsByTimestamp", "...") // Not supported
.option("endingTimestamp", "...") // Not supported

// Invalid starting offset for batch
.option("startingOffsets", "latest") // Must use "earliest" or specific offsets
// Invalid ending offset for batch
.option("endingOffsets", "earliest") // Must use "latest" or specific offsets
// Specific offset restrictions
.option("startingOffsets", """{"topic":{"0":-1}}""") // -1 (latest) not allowed for starting
.option("endingOffsets", """{"topic":{"0":-2}}""") // -2 (earliest) not allowed for ending

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-sql-kafka-0-10-2-13