Apache Spark Structured Streaming integration with Apache Kafka, providing data source and sink capabilities for both batch and streaming workloads.
npx @tessl/cli install tessl/maven-org-apache-spark--spark-sql-kafka-0-10_2-11@2.4.0

Apache Spark Kafka Integration provides structured streaming and batch data processing for Apache Kafka. The module reads from and writes to Kafka topics using Spark DataFrames and Datasets, supporting micro-batch processing, continuous streaming, and batch operations with offset management and fault tolerance.
org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.8

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.kafka.common.TopicPartition

val spark = SparkSession.builder
  .appName("KafkaExample")
  .getOrCreate()

// Read from Kafka topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "latest")
  .load()

// Process the stream
val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()
query.awaitTermination()
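If message values carry JSON, they can be parsed into typed columns with from_json. A minimal sketch; the payload schema and its field names (name, age) are assumptions for illustration, not part of this module:

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical payload schema, assumed for this example
val payloadSchema = StructType(Seq(
  StructField("name", StringType),
  StructField("age", IntegerType)
))

val parsed = df
  .select(from_json(col("value").cast("string"), payloadSchema).as("data"))
  .select("data.*")
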
// Write DataFrame to Kafka
// (assumes a batch DataFrame with an "id" column, not the streaming df above)
import org.apache.spark.sql.functions.{col, struct, to_json}

df.select(
    col("id").cast("string").as("key"),
    to_json(struct(col("*"))).as("value")
  )
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .save()
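The streaming counterpart uses writeStream and requires a checkpoint location. A sketch that reads a topic and echoes it to another; the topic names and checkpoint path are assumptions:

val echo = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "echo-topic")
  .option("checkpointLocation", "/tmp/kafka-echo-checkpoint") // assumed path
  .start()
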
// Read from Kafka for batch processing
val batchDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

The Spark Kafka integration is built around several key components:
KafkaSourceProvider implements Spark's DataSource API for both V1 and V2 and is the entry point for the "kafka" format; the remaining components are covered below.

All Kafka records follow this fixed schema:
StructType(Seq(
  StructField("key", BinaryType),           // Message key (nullable)
  StructField("value", BinaryType),         // Message value (nullable)
  StructField("topic", StringType),         // Topic name
  StructField("partition", IntegerType),    // Partition number
  StructField("offset", LongType),          // Message offset
  StructField("timestamp", TimestampType),  // Message timestamp
  StructField("timestampType", IntegerType) // Timestamp type (0=CreateTime, 1=LogAppendTime)
))
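Any DataFrame loaded with the kafka format exposes exactly these columns. A quick way to confirm this, with the expected printSchema output shown as comments (broker address and topic are assumptions):

val records = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()

records.printSchema()
// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)

// Metadata columns can be selected alongside the decoded payload
val withMetadata = records.selectExpr("topic", "partition", "offset", "CAST(value AS STRING) AS value")
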
Flexible patterns for consuming data from Kafka topics, supporting subscription by topic names, regex patterns, or specific partition assignments.

sealed trait ConsumerStrategy {
  def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
}
case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStrategy
case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy
case class SubscribePatternStrategy(topicPattern: String) extends ConsumerStrategy
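These strategies correspond to the assign, subscribe, and subscribePattern source options; exactly one of the three may be set per query. A sketch of all three styles (topic and partition names are assumptions):

// SubscribeStrategy: a fixed list of topics
val byTopics = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1,topic2")
  .load()

// SubscribePatternStrategy: every topic matching a regex
val byPattern = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "topic.*")
  .load()

// AssignStrategy: explicit partitions, as JSON of topic -> partition list
val byAssignment = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"topic1":[0,1],"topic2":[0]}""")
  .load()
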
Comprehensive offset tracking and range limit handling for precise control over data consumption boundaries.

sealed trait KafkaOffsetRangeLimit
case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit
case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit
case class SpecificOffsetRangeLimit(partitionOffsets: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit
case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends OffsetV2
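startingOffsets and endingOffsets accept the per-partition JSON these types model, where -2 means earliest and -1 means latest. A sketch of a batch read over an explicit offset range (topic name and offsets are assumptions):

val rangeDF = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  // Partition 0 starts at offset 23, partition 1 at earliest (-2)
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2}}""")
  // Both partitions end at latest (-1); endingOffsets applies to batch reads only
  .option("endingOffsets", """{"topic1":{"0":-1,"1":-1}}""")
  .load()
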
Advanced streaming readers supporting both micro-batch and continuous processing modes with fault tolerance and exactly-once semantics.

class KafkaMicroBatchReader extends MicroBatchReader with Logging {
  def setOffsetRange(start: Option[Offset], end: Offset): Unit
  def planInputPartitions(): ju.List[InputPartition[InternalRow]]
  def readSchema(): StructType
}
class KafkaContinuousReader extends ContinuousReader with Logging {
  def readSchema: StructType
  def setStartOffset(start: Option[Offset]): Unit
  def planInputPartitions(): ju.List[InputPartition[InternalRow]]
}
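The trigger chooses the reader: the default and Trigger.ProcessingTime triggers run through the micro-batch reader, while Trigger.Continuous selects the continuous reader. A sketch using the console sink (interval and topic are arbitrary):

import org.apache.spark.sql.streaming.Trigger

val continuousQuery = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .writeStream
  .format("console")
  // Continuous processing: checkpoints roughly every second
  .trigger(Trigger.Continuous("1 second"))
  .start()
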
Batch relation for reading historical data from Kafka topics with configurable offset ranges.

class KafkaRelation extends BaseRelation with TableScan with Logging {
  def sqlContext: SQLContext
  def schema: StructType
  def buildScan(): RDD[Row]
}

Comprehensive writing capabilities for both streaming and batch workloads with producer connection pooling and automatic serialization.
class KafkaSink extends Sink with Logging {
  def addBatch(batchId: Long, data: DataFrame): Unit
}

class KafkaStreamWriter extends StreamWriter {
  def createWriterFactory(): KafkaStreamWriterFactory
  def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit
}

object KafkaWriter extends Logging {
  def write(sparkSession: SparkSession, queryExecution: QueryExecution,
            kafkaParameters: ju.Map[String, Object], topic: Option[String]): Unit
}
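When no topic option is given, the writer reads each row's destination from an optional topic column (the topic option, if set, overrides it). A sketch that routes the earlier batchDF by whether a key is present; the routing rule and topic names are assumptions:

import org.apache.spark.sql.functions.{col, lit, when}

batchDF
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
  // Hypothetical routing rule: keyed records to one topic, unkeyed to another
  .withColumn("topic", when(col("key").isNull, lit("unkeyed-events")).otherwise(lit("keyed-events")))
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  // No "topic" option: each row's topic column decides where it is written
  .save()
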
Complete configuration options for fine-tuning Kafka integration behavior, connection parameters, and performance settings.

Source Options:
// Connection
"kafka.bootstrap.servers" -> "localhost:9092"
"subscribe" -> "topic1,topic2"
"subscribePattern" -> "topic.*"
"assign" -> """{"topic1":[0,1],"topic2":[0]}"""
// Offset Management
"startingOffsets" -> "earliest" // or "latest" or JSON
"endingOffsets" -> "latest" // or JSON (batch only)
"failOnDataLoss" -> "true"
// Performance
"minPartitions" -> "10"
"maxOffsetsPerTrigger" -> "1000000"Sink Options:
"kafka.bootstrap.servers" -> "localhost:9092"
"topic" -> "output-topic"// Package-level type alias
type PartitionOffsetMap = Map[TopicPartition, Long]
// Data Consumer Types
case class AvailableOffsetRange(earliest: Long, latest: Long)
sealed trait KafkaDataConsumer {
  def get(offset: Long, untilOffset: Long, pollTimeoutMs: Long,
          failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]]
  def getAvailableOffsetRange(): AvailableOffsetRange
  def release(): Unit
}

// Offset Range Types
case class KafkaOffsetRange(
  topicPartition: TopicPartition,
  fromOffset: Long,
  untilOffset: Long,
  preferredLoc: Option[String]
) {
  lazy val size: Long = untilOffset - fromOffset
}

// RDD Types
case class KafkaSourceRDDOffsetRange(
  topicPartition: TopicPartition,
  fromOffset: Long,
  untilOffset: Long,
  preferredLoc: Option[String]
) {
  def topic: String = topicPartition.topic
  def partition: Int = topicPartition.partition
  def size: Long = untilOffset - fromOffset
}
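These range types are internal to the connector and not meant to be constructed by user code; the sketch below only mirrors the size arithmetic they encode (the class name here is hypothetical):

import org.apache.kafka.common.TopicPartition

// Hypothetical stand-in that mirrors the internal range types above
case class OffsetRangeSketch(topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long) {
  def size: Long = untilOffset - fromOffset
}

val range = OffsetRangeSketch(new TopicPartition("my-topic", 0), fromOffset = 100L, untilOffset = 250L)
assert(range.size == 150L) // 250 - 100 records planned for this partition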