A shaded JAR assembly containing all dependencies needed for Kafka integration with Spark Streaming.

Comprehensive offset management utilities for controlling exactly which Kafka messages to process, including offset range representation, cluster interaction helpers, and consumer group coordination.

Represents a range of offsets from a single Kafka topic and partition.
/**
 * Represents a range of offsets from a single Kafka TopicAndPartition.
 *
 * Instances are immutable (all fields are vals) and Serializable, so they can be
 * shipped with Spark tasks.
 *
 * @param topic Kafka topic name
 * @param partition Kafka partition id
 * @param fromOffset Inclusive starting offset
 * @param untilOffset Exclusive ending offset
 */
final class OffsetRange(
val topic: String,
val partition: Int,
val fromOffset: Long,
val untilOffset: Long
) extends Serializable {
/** Kafka TopicAndPartition object, for convenience */
def topicAndPartition(): TopicAndPartition
/** Number of messages this OffsetRange refers to (presumably untilOffset - fromOffset, given the inclusive/exclusive bounds — confirm against implementation) */
def count(): Long
override def equals(obj: Any): Boolean
override def hashCode(): Int
override def toString(): String
}

Usage Examples:
import org.apache.spark.streaming.kafka.OffsetRange
import kafka.common.TopicAndPartition
// Create offset ranges (fromOffset is inclusive, untilOffset exclusive)
val range1 = OffsetRange("events", 0, 1000, 2000)
val range2 = OffsetRange.create("logs", 1, 500, 1500)
// From TopicAndPartition
val tp = TopicAndPartition("metrics", 2)
val range3 = OffsetRange(tp, 1000, 1500)
// Access properties
println(s"Topic: ${range1.topic}")
println(s"Partition: ${range1.partition}")
println(s"Message count: ${range1.count()}")
println(s"TopicAndPartition: ${range1.topicAndPartition()}")

Factory methods for creating OffsetRange instances.
object OffsetRange {
/** Create OffsetRange from topic, partition, and offset values (Java-friendly factory) */
def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange
/** Create OffsetRange from TopicAndPartition and offset values (Java-friendly factory) */
def create(topicAndPartition: TopicAndPartition, fromOffset: Long, untilOffset: Long): OffsetRange
/** Apply method for creating OffsetRange instances (Scala call-site sugar: OffsetRange(...)) */
def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange
/** Apply method with TopicAndPartition */
def apply(topicAndPartition: TopicAndPartition, fromOffset: Long, untilOffset: Long): OffsetRange
}

Java Usage:
import org.apache.spark.streaming.kafka.OffsetRange;
import kafka.common.TopicAndPartition;
// Create offset ranges in Java via the create(...) factories
OffsetRange range1 = OffsetRange.create("events", 0, 1000, 2000);
OffsetRange range2 = OffsetRange.create(new TopicAndPartition("logs", 1), 500, 1500);
// Access properties: Scala vals are exposed to Java as no-arg accessor methods
System.out.println("Topic: " + range1.topic());
System.out.println("Partition: " + range1.partition());
System.out.println("Count: " + range1.count());

Interface for objects that contain offset ranges, typically implemented by Kafka RDDs.
/**
 * Represents any object that has a collection of OffsetRanges.
 * This can be used to access the offset ranges in RDDs generated by direct Kafka DStream,
 * typically via `rdd.asInstanceOf[HasOffsetRanges].offsetRanges`.
 */
trait HasOffsetRanges {
/** One OffsetRange per topic/partition covered by this object */
def offsetRanges: Array[OffsetRange]
}

Usage Examples:
// Access offset ranges from direct stream RDDs
directStream.foreachRDD { rdd =>
// NOTE(review): the cast presumably only succeeds on the RDD produced directly by the
// Kafka stream, before any transformations — confirm against the integration guide
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
offsetRanges.foreach { offsetRange =>
println(s"${offsetRange.topic} ${offsetRange.partition} " +
s"${offsetRange.fromOffset} ${offsetRange.untilOffset}")
}
// Process the data
rdd.foreach(println)
}

Java Usage:
import org.apache.spark.streaming.kafka.HasOffsetRanges;
stream.foreachRDD(rdd -> {
// rdd.rdd() unwraps the Java RDD to the underlying Scala RDD before casting
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
for (OffsetRange range : offsetRanges) {
System.out.printf("%s %d %d %d%n",
range.topic(), range.partition(),
range.fromOffset(), range.untilOffset());
}
});

Represents Kafka broker host and port information.
/**
 * :: Experimental ::
 * Represents the host and port info for a Kafka broker.
 * Differs from Kafka project's internal kafka.cluster.Broker, which contains a server ID.
 *
 * Serializable, so instances can be included in values shipped with Spark jobs.
 */
@Experimental
final class Broker(
/** Broker's hostname */
val host: String,
/** Broker's port */
val port: Int
) extends Serializable {
override def equals(obj: Any): Boolean
override def hashCode: Int
override def toString(): String
}

