Kafka 0.10+ Source for Structured Streaming - provides integration between Apache Spark's Structured Streaming and Apache Kafka for real-time data processing
tessl install tessl/maven-org-apache-spark--spark-sql-kafka-0-10_2-12@3.0.0

The Spark SQL Kafka connector provides integration between Apache Spark's Structured Streaming and Apache Kafka 0.10+, enabling real-time data processing. It implements both source and sink functionality for Kafka topics, with exactly-once semantics, offset management, and flexible consumer strategies.
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.clients.consumer.Consumer
import org.apache.spark.sql.kafka010._

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("KafkaExample")
.getOrCreate()
// Read from Kafka as a streaming DataFrame
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "my-topic")
.option("startingOffsets", "earliest")
.load()
// Kafka schema: key, value, topic, partition, offset, timestamp, timestampType, headers (optional)
val messageDF = df.selectExpr("CAST(value AS STRING) as message")
// Start streaming query
val query = messageDF.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
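If the message payloads are JSON-encoded (an assumption for this sketch, and the payload schema below is illustrative), the casted value can be parsed into typed columns with from_json:

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

// Illustrative payload schema - adjust to the actual message format
val payloadSchema = new StructType()
  .add("id", LongType)
  .add("event", StringType)

val parsedDF = df
  .selectExpr("CAST(value AS STRING) AS json")              // Kafka values are binary; cast first
  .select(from_json(col("json"), payloadSchema).as("data")) // parse the JSON string
  .select("data.*")                                         // flatten the struct into columns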
// Write streaming data to Kafka
val query = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "output-topic")
.option("checkpointLocation", "/tmp/checkpoints/kafka-sink") // required for streaming writes; path is illustrative
.start()
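When the topic option is not set, the sink takes the destination from a topic column in the data, so rows can be routed to different topics. A minimal sketch; the topic names and checkpoint path are hypothetical:

val routedQuery = df
  .selectExpr(
    "CAST(key AS STRING) AS key",
    "CAST(value AS STRING) AS value",
    "CASE WHEN partition % 2 = 0 THEN 'even-topic' ELSE 'odd-topic' END AS topic") // per-row destination
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("checkpointLocation", "/tmp/checkpoints/kafka-routing")
  .start()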
// Batch read from Kafka
val batchDF = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "my-topic")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
// Batch write to Kafka
batchDF.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "output-topic")
.save()
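The sink only requires a value column (string or binary); key, topic, and headers columns are optional. A sketch of writing a hand-built DataFrame in batch, where the output topic name is hypothetical:

import spark.implicits._

val events = Seq(
  ("user-1", """{"action":"click"}"""),
  ("user-2", """{"action":"view"}""")
).toDF("key", "value")

events.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "events-out") // hypothetical output topic
  .save()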
The Spark SQL Kafka connector is built around several key components:

Options for connecting to Kafka clusters and configuring consumer/producer behavior.
// Required connection configuration
.option("kafka.bootstrap.servers", "localhost:9092")
// Consumer strategies (choose one)
.option("subscribe", "topic1,topic2") // Subscribe to topics
.option("subscribePattern", "topic-.*") // Subscribe with regex
.option("assign", """{"topic1":[0,1],"topic2":[0]}""") // Assign partitionsComprehensive offset control for both batch and streaming operations.
Comprehensive offset control for both batch and streaming operations.

// Offset specification options
.option("startingOffsets", "earliest") // earliest, latest, or JSON
.option("endingOffsets", "latest") // latest or JSON (batch only)
.option("startingOffsetsByTimestamp", """{"topic1":{"0":1640995200000}}""")
.option("endingOffsetsByTimestamp", """{"topic1":{"0":1640995300000}}""")Fixed schema for Kafka records with support for headers and type conversion.
Fixed schema for Kafka records with support for headers and type conversion.

// Standard Kafka schema (without headers)
val schemaWithoutHeaders = StructType(Array(
StructField("key", BinaryType),
StructField("value", BinaryType),
StructField("topic", StringType),
StructField("partition", IntegerType),
StructField("offset", LongType),
StructField("timestamp", TimestampType),
StructField("timestampType", IntegerType)
))
// Schema with headers (when includeHeaders=true)
val headersType = ArrayType(StructType(Array(
StructField("key", StringType),
StructField("value", BinaryType)
)))
val schemaWithHeaders = StructType(
schemaWithoutHeaders.fields :+ StructField("headers", headersType)
)
// Schema access function
def kafkaSchema(includeHeaders: Boolean): StructType = {
if (includeHeaders) schemaWithHeaders else schemaWithoutHeaders
}
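With includeHeaders set to true, the headers column from the schema above is populated and can be selected alongside the other columns:

val withHeadersDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("includeHeaders", "true") // adds the headers column: array<struct<key: string, value: binary>>
  .load()
  .selectExpr("CAST(value AS STRING) AS value", "headers")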
Streaming source and sink functionality with advanced configuration options.

// Streaming-specific options
.option("maxOffsetsPerTrigger", "1000") // Rate limiting
.option("failOnDataLoss", "true") // Data loss policy
.option("minPartitions", "3") // Minimum partitionsBatch reading and writing with flexible offset ranges and performance tuning.
Batch reading and writing with flexible offset ranges and performance tuning.

// Batch operations support
spark.read.format("kafka") // Batch read
dataFrame.write.format("kafka") // Batch write

Error handling and reliability: configuration options for handling data loss, connection failures, and retry policies.
// Reliability options
.option("failOnDataLoss", "false") // Continue on data loss
.option("fetchOffset.numRetries", "5") // Offset fetch retries
.option("fetchOffset.retryIntervalMs", "1000") // Retry interval
.option("kafkaConsumer.pollTimeoutMs", "60000") // Consumer poll timeoutError Handling and Reliability
// Core types for offset management
type PartitionOffsetMap = Map[TopicPartition, Long]
case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long])
sealed trait KafkaOffsetRangeLimit
case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit
case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit
case class SpecificOffsetRangeLimit(partitionOffsets: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit
case class SpecificTimestampRangeLimit(topicTimestamps: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit
// Offset range limit constants
object KafkaOffsetRangeLimit {
val LATEST: Long = -1L // indicates resolution to the latest offset
val EARLIEST: Long = -2L // indicates resolution to the earliest offset
}
case class KafkaOffsetRange(
topicPartition: TopicPartition,
fromOffset: Long,
untilOffset: Long,
preferredLoc: Option[String]
)
// Consumer strategy types
sealed trait ConsumerStrategy {
def createConsumer(kafkaParams: java.util.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
}
case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStrategy
case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy
case class SubscribePatternStrategy(topicPattern: String) extends ConsumerStrategy
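// Each strategy corresponds to one of the source options shown earlier:
//   "assign"           -> AssignStrategy
//   "subscribe"        -> SubscribeStrategy
//   "subscribePattern" -> SubscribePatternStrategy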
// Configuration option keys
object KafkaSourceProvider {
val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
val STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY = "startingoffsetsbytimestamp"
val ENDING_OFFSETS_BY_TIMESTAMP_OPTION_KEY = "endingoffsetsbytimestamp"
val MIN_PARTITIONS_OPTION_KEY = "minpartitions"
val MAX_OFFSET_PER_TRIGGER = "maxoffsetspertrigger"
val FETCH_OFFSET_NUM_RETRY = "fetchoffset.numretries"
val FETCH_OFFSET_RETRY_INTERVAL_MS = "fetchoffset.retryintervalms"
val CONSUMER_POLL_TIMEOUT = "kafkaconsumer.polltimeoutms"
val INCLUDE_HEADERS = "includeheaders"
val TOPIC_OPTION_KEY = "topic"
}
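These keys are lower-cased because source and sink options are matched case-insensitively, so the camel-cased spellings used in the examples above (for example startingOffsets or maxOffsetsPerTrigger) resolve to the same keys.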