# Schema Conversion

The Spark Avro connector provides utilities for converting between Avro schemas and Spark SQL schemas through the `SchemaConverters` object. This enables seamless interoperability between the Avro and Spark type systems, including complex nested types and logical type mappings.
3
4
## SchemaConverters Object
5
6
The `SchemaConverters` object provides bidirectional schema conversion capabilities.
7
8
```scala { .api }
9
@DeveloperApi
10
object SchemaConverters {
11
case class SchemaType(dataType: DataType, nullable: Boolean)
12
13
def toSqlType(avroSchema: Schema): SchemaType
14
def toSqlType(avroSchema: Schema, options: Map[String, String]): SchemaType
15
def toAvroType(
16
catalystType: DataType,
17
nullable: Boolean,
18
recordName: String,
19
nameSpace: String
20
): Schema
21
}
22
```
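
Because the two methods are inverses for common schemas, a quick round trip is a useful sanity check. This is a minimal sketch (record and namespace names are illustrative), assuming Spark SQL and Avro are on the classpath:

```scala
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types._

// Convert a Spark schema to Avro and back; the shape should survive the round trip.
val original = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = true)
))

val asAvro = SchemaConverters.toAvroType(original, nullable = false,
  recordName = "RoundTrip", nameSpace = "com.example")
val back = SchemaConverters.toSqlType(asAvro)

// For a simple schema like this, the round trip preserves types and nullability.
println(back.dataType == original)
```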

## Converting Avro to Spark SQL Schema

### Basic Conversion

```scala { .api }
def toSqlType(avroSchema: Schema): SchemaType
```

**Parameters:**
- `avroSchema`: The Apache Avro schema to convert

**Returns:** `SchemaType` containing the Spark SQL DataType and nullability

**Usage Example:**

```scala
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.avro.Schema

val avroSchemaJson = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null},
    {"name": "age", "type": "int"},
    {"name": "active", "type": "boolean"}
  ]
}
"""

val avroSchema = new Schema.Parser().parse(avroSchemaJson)
val schemaType = SchemaConverters.toSqlType(avroSchema)

println(s"Spark SQL DataType: ${schemaType.dataType}")
println(s"Is Nullable: ${schemaType.nullable}")

// Result:
// Spark SQL DataType: StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(email,StringType,true), StructField(age,IntegerType,false), StructField(active,BooleanType,false))
// Is Nullable: false
```

### Conversion with Options

```scala { .api }
def toSqlType(avroSchema: Schema, options: Map[String, String]): SchemaType
```

**Additional Parameters:**
- `options`: Configuration options affecting conversion behavior

**Supported Options:**
- `enableStableIdentifiersForUnionType`: Use stable, name-based field identifiers for union type members

**Usage Example:**

```scala
val unionSchema = """
{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "data", "type": [
      {"type": "record", "name": "UserEvent", "fields": [{"name": "userId", "type": "long"}]},
      {"type": "record", "name": "SystemEvent", "fields": [{"name": "systemId", "type": "string"}]}
    ]}
  ]
}
"""

val schema = new Schema.Parser().parse(unionSchema)
val options = Map("enableStableIdentifiersForUnionType" -> "true")
val schemaType = SchemaConverters.toSqlType(schema, options)
```

## Converting Spark SQL to Avro Schema

### Basic Conversion

```scala { .api }
def toAvroType(
    catalystType: DataType,
    nullable: Boolean,
    recordName: String,
    nameSpace: String): Schema
```

**Parameters:**
- `catalystType`: Spark SQL DataType to convert
- `nullable`: Whether the root type should be nullable
- `recordName`: Name for the top-level record
- `nameSpace`: Namespace for the Avro schema

**Returns:** Apache Avro Schema object

**Usage Example:**

```scala
import org.apache.spark.sql.types._

val sparkSchema = StructType(Seq(
  StructField("user_id", LongType, nullable = false),
  StructField("username", StringType, nullable = false),
  StructField("email", StringType, nullable = true),
  StructField("created_at", TimestampType, nullable = false),
  StructField("preferences", MapType(StringType, StringType), nullable = true)
))

val avroSchema = SchemaConverters.toAvroType(
  catalystType = sparkSchema,
  nullable = false,
  recordName = "UserRecord",
  nameSpace = "com.example.avro"
)

println(avroSchema.toString(true))
```

### Complex Type Conversion

Working with nested structures and arrays:

```scala
val complexSchema = StructType(Seq(
  StructField("order_id", StringType, nullable = false),
  StructField("customer", StructType(Seq(
    StructField("id", LongType, nullable = false),
    StructField("name", StringType, nullable = false),
    StructField("addresses", ArrayType(StructType(Seq(
      StructField("street", StringType, nullable = false),
      StructField("city", StringType, nullable = false),
      StructField("zip", StringType, nullable = true)
    )), containsNull = false), nullable = true)
  )), nullable = false),
  StructField("items", ArrayType(StructType(Seq(
    StructField("sku", StringType, nullable = false),
    StructField("quantity", IntegerType, nullable = false),
    StructField("price", DecimalType(10, 2), nullable = false)
  )), containsNull = false), nullable = false)
))

val avroSchema = SchemaConverters.toAvroType(
  complexSchema,
  nullable = false,
  recordName = "Order",
  nameSpace = "com.ecommerce.orders"
)
```

## Type Mapping Reference

### Primitive Types

| Spark SQL Type | Avro Type | Notes |
|----------------|-----------|-------|
| BooleanType | boolean | Direct mapping |
| IntegerType | int | Direct mapping |
| LongType | long | Direct mapping |
| FloatType | float | Direct mapping |
| DoubleType | double | Direct mapping |
| StringType | string | Direct mapping |
| BinaryType | bytes | Direct mapping |
189
190
### Complex Types
191
192
| Spark SQL Type | Avro Type | Notes |
193
|----------------|-----------|-------|
194
| StructType | record | Field names and types preserved |
195
| ArrayType | array | Element type converted recursively |
196
| MapType | map | Key must be string, value type converted |

### Special Types

| Spark SQL Type | Avro Type | Notes |
|----------------|-----------|-------|
| TimestampType | long (timestamp-micros) | Logical type for microsecond precision |
| DateType | int (date) | Logical type for date values |
| DecimalType | bytes (decimal) | Logical type with precision/scale |
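
These logical-type annotations can be inspected directly on a generated schema via Avro's `Schema` API. A minimal sketch (record and namespace names are illustrative):

```scala
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types._

val schema = StructType(Seq(StructField("ts", TimestampType, nullable = false)))
val avro = SchemaConverters.toAvroType(schema, nullable = false,
  recordName = "TsRecord", nameSpace = "com.example")

// The field's underlying Avro type is long, annotated with a logical type.
val tsField = avro.getField("ts").schema()
println(tsField.getType)                 // LONG
println(tsField.getLogicalType.getName)  // timestamp-micros
```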

### Nullable Types

Spark SQL nullable fields are converted to Avro union types:

```scala
// Spark SQL: StructField("email", StringType, nullable = true)
// Avro: {"name": "email", "type": ["null", "string"], "default": null}
```
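
The union wrapping can be observed on a generated schema; a small sketch (names are illustrative):

```scala
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types._

val schema = StructType(Seq(StructField("email", StringType, nullable = true)))
val avro = SchemaConverters.toAvroType(schema, nullable = false,
  recordName = "UserContact", nameSpace = "com.example")

// A nullable field becomes a union; getTypes lists the union branches.
val emailField = avro.getField("email").schema()
println(emailField.getType)   // UNION
println(emailField.getTypes)  // the string and null branches
```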

## Working with Logical Types

### Decimal Types

```scala
import org.apache.spark.sql.types.DecimalType

val decimalSchema = StructType(Seq(
  StructField("price", DecimalType(10, 2), nullable = false),
  StructField("tax", DecimalType(5, 4), nullable = true)
))

val avroSchema = SchemaConverters.toAvroType(
  decimalSchema,
  nullable = false,
  recordName = "PriceInfo",
  nameSpace = "com.example"
)

// Results in an Avro schema with decimal logical types
```

### Timestamp and Date Types

```scala
val timeSchema = StructType(Seq(
  StructField("created_at", TimestampType, nullable = false),
  StructField("event_date", DateType, nullable = false),
  StructField("updated_at", TimestampType, nullable = true)
))

val avroSchema = SchemaConverters.toAvroType(
  timeSchema,
  nullable = false,
  recordName = "TimeRecord",
  nameSpace = "com.example"
)

// Results in:
// - TimestampType -> long with timestamp-micros logical type
// - DateType -> int with date logical type
```

## Schema Validation and Compatibility

### Checking Data Type Support

Before conversion, verify that all data types are supported:

```scala
import org.apache.spark.sql.avro.AvroUtils

def validateSchema(schema: StructType): Boolean = {
  schema.fields.forall(field => AvroUtils.supportsDataType(field.dataType))
}

val isSupported = validateSchema(yourSparkSchema)
if (!isSupported) {
  throw new IllegalArgumentException("Schema contains unsupported data types")
}
```

### Schema Evolution Considerations

When converting schemas for evolution scenarios:

```scala
// Original schema
val v1Schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = false)
))

// Evolved schema (backward compatible)
val v2Schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = false),
  StructField("email", StringType, nullable = true),      // New optional field
  StructField("created_at", TimestampType, nullable = true) // New optional field
))

val v1Avro = SchemaConverters.toAvroType(v1Schema, false, "User", "com.example")
val v2Avro = SchemaConverters.toAvroType(v2Schema, false, "User", "com.example")

// v2Avro can read data written with v1Avro schema
```
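
Rather than asserting compatibility by inspection, it can be checked mechanically with Avro's own `SchemaCompatibility` utility. A sketch, assuming the `v1Avro` and `v2Avro` schemas from the example above:

```scala
import org.apache.avro.SchemaCompatibility

// Can a reader using the v2 schema consume data written with the v1 schema?
val result = SchemaCompatibility.checkReaderWriterCompatibility(v2Avro, v1Avro)
println(result.getType)  // COMPATIBLE when the evolution is backward compatible
```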

## Error Handling

### Unsupported Type Conversion

```scala
import org.apache.spark.sql.types.CalendarIntervalType

val unsupportedSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("interval", CalendarIntervalType, nullable = true) // Not supported
))

try {
  val avroSchema = SchemaConverters.toAvroType(
    unsupportedSchema,
    nullable = false,
    recordName = "Test",
    nameSpace = "com.example"
  )
} catch {
  // Spark reports unsupported types with an IncompatibleSchemaException
  case e: Exception =>
    println(s"Conversion failed: ${e.getMessage}")
}
```

### Schema Parse Errors

```scala
val invalidAvroJson = """{"type": "invalid"}"""

try {
  val avroSchema = new Schema.Parser().parse(invalidAvroJson)
  val sparkType = SchemaConverters.toSqlType(avroSchema)
} catch {
  case e: org.apache.avro.SchemaParseException =>
    println(s"Invalid Avro schema: ${e.getMessage}")
}
```

## Practical Examples

### DataFrame Schema Conversion

Converting an existing DataFrame schema to Avro for writing:

```scala
val df = spark.table("users")
val sparkSchema = df.schema

val avroSchema = SchemaConverters.toAvroType(
  sparkSchema,
  nullable = false,
  recordName = "UserRecord",
  nameSpace = "com.company.users"
)

// Use the schema for writing
df.write
  .format("avro")
  .option("avroSchema", avroSchema.toString)
  .save("users_with_schema")
```

### Schema Registry Integration

Using converted schemas with external schema registries:

```scala
def registerSchema(subject: String, sparkSchema: StructType): Int = {
  val avroSchema = SchemaConverters.toAvroType(
    sparkSchema,
    nullable = false,
    recordName = subject.capitalize + "Record",
    nameSpace = "com.company.schemas"
  )

  // Register with the schema registry and return the assigned schema id (pseudo-code)
  schemaRegistry.register(subject, avroSchema)
}

val schemaId = registerSchema("user-events", df.schema)
```