Apache Spark Structured Streaming integration with Apache Kafka providing comprehensive data source and sink capabilities for both batch and streaming workloads.
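The rest of this page focuses on batch reads; the streaming source and sink use the same option surface. A minimal sketch of a streaming read combined with a Kafka sink write (broker address, topic names, and checkpoint path are placeholders):
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-streaming-sketch").getOrCreate()

// Continuous read from a Kafka topic
val streamDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()

// The Kafka sink expects a 'value' column (and optionally 'key' and 'topic')
val query = streamDF
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "events_copy")
  .option("checkpointLocation", "/tmp/checkpoints/events_copy")
  .start()

query.awaitTermination()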
Batch data access capabilities for reading historical data from Kafka topics with configurable offset ranges, supporting large-scale data processing and analytics workloads.
Batch relation for reading historical data from Kafka topics with precise offset control.
/**
* Batch relation for reading from Kafka topics
*/
class KafkaRelation extends BaseRelation with TableScan with Logging {
/**
* Returns the SQL context
* @return SQLContext for this relation
*/
def sqlContext: SQLContext
/**
* Returns schema for Kafka records
* @return StructType defining record schema
*/
def schema: StructType
/**
* Builds scan RDD for batch processing
* @return RDD[Row] containing Kafka records
*/
def buildScan(): RDD[Row]
/**
* String representation of the relation
* @return String describing this relation
*/
def toString: String
}

RDD implementation for reading Kafka data based on offset ranges with partition-aware processing.
/**
* RDD for reading Kafka data based on offset ranges
*/
class KafkaSourceRDD extends RDD[ConsumerRecord[Array[Byte], Array[Byte]]] {
/**
* Persistence not supported for KafkaSourceRDD
* @param newLevel Storage level (ignored)
* @return this RDD (logs error)
*/
def persist(newLevel: StorageLevel): this.type
/**
* Gets RDD partitions based on offset ranges
* @return Array of Partition objects
*/
def getPartitions: Array[Partition]
/**
* Gets preferred executor locations for data locality
* @param split Partition to get locations for
* @return Sequence of preferred executor hostnames
*/
def getPreferredLocations(split: Partition): Seq[String]
/**
* Computes partition data by reading from Kafka
* @param thePart Partition to compute
* @param context Task context for the computation
* @return Iterator of Kafka ConsumerRecord objects
*/
def compute(
thePart: Partition,
context: TaskContext
): Iterator[ConsumerRecord[Array[Byte], Array[Byte]]]
}

Partition implementations for Kafka RDD processing.
/**
* Offset range for one RDD partition
* @param topicPartition Kafka topic partition
* @param fromOffset Starting offset (inclusive)
* @param untilOffset Ending offset (exclusive)
* @param preferredLoc Preferred executor location
*/
case class KafkaSourceRDDOffsetRange(
topicPartition: TopicPartition,
fromOffset: Long,
untilOffset: Long,
preferredLoc: Option[String]
) {
/** Gets topic name */
def topic: String = topicPartition.topic()
/** Gets partition number */
def partition: Int = topicPartition.partition()
/** Gets size of offset range */
def size: Long = untilOffset - fromOffset
}
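For illustration of the range semantics (these objects are created internally by the source rather than by user code, so this construction is hypothetical): a range from offset 1000 inclusive to 2000 exclusive covers exactly 1000 records of one topic partition:
import org.apache.kafka.common.TopicPartition

// Hypothetical, for illustration only
val range = KafkaSourceRDDOffsetRange(
  topicPartition = new TopicPartition("events", 0),
  fromOffset = 1000L,
  untilOffset = 2000L,
  preferredLoc = None
)
range.topic     // "events"
range.partition // 0
range.size      // 1000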
/**
* RDD partition containing offset range
* @param index Partition index
* @param offsetRange Offset range for this partition
*/
case class KafkaSourceRDDPartition(
index: Int,
offsetRange: KafkaSourceRDDOffsetRange
) extends Partition

Reading an entire topic as a batch DataFrame, from the earliest to the latest available offsets:
val batchDF = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "transactions")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
batchDF.show()

Reading a specific historical slice with explicit per-partition offset ranges:
val historicalDF = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("startingOffsets", """{"events":{"0":1000,"1":2000}}""")
.option("endingOffsets", """{"events":{"0":5000,"1":6000}}""")
.load()
// Process historical data
val processedDF = historicalDF
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "partition", "offset", "timestamp")
.filter($"timestamp" > "2023-01-01")val multiTopicDF = spark
Reading several topics in a single batch query:
val multiTopicDF = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1,topic2,topic3")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
// Aggregate by topic
val topicStats = multiTopicDF
.groupBy("topic")
.agg(
count("*").as("record_count"),
min("timestamp").as("earliest_timestamp"),
max("timestamp").as("latest_timestamp")
)
topicStats.show()

Subscribing to all topics that match a pattern:
val patternDF = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribePattern", "logs_.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()

Assigning specific topic partitions explicitly:
val assignedDF = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("assign", """{"important_topic":[0,1,2],"critical_topic":[0]}""")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()

Increasing read parallelism with minPartitions:
val optimizedDF = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "large-topic")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.option("minPartitions", "20") // Increase parallelism
.load()

Tuning the Kafka consumer fetch behaviour for high throughput:
val tunedDF = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "high-throughput-topic")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.option("kafka.fetch.max.bytes", "52428800") // 50MB
.option("kafka.max.poll.records", "1000") // Records per poll
.option("kafka.fetch.min.bytes", "1024") // Min fetch size
.option("kafka.fetch.max.wait.ms", "500") // Max wait
.load()

Adjusting socket buffer sizes for memory-sensitive workloads:
val memoryOptimizedDF = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "memory-intensive-topic")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.option("kafka.receive.buffer.bytes", "262144") // 256KB
.option("kafka.send.buffer.bytes", "131072") // 128KB
.load()

Time-based filtering and daily aggregation:
import org.apache.spark.sql.functions._
val timeBasedDF = batchDF
.withColumn("event_time", from_unixtime($"timestamp" / 1000))
.withColumn("date", to_date($"event_time"))
.filter($"date" >= "2023-01-01" && $"date" <= "2023-12-31")
// Group by date and count records
val dailyStats = timeBasedDF
.groupBy("date", "topic")
.agg(count("*").as("daily_count"))
.orderBy("date", "topic")import org.apache.spark.sql.functions._
val contentDF = batchDF
.selectExpr("CAST(key AS STRING) as message_key", "CAST(value AS STRING) as message_value")
.withColumn("json_data", from_json($"message_value", messageSchema))
.select("message_key", "json_data.*", "topic", "partition", "offset")val deduplicatedDF = batchDF
.selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value", "*")
.dropDuplicates("key") // Deduplicate by key
.orderBy("timestamp") // Maintain orderimport org.apache.spark.sql.types._
val kafkaSchema = StructType(Seq(
StructField("key", BinaryType, nullable = true),
StructField("value", BinaryType, nullable = true),
StructField("topic", StringType, nullable = false),
StructField("partition", IntegerType, nullable = false),
StructField("offset", LongType, nullable = false),
StructField("timestamp", TimestampType, nullable = false),
StructField("timestampType", IntegerType, nullable = false)
))
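The DataFrame returned by load() carries exactly this schema; it can be confirmed directly:
batchDF.printSchema() // prints the seven columns listed above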
Converting binary columns and decoding the timestamp type:
val convertedDF = batchDF
.selectExpr(
"CAST(key AS STRING) as key_str",
"CAST(value AS STRING) as value_str",
"topic",
"partition",
"offset",
"timestamp",
"CASE WHEN timestampType = 0 THEN 'CreateTime' ELSE 'LogAppendTime' END as timestamp_type"
)

Handling offset and connectivity errors when reading a batch:
try {
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic")
.option("startingOffsets", """{"topic":{"0":1000}}""")
.option("endingOffsets", """{"topic":{"0":5000}}""")
.load()
df.count()
} catch {
case e: IllegalArgumentException =>
println(s"Invalid offset configuration: ${e.getMessage}")
case e: Exception =>
println(s"Error reading from Kafka: ${e.getMessage}")
}

Checking that a topic exists before querying it:
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
def checkTopicExists(brokers: String, topic: String): Boolean = {
val props = new java.util.Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
val adminClient = AdminClient.create(props)
try {
val topics = adminClient.listTopics().names().get()
topics.contains(topic)
} finally {
adminClient.close()
}
}
if (checkTopicExists("localhost:9092", "my-topic")) {
val df = spark.read.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "my-topic")
.load()
}

Running SQL over the batch data through a temporary view:
batchDF.createOrReplaceTempView("kafka_messages")
val sqlResult = spark.sql("""
SELECT
topic,
partition,
COUNT(*) as message_count,
MIN(offset) as min_offset,
MAX(offset) as max_offset,
MIN(timestamp) as earliest_time,
MAX(timestamp) as latest_time
FROM kafka_messages
GROUP BY topic, partition
ORDER BY topic, partition
""")
sqlResult.show()

Hourly activity and average message size per topic:
spark.sql("""
SELECT
topic,
DATE(timestamp) as date,
HOUR(timestamp) as hour,
COUNT(*) as hourly_count,
AVG(LENGTH(CAST(value AS STRING))) as avg_message_size
FROM kafka_messages
WHERE timestamp >= '2023-01-01'
GROUP BY topic, DATE(timestamp), HOUR(timestamp)
ORDER BY date, hour
""").show()Use earliest/latest for full scans:
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")Use specific offsets for incremental processing:
.option("startingOffsets", s"""{"$topic":{"0":$lastProcessedOffset}}""")Monitor partition lag:
val offsetInfo = batchDF
.groupBy("topic", "partition")
.agg(min("offset").as("min_offset"), max("offset").as("max_offset"))// Configure driver and executor memory appropriately
spark.conf.set("spark.driver.memory", "4g")
spark.conf.set("spark.executor.memory", "8g")
spark.conf.set("spark.executor.cores", "4")
// Optimize shuffle partitions for Kafka data
spark.conf.set("spark.sql.shuffle.partitions", "200")Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-sql-kafka-0-10-2-11