CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-spark--spark-streaming-kafka-assembly-2-10

A shaded JAR assembly containing all dependencies needed for Kafka integration with Spark Streaming

Pending
Overview
Eval results
Files

docs/offset-management.md

Offset Management

Comprehensive offset management utilities for controlling exactly which Kafka messages to process, including offset range representation, cluster interaction helpers, and consumer group coordination.

Capabilities

OffsetRange Class

Represents a range of offsets from a single Kafka topic and partition.

/**
 * Represents a range of offsets from a single Kafka TopicAndPartition.
 * 
 * @param topic Kafka topic name
 * @param partition Kafka partition id
 * @param fromOffset Inclusive starting offset
 * @param untilOffset Exclusive ending offset
 */
final class OffsetRange(
  val topic: String,
  val partition: Int,
  val fromOffset: Long,
  val untilOffset: Long
) extends Serializable {
  
  /** Kafka TopicAndPartition object, for convenience */
  def topicAndPartition(): TopicAndPartition
  
  /** Number of messages this OffsetRange refers to */
  def count(): Long
  
  override def equals(obj: Any): Boolean
  override def hashCode(): Int  
  override def toString(): String
}

Usage Examples:

import org.apache.spark.streaming.kafka.OffsetRange
import kafka.common.TopicAndPartition

// Create offset ranges
val range1 = OffsetRange("events", 0, 1000, 2000)
val range2 = OffsetRange.create("logs", 1, 500, 1500)

// From TopicAndPartition
val tp = TopicAndPartition("metrics", 2)
val range3 = OffsetRange(tp, 1000, 1500)

// Access properties
println(s"Topic: ${range1.topic}")
println(s"Partition: ${range1.partition}")
println(s"Message count: ${range1.count()}")
println(s"TopicAndPartition: ${range1.topicAndPartition()}")

OffsetRange Companion Object

Factory methods for creating OffsetRange instances.

object OffsetRange {
  /** Create OffsetRange from topic, partition, and offset values */
  def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange
  
  /** Create OffsetRange from TopicAndPartition and offset values */
  def create(topicAndPartition: TopicAndPartition, fromOffset: Long, untilOffset: Long): OffsetRange
  
  /** Apply method for creating OffsetRange instances */
  def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange
  
  /** Apply method with TopicAndPartition */
  def apply(topicAndPartition: TopicAndPartition, fromOffset: Long, untilOffset: Long): OffsetRange
}

Java Usage:

import org.apache.spark.streaming.kafka.OffsetRange;
import kafka.common.TopicAndPartition;

// Create offset ranges in Java
OffsetRange range1 = OffsetRange.create("events", 0, 1000, 2000);
OffsetRange range2 = OffsetRange.create(new TopicAndPartition("logs", 1), 500, 1500);

// Access properties
System.out.println("Topic: " + range1.topic());
System.out.println("Partition: " + range1.partition());
System.out.println("Count: " + range1.count());

HasOffsetRanges Trait

Interface for objects that contain offset ranges, typically implemented by Kafka RDDs.

/**
 * Represents any object that has a collection of OffsetRanges.
 * This can be used to access the offset ranges in RDDs generated by direct Kafka DStream.
 */
trait HasOffsetRanges {
  def offsetRanges: Array[OffsetRange]
}

Usage Examples:

// Access offset ranges from direct stream RDDs
directStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  
  offsetRanges.foreach { offsetRange =>
    println(s"${offsetRange.topic} ${offsetRange.partition} " +
            s"${offsetRange.fromOffset} ${offsetRange.untilOffset}")
  }
  
  // Process the data
  rdd.foreach(println)
}

Java Usage:

import org.apache.spark.streaming.kafka.HasOffsetRanges;

stream.foreachRDD(rdd -> {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    
    for (OffsetRange range : offsetRanges) {
        System.out.printf("%s %d %d %d%n", 
            range.topic(), range.partition(), 
            range.fromOffset(), range.untilOffset());
    }
});

Broker Class

Represents Kafka broker host and port information.

/**
 * :: Experimental ::
 * Represents the host and port info for a Kafka broker.
 * Differs from Kafka project's internal kafka.cluster.Broker, which contains a server ID.
 */
@Experimental
final class Broker(
  /** Broker's hostname */
  val host: String,
  /** Broker's port */
  val port: Int
) extends Serializable {
  
  override def equals(obj: Any): Boolean
  override def hashCode: Int
  override def toString(): String
}

Usage Examples:

import org.apache.spark.streaming.kafka.Broker

