Batch data access provides the ability to read historical data from Kafka topics over configurable offset ranges, supporting large-scale data processing and analytics workloads.

KafkaRelation is the batch relation that reads historical data from Kafka topics with precise offset control.
/**
* Batch relation for reading from Kafka topics
*/
class KafkaRelation extends BaseRelation with TableScan with Logging {
/**
* Returns the SQL context
* @return SQLContext for this relation
*/
def sqlContext: SQLContext
/**
* Returns schema for Kafka records
* @return StructType defining record schema
*/
def schema: StructType
/**
* Builds scan RDD for batch processing
* @return RDD[Row] containing Kafka records
*/
def buildScan(): RDD[Row]
/**
* String representation of the relation
* @return String describing this relation
*/
def toString: String
}

KafkaSourceRDD is the RDD implementation that reads Kafka data based on offset ranges, with partition-aware processing.
/**
* RDD for reading Kafka data based on offset ranges
*/
class KafkaSourceRDD extends RDD[ConsumerRecord[Array[Byte], Array[Byte]]] {
/**
* Persistence not supported for KafkaSourceRDD
* @param newLevel Storage level (ignored)
* @return this RDD (logs error)
*/
def persist(newLevel: StorageLevel): this.type
/**
* Gets RDD partitions based on offset ranges
* @return Array of Partition objects
*/
def getPartitions: Array[Partition]
/**
* Gets preferred executor locations for data locality
* @param split Partition to get locations for
* @return Sequence of preferred executor hostnames
*/
def getPreferredLocations(split: Partition): Seq[String]
/**
* Computes partition data by reading from Kafka
* @param thePart Partition to compute
* @param context Task context for the computation
* @return Iterator of Kafka ConsumerRecord objects
*/
def compute(
thePart: Partition,
context: TaskContext
): Iterator[ConsumerRecord[Array[Byte], Array[Byte]]]
}

KafkaSourceRDDOffsetRange and KafkaSourceRDDPartition are the partition implementations used for Kafka RDD processing.
/**
* Offset range for one RDD partition
* @param topicPartition Kafka topic partition
* @param fromOffset Starting offset (inclusive)
* @param untilOffset Ending offset (exclusive)
* @param preferredLoc Preferred executor location
*/
case class KafkaSourceRDDOffsetRange(
topicPartition: TopicPartition,
fromOffset: Long,
untilOffset: Long,
preferredLoc: Option[String]
) {
/** Gets topic name */
def topic: String = topicPartition.topic()
/** Gets partition number */
def partition: Int = topicPartition.partition()
/** Gets size of offset range */
def size: Long = untilOffset - fromOffset
}
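These classes are internal to the connector rather than user-facing API, but a purely illustrative sketch of the half-open offset-range semantics (fromOffset inclusive, untilOffset exclusive) may help; the topic name and offsets here are hypothetical.

import org.apache.kafka.common.TopicPartition

// Illustrative only: application code does not normally construct these internal classes.
val range = KafkaSourceRDDOffsetRange(
  topicPartition = new TopicPartition("events", 0),
  fromOffset = 1000L,  // inclusive
  untilOffset = 5000L, // exclusive
  preferredLoc = None
)
// size = untilOffset - fromOffset = 4000 records covered by this range
assert(range.size == 4000L)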
/**
* RDD partition containing offset range
* @param index Partition index
* @param offsetRange Offset range for this partition
*/
case class KafkaSourceRDDPartition(
index: Int,
offsetRange: KafkaSourceRDDOffsetRange
) extends Partition

Batch reads are issued through the standard DataFrame reader using format("kafka"). A basic read over the full range of available offsets:

val batchDF = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "transactions")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
batchDF.show()

Reading a specific historical slice with explicit per-partition starting and ending offsets:

val historicalDF = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("startingOffsets", """{"events":{"0":1000,"1":2000}}""")
.option("endingOffsets", """{"events":{"0":5000,"1":6000}}""")
.load()
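The offset JSON also accepts special values: -2 refers to the earliest offset and -1 to the latest (for a batch read, -1/latest is only meaningful in endingOffsets). A sketch mixing specific and special offsets:

// Per-partition special offsets: -2 = earliest, -1 = latest
val mixedOffsetsDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", """{"events":{"0":-2,"1":1000}}""")
  .option("endingOffsets", """{"events":{"0":5000,"1":-1}}""")
  .load()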
// Process the historical slice read above
import spark.implicits._ // enables the $"col" syntax used below
val processedDF = historicalDF
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "partition", "offset", "timestamp")
.filter($"timestamp" > "2023-01-01")val multiTopicDF = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1,topic2,topic3")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
// Aggregate by topic
import org.apache.spark.sql.functions._ // count, min, max
val topicStats = multiTopicDF
.groupBy("topic")
.agg(
count("*").as("record_count"),
min("timestamp").as("earliest_timestamp"),
max("timestamp").as("latest_timestamp")
)
topicStats.show()

Subscribing to topics by regular-expression pattern:

val patternDF = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribePattern", "logs_.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()

Assigning specific topic partitions explicitly:

val assignedDF = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("assign", """{"important_topic":[0,1,2],"critical_topic":[0]}""")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()

Increasing read parallelism with minPartitions:

val optimizedDF = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "large-topic")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.option("minPartitions", "20") // Increase parallelism
.load()

Tuning Kafka consumer fetch behavior via kafka.-prefixed options:

val tunedDF = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "high-throughput-topic")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.option("kafka.fetch.max.bytes", "52428800") // 50MB
.option("kafka.max.poll.records", "1000") // Records per poll
.option("kafka.fetch.min.bytes", "1024") // Min fetch size
.option("kafka.fetch.max.wait.ms", "500") // Max wait
.load()

Adjusting consumer socket buffer sizes for memory-sensitive workloads:

val memoryOptimizedDF = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "memory-intensive-topic")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.option("kafka.receive.buffer.bytes", "262144") // 256KB
.option("kafka.send.buffer.bytes", "131072") // 128KB
.load()

Time-based filtering and daily aggregation:

import org.apache.spark.sql.functions._
import spark.implicits._
val timeBasedDF = batchDF
.withColumn("event_time", $"timestamp") // the Kafka timestamp column is already a TimestampType; no epoch conversion needed
.withColumn("date", to_date($"event_time"))
.filter($"date" >= "2023-01-01" && $"date" <= "2023-12-31")
// Group by date and count records
val dailyStats = timeBasedDF
.groupBy("date", "topic")
.agg(count("*").as("daily_count"))
.orderBy("date", "topic")import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
// Hypothetical example schema; adjust to match the actual message payload
val messageSchema = StructType(Seq(StructField("event_id", StringType), StructField("amount", DoubleType)))
val contentDF = batchDF
.selectExpr("CAST(key AS STRING) as message_key", "CAST(value AS STRING) as message_value", "topic", "partition", "offset")
.withColumn("json_data", from_json($"message_value", messageSchema))
.select("message_key", "json_data.*", "topic", "partition", "offset")

Deduplicating records by key:

val deduplicatedDF = batchDF
.selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value", "*")
.dropDuplicates("key") // Deduplicate by key
.orderBy("timestamp") // Maintain orderimport org.apache.spark.sql.types._
val kafkaSchema = StructType(Seq(
StructField("key", BinaryType, nullable = true),
StructField("value", BinaryType, nullable = true),
StructField("topic", StringType, nullable = false),
StructField("partition", IntegerType, nullable = false),
StructField("offset", LongType, nullable = false),
StructField("timestamp", TimestampType, nullable = false),
StructField("timestampType", IntegerType, nullable = false)
))

Converting the binary key/value columns and decoding the timestamp type:

val convertedDF = batchDF
.selectExpr(
"CAST(key AS STRING) as key_str",
"CAST(value AS STRING) as value_str",
"topic",
"partition",
"offset",
"timestamp",
"CASE WHEN timestampType = 0 THEN 'CreateTime' ELSE 'LogAppendTime' END as timestamp_type"
)

Handling invalid offset configurations:

try {
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic")
.option("startingOffsets", """{"topic":{"0":1000}}""")
.option("endingOffsets", """{"topic":{"0":5000}}""")
.load()
df.count()
} catch {
case e: IllegalArgumentException =>
println(s"Invalid offset configuration: ${e.getMessage}")
case e: Exception =>
println(s"Error reading from Kafka: ${e.getMessage}")
}

Verifying that a topic exists before reading, using the Kafka AdminClient:

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import java.util.Properties
def checkTopicExists(brokers: String, topic: String): Boolean = {
val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
val adminClient = AdminClient.create(props)
try {
val topics = adminClient.listTopics().names().get()
topics.contains(topic)
} finally {
adminClient.close()
}
}
if (checkTopicExists("localhost:9092", "my-topic")) {
val df = spark.read.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "my-topic")
.load()
}

Registering the batch DataFrame as a temporary view for SQL analytics:

batchDF.createOrReplaceTempView("kafka_messages")
val sqlResult = spark.sql("""
SELECT
topic,
partition,
COUNT(*) as message_count,
MIN(offset) as min_offset,
MAX(offset) as max_offset,
MIN(timestamp) as earliest_time,
MAX(timestamp) as latest_time
FROM kafka_messages
GROUP BY topic, partition
ORDER BY topic, partition
""")
sqlResult.show()

Hourly message volume and average message size:

spark.sql("""
SELECT
topic,
DATE(timestamp) as date,
HOUR(timestamp) as hour,
COUNT(*) as hourly_count,
AVG(LENGTH(CAST(value AS STRING))) as avg_message_size
FROM kafka_messages
WHERE timestamp >= '2023-01-01'
GROUP BY topic, DATE(timestamp), HOUR(timestamp)
ORDER BY date, hour
""").show()Use earliest/latest for full scans:
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")Use specific offsets for incremental processing:
.option("startingOffsets", s"""{"$topic":{"0":$lastProcessedOffset}}""")Monitor partition lag:
val offsetInfo = batchDF
.groupBy("topic", "partition")
.agg(min("offset").as("min_offset"), max("offset").as("max_offset"))// Configure driver and executor memory appropriately
spark.conf.set("spark.driver.memory", "4g")
spark.conf.set("spark.executor.memory", "8g")
spark.conf.set("spark.executor.cores", "4")
// Optimize shuffle partitions for Kafka data
spark.conf.set("spark.sql.shuffle.partitions", "200")