The batch processing capability provides a batch-oriented interface for consuming from Kafka with precise offset control. Because you specify the exact range of messages to process, it is well suited to exactly-once processing and to reprocessing scenarios that require deterministic control over which records are read.
Creates an RDD for batch consumption from Kafka with specified offset ranges.
def createRDD[K, V](
sc: SparkContext,
kafkaParams: java.util.Map[String, Object],
offsetRanges: Array[OffsetRange],
locationStrategy: LocationStrategy
): RDD[ConsumerRecord[K, V]]

Parameters:
sc: SparkContext - The Spark context
kafkaParams: java.util.Map[String, Object] - Kafka configuration parameters (requires "bootstrap.servers")
offsetRanges: Array[OffsetRange] - Offset ranges defining the Kafka data for this RDD
locationStrategy: LocationStrategy - Consumer placement strategy (use LocationStrategies.PreferConsistent)

Returns: RDD[ConsumerRecord[K, V]] - Kafka RDD implementing HasOffsetRanges
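The offsetRanges argument must describe exactly the data you want the RDD to cover. If you do not already have recorded offsets, one option is to derive ranges from the topic itself using the plain Kafka consumer API. The following is a minimal sketch assuming String keys and values; the fullTopicRanges helper is illustrative and not part of the Spark or Kafka APIs:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange

// Hypothetical helper: build one OffsetRange per partition covering every record
// currently in the topic (beginning offset inclusive, end offset exclusive).
def fullTopicRanges(topic: String, bootstrapServers: String): Array[OffsetRange] = {
  val props = new Properties()
  props.put("bootstrap.servers", bootstrapServers)
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  val consumer = new KafkaConsumer[String, String](props)
  try {
    val partitions = consumer.partitionsFor(topic).asScala
      .map(p => new TopicPartition(p.topic, p.partition))
      .asJava
    val beginning = consumer.beginningOffsets(partitions).asScala
    val end = consumer.endOffsets(partitions).asScala
    beginning.map { case (tp, from) =>
      OffsetRange(tp.topic, tp.partition, from, end(tp))
    }.toArray
  } finally {
    consumer.close()
  }
}

The resulting array can be passed directly as the offsetRanges argument of createRDD.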
Java version of the batch RDD creation method.
public static <K, V> JavaRDD<ConsumerRecord<K, V>> createRDD(
JavaSparkContext jsc,
java.util.Map<String, Object> kafkaParams,
OffsetRange[] offsetRanges,
LocationStrategy locationStrategy
)

Parameters:
jsc: JavaSparkContext - The Java Spark context
kafkaParams: Map<String, Object> - Kafka configuration parameters
offsetRanges: OffsetRange[] - Array of offset ranges
locationStrategy: LocationStrategy - Consumer placement strategy

Returns: JavaRDD<ConsumerRecord<K, V>> - Java RDD wrapper for Kafka data

Scala example of batch RDD creation:
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "batch-processing-group"
).asJava // createRDD expects a java.util.Map[String, Object]
// Define specific offset ranges to process
val offsetRanges = Array(
OffsetRange("orders", 0, 100, 200), // Process messages 100-199 from orders partition 0
OffsetRange("orders", 1, 50, 150), // Process messages 50-149 from orders partition 1
OffsetRange("payments", 0, 0, 100) // Process messages 0-99 from payments partition 0
)
val rdd = KafkaUtils.createRDD[String, String](
sparkContext,
kafkaParams,
offsetRanges,
LocationStrategies.PreferConsistent
)
// Process the RDD
val processedData = rdd.map { record =>
(record.topic, record.partition, record.offset, record.key, record.value)
}.collect()
processedData.foreach { case (topic, partition, offset, key, value) =>
println(s"Topic: $topic, Partition: $partition, Offset: $offset, Key: $key, Value: $value")
}

Reprocessing example, recreating an RDD over offset ranges captured from an earlier batch:

import org.apache.spark.streaming.kafka010._
// Get offset ranges from a previous streaming batch
val previousOffsetRanges: Array[OffsetRange] = // ... obtained from HasOffsetRanges
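// For illustration only (assumed setup): in a streaming job these ranges are
// typically captured on the driver, on the RDD returned by the direct stream
// and before any transformations, e.g.
//   stream.foreachRDD { rdd =>
//     val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
//     // persist `ranges` durably so the same data can be replayed later
//   }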
// Create RDD to reprocess the same data
val reprocessRDD = KafkaUtils.createRDD[String, String](
sparkContext,
kafkaParams,
previousOffsetRanges,
LocationStrategies.PreferConsistent
)
// Apply different processing logic
val reprocessedResults = reprocessRDD
.filter(record => record.value.contains("error"))
.map(record => s"Reprocessed: ${record.value}")
.collect()

Java example of batch RDD creation:

import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaRDD;
import java.util.HashMap;
import java.util.Map;
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "batch-processing-group");
OffsetRange[] offsetRanges = {
OffsetRange.create("topic1", 0, 100L, 200L),
OffsetRange.create("topic1", 1, 150L, 250L)
};
JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
javaSparkContext,
kafkaParams,
offsetRanges,
LocationStrategies.PreferConsistent()
);
// Process the RDD
rdd.foreach(record -> {
System.out.println("Topic: " + record.topic() +
", Partition: " + record.partition() +
", Offset: " + record.offset() +
", Value: " + record.value());
});

Working with offset ranges on the returned RDD (Scala):

import org.apache.spark.streaming.kafka010._
val rdd = KafkaUtils.createRDD[String, String](
sparkContext,
kafkaParams,
offsetRanges,
LocationStrategies.PreferConsistent
)
// RDD implements HasOffsetRanges, so you can get offset information
val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
ranges.foreach { range =>
println(s"Topic: ${range.topic}, Partition: ${range.partition}, " +
s"From: ${range.fromOffset}, Until: ${range.untilOffset}, Count: ${range.count}")
}
// Process data and track progress
val processedCount = rdd.count()
val totalMessages = ranges.map(_.count).sum
println(s"Processed $processedCount messages out of $totalMessages total")bootstrap.servers: Kafka broker addresses (required)key.deserializer: Key deserializer class (required)value.deserializer: Value deserializer class (required)group.id: Consumer group ID (recommended for monitoring)security.protocol: Security protocol if using authenticated Kafkasasl.mechanism: SASL mechanism for authenticationThe createRDD method automatically sets several parameters for executor safety:
enable.auto.commit is set to falseauto.offset.reset is set to nonegroup.id is modified to be executor-specificreceive.buffer.config is set to 65536 (KAFKA-3135 workaround)@Experimental in Spark 2.4.8HasOffsetRanges interface for offset introspectionLocationStrategies.PreferConsistent unless you have specific host preferencesLocationStrategies.PreferBrokers with RDD creation (no driver consumer available)
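If the cluster requires authentication, the security-related parameters listed above go into the same kafkaParams map. The sketch below assumes SASL_SSL with the PLAIN mechanism; the protocol, mechanism, and JAAS credentials shown are illustrative placeholders, not values required by createRDD:

import java.util.{HashMap => JHashMap}

// Copy the existing parameters and add placeholder security settings (adjust for your cluster).
val secureKafkaParams = new JHashMap[String, Object](kafkaParams)
secureKafkaParams.put("security.protocol", "SASL_SSL")
secureKafkaParams.put("sasl.mechanism", "PLAIN")
secureKafkaParams.put("sasl.jaas.config",
  "org.apache.kafka.common.security.plain.PlainLoginModule required " +
  "username=\"user\" password=\"secret\";")

val secureRdd = KafkaUtils.createRDD[String, String](
  sparkContext,
  secureKafkaParams,
  offsetRanges,
  LocationStrategies.PreferConsistent
)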