# Kafka 0.10+ Source for Structured Streaming
Install via the tessl CLI:

```
npx @tessl/cli install tessl/maven-org-apache-spark--spark-sql-kafka-0-10_2-13@3.5.0
```

The Apache Spark Kafka SQL Connector provides seamless integration between Apache Kafka and Spark's Structured Streaming framework. It supports both reading from and writing to Kafka topics, with exactly-once semantics on the read path, at-least-once delivery when writing to Kafka, fault tolerance, and automatic offset management for building real-time data pipelines.
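If you add the connector to a build directly instead of through the tessl CLI, the package above corresponds to the usual Maven coordinate; a minimal sbt sketch, assuming a Scala 2.13 build with Spark 3.5.0 already on the classpath:

```scala
// build.sbt (sketch): the %% operator appends the Scala suffix, yielding spark-sql-kafka-0-10_2.13
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.5.0"
```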
```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.functions._

// The connector is registered automatically as the "kafka" data source,
// so no direct imports of connector classes are needed.
```

For advanced usage with types:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.kafka010.PartitionOffsetMap
```

Create a SparkSession, then read from Kafka with Structured Streaming:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
.builder()
.appName("KafkaStreaming")
.getOrCreate()
// Read from Kafka using structured streaming
val kafkaStream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "my-topic")
.option("startingOffsets", "latest")
.load()
// Process the stream
val processedStream = kafkaStream
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "partition", "offset", "timestamp")
.writeStream
.outputMode("append")
.format("console")
.start()
```

Batch queries use the same format with bounded offset ranges:

```scala
val kafkaBatch = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "my-topic")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
```

Writing a DataFrame to Kafka uses the same `kafka` format:

```scala
val dataFrame = spark.createDataFrame(Seq(
("key1", "value1"),
("key2", "value2")
)).toDF("key", "value")
dataFrame
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "output-topic")
.save()
```

The Spark Kafka SQL Connector is built around several key components:
Core data source functionality registers Kafka as a Spark SQL data source under the "kafka" identifier.

```scala
// Automatically registered - no direct usage
class KafkaSourceProvider extends DataSourceRegister
with StreamSourceProvider
with StreamSinkProvider
with RelationProvider
with CreatableRelationProvider
with SimpleTableProvider
```

Flexible subscription strategies connect to Kafka topics through direct partition assignment, topic subscription, or pattern-based subscription.
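As an illustration of pattern-based subscription, the streaming read shown earlier can target every topic matching a regex instead of an explicit list; a minimal sketch reusing the `spark` session from above (the pattern and broker address are placeholders):

```scala
// Subscribe to all topics whose names start with "events-"
val patternStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "events-.*")
  .load()
```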
```scala
// Consumer strategies are configured via options:
// .option("subscribe", "topic1,topic2,topic3")
// .option("subscribePattern", "prefix-.*")
// .option("assign", """{"topic1":[0,1,2],"topic2":[0,1]}""")
sealed trait ConsumerStrategy {
def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
def createAdmin(kafkaParams: ju.Map[String, Object]): Admin
def assignedTopicPartitions(admin: Admin): Set[TopicPartition]
}
```

Comprehensive offset positioning and range management supports earliest, latest, specific, and timestamp-based offsets.
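For example, specific offsets are supplied as JSON keyed by topic and partition, and timestamp-based positioning maps each partition to a start time; a sketch with placeholder topic, offsets, and timestamps (the `startingOffsetsByTimestamp` option follows the upstream Kafka source documentation):

```scala
// Batch read bounded by explicit per-partition offsets
// (-2 means earliest and -1 means latest in the JSON specification)
val boundedBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", """{"my-topic":{"0":23,"1":-2}}""")
  .option("endingOffsets", """{"my-topic":{"0":50,"1":-1}}""")
  .load()

// Streaming read starting from a per-partition timestamp in milliseconds
val fromTimestamp = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsetsByTimestamp", """{"my-topic":{"0":1696118400000,"1":1696118400000}}""")
  .load()
```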
```scala
// Offset range limits for controlling read boundaries
sealed trait KafkaOffsetRangeLimit
case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit
case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit
case class SpecificOffsetRangeLimit(partitionOffsets: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit
case class SpecificTimestampRangeLimit(topicTimestamps: Map[TopicPartition, Long], strategy: StrategyOnNoMatchStartingOffset.Value) extends KafkaOffsetRangeLimit
case class GlobalTimestampRangeLimit(timestamp: Long, strategy: StrategyOnNoMatchStartingOffset.Value) extends KafkaOffsetRangeLimit
```

The record schema converts between Kafka's ConsumerRecord format and Spark DataFrame rows, with optional header support.
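To work with record headers, enable `includeHeaders` on the read; the extra `headers` column described below then becomes selectable. A small sketch with placeholder topic and broker:

```scala
// Each header is a struct of (key: String, value: Binary)
val withHeaders = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("includeHeaders", "true")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
```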
```scala
// Schema with headers disabled (default)
val schemaWithoutHeaders = StructType(Array(
StructField("key", BinaryType),
StructField("value", BinaryType),
StructField("topic", StringType),
StructField("partition", IntegerType),
StructField("offset", LongType),
StructField("timestamp", TimestampType),
StructField("timestampType", IntegerType)
))
// Schema with headers enabled (.option("includeHeaders", "true"))
val schemaWithHeaders = schemaWithoutHeaders.add(
StructField("headers", ArrayType(StructType(Array(
StructField("key", StringType),
StructField("value", BinaryType)
))))
)
def kafkaSchema(includeHeaders: Boolean): StructType
```

Micro-batch and continuous streaming implementations provide comprehensive trigger support and metrics.
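Trigger modes are chosen on the write side of the query; a minimal sketch showing one micro-batch query using Trigger.AvailableNow and one continuous query, both reusing the `kafkaStream` DataFrame from above (checkpoint paths are placeholders):

```scala
import org.apache.spark.sql.streaming.Trigger

// Micro-batch: process everything currently available, then stop
val availableNowQuery = kafkaStream
  .selectExpr("CAST(value AS STRING)")
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints/available-now") // placeholder path
  .trigger(Trigger.AvailableNow())
  .start()

// Continuous processing: low-latency execution with 1-second checkpoint intervals
val continuousQuery = kafkaStream
  .selectExpr("CAST(value AS STRING)")
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints/continuous") // placeholder path
  .trigger(Trigger.Continuous("1 second"))
  .start()
```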
```scala
// Micro-batch streaming
class KafkaMicroBatchStream extends MicroBatchStream
with SupportsTriggerAvailableNow
with ReportsSourceMetrics {
def initialOffset(): Offset
def latestOffset(): Offset
def latestOffset(startOffset: Offset, readLimit: ReadLimit): Offset
def planInputPartitions(start: Offset, end: Offset): Array[InputPartition]
def createReaderFactory(): PartitionReaderFactory
def commit(end: Offset): Unit
def stop(): Unit
}
// Continuous streaming
class KafkaContinuousStream extends ContinuousStream {
def mergeOffsets(offsets: Array[PartitionOffset]): Offset
def initialOffset(): Offset
def deserializeOffset(json: String): Offset
def commit(end: Offset): Unit
def stop(): Unit
}
```

Batch reads perform efficient offset range calculation and partition optimization.
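The `minPartitions` option (listed under Configuration Options below) lets a read split large offset ranges into more Spark partitions than the topic has Kafka partitions; a brief sketch:

```scala
// Plan at least 16 input partitions for this read,
// even if the topic itself has fewer Kafka partitions
val parallelBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .option("minPartitions", "16")
  .load()
```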
```scala
class KafkaBatch extends Batch {
def planInputPartitions(): Array[InputPartition]
def createReaderFactory(): PartitionReaderFactory
}
class KafkaBatchPartitionReader extends PartitionReader[InternalRow] {
def next(): Boolean
def get(): UnsafeRow
def close(): Unit
def currentMetricsValues(): Array[CustomTaskMetric]
}
```

Both batch and streaming writes are supported, with producer pooling, topic routing, and data validation.
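For example, a streaming write targets a topic either through the `topic` option or a per-row `topic` column; a minimal sketch reusing `kafkaStream` from above (broker, topic, and checkpoint path are placeholders):

```scala
// Stream records back out to Kafka. Without the "topic" option, each row would
// need a "topic" column; when the option is set, it overrides any such column.
val kafkaSinkQuery = kafkaStream
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/checkpoints/kafka-sink") // placeholder path
  .start()
```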
```scala
// Core writer functionality
object KafkaWriter {
val TOPIC_ATTRIBUTE_NAME: String = "topic"
val KEY_ATTRIBUTE_NAME: String = "key"
val VALUE_ATTRIBUTE_NAME: String = "value"
val HEADERS_ATTRIBUTE_NAME: String = "headers"
val PARTITION_ATTRIBUTE_NAME: String = "partition"
def write(sparkSession: SparkSession, queryExecution: QueryExecution,
kafkaParams: ju.Map[String, Object], topic: Option[String]): Unit
}
// V2 DataSource write implementation
case class KafkaWrite(topic: Option[String], producerParams: ju.Map[String, Object], schema: StructType) extends Write {
def description(): String
def toBatch: BatchWrite
def toStreaming: StreamingWrite
}
```

## Configuration Options

- `kafka.bootstrap.servers`: Kafka bootstrap servers (required)
- `subscribe`, `subscribePattern`, or `assign`: Topic subscription strategy (exactly one is required)
- `startingOffsets`: Where to start reading ("earliest", "latest", or a JSON offset specification)
- `endingOffsets`: Where to stop reading for batch queries ("latest" or a JSON offset specification)
- `failOnDataLoss`: Whether to fail the query when data loss is detected (default: "true")
- `includeHeaders`: Include Kafka headers in the DataFrame schema (default: "false")
- `maxOffsetsPerTrigger`: Maximum number of offsets to process per trigger
- `minOffsetsPerTrigger`: Minimum number of offsets to process per trigger
- `minPartitions`: Minimum number of partitions for processing
- `kafkaConsumer.pollTimeoutMs`: Consumer poll timeout in milliseconds
- `fetchOffset.numRetries`: Number of retries for offset fetching
- `fetchOffset.retryIntervalMs`: Retry interval for offset fetching
- `groupIdPrefix`: Prefix for consumer group IDs

A combined example using several of these options appears at the end of the Error Handling section below.

## Error Handling

The connector provides structured exception handling for common Kafka integration scenarios:

```scala
object KafkaExceptions {
def mismatchedTopicPartitionsBetweenEndOffsetAndPrefetched(
tpsForPrefetched: Set[TopicPartition],
tpsForEndOffset: Set[TopicPartition]): SparkException
def endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
prefetchedOffset: Map[TopicPartition, Long],
endOffset: Map[TopicPartition, Long]): SparkException
def lostTopicPartitionsInEndOffsetWithTriggerAvailableNow(
tpsForLatestOffset: Set[TopicPartition],
tpsForEndOffset: Set[TopicPartition]): SparkException
def endOffsetHasGreaterOffsetForTopicPartitionThanLatestWithTriggerAvailableNow(
latestOffset: Map[TopicPartition, Long],
endOffset: Map[TopicPartition, Long]): SparkException
}
```
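Many of these failures surface when offsets age out or partitions disappear; the options listed above control whether the query fails and how much it reads per trigger. A hedged sketch combining several of them (all values are illustrative):

```scala
// Tolerate data loss (e.g. expired offsets) instead of failing the query,
// and bound how much each micro-batch pulls from Kafka
val tunedStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "false")             // log and continue on detected data loss
  .option("maxOffsetsPerTrigger", "10000")       // cap offsets processed per trigger
  .option("kafkaConsumer.pollTimeoutMs", "5000") // consumer poll timeout
  .option("groupIdPrefix", "my-pipeline")        // prefix for generated consumer group IDs
  .load()
```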
## Custom Metrics
The connector exposes custom metrics for monitoring:
- `offsetOutOfRange`: Number of offsets that were out of range
- `dataLoss`: Number of data loss events detected
These metrics integrate with Spark's metrics system and can be monitored through Spark UI and external monitoring systems.
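One way to observe these metrics programmatically is through streaming query progress events; a sketch using a `StreamingQueryListener` (the printed metrics map is where the connector-reported values such as `offsetOutOfRange` and `dataLoss` appear):

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Print each source's custom metrics after every micro-batch
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.sources.foreach { source =>
      println(s"${source.description}: ${source.metrics}")
    }
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
})
```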