Kafka 0.10+ Source for Structured Streaming - provides integration between Apache Spark's Structured Streaming and Apache Kafka for real-time data processing
The Spark SQL Kafka connector provides integration between Apache Spark's Structured Streaming and Apache Kafka 0.10+, enabling real-time data processing. It implements both source and sink functionality for Kafka topics, with exactly-once semantics on the read path (writes to Kafka are at-least-once), offset management, and flexible consumer strategies.
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.clients.consumer.Consumer
import org.apache.spark.sql.kafka010._
val spark = SparkSession.builder()
.appName("KafkaExample")
.getOrCreate()
// Read from Kafka as a streaming DataFrame
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "my-topic")
.option("startingOffsets", "earliest")
.load()
// Kafka schema: key, value, topic, partition, offset, timestamp, timestampType, headers (optional)
val messageDF = df.selectExpr("CAST(value AS STRING) as message")
// Start streaming query
val query = messageDF.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
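If message values carry JSON, they can be parsed into typed columns with from_json. A minimal sketch, assuming a hypothetical payload with id and event fields (replace payloadSchema with your actual value layout):
import org.apache.spark.sql.functions.{col, from_json}
// Hypothetical payload schema; adjust to match your messages
val payloadSchema = StructType(Array(
StructField("id", LongType),
StructField("event", StringType)
))
val parsedDF = df
.selectExpr("CAST(value AS STRING) AS json") // raw bytes -> JSON string
.select(from_json(col("json"), payloadSchema).as("data")) // JSON string -> struct
.select("data.*") // flatten struct into top-level columns
// Write streaming data to Kafka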
val query = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "output-topic")
.start()// Batch read from Kafka
val batchDF = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "my-topic")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
// Batch write to Kafka
batchDF
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") // project to the columns the Kafka sink expects
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "output-topic")
.save()

The Spark SQL Kafka connector is built around several key components:
Options for connecting to Kafka clusters and configuring consumer/producer behavior.
// Required connection configuration
.option("kafka.bootstrap.servers", "localhost:9092")
// Consumer strategies (choose one)
.option("subscribe", "topic1,topic2") // Subscribe to topics
.option("subscribePattern", "topic-.*") // Subscribe with regex
.option("assign", """{"topic1":[0,1],"topic2":[0]}""") // Assign partitionsComprehensive offset control for both batch and streaming operations.
// Offset specification options
.option("startingOffsets", "earliest") // earliest, latest, or JSON
.option("endingOffsets", "latest") // latest or JSON (batch only)
.option("startingOffsetsByTimestamp", """{"topic1":{"0":1640995200000}}""")
.option("endingOffsetsByTimestamp", """{"topic1":{"0":1640995300000}}""")Fixed schema for Kafka records with support for headers and type conversion.
// Standard Kafka schema (without headers)
val schemaWithoutHeaders = StructType(Array(
StructField("key", BinaryType),
StructField("value", BinaryType),
StructField("topic", StringType),
StructField("partition", IntegerType),
StructField("offset", LongType),
StructField("timestamp", TimestampType),
StructField("timestampType", IntegerType)
))
// Schema with headers (when includeHeaders=true)
val headersType = ArrayType(StructType(Array(
StructField("key", StringType),
StructField("value", BinaryType)
)))
val schemaWithHeaders = StructType(
schemaWithoutHeaders.fields :+ StructField("headers", headersType)
)
// Schema access function
def kafkaSchema(includeHeaders: Boolean): StructType = {
if (includeHeaders) schemaWithHeaders else schemaWithoutHeaders
}
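To get the headers column, enable includeHeaders on the read. A minimal sketch reusing the earlier connection settings:
// Read with record headers included (adds the optional headers column)
val withHeadersDF = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "my-topic")
.option("includeHeaders", "true")
.load()
.selectExpr("CAST(value AS STRING) AS message", "headers")
Streaming source and sink functionality with advanced configuration options.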
// Streaming-specific options
.option("maxOffsetsPerTrigger", "1000") // Rate limiting
.option("failOnDataLoss", "true") // Data loss policy
.option("minPartitions", "3") // Minimum partitionsBatch reading and writing with flexible offset ranges and performance tuning.
// Batch operations support
spark.read.format("kafka") // Batch read
dataFrame.write.format("kafka") // Batch write

Error Handling and Reliability
Configuration options for handling data loss, connection failures, and retry policies.
// Reliability options
.option("failOnDataLoss", "false") // Continue on data loss
.option("fetchOffset.numRetries", "5") // Offset fetch retries
.option("fetchOffset.retryIntervalMs", "1000") // Retry interval
.option("kafkaConsumer.pollTimeoutMs", "60000") // Consumer poll timeoutError Handling and Reliability
// Core types for offset management
type PartitionOffsetMap = Map[TopicPartition, Long]
case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long])
sealed trait KafkaOffsetRangeLimit
case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit
case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit
case class SpecificOffsetRangeLimit(partitionOffsets: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit
case class SpecificTimestampRangeLimit(topicTimestamps: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit
// Offset range limit constants
object KafkaOffsetRangeLimit {
val LATEST: Long = -1L // indicates resolution to the latest offset
val EARLIEST: Long = -2L // indicates resolution to the earliest offset
}
case class KafkaOffsetRange(
topicPartition: TopicPartition,
fromOffset: Long,
untilOffset: Long,
preferredLoc: Option[String]
)
// Consumer strategy types
sealed trait ConsumerStrategy {
def createConsumer(kafkaParams: java.util.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
}
case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStrategy
case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy
case class SubscribePatternStrategy(topicPattern: String) extends ConsumerStrategy
// (each strategy's createConsumer implementation is elided here)
// Configuration option keys
object KafkaSourceProvider {
val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
val STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY = "startingoffsetsbytimestamp"
val ENDING_OFFSETS_BY_TIMESTAMP_OPTION_KEY = "endingoffsetsbytimestamp"
val MIN_PARTITIONS_OPTION_KEY = "minpartitions"
val MAX_OFFSET_PER_TRIGGER = "maxoffsetspertrigger"
val FETCH_OFFSET_NUM_RETRY = "fetchoffset.numretries"
val FETCH_OFFSET_RETRY_INTERVAL_MS = "fetchoffset.retryintervalms"
val CONSUMER_POLL_TIMEOUT = "kafkaconsumer.polltimeoutms"
val INCLUDE_HEADERS = "includeheaders"
val TOPIC_OPTION_KEY = "topic"
}
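As a usage sketch, these types express a starting position such as "partition 0 of topic1 from offset 23, partition 1 from earliest" (topic name and offsets are illustrative):
// Build an explicit starting position from the core types above
val startLimit = SpecificOffsetRangeLimit(Map(
new TopicPartition("topic1", 0) -> 23L, // concrete offset
new TopicPartition("topic1", 1) -> KafkaOffsetRangeLimit.EARLIEST // resolve to earliest (-2L)
))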