Kafka 0.10+ Source for Structured Streaming, providing Kafka integration for Apache Spark's streaming and batch processing
```bash
npx @tessl/cli install tessl/maven-org-apache-spark--spark-sql-kafka-0-10_2-12@3.5.0
```

The Apache Spark Kafka Connector (spark-sql-kafka-0-10_2.12) provides seamless integration between Apache Kafka and Apache Spark's Structured Streaming and SQL APIs. It enables both batch and streaming data processing from Kafka topics with exactly-once processing semantics, offset management, and fault tolerance.
```bash
spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.6
```

The connector is accessed through Spark SQL's DataSource API using the "kafka" format identifier:

```scala
// Reading from Kafka (streaming)
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1,topic2")
.load()
// Writing to Kafka (streaming)
df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "output-topic")
.outputMode("append")
.start()
```

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val spark = SparkSession.builder()
.appName("KafkaExample")
.getOrCreate()
// Stream from Kafka
val kafkaDF = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "input-topic")
.option("startingOffsets", "earliest")
.load()
// Extract and process the value
val processedDF = kafkaDF
.select(col("value").cast("string").as("message"))
.filter(col("message").isNotNull)
// Write back to Kafka
val query = processedDF
.select(to_json(struct("*")).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "output-topic")
.outputMode("append")
.start()
```

The Spark Kafka connector is built around several key components:
Read data from Kafka topics in real time using Spark Structured Streaming with micro-batch or continuous processing modes.

```scala
// Streaming read operation
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers: String)
.option("subscribe", topics: String) // or subscribepattern or assign
.option("startingOffsets", offsets: String) // "earliest", "latest", or JSON
.load(): DataFrame
```
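The processing mode is chosen through the query trigger rather than a Kafka-specific option. A minimal sketch, assuming illustrative topic names and checkpoint paths:

```scala
import org.apache.spark.sql.streaming.Trigger

val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribepattern", "events-.*")   // illustrative topic pattern
  .option("startingOffsets", "latest")
  .load()

// Micro-batch mode: pull a new range of offsets every 10 seconds.
// For continuous mode, use Trigger.Continuous("1 second") instead
// (continuous processing supports only map-like operations).
val query = events.selectExpr("key", "value").writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "events-mirror")                      // illustrative topic
  .option("checkpointLocation", "/tmp/events-mirror-ck") // illustrative path
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()
```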
Read historical data from Kafka topics for batch processing and analysis.

```scala
// Batch read operation
spark.read
.format("kafka")
.option("kafka.bootstrap.servers", servers: String)
.option("subscribe", topics: String) // or subscribepattern or assign
.option("startingOffsets", startOffsets: String)
.option("endingOffsets", endOffsets: String)
.load(): DataFrame
```
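A minimal batch-read sketch over a bounded offset range; the topic name and offsets are illustrative, and the topic is assumed to have a single partition:

```scala
// Read offsets 0 through 1000 of partition 0 as a bounded DataFrame
val historyDF = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "clickstream")                        // illustrative topic
  .option("startingOffsets", """{"clickstream":{"0":0}}""")
  .option("endingOffsets", """{"clickstream":{"0":1000}}""")
  .load()

historyDF.selectExpr("CAST(value AS STRING) AS message").show(10, truncate = false)
```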
Write DataFrame data to Kafka topics with proper serialization and partitioning.

```scala
// Streaming write operation
df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers: String)
.option("topic", topicName: String) // optional if specified in data
.outputMode("append")
.start(): StreamingQuery
// Batch write operation
df.write
.format("kafka")
.option("kafka.bootstrap.servers", servers: String)
.option("topic", topicName: String)
.save()
```
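A minimal write sketch, assuming an illustrative in-memory dataset and topic name; key and value are provided as string columns, which the connector accepts alongside binary:

```scala
import spark.implicits._
import org.apache.spark.sql.functions._

// Illustrative data: (key, value) pairs written as Kafka records
val audit = Seq(("user-1", "login"), ("user-2", "logout")).toDF("k", "v")

audit
  .select(col("k").as("key"), col("v").as("value"))
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "audit-events") // illustrative topic
  .save()
```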
Comprehensive configuration options for connection, performance tuning, and reliability.

```scala
// Core configuration options
.option("kafka.bootstrap.servers", servers: String) // Required
.option("subscribe", topics: String) // Topic selection
.option("maxOffsetsPerTrigger", maxRecords: String) // Performance tuning
.option("failOnDataLoss", failOnLoss: String) // ReliabilityAll Kafka DataFrames have the following fixed schema:
All Kafka DataFrames have the following fixed schema:

```scala
// Fixed Kafka record schema
case class KafkaRecord(
key: Array[Byte], // Message key as byte array (nullable)
value: Array[Byte], // Message value as byte array
topic: String, // Topic name
partition: Int, // Partition number
offset: Long, // Message offset within partition
timestamp: java.sql.Timestamp, // Message timestamp
timestampType: Int, // 0=CreateTime, 1=LogAppendTime
headers: Array[KafkaHeader] // Optional headers (when includeHeaders=true)
)
case class KafkaHeader(
key: String,
value: Array[Byte]
)
```
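A sketch of projecting that schema into typed columns after a read; the topic name is illustrative, and includeHeaders is only needed if the headers column is wanted:

```scala
import org.apache.spark.sql.functions._

val records = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "orders")        // illustrative topic
  .option("includeHeaders", "true")     // adds the headers column to the schema
  .load()
  .select(
    col("key").cast("string").as("key"),
    col("value").cast("string").as("value"),
    col("topic"),
    col("partition"),
    col("offset"),
    col("timestamp"),
    col("headers"))
```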
For writing, DataFrames can contain any combination of these fields:

```scala
// Write schema fields (all optional except value)
case class KafkaWriteRecord(
topic: String, // Target topic (optional if set in options)
key: Any, // Message key (must be a string or binary column)
value: Any, // Message value (required; must be a string or binary column)
partition: Int, // Specific partition (optional)
headers: Array[KafkaHeader] // Message headers (optional; same struct as on read)
)
```
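A sketch of the write schema in use, routing each row to a topic chosen per record; the input data and topic names are illustrative:

```scala
import spark.implicits._
import org.apache.spark.sql.functions._

// Illustrative input rows
val orders = Seq((1, "high", "details-1"), (2, "low", "details-2"))
  .toDF("orderId", "priority", "payload")

val routed = orders.select(
  when(col("priority") === "high", lit("orders-priority"))
    .otherwise(lit("orders-standard")).as("topic"), // per-row target topic
  col("orderId").cast("string").as("key"),
  to_json(struct("*")).as("value"))

routed.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .save() // no "topic" option: the topic column decides where each record goes
```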
.option("subscribe", "topic1,topic2,topic3")
// Subscribe to topics matching a regex pattern
.option("subscribepattern", "events-.*")
// Assign specific partitions
.option("assign", """{"topic1":[0,1],"topic2":[0]}""")// Offset specification options
"earliest" // Start from earliest available offsets
"latest" // Start from latest available offsets
// Specific offsets per partition (JSON format)
"""{"topic1":{"0":23,"1":345},"topic2":{"0":0}}"""
// Global timestamp (milliseconds since epoch)
.option("startingTimestamp", "1609459200000")
// Per-partition timestamps (JSON format, for startingOffsetsByTimestamp)
"""{"topic1":{"0":1609459200000,"1":1609459300000}}"""The connector provides robust error handling for common scenarios: