Assembly JAR providing Apache Spark integration with Apache Kafka 0.10 for reliable distributed streaming data processing
Comprehensive offset range management and commit operations for exactly-once processing semantics. The offset management system provides precise control over which Kafka messages are consumed and enables reliable offset tracking for fault-tolerant stream processing.
Represents a range of offsets from a single Kafka TopicPartition, defining exactly which messages to consume.
final class OffsetRange {
val topic: String // Kafka topic name
val partition: Int // Kafka partition id
val fromOffset: Long // Inclusive starting offset
val untilOffset: Long // Exclusive ending offset
def topicPartition(): TopicPartition // Kafka TopicPartition object for convenience
def count(): Long // Number of messages this OffsetRange refers to
}

Properties:
- topic: The name of the Kafka topic
- partition: The partition number within the topic
- fromOffset: Starting offset (inclusive) - the first message to consume
- untilOffset: Ending offset (exclusive) - the first message NOT to consume

Methods:
- topicPartition(): Returns a Kafka TopicPartition object
- count(): Returns the number of messages in this range (untilOffset - fromOffset)

The OffsetRange companion object provides factory methods for creating instances.
object OffsetRange {
def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange
def create(topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long): OffsetRange
def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange
def apply(topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long): OffsetRange
}

Usage Examples:
import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.kafka.common.TopicPartition
// Create using topic name and partition number
val range1 = OffsetRange.create("orders", 0, 1000L, 2000L)
val range2 = OffsetRange.apply("payments", 1, 500L, 1500L)
// Create using TopicPartition object
val topicPartition = new TopicPartition("inventory", 2)
val range3 = OffsetRange.create(topicPartition, 0L, 1000L)
val range4 = OffsetRange.apply(topicPartition, 2000L, 3000L)
// Access properties
println(s"Topic: ${range1.topic}, Partition: ${range1.partition}")
println(s"Range: ${range1.fromOffset} to ${range1.untilOffset}")
println(s"Message count: ${range1.count()}")
println(s"TopicPartition: ${range1.topicPartition()}")

Interface for objects that contain a collection of OffsetRanges, typically implemented by Kafka RDDs.
trait HasOffsetRanges {
def offsetRanges: Array[OffsetRange]
}

Usage Example:
import org.apache.spark.streaming.kafka010.{KafkaUtils, HasOffsetRanges}
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
stream.foreachRDD { rdd =>
// Cast the RDD to HasOffsetRanges to access offset information.
// This cast only succeeds on the RDD returned directly by
// createDirectStream, before any transformations are applied.
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// Process each offset range
offsetRanges.foreach { offsetRange =>
println(s"Topic: ${offsetRange.topic}, " +
s"Partition: ${offsetRange.partition}, " +
s"From: ${offsetRange.fromOffset}, " +
s"Until: ${offsetRange.untilOffset}, " +
s"Count: ${offsetRange.count()}")
}
// Process the actual data
rdd.foreach { record =>
// Your processing logic here
println(s"Processing: ${record.key} -> ${record.value}")
}
}

Interface for objects that can commit offset ranges back to Kafka for offset management.
trait CanCommitOffsets {
def commitAsync(offsetRanges: Array[OffsetRange]): Unit
def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit
}

Methods:
- commitAsync(offsetRanges): Queues offset ranges for asynchronous commit to Kafka
- commitAsync(offsetRanges, callback): Queues offset ranges for asynchronous commit with a completion callback

Usage Example:
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}
import org.apache.kafka.clients.consumer.{OffsetCommitCallback, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import java.util.{Map => JMap}
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// Process your data
val processedCount = rdd.map { record =>
// Your processing logic
processMessage(record)
1
}.reduce(_ + _)
// Commit offsets after successful processing
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
println(s"Processed $processedCount messages and committed offsets")
}

Usage with Callback:
val callback = new OffsetCommitCallback {
override def onComplete(
metadata: JMap[TopicPartition, OffsetAndMetadata],
exception: Exception
): Unit = {
if (exception != null) {
println(s"Offset commit failed: ${exception.getMessage}")
// Handle commit failure - maybe retry or alert
} else {
println("Offset commit successful")
metadata.forEach { (tp, om) =>
println(s"Committed ${tp.topic}-${tp.partition} at offset ${om.offset}")
}
}
}
}
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// Process data...
processRDD(rdd)
// Commit with callback
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, callback)
}

For maximum control, you can manage offsets manually using external storage:
import org.apache.spark.streaming.kafka010._
// Custom offset storage (could be database, ZooKeeper, etc.)
object OffsetStorage {
def saveOffsets(offsetRanges: Array[OffsetRange]): Unit = {
// Save to your preferred storage system
offsetRanges.foreach { range =>
// Save range.topic, range.partition, range.untilOffset
database.saveOffset(range.topic, range.partition, range.untilOffset)
}
}
def loadOffsets(topics: Array[String]): Map[TopicPartition, Long] = {
// Load from your storage system
val offsets = topics.flatMap { topic =>
getPartitionsForTopic(topic).map { partition =>
val offset = database.loadOffset(topic, partition)
new TopicPartition(topic, partition) -> offset
}
}.toMap
offsets
}
}
// Use stored offsets when creating consumer strategy
val storedOffsets = OffsetStorage.loadOffsets(topics)
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](
topics,
kafkaParams,
storedOffsets
)
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// Process data
rdd.foreach(processRecord)
// Save offsets to external storage instead of Kafka
OffsetStorage.saveOffsets(offsetRanges)
// Don't use CanCommitOffsets.commitAsync in this case
}

Implement exactly-once semantics using offset management:
import org.apache.spark.streaming.kafka010._
def processExactlyOnce(rdd: RDD[ConsumerRecord[String, String]]): Unit = {
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// Begin transaction or prepare idempotent operations
val transaction = database.beginTransaction()
try {
// Process each partition
rdd.foreachPartition { partition =>
partition.foreach { record =>
// Your idempotent processing logic here
val result = processMessage(record)
transaction.write(result)
}
}
// Commit the database transaction, then commit offsets to Kafka.
// NOTE: commitAsync is not atomic with the database commit; for strict
// exactly-once guarantees, store the offsets in the same transaction
// as the results instead of committing them back to Kafka.
transaction.commit()
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
} catch {
case ex: Exception =>
transaction.rollback()
throw ex
}
}
stream.foreachRDD(processExactlyOnce)

Use offset ranges for batch processing with precise control:
import org.apache.spark.streaming.kafka010._
// Define specific ranges for batch processing
val batchRanges = Array(
OffsetRange("transactions", 0, 0L, 10000L), // Process first 10k messages
OffsetRange("transactions", 1, 5000L, 15000L), // Process messages 5k-15k
OffsetRange("transactions", 2, 0L, 8000L) // Process first 8k messages
)
val rdd = KafkaUtils.createRDD[String, String](
spark.sparkContext,
kafkaParams,
batchRanges,
LocationStrategies.PreferConsistent
)
// The RDD contains exactly the specified ranges
val totalMessages = batchRanges.map(_.count()).sum
println(s"Processing exactly $totalMessages messages")
rdd.foreach { record =>
println(s"Processing: ${record.topic}-${record.partition} " +
s"offset ${record.offset}: ${record.key} -> ${record.value}")
}

A commit callback can inspect the exception type and handle each failure mode appropriately:

val resilientCallback = new OffsetCommitCallback {
override def onComplete(
metadata: JMap[TopicPartition, OffsetAndMetadata],
exception: Exception
): Unit = {
if (exception != null) {
exception match {
case _: org.apache.kafka.clients.consumer.CommitFailedException =>
// Consumer group rebalanced, offsets may be stale
println("Commit failed due to rebalance, will retry on next batch")
case _: org.apache.kafka.common.errors.TimeoutException =>
// Network timeout, might retry
println("Commit timeout, will retry")
case other =>
// Other errors might need different handling
println(s"Unexpected commit error: ${other.getMessage}")
}
}
}
}

// Use offset tracking to detect duplicates.
// NOTE: a driver-side mutable Set only deduplicates in local mode; on a
// cluster this closure runs on executors, so use a shared store (such as
// a database or cache) for real deduplication.
val processedOffsets = scala.collection.mutable.Set[String]()
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreach { record =>
val offsetKey = s"${record.topic}-${record.partition}-${record.offset}"
if (!processedOffsets.contains(offsetKey)) {
processMessage(record)
processedOffsets.add(offsetKey)
} else {
println(s"Skipping duplicate message at $offsetKey")
}
}
// Clean up old offset tracking to prevent memory leaks
if (processedOffsets.size > 100000) {
processedOffsets.clear()
}
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

Best practices:
- Always access offsets: Use HasOffsetRanges to get offset information from RDDs for monitoring and debugging.
- Commit after processing: Only commit offsets after successfully processing all messages in the batch.
- Use callbacks for monitoring: Implement OffsetCommitCallback to monitor commit success/failure.
- Handle commit failures gracefully: Don't fail the entire job on offset commit failures - implement retry logic.
- External offset storage for critical apps: For applications requiring strict exactly-once semantics, consider storing offsets externally.
- Monitor offset lag: Track the difference between the latest available offsets and the committed offsets.
- Partition-aware processing: Remember that offset ranges are per-partition - design your processing accordingly.
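The lag metric mentioned above can be computed without any Spark machinery: per partition, lag is the latest available offset minus the last committed offset. A minimal pure-Scala sketch (the maps stand in for values you would fetch from a Kafka consumer, e.g. via endOffsets and committed; PartitionLag and computeLag are illustrative names, not part of this library):

```scala
// Illustrative lag computation; `latest` and `committed` stand in for
// per-partition offsets fetched from a Kafka consumer.
case class PartitionLag(topic: String, partition: Int, lag: Long)

def computeLag(latest: Map[(String, Int), Long],
               committed: Map[(String, Int), Long]): Seq[PartitionLag] =
  latest.toSeq.map { case ((topic, partition), end) =>
    // A partition with no committed offset is treated as fully lagging.
    val done = committed.getOrElse((topic, partition), 0L)
    PartitionLag(topic, partition, end - done)
  }

val latest    = Map(("orders", 0) -> 5000L, ("orders", 1) -> 4200L)
val committed = Map(("orders", 0) -> 4800L)
computeLag(latest, committed).foreach { p =>
  println(s"${p.topic}-${p.partition} lag=${p.lag}")
}
```

Emitting these per-partition lag numbers to your metrics system on every batch makes a growing backlog visible long before it becomes a failure.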
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-streaming-kafka-0-10-assembly