# Data Source Registration

The Kafka connector registers automatically with Spark SQL as the "kafka" data source, implementing multiple interfaces to provide both streaming and batch capabilities.

## Capabilities

### KafkaSourceProvider

The main entry point: this class implements all the Spark SQL provider interfaces needed for complete Kafka integration.
```scala { .api }
/**
 * Primary provider class for all Kafka readers and writers.
 * Automatically registers with Spark SQL using the name "kafka".
 */
class KafkaSourceProvider extends DataSourceRegister
    with StreamSourceProvider
    with StreamSinkProvider
    with RelationProvider
    with CreatableRelationProvider
    with SimpleTableProvider {

  /** Returns the short name used to identify this data source */
  def shortName(): String = "kafka"

  /** Creates a streaming source for reading Kafka data */
  def createSource(sqlContext: SQLContext, metadataPath: String, schema: Option[StructType],
      providerName: String, parameters: Map[String, String]): Source

  /** Creates a streaming sink for writing to Kafka */
  def createSink(sqlContext: SQLContext, parameters: Map[String, String],
      partitionColumns: Seq[String], outputMode: OutputMode): Sink

  /** Creates a batch relation for reading Kafka data */
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation

  /** Creates a relation for writing a DataFrame to Kafka */
  def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String],
      data: DataFrame): BaseRelation

  /** Returns the V2 DataSource table implementation */
  def getTable(options: CaseInsensitiveStringMap): KafkaTable
}
```

**Usage Examples:**

The provider is used automatically when specifying "kafka" as the format:

```scala
// Streaming read
val stream = spark
  .readStream
  .format("kafka") // Uses KafkaSourceProvider automatically
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()

// Batch read
val batch = spark
  .read
  .format("kafka") // Uses KafkaSourceProvider automatically
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()

// Write
dataFrame
  .write
  .format("kafka") // Uses KafkaSourceProvider automatically
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .save()
```

### KafkaTable

V2 DataSource API table implementation providing modern Spark SQL integration.

```scala { .api }
/**
 * V2 DataSource table implementation for Kafka.
 * Supports both reading and writing with comprehensive capabilities.
 */
class KafkaTable(includeHeaders: Boolean) extends Table with SupportsRead with SupportsWrite {

  /** Returns the table name for identification */
  def name(): String = "KafkaTable"

  /** Returns the schema for Kafka records */
  def schema(): StructType

  /** Returns supported table capabilities */
  def capabilities(): ju.Set[TableCapability]

  /** Creates a scan builder for read operations */
  def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder

  /** Creates a write builder for write operations */
  def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder
}
```

**Supported Capabilities:**

```scala { .api }
// Table capabilities supported by KafkaTable
import org.apache.spark.sql.connector.catalog.TableCapability._

val supportedCapabilities: ju.Set[TableCapability] = ju.EnumSet.of(
  BATCH_READ,        // Batch reading support
  BATCH_WRITE,       // Batch writing support
  MICRO_BATCH_READ,  // Micro-batch streaming reads
  CONTINUOUS_READ,   // Continuous streaming reads
  STREAMING_WRITE,   // Streaming writes
  ACCEPT_ANY_SCHEMA  // Flexible schema handling
)
```
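
These capabilities surface directly in the user-facing API. For instance, STREAMING_WRITE is what lets a streaming query write to Kafka. A minimal sketch (the server address, topic name, and checkpoint path are placeholders):

```scala
// Streaming write to Kafka, enabled by the STREAMING_WRITE capability.
// Key/value columns must be castable to string or binary.
val query = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/kafka-sink-checkpoint") // required for streaming writes
  .start()
```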

### KafkaScan

Scan implementation for reading Kafka data in both batch and streaming modes.

```scala { .api }
/**
 * Scan implementation for reading Kafka data.
 * Handles conversion between the different read modes.
 */
class KafkaScan(options: CaseInsensitiveStringMap) extends Scan {

  /** Returns the read schema for Kafka records */
  def readSchema(): StructType

  /** Converts the scan to batch reading mode */
  def toBatch(): Batch

  /** Converts the scan to micro-batch streaming mode */
  def toMicroBatchStream(checkpointLocation: String): MicroBatchStream

  /** Converts the scan to continuous streaming mode */
  def toContinuousStream(checkpointLocation: String): ContinuousStream

  /** Returns supported custom metrics */
  def supportedCustomMetrics(): Array[CustomMetric]
}
```
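
Which conversion method runs depends on the query's trigger: `toMicroBatchStream` for the default micro-batch triggers, `toContinuousStream` for a continuous trigger. A hedged sketch of the latter (topic and server names are placeholders):

```scala
import org.apache.spark.sql.streaming.Trigger

// A continuous trigger makes Spark call KafkaScan.toContinuousStream
// instead of toMicroBatchStream.
val query = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()
  .writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second")) // 1s checkpoint interval
  .start()
```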

## Configuration Validation

The data source provider performs comprehensive validation of configuration options:

### Required Options Validation

```scala
// Exactly one of these subscription strategies must be specified:
// - "subscribe": "topic1,topic2,topic3"
// - "subscribePattern": "prefix-.*"
// - "assign": """{"topic1":[0,1],"topic2":[0,1]}"""

// Bootstrap servers must be specified:
// - "kafka.bootstrap.servers": "localhost:9092"
```
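
A sketch of a valid configuration using the `assign` strategy, which pins the query to explicit topic-partitions (topic names and the server address are placeholders):

```scala
// Explicit partition assignment: read only partitions 0 and 1 of each topic.
// The assign value is a JSON map from topic name to partition numbers.
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"topic1":[0,1],"topic2":[0,1]}""")
  .load()
```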

### Unsupported Kafka Options

The following Kafka consumer/producer options are not supported and will cause exceptions:

```scala
// Unsupported consumer options
"kafka.auto.offset.reset"    // Use startingOffsets instead
"kafka.key.deserializer"     // Fixed to ByteArrayDeserializer
"kafka.value.deserializer"   // Fixed to ByteArrayDeserializer
"kafka.enable.auto.commit"   // Managed internally
"kafka.interceptor.classes"  // Not safe for Spark usage

// Unsupported producer options
"kafka.key.serializer"       // Fixed to ByteArraySerializer
"kafka.value.serializer"     // Fixed to ByteArraySerializer
```
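
For the most common of these, `kafka.auto.offset.reset`, the supported replacement is the connector's own `startingOffsets` option. A sketch (topic and server names are placeholders):

```scala
// Instead of setting kafka.auto.offset.reset = earliest (rejected),
// express the same intent with the connector's startingOffsets option.
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest")
  .load()
```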

### Stream vs Batch Option Validation

```scala
// Batch-only options (not valid for streaming queries)
"endingOffsets"             // Only for batch
"endingOffsetsByTimestamp"  // Only for batch

// Batch-specific restrictions
// startingOffsets cannot be "latest" for batch queries
// endingOffsets cannot be "earliest" for batch queries
```
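
A sketch of a batch read over an explicit offset range that satisfies these rules. The per-partition JSON form is used here; in it, `-2` denotes earliest and `-1` denotes latest (topic, partition, and server names are placeholders):

```scala
// Batch query bounded by explicit offsets: valid because startingOffsets
// is not "latest" and endingOffsets is not "earliest".
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", """{"my-topic":{"0":-2}}""") // -2 = earliest
  .option("endingOffsets", """{"my-topic":{"0":-1}}""")   // -1 = latest
  .load()
```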

## Error Messages

The provider includes helpful error messages for common configuration issues:

```scala
// Missing subscription strategy
"One of the following options must be specified for Kafka source: subscribe, subscribePattern, assign"

// Custom group ID warning
"Kafka option 'kafka.group.id' has been set on this query, it is not recommended to set this option"

// Invalid offset configuration
"starting offset can't be latest for batch queries on Kafka"
"ending offset can't be earliest for batch queries on Kafka"
```
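
To avoid the `kafka.group.id` warning while still getting recognizable consumer group names, Spark's `groupIdPrefix` option lets the connector keep generating unique group IDs but with a custom prefix. A sketch, assuming a recent Spark version that supports this option (topic, prefix, and server names are placeholders):

```scala
// groupIdPrefix customizes the generated consumer group ID without
// pinning a fixed kafka.group.id, so offset management stays with Spark.
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("groupIdPrefix", "my-app")
  .load()
```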