tessl/maven-org-apache-spark--spark-streaming-kafka-assembly-2-10

A shaded JAR assembly containing all dependencies needed for Kafka integration with Spark Streaming

docs/batch-rdd.md

Batch RDD Processing

Batch RDD processing allows you to create RDDs from Kafka using specific offset ranges, enabling batch-oriented consumption of Kafka data with exact control over which messages to process.

Capabilities

Basic RDD Creation

Creates an RDD from Kafka using offset ranges for each topic and partition.

/**
 * Create an RDD from Kafka using offset ranges for each topic and partition.
 *
 * @param sc SparkContext object
 * @param kafkaParams Kafka configuration parameters. Requires "metadata.broker.list" 
 *                    or "bootstrap.servers" to be set with Kafka broker(s) specified in
 *                    host1:port1,host2:port2 form.
 * @param offsetRanges Each OffsetRange in the batch corresponds to a range of offsets 
 *                     for a given Kafka topic/partition
 * @tparam K type of Kafka message key
 * @tparam V type of Kafka message value
 * @tparam KD type of Kafka message key decoder
 * @tparam VD type of Kafka message value decoder
 * @return RDD of (Kafka message key, Kafka message value)
 */
def createRDD[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
  sc: SparkContext,
  kafkaParams: Map[String, String],
  offsetRanges: Array[OffsetRange]  
): RDD[(K, V)]

Usage Example:

import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder

val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "localhost:9092"
)

val offsetRanges = Array(
  OffsetRange("events", 0, 1000, 2000),
  OffsetRange("events", 1, 500, 1500),
  OffsetRange("logs", 0, 100, 200)
)

// sc is an existing SparkContext
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges
)

rdd.foreach { case (key, value) =>
  println(s"Key: $key, Value: $value")
}

println(s"Total messages: ${rdd.count()}")

Advanced RDD Creation with Custom Message Handler

Creates an RDD with a custom message handler and broker leadership information for optimized fetching.

/**
 * Create an RDD from Kafka with a custom message handler and broker leadership info.
 *
 * @param sc SparkContext object
 * @param kafkaParams Kafka configuration parameters
 * @param offsetRanges Each OffsetRange corresponds to a range of offsets for a topic/partition
 * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty
 *                map, in which case leaders will be looked up on the driver.
 * @param messageHandler Function for translating each message and metadata into desired type
 * @tparam K type of Kafka message key
 * @tparam V type of Kafka message value  
 * @tparam KD type of Kafka message key decoder
 * @tparam VD type of Kafka message value decoder
 * @tparam R type returned by messageHandler
 * @return RDD of R
 */
def createRDD[
  K: ClassTag,
  V: ClassTag, 
  KD <: Decoder[K]: ClassTag,
  VD <: Decoder[V]: ClassTag,
  R: ClassTag
](
  sc: SparkContext,
  kafkaParams: Map[String, String],
  offsetRanges: Array[OffsetRange],
  leaders: Map[TopicAndPartition, Broker],
  messageHandler: MessageAndMetadata[K, V] => R
): RDD[R]

Usage Example:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import org.apache.spark.streaming.kafka.Broker

case class MessageInfo(topic: String, partition: Int, offset: Long, timestamp: Long, data: String)

val leaders = Map(
  TopicAndPartition("events", 0) -> Broker("broker1.example.com", 9092),
  TopicAndPartition("events", 1) -> Broker("broker2.example.com", 9092)
)

val messageHandler = (mmd: MessageAndMetadata[String, String]) => {
  MessageInfo(
    topic = mmd.topic,
    partition = mmd.partition,
    offset = mmd.offset,
    timestamp = System.currentTimeMillis(),
    data = s"${mmd.key()}-${mmd.message()}"
  )
}

val enrichedRDD = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, MessageInfo](
  sparkContext, kafkaParams, offsetRanges, leaders, messageHandler
)

enrichedRDD.foreach(println)

Java RDD Creation API

Java-friendly API for creating RDDs from Kafka.

/**
 * Create an RDD from Kafka using offset ranges (Java API).
 *
 * @param jsc JavaSparkContext object
 * @param kafkaParams Kafka configuration parameters
 * @param offsetRanges Each OffsetRange corresponds to a range of offsets for a topic/partition
 * @param keyClass type of Kafka message key
 * @param valueClass type of Kafka message value
 * @param keyDecoderClass type of Kafka message key decoder
 * @param valueDecoderClass type of Kafka message value decoder
 * @tparam K type of Kafka message key
 * @tparam V type of Kafka message value
 * @tparam KD type of Kafka message key decoder
 * @tparam VD type of Kafka message value decoder
 * @return RDD of (Kafka message key, Kafka message value)
 */
public static <K, V, KD extends Decoder<K>, VD extends Decoder<V>> 
JavaPairRDD<K, V> createRDD(
  JavaSparkContext jsc,
  Class<K> keyClass,
  Class<V> valueClass,
  Class<KD> keyDecoderClass,
  Class<VD> valueDecoderClass,
  Map<String, String> kafkaParams,
  OffsetRange[] offsetRanges
)

Java Usage Example:

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import kafka.serializer.StringDecoder;

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");

OffsetRange[] offsetRanges = {
    OffsetRange.create("events", 0, 1000, 2000),
    OffsetRange.create("events", 1, 500, 1500)
};

// jsc is an existing JavaSparkContext
JavaPairRDD<String, String> rdd = KafkaUtils.createRDD(
    jsc,
    String.class,
    String.class, 
    StringDecoder.class,
    StringDecoder.class,
    kafkaParams,
    offsetRanges
);

rdd.foreach(record -> {
    System.out.println("Key: " + record._1 + ", Value: " + record._2);
});

System.out.println("Total messages: " + rdd.count());

Advanced Java RDD with Custom Message Handler

Java API with a custom message handler for complex transformations.

/**
 * Create an RDD from Kafka with a custom message handler (Java API).
 *
 * @param jsc JavaSparkContext object
 * @param kafkaParams Kafka configuration parameters
 * @param offsetRanges Each OffsetRange corresponds to a range of offsets
 * @param leaders Kafka brokers for each TopicAndPartition
 * @param messageHandler Function for translating each message and metadata
 * @param keyClass type of Kafka message key
 * @param valueClass type of Kafka message value
 * @param keyDecoderClass type of Kafka message key decoder  
 * @param valueDecoderClass type of Kafka message value decoder
 * @param recordClass type returned by messageHandler
 * @tparam K type of Kafka message key
 * @tparam V type of Kafka message value
 * @tparam KD type of Kafka message key decoder
 * @tparam VD type of Kafka message value decoder
 * @tparam R type returned by messageHandler
 * @return RDD of R
 */
public static <K, V, KD extends Decoder<K>, VD extends Decoder<V>, R> 
JavaRDD<R> createRDD(
  JavaSparkContext jsc,
  Class<K> keyClass,
  Class<V> valueClass,
  Class<KD> keyDecoderClass,
  Class<VD> valueDecoderClass,
  Class<R> recordClass,
  Map<String, String> kafkaParams,
  OffsetRange[] offsetRanges,
  Map<TopicAndPartition, Broker> leaders,
  Function<MessageAndMetadata<K, V>, R> messageHandler
)

Key Features

Exact Offset Control

  • Specify precise offset ranges for each topic/partition
  • Process historical data or specific time windows
  • Replay data for debugging or reprocessing
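
Replaying a batch comes down to re-submitting the same offset ranges. The sketch below illustrates this without a live cluster, using a hypothetical stand-in case class that only mimics the fields of the real `org.apache.spark.streaming.kafka.OffsetRange`:

```scala
// Stand-in for org.apache.spark.streaming.kafka.OffsetRange (illustration only)
case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

// Offsets recorded by a previous run, keyed by (topic, partition)
val savedOffsets = Map(
  ("events", 0) -> (1000L, 2000L),
  ("events", 1) -> (500L, 1500L)
)

// Rebuild exactly the same ranges to replay that batch
val replayRanges = savedOffsets.map { case ((topic, partition), (from, until)) =>
  OffsetRange(topic, partition, from, until)
}.toArray

replayRanges.foreach(println)
```

Because the ranges are explicit, running this batch twice reads exactly the same messages both times, which is what makes debugging and reprocessing deterministic.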

Broker Leadership Optimization

  • Provide known broker leaders to avoid metadata lookup
  • Optimize network topology for better performance
  • Handle broker failures gracefully

Message Metadata Access

Access complete message metadata including:

  • Topic name
  • Partition number
  • Offset within partition
  • Processing timestamp (the 0.8-era message format carries no broker timestamp, so the examples use System.currentTimeMillis())
  • Message key and value

Partition-Level Parallelism

  • Each OffsetRange becomes an RDD partition
  • Parallel processing across topic/partitions
  • Fine-grained control over parallelism
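
Since `untilOffset` is exclusive, each range holds `untilOffset - fromOffset` messages, and the resulting RDD has one partition per range. A minimal sketch of that arithmetic, again using a stand-in rather than the real `OffsetRange` class:

```scala
// Stand-in for org.apache.spark.streaming.kafka.OffsetRange (illustration only)
case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
  def count: Long = untilOffset - fromOffset // untilOffset is exclusive
}

val ranges = Array(
  OffsetRange("events", 0, 1000, 2000),
  OffsetRange("events", 1, 500, 1500),
  OffsetRange("logs", 0, 100, 200)
)

val numPartitions = ranges.length           // one RDD partition per range
val totalMessages = ranges.map(_.count).sum // 1000 + 1000 + 100
println(s"$numPartitions partitions, $totalMessages messages")
```

Splitting one large topic/partition window into several smaller ranges is therefore a direct way to raise the parallelism of the resulting job.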

Offset Range Management

Creating Offset Ranges

// Manual creation
val range1 = OffsetRange("events", 0, 1000, 2000)
val range2 = OffsetRange.create("logs", 1, 500, 1500)

// From TopicAndPartition
import kafka.common.TopicAndPartition
val tp = TopicAndPartition("events", 0)
val range3 = OffsetRange(tp, 1000, 2000)

Offset Range Properties

val range = OffsetRange("events", 0, 1000, 2000)

println(s"Topic: ${range.topic}")
println(s"Partition: ${range.partition}")
println(s"From offset: ${range.fromOffset}")
println(s"Until offset: ${range.untilOffset}")
println(s"Message count: ${range.count}")
println(s"TopicAndPartition: ${range.topicAndPartition}")

Use Cases

Historical Data Processing

// Process specific time window
val historicalRanges = Array(
  OffsetRange("sales", 0, 10000, 20000),
  OffsetRange("sales", 1, 15000, 25000)
)

val historicalData = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, historicalRanges
)

Data Quality Validation

// Validate specific message ranges
case class ValidationResult(offset: Long, isValid: Boolean, data: String)

val validationHandler = (mmd: MessageAndMetadata[String, String]) => {
  ValidationResult(
    offset = mmd.offset,
    // guard against null keys on keyless messages
    isValid = Option(mmd.message()).exists(_.nonEmpty) && Option(mmd.key()).exists(_.nonEmpty),
    data = mmd.message()
  )
}

Incremental Processing

// Process data incrementally from the last committed offsets.
// Note: KafkaCluster is private[spark] in some Spark releases; if so, this code
// must be compiled under the org.apache.spark package (or roll its own lookup).
def processIncremental(lastProcessedOffsets: Map[TopicAndPartition, Long]): Unit = {
  val kc = new KafkaCluster(kafkaParams)
  kc.getLatestLeaderOffsets(lastProcessedOffsets.keySet) match {
    case Left(errs) =>
      throw new SparkException(errs.toString)
    case Right(latestOffsets) =>
      val offsetRanges = lastProcessedOffsets.map { case (tp, fromOffset) =>
        OffsetRange(tp.topic, tp.partition, fromOffset, latestOffsets(tp).offset)
      }.toArray

      val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
        sc, kafkaParams, offsetRanges
      )

      // Process the RDD
      rdd.foreach(println)
  }
}

Error Handling

  • SparkException: Thrown for invalid offset ranges or connectivity issues
  • Offset validation: Automatic validation that offsets are available on brokers
  • Leader discovery: Automatic leader lookup when not provided
  • Partition failures: Individual partition failures don't affect other partitions
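
Because malformed ranges only surface as a SparkException when the job runs, a cheap driver-side sanity check before calling createRDD fails faster. A sketch of such a pre-check, using a stand-in `OffsetRange` (the real library additionally verifies that the offsets actually exist on the brokers):

```scala
// Stand-in for org.apache.spark.streaming.kafka.OffsetRange (illustration only)
case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

// Reject obviously malformed ranges before handing them to createRDD
def validate(r: OffsetRange): Either[String, OffsetRange] =
  if (r.fromOffset < 0)
    Left(s"negative fromOffset ${r.fromOffset} for ${r.topic}-${r.partition}")
  else if (r.fromOffset > r.untilOffset)
    Left(s"fromOffset ${r.fromOffset} > untilOffset ${r.untilOffset} for ${r.topic}-${r.partition}")
  else
    Right(r)

val errors = Array(
  OffsetRange("events", 0, 1000, 2000),
  OffsetRange("events", 1, 1500, 500) // malformed: from > until
).map(validate).collect { case Left(e) => e }

errors.foreach(println)
```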

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-streaming-kafka-assembly-2-10
