The Schema Conversion API provides developer-level utilities for converting between Avro schemas and Spark SQL data types. This API is marked as @DeveloperApi and is primarily used for advanced schema manipulation and custom data source implementations.
object SchemaConverters {
  def toSqlType(avroSchema: org.apache.avro.Schema): SchemaType
  def toSqlType(avroSchema: org.apache.avro.Schema, useStableIdForUnionType: Boolean,
      stableIdPrefixForUnionType: String, recursiveFieldMaxDepth: Int): SchemaType
  def toAvroType(catalystType: DataType, nullable: Boolean, recordName: String, nameSpace: String): org.apache.avro.Schema
}

case class SchemaType(dataType: DataType, nullable: Boolean)

SchemaType represents the result of converting an Avro schema to a Spark SQL data type, including nullability information.
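As a quick orientation, both fields of SchemaType can be read directly from a conversion result; a minimal sketch (the example record schema below is an assumption, not part of the API):

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters

// Placeholder schema used only to produce a SchemaType value.
val exampleAvroSchema: Schema = new Schema.Parser().parse(
  """{"type": "record", "name": "Example", "fields": [{"name": "id", "type": "long"}]}""")

val schemaType = SchemaConverters.toSqlType(exampleAvroSchema)
println(schemaType.dataType)  // the converted Spark SQL DataType
println(schemaType.nullable)  // whether the value itself may be null
```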
Converts an Avro schema to a Spark SQL DataType:
def toSqlType(avroSchema: org.apache.avro.Schema): SchemaType

Usage Example:
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.avro.Schema
val avroSchemaJson = """{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": null}
]
}"""
val avroSchema = new Schema.Parser().parse(avroSchemaJson)
val sparkSchema = SchemaConverters.toSqlType(avroSchema)
println(sparkSchema.dataType)
// StructType(List(
// StructField("id", LongType, false),
// StructField("name", StringType, false),
// StructField("email", StringType, true)
// ))
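The converted StructType can be passed straight to a reader when an explicit Spark schema is wanted; a minimal sketch (the active SparkSession `spark` and the input path are assumptions, not part of this API):

```scala
import org.apache.spark.sql.types.StructType

// Assumes an active SparkSession named `spark` and a hypothetical input path.
val userSchema = sparkSchema.dataType.asInstanceOf[StructType]
val users = spark.read
  .format("avro")
  .schema(userSchema)
  .load("/tmp/users.avro")
users.printSchema()
```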
Converts with advanced options for union type handling and recursion control:

def toSqlType(avroSchema: org.apache.avro.Schema,
              useStableIdForUnionType: Boolean,
              stableIdPrefixForUnionType: String,
              recursiveFieldMaxDepth: Int): SchemaType

Parameters:
- avroSchema: the Avro schema to convert
- useStableIdForUnionType: if true, union member fields are named after their Avro member types instead of positionally (member0, member1, ...)
- stableIdPrefixForUnionType: prefix prepended to the type-based union member field names
- recursiveFieldMaxDepth: maximum depth to which recursive field references are expanded

Usage Example:
val complexSchema = SchemaConverters.toSqlType(
  avroSchema,
  useStableIdForUnionType = true,
  stableIdPrefixForUnionType = "union_",
  recursiveFieldMaxDepth = 10
)
Converts a Spark SQL DataType to an Avro schema:

def toAvroType(catalystType: DataType, nullable: Boolean, recordName: String, nameSpace: String): org.apache.avro.Schema

Parameters:
- catalystType: the Spark SQL DataType to convert
- nullable: whether the top-level value may be null (a nullable type becomes a union with "null")
- recordName: name given to generated Avro record types
- nameSpace: namespace given to the generated schema

Usage Example:
import org.apache.spark.sql.types._
val sparkSchema = StructType(Seq(
StructField("id", LongType, nullable = false),
StructField("name", StringType, nullable = false),
StructField("score", DoubleType, nullable = true)
))
val avroSchema = SchemaConverters.toAvroType(
  sparkSchema,
  nullable = false,
  recordName = "UserRecord",
  nameSpace = "com.example"
)
println(avroSchema.toString(true))
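As a sanity check, the generated Avro schema can be converted back; a small sketch (exact equality of the round trip is not guaranteed across versions, so only the field names are compared here):

```scala
import org.apache.spark.sql.types.StructType

// Convert the generated Avro schema back to a Spark SQL type and compare field names.
val roundTrip = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
println(roundTrip.fieldNames.sameElements(sparkSchema.fieldNames))  // expected: true
```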

Primitive type mappings:

| Avro Type | Spark SQL Type | Notes |
|---|---|---|
| boolean | BooleanType | Direct mapping |
| int | IntegerType | 32-bit signed integer |
| long | LongType | 64-bit signed integer |
| float | FloatType | 32-bit IEEE 754 |
| double | DoubleType | 64-bit IEEE 754 |
| bytes | BinaryType | Variable-length byte array |
| string | StringType | UTF-8 encoded string |

Complex type mappings:

| Avro Type | Spark SQL Type | Notes |
|---|---|---|
| record | StructType | Named fields with types |
| array | ArrayType | Homogeneous element type |
| map | MapType | String keys, typed values |
| union | StructType or nullable field | Depends on union content |
| enum | StringType | Enum values as strings |
| fixed | BinaryType | Fixed-length byte array |

Logical type mappings:

| Avro Logical Type | Spark SQL Type | Notes |
|---|---|---|
| decimal | DecimalType | Precision and scale preserved |
| date | DateType | Days since Unix epoch |
| time-millis | IntegerType | Milliseconds since midnight |
| time-micros | LongType | Microseconds since midnight |
| timestamp-millis | TimestampType | Milliseconds since Unix epoch |
| timestamp-micros | TimestampType | Microseconds since Unix epoch |
| duration | CalendarIntervalType | 12-byte fixed encoding months, days, and milliseconds |
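To illustrate the logical-type handling, the snippet below builds decimal and date schemas with Avro's LogicalTypes helper and converts them (a minimal sketch; the output comments are indicative):

```scala
import org.apache.avro.{LogicalTypes, Schema}
import org.apache.spark.sql.avro.SchemaConverters

// decimal(10, 2) carried on an Avro bytes type
val decimalAvro = LogicalTypes.decimal(10, 2).addToSchema(Schema.create(Schema.Type.BYTES))
println(SchemaConverters.toSqlType(decimalAvro).dataType)  // DecimalType(10,2)

// date carried on an Avro int type
val dateAvro = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT))
println(SchemaConverters.toSqlType(dateAvro).dataType)     // DateType
```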
Avro unions are handled differently based on their composition:
// Simple nullable field: ["null", "string"] -> StringType(nullable=true)
val nullableString = """{"name": "optional_field", "type": ["null", "string"]}"""
// Complex union: ["string", "int"] -> StructType with member0, member1 fields (or type-based names when useStableIdForUnionType is true)
val complexUnion = """{"name": "mixed_field", "type": ["string", "int"]}"""
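A short sketch of what such a union converts to in practice; the record wrapper is an assumption added so the snippet parses, and the printed field names are indicative and may differ slightly between Spark versions:

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters

// Wrap the union field in a record so it can be parsed and converted as a schema.
val unionRecord = new Schema.Parser().parse(
  """{"type": "record", "name": "Mixed", "fields": [
    |  {"name": "mixed_field", "type": ["string", "int"]}
    |]}""".stripMargin)

// Positional member names.
println(SchemaConverters.toSqlType(unionRecord).dataType)
// mixed_field becomes a struct with fields member0 (string) and member1 (int)

// Stable, type-based member names.
println(SchemaConverters.toSqlType(
  unionRecord,
  useStableIdForUnionType = true,
  stableIdPrefixForUnionType = "member_",
  recursiveFieldMaxDepth = -1).dataType)
// mixed_field becomes a struct with fields like member_string and member_int
```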
val recursiveSchema = """{
"type": "record",
"name": "Node",
"fields": [
{"name": "value", "type": "string"},
{"name": "children", "type": {"type": "array", "items": "Node"}}
]
}"""
val converted = SchemaConverters.toSqlType(
  new Schema.Parser().parse(recursiveSchema),
  useStableIdForUnionType = false,
  stableIdPrefixForUnionType = "",
  recursiveFieldMaxDepth = 5 // Limit recursion to prevent stack overflow
)
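To see how the limit shapes the result, the converted type can be printed as a tree (a small sketch; the exact truncation behavior at the depth limit depends on the Spark version):

```scala
import org.apache.spark.sql.types.StructType

// Each level of `children` nests another Node struct, up to the configured depth.
println(converted.dataType.asInstanceOf[StructType].treeString)
```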
The conversion API supports schema evolution patterns:

// Reader schema (newer version)
val readerSchema = """{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string", "default": ""}
]
}"""
// Writer schema (older version) - missing email field
val writerSchema = """{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"}
]
}"""
// Both schemas convert to compatible Spark SQL types
val readerSparkSchema = SchemaConverters.toSqlType(new Schema.Parser().parse(readerSchema))
val writerSparkSchema = SchemaConverters.toSqlType(new Schema.Parser().parse(writerSchema))
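A quick, hedged illustration of what "compatible" means here: the writer's fields should be a subset of the reader's. This check compares field names only and is not a full Avro schema-resolution test:

```scala
import org.apache.spark.sql.types.StructType

val readerFields = readerSparkSchema.dataType.asInstanceOf[StructType].fieldNames.toSet
val writerFields = writerSparkSchema.dataType.asInstanceOf[StructType].fieldNames.toSet
println(writerFields.subsetOf(readerFields))  // expected: true
```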
The conversion API throws exceptions for unsupported schema patterns:

try {
  val converted = SchemaConverters.toSqlType(complexAvroSchema)
} catch {
  case e: UnsupportedAvroTypeException =>
    println(s"Unsupported Avro type: ${e.getMessage}")
  case e: IncompatibleSchemaException =>
    println(s"Schema incompatibility: ${e.getMessage}")
}
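For instance, converting a recursive schema without a recursion limit is rejected rather than expanded indefinitely; a hedged sketch using Try so the exact exception class does not matter (behavior can vary across Spark versions):

```scala
import scala.util.Try

// Reuses `recursiveSchema` from the recursion example above.
val attempt = Try(SchemaConverters.toSqlType(new Schema.Parser().parse(recursiveSchema)))
println(attempt.isFailure)  // expected: true, since no recursiveFieldMaxDepth was given
```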