A shaded JAR assembly containing all dependencies needed for Kafka integration with Spark Streaming
---
Batch RDD processing allows you to create RDDs from Kafka using specific offset ranges, enabling batch-oriented consumption of Kafka data with exact control over which messages to process.
Creates an RDD from Kafka using offset ranges for each topic and partition.
/**
* Create an RDD from Kafka using offset ranges for each topic and partition.
*
* @param sc SparkContext object
* @param kafkaParams Kafka configuration parameters. Requires "metadata.broker.list"
* or "bootstrap.servers" to be set with Kafka broker(s) specified in
* host1:port1,host2:port2 form.
* @param offsetRanges Each OffsetRange in the batch corresponds to a range of offsets
* for a given Kafka topic/partition
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
* @tparam KD type of Kafka message key decoder
* @tparam VD type of Kafka message value decoder
* @return RDD of (Kafka message key, Kafka message value)
*/
def createRDD[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
sc: SparkContext,
kafkaParams: Map[String, String],
offsetRanges: Array[OffsetRange]
): RDD[(K, V)]

Usage Example:
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder
val kafkaParams = Map[String, String](
"metadata.broker.list" -> "localhost:9092"
)
val offsetRanges = Array(
OffsetRange("events", 0, 1000, 2000),
OffsetRange("events", 1, 500, 1500),
OffsetRange("logs", 0, 100, 200)
)
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
sparkContext, kafkaParams, offsetRanges
)
rdd.foreach { case (key, value) =>
println(s"Key: $key, Value: $value")
}
println(s"Total messages: ${rdd.count()}")

Creates an RDD with a custom message handler and broker leadership information for optimized fetching.
/**
* Create an RDD from Kafka with custom message handler and broker leadership info.
*
* @param sc SparkContext object
* @param kafkaParams Kafka configuration parameters
* @param offsetRanges Each OffsetRange corresponds to a range of offsets for a topic/partition
* @param leaders Kafka brokers for each TopicAndPartition in offsetRanges. May be empty map,
* in which case leaders will be looked up on the driver.
* @param messageHandler Function for translating each message and metadata into desired type
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
* @tparam KD type of Kafka message key decoder
* @tparam VD type of Kafka message value decoder
* @tparam R type returned by messageHandler
* @return RDD of R
*/
def createRDD[
K: ClassTag,
V: ClassTag,
KD <: Decoder[K]: ClassTag,
VD <: Decoder[V]: ClassTag,
R: ClassTag
](
sc: SparkContext,
kafkaParams: Map[String, String],
offsetRanges: Array[OffsetRange],
leaders: Map[TopicAndPartition, Broker],
messageHandler: MessageAndMetadata[K, V] => R
): RDD[R]

Usage Example:
import kafka.common.TopicAndPartition
val leaders = Map(
TopicAndPartition("events", 0) -> Broker("broker1.example.com", 9092),
TopicAndPartition("events", 1) -> Broker("broker2.example.com", 9092)
)
val messageHandler = (mmd: MessageAndMetadata[String, String]) => {
MessageInfo(
topic = mmd.topic,
partition = mmd.partition,
offset = mmd.offset,
timestamp = System.currentTimeMillis(),
data = s"${mmd.key()}-${mmd.message()}"
)
}
case class MessageInfo(topic: String, partition: Int, offset: Long, timestamp: Long, data: String)
val enrichedRDD = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, MessageInfo](
sparkContext, kafkaParams, offsetRanges, leaders, messageHandler
)
enrichedRDD.foreach(println)

Java-friendly API for creating RDDs from Kafka.
/**
* Create an RDD from Kafka using offset ranges (Java API).
*
* @param jsc JavaSparkContext object
* @param kafkaParams Kafka configuration parameters
* @param offsetRanges Each OffsetRange corresponds to a range of offsets for a topic/partition
* @param keyClass type of Kafka message key
* @param valueClass type of Kafka message value
* @param keyDecoderClass type of Kafka message key decoder
* @param valueDecoderClass type of Kafka message value decoder
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
* @tparam KD type of Kafka message key decoder
* @tparam VD type of Kafka message value decoder
* @return RDD of (Kafka message key, Kafka message value)
*/
public static <K, V, KD extends Decoder<K>, VD extends Decoder<V>>
JavaPairRDD<K, V> createRDD(
JavaSparkContext jsc,
Class<K> keyClass,
Class<V> valueClass,
Class<KD> keyDecoderClass,
Class<VD> valueDecoderClass,
Map<String, String> kafkaParams,
OffsetRange[] offsetRanges
)

