tessl/maven-org-apache-spark--spark-sql-kafka-0-10-2-11

Apache Spark Structured Streaming integration with Apache Kafka providing comprehensive data source and sink capabilities for both batch and streaming workloads.

Offset Management

Offset tracking and management for precise control over Kafka consumption boundaries, supporting both streaming and batch processing with fault tolerance and exactly-once source semantics.

Capabilities

Offset Range Limits

Defines desired offset range limits for consuming data from Kafka partitions.

/**
 * Base trait for offset range limits
 */
sealed trait KafkaOffsetRangeLimit

/**
 * Binds to earliest available offsets in partitions
 */
case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit

/**
 * Binds to latest available offsets in partitions  
 */
case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit

/**
 * Binds to specific offset positions per partition
 * @param partitionOffsets Map of TopicPartition to specific offset
 */
case class SpecificOffsetRangeLimit(partitionOffsets: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit

Offset Range Limit Constants

object KafkaOffsetRangeLimit {
  /** Indicates resolution to latest offset */
  val LATEST: Long = -1L
  
  /** Indicates resolution to earliest offset */
  val EARLIEST: Long = -2L
}
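
These limit types and sentinel constants live in the internal org.apache.spark.sql.kafka010 package, so application code normally reaches them only indirectly through the startingOffsets/endingOffsets options. A minimal sketch of how they combine, assuming a hypothetical "events" topic:

import org.apache.kafka.common.TopicPartition

// Partition 0 starts at a concrete offset; partition 1 uses the
// EARLIEST sentinel (-2L), which is resolved against the broker
// at query start.
val limit = SpecificOffsetRangeLimit(Map(
  new TopicPartition("events", 0) -> 1000L,
  new TopicPartition("events", 1) -> KafkaOffsetRangeLimit.EARLIEST
))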

Source Offset Management

Custom offset implementation for Kafka sources containing partition-to-offset mappings.

/**
 * Custom offset for Kafka source containing partition-to-offset mappings
 * @param partitionToOffsets Map of TopicPartition to offset
 */
case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends OffsetV2 {
  /** JSON representation of partition offsets */
  def json: String
}

/**
 * Companion object for KafkaSourceOffset
 */
object KafkaSourceOffset {
  /**
   * Extracts partition offsets from generic offset
   * @param offset Generic offset to extract from
   * @return Map of TopicPartition to offset
   */
  def getPartitionOffsets(offset: Offset): Map[TopicPartition, Long]
  
  /**
   * Creates offset from topic-partition-offset tuples
   * @param offsetTuples Variable arguments of (topic, partition, offset)
   * @return KafkaSourceOffset instance
   */
  def apply(offsetTuples: (String, Int, Long)*): KafkaSourceOffset
  
  /**
   * Creates offset from JSON representation
   * @param offset Serialized offset in JSON format
   * @return KafkaSourceOffset instance
   */
  def apply(offset: SerializedOffset): KafkaSourceOffset
}
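
Both types are internal, but a short sketch shows the intended round trip between the tuple constructor and the JSON form (SerializedOffset is Spark's wrapper for an offset read back from the checkpoint log):

import org.apache.spark.sql.execution.streaming.SerializedOffset

// Build an offset from (topic, partition, offset) tuples...
val offset = KafkaSourceOffset(("events", 0, 1000L), ("events", 1, 2000L))
offset.json  // e.g. {"events":{"0":1000,"1":2000}}

// ...and restore it from its serialized JSON form.
val restored = KafkaSourceOffset(SerializedOffset(offset.json))
assert(restored == offset)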

Partition Offset for Continuous Streaming

/**
 * Represents offset for a specific partition in continuous streaming
 * @param topicPartition The topic partition
 * @param partitionOffset The offset within the partition
 */
case class KafkaSourcePartitionOffset(
  topicPartition: TopicPartition, 
  partitionOffset: Long
) extends PartitionOffset

Offset Reader

Component for reading offset information from Kafka using the KafkaConsumer API.

/**
 * Component for reading offset information from Kafka
 */
class KafkaOffsetReader extends Logging {
  /**
   * Closes connection to Kafka brokers
   */
  def close(): Unit
  
  /**
   * Fetches topic partitions based on consumer strategy
   * @return Set of TopicPartition objects
   */
  def fetchTopicPartitions(): Set[TopicPartition]
  
  /**
   * Resolves specific partition offsets
   * @param partitionOffsets Map of partitions to offsets (may contain special values)
   * @param reportDataLoss Function to report data loss
   * @return KafkaSourceOffset with resolved offsets
   */
  def fetchSpecificOffsets(
    partitionOffsets: Map[TopicPartition, Long],
    reportDataLoss: String => Unit
  ): KafkaSourceOffset
  
  /**
   * Fetches earliest available offsets for all partitions
   * @return Map of TopicPartition to earliest offset
   */
  def fetchEarliestOffsets(): Map[TopicPartition, Long]
  
  /**
   * Fetches latest available offsets
   * @param knownOffsets Previously known offsets for comparison
   * @return Map of TopicPartition to latest offset
   */
  def fetchLatestOffsets(knownOffsets: Option[PartitionOffsetMap]): PartitionOffsetMap
  
  /**
   * Fetches earliest offsets for specific partitions
   * @param newPartitions Set of partitions to fetch offsets for
   * @return Map of TopicPartition to earliest offset
   */
  def fetchEarliestOffsets(newPartitions: Set[TopicPartition]): Map[TopicPartition, Long]
}

/**
 * Companion object for KafkaOffsetReader
 */
object KafkaOffsetReader {
  /**
   * Returns the fixed schema for Kafka records
   * @return StructType defining Kafka record schema
   */
  def kafkaSchema: StructType
}
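
The schema is fixed and matches the columns documented in the Spark Kafka integration guide; an equivalent StructType, for reference:

import org.apache.spark.sql.types._

// The seven columns every Kafka source row carries: the raw record
// (key, value) plus its position and timestamp metadata.
val kafkaSchema = StructType(Seq(
  StructField("key", BinaryType),
  StructField("value", BinaryType),
  StructField("topic", StringType),
  StructField("partition", IntegerType),
  StructField("offset", LongType),
  StructField("timestamp", TimestampType),
  StructField("timestampType", IntegerType)
))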

Offset Range Calculation

Calculates offset ranges based on minPartitions configuration for parallelism optimization.

/**
 * Calculates offset ranges based on minPartitions configuration
 * @param minPartitions Minimum number of partitions to create
 */
class KafkaOffsetRangeCalculator(minPartitions: Option[Int]) {
  /**
   * Calculates offset ranges with preferred executor locations
   * @param fromOffsets Starting offsets per partition
   * @param untilOffsets Ending offsets per partition
   * @param executorLocations Available executor locations
   * @return Sequence of KafkaOffsetRange objects
   */
  def getRanges(
    fromOffsets: Map[TopicPartition, Long],
    untilOffsets: Map[TopicPartition, Long],
    executorLocations: Seq[String]
  ): Seq[KafkaOffsetRange]
}

/**
 * Companion object for KafkaOffsetRangeCalculator
 */
object KafkaOffsetRangeCalculator {
  /**
   * Creates calculator from DataSource options
   * @param options DataSource options containing minPartitions
   * @return KafkaOffsetRangeCalculator instance
   */
  def apply(options: DataSourceOptions): KafkaOffsetRangeCalculator
}

/**
 * Represents an offset range for a topic partition with preferred executor location
 * @param topicPartition The topic partition
 * @param fromOffset Starting offset (inclusive)
 * @param untilOffset Ending offset (exclusive)
 * @param preferredLoc Preferred executor location for locality
 */
case class KafkaOffsetRange(
  topicPartition: TopicPartition,
  fromOffset: Long,
  untilOffset: Long,
  preferredLoc: Option[String]
) {
  /** Lazy-computed size of the offset range */
  lazy val size: Long = untilOffset - fromOffset
}
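
The exact splitting algorithm is internal to getRanges; the following self-contained sketch only illustrates the idea behind minPartitions: when Kafka has fewer partitions than the requested parallelism, large ranges are subdivided into contiguous sub-ranges. It is a simplified model, not the connector's implementation.

import org.apache.kafka.common.TopicPartition

// Split one partition's offset range into `parts` contiguous pieces.
// (The real calculator weights splits by range size across all
// partitions; this sketch divides a single range evenly.)
def split(range: KafkaOffsetRange, parts: Int): Seq[KafkaOffsetRange] = {
  val step = math.max(range.size / parts, 1L)
  (0 until parts).map { i =>
    val from = range.fromOffset + i * step
    val until = if (i == parts - 1) range.untilOffset else from + step
    KafkaOffsetRange(range.topicPartition, from, until, preferredLoc = None)
  }
}

split(KafkaOffsetRange(new TopicPartition("events", 0), 0L, 1000L, None), 4)
  .map(r => (r.fromOffset, r.untilOffset))
// Seq((0,250), (250,500), (500,750), (750,1000))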

Configuration Options

Starting Offsets (Streaming)

// Start from earliest available offsets
.option("startingOffsets", "earliest")

// Start from latest available offsets  
.option("startingOffsets", "latest")

// Start from specific offsets (JSON format)
.option("startingOffsets", """{"topic1":{"0":123,"1":456},"topic2":{"0":789}}""")

Ending Offsets (Batch Only)

// Read until latest available offsets
.option("endingOffsets", "latest")

// Read until specific offsets (JSON format)
.option("endingOffsets", """{"topic1":{"0":500,"1":600},"topic2":{"0":800}}""")

Data Loss Handling

// Fail query on data loss (default)
.option("failOnDataLoss", "true")

// Continue processing on data loss
.option("failOnDataLoss", "false")

Usage Examples

Streaming with Specific Starting Offsets

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", """{"events":{"0":1000,"1":2000}}""")
  .option("failOnDataLoss", "false")
  .load()

Batch Processing with Offset Range

val batchDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "transactions")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", """{"transactions":{"0":5000,"1":6000}}""")
  .load()

Micro-batch with Offset Limits

import org.apache.spark.sql.streaming.Trigger

val query = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "clickstream")
  .option("startingOffsets", "latest")
  .option("maxOffsetsPerTrigger", "10000")
  .load()
  .writeStream
  .format("console") // any sink works; console keeps the example self-contained
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()
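
maxOffsetsPerTrigger caps the total number of offsets consumed per micro-batch; the cap is split proportionally across topic partitions of different volume.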

Fault Tolerance

Offset Checkpointing

With a checkpointLocation configured, Spark persists each batch's offset ranges to a write-ahead log before processing, so a restarted query resumes exactly where it left off. This yields exactly-once semantics on the source side; end-to-end exactly-once additionally requires an idempotent or transactional sink:

val query = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()
  .writeStream
  .format("console") // a sink is required before start()
  .option("checkpointLocation", "/path/to/checkpoint")
  .start()

Data Loss Detection

The system detects and handles several data loss scenarios:

  • Topic deletion: Detects when a subscribed topic is deleted mid-query
  • Partition changes: Detects changes to a topic's partition set, such as newly added partitions
  • Offset expiration: Detects when requested offsets have been aged out by Kafka's retention policy
  • Broker failures: Tolerates temporary broker unavailability

Recovery Strategies

// Strict mode - fail on any data loss
.option("failOnDataLoss", "true")

// Lenient mode - log warnings and continue
.option("failOnDataLoss", "false")

Performance Tuning

Partition Parallelism

// Minimum partitions for parallel processing
.option("minPartitions", "20")

// Rate limiting for streaming
.option("maxOffsetsPerTrigger", "1000000")

Memory Management

// Kafka consumer buffer sizes
.option("kafka.receive.buffer.bytes", "65536")
.option("kafka.fetch.max.bytes", "52428800")
.option("kafka.max.poll.records", "500")

JSON Format Reference

Partition Assignment JSON

{
  "topic1": [0, 1, 2],
  "topic2": [0, 1]
}
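
This shape is used with the assign consumer strategy to pin the query to an explicit set of partitions:

// Consume only the listed partitions of each topic
.option("assign", """{"topic1":[0,1,2],"topic2":[0,1]}""")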

Specific Offsets JSON

{
  "topic1": {
    "0": 1000,
    "1": 2000,
    "2": 3000
  },
  "topic2": {
    "0": 500,
    "1": 600
  }
}
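
When offsets are tracked outside Spark (for example, in an external store), this JSON can be assembled from a Map[TopicPartition, Long]. A minimal sketch; the helper toOffsetJson below is our own, not part of the connector:

import org.apache.kafka.common.TopicPartition

// Serialize a partition->offset map into the JSON shape expected
// by the startingOffsets/endingOffsets options.
def toOffsetJson(offsets: Map[TopicPartition, Long]): String =
  offsets
    .groupBy { case (tp, _) => tp.topic }
    .map { case (topic, parts) =>
      val inner = parts
        .map { case (tp, off) => s""""${tp.partition}":$off""" }
        .mkString(",")
      s""""$topic":{$inner}"""
    }
    .mkString("{", ",", "}")

toOffsetJson(Map(
  new TopicPartition("topic1", 0) -> 1000L,
  new TopicPartition("topic1", 1) -> 2000L
))
// {"topic1":{"0":1000,"1":2000}}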

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-sql-kafka-0-10-2-11
