# Schema Conversion

This document covers converting between Avro schemas and Spark SQL schemas using the SchemaConverters utility.

## Core API

```scala { .api }
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.avro.Schema

/**
 * Wrapper for SQL data type and nullability.
 */
case class SchemaType(dataType: DataType, nullable: Boolean)

object SchemaConverters {

  /**
   * Converts an Avro schema to a corresponding Spark SQL schema.
   * @param avroSchema the Avro schema to convert
   * @return SchemaType containing the converted DataType and nullability
   */
  def toSqlType(avroSchema: Schema): SchemaType

  /**
   * Converts an Avro schema with union type options.
   * @param avroSchema the Avro schema to convert
   * @param useStableIdForUnionType whether to use stable identifiers for union types
   * @return SchemaType containing the converted DataType and nullability
   */
  def toSqlType(avroSchema: Schema, useStableIdForUnionType: Boolean): SchemaType

  /**
   * Converts an Avro schema with parsing options.
   * @param avroSchema the Avro schema to convert
   * @param options conversion options map
   * @return SchemaType containing the converted DataType and nullability
   */
  def toSqlType(avroSchema: Schema, options: Map[String, String]): SchemaType

  /**
   * Converts a Spark SQL schema to a corresponding Avro schema.
   * @param catalystType the Spark SQL DataType to convert
   * @param nullable whether the type should be nullable
   * @param recordName the name for the top-level record (default: "topLevelRecord")
   * @param nameSpace the namespace for the record (default: "")
   * @return the corresponding Avro Schema
   */
  def toAvroType(
      catalystType: DataType,
      nullable: Boolean = false,
      recordName: String = "topLevelRecord",
      nameSpace: String = ""
  ): Schema
}
```

## Avro to Spark SQL Conversion

### Basic Type Conversion

```scala
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.avro.Schema

// Simple primitive type
val avroIntSchema = Schema.create(Schema.Type.INT)
val sparkType = SchemaConverters.toSqlType(avroIntSchema)
println(sparkType.dataType) // IntegerType
println(sparkType.nullable) // false

// String type
val avroStringSchema = Schema.create(Schema.Type.STRING)
val stringType = SchemaConverters.toSqlType(avroStringSchema)
println(stringType.dataType) // StringType
```

### Record Type Conversion

```scala
val avroRecordJson = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null},
    {"name": "age", "type": ["null", "int"], "default": null}
  ]
}
"""

val avroSchema = new Schema.Parser().parse(avroRecordJson)
val schemaType = SchemaConverters.toSqlType(avroSchema)

schemaType.dataType match {
  case structType: StructType =>
    println(structType.treeString)
    // root
    //  |-- id: long (nullable = false)
    //  |-- name: string (nullable = false)
    //  |-- email: string (nullable = true)
    //  |-- age: integer (nullable = true)
}
```

### Array and Map Conversion

```scala
// Array type
val avroArrayJson = """
{
  "type": "array",
  "items": "string"
}
"""
val arraySchema = new Schema.Parser().parse(avroArrayJson)
val arrayType = SchemaConverters.toSqlType(arraySchema)
println(arrayType.dataType) // ArrayType(StringType, containsNull=false)

// Map type
val avroMapJson = """
{
  "type": "map",
  "values": "int"
}
"""
val mapSchema = new Schema.Parser().parse(avroMapJson)
val mapType = SchemaConverters.toSqlType(mapSchema)
println(mapType.dataType) // MapType(StringType, IntegerType, valueContainsNull=false)
```

### Union Type Conversion

```scala
// Standard union handling
val unionJson = """
{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "data", "type": [
      {"type": "record", "name": "TextEvent", "fields": [{"name": "message", "type": "string"}]},
      {"type": "record", "name": "NumericEvent", "fields": [{"name": "value", "type": "double"}]}
    ]}
  ]
}
"""

val unionSchema = new Schema.Parser().parse(unionJson)

// Default union conversion (positional field names)
val defaultUnion = SchemaConverters.toSqlType(unionSchema)

// Stable union conversion (type-based field names)
val stableUnion = SchemaConverters.toSqlType(unionSchema, useStableIdForUnionType = true)

// With options
val options = Map("enableStableIdentifiersForUnionType" -> "true")
val optionsUnion = SchemaConverters.toSqlType(unionSchema, options)
```

## Spark SQL to Avro Conversion

### Basic Type Conversion

```scala
import org.apache.spark.sql.types._

// Convert basic types
val intSchema = SchemaConverters.toAvroType(IntegerType)
println(intSchema.toString) // "int"

val stringSchema = SchemaConverters.toAvroType(StringType, nullable = true)
println(stringSchema.toString) // ["null","string"]
```

### Struct Type Conversion

```scala
val sparkSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = false),
  StructField("email", StringType, nullable = true),
  StructField("balance", DecimalType(10, 2), nullable = true)
))

val avroSchema = SchemaConverters.toAvroType(
  sparkSchema,
  nullable = false,
  recordName = "Customer",
  nameSpace = "com.example"
)

println(avroSchema.toString(true))
```

### Complex Type Conversion

```scala
// Array type
val arrayType = ArrayType(StringType, containsNull = true)
val avroArray = SchemaConverters.toAvroType(arrayType)

// Map type
val mapType = MapType(StringType, IntegerType, valueContainsNull = false)
val avroMap = SchemaConverters.toAvroType(mapType)

// Nested struct
val nestedSchema = StructType(Seq(
  StructField("orderId", LongType, nullable = false),
  StructField("customer", StructType(Seq(
    StructField("id", LongType, nullable = false),
    StructField("name", StringType, nullable = false)
  )), nullable = false),
  StructField("items", ArrayType(StructType(Seq(
    StructField("productId", LongType, nullable = false),
    StructField("quantity", IntegerType, nullable = false),
    StructField("price", DoubleType, nullable = false)
  ))), nullable = false)
))

val nestedAvroSchema = SchemaConverters.toAvroType(
  nestedSchema,
  recordName = "Order",
  nameSpace = "com.example.orders"
)
```

