The Kafka connector provides efficient batch reading of historical Kafka data, with offset range optimization and partition-aware processing.

KafkaBatch is the DataSource V2 batch implementation for reading Kafka data efficiently.
/**
* V2 DataSource batch implementation for reading Kafka data
* Provides efficient batch processing with partition optimization
*/
class KafkaBatch extends Batch {
/** Plans input partitions for parallel processing */
def planInputPartitions(): Array[InputPartition]
/** Creates reader factory for partition processing */
def createReaderFactory(): PartitionReaderFactory
}

Usage Examples:
// Basic batch reading
val kafkaBatch = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "historical-data")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
// Process batch data
import org.apache.spark.sql.functions._
val processedBatch = kafkaBatch
.select(
col("topic"),
col("partition"),
col("offset"),
col("timestamp"),
expr("CAST(value AS STRING)").as("message")
)
.groupBy("topic", "partition")
.agg(
count("*").as("message_count"),
min("offset").as("min_offset"),
max("offset").as("max_offset")
)
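Kafka delivers the payload as bytes, so downstream analysis usually starts by decoding the value column. Below is a minimal sketch of parsing JSON payloads with from_json; the event schema (event_id, event_type, amount) is a hypothetical placeholder, not something defined by the connector:

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{DoubleType, StringType, StructType}

// Hypothetical payload schema -- replace with the schema of your topic's messages
val eventSchema = new StructType()
  .add("event_id", StringType)
  .add("event_type", StringType)
  .add("amount", DoubleType)

val parsedBatch = kafkaBatch
  .select(
    col("topic"),
    col("partition"),
    col("offset"),
    from_json(col("value").cast("string"), eventSchema).as("event")
  )
  .select("topic", "partition", "offset", "event.*")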
KafkaRelation is the V1 DataSource relation for batch reading with TableScan support.

/**
* V1 DataSource relation for batch reading from Kafka
* Provides backward compatibility with DataSource V1 API
*/
class KafkaRelation extends BaseRelation with TableScan {
/** Returns the schema for Kafka records */
def schema: StructType
/** Builds RDD for scanning all data */
def buildScan(): RDD[Row]
/** Returns SQL context */
def sqlContext: SQLContext
}
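The schema method returns the fixed record schema of the Kafka source. The sketch below shows what that schema generally contains (key and value as binary plus record metadata); treat it as illustrative rather than the authoritative definition:

import org.apache.spark.sql.types._

// Illustrative sketch of the Kafka record schema
val kafkaRecordSchema = new StructType()
  .add("key", BinaryType)
  .add("value", BinaryType)
  .add("topic", StringType)
  .add("partition", IntegerType)
  .add("offset", LongType)
  .add("timestamp", TimestampType)
  .add("timestampType", IntegerType)
// An additional headers column appears only when includeHeaders is enabled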
KafkaBatchInputPartition represents a single input partition for batch processing.

/**
* Input partition for batch Kafka reading
* Represents a range of offsets to be processed by a single task
*/
case class KafkaBatchInputPartition(
offsetRange: KafkaOffsetRange,
executorKafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
failOnDataLoss: Boolean,
includeHeaders: Boolean
) extends InputPartition {
/** Returns preferred locations for this partition (empty for Kafka) */
def preferredLocations(): Array[String] = Array.empty
}

KafkaOffsetRange defines the range of offsets to be processed by a partition reader.
/**
* Represents a range of offsets for a specific topic partition
* Used for planning batch processing tasks
*/
case class KafkaOffsetRange(
topicPartition: TopicPartition,
fromOffset: Long,
untilOffset: Long,
preferredLoc: Option[String]
) {
/** Topic name */
def topic: String = topicPartition.topic()
/** Partition number */
def partition: Int = topicPartition.partition()
/** Estimated number of messages in this range */
def size: Long = untilOffset - fromOffset
}
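A sketch of how these two types fit together during planning: each offset range for a topic partition is wrapped in a KafkaBatchInputPartition that a single task reads. The Kafka parameters, topic name, and numeric values below are illustrative placeholders:

import java.{util => ju}
import org.apache.kafka.common.TopicPartition

// Illustrative planning step: one offset range -> one input partition
val range = KafkaOffsetRange(
  topicPartition = new TopicPartition("historical-data", 0),
  fromOffset = 0L,
  untilOffset = 100000L,
  preferredLoc = None
)

// Consumer settings shipped to executors (placeholder values)
val executorKafkaParams = new ju.HashMap[String, Object]()
executorKafkaParams.put("bootstrap.servers", "localhost:9092")

val inputPartition = KafkaBatchInputPartition(
  offsetRange = range,
  executorKafkaParams = executorKafkaParams,
  pollTimeoutMs = 120000L,
  failOnDataLoss = true,
  includeHeaders = false
)

// range.size == 100000: the estimated number of records this task will read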
KafkaBatchReaderFactory is the factory for creating partition readers.

/**
* Factory for creating Kafka batch partition readers
* Creates readers that can process KafkaBatchInputPartition instances
*/
object KafkaBatchReaderFactory extends PartitionReaderFactory {
/** Creates a partition reader for the given input partition */
def createReader(partition: InputPartition): PartitionReader[InternalRow]
}

KafkaBatchPartitionReader reads data from a specific Kafka partition range.
/**
* Partition reader for batch Kafka data processing
* Reads a specific range of offsets from a Kafka partition
*/
case class KafkaBatchPartitionReader(
offsetRange: KafkaOffsetRange,
executorKafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
failOnDataLoss: Boolean,
includeHeaders: Boolean
) extends PartitionReader[InternalRow] {
/** Advances to next record */
def next(): Boolean
/** Gets current record as UnsafeRow */
def get(): UnsafeRow
/** Closes reader and releases resources */
def close(): Unit
/** Returns current metrics values */
def currentMetricsValues(): Array[CustomTaskMetric]
}
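A sketch of how a task drives the reader produced by the factory, using the next()/get()/close() contract listed above. Error handling and row conversion are omitted; inputPartition is the placeholder value sketched earlier:

// Illustrative consumption loop for a single planned input partition
val reader = KafkaBatchReaderFactory.createReader(inputPartition)
try {
  var rows = 0L
  while (reader.next()) {   // advance to the next record in the offset range
    val row = reader.get()  // current record (an UnsafeRow)
    rows += 1
  }
  println(s"Read $rows rows from ${inputPartition.offsetRange.topicPartition}")
} finally {
  reader.close()            // always release the underlying Kafka consumer
}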
// Read data from specific time range
val timeRangeBatch = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("startingTimestamp", "1609459200000") // Jan 1, 2021 00:00:00 UTC
.option("endingTimestamp", "1609545600000") // Jan 2, 2021 00:00:00 UTC
.load()

// Read specific offset ranges per partition
val specificOffsetsBatch = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("startingOffsets", """{"events":{"0":1000,"1":2000,"2":1500}}""")
.option("endingOffsets", """{"events":{"0":5000,"1":6000,"2":4500}}""")
.load()

// Mix earliest/latest with specific offsets
val mixedOffsetsBatch = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("startingOffsets", """{"events":{"0":-2,"1":1000,"2":-2}}""") // -2 = earliest
.option("endingOffsets", """{"events":{"0":-1,"1":2000,"2":-1}}""") // -1 = latest
.load()

Control the minimum number of Spark partitions for processing:
// Ensure sufficient parallelism for batch processing
val parallelBatch = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "large-topic")
.option("minPartitions", "50") // Create at least 50 Spark partitions
.load()

KafkaOffsetRangeCalculator is the internal calculator for optimizing offset ranges based on the partition configuration.
/**
* Calculates optimal offset ranges for processing
* Splits large partition ranges to improve parallelism
*/
class KafkaOffsetRangeCalculator(minPartitions: Option[Int]) {
/**
* Calculates offset ranges ensuring minimum partition count
* Splits large ranges across multiple Spark partitions if needed
*/
def getRanges(
fromOffsets: Seq[KafkaOffsetRange],
executorLocations: Seq[String]
): Seq[KafkaOffsetRange]
}
object KafkaOffsetRangeCalculator {
/** Creates calculator from options */
def apply(options: CaseInsensitiveStringMap): KafkaOffsetRangeCalculator
}

Range Splitting Logic (see the sketch after the example below):
// If a Kafka partition has 100,000 messages and minPartitions=10:
// Original: [topic-0: 0 -> 100000]
// Split into:
// [topic-0: 0 -> 10000]
// [topic-0: 10000 -> 20000]
// [topic-0: 20000 -> 30000]
// ... (10 total ranges)
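A minimal sketch of that splitting arithmetic, assuming an even split of one range into a fixed number of pieces; the real calculator also balances ranges against each other and takes executor locations into account:

// Illustrative even split of one offset range into `pieces` sub-ranges
def splitRange(range: KafkaOffsetRange, pieces: Int): Seq[KafkaOffsetRange] = {
  val step = math.max(1L, math.ceil(range.size.toDouble / pieces).toLong)
  (range.fromOffset until range.untilOffset by step).map { start =>
    range.copy(fromOffset = start, untilOffset = math.min(start + step, range.untilOffset))
  }
}

// splitRange(KafkaOffsetRange(new TopicPartition("topic", 0), 0L, 100000L, None), 10)
// => [0, 10000), [10000, 20000), ..., [90000, 100000)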
// Optimize Kafka consumer for batch processing
val optimizedBatch = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "large-dataset")
.option("kafka.fetch.min.bytes", "1048576") // 1MB minimum fetch
.option("kafka.fetch.max.wait.ms", "500") // 500ms max wait
.option("kafka.max.poll.records", "50000") // Large batch sizes
.option("kafka.receive.buffer.bytes", "1048576") // 1MB receive buffer
.load()

// Maximize parallelism for large datasets
val highParallelismBatch = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "big-data-topic")
.option("minPartitions", "200") // Force high parallelism
.load()
.repartition(400) // Further increase parallelism for processing

// Cache frequently accessed batch data
val cachedBatch = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "reference-data")
.load()
.select(
col("topic"),
expr("CAST(key AS STRING)").as("key"),
expr("CAST(value AS STRING)").as("value"),
col("timestamp")
)
.cache() // Cache in memory for multiple operations
// Use cached data multiple times
val summary = cachedBatch.groupBy("topic").count()
val sample = cachedBatch.sample(0.1)

// Analyze data by time windows
val timeWindowAnalysis = kafkaBatch
.select(
col("topic"),
col("timestamp"),
expr("CAST(value AS STRING)").as("message")
)
.withColumn("hour", hour(col("timestamp")))
.withColumn("day", date(col("timestamp")))
.groupBy("topic", "day", "hour")
.agg(
count("*").as("message_count"),
countDistinct("message").as("unique_messages")
)
.orderBy("topic", "day", "hour")

// Analyze data across multiple topics
val crossTopicBatch = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events,logs,metrics")
.load()
val topicComparison = crossTopicBatch
.groupBy("topic", date(col("timestamp")).as("date"))
.agg(
count("*").as("message_count"),
avg(length(col("value"))).as("avg_message_size"),
min("timestamp").as("first_message"),
max("timestamp").as("last_message")
)

// Analyze data quality across partitions
val qualityAnalysis = kafkaBatch
.select(
col("topic"),
col("partition"),
col("offset"),
col("timestamp"),
when(col("key").isNull, 1).otherwise(0).as("null_key"),
when(col("value").isNull, 1).otherwise(0).as("null_value"),
length(col("value")).as("value_size")
)
.groupBy("topic", "partition")
.agg(
count("*").as("total_messages"),
sum("null_key").as("null_keys"),
sum("null_value").as("null_values"),
avg("value_size").as("avg_value_size"),
min("offset").as("min_offset"),
max("offset").as("max_offset")
)

// Enable data loss detection for critical batch processing
val strictBatch = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "critical-data")
.option("failOnDataLoss", "true") // Fail if data is missing
.load()

// Configure timeouts for reliable batch processing
val timeoutConfiguredBatch = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("kafkaConsumer.pollTimeoutMs", "30000") // 30 second poll timeout
.option("fetchOffset.numRetries", "10") // 10 retry attempts
.option("fetchOffset.retryIntervalMs", "5000") // 5 second retry interval
.load()

// Process data even if some partitions are unavailable
val resilientBatch = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("failOnDataLoss", "false") // Continue despite missing data
.load()
.filter(col("value").isNotNull) // Filter out null values

// Monitor batch processing metrics
val batchJob = kafkaBatch.count() // Trigger computation
// Access metrics through Spark UI or programmatically
val executorMetrics = spark.sparkContext.statusTracker.getExecutorInfos
executorMetrics.foreach { executor =>
println(s"Executor ${executor.executorId}: ${executor.totalCores} cores")
}

// Collect custom metrics during batch processing
val metricsCollector = kafkaBatch
.select(
col("topic"),
col("partition"),
col("timestamp"),
length(col("value")).as("message_size")
)
.groupBy("topic", "partition")
.agg(
count("*").as("message_count"),
sum("message_size").as("total_bytes"),
avg("message_size").as("avg_message_size"),
min("timestamp").as("earliest_timestamp"),
max("timestamp").as("latest_timestamp")
)
// Write metrics for monitoring
metricsCollector
.write
.format("delta")
.mode("overwrite")
.save("/metrics/kafka_batch_processing")