// Create broker instances
val broker1 = Broker("kafka1.example.com", 9092)
val broker2 = Broker.create("kafka2.example.com", 9093)

// Use with RDD creation for leader optimization
val leaders = Map(
  TopicAndPartition("events", 0) -> broker1,
  TopicAndPartition("events", 1) -> broker2
)

val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, (String, String)](
  sc, kafkaParams, offsetRanges, leaders, messageHandler
)

Advanced Offset Management Patterns

Manual Offset Tracking

Implement custom offset storage for exactly-once processing:

import org.apache.spark.streaming.kafka._

class OffsetManager {
  // Store offsets in external system (database, file, etc.)
  def saveOffsets(offsetRanges: Array[OffsetRange]): Unit = {
    offsetRanges.foreach { range =>
      // Save to external store
      saveOffsetToDatabase(range.topic, range.partition, range.untilOffset)
    }
  }
  
  def getStoredOffsets(topics: Set[String]): Map[TopicAndPartition, Long] = {
    // Load from external store
    loadOffsetsFromDatabase(topics)
  }
}

val offsetManager = new OffsetManager()

// Get stored offsets for restart
val storedOffsets = offsetManager.getStoredOffsets(Set("events"))

// Create stream with stored offsets
val stream = if (storedOffsets.nonEmpty) {
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, storedOffsets, messageHandler
  )
} else {
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set("events")
  )
}

// Process and save offsets
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  
  // Process data
  rdd.foreach(processMessage)
  
  // Save offsets after successful processing
  offsetManager.saveOffsets(offsetRanges)
}

Consumer Group Offset Management

For consumer group coordination, use Kafka's built-in consumer group management with the receiver-based streaming approach, which automatically handles offset management through Zookeeper:

// Receiver-based streaming with automatic consumer group offset management
val kafkaParams = Map[String, String](
  "zookeeper.connect" -> "localhost:2181",
  "group.id" -> "my-consumer-group",
  "auto.commit.interval.ms" -> "1000"
)

val topics = Map("events" -> 1)

val stream = KafkaUtils.createStream(ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)

// Offsets are automatically committed to Zookeeper by the consumer group
stream.foreachRDD { rdd =>
  rdd.foreach { case (key, value) =>
    processMessage(key, value)
  }
}

Offset Range Validation

When working with specific offset ranges, implement validation to ensure offsets are within available bounds:

def validateOffsetRanges(offsetRanges: Array[OffsetRange]): Array[OffsetRange] = {
  // Basic validation - ensure from < until
  val validRanges = offsetRanges.filter { range =>
    range.fromOffset >= 0 && range.fromOffset < range.untilOffset
  }
  
  if (validRanges.length != offsetRanges.length) {
    val invalidRanges = offsetRanges.diff(validRanges)
    throw new IllegalArgumentException(s"Invalid offset ranges: ${invalidRanges.mkString(", ")}")
  }
  
  validRanges
}

// Use validation
val requestedRanges = Array(
  OffsetRange("events", 0, 1000, 2000),
  OffsetRange("events", 1, 500, 1500)
)

try {
  val validRanges = validateOffsetRanges(requestedRanges)
  val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
    sc, kafkaParams, validRanges
  )
  rdd.foreach(println)
} catch {
  case e: IllegalArgumentException =>
    println(s"Validation failed: ${e.getMessage}")
}

Error Handling

Offset-related Exceptions

  • SparkException: Thrown when requested offsets are not available on brokers
  • Connectivity Issues: Handle broker unavailability gracefully
  • Metadata Errors: Retry logic for temporary metadata failures

Example retry helper that wraps RDD creation with a bounded, backed-off retry loop:
def safeCreateRDD(
    sc: SparkContext, 
    kafkaParams: Map[String, String], 
    offsetRanges: Array[OffsetRange]
): Option[RDD[(String, String)]] = {
  val maxRetries = 3
  var attempt = 0
  
  while (attempt < maxRetries) {
    try {
      val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
        sc, kafkaParams, offsetRanges
      )
      return Some(rdd)
    } catch {
      case e: SparkException =>
        attempt += 1
        println(s"Attempt $attempt failed: ${e.getMessage}")
        if (attempt < maxRetries) {
          Thread.sleep(1000 * attempt) // Linear backoff: delay grows by 1s per attempt
        }
    }
  }
  
  None
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-streaming-kafka-assembly-2-10

docs

batch-rdd.md

direct-streaming.md

index.md

java-api.md

offset-management.md

receiver-streaming.md

tile.json