Usage Examples:
import org.apache.spark.streaming.kafka.Broker
// Create broker instances (constructor and create factory are interchangeable here)
val broker1 = Broker("kafka1.example.com", 9092)
val broker2 = Broker.create("kafka2.example.com", 9093)
// Use with RDD creation for leader optimization: map each TopicAndPartition to its
// leader broker (presumably used as a scheduling hint — confirm against KafkaUtils docs)
val leaders = Map(
TopicAndPartition("events", 0) -> broker1,
TopicAndPartition("events", 1) -> broker2
)
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, (String, String)](
sc, kafkaParams, offsetRanges, leaders, messageHandler
)
)

Implement custom offset storage for exactly-once processing:
import org.apache.spark.streaming.kafka._
class OffsetManager {
// Store offsets in external system (database, file, etc.)
// saveOffsetToDatabase / loadOffsetsFromDatabase are application-supplied placeholders,
// not part of this library.
def saveOffsets(offsetRanges: Array[OffsetRange]): Unit = {
offsetRanges.foreach { range =>
// Save untilOffset (the exclusive end) so a restart can resume from it
saveOffsetToDatabase(range.topic, range.partition, range.untilOffset)
}
}
def getStoredOffsets(topics: Set[String]): Map[TopicAndPartition, Long] = {
// Load from external store
loadOffsetsFromDatabase(topics)
}
}
val offsetManager = new OffsetManager()
// Get stored offsets for restart
val storedOffsets = offsetManager.getStoredOffsets(Set("events"))
// Create stream with stored offsets
val stream = if (storedOffsets.nonEmpty) {
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, storedOffsets, messageHandler
)
} else {
// No stored offsets yet — fall back to subscribing by topic set
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, Set("events")
)
}
// Process and save offsets
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// Process data
rdd.foreach(processMessage)
// Save offsets only after processing completes, so a crash mid-batch replays the batch
offsetManager.saveOffsets(offsetRanges)
}
}

For consumer group coordination, use Kafka's built-in consumer group management with the receiver-based streaming approach, which automatically handles offset management through Zookeeper:
// Receiver-based streaming with automatic consumer group offset management
val kafkaParams = Map[String, String](
"zookeeper.connect" -> "localhost:2181",
"group.id" -> "my-consumer-group",
"auto.commit.interval.ms" -> "1000"
)
// presumably topic name -> number of consumer threads; confirm against KafkaUtils.createStream docs
val topics = Map("events" -> 1)
val stream = KafkaUtils.createStream(ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
// Offsets are automatically committed to Zookeeper by the consumer group
stream.foreachRDD { rdd =>
rdd.foreach { case (key, value) =>
processMessage(key, value)
}
}
}

When working with specific offset ranges, implement validation to ensure offsets are within available bounds:
/**
 * Validates that every OffsetRange is well-formed before handing it to KafkaUtils.
 *
 * A range is valid when fromOffset is non-negative and strictly less than untilOffset
 * (fromOffset is inclusive, untilOffset exclusive, so empty and inverted ranges are rejected).
 *
 * @param offsetRanges candidate ranges (an empty array is trivially valid)
 * @return the valid ranges, identical to the input when all pass
 * @throws IllegalArgumentException listing every invalid range when any fails
 */
def validateOffsetRanges(offsetRanges: Array[OffsetRange]): Array[OffsetRange] = {
  // Single pass: partition replaces the original filter followed by an O(n^2) diff
  // that re-derived the invalid ranges from the two arrays.
  val (validRanges, invalidRanges) = offsetRanges.partition { range =>
    range.fromOffset >= 0 && range.fromOffset < range.untilOffset
  }
  if (invalidRanges.nonEmpty) {
    throw new IllegalArgumentException(s"Invalid offset ranges: ${invalidRanges.mkString(", ")}")
  }
  validRanges
}
// Use validation: fail fast before building the RDD
val requestedRanges = Array(
OffsetRange("events", 0, 1000, 2000),
OffsetRange("events", 1, 500, 1500)
)
try {
val validRanges = validateOffsetRanges(requestedRanges)
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
sc, kafkaParams, validRanges
)
rdd.foreach(println)
} catch {
case e: IllegalArgumentException =>
println(s"Validation failed: ${e.getMessage}")
}
}

def safeCreateRDD(
sc: SparkContext,
kafkaParams: Map[String, String],
offsetRanges: Array[OffsetRange]
): Option[RDD[(String, String)]] = {
// Retries KafkaUtils.createRDD up to maxRetries times, returning None if all attempts fail.
// Only SparkException is retried; any other exception propagates to the caller.
val maxRetries = 3
var attempt = 0
while (attempt < maxRetries) {
try {
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
sc, kafkaParams, offsetRanges
)
return Some(rdd)
} catch {
case e: SparkException =>
attempt += 1
println(s"Attempt $attempt failed: ${e.getMessage}")
if (attempt < maxRetries) {
Thread.sleep(1000 * attempt) // Linear backoff (1s, 2s) — original comment mislabeled this as exponential
}
}
}
None
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-streaming-kafka-assembly-2-10