## Logical Type Support

### Date and Timestamp Types

```scala
// Date type conversion
val dateType = DateType
val avroDateSchema = SchemaConverters.toAvroType(dateType)
// Results in int type with date logical type

// Timestamp type conversion
val timestampType = TimestampType
val avroTimestampSchema = SchemaConverters.toAvroType(timestampType)
// Results in long type with timestamp-micros logical type

// TimestampNTZ (no timezone) conversion
val timestampNTZType = TimestampNTZType
val avroLocalTimestampSchema = SchemaConverters.toAvroType(timestampNTZType)
// Results in long type with local-timestamp-micros logical type
```

### Decimal Type Conversion

```scala
val decimalType = DecimalType(precision = 10, scale = 2)
val avroDecimalSchema = SchemaConverters.toAvroType(decimalType)
// Results in fixed type with decimal logical type
```

### Interval Types

```scala
// Year-month interval
val yearMonthInterval = YearMonthIntervalType()
val avroYMSchema = SchemaConverters.toAvroType(yearMonthInterval)
// Results in int type with catalyst type property

// Day-time interval
val dayTimeInterval = DayTimeIntervalType()
val avroDTSchema = SchemaConverters.toAvroType(dayTimeInterval)
// Results in long type with catalyst type property
```

## Schema Evolution Patterns

### Adding Optional Fields

```scala
// Original schema
val originalSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = false)
))

// Evolved schema with new optional field
val evolvedSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = false),
  StructField("email", StringType, nullable = true) // New optional field
))

val evolvedAvroSchema = SchemaConverters.toAvroType(evolvedSchema)
```

### Handling Schema Compatibility

```scala
def isSchemaCompatible(readerSchema: Schema, writerSchema: Schema): Boolean = {
  try {
    // Convert both schemas to Spark SQL format
    val readerSqlType = SchemaConverters.toSqlType(readerSchema)
    val writerSqlType = SchemaConverters.toSqlType(writerSchema)

    // Check structural compatibility
    DataType.equalsIgnoreCompatibleNullability(
      readerSqlType.dataType,
      writerSqlType.dataType
    )
  } catch {
    case _: Exception => false
  }
}
```

## Custom Type Properties

### Preserving Catalyst Type Information

```scala
// Some Spark SQL types are preserved using custom properties
val intervalSchema = SchemaConverters.toAvroType(YearMonthIntervalType())

// The generated Avro schema includes catalyst type property:
// {"type": "int", "spark.sql.catalyst.type": "interval year to month"}

// When converting back to Spark SQL, the property is used to restore the exact type
val restoredType = SchemaConverters.toSqlType(intervalSchema)
// Results in YearMonthIntervalType instead of IntegerType
```

## Exception Types

```scala { .api }
/**
 * Exception thrown when schema conversion fails due to incompatibility.
 */
class IncompatibleSchemaException(msg: String, ex: Throwable = null) extends Exception(msg, ex)

/**
 * Exception thrown when an unsupported Avro type is encountered.
 */
class UnsupportedAvroTypeException(msg: String) extends Exception(msg)
```

## Error Handling

### Schema Conversion Errors

```scala
import org.apache.spark.sql.avro.IncompatibleSchemaException

try {
  // Invalid recursive schema
  val recursiveJson = """
  {
    "type": "record",
    "name": "Node",
    "fields": [
      {"name": "value", "type": "int"},
      {"name": "child", "type": "Node"}
    ]
  }
  """
  val recursiveSchema = new Schema.Parser().parse(recursiveJson)
  val result = SchemaConverters.toSqlType(recursiveSchema)
} catch {
  case e: IncompatibleSchemaException =>
    println(s"Schema conversion failed: ${e.getMessage}")
}
```

### Unsupported Type Handling

```scala
try {
  // Attempt to convert unsupported Spark SQL type
  val unsupportedType = CalendarIntervalType
  val avroSchema = SchemaConverters.toAvroType(unsupportedType)
} catch {
  case e: IncompatibleSchemaException =>
    println(s"Unsupported type conversion: ${e.getMessage}")
}
```

## Practical Examples

### Schema Registry Integration

```scala
// Example of using with Confluent Schema Registry
def convertSchemaRegistryToSpark(schemaRegistrySchema: String): StructType = {
  val avroSchema = new Schema.Parser().parse(schemaRegistrySchema)
  val schemaType = SchemaConverters.toSqlType(avroSchema)
  schemaType.dataType.asInstanceOf[StructType]
}

def convertSparkToSchemaRegistry(sparkSchema: StructType, name: String): String = {
  val avroSchema = SchemaConverters.toAvroType(
    sparkSchema,
    recordName = name,
    nameSpace = "com.example"
  )
  avroSchema.toString(true) // Pretty-printed JSON
}
```

### Dynamic Schema Handling

```scala
def processWithDynamicSchema(df: DataFrame, avroSchemaJson: String): DataFrame = {
  val avroSchema = new Schema.Parser().parse(avroSchemaJson)
  val schemaType = SchemaConverters.toSqlType(avroSchema)

  // Verify DataFrame schema compatibility
  val dfSchema = df.schema
  if (DataType.equalsIgnoreCompatibleNullability(dfSchema, schemaType.dataType)) {
    // Schemas are compatible, proceed with processing
    df.select("*")
  } else {
    throw new IllegalArgumentException("DataFrame schema incompatible with Avro schema")
  }
}
```