# Apache Spark Kafka SQL Connector

The Apache Spark Kafka SQL Connector provides seamless integration between Apache Kafka and Spark's Structured Streaming framework. It supports both reading from and writing to Kafka topics, with fault tolerance, automatic offset management, and exactly-once semantics on the read path (the Kafka sink itself provides at-least-once delivery), making it well suited to building real-time data pipelines.

## Package Information

- **Package Name**: spark-sql-kafka-0-10_2.13
- **Package Type**: Maven
- **Language**: Scala
- **Group ID**: org.apache.spark
- **Artifact ID**: spark-sql-kafka-0-10_2.13
- **Version**: 3.5.6
- **Installation**: Add to Maven dependencies or include when submitting Spark applications
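For reference, the coordinates above can be supplied directly when submitting an application, so the connector is resolved from Maven Central at launch (the application jar name here is a placeholder):

```shell
# Pull the connector and its transitive dependencies at submit time
spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.13:3.5.6 \
  my-app.jar   # placeholder: your application jar
```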
## Core Imports

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.functions._

// The connector is registered automatically as the "kafka" data source;
// no direct imports of connector classes are needed
```

**For advanced usage with types:**

```scala
import org.apache.kafka.common.TopicPartition

// Note: classes in org.apache.spark.sql.kafka010 are largely internal to Spark
import org.apache.spark.sql.kafka010.PartitionOffsetMap
```
## Basic Usage

### Reading from Kafka (Streaming)

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("KafkaStreaming")
  .getOrCreate()

// Read from Kafka using Structured Streaming
val kafkaStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "latest")
  .load()

// Project the record fields and write to the console;
// start() returns a StreamingQuery handle
val query = kafkaStream
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "partition", "offset", "timestamp")
  .writeStream
  .outputMode("append")
  .format("console")
  .start()

query.awaitTermination()
```
### Reading from Kafka (Batch)

```scala
val kafkaBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
```
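Batch boundaries can also be pinned to exact per-partition offsets by passing a JSON specification, where `-2` means earliest and `-1` means latest (topic name and offsets below are illustrative):

```scala
// Read a fixed slice: partition 0 from offset 23, partition 1 from the beginning
val kafkaSlice = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", """{"my-topic":{"0":23,"1":-2}}""")
  .option("endingOffsets", """{"my-topic":{"0":100,"1":-1}}""")
  .load()
```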
### Writing to Kafka

```scala
val dataFrame = spark.createDataFrame(Seq(
  ("key1", "value1"),
  ("key2", "value2")
)).toDF("key", "value")

dataFrame
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .save()
```
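The streaming counterpart publishes each micro-batch to Kafka; a `checkpointLocation` is required so offsets and sink state survive restarts. A minimal sketch, where `processedDf` stands for any streaming DataFrame with `key` and `value` columns and the checkpoint path is a placeholder:

```scala
val streamingWrite = processedDf          // hypothetical streaming DataFrame
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/kafka-sink-checkpoint") // placeholder path
  .start()
```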
## Architecture

The Spark Kafka SQL Connector is built around several key components:

- **KafkaSourceProvider**: Main entry point implementing multiple Spark SQL interfaces for registration and instantiation
- **Consumer Strategies**: Flexible subscription patterns (subscribe, subscribePattern, assign) for different use cases
- **Offset Management**: Comprehensive offset tracking with support for earliest, latest, specific offsets, and timestamp-based positioning
- **Schema Conversion**: Automatic conversion between Kafka records and Spark rows with optional header support
- **Streaming Sources**: Both micro-batch and continuous streaming implementations with trigger support
- **Batch Sources**: Efficient batch reading with offset range optimization
- **Write Support**: Both batch and streaming write capabilities with producer pooling and configuration management
## Capabilities

### Data Source Registration

Core data source functionality for registering Kafka as a Spark SQL data source under the "kafka" identifier.

```scala { .api }
// Automatically registered - no direct usage
class KafkaSourceProvider extends DataSourceRegister
  with StreamSourceProvider
  with StreamSinkProvider
  with RelationProvider
  with CreatableRelationProvider
  with SimpleTableProvider
```

[Data Source Registration](./data-source.md)
### Consumer Strategy Configuration

Flexible subscription patterns for connecting to Kafka topics, including direct assignment, topic subscription, and pattern-based subscription.

```scala { .api }
// Consumer strategies are configured via options:
// .option("subscribe", "topic1,topic2,topic3")
// .option("subscribePattern", "prefix-.*")
// .option("assign", """{"topic1":[0,1,2],"topic2":[0,1]}""")

sealed trait ConsumerStrategy {
  def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
  def createAdmin(kafkaParams: ju.Map[String, Object]): Admin
  def assignedTopicPartitions(admin: Admin): Set[TopicPartition]
}
```

[Consumer Strategies](./consumer-strategies.md)
### Offset Management

Comprehensive offset positioning and range management supporting earliest, latest, specific offsets, and timestamp-based positioning.

```scala { .api }
// Offset range limits for controlling read boundaries
sealed trait KafkaOffsetRangeLimit

case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit
case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit
case class SpecificOffsetRangeLimit(partitionOffsets: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit
case class SpecificTimestampRangeLimit(
    topicTimestamps: Map[TopicPartition, Long],
    strategy: StrategyOnNoMatchStartingOffset.Value) extends KafkaOffsetRangeLimit
case class GlobalTimestampRangeLimit(
    timestamp: Long,
    strategy: StrategyOnNoMatchStartingOffset.Value) extends KafkaOffsetRangeLimit
```

[Offset Management](./offset-management.md)
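Timestamp-based positioning is driven by options rather than by constructing these classes directly. A sketch using the `startingOffsetsByTimestamp` option, where timestamps are epoch milliseconds and the values shown are arbitrary:

```scala
// Start each partition from the first record at or after the given timestamp
val fromTimestamps = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsetsByTimestamp", """{"my-topic":{"0":1700000000000,"1":1700000000000}}""")
  .load()
```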
### Schema Conversion

Schema definition and conversion between Kafka `ConsumerRecord` format and Spark DataFrame rows, with optional header support.

```scala { .api }
// Schema with headers disabled (default)
val schemaWithoutHeaders = StructType(Array(
  StructField("key", BinaryType),
  StructField("value", BinaryType),
  StructField("topic", StringType),
  StructField("partition", IntegerType),
  StructField("offset", LongType),
  StructField("timestamp", TimestampType),
  StructField("timestampType", IntegerType)
))

// Schema with headers enabled (.option("includeHeaders", "true"))
val schemaWithHeaders = schemaWithoutHeaders.add(
  StructField("headers", ArrayType(StructType(Array(
    StructField("key", StringType),
    StructField("value", BinaryType)
  ))))
)

def kafkaSchema(includeHeaders: Boolean): StructType
```

[Schema Conversion](./schema-conversion.md)
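Because `value` arrives as `BinaryType`, a common pattern is to cast it to a string and parse it against an application schema. A sketch reusing the `kafkaStream` DataFrame from the Basic Usage section, with a hypothetical two-field JSON payload:

```scala
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Hypothetical payload schema, for illustration only
val payloadSchema = StructType(Array(
  StructField("user", StringType),
  StructField("amount", LongType)
))

// Cast the raw bytes, parse as JSON, and flatten the struct into columns
val parsed = kafkaStream
  .select(from_json(col("value").cast("string"), payloadSchema).as("data"))
  .select("data.*")
```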
### Streaming Sources

Micro-batch and continuous streaming implementations with comprehensive trigger support and metrics.

```scala { .api }
// Micro-batch streaming
class KafkaMicroBatchStream extends MicroBatchStream
  with SupportsTriggerAvailableNow
  with ReportsSourceMetrics {

  def initialOffset(): Offset
  def latestOffset(): Offset
  def latestOffset(startOffset: Offset, readLimit: ReadLimit): Offset
  def planInputPartitions(start: Offset, end: Offset): Array[InputPartition]
  def createReaderFactory(): PartitionReaderFactory
  def commit(end: Offset): Unit
  def stop(): Unit
}

// Continuous streaming
class KafkaContinuousStream extends ContinuousStream {
  def mergeOffsets(offsets: Array[PartitionOffset]): Offset
  def initialOffset(): Offset
  def deserializeOffset(json: String): Offset
  def commit(end: Offset): Unit
  def stop(): Unit
}
```

[Streaming Sources](./streaming-sources.md)
### Batch Reading

Efficient batch reading with offset range calculation and partition optimization.

```scala { .api }
class KafkaBatch extends Batch {
  def planInputPartitions(): Array[InputPartition]
  def createReaderFactory(): PartitionReaderFactory
}

class KafkaBatchPartitionReader extends PartitionReader[InternalRow] {
  def next(): Boolean
  def get(): UnsafeRow
  def close(): Unit
  def currentMetricsValues(): Array[CustomTaskMetric]
}
```

[Batch Reading](./batch-reading.md)
### Writing to Kafka

Both batch and streaming write support with producer pooling, topic routing, and data validation.

```scala { .api }
// Core writer functionality
object KafkaWriter {
  val TOPIC_ATTRIBUTE_NAME: String = "topic"
  val KEY_ATTRIBUTE_NAME: String = "key"
  val VALUE_ATTRIBUTE_NAME: String = "value"
  val HEADERS_ATTRIBUTE_NAME: String = "headers"
  val PARTITION_ATTRIBUTE_NAME: String = "partition"

  def write(sparkSession: SparkSession, queryExecution: QueryExecution,
      kafkaParams: ju.Map[String, Object], topic: Option[String]): Unit
}

// V2 DataSource write implementation
case class KafkaWrite(topic: Option[String], producerParams: ju.Map[String, Object], schema: StructType) extends Write {
  def description(): String
  def toBatch: BatchWrite
  def toStreaming: StreamingWrite
}
```

[Writing to Kafka](./writing.md)
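The attribute names above imply that rows can carry their own destination: when the DataFrame has a `topic` column and no `topic` option is set, each row is routed to the topic it names. A sketch, where `events` and its columns (`destination`, `id`, `payload`) are hypothetical:

```scala
// Route each row by its "topic" column instead of a fixed topic option
events                                     // hypothetical source DataFrame
  .selectExpr(
    "destination AS topic",                // per-row destination topic
    "CAST(id AS STRING) AS key",
    "CAST(payload AS STRING) AS value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .save()
```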
## Configuration Options

### Required Options

- `kafka.bootstrap.servers`: Comma-separated list of Kafka bootstrap servers (required)
- One of `subscribe`, `subscribePattern`, or `assign` (required)

### Common Options

- `startingOffsets`: Where to start reading ("earliest", "latest", or a JSON offset specification; defaults to "latest" for streaming and "earliest" for batch queries)
- `endingOffsets`: Where to stop reading, batch queries only ("latest" or a JSON offset specification)
- `failOnDataLoss`: Whether to fail the query when data loss is detected, e.g. after topic deletion or retention-based expiry (default: "true")
- `includeHeaders`: Include Kafka headers in the DataFrame schema (default: "false")
- `maxOffsetsPerTrigger`: Maximum number of offsets to process per trigger interval
- `minOffsetsPerTrigger`: Minimum number of offsets to process per trigger interval
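For example, `maxOffsetsPerTrigger` caps the size of each micro-batch; a sketch combining it with a processing-time trigger (the interval and limit are arbitrary choices):

```scala
import org.apache.spark.sql.streaming.Trigger

// Rate-limit ingestion to at most 10,000 records per micro-batch
val limited = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("maxOffsetsPerTrigger", "10000")
  .load()
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()
```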
### Advanced Options

- `minPartitions`: Desired minimum number of input partitions to read from Kafka
- `kafkaConsumer.pollTimeoutMs`: Consumer poll timeout in milliseconds
- `fetchOffset.numRetries`: Number of retries for offset fetching
- `fetchOffset.retryIntervalMs`: Retry interval for offset fetching, in milliseconds
- `groupIdPrefix`: Prefix for the generated consumer group IDs
- Any option prefixed with `kafka.` (e.g. `kafka.security.protocol`) is passed through to the underlying Kafka client
## Error Handling

The connector provides structured exception handling for common Kafka integration scenarios:

- **Data Loss Detection**: Automatic detection of missing data due to Kafka retention or topic deletion
- **Offset Out of Range**: Handling of invalid offset requests
- **Connection Failures**: Retry logic for transient network issues
- **Configuration Validation**: Comprehensive validation of all configuration options

### Specific Exceptions

```scala { .api }
object KafkaExceptions {
  def mismatchedTopicPartitionsBetweenEndOffsetAndPrefetched(
      tpsForPrefetched: Set[TopicPartition],
      tpsForEndOffset: Set[TopicPartition]): SparkException

  def endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
      prefetchedOffset: Map[TopicPartition, Long],
      endOffset: Map[TopicPartition, Long]): SparkException

  def lostTopicPartitionsInEndOffsetWithTriggerAvailableNow(
      tpsForLatestOffset: Set[TopicPartition],
      tpsForEndOffset: Set[TopicPartition]): SparkException

  def endOffsetHasGreaterOffsetForTopicPartitionThanLatestWithTriggerAvailableNow(
      latestOffset: Map[TopicPartition, Long],
      endOffset: Map[TopicPartition, Long]): SparkException
}
```
## Custom Metrics

The connector exposes custom metrics for monitoring:

- `offsetOutOfRange`: Number of offsets that were out of range
- `dataLoss`: Number of data loss events detected

These metrics integrate with Spark's metrics system and can be monitored through the Spark UI and external monitoring systems.
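One way to observe them programmatically is through the query's progress updates; a sketch assuming a running `StreamingQuery` named `query`:

```scala
// Source-level metrics appear in each progress update's sources array
val progress = query.lastProgress
if (progress != null) {
  progress.sources.foreach { source =>
    println(s"${source.description}: metrics=${source.metrics}")
  }
}
```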