Batch Processing

The batch processing capability provides a batch-oriented interface for consuming from Kafka with precise offset control. Because the exact range of messages to process is specified up front, it supports exactly-once semantics and is well suited to reprocessing scenarios and other workflows that need deterministic offset boundaries.

Core Functions

createRDD (Scala)

Creates an RDD for batch consumption from Kafka with specified offset ranges.

def createRDD[K, V](
  sc: SparkContext,
  kafkaParams: java.util.Map[String, Object],
  offsetRanges: Array[OffsetRange],
  locationStrategy: LocationStrategy
): RDD[ConsumerRecord[K, V]]

Parameters:

  • sc: SparkContext - The Spark context
  • kafkaParams: java.util.Map[String, Object] - Kafka configuration parameters (requires "bootstrap.servers")
  • offsetRanges: Array[OffsetRange] - Offset ranges defining the Kafka data for this RDD
  • locationStrategy: LocationStrategy - Consumer placement strategy (use LocationStrategies.PreferConsistent)

Returns: RDD[ConsumerRecord[K, V]] - Kafka RDD implementing HasOffsetRanges

createRDD (Java)

Java version of the batch RDD creation method.

public static <K, V> JavaRDD<ConsumerRecord<K, V>> createRDD(
  JavaSparkContext jsc,
  java.util.Map<String, Object> kafkaParams,
  OffsetRange[] offsetRanges,
  LocationStrategy locationStrategy
)

Parameters:

  • jsc: JavaSparkContext - The Java Spark context
  • kafkaParams: java.util.Map<String, Object> - Kafka configuration parameters
  • offsetRanges: OffsetRange[] - Array of offset ranges
  • locationStrategy: LocationStrategy - Consumer placement strategy

Returns: JavaRDD<ConsumerRecord<K, V>> - Java RDD wrapper for Kafka data

Usage Examples

Basic Batch Processing

import org.apache.spark.streaming.kafka010._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "batch-processing-group"
)

// Define specific offset ranges to process
val offsetRanges = Array(
  OffsetRange("orders", 0, 100, 200),      // Process messages 100-199 from orders partition 0
  OffsetRange("orders", 1, 50, 150),       // Process messages 50-149 from orders partition 1
  OffsetRange("payments", 0, 0, 100)       // Process messages 0-99 from payments partition 0
)

val rdd = KafkaUtils.createRDD[String, String](
  sparkContext,
  kafkaParams.asJava,
  offsetRanges,
  LocationStrategies.PreferConsistent
)

// Process the RDD
val processedData = rdd.map { record =>
  (record.topic, record.partition, record.offset, record.key, record.value)
}.collect()

processedData.foreach { case (topic, partition, offset, key, value) =>
  println(s"Topic: $topic, Partition: $partition, Offset: $offset, Key: $key, Value: $value")
}

Reprocessing with Offset Ranges

import org.apache.spark.streaming.kafka010._
import scala.collection.JavaConverters._

// Get offset ranges from a previous streaming batch
val previousOffsetRanges: Array[OffsetRange] = ??? // obtained from HasOffsetRanges on an earlier batch

// Create RDD to reprocess the same data
val reprocessRDD = KafkaUtils.createRDD[String, String](
  sparkContext,
  kafkaParams.asJava,
  previousOffsetRanges,
  LocationStrategies.PreferConsistent
)

// Apply different processing logic
val reprocessedResults = reprocessRDD
  .filter(record => record.value.contains("error"))
  .map(record => s"Reprocessed: ${record.value}")
  .collect()
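
The placeholder above assumes the offset ranges were saved from an earlier run. Below is a minimal sketch of how they might be captured from a direct stream; the stream value (a DStream obtained from createDirectStream) and the persistence step are assumptions, not part of this API.

import org.apache.spark.streaming.kafka010._

// Sketch: record each micro-batch's offset ranges so they can be replayed later
// through createRDD. `stream` is assumed to be a DStream[ConsumerRecord[String, String]]
// returned by createDirectStream elsewhere in the application.
stream.foreachRDD { rdd =>
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Persist topic, partition, fromOffset and untilOffset to durable storage here
  ranges.foreach(r => println(s"${r.topic}-${r.partition}: ${r.fromOffset} until ${r.untilOffset}"))
}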

Java Batch Processing

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.HashMap;
import java.util.Map;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "batch-processing-group");

OffsetRange[] offsetRanges = {
  OffsetRange.create("topic1", 0, 100L, 200L),
  OffsetRange.create("topic1", 1, 150L, 250L)
};

JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
  javaSparkContext,
  kafkaParams,
  offsetRanges,
  LocationStrategies.PreferConsistent()
);

// Process the RDD
rdd.foreach(record -> {
  System.out.println("Topic: " + record.topic() + 
                    ", Partition: " + record.partition() + 
                    ", Offset: " + record.offset() + 
                    ", Value: " + record.value());
});

Working with HasOffsetRanges

import org.apache.spark.streaming.kafka010._
import scala.collection.JavaConverters._

val rdd = KafkaUtils.createRDD[String, String](
  sparkContext,
  kafkaParams.asJava,
  offsetRanges,
  LocationStrategies.PreferConsistent
)

// RDD implements HasOffsetRanges, so you can get offset information
val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

ranges.foreach { range =>
  println(s"Topic: ${range.topic}, Partition: ${range.partition}, " +
          s"From: ${range.fromOffset}, Until: ${range.untilOffset}, Count: ${range.count}")
}

// Process data and track progress
val processedCount = rdd.count()
val totalMessages = ranges.map(_.count).sum
println(s"Processed $processedCount messages out of $totalMessages total")

Configuration Notes

Required Kafka Parameters

  • bootstrap.servers: Kafka broker addresses (required)
  • key.deserializer: Key deserializer class (required)
  • value.deserializer: Value deserializer class (required)

Optional Kafka Parameters

  • group.id: Consumer group ID (recommended for monitoring)
  • security.protocol: Security protocol if using authenticated Kafka
  • sasl.mechanism: SASL mechanism for authentication
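
If the cluster requires authentication, these optional parameters sit alongside the required ones in the same map. The following is an illustrative sketch for a SASL_SSL cluster; the broker address, mechanism, and credential values are placeholders, not recommendations.

import org.apache.kafka.common.serialization.StringDeserializer

// Illustrative only: required parameters plus optional security settings.
// All values below are placeholders.
val securedKafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9093",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "batch-processing-group",
  "security.protocol" -> "SASL_SSL",
  "sasl.mechanism" -> "PLAIN",
  "sasl.jaas.config" ->
    """org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="secret";"""
)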

Automatic Parameter Handling

The createRDD method automatically sets several parameters for executor safety:

  • enable.auto.commit is set to false
  • auto.offset.reset is set to none
  • group.id is modified to be executor-specific
  • receive.buffer.bytes is set to 65536 (KAFKA-3135 workaround)
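
The net effect on the executor-side consumer configuration is roughly the following. This is an illustration of the overrides listed above, not Spark's actual code; the "spark-executor-" prefix reflects the behavior observed in Spark 2.4 and should be treated as an implementation detail.

import scala.collection.JavaConverters._

// Illustration only: the effective parameters used by executor consumers,
// regardless of what the caller supplied for these keys.
val effectiveParams = new java.util.HashMap[String, Object](kafkaParams.asJava)
effectiveParams.put("enable.auto.commit", false: java.lang.Boolean)
effectiveParams.put("auto.offset.reset", "none")
effectiveParams.put("group.id", "spark-executor-" + kafkaParams("group.id"))  // executor-specific group id
effectiveParams.put("receive.buffer.bytes", 65536: java.lang.Integer)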

Important Notes

  • All batch processing methods are marked as @Experimental in Spark 2.4.8
  • Starting and ending offsets are specified in advance for exactly-once semantics
  • The RDD implements HasOffsetRanges interface for offset introspection
  • Each offset range corresponds to a single RDD partition (see the sketch after this list)
  • Consumer instances are managed automatically and cached for performance
  • Use LocationStrategies.PreferConsistent unless you have specific host preferences
  • Cannot use LocationStrategies.PreferBrokers with RDD creation (no driver consumer available)
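
The one-to-one mapping between offset ranges and RDD partitions can be checked directly. A minimal sketch, reusing kafkaParams and offsetRanges from the earlier examples and assuming partitions follow the order of the offsetRanges array:

import org.apache.spark.streaming.kafka010._
import scala.collection.JavaConverters._

// Each OffsetRange becomes one RDD partition.
val rdd = KafkaUtils.createRDD[String, String](
  sparkContext, kafkaParams.asJava, offsetRanges, LocationStrategies.PreferConsistent)

assert(rdd.getNumPartitions == offsetRanges.length)

// Compare the actual record count per partition with the declared range size.
rdd.mapPartitionsWithIndex { (i, records) => Iterator((i, records.size)) }
  .collect()
  .foreach { case (i, n) =>
    println(s"partition $i: $n records, declared ${offsetRanges(i).count}")
  }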