tessl/maven-org-apache-spark--spark-streaming-kafka-0-10-assembly

Assembly JAR providing Apache Spark integration with Apache Kafka 0.10 for reliable distributed streaming data processing

Offset Management

Offset-range management and commit operations that support fault-tolerant, and with transactional offset storage exactly-once, stream processing. The offset management system provides precise control over which Kafka messages are consumed and enables reliable offset tracking across failures.

Capabilities

OffsetRange Class

Represents a range of offsets from a single Kafka TopicPartition, defining exactly which messages to consume.

final class OffsetRange {
  val topic: String           // Kafka topic name
  val partition: Int          // Kafka partition id  
  val fromOffset: Long        // Inclusive starting offset
  val untilOffset: Long       // Exclusive ending offset
  
  def topicPartition(): TopicPartition    // Kafka TopicPartition object for convenience
  def count(): Long                       // Number of messages this OffsetRange refers to
}

Properties:

  • topic: The name of the Kafka topic
  • partition: The partition number within the topic
  • fromOffset: Starting offset (inclusive) - first message to consume
  • untilOffset: Ending offset (exclusive) - first message NOT to consume

Methods:

  • topicPartition(): Returns a Kafka TopicPartition object
  • count(): Returns the number of messages in this range (untilOffset - fromOffset)
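The half-open [fromOffset, untilOffset) convention means adjacent ranges chain with no gaps or overlaps. A minimal pure-Scala stand-in (not the real Spark class) illustrating the arithmetic:

```scala
// Hypothetical stand-in for org.apache.spark.streaming.kafka010.OffsetRange,
// used only to illustrate the half-open range arithmetic.
final case class SimpleOffsetRange(topic: String, partition: Int,
                                   fromOffset: Long, untilOffset: Long) {
  // Number of messages in the range: untilOffset is exclusive
  def count(): Long = untilOffset - fromOffset
}

object RangeDemo {
  def main(args: Array[String]): Unit = {
    val first  = SimpleOffsetRange("orders", 0, 1000L, 2000L)
    val second = SimpleOffsetRange("orders", 0, 2000L, 3000L)

    // Offset 2000 is NOT in `first`; it is the first offset of `second`
    println(first.count())                          // 1000
    // Adjacent batches chain: the next range starts where the last ended
    println(first.untilOffset == second.fromOffset) // true
    println(first.count() + second.count())         // 2000
  }
}
```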

OffsetRange Factory Methods

The OffsetRange companion object provides factory methods for creating instances.

object OffsetRange {
  def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange
  def create(topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long): OffsetRange
  def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange
  def apply(topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long): OffsetRange
}

Usage Examples:

import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.kafka.common.TopicPartition

// Create using topic name and partition number
val range1 = OffsetRange.create("orders", 0, 1000L, 2000L)
val range2 = OffsetRange.apply("payments", 1, 500L, 1500L)

// Create using TopicPartition object
val topicPartition = new TopicPartition("inventory", 2)
val range3 = OffsetRange.create(topicPartition, 0L, 1000L)
val range4 = OffsetRange.apply(topicPartition, 2000L, 3000L)

// Access properties
println(s"Topic: ${range1.topic}, Partition: ${range1.partition}")
println(s"Range: ${range1.fromOffset} to ${range1.untilOffset}")
println(s"Message count: ${range1.count()}")
println(s"TopicPartition: ${range1.topicPartition()}")

HasOffsetRanges Trait

Interface for objects that contain a collection of OffsetRanges, typically implemented by Kafka RDDs.

trait HasOffsetRanges {
  def offsetRanges: Array[OffsetRange]
}

Usage Example:

import org.apache.spark.streaming.kafka010.{KafkaUtils, HasOffsetRanges, LocationStrategies, ConsumerStrategies}

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

stream.foreachRDD { rdd =>
  // Cast to HasOffsetRanges to read offset information. This cast only works
  // on the RDD produced directly by createDirectStream, before any shuffle,
  // repartition, or other transformation breaks the one-to-one mapping
  // between RDD partitions and Kafka partitions.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  
  // Process each offset range
  offsetRanges.foreach { offsetRange =>
    println(s"Topic: ${offsetRange.topic}, " +
            s"Partition: ${offsetRange.partition}, " + 
            s"From: ${offsetRange.fromOffset}, " +
            s"Until: ${offsetRange.untilOffset}, " +
            s"Count: ${offsetRange.count()}")
  }
  
  // Process the actual data
  rdd.foreach { record =>
    // Your processing logic here
    println(s"Processing: ${record.key} -> ${record.value}")
  }
}

CanCommitOffsets Trait

Interface for objects that can commit offset ranges back to Kafka; implemented by the direct stream returned from KafkaUtils.createDirectStream.

trait CanCommitOffsets {
  def commitAsync(offsetRanges: Array[OffsetRange]): Unit
  def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit
}

Methods:

  • commitAsync(offsetRanges): Queue offset ranges to be committed to Kafka; the commit itself happens on the driver during a subsequent batch
  • commitAsync(offsetRanges, callback): As above, with a callback invoked when the commit completes

Usage Example:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils, LocationStrategies, ConsumerStrategies}
import org.apache.kafka.clients.consumer.{OffsetCommitCallback, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import java.util.{Map => JMap}

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  
  // Process your data (fold tolerates empty batches, where reduce would throw)
  val processedCount = rdd.map { record =>
    // Your processing logic
    processMessage(record)
    1L
  }.fold(0L)(_ + _)
  
  // Commit offsets after successful processing
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  
  println(s"Processed $processedCount messages and committed offsets")
}

Usage with Callback:

val callback = new OffsetCommitCallback {
  override def onComplete(
    metadata: JMap[TopicPartition, OffsetAndMetadata], 
    exception: Exception
  ): Unit = {
    if (exception != null) {
      println(s"Offset commit failed: ${exception.getMessage}")
      // Handle commit failure - maybe retry or alert
    } else {
      println("Offset commit successful")
      metadata.forEach { (tp, om) =>
        println(s"Committed ${tp.topic}-${tp.partition} at offset ${om.offset}")
      }
    }
  }
}

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  
  // Process data...
  processRDD(rdd)
  
  // Commit with callback
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, callback)
}

Advanced Usage Patterns

Manual Offset Management

For maximum control, you can manage offsets manually using external storage:

import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.TopicPartition

// Custom offset storage (could be a database, ZooKeeper, etc.).
// `database` and `getPartitionsForTopic` are placeholders for your own code.
object OffsetStorage {
  def saveOffsets(offsetRanges: Array[OffsetRange]): Unit = {
    // Save to your preferred storage system
    offsetRanges.foreach { range =>
      // Save range.topic, range.partition, range.untilOffset
      database.saveOffset(range.topic, range.partition, range.untilOffset)
    }
  }
  
  def loadOffsets(topics: Array[String]): Map[TopicPartition, Long] = {
    // Load from your storage system
    val offsets = topics.flatMap { topic =>
      getPartitionsForTopic(topic).map { partition =>
        val offset = database.loadOffset(topic, partition)
        new TopicPartition(topic, partition) -> offset
      }
    }.toMap
    offsets
  }
}

// Use stored offsets when creating consumer strategy
val storedOffsets = OffsetStorage.loadOffsets(topics)
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](
  topics, 
  kafkaParams, 
  storedOffsets
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  
  // Process data
  rdd.foreach(processRecord)
  
  // Save offsets to external storage instead of Kafka
  OffsetStorage.saveOffsets(offsetRanges)
  
  // Don't use CanCommitOffsets.commitAsync in this case
}
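The external-storage pattern above can be sketched with an in-memory map standing in for the database (the `database` and `getPartitionsForTopic` helpers above are placeholders). To keep the sketch dependency-free, keys here are (topic, partition) pairs rather than Kafka's TopicPartition:

```scala
// In-memory stand-in for an external offset store; a real application would
// use durable storage (database, ZooKeeper, etc.) so offsets survive restarts.
object InMemoryOffsetStore {
  private val committed = scala.collection.mutable.Map[(String, Int), Long]()

  // Record the exclusive end of each processed range; it is exactly the
  // offset the next run should start from.
  def saveOffsets(ranges: Seq[(String, Int, Long)]): Unit =
    ranges.foreach { case (topic, partition, untilOffset) =>
      committed((topic, partition)) = untilOffset
    }

  // Offset to resume from; partitions never seen before start at 0
  def loadOffset(topic: String, partition: Int): Long =
    committed.getOrElse((topic, partition), 0L)
}
```

The save/load round-trip is what lets a restarted job resume precisely where the previous run stopped, independent of Kafka's own consumer-group offsets.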

Exactly-Once Processing Pattern

Implement exactly-once semantics using offset management:

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.clients.consumer.ConsumerRecord

def processExactlyOnce(rdd: RDD[ConsumerRecord[String, String]]): Unit = {
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Compute results on the executors, then write results and offsets on the
  // driver. The transaction handle must stay on the driver: it is not
  // serializable, so it cannot be referenced inside executor closures.
  // (collect() assumes each batch's results fit in driver memory.)
  val results = rdd.map(processMessage).collect()

  val transaction = database.beginTransaction()
  try {
    results.foreach(transaction.write)

    // Store offsets in the SAME transaction as the results; this atomicity
    // is what gives exactly-once semantics. (saveOffset is a placeholder
    // for your own storage call.) Committing back to Kafka with commitAsync
    // is asynchronous and NOT atomic with the results, so treat it as
    // best-effort bookkeeping only, never as the source of truth.
    offsetRanges.foreach { range =>
      transaction.saveOffset(range.topic, range.partition, range.untilOffset)
    }
    transaction.commit()

  } catch {
    case ex: Exception =>
      transaction.rollback()
      throw ex
  }
}

stream.foreachRDD(processExactlyOnce)

Batch Processing with Specific Ranges

Use offset ranges for batch processing with precise control:

import org.apache.spark.streaming.kafka010._

// Define specific ranges for batch processing
val batchRanges = Array(
  OffsetRange("transactions", 0, 0L, 10000L),      // Process first 10k messages
  OffsetRange("transactions", 1, 5000L, 15000L),   // Process messages 5k-15k
  OffsetRange("transactions", 2, 0L, 8000L)        // Process first 8k messages
)

val rdd = KafkaUtils.createRDD[String, String](
  spark.sparkContext,
  kafkaParams,
  batchRanges,
  LocationStrategies.PreferConsistent
)

// The RDD contains exactly the specified ranges
val totalMessages = batchRanges.map(_.count()).sum
println(s"Processing exactly $totalMessages messages")

rdd.foreach { record =>
  println(s"Processing: ${record.topic}-${record.partition} " +
          s"offset ${record.offset}: ${record.key} -> ${record.value}")
}

Error Handling and Recovery

Offset Commit Failure Handling

val resilientCallback = new OffsetCommitCallback {
  override def onComplete(
    metadata: JMap[TopicPartition, OffsetAndMetadata], 
    exception: Exception
  ): Unit = {
    if (exception != null) {
      exception match {
        case _: org.apache.kafka.clients.consumer.CommitFailedException =>
          // Consumer group rebalanced, offsets may be stale
          println("Commit failed due to rebalance, will retry on next batch")
          
        case _: org.apache.kafka.common.errors.TimeoutException =>
          // Network timeout, might retry
          println("Commit timeout, will retry")
          
        case other =>
          // Other errors might need different handling
          println(s"Unexpected commit error: ${other.getMessage}")
      }
    }
  }
}

Duplicate Message Handling

// Driver-side offset tracking to detect duplicates. NOTE: a local mutable set
// is only updated correctly when records are processed on the driver (hence
// toLocalIterator below), and it does not survive restarts. For distributed
// or restart-safe deduplication, keep the seen-offset set in an external store.
val processedOffsets = scala.collection.mutable.Set[String]()

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  rdd.toLocalIterator.foreach { record =>
    val offsetKey = s"${record.topic}-${record.partition}-${record.offset}"

    // Set.add returns false if the key was already present
    if (processedOffsets.add(offsetKey)) {
      processMessage(record)
    } else {
      println(s"Skipping duplicate message at $offsetKey")
    }
  }
  
  // Clean up old offset tracking to prevent memory leaks
  if (processedOffsets.size > 100000) {
    processedOffsets.clear()
  }
  
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

Best Practices

  1. Always access offsets: Use HasOffsetRanges to get offset information from RDDs for monitoring and debugging.

  2. Commit after processing: Only commit offsets after successfully processing all messages in the batch.

  3. Use callbacks for monitoring: Implement OffsetCommitCallback to monitor commit success/failure.

  4. Handle commit failures gracefully: Don't fail the entire job on offset commit failures - implement retry logic.

  5. External offset storage for critical apps: For applications requiring strict exactly-once semantics, consider storing offsets externally.

  6. Monitor offset lag: Track the difference between latest available offsets and committed offsets.

  7. Partition-aware processing: Remember that offset ranges are per-partition - design your processing accordingly.
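Monitoring offset lag (point 6 above) reduces to per-partition subtraction: lag is the latest available offset minus the last committed offset. A self-contained sketch over plain maps, keyed by hypothetical (topic, partition) pairs (a real monitor would fetch these values from a KafkaConsumer's endOffsets and committed calls):

```scala
object LagMonitor {
  // Lag per partition = latest available offset - last committed offset.
  // Partitions with no committed offset are treated as starting from 0;
  // max(0, ...) guards against a committed offset briefly ahead of the
  // metadata snapshot.
  def computeLag(latest: Map[(String, Int), Long],
                 committedOffsets: Map[(String, Int), Long]): Map[(String, Int), Long] =
    latest.map { case (tp, end) =>
      tp -> math.max(0L, end - committedOffsets.getOrElse(tp, 0L))
    }
}
```

Alerting when total lag grows batch over batch is a simple way to detect that processing is falling behind the ingest rate.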

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-streaming-kafka-0-10-assembly
