# Schema Conversion

Utilities for converting between Apache Avro schemas and Spark SQL schemas. Supports complex nested types, logical types, and bidirectional conversion with proper nullability handling.

## Capabilities

### Avro to Spark SQL Conversion

Convert Avro schemas to Spark SQL DataType structures with nullability information.
```scala { .api }
object SchemaConverters {
  /**
   * Converts an Avro schema to a Spark SQL schema type
   * @param avroSchema the Avro schema to convert
   * @return SchemaType containing the DataType and its nullability
   */
  def toSqlType(avroSchema: Schema): SchemaType
}

case class SchemaType(dataType: DataType, nullable: Boolean)
```

**Usage Examples:**

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

// Parse an Avro schema from its JSON definition
val avroSchemaJson = """{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null},
    {"name": "addresses", "type": {
      "type": "array",
      "items": {
        "type": "record",
        "name": "Address",
        "fields": [
          {"name": "street", "type": "string"},
          {"name": "city", "type": "string"}
        ]
      }
    }}
  ]
}"""

val avroSchema = new Schema.Parser().parse(avroSchemaJson)

// Convert to a Spark SQL schema
val schemaType = SchemaConverters.toSqlType(avroSchema)
val sparkSqlSchema = schemaType.dataType.asInstanceOf[StructType]

// Use in DataFrame operations
val df = spark.read
  .schema(sparkSqlSchema)
  .format("avro")
  .load("path/to/user_data.avro")

// Print schema information
println(s"Nullable: ${schemaType.nullable}")
println(s"Spark Schema: ${sparkSqlSchema.prettyJson}")
```

### Spark SQL to Avro Conversion

Convert Spark SQL DataType structures to Avro schemas with configurable naming.

```scala { .api }
object SchemaConverters {
  /**
   * Converts a Spark SQL DataType to an Avro schema
   * @param catalystType the Spark SQL DataType to convert
   * @param nullable whether the type should be nullable
   * @param recordName name for record types (default: "topLevelRecord")
   * @param nameSpace namespace for record types (default: "")
   * @return the equivalent Avro Schema
   * @throws IncompatibleSchemaException if the DataType cannot be converted
   */
  def toAvroType(
      catalystType: DataType,
      nullable: Boolean = false,
      recordName: String = "topLevelRecord",
      nameSpace: String = ""): Schema
}

/** Exception thrown when schema conversion fails */
class IncompatibleSchemaException(msg: String, ex: Throwable = null) extends Exception(msg, ex)
```

**Usage Examples:**

```scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.avro.SchemaConverters

// Create a Spark SQL schema
val sparkSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = false),
  StructField("email", StringType, nullable = true),
  StructField("scores", ArrayType(DoubleType, containsNull = false), nullable = false)
))

// Convert to an Avro schema
val avroSchema = SchemaConverters.toAvroType(
  sparkSchema,
  nullable = false,
  recordName = "UserProfile",
  nameSpace = "com.example.data"
)

// Use in write operations
val df = spark.table("user_data")
df.write
  .format("avro")
  .option("avroSchema", avroSchema.toString)
  .save("path/to/output")

// Print the schema as pretty-printed JSON
println(avroSchema.toString(true))
```

### Supported Type Mappings

Comprehensive type conversion support between Avro and Spark SQL types.

#### Avro to Spark SQL Type Mappings

```scala { .api }
// Primitive types
NULL → NullType
BOOLEAN → BooleanType
INT → IntegerType
LONG → LongType
FLOAT → FloatType
DOUBLE → DoubleType
BYTES → BinaryType
STRING → StringType
ENUM → StringType

// Logical types
INT with date logical type → DateType
LONG with timestamp-millis logical type → TimestampType
LONG with timestamp-micros logical type → TimestampType
BYTES/FIXED with decimal logical type → DecimalType

// Complex types
RECORD → StructType
ARRAY → ArrayType
MAP → MapType(StringType, valueType)
UNION → Special handling (see below)
FIXED → BinaryType
```
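The primitive rows of this table can be spot-checked by feeding schemas directly through `toSqlType`. A minimal sketch, assuming `spark-avro` and Avro are on the classpath (names like `colorEnum` are chosen here for illustration):

```scala
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types._

// STRING → StringType, LONG → LongType
val stringResult = SchemaConverters.toSqlType(Schema.create(Schema.Type.STRING))
val longResult = SchemaConverters.toSqlType(Schema.create(Schema.Type.LONG))
println(stringResult.dataType)  // StringType
println(longResult.dataType)    // LongType

// ENUM symbols are surfaced as plain strings
val colorEnum = SchemaBuilder.enumeration("Color").symbols("RED", "GREEN", "BLUE")
println(SchemaConverters.toSqlType(colorEnum).dataType)  // StringType
```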
#### Spark SQL to Avro Type Mappings

```scala { .api }
// Primitive types
BooleanType → BOOLEAN
ByteType → INT
ShortType → INT
IntegerType → INT
LongType → LONG
FloatType → FLOAT
DoubleType → DOUBLE
StringType → STRING
BinaryType → BYTES

// Special types
DateType → INT with date logical type
TimestampType → LONG with timestamp-micros logical type
DecimalType → FIXED with decimal logical type

// Complex types
StructType → RECORD
ArrayType → ARRAY
MapType → MAP (only with StringType keys)
```
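In the reverse direction, a few of these mappings can be exercised with `toAvroType`. A minimal sketch, under the same classpath assumptions:

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types._

// ByteType and ShortType are widened to Avro INT
println(SchemaConverters.toAvroType(ByteType).getType)   // INT
println(SchemaConverters.toAvroType(ShortType).getType)  // INT

// TimestampType carries the timestamp-micros logical type on a LONG
val ts = SchemaConverters.toAvroType(TimestampType)
println(ts.getType)                 // LONG
println(ts.getLogicalType.getName)  // timestamp-micros

// DateType carries the date logical type on an INT
println(SchemaConverters.toAvroType(DateType).getLogicalType.getName)  // date
```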
### UNION Type Handling

Special handling for Avro UNION types with multiple conversion strategies.

```scala { .api }
// Union type conversion strategies:

// 1. Nullable unions (union with null)
["null", "string"] → StringType with nullable = true

// 2. Simple type promotion unions
["int", "long"] → LongType
["float", "double"] → DoubleType

// 3. Complex unions (converted to a struct with one field per member)
["string", "int", <record>] → StructType with fields:
  - member0: StringType (nullable = true)
  - member1: IntegerType (nullable = true)
  - member2: StructType (nullable = true)
```

**Union Handling Examples:**

```scala
// Nullable field union
val nullableUnion = """["null", "string"]"""
// Converts to: StringType, nullable = true

// Type promotion union
val promotionUnion = """["int", "long"]"""
// Converts to: LongType, nullable = false

// Complex union
val complexUnion = """[
  "string",
  {"type": "record", "name": "Address", "fields": [
    {"name": "street", "type": "string"}
  ]}
]"""
// Converts to: StructType with member0 (string) and member1 (Address record)
```
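All three strategies can be observed by running union schemas through `toSqlType`. A minimal sketch, assuming `spark-avro` and Avro are on the classpath (the `member0`/`member1` field naming follows the complex-union rule described above):

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types._

// 1. A union with null collapses to the non-null branch, marked nullable
val nullableUnion = new Schema.Parser().parse("""["null", "string"]""")
println(SchemaConverters.toSqlType(nullableUnion).dataType)  // StringType
println(SchemaConverters.toSqlType(nullableUnion).nullable)  // true

// 2. int/long promotes to LongType
val promoted = new Schema.Parser().parse("""["int", "long"]""")
println(SchemaConverters.toSqlType(promoted).dataType)  // LongType

// 3. Anything else becomes a struct with one memberN field per branch
val complex = new Schema.Parser().parse("""["string", "int"]""")
val struct = SchemaConverters.toSqlType(complex).dataType.asInstanceOf[StructType]
println(struct.fieldNames.mkString(", "))  // member0, member1
```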
### Logical Types Support

Full support for Avro logical types with proper Spark SQL mapping.

#### Date and Timestamp Logical Types

```scala { .api }
// Date logical type (days since epoch)
{
  "type": "int",
  "logicalType": "date"
} → DateType

// Timestamp logical types
{
  "type": "long",
  "logicalType": "timestamp-millis"
} → TimestampType

{
  "type": "long",
  "logicalType": "timestamp-micros"
} → TimestampType
```
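Both timestamp variants resolve to the same Spark type, since TimestampType does not distinguish precision at the schema level. A quick sketch, assuming `spark-avro` and Avro on the classpath:

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.TimestampType

val millis = new Schema.Parser().parse("""{"type": "long", "logicalType": "timestamp-millis"}""")
val micros = new Schema.Parser().parse("""{"type": "long", "logicalType": "timestamp-micros"}""")
println(SchemaConverters.toSqlType(millis).dataType)  // TimestampType
println(SchemaConverters.toSqlType(micros).dataType)  // TimestampType
```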
#### Decimal Logical Type

```scala { .api }
// Decimal as BYTES
{
  "type": "bytes",
  "logicalType": "decimal",
  "precision": 10,
  "scale": 2
} → DecimalType(10, 2)

// Decimal as FIXED
{
  "type": "fixed",
  "name": "DecimalFixed",
  "size": 8,
  "logicalType": "decimal",
  "precision": 18,
  "scale": 4
} → DecimalType(18, 4)
```
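The precision and scale declared on the Avro side carry over verbatim into the resulting DecimalType. A quick sketch, assuming `spark-avro` and Avro on the classpath:

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.DecimalType

val fixedDecimal = new Schema.Parser().parse("""{
  "type": "fixed",
  "name": "DecimalFixed",
  "size": 8,
  "logicalType": "decimal",
  "precision": 18,
  "scale": 4
}""")
println(SchemaConverters.toSqlType(fixedDecimal).dataType == DecimalType(18, 4))  // true
```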
### Schema Evolution Support

Handling schema evolution and compatibility between different schema versions.

```scala
// Schema evolution example
val v1Schema = """{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"}
  ]
}"""

val v2Schema = """{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}"""

// Reading evolved data with the original schema
val df = spark.read
  .format("avro")
  .option("avroSchema", v1Schema) // use the older schema
  .load("path/to/v2_data.avro")   // while reading newer data
```
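Whether such an evolution is safe can be checked ahead of time with Avro's own compatibility utility. A minimal sketch using `org.apache.avro.SchemaCompatibility` (available in Avro 1.8+); the check passes here because the added field carries a default:

```scala
import org.apache.avro.{Schema, SchemaCompatibility}

val v1 = new Schema.Parser().parse("""{
  "type": "record", "name": "User", "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"}
  ]
}""")
val v2 = new Schema.Parser().parse("""{
  "type": "record", "name": "User", "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}""")

// A v2 reader can consume v1 data because the added field has a default
val result = SchemaCompatibility.checkReaderWriterCompatibility(v2, v1)
println(result.getType)  // COMPATIBLE
```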
### Error Handling

Schema conversion error handling and common issues.

```scala { .api }
class IncompatibleSchemaException(msg: String, ex: Throwable = null) extends Exception(msg, ex)
```

**Common Error Scenarios:**

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.{IncompatibleSchemaException, SchemaConverters}
import org.apache.spark.sql.types._

// Unsupported type conversion: Avro MAP requires string keys
try {
  val unsupportedType = MapType(IntegerType, StringType)
  SchemaConverters.toAvroType(unsupportedType)
} catch {
  case e: IncompatibleSchemaException =>
    println(s"Conversion failed: ${e.getMessage}")
}

// Recursive schema detection
val recursiveSchema = """{
  "type": "record",
  "name": "Node",
  "fields": [
    {"name": "value", "type": "string"},
    {"name": "child", "type": "Node"}
  ]
}"""

try {
  SchemaConverters.toSqlType(new Schema.Parser().parse(recursiveSchema))
} catch {
  case e: IncompatibleSchemaException =>
    println("Recursive schemas are not supported")
}
```