Apache Spark Structured Streaming integration with Apache Kafka, providing data source and sink capabilities for both batch and streaming workloads.
npx @tessl/cli install tessl/maven-org-apache-spark--spark-sql-kafka-0-10_2-11@2.4.8
# Apache Spark Kafka Integration

Apache Spark Kafka Integration provides structured streaming and batch data processing capabilities for Apache Kafka. The module enables reading from and writing to Kafka topics through Spark DataFrames and Datasets, supporting micro-batch processing, continuous streaming, and batch operations with offset management and fault tolerance.

## Package Information

- **Package Name**: spark-sql-kafka-0-10_2.11
- **Package Type**: maven
- **Language**: Scala
- **Installation**: Add to your Spark application dependencies
- **Coordinate**: `org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.8`

## Core Imports

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.kafka.common.TopicPartition
```

## Basic Usage

### Reading from Kafka (Streaming)

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("KafkaExample")
  .getOrCreate()

// Read from a Kafka topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "latest")
  .load()

// Process the stream
val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

query.awaitTermination()
```

### Writing to Kafka

```scala
import org.apache.spark.sql.functions.{col, struct, to_json}

// Write a DataFrame to Kafka; the sink expects "key" and "value" columns
df.select(
    col("id").cast("string").as("key"),
    to_json(struct(col("*"))).as("value")
  )
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .save()
```

### Batch Processing

```scala
// Read a bounded offset range from Kafka as a batch DataFrame
val batchDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
```

## Architecture

The Spark Kafka integration is built around several key components:

- **Data Source Provider**: `KafkaSourceProvider` implements Spark's DataSource API (both V1 and V2)
- **Consumer Strategies**: Flexible patterns for topic assignment (Assign, Subscribe, SubscribePattern)
- **Offset Management**: Offset tracking with configurable start/end positions
- **Streaming Readers**: Micro-batch and continuous processing capabilities
- **Producer Integration**: Efficient writing with connection pooling and caching
- **Schema Management**: A fixed schema for Kafka records with proper type handling

## Kafka Record Schema

All Kafka records follow this fixed schema:

```scala
StructType(Seq(
  StructField("key", BinaryType),           // Message key (nullable)
  StructField("value", BinaryType),         // Message value (nullable)
  StructField("topic", StringType),         // Topic name
  StructField("partition", IntegerType),    // Partition number
  StructField("offset", LongType),          // Message offset
  StructField("timestamp", TimestampType),  // Message timestamp
  StructField("timestampType", IntegerType) // Timestamp type (0=CreateTime, 1=LogAppendTime)
))
```
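
The binary `key` and `value` columns are typically cast before use, and the metadata columns can be selected alongside them. A minimal sketch, assuming `df` is a Kafka DataFrame loaded as in Basic Usage:

```scala
// Decode the payload and keep per-record Kafka metadata
val decoded = df.selectExpr(
  "CAST(key AS STRING) AS key",
  "CAST(value AS STRING) AS value",
  "topic", "partition", "offset", "timestamp", "timestampType"
)
```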

## Capabilities

### Consumer Strategies

Flexible patterns for consuming data from Kafka topics, supporting subscription by topic name, regex pattern, or explicit partition assignment.

```scala { .api }
sealed trait ConsumerStrategy {
  def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
}

case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStrategy
case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy
case class SubscribePatternStrategy(topicPattern: String) extends ConsumerStrategy
```
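
Each strategy corresponds to one of three mutually exclusive source options. A sketch, with illustrative topic and partition names:

```scala
// SubscribeStrategy: a fixed list of topics
spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1,topic2")

// SubscribePatternStrategy: any topic matching a regex
spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "topic.*")

// AssignStrategy: specific partitions, given as JSON
spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"topic1":[0,1]}""")
```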

[Consumer Strategies](./consumer-strategies.md)

### Offset Management

Offset tracking and range-limit handling give precise control over data consumption boundaries.

```scala { .api }
sealed trait KafkaOffsetRangeLimit
case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit
case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit
case class SpecificOffsetRangeLimit(partitionOffsets: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit

case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends OffsetV2
```
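
Specific per-partition offsets are expressed as JSON in the `startingOffsets`/`endingOffsets` options, where `-2` denotes earliest and `-1` denotes latest. A sketch with illustrative topic names and offsets:

```scala
// Batch read of an explicit offset range on two partitions of topic1
val ranged = spark.read.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"topic1":[0,1]}""")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1}}""")
  .load()
```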

[Offset Management](./offset-management.md)

### Streaming Data Sources

Streaming readers support both micro-batch and continuous processing modes with fault tolerance and exactly-once semantics.

```scala { .api }
class KafkaMicroBatchReader extends MicroBatchReader with Logging {
  def setOffsetRange(start: Option[Offset], end: Offset): Unit
  def planInputPartitions(): ju.List[InputPartition[InternalRow]]
  def readSchema(): StructType
}

class KafkaContinuousReader extends ContinuousReader with Logging {
  def readSchema: StructType
  def setStartOffset(start: Option[Offset]): Unit
  def planInputPartitions(): ju.List[InputPartition[InternalRow]]
}
```
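
Which reader is used follows from the query's trigger: `Trigger.ProcessingTime` selects the micro-batch reader, while `Trigger.Continuous` selects the continuous reader. A sketch, assuming `df` is a Kafka streaming DataFrame with `key` and `value` columns:

```scala
import org.apache.spark.sql.streaming.Trigger

// Continuous processing with a 1-second checkpoint interval
val query = df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/checkpoints")
  .trigger(Trigger.Continuous("1 second"))
  .start()
```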

[Streaming Sources](./streaming-sources.md)

### Batch Data Access

A batch relation reads historical data from Kafka topics over configurable offset ranges.

```scala { .api }
class KafkaRelation extends BaseRelation with TableScan with Logging {
  def sqlContext: SQLContext
  def schema: StructType
  def buildScan(): RDD[Row]
}
```

[Batch Processing](./batch-processing.md)

### Data Writing

Writing is supported for both streaming and batch workloads, with producer connection pooling and automatic serialization.

```scala { .api }
class KafkaSink extends Sink with Logging {
  def addBatch(batchId: Long, data: DataFrame): Unit
}

class KafkaStreamWriter extends StreamWriter {
  def createWriterFactory(): KafkaStreamWriterFactory
  def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit
}

object KafkaWriter extends Logging {
  def write(sparkSession: SparkSession, queryExecution: QueryExecution,
    kafkaParameters: ju.Map[String, Object], topic: Option[String]): Unit
}
```
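
A streaming write runs through these components; user code only needs the `kafka` format plus a checkpoint location for fault tolerance. A sketch, assuming `df` is a streaming DataFrame with `key` and `value` columns:

```scala
// Stream results into Kafka; progress is tracked in the checkpoint directory
val query = df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/kafka-sink-checkpoint")
  .start()
```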

[Data Writing](./data-writing.md)

### Configuration and Options

Configuration options fine-tune connection parameters, offset behavior, and performance.

**Source Options**:
```scala { .api }
// Connection
"kafka.bootstrap.servers" -> "localhost:9092"
"subscribe" -> "topic1,topic2"
"subscribePattern" -> "topic.*"
"assign" -> """{"topic1":[0,1],"topic2":[0]}"""

// Offset Management
"startingOffsets" -> "earliest" // or "latest" or JSON
"endingOffsets" -> "latest"     // or JSON (batch only)
"failOnDataLoss" -> "true"

// Performance
"minPartitions" -> "10"
"maxOffsetsPerTrigger" -> "1000000"
```

**Sink Options**:
```scala { .api }
"kafka.bootstrap.servers" -> "localhost:9092"
"topic" -> "output-topic"
```
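
For example, `maxOffsetsPerTrigger` caps how many offsets a micro-batch consumes, bounding per-trigger work. A sketch with illustrative values:

```scala
// Throttled stream: at most 10,000 offsets per micro-batch
val throttled = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("maxOffsetsPerTrigger", "10000")
  .option("failOnDataLoss", "false") // tolerate expired/deleted offsets
  .load()
```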

[Configuration](./configuration.md)

## Types

```scala { .api }
// Package-level type alias
type PartitionOffsetMap = Map[TopicPartition, Long]

// Data Consumer Types
case class AvailableOffsetRange(earliest: Long, latest: Long)

sealed trait KafkaDataConsumer {
  def get(offset: Long, untilOffset: Long, pollTimeoutMs: Long,
    failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]]
  def getAvailableOffsetRange(): AvailableOffsetRange
  def release(): Unit
}

// Offset Range Types
case class KafkaOffsetRange(
  topicPartition: TopicPartition,
  fromOffset: Long,
  untilOffset: Long,
  preferredLoc: Option[String]
) {
  lazy val size: Long = untilOffset - fromOffset
}

// RDD Types
case class KafkaSourceRDDOffsetRange(
  topicPartition: TopicPartition,
  fromOffset: Long,
  untilOffset: Long,
  preferredLoc: Option[String]
) {
  def topic: String = topicPartition.topic
  def partition: Int = topicPartition.partition
  def size: Long = untilOffset - fromOffset
}
```