# Schema Conversion

The Schema Conversion API provides developer-level utilities for converting between Avro schemas and Spark SQL data types. This API is marked as `@DeveloperApi` and is primarily used for advanced schema manipulation and custom data source implementations.

## Core API

### SchemaConverters Object

```scala { .api }
object SchemaConverters {
  def toSqlType(avroSchema: org.apache.avro.Schema): SchemaType
  def toSqlType(avroSchema: org.apache.avro.Schema, useStableIdForUnionType: Boolean,
      stableIdPrefixForUnionType: String, recursiveFieldMaxDepth: Int): SchemaType
  def toAvroType(catalystType: DataType, nullable: Boolean, recordName: String, nameSpace: String): org.apache.avro.Schema
}
```

### SchemaType Case Class

```scala { .api }
case class SchemaType(dataType: DataType, nullable: Boolean)
```

Represents the result of converting an Avro schema to a Spark SQL data type, including nullability information.
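
As a small usage sketch (the `Point` schema here is illustrative, not from the API itself), the two fields can be read off the result directly:

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

val avroSchema: Schema = new Schema.Parser().parse(
  """{"type": "record", "name": "Point",
     "fields": [{"name": "x", "type": "double"}, {"name": "y", "type": "double"}]}""")

val schemaType = SchemaConverters.toSqlType(avroSchema)

// A record schema converts to a StructType; field order is preserved.
val struct = schemaType.dataType.asInstanceOf[StructType]
println(struct.fieldNames.mkString(", ")) // x, y
println(schemaType.nullable)
```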

## Avro to Spark SQL Conversion

### Basic Conversion

Converts an Avro schema to a Spark SQL DataType:

```scala { .api }
def toSqlType(avroSchema: org.apache.avro.Schema): SchemaType
```

**Usage Example:**
```scala
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.avro.Schema

val avroSchemaJson = """{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}"""

val avroSchema = new Schema.Parser().parse(avroSchemaJson)
val sparkSchema = SchemaConverters.toSqlType(avroSchema)

println(sparkSchema.dataType)
// StructType(List(
//   StructField("id", LongType, false),
//   StructField("name", StringType, false),
//   StructField("email", StringType, true)
// ))
```

### Advanced Conversion

Converts with advanced options for union type handling and recursion control:

```scala { .api }
def toSqlType(avroSchema: org.apache.avro.Schema,
    useStableIdForUnionType: Boolean,
    stableIdPrefixForUnionType: String,
    recursiveFieldMaxDepth: Int): SchemaType
```

**Parameters:**
- `avroSchema`: The Avro schema to convert
- `useStableIdForUnionType`: Use stable identifiers for union types
- `stableIdPrefixForUnionType`: Prefix for stable union identifiers
- `recursiveFieldMaxDepth`: Maximum depth for recursive field resolution

**Usage Example:**
```scala
val complexSchema = SchemaConverters.toSqlType(
  avroSchema,
  useStableIdForUnionType = true,
  stableIdPrefixForUnionType = "union_",
  recursiveFieldMaxDepth = 10
)
```

## Spark SQL to Avro Conversion

### Type Conversion

Converts a Spark SQL DataType to an Avro schema:

```scala { .api }
def toAvroType(catalystType: DataType, nullable: Boolean, recordName: String, nameSpace: String): org.apache.avro.Schema
```

**Parameters:**
- `catalystType`: Spark SQL DataType to convert
- `nullable`: Whether the field can be null
- `recordName`: Name for generated record types
- `nameSpace`: Namespace for generated schemas

**Usage Example:**
```scala
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types._

val sparkSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = false),
  StructField("score", DoubleType, nullable = true)
))

val avroSchema = SchemaConverters.toAvroType(
  sparkSchema,
  nullable = false,
  recordName = "UserRecord",
  nameSpace = "com.example"
)

println(avroSchema.toString(true))
```

## Type Mapping

### Primitive Types

| Avro Type | Spark SQL Type | Notes |
|-----------|----------------|-------|
| boolean | BooleanType | Direct mapping |
| int | IntegerType | 32-bit signed integer |
| long | LongType | 64-bit signed integer |
| float | FloatType | 32-bit IEEE 754 |
| double | DoubleType | 64-bit IEEE 754 |
| bytes | BinaryType | Variable-length byte array |
| string | StringType | UTF-8 encoded string |

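These rows can be spot-checked in the reverse direction with `toAvroType` (a minimal sketch; the record name and namespace arguments are placeholders, unused for primitive types):

```scala
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types._

// Spark primitives map back to the corresponding Avro primitives.
val intSchema = SchemaConverters.toAvroType(IntegerType, nullable = false, recordName = "r", nameSpace = "")
val strSchema = SchemaConverters.toAvroType(StringType, nullable = false, recordName = "r", nameSpace = "")

println(intSchema) // "int"
println(strSchema) // "string"
```
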
### Complex Types

| Avro Type | Spark SQL Type | Notes |
|-----------|----------------|-------|
| record | StructType | Named fields with types |
| array | ArrayType | Homogeneous element type |
| map | MapType | String keys, typed values |
| union | StructType or nullable field | Depends on union content |
| enum | StringType | Enum values as strings |
| fixed | BinaryType | Fixed-length byte array |

### Logical Types

| Avro Logical Type | Spark SQL Type | Notes |
|-------------------|----------------|-------|
| decimal | DecimalType | Precision and scale preserved |
| date | DateType | Days since Unix epoch |
| time-millis | IntegerType | Milliseconds since midnight |
| time-micros | LongType | Microseconds since midnight |
| timestamp-millis | TimestampType | Milliseconds since Unix epoch |
| timestamp-micros | TimestampType | Microseconds since Unix epoch |
| duration | CalendarIntervalType | 12-byte fixed encoding months, days, milliseconds |

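For example, a decimal-backed `bytes` field carries its precision and scale through the conversion (a sketch using the first table row; the `Payment` schema is illustrative):

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters

val decimalJson = """{
  "type": "record",
  "name": "Payment",
  "fields": [
    {"name": "amount",
     "type": {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}}
  ]
}"""

val schemaType = SchemaConverters.toSqlType(new Schema.Parser().parse(decimalJson))
println(schemaType.dataType)
// The amount field converts to DecimalType(10,2)
```
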
## Advanced Features

### Union Type Handling

Avro unions are handled differently based on their composition:

```scala
// Simple nullable field: ["null", "string"] -> StringType (nullable = true)
val nullableString = """{"name": "optional_field", "type": ["null", "string"]}"""

// Complex union: ["string", "int"] -> StructType with member0, member1 fields
val complexUnion = """{"name": "mixed_field", "type": ["string", "int"]}"""
```
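
To see the struct representation of a complex union end to end, wrap the field in a record and convert it (a sketch; the exact member-field names depend on the stable-id options described under Advanced Conversion):

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters

val unionRecordJson = """{
  "type": "record",
  "name": "Mixed",
  "fields": [
    {"name": "mixed_field", "type": ["string", "int"]}
  ]
}"""

val result = SchemaConverters.toSqlType(new Schema.Parser().parse(unionRecordJson))
// mixed_field becomes a StructType with one member per union branch;
// each member is nullable, since only one branch is set at a time.
println(result.dataType.prettyJson)
```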

### Recursive Schema Support

The API handles recursive schemas with configurable depth limits:

```scala
val recursiveSchema = """{
  "type": "record",
  "name": "Node",
  "fields": [
    {"name": "value", "type": "string"},
    {"name": "children", "type": {"type": "array", "items": "Node"}}
  ]
}"""

val converted = SchemaConverters.toSqlType(
  new Schema.Parser().parse(recursiveSchema),
  useStableIdForUnionType = false,
  stableIdPrefixForUnionType = "",
  recursiveFieldMaxDepth = 5 // Limit recursion to prevent stack overflow
)
```

### Schema Evolution Compatibility

The conversion API supports schema evolution patterns:

```scala
// Reader schema (newer version)
val readerSchema = """{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string", "default": ""}
  ]
}"""

// Writer schema (older version) - missing the email field
val writerSchema = """{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"}
  ]
}"""

// Both schemas convert to compatible Spark SQL types
val readerSparkSchema = SchemaConverters.toSqlType(new Schema.Parser().parse(readerSchema))
val writerSparkSchema = SchemaConverters.toSqlType(new Schema.Parser().parse(writerSchema))
```

## Error Handling

The conversion API throws exceptions for unsupported schema patterns:

- **UnsupportedAvroTypeException**: Thrown for unsupported Avro types
- **IncompatibleSchemaException**: Thrown for incompatible schema conversions
- **IllegalArgumentException**: Thrown for invalid parameters

```scala
try {
  val converted = SchemaConverters.toSqlType(complexAvroSchema)
} catch {
  case e: UnsupportedAvroTypeException =>
    println(s"Unsupported Avro type: ${e.getMessage}")
  case e: IncompatibleSchemaException =>
    println(s"Schema incompatibility: ${e.getMessage}")
}
```