Java Usage Example:
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import kafka.serializer.StringDecoder;
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
OffsetRange[] offsetRanges = {
OffsetRange.create("events", 0, 1000, 2000),
OffsetRange.create("events", 1, 500, 1500)
};
JavaPairRDD<String, String> rdd = KafkaUtils.createRDD(
jsc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
offsetRanges
);
rdd.foreach(record -> {
System.out.println("Key: " + record._1 + ", Value: " + record._2);
});
System.out.println("Total messages: " + rdd.count());

Java API with a custom message handler for complex transformations.
/**
* Create an RDD from Kafka with custom message handler (Java API).
*
* @param jsc JavaSparkContext object
* @param kafkaParams Kafka configuration parameters
* @param offsetRanges Each OffsetRange corresponds to a range of offsets
* @param leaders Kafka brokers for each TopicAndPartition
* @param messageHandler Function for translating each message and metadata
* @param keyClass type of Kafka message key
* @param valueClass type of Kafka message value
* @param keyDecoderClass type of Kafka message key decoder
* @param valueDecoderClass type of Kafka message value decoder
* @param recordClass type returned by messageHandler
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
* @tparam KD type of Kafka message key decoder
* @tparam VD type of Kafka message value decoder
* @tparam R type returned by messageHandler
* @return RDD of R
*/
public static <K, V, KD extends Decoder<K>, VD extends Decoder<V>, R>
JavaRDD<R> createRDD(
JavaSparkContext jsc,
Class<K> keyClass,
Class<V> valueClass,
Class<KD> keyDecoderClass,
Class<VD> valueDecoderClass,
Class<R> recordClass,
Map<String, String> kafkaParams,
OffsetRange[] offsetRanges,
Map<TopicAndPartition, Broker> leaders,
Function<MessageAndMetadata<K, V>, R> messageHandler
)

OffsetRange instances can be created in several ways:
// Manual creation
val range1 = OffsetRange("events", 0, 1000, 2000)
val range2 = OffsetRange.create("logs", 1, 500, 1500)
// From TopicAndPartition
import kafka.common.TopicAndPartition
val tp = TopicAndPartition("events", 0)
val range3 = OffsetRange(tp, 1000, 2000)

Access complete offset range metadata, including:

val range = OffsetRange("events", 0, 1000, 2000)
println(s"Topic: ${range.topic}")
println(s"Partition: ${range.partition}")
println(s"From offset: ${range.fromOffset}")
println(s"Until offset: ${range.untilOffset}")
println(s"Message count: ${range.count}")
println(s"TopicAndPartition: ${range.topicAndPartition}")

// Process specific time window
val historicalRanges = Array(
OffsetRange("sales", 0, 10000, 20000),
OffsetRange("sales", 1, 15000, 25000)
)
val historicalData = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
sc, kafkaParams, historicalRanges
)

// Validate specific message ranges
val validationHandler = (mmd: MessageAndMetadata[String, String]) => {
ValidationResult(
offset = mmd.offset,
isValid = mmd.message().nonEmpty && mmd.key().nonEmpty,
data = mmd.message()
)
}
case class ValidationResult(offset: Long, isValid: Boolean, data: String)

// Process data incrementally
def processIncremental(lastProcessedOffsets: Map[TopicAndPartition, Long]): Unit = {
val kc = new KafkaCluster(kafkaParams)
val latestOffsets = kc.getLatestLeaderOffsets(lastProcessedOffsets.keySet)
val offsetRanges = lastProcessedOffsets.map { case (tp, fromOffset) =>
val untilOffset = latestOffsets.right.get(tp).offset
OffsetRange(tp.topic, tp.partition, fromOffset, untilOffset)
}.toArray
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
sc, kafkaParams, offsetRanges
)
// Process the RDD
rdd.foreach(println)
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-streaming-kafka-assembly-2-10