# Apache Spark Kafka Connector

The Apache Spark Kafka Connector (spark-sql-kafka-0-10_2.12) provides seamless integration between Apache Kafka and Apache Spark's Structured Streaming and SQL APIs. It enables both batch and streaming processing of data from Kafka topics, with offset management, fault tolerance, and exactly-once semantics on the read path (the Kafka sink itself provides at-least-once delivery).
## Package Information

- **Package Name**: spark-sql-kafka-0-10_2.12
- **Package Type**: maven
- **Language**: Scala
- **Installation**: `spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.6`
- **Dependencies**: Requires Apache Spark 3.5.6 and Kafka client libraries
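
For applications built with sbt rather than launched through `spark-shell`, the same artifact can be declared as a library dependency. This is a minimal sketch using the coordinates from the installation command above; mark Spark itself as `provided` when the cluster already supplies it:

```scala
// build.sbt (sketch) — %% appends the Scala binary version, so use a
// scalaVersion (2.12.x or 2.13.x) matching your Spark build
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % "3.5.6" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.5.6"
)
```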

## Core Usage Pattern

The connector is accessed through Spark SQL's DataSource API using the "kafka" format identifier:

```scala
// Reading from Kafka (streaming)
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1,topic2")
  .load()

// Writing to Kafka (streaming)
df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/kafka-sink-checkpoint")  // required for streaming sinks (placeholder path)
  .outputMode("append")
  .start()
```

## Basic Usage

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
  .appName("KafkaExample")
  .getOrCreate()

// Stream from Kafka
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input-topic")
  .option("startingOffsets", "earliest")
  .load()

// Extract and process the value
val processedDF = kafkaDF
  .select(col("value").cast("string").as("message"))
  .filter(col("message").isNotNull)

// Write back to Kafka
val query = processedDF
  .select(to_json(struct("*")).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/kafka-example-checkpoint")  // required by the Kafka sink (placeholder path)
  .outputMode("append")
  .start()

// Block until the streaming query terminates
query.awaitTermination()
```

## Architecture

The Spark Kafka connector is built around several key components:

- **KafkaSourceProvider**: Main entry point that registers the "kafka" format with Spark SQL
- **Fixed Schema**: Standardized schema for Kafka records (key, value, topic, partition, offset, timestamp, etc.)
- **Offset Management**: Comprehensive offset tracking and recovery for exactly-once processing
- **Consumer Strategies**: Flexible topic subscription patterns (subscribe, subscribePattern, assign)
- **Producer Integration**: Seamless writing back to Kafka topics with proper serialization

## Capabilities

### Streaming Data Reading

Read data from Kafka topics in real time using Spark Structured Streaming with micro-batch or continuous processing modes.

```scala { .api }
// Streaming read operation
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String)
  .option("subscribe", topics: String)         // or subscribePattern or assign
  .option("startingOffsets", offsets: String)  // "earliest", "latest", or JSON
  .load(): DataFrame
```
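
The two modes are selected with a trigger on the write side. A minimal sketch, assuming `df` is the streaming DataFrame returned by a `readStream` like the one above and using the console sink for illustration:

```scala
import org.apache.spark.sql.streaming.Trigger

// Default execution is micro-batch; an explicit trigger sets the batch cadence.
val microBatchQuery = df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

// Alternative: experimental continuous processing with ~1 second checkpointing.
// Continuous mode supports only simple projections/filters and a limited set of sinks.
// .trigger(Trigger.Continuous("1 second"))
```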

[Streaming Operations](./streaming.md)

### Batch Data Reading

Read historical data from Kafka topics for batch processing and analysis.

```scala { .api }
// Batch read operation
spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String)
  .option("subscribe", topics: String)  // or subscribePattern or assign
  .option("startingOffsets", startOffsets: String)
  .option("endingOffsets", endOffsets: String)
  .load(): DataFrame
```
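
For example, a bounded read over an explicit offset range might look like the following sketch (topic name, partition count, and offsets are illustrative; it assumes a two-partition topic named `events`):

```scala
// Read a fixed slice of the topic and decode the binary payloads
val batchDF = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", """{"events":{"0":100,"1":100}}""")
  .option("endingOffsets", "latest")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "partition", "offset")

batchDF.show(truncate = false)
```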

[Batch Operations](./batch.md)

### Data Writing

Write DataFrame data to Kafka topics with proper serialization and partitioning.

```scala { .api }
// Streaming write operation
df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String)
  .option("topic", topicName: String)                   // optional if specified in the data
  .option("checkpointLocation", checkpointDir: String)  // required for streaming queries
  .outputMode("append")
  .start(): StreamingQuery

// Batch write operation
df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String)
  .option("topic", topicName: String)
  .save()
```
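
A minimal batch-write sketch, assuming an `inputDF` with an `id` column plus payload columns (names are illustrative): the sink expects string or binary `key`/`value` columns, so the data is shaped explicitly before writing.

```scala
import org.apache.spark.sql.functions._

// Shape the DataFrame into the sink's expected columns, then write
inputDF
  .select(
    col("id").cast("string").as("key"),   // optional message key
    to_json(struct("*")).as("value")      // required message value
  )
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .save()
```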

[Write Operations](./writing.md)

### Configuration Management

Comprehensive configuration options for connection, performance tuning, and reliability.

```scala { .api }
// Core configuration options
.option("kafka.bootstrap.servers", servers: String)  // Required
.option("subscribe", topics: String)                 // Topic selection
.option("maxOffsetsPerTrigger", maxRecords: String)  // Performance tuning
.option("failOnDataLoss", failOnLoss: String)        // Reliability
```
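
Options prefixed with `kafka.` are forwarded to the underlying Kafka consumer or producer. A sketch combining the options above (broker addresses, topic, and values are illustrative, and a real SASL setup needs additional `kafka.*` security settings):

```scala
val tunedStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
  .option("subscribe", "events")
  .option("maxOffsetsPerTrigger", "10000")        // cap records per micro-batch
  .option("failOnDataLoss", "false")              // tolerate offsets that have aged out
  .option("kafka.security.protocol", "SASL_SSL")  // passed straight to the Kafka client
  .load()
```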

[Configuration Options](./configuration.md)

## Data Schema

### Read Schema (Fixed)

All Kafka DataFrames have the following fixed schema:

```scala { .api }
// Fixed Kafka record schema
case class KafkaRecord(
  key: Array[Byte],               // Message key as byte array (nullable)
  value: Array[Byte],             // Message value as byte array
  topic: String,                  // Topic name
  partition: Int,                 // Partition number
  offset: Long,                   // Message offset within partition
  timestamp: java.sql.Timestamp,  // Message timestamp
  timestampType: Int,             // 0 = CreateTime, 1 = LogAppendTime
  headers: Array[KafkaHeader]     // Optional headers (when includeHeaders = true)
)

case class KafkaHeader(
  key: String,
  value: Array[Byte]
)
```
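
In Spark SQL terms, `key` and `value` surface as binary columns, so the usual first step is to cast them. A brief sketch (schema output abridged; nullability flags omitted):

```scala
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("includeHeaders", "true")   // adds the headers column
  .load()

kafkaDF.printSchema()
// key: binary, value: binary, topic: string, partition: integer,
// offset: long, timestamp: timestamp, timestampType: integer, headers: array

// Decode payloads for downstream processing
val decoded = kafkaDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
```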

### Write Schema (Flexible)

For writing, DataFrames can contain any combination of these fields:

```scala { .api }
// Write schema fields (all optional except value)
case class KafkaWriteRecord(
  topic: String,               // Target topic (optional if set in options)
  key: Array[Byte],            // Message key, string or binary (optional)
  value: Array[Byte],          // Message value, string or binary (required)
  partition: Int,              // Specific partition (optional)
  headers: Array[KafkaHeader]  // Message headers (optional)
)
```
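
Because the topic can also be carried per row, a single DataFrame can fan out to several topics. A sketch assuming an `inputDF` with `region` and `payload` columns (names are illustrative):

```scala
import org.apache.spark.sql.functions._

// Route each row to a topic derived from its data instead of a fixed "topic" option
inputDF
  .select(
    concat(lit("events-"), col("region")).as("topic"),  // per-row target topic
    col("payload").cast("string").as("value")
  )
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .save()
```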

## Topic Selection Strategies

Exactly one of the three strategies may be specified for a given query:

```scala { .api }
// Subscribe to specific topics by name
.option("subscribe", "topic1,topic2,topic3")

// Subscribe to topics matching a regex pattern
.option("subscribePattern", "events-.*")

// Assign specific partitions
.option("assign", """{"topic1":[0,1],"topic2":[0]}""")
```

## Offset Management Types

```scala { .api }
// Offset specification options
"earliest"  // Start from earliest available offsets
"latest"    // Start from latest available offsets

// Specific offsets per partition (JSON format)
"""{"topic1":{"0":23,"1":345},"topic2":{"0":0}}"""

// Global timestamp (milliseconds since epoch)
.option("startingTimestamp", "1609459200000")

// Per-partition timestamps (JSON format, used with "startingOffsetsByTimestamp")
"""{"topic1":{"0":1609459200000,"1":1609459300000}}"""
```

## Error Handling

The connector provides robust error handling for common scenarios:

- **Data Loss Detection**: Configurable behavior when data is no longer available
- **Offset Validation**: Automatic validation of offset ranges and availability
- **Connection Failures**: Retry logic and graceful degradation
- **Schema Validation**: Input validation for write operations
- **Configuration Errors**: Clear error messages for invalid options

## Performance Considerations

- **Consumer Pooling**: Efficient reuse of Kafka consumers across tasks
- **Producer Caching**: Connection pooling for Kafka producers
- **Batch Size Control**: Configurable limits on records per micro-batch (see the sketch below)
- **Parallel Processing**: Automatic parallelization based on Kafka partitions
- **Memory Management**: Optimized handling of large message batches
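
A sketch of how the batch-size and parallelism knobs above are typically applied; the option names come from the Spark Kafka integration guide, and the values are illustrative and workload-dependent:

```scala
val perfTunedDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("maxOffsetsPerTrigger", "50000")  // upper bound on records per micro-batch
  .option("minPartitions", "16")            // ask for more Spark partitions than Kafka partitions
  .load()
```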