tessl/maven-org-apache-spark--spark-sql-kafka-0-10-2-13

Kafka 0.10+ Source for Structured Streaming

Offset Management

The Kafka connector lets you control where a query starts and stops reading: at the earliest or latest offsets, at specific per-partition offsets, or at offsets resolved from timestamps. Streaming queries also track their position automatically through checkpointed offsets.

Capabilities

KafkaOffsetRangeLimit

Sealed base trait for the offset range limits described in this section. Each implementation describes where reading starts or ends.

/**
 * Represents desired offset range limits for starting, ending, and specific offsets
 * Used to control the boundaries of Kafka data consumption
 */
sealed trait KafkaOffsetRangeLimit

EarliestOffsetRangeLimit

Binds to the earliest available offsets in Kafka topics.

/**
 * Binds to earliest available offsets in Kafka
 * Starts reading from the beginning of each partition
 */
case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit

Configuration:

// For streaming queries
.option("startingOffsets", "earliest")

// For batch queries (default for startingOffsets)
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")  // Cannot use "earliest" for ending

Usage Examples:

// Stream from beginning of topics
val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest")  // Read from beginning
  .load()

// Batch read from earliest to latest
val batch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest")  
  .option("endingOffsets", "latest")
  .load()

LatestOffsetRangeLimit

Binds to the latest available offsets in Kafka topics.

/**
 * Binds to latest available offsets in Kafka
 * Starts reading from the current end of each partition
 */
case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit

Configuration:

// For streaming queries (default for startingOffsets)
.option("startingOffsets", "latest")

// For batch queries (default for endingOffsets)  
.option("endingOffsets", "latest")  // Cannot use "latest" for starting in batch

Usage Examples:

// Stream from current position (default behavior)
val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "latest")  // Start from current end
  .load()

// Batch read up to current position
val batch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")    // Read up to current end
  .load()

SpecificOffsetRangeLimit

Binds to specific offsets for precise control over reading positions.

/**
 * Binds to specific offsets per partition
 * Provides precise control over starting/ending positions
 * 
 * @param partitionOffsets Map of TopicPartition to offset values
 *                        -1 = latest offset, -2 = earliest offset  
 */
case class SpecificOffsetRangeLimit(partitionOffsets: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit

Configuration:

// JSON specification of partition offsets
.option("startingOffsets", """{"topic1":{"0":23,"1":345},"topic2":{"0":0}}""")
.option("endingOffsets", """{"topic1":{"0":100,"1":500},"topic2":{"0":50}}""")

Usage Examples:

// Start from specific offsets
val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "orders,payments")
  .option("startingOffsets", """{"orders":{"0":1000,"1":2000},"payments":{"0":500}}""")
  .load()

// Batch read with specific ranges
val batch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")  
  .option("startingOffsets", """{"my-topic":{"0":100,"1":200}}""")
  .option("endingOffsets", """{"my-topic":{"0":1000,"1":2000}}""")
  .load()

// Mix specific offsets with latest/earliest using special values
val mixed = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", """{"my-topic":{"0":-2,"1":500}}""")  // -2 = earliest, 500 = specific
  .option("endingOffsets", """{"my-topic":{"0":-1,"1":1000}}""")   // -1 = latest, 1000 = specific
  .load()

Special Offset Values:

object KafkaOffsetRangeLimit {
  val LATEST = -1L      // Use latest available offset
  val EARLIEST = -2L    // Use earliest available offset
}
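
The JSON accepted by startingOffsets and endingOffsets can be tedious to write by hand when many partitions are involved. The sketch below shows one way to render it from a Scala map. `OffsetJson` and `render` are hypothetical names for illustration and are not part of the connector; the constants only mirror the special values listed above.

```scala
// Hypothetical helper, not part of the connector: renders the JSON string
// accepted by the startingOffsets / endingOffsets options.
object OffsetJson {
  val Latest: Long = -1L   // same meaning as KafkaOffsetRangeLimit.LATEST
  val Earliest: Long = -2L // same meaning as KafkaOffsetRangeLimit.EARLIEST

  /** Renders Map((topic, partition) -> offset) as {"topic":{"partition":offset}}.
    * Topic names are emitted without escaping, which assumes they contain only
    * characters Kafka allows in topic names (letters, digits, '.', '_', '-').
    */
  def render(offsets: Map[(String, Int), Long]): String =
    offsets
      .groupBy { case ((topic, _), _) => topic }
      .toSeq
      .sortBy { case (topic, _) => topic }
      .map { case (topic, entries) =>
        val partitions = entries.toSeq
          .map { case ((_, partition), offset) => (partition, offset) }
          .sortBy { case (partition, _) => partition }
          .map { case (partition, offset) => s""""$partition":$offset""" }
          .mkString(",")
        s""""$topic":{$partitions}"""
      }
      .mkString("{", ",", "}")
}
```

The result can be passed directly as the option value, for example `.option("startingOffsets", OffsetJson.render(myOffsets))`, where `myOffsets` is your own map.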

SpecificTimestampRangeLimit

Binds each partition to the earliest offset whose timestamp is greater than or equal to the timestamp specified for that partition.

/**
 * Binds to earliest offset with timestamp >= specified timestamp per partition
 * Enables time-based offset resolution
 * 
 * @param topicTimestamps Map of TopicPartition to timestamp values (Unix milliseconds)
 * @param strategyOnNoMatchingStartingOffset Strategy when no matching timestamp found
 */
case class SpecificTimestampRangeLimit(
  topicTimestamps: Map[TopicPartition, Long],
  strategyOnNoMatchingStartingOffset: StrategyOnNoMatchStartingOffset.Value
) extends KafkaOffsetRangeLimit

Configuration:

// JSON specification of partition timestamps  
.option("startingOffsetsByTimestamp", """{"topic1":{"0":1609459200000,"1":1609459200000}}""")
.option("endingOffsetsByTimestamp", """{"topic1":{"0":1609545600000,"1":1609545600000}}""")

// Strategy when no offset matches the timestamp (set one of the two)
.option("startingOffsetsByTimestampStrategy", "error")   // Default: throw error
.option("startingOffsetsByTimestampStrategy", "latest")  // Use latest offset

Usage Examples:

// Start from specific timestamp (January 1, 2021 00:00:00 UTC)
val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsetsByTimestamp", """{"events":{"0":1609459200000,"1":1609459200000}}""")
  .load()

// Batch read between timestamps
val batch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "logs")
  .option("startingOffsetsByTimestamp", """{"logs":{"0":1609459200000}}""")  // Jan 1, 2021
  .option("endingOffsetsByTimestamp", """{"logs":{"0":1609545600000}}""")    // Jan 2, 2021
  .load()
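
The timestamp values in the examples above are Unix epoch milliseconds. As a minimal sketch, they can be derived from a readable UTC instant with `java.time` instead of being hard-coded. `TimestampOffsets`, `epochMillis`, and `json` are hypothetical names used only for this illustration, and the helper assumes every listed partition gets the same timestamp.

```scala
import java.time.Instant

// Hypothetical helper, not part of the connector.
object TimestampOffsets {
  /** Epoch milliseconds for an ISO-8601 instant such as "2021-01-01T00:00:00Z". */
  def epochMillis(isoInstant: String): Long =
    Instant.parse(isoInstant).toEpochMilli

  /** JSON for startingOffsetsByTimestamp / endingOffsetsByTimestamp that assigns
    * the same timestamp to each listed partition of one topic.
    */
  def json(topic: String, partitions: Seq[Int], isoInstant: String): String = {
    val ts = epochMillis(isoInstant)
    partitions.sorted
      .map(p => s""""$p":$ts""")
      .mkString(s"""{"$topic":{""", ",", "}}")
  }
}
```

For example, `TimestampOffsets.json("events", Seq(0, 1), "2021-01-01T00:00:00Z")` produces the same string used for the `events` topic in the streaming example above.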

GlobalTimestampRangeLimit

Applies timestamp-based offset resolution to all partitions using a single timestamp.

/**
 * Applies timestamp-based offset resolution to all partitions
 * Uses single timestamp for all discovered partitions
 * 
 * @param timestamp Unix timestamp in milliseconds
 * @param strategyOnNoMatchingStartingOffset Strategy when no matching timestamp found
 */
case class GlobalTimestampRangeLimit(
  timestamp: Long,
  strategyOnNoMatchingStartingOffset: StrategyOnNoMatchStartingOffset.Value  
) extends KafkaOffsetRangeLimit

Configuration:

// Single timestamp applied to all partitions
.option("startingTimestamp", "1609459200000")  // January 1, 2021 00:00:00 UTC
.option("endingTimestamp", "1609545600000")    // January 2, 2021 00:00:00 UTC

Usage Examples:

// Start all partitions from same timestamp
val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "user-events")
  .option("startingTimestamp", "1609459200000")  // Applies to all partitions
  .load()

// Batch read time range across all partitions  
val batch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "logs-.*")
  .option("startingTimestamp", "1609459200000")
  .option("endingTimestamp", "1609545600000")
  .load()

KafkaSourceOffset

Custom offset implementation for tracking partition positions in streaming queries.

/**
 * Custom Offset implementation tracking all partitions and their offsets
 * Used internally by streaming sources for checkpoint management
 * 
 * @param partitionToOffsets Map of TopicPartition to current offset
 */
case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends streaming.Offset

Companion Object Methods:

object KafkaSourceOffset {
  /** Extract partition offsets from generic Offset */
  def getPartitionOffsets(offset: Offset): Map[TopicPartition, Long]
  
  /** Create offset from offset tuples */
  def apply(offsetTuples: (String, Int, Long)*): KafkaSourceOffset
  
  /** Create from serialized offset */
  def apply(offset: SerializedOffset): KafkaSourceOffset
  
  /** Create from streaming offset */
  def apply(offset: streaming.Offset): KafkaSourceOffset
}
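
Because a KafkaSourceOffset is a map from partition to offset, the distance between two such offsets can be computed partition by partition. The sketch below models this with plain Scala types: `OffsetDelta`, `PartitionKey`, and `delta` are hypothetical, a `(topic, partition)` tuple stands in for Kafka's TopicPartition, and partitions missing from the start map are assumed to contribute zero. The result is only an upper bound on the record count, since compacted topics and transaction markers leave gaps in the offset sequence.

```scala
// Hypothetical model, not part of the connector.
object OffsetDelta {
  // Stand-in for org.apache.kafka.common.TopicPartition
  type PartitionKey = (String, Int)

  /** Per-partition offset distance between two partition-to-offset maps. */
  def delta(
      start: Map[PartitionKey, Long],
      end: Map[PartitionKey, Long]): Map[PartitionKey, Long] =
    end.map { case (tp, endOffset) =>
      // Assumption: a partition absent from `start` contributes zero.
      val startOffset = start.getOrElse(tp, endOffset)
      tp -> math.max(0L, endOffset - startOffset)
    }
}
```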

KafkaSourcePartitionOffset

Represents the offset of a single partition in the DataSource V2 API.

/**
 * Represents offset for a specific partition in V2 DataSource API
 * Used for continuous streaming offset management
 * 
 * @param topicPartition The topic partition
 * @param partitionOffset Current offset in the partition
 */
case class KafkaSourcePartitionOffset(topicPartition: TopicPartition, partitionOffset: Long) extends PartitionOffset

Offset Reading and Management

KafkaOffsetReader

Interface for fetching and managing offsets from Kafka.

/**
 * Base interface for fetching offsets from Kafka
 * Handles interaction with Kafka brokers for offset management
 */
trait KafkaOffsetReader {
  /** Close resources and connections */
  def close(): Unit
  
  /** Fetch partition offsets based on range limit */
  def fetchPartitionOffsets(offsetRangeLimit: KafkaOffsetRangeLimit, isStartingOffsets: Boolean): Map[TopicPartition, Long]
  
  /** Fetch specific offsets with validation */
  def fetchSpecificOffsets(partitionOffsets: Map[TopicPartition, Long], reportDataLoss: String => Unit): KafkaSourceOffset
  
  /** Fetch timestamp-based offsets for specific partitions */
  def fetchSpecificTimestampBasedOffsets(
    topicTimestamps: Map[TopicPartition, Long], 
    isStartingOffsets: Boolean,
    strategyOnNoMatchStartingOffset: StrategyOnNoMatchStartingOffset.Value
  ): KafkaSourceOffset
  
  /** Fetch timestamp-based offsets globally */
  def fetchGlobalTimestampBasedOffsets(
    timestamp: Long,
    isStartingOffsets: Boolean, 
    strategyOnNoMatchStartingOffset: StrategyOnNoMatchStartingOffset.Value
  ): KafkaSourceOffset
  
  /** Fetch earliest available offsets */
  def fetchEarliestOffsets(): Map[TopicPartition, Long]
  
  /** Fetch latest available offsets */
  def fetchLatestOffsets(knownOffsets: Option[PartitionOffsetMap]): PartitionOffsetMap
}

Timestamp Strategy Configuration

When using timestamp-based starting offsets, configure what happens when no offset in a partition matches the requested timestamp:

object StrategyOnNoMatchStartingOffset extends Enumeration {
  val ERROR = Value   // Throw exception when no matching timestamp found (default)
  val LATEST = Value  // Use latest offset when no matching timestamp found
}

Configuration:

// Default behavior - throw error if timestamp not found
.option("startingOffsetsByTimestampStrategy", "error")

// Fallback to latest offset if timestamp not found
.option("startingOffsetsByTimestampStrategy", "latest")
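
The effect of the two strategies can be modelled in a few lines of plain Scala. This is a sketch of the behaviour described above, not the connector's implementation: `NoMatchStrategy`, `parse`, and `resolve` are hypothetical names, and the case-insensitive parsing of the option value is an assumption.

```scala
import java.util.Locale

// Hypothetical model of the "error" and "latest" strategies.
object NoMatchStrategy {
  sealed trait Strategy
  case object Error extends Strategy
  case object Latest extends Strategy

  /** Parses the option value; case-insensitivity is an assumption of this sketch. */
  def parse(value: String): Strategy =
    value.trim.toLowerCase(Locale.ROOT) match {
      case "error"  => Error
      case "latest" => Latest
      case other    => throw new IllegalArgumentException(s"Unknown strategy: $other")
    }

  /** Picks the starting offset for one partition.
    * `matched` is the offset found for the timestamp, if any.
    */
  def resolve(matched: Option[Long], latestOffset: Long, strategy: Strategy): Long =
    matched match {
      case Some(offset) => offset
      case None =>
        strategy match {
          case Latest => latestOffset
          case Error =>
            throw new IllegalArgumentException("No offset matched the requested timestamp")
        }
    }
}
```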

Option Priority

When several offset options are set for the same boundary (starting or ending), only the first one present in this order is used:

  1. Global timestamp: startingTimestamp / endingTimestamp
  2. Partition timestamps: startingOffsetsByTimestamp / endingOffsetsByTimestamp
  3. Specific offsets: startingOffsets / endingOffsets
  4. Default values: startingOffsets falls back to latest (LatestOffsetRangeLimit) for streaming and earliest (EarliestOffsetRangeLimit) for batch; endingOffsets falls back to latest for batch
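
The priority order above can be sketched as a chain of fallbacks over the option map. This is an illustrative model for the starting boundary only: `OffsetOptionPriority`, `Choice`, and `resolveStarting` are hypothetical names, and unlike a real reader the sketch looks keys up with exact case.

```scala
// Hypothetical model of the priority order for starting options.
object OffsetOptionPriority {
  sealed trait Choice
  final case class GlobalTimestamp(millis: Long) extends Choice
  final case class PartitionTimestamps(json: String) extends Choice
  final case class Offsets(value: String) extends Choice
  final case class Default(value: String) extends Choice

  /** Returns the option that wins for the starting boundary. */
  def resolveStarting(options: Map[String, String], isStreaming: Boolean): Choice = {
    val global: Option[Choice] =
      options.get("startingTimestamp").map(ts => GlobalTimestamp(ts.trim.toLong))
    val perPartition: Option[Choice] =
      options.get("startingOffsetsByTimestamp").map(json => PartitionTimestamps(json))
    val offsets: Option[Choice] =
      options.get("startingOffsets").map(value => Offsets(value))

    // The first defined option wins; otherwise fall back to the default.
    global
      .orElse(perPartition)
      .orElse(offsets)
      .getOrElse(Default(if (isStreaming) "latest" else "earliest"))
  }
}
```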

Validation Rules

Streaming Query Restrictions

// These options are invalid for streaming queries
.option("endingOffsets", "...")           // Not supported
.option("endingOffsetsByTimestamp", "...")  // Not supported  
.option("endingTimestamp", "...")         // Not supported

Batch Query Restrictions

// Invalid starting offset for batch
.option("startingOffsets", "latest")      // Must use "earliest" or specific offsets

// Invalid ending offset for batch  
.option("endingOffsets", "earliest")      // Must use "latest" or specific offsets

// Specific offset restrictions
.option("startingOffsets", """{"topic":{"0":-1}}""")  // -1 (latest) not allowed for starting
.option("endingOffsets", """{"topic":{"0":-2}}""")    // -2 (earliest) not allowed for ending
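
The batch restrictions above can be checked before a query is submitted. The following is a minimal sketch of such a pre-flight check, not the connector's own validation: `BatchOffsetValidation` and `validate` are hypothetical names, and the JSON is inspected with a simple regular expression rather than a JSON parser.

```scala
// Hypothetical pre-flight check for the batch rules listed above.
object BatchOffsetValidation {
  // True if the offsets JSON assigns exactly `marker` to some partition.
  private def containsOffset(json: String, marker: Long): Boolean =
    (""":\s*""" + marker + """\s*[,}]""").r.findFirstIn(json).isDefined

  /** Returns the violated rules; an empty result means the options look valid. */
  def validate(options: Map[String, String]): Seq[String] = {
    val starting = options.get("startingOffsets").map(_.trim)
    val ending = options.get("endingOffsets").map(_.trim)
    val errors = Seq.newBuilder[String]

    if (starting.exists(_.equalsIgnoreCase("latest")))
      errors += "startingOffsets cannot be \"latest\" in a batch query"
    if (ending.exists(_.equalsIgnoreCase("earliest")))
      errors += "endingOffsets cannot be \"earliest\" in a batch query"
    if (starting.exists(s => s.startsWith("{") && containsOffset(s, -1L)))
      errors += "startingOffsets cannot contain -1 (latest) in a batch query"
    if (ending.exists(e => e.startsWith("{") && containsOffset(e, -2L)))
      errors += "endingOffsets cannot contain -2 (earliest) in a batch query"

    errors.result()
  }
}
```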
