# Schema Conversion
The Kafka connector automatically converts Kafka ConsumerRecord objects into Spark DataFrame rows, providing a structured view of Kafka messages with optional header support.
## Capabilities
### Kafka Record Schema
The connector provides a fixed schema for Kafka records that cannot be customized. The schema varies based on whether headers are included.
### Default Schema (Headers Disabled)
When `includeHeaders` is false or not specified:
```scala { .api }
val kafkaSchemaWithoutHeaders = StructType(Array(
  StructField("key", BinaryType, nullable = true),
  StructField("value", BinaryType, nullable = true),
  StructField("topic", StringType, nullable = false),
  StructField("partition", IntegerType, nullable = false),
  StructField("offset", LongType, nullable = false),
  StructField("timestamp", TimestampType, nullable = true),
  StructField("timestampType", IntegerType, nullable = true)
))
```
**Field Descriptions:**
- `key`: Message key as binary data (null if the record has no key)
- `value`: Message value as binary data (null if the record has no value)
- `topic`: Name of the topic the message was published to
- `partition`: Partition number within the topic
- `offset`: Offset of the message within the partition
- `timestamp`: Message timestamp (producer or broker timestamp, depending on `timestampType`)
- `timestampType`: Type of timestamp (0 = CreateTime, 1 = LogAppendTime)
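
The `timestampType` code is easy to surface as a readable label in a query. A minimal sketch, assuming a `kafkaStream` DataFrame loaded from the Kafka source as in the usage example that follows:

```scala
import org.apache.spark.sql.functions._

// Map timestampType codes (0 = CreateTime, 1 = LogAppendTime) to labels
val labeled = kafkaStream.withColumn(
  "timestamp_source",
  when(col("timestampType") === 0, "CreateTime")
    .when(col("timestampType") === 1, "LogAppendTime")
    .otherwise("NoTimestampType")
)
```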
**Usage Examples:**
```scala
import org.apache.spark.sql.functions._

val kafkaStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()

// Access fields directly
val processedStream = kafkaStream
  .select(
    col("topic"),
    col("partition"),
    col("offset"),
    col("timestamp"),
    expr("CAST(key AS STRING)").as("key_str"),
    expr("CAST(value AS STRING)").as("value_str")
  )
```
### Extended Schema (Headers Enabled)
When `includeHeaders` is set to "true":
```scala { .api }
val kafkaSchemaWithHeaders = StructType(Array(
  StructField("key", BinaryType, nullable = true),
  StructField("value", BinaryType, nullable = true),
  StructField("topic", StringType, nullable = false),
  StructField("partition", IntegerType, nullable = false),
  StructField("offset", LongType, nullable = false),
  StructField("timestamp", TimestampType, nullable = true),
  StructField("timestampType", IntegerType, nullable = true),
  StructField("headers", ArrayType(StructType(Array(
    StructField("key", StringType, nullable = false),
    StructField("value", BinaryType, nullable = true)
  ))), nullable = true)
))
```
**Headers Field Structure:**
```scala { .api }
val headersType = ArrayType(StructType(Array(
  StructField("key", StringType, nullable = false),  // Header key as string
  StructField("value", BinaryType, nullable = true)  // Header value as binary
)))
```
**Configuration:**
```scala
val kafkaStreamWithHeaders = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("includeHeaders", "true") // Enable headers
  .load()
```
**Usage Examples:**
```scala
// Process messages with headers
val processedWithHeaders = kafkaStreamWithHeaders
  .select(
    col("topic"),
    expr("CAST(key AS STRING)").as("key_str"),
    expr("CAST(value AS STRING)").as("value_str"),
    col("headers")
  )

// Extract specific header values
val withExtractedHeaders = kafkaStreamWithHeaders
  .select(
    col("*"),
    expr("filter(headers, x -> x.key = 'content-type')[0].value").as("content_type_header"),
    expr("filter(headers, x -> x.key = 'correlation-id')[0].value").as("correlation_id_header")
  )
```
### KafkaRecordToRowConverter
Internal converter class that handles the transformation from Kafka ConsumerRecord to Spark rows.
```scala { .api }
/**
 * Converts a Kafka ConsumerRecord to a Spark InternalRow/UnsafeRow.
 * Handles both header-enabled and header-disabled modes.
 */
class KafkaRecordToRowConverter {

  /** Convert to InternalRow without headers */
  val toInternalRowWithoutHeaders: ConsumerRecord[Array[Byte], Array[Byte]] => InternalRow

  /** Convert to InternalRow with headers */
  val toInternalRowWithHeaders: ConsumerRecord[Array[Byte], Array[Byte]] => InternalRow

  /** Convert to UnsafeRow without headers */
  val toUnsafeRowWithoutHeadersProjector: ConsumerRecord[Array[Byte], Array[Byte]] => UnsafeRow

  /** Convert to UnsafeRow with headers */
  val toUnsafeRowWithHeadersProjector: ConsumerRecord[Array[Byte], Array[Byte]] => UnsafeRow

  /** Generic UnsafeRow projector based on the header setting */
  def toUnsafeRowProjector(includeHeaders: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] => UnsafeRow
}
```
**Companion Object:**
```scala { .api }
object KafkaRecordToRowConverter {
  /** Returns the appropriate schema based on header inclusion */
  def kafkaSchema(includeHeaders: Boolean): StructType

  /** Headers array type definition */
  val headersType: DataType = ArrayType(StructType(Array(
    StructField("key", StringType),
    StructField("value", BinaryType)
  )))
}
```
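
Because `kafkaSchema` is the single source of truth for the fixed schema, it can be used to sanity-check a loaded DataFrame. This is a sketch for illustration only: the converter lives in Spark's internal `org.apache.spark.sql.kafka010` package and is not a public API, so application code should not depend on it.

```scala
// Illustrative only: KafkaRecordToRowConverter is internal to Spark
import org.apache.spark.sql.kafka010.KafkaRecordToRowConverter

val expected = KafkaRecordToRowConverter.kafkaSchema(includeHeaders = true)
assert(kafkaStreamWithHeaders.schema == expected)
```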
## Data Type Conversions
### Binary Data Handling
Kafka keys and values are always treated as binary data in Spark:
```scala
// Keys and values arrive as BinaryType - cast them to the appropriate types
val stringData = kafkaStream
  .select(
    expr("CAST(key AS STRING)").as("key_string"),
    expr("CAST(value AS STRING)").as("value_string")
  )

// Parse JSON values
val jsonData = kafkaStream
  .select(
    expr("CAST(value AS STRING)").as("json_string")
  )
  .select(
    from_json(col("json_string"), jsonSchema).as("parsed_data")
  )
```
### Timestamp Handling
Kafka timestamps are converted to Spark TimestampType:
```scala
// Work with timestamps
val withTimestamps = kafkaStream
  .select(
    col("timestamp"),
    col("timestampType"),
    date_format(col("timestamp"), "yyyy-MM-dd HH:mm:ss").as("formatted_timestamp"),
    hour(col("timestamp")).as("hour"),
    to_date(col("timestamp")).as("date")
  )

// Filter by time range
val recentMessages = kafkaStream
  .filter(col("timestamp") > lit("2023-01-01 00:00:00").cast(TimestampType))
```
### Header Processing
Process Kafka headers as structured data:
```scala
// Work with headers (when includeHeaders = "true")
val withHeaderProcessing = kafkaStreamWithHeaders
  .select(
    col("*"),
    size(col("headers")).as("header_count"),

    // Extract all header keys
    expr("transform(headers, x -> x.key)").as("header_keys"),

    // Find a specific header by key
    expr("filter(headers, x -> x.key = 'content-type')").as("content_type_headers"),

    // Extract a header value as a string
    expr("CAST(filter(headers, x -> x.key = 'user-id')[0].value AS STRING)").as("user_id")
  )
```
## Schema Validation
The connector enforces a fixed schema and will reject custom schemas:
```scala
// This will fail - custom schemas are not supported
spark
  .readStream
  .format("kafka")
  .schema(customSchema) // Throws an exception
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()
// Exception: "Kafka source has a fixed schema and cannot be set with a custom one"
```
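
Since the source schema is fixed, structure is applied after `load()` instead, typically by casting `value` and parsing it. A minimal sketch, where `eventSchema` is a hypothetical application-defined `StructType`:

```scala
import org.apache.spark.sql.functions._

// Supported approach: accept the fixed schema, then parse `value`
val typed = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()
  .select(from_json(col("value").cast("string"), eventSchema).as("event"))
  .select("event.*")
```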
## Performance Considerations
### Projection Optimization
The converter supports column projection for better performance:
```scala
// Select only the needed columns to reduce processing overhead
val optimized = kafkaStream
  .select("value", "timestamp", "topic") // Only these columns are processed
  .filter(col("timestamp") > recentTimestamp)
```
### Binary vs String Conversion
```scala
// Efficient - work with binary data when possible
val binaryProcessing = kafkaStream
  .select(col("value"))
  .filter(length(col("value")) > 100)

// Less efficient - casts to string for every row
val stringProcessing = kafkaStream
  .select(expr("CAST(value AS STRING)").as("value_str"))
  .filter(length(col("value_str")) > 100)
```
### Header Processing Performance
```scala
// Efficient - only include headers when needed
val withoutHeaders = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("includeHeaders", "false") // Default, more efficient
  .load()

// Less efficient - headers require additional processing
val withHeaders = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("includeHeaders", "true") // Only when headers are needed
  .load()
```
## Common Patterns
### JSON Message Processing
```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Define the JSON schema
val messageSchema = StructType(Array(
  StructField("user_id", StringType),
  StructField("event_type", StringType),
  StructField("timestamp", LongType),
  StructField("properties", MapType(StringType, StringType))
))

// Parse JSON messages
val parsedMessages = kafkaStream
  .select(
    col("topic"),
    col("partition"),
    col("offset"),
    col("timestamp").as("kafka_timestamp"),
    from_json(expr("CAST(value AS STRING)"), messageSchema).as("message")
  )
  .select(
    col("topic"),
    col("partition"),
    col("offset"),
    col("kafka_timestamp"),
    col("message.*")
  )
```
### Avro Message Processing
```scala
// For Avro messages, use the spark-avro module
import org.apache.spark.sql.avro.functions.from_avro

val avroMessages = kafkaStream
  .select(
    col("topic"),
    from_avro(col("value"), avroSchemaString).as("avro_data")
  )
  .select(
    col("topic"),
    col("avro_data.*")
  )
```
### Message Routing by Topic
```scala
// Process different topics differently based on their schema.
// Note: all branches of when/otherwise must yield the same data type,
// so this pattern requires the parsed structs to share a common schema
// (or normalization, e.g. via to_json).
val processedMessages = kafkaStream
  .withColumn("processed_value",
    when(col("topic") === "user-events",
      from_json(expr("CAST(value AS STRING)"), userEventSchema))
    .when(col("topic") === "system-logs",
      from_json(expr("CAST(value AS STRING)"), logSchema))
    .otherwise(lit(null))
  )
```