# Data Type Support

This document provides a comprehensive reference for the data type mappings between Avro and Spark SQL, including support for complex nested structures, logical types, and custom types.

## Type Mapping Overview

### Primitive Types

| Avro Type | Spark SQL Type | Notes |
|-----------|----------------|-------|
| `null` | `NullType` | Represents null values |
| `boolean` | `BooleanType` | Boolean values |
| `int` | `IntegerType` | 32-bit signed integer |
| `long` | `LongType` | 64-bit signed integer |
| `float` | `FloatType` | Single-precision floating point |
| `double` | `DoubleType` | Double-precision floating point |
| `bytes` | `BinaryType` | Variable-length byte array |
| `string` | `StringType` | UTF-8 string |

### Complex Types

| Avro Type | Spark SQL Type | Notes |
|-----------|----------------|-------|
| `record` | `StructType` | Structured record with named fields |
| `enum` | `StringType` | Enumeration converted to string |
| `array` | `ArrayType` | Variable-length array |
| `map` | `MapType` | Key-value mapping (keys must be strings) |
| `union` | `StructType` or nullable type | Depends on union composition |
| `fixed` | `BinaryType` | Fixed-length byte array |

## Primitive Type Examples

### Basic Type Conversion

```scala
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.avro.Schema

// Boolean type
val booleanSchema = Schema.create(Schema.Type.BOOLEAN)
val sparkBoolean = SchemaConverters.toSqlType(booleanSchema)
// Result: BooleanType, nullable = false

// Integer types
val intSchema = Schema.create(Schema.Type.INT)
val longSchema = Schema.create(Schema.Type.LONG)
val sparkInt = SchemaConverters.toSqlType(intSchema) // IntegerType
val sparkLong = SchemaConverters.toSqlType(longSchema) // LongType

// Floating point types
val floatSchema = Schema.create(Schema.Type.FLOAT)
val doubleSchema = Schema.create(Schema.Type.DOUBLE)
val sparkFloat = SchemaConverters.toSqlType(floatSchema) // FloatType
val sparkDouble = SchemaConverters.toSqlType(doubleSchema) // DoubleType

// String and binary types
val stringSchema = Schema.create(Schema.Type.STRING)
val bytesSchema = Schema.create(Schema.Type.BYTES)
val sparkString = SchemaConverters.toSqlType(stringSchema) // StringType
val sparkBytes = SchemaConverters.toSqlType(bytesSchema) // BinaryType
```

### Reverse Conversion (Spark to Avro)

```scala
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types._

// Convert Spark types to Avro
val avroInt = SchemaConverters.toAvroType(IntegerType) // int
val avroLong = SchemaConverters.toAvroType(LongType) // long
val avroString = SchemaConverters.toAvroType(StringType) // string
val avroBinary = SchemaConverters.toAvroType(BinaryType) // bytes

// Nullable types become unions with null
val avroNullableString = SchemaConverters.toAvroType(StringType, nullable = true)
// Result: ["null", "string"]
```

## Logical Types

### Date and Timestamp Types

```scala { .api }
// Spark SQL types with Avro logical type equivalents
DateType <-> int (date logical type)
TimestampType <-> long (timestamp-micros logical type)
TimestampNTZType <-> long (local-timestamp-micros logical type)
```

**Examples:**
```scala
// Date type conversion
val avroDateJson = """
  {
    "type": "int",
    "logicalType": "date"
  }
"""
val dateSchema = new Schema.Parser().parse(avroDateJson)
val sparkDate = SchemaConverters.toSqlType(dateSchema)
// Result: DateType, nullable = false

// Timestamp type conversion
val avroTimestampJson = """
  {
    "type": "long",
    "logicalType": "timestamp-micros"
  }
"""
val timestampSchema = new Schema.Parser().parse(avroTimestampJson)
val sparkTimestamp = SchemaConverters.toSqlType(timestampSchema)
// Result: TimestampType, nullable = false

// Local timestamp (no time zone)
val avroLocalTimestampJson = """
  {
    "type": "long",
    "logicalType": "local-timestamp-micros"
  }
"""
val localTimestampSchema = new Schema.Parser().parse(avroLocalTimestampJson)
val sparkLocalTimestamp = SchemaConverters.toSqlType(localTimestampSchema)
// Result: TimestampNTZType, nullable = false
```
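The reverse direction can be checked the same way. A minimal sketch (reusing the imports above) of how Spark date and timestamp types are expected to come back with their logical type annotations attached:

```scala
// Sketch: converting Spark temporal types back to Avro.
val avroDate = SchemaConverters.toAvroType(DateType)
// Expected: int with "date" logical type

val avroTimestamp = SchemaConverters.toAvroType(TimestampType)
// Expected: long with "timestamp-micros" logical type
```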

### Decimal Types

```scala { .api }
DecimalType(precision, scale) <-> bytes/fixed (decimal logical type)
```

**Examples:**
```scala
// Avro decimal using bytes
val avroDecimalBytesJson = """
  {
    "type": "bytes",
    "logicalType": "decimal",
    "precision": 10,
    "scale": 2
  }
"""
val decimalBytesSchema = new Schema.Parser().parse(avroDecimalBytesJson)
val sparkDecimal = SchemaConverters.toSqlType(decimalBytesSchema)
// Result: DecimalType(10, 2), nullable = false

// Avro decimal using fixed
val avroDecimalFixedJson = """
  {
    "type": "fixed",
    "name": "DecimalFixed",
    "size": 8,
    "logicalType": "decimal",
    "precision": 18,
    "scale": 4
  }
"""
val decimalFixedSchema = new Schema.Parser().parse(avroDecimalFixedJson)
val sparkDecimalFixed = SchemaConverters.toSqlType(decimalFixedSchema)
// Result: DecimalType(18, 4), nullable = false

// Convert a Spark decimal to Avro
val sparkDecimalType = DecimalType(precision = 12, scale = 3)
val avroDecimalSchema = SchemaConverters.toAvroType(sparkDecimalType)
// Result: fixed type with decimal logical type
```

## Complex Types

### Record Types (Structs)

```scala
val avroRecordJson = """
  {
    "type": "record",
    "name": "Person",
    "namespace": "com.example",
    "fields": [
      {"name": "id", "type": "long"},
      {"name": "firstName", "type": "string"},
      {"name": "lastName", "type": "string"},
      {"name": "age", "type": ["null", "int"], "default": null},
      {"name": "email", "type": ["null", "string"], "default": null}
    ]
  }
"""

val recordSchema = new Schema.Parser().parse(avroRecordJson)
val sparkStruct = SchemaConverters.toSqlType(recordSchema)

sparkStruct.dataType match {
  case StructType(fields) =>
    fields.foreach { field =>
      println(s"${field.name}: ${field.dataType}, nullable = ${field.nullable}")
    }
    // Output:
    // id: LongType, nullable = false
    // firstName: StringType, nullable = false
    // lastName: StringType, nullable = false
    // age: IntegerType, nullable = true
    // email: StringType, nullable = true
}
```

### Array Types

```scala
// Simple array
val avroArrayJson = """
  {
    "type": "array",
    "items": "string"
  }
"""
val arraySchema = new Schema.Parser().parse(avroArrayJson)
val sparkArray = SchemaConverters.toSqlType(arraySchema)
// Result: ArrayType(StringType, containsNull = false)

// Array of nullable items
val avroNullableArrayJson = """
  {
    "type": "array",
    "items": ["null", "string"]
  }
"""
val nullableArraySchema = new Schema.Parser().parse(avroNullableArrayJson)
val sparkNullableArray = SchemaConverters.toSqlType(nullableArraySchema)
// Result: ArrayType(StringType, containsNull = true)

// Array of records
val avroRecordArrayJson = """
  {
    "type": "array",
    "items": {
      "type": "record",
      "name": "Item",
      "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"}
      ]
    }
  }
"""
val recordArraySchema = new Schema.Parser().parse(avroRecordArrayJson)
val sparkRecordArray = SchemaConverters.toSqlType(recordArraySchema)
// Result: ArrayType(StructType(...), containsNull = false)
```

### Map Types

```scala
// Simple map (string keys to string values)
val avroMapJson = """
  {
    "type": "map",
    "values": "string"
  }
"""
val mapSchema = new Schema.Parser().parse(avroMapJson)
val sparkMap = SchemaConverters.toSqlType(mapSchema)
// Result: MapType(StringType, StringType, valueContainsNull = false)

// Map with nullable values
val avroNullableMapJson = """
  {
    "type": "map",
    "values": ["null", "int"]
  }
"""
val nullableMapSchema = new Schema.Parser().parse(avroNullableMapJson)
val sparkNullableMap = SchemaConverters.toSqlType(nullableMapSchema)
// Result: MapType(StringType, IntegerType, valueContainsNull = true)

// Map with complex values
val avroComplexMapJson = """
  {
    "type": "map",
    "values": {
      "type": "record",
      "name": "Value",
      "fields": [
        {"name": "count", "type": "int"},
        {"name": "total", "type": "double"}
      ]
    }
  }
"""
val complexMapSchema = new Schema.Parser().parse(avroComplexMapJson)
val sparkComplexMap = SchemaConverters.toSqlType(complexMapSchema)
// Result: MapType(StringType, StructType(...), valueContainsNull = false)
```
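Going the other way, a brief sketch (assuming the `SchemaConverters` and `org.apache.spark.sql.types._` imports from earlier sections) of how Spark map types translate back. Since Avro maps only allow string keys, a map keyed by any other type has no Avro equivalent:

```scala
// A map with nullable values is expected to round-trip to an Avro map
// whose value schema is a ["null", "int"] union
val sparkMapType = MapType(StringType, IntegerType, valueContainsNull = true)
val avroMap = SchemaConverters.toAvroType(sparkMapType)
// Expected: map with ["null", "int"] values

// A map with non-string keys (e.g. MapType(IntegerType, StringType))
// cannot be represented in Avro and is expected to be rejected by the converter
```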

## Union Types

### Nullable Unions

```scala
// Simple nullable union
val nullableUnionJson = """
  {
    "type": "record",
    "name": "Record",
    "fields": [
      {"name": "id", "type": "long"},
      {"name": "optionalValue", "type": ["null", "string"]}
    ]
  }
"""
val nullableUnionSchema = new Schema.Parser().parse(nullableUnionJson)
val sparkNullableUnion = SchemaConverters.toSqlType(nullableUnionSchema)
// Field "optionalValue" becomes StringType with nullable = true
```

### Complex Unions

```scala
// Multi-type union (converted to a struct)
val complexUnionJson = """
  {
    "type": "record",
    "name": "Event",
    "fields": [
      {"name": "id", "type": "long"},
      {"name": "data", "type": [
        {
          "type": "record",
          "name": "TextEvent",
          "fields": [{"name": "message", "type": "string"}]
        },
        {
          "type": "record",
          "name": "NumericEvent",
          "fields": [{"name": "value", "type": "double"}]
        }
      ]}
    ]
  }
"""

val complexUnionSchema = new Schema.Parser().parse(complexUnionJson)

// Default conversion (positional field names)
val defaultUnionStruct = SchemaConverters.toSqlType(complexUnionSchema)
// Field "data" becomes StructType with fields: member0, member1

// Stable identifier conversion
val stableUnionStruct = SchemaConverters.toSqlType(
  complexUnionSchema,
  useStableIdForUnionType = true
)
// Field "data" becomes StructType with fields: member_textevent, member_numericevent
```

### Union Type Compatibility

```scala
// Compatible primitive unions
val primitiveUnionJson = """
  {
    "type": ["int", "long"]
  }
"""
val primitiveUnionSchema = new Schema.Parser().parse(primitiveUnionJson)
val sparkPrimitiveUnion = SchemaConverters.toSqlType(primitiveUnionSchema)
// Result: LongType (widened to accommodate both int and long)

val floatUnionJson = """
  {
    "type": ["float", "double"]
  }
"""
val floatUnionSchema = new Schema.Parser().parse(floatUnionJson)
val sparkFloatUnion = SchemaConverters.toSqlType(floatUnionSchema)
// Result: DoubleType (widened to accommodate both float and double)
```

## Special Interval Types

### Interval Type Support

```scala { .api }
YearMonthIntervalType <-> int (with catalyst type property)
DayTimeIntervalType <-> long (with catalyst type property)
```

**Examples:**
```scala
import org.apache.spark.sql.types._

// Convert interval types to Avro (preserves type information)
val yearMonthInterval = YearMonthIntervalType()
val avroYearMonth = SchemaConverters.toAvroType(yearMonthInterval)
// Result: {"type": "int", "spark.sql.catalyst.type": "interval year to month"}

val dayTimeInterval = DayTimeIntervalType()
val avroDayTime = SchemaConverters.toAvroType(dayTimeInterval)
// Result: {"type": "long", "spark.sql.catalyst.type": "interval day to second"}

// When converting back, the catalyst type property is used
val restoredYearMonth = SchemaConverters.toSqlType(avroYearMonth)
// Result: YearMonthIntervalType (not IntegerType)
```

## Custom Type Handling

### User-Defined Types

```scala
// Custom UDT support through SQL types
import org.apache.spark.sql.types._

class CustomPointUDT extends UserDefinedType[CustomPoint] {
  override def sqlType: DataType = StructType(Seq(
    StructField("x", DoubleType, nullable = false),
    StructField("y", DoubleType, nullable = false)
  ))

  // Implementation details...
}

// The UDT is converted based on its sqlType
val customUDT = new CustomPointUDT()
val avroCustomType = SchemaConverters.toAvroType(customUDT)
// Result: record type with x, y fields
```

### Enum Type Handling

```scala
val avroEnumJson = """
  {
    "type": "enum",
    "name": "Color",
    "symbols": ["RED", "GREEN", "BLUE"]
  }
"""
val enumSchema = new Schema.Parser().parse(avroEnumJson)
val sparkEnum = SchemaConverters.toSqlType(enumSchema)
// Result: StringType (enum values become strings)
```
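The mapping is lossy in reverse: a `StringType` column converts to a plain Avro `string`, not back to an `enum`. To write a true enum, the target Avro schema can be supplied explicitly through the data source's `avroSchema` option. A hedged sketch (the DataFrame `df`, its `color` column, and the output path are illustrative assumptions):

```scala
// Assumes a DataFrame `df` with a string column "color" whose values are
// limited to the enum symbols; values outside the symbol set fail the write.
val enumWriterSchema = """
  {
    "type": "record",
    "name": "Paint",
    "fields": [
      {"name": "color",
       "type": {"type": "enum", "name": "Color",
                "symbols": ["RED", "GREEN", "BLUE"]}}
    ]
  }
"""
df.write
  .format("avro")
  .option("avroSchema", enumWriterSchema)
  .save("/tmp/paint-avro")
```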

## Data Type Validation

### Supported Type Checking

```scala
import org.apache.spark.sql.avro.AvroUtils
import org.apache.spark.sql.types._

// Check whether a Spark SQL type is supported for Avro conversion
val supportedTypes = List(
  IntegerType,
  StringType,
  StructType(Seq(StructField("id", LongType))),
  ArrayType(StringType),
  MapType(StringType, IntegerType)
)

supportedTypes.foreach { dataType =>
  val isSupported = AvroUtils.supportsDataType(dataType)
  println(s"$dataType supported: $isSupported")
}

// Unsupported types report false
val unsupportedTypes = List(
  CalendarIntervalType, // Not supported
  ObjectType(classOf[String]) // Not supported
)
unsupportedTypes.foreach { dataType =>
  println(s"$dataType supported: ${AvroUtils.supportsDataType(dataType)}")
}
```

### Schema Compatibility Checking

```scala
def checkTypeCompatibility(
    writerType: DataType,
    readerType: DataType
): Boolean = {
  DataType.equalsIgnoreCompatibleNullability(writerType, readerType)
}

// Example compatibility checks
val writerSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = false)
))

val readerSchema = StructType(Seq(
  StructField("id", LongType, nullable = true), // Compatible (nullable widening)
  StructField("name", StringType, nullable = false),
  StructField("email", StringType, nullable = true) // New optional field
))

val compatible = checkTypeCompatibility(writerSchema, readerSchema)
// Result: false. equalsIgnoreCompatibleNullability is a strict structural check,
// so the added "email" field fails it; adding optional fields is handled by
// Avro schema evolution at read time, not by this comparison.
```
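Because the structural check above rejects any added field, an evolution-friendly comparison has to walk the reader's fields itself. A minimal sketch (not a Spark API; `isCompatible` and its rules are assumptions here) that tolerates new nullable reader fields:

```scala
// Hypothetical helper: every reader field must either exist in the writer
// schema with an equal type, or be nullable (a newly added optional field).
def isCompatible(writer: StructType, reader: StructType): Boolean = {
  val writerFields = writer.fields.map(f => f.name -> f).toMap
  reader.fields.forall { rf =>
    writerFields.get(rf.name) match {
      case Some(wf) => wf.dataType == rf.dataType // same type; nullability aside
      case None     => rf.nullable                // new fields must be optional
    }
  }
}
```

This deliberately ignores top-level nullability widening; nested-struct recursion could be added the same way.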

## Performance Considerations

### Type Conversion Performance

```scala
// Efficient type patterns for high-performance scenarios

// 1. Prefer primitive types over complex unions
val efficientSchema = StructType(Seq(
  StructField("id", LongType),
  StructField("value", StringType)
))

// 2. Use appropriate precision for decimals
val efficientDecimal = DecimalType(precision = 18, scale = 2) // Common precision
val inefficientDecimal = DecimalType(precision = 38, scale = 18) // Maximum precision

// 3. Consider array element nullability
val efficientArray = ArrayType(StringType, containsNull = false) // Faster
val nullableArray = ArrayType(StringType, containsNull = true) // Slower due to null checks
```

### Memory Usage Optimization

```scala
// Optimize for memory usage with appropriate types
val memoryEfficientSchema = StructType(Seq(
  StructField("id", IntegerType), // Use int instead of long if range permits
  StructField("flag", BooleanType), // Boolean more efficient than string
  StructField("score", FloatType), // Float instead of double if precision permits
  StructField("data", BinaryType) // Binary for large opaque data
))
```
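One way to sanity-check such choices is to convert the schema and inspect the Avro output directly. A brief sketch, assuming the `SchemaConverters` import from earlier (the `recordName` value is arbitrary):

```scala
// Convert the struct and pretty-print the resulting Avro record schema
val avroOut = SchemaConverters.toAvroType(memoryEfficientSchema, recordName = "Metrics")
println(avroOut.toString(true))
```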