Utilities for converting between Apache Avro schemas and Spark SQL schemas. Supports complex nested types, logical types, and bidirectional conversion with proper nullability handling.
Convert Avro schemas to Spark SQL DataType structures with nullability information.
object SchemaConverters {
/**
* Converts an Avro schema to Spark SQL schema type
* @param avroSchema the Avro schema to convert
* @return SchemaType containing DataType and nullability information
*/
def toSqlType(avroSchema: Schema): SchemaType
}
case class SchemaType(dataType: DataType, nullable: Boolean)
Usage Examples:
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType
// Parse Avro schema from JSON
val avroSchemaJson = """{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": null},
{"name": "addresses", "type": {
"type": "array",
"items": {
"type": "record",
"name": "Address",
"fields": [
{"name": "street", "type": "string"},
{"name": "city", "type": "string"}
]
}
}}
]
}"""
val avroSchema = new Schema.Parser().parse(avroSchemaJson)
// Convert to Spark SQL schema
val schemaType = SchemaConverters.toSqlType(avroSchema)
val sparkSqlSchema = schemaType.dataType.asInstanceOf[StructType]
// Use in DataFrame operations
val df = spark.read
.schema(sparkSqlSchema)
.format("avro")
.load("path/to/user_data.avro")
// Print schema information
println(s"Nullable: ${schemaType.nullable}")
println(s"Spark Schema: ${sparkSqlSchema.prettyJson}")Convert Spark SQL DataType structures to Avro schemas with configurable naming.
object SchemaConverters {
/**
* Converts a Spark SQL DataType to Avro schema
* @param catalystType the Spark SQL DataType to convert
* @param nullable whether the type should be nullable
* @param recordName name for record types (default: "topLevelRecord")
* @param nameSpace namespace for record types (default: "")
* @return Avro Schema
* @throws IncompatibleSchemaException if the DataType cannot be converted
*/
def toAvroType(
catalystType: DataType,
nullable: Boolean = false,
recordName: String = "topLevelRecord",
nameSpace: String = ""
): Schema
}
/**
* Exception thrown when schema conversion fails
*/
class IncompatibleSchemaException(msg: String, ex: Throwable = null) extends Exception(msg, ex)
Usage Examples:
import org.apache.spark.sql.types._
import org.apache.spark.sql.avro.SchemaConverters
// Create Spark SQL schema
val sparkSchema = StructType(Seq(
StructField("id", LongType, nullable = false),
StructField("name", StringType, nullable = false),
StructField("email", StringType, nullable = true),
StructField("scores", ArrayType(DoubleType, containsNull = false), nullable = false)
))
// Convert to Avro schema
val avroSchema = SchemaConverters.toAvroType(
sparkSchema,
nullable = false,
recordName = "UserProfile",
nameSpace = "com.example.data"
)
// Use in write operations
val df = spark.table("user_data")
df.write
.format("avro")
.option("avroSchema", avroSchema.toString)
.save("path/to/output")
// Print schema JSON
println(avroSchema.toString(true))
Comprehensive type conversion support between Avro and Spark SQL types.
// Avro type → Spark SQL type
// Primitive types
NULL → NullType
BOOLEAN → BooleanType
INT → IntegerType
LONG → LongType
FLOAT → FloatType
DOUBLE → DoubleType
BYTES → BinaryType
STRING → StringType
ENUM → StringType
// Logical types
INT with date logical type → DateType
LONG with timestamp-millis logical type → TimestampType
LONG with timestamp-micros logical type → TimestampType
BYTES/FIXED with decimal logical type → DecimalType
// Complex types
RECORD → StructType
ARRAY → ArrayType
MAP → MapType(StringType, valueType)
UNION → Special handling (see below)
FIXED → BinaryType
// Spark SQL type → Avro type
// Primitive types
BooleanType → BOOLEAN
ByteType → INT
ShortType → INT
IntegerType → INT
LongType → LONG
FloatType → FLOAT
DoubleType → DOUBLE
StringType → STRING
BinaryType → BYTES
// Special types
DateType → INT with date logical type
TimestampType → LONG with timestamp-micros logical type
DecimalType → FIXED with decimal logical type
// Complex types
StructType → RECORD
ArrayType → ARRAY
MapType → MAP (only with StringType keys)
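The mapping above can be checked with a short round trip. The following is a minimal sketch (assuming the imports from the earlier examples; record and field names are illustrative):
// Round trip: Spark SQL types → Avro schema → back to Spark SQL types
val mixedSchema = StructType(Seq(
  StructField("created", DateType, nullable = false),        // → INT with date logical type
  StructField("updatedAt", TimestampType, nullable = false), // → LONG with timestamp-micros
  StructField("price", DecimalType(10, 2), nullable = false) // → FIXED with decimal logical type
))
val mixedAvro = SchemaConverters.toAvroType(mixedSchema, recordName = "Mixed", nameSpace = "com.example")
println(mixedAvro.toString(true))
// Converting back should yield equivalent Catalyst types, roughly:
// struct<created:date,updatedAt:timestamp,price:decimal(10,2)>
println(SchemaConverters.toSqlType(mixedAvro).dataType.simpleString)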
Special handling for Avro UNION types with multiple conversion strategies.
// Union type conversion strategies:
// 1. Nullable unions (union with null)
["null", "string"] → StringType with nullable = true
// 2. Simple type promotion unions
["int", "long"] → LongType
["float", "double"] → DoubleType
// 3. Complex unions (converted to struct with member fields)
["string", "int", "record"] → StructType with fields:
- member0: StringType (nullable = true)
- member1: IntegerType (nullable = true)
- member2: StructType (nullable = true)
Union Handling Examples:
// Nullable field union
val nullableUnion = """["null", "string"]"""
// Converts to: StringType, nullable = true
// Type promotion union
val promotionUnion = """["int", "long"]"""
// Converts to: LongType, nullable = false
// Complex union
val complexUnion = """[
"string",
{"type": "record", "name": "Address", "fields": [
{"name": "street", "type": "string"}
]}
]"""
// Converts to: StructType with member0 (string) and member1 (Address record)
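As a concrete check of the complex-union strategy, the union above can be wrapped in a record field and converted. A minimal sketch (reusing the complexUnion value and imports defined earlier; the wrapper record and field names are illustrative):
// Wrap the complex union in a record field, then convert the whole record
val wrapperJson = s"""{
  "type": "record",
  "name": "Wrapper",
  "fields": [
    {"name": "payload", "type": $complexUnion}
  ]
}"""
val wrapperStruct = SchemaConverters
  .toSqlType(new Schema.Parser().parse(wrapperJson))
  .dataType.asInstanceOf[StructType]
// The union becomes a struct of memberN fields, roughly:
// struct<member0:string,member1:struct<street:string>>
println(wrapperStruct("payload").dataType.simpleString)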
Full support for Avro logical types with proper Spark SQL mapping.
// Date logical type (days since epoch)
{
"type": "int",
"logicalType": "date"
} → DateType
// Timestamp logical types
{
"type": "long",
"logicalType": "timestamp-millis"
} → TimestampType
{
"type": "long",
"logicalType": "timestamp-micros"
} → TimestampType
// Decimal as BYTES
{
"type": "bytes",
"logicalType": "decimal",
"precision": 10,
"scale": 2
} → DecimalType(10, 2)
// Decimal as FIXED
{
"type": "fixed",
"name": "DecimalFixed",
"size": 8,
"logicalType": "decimal",
"precision": 18,
"scale": 4
} → DecimalType(18, 4)
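A minimal sketch that exercises these logical-type mappings end to end (assuming the imports from the earlier examples; record and field names are illustrative):
// Avro logical types → Spark SQL types
val logicalJson = """{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "day", "type": {"type": "int", "logicalType": "date"}},
    {"name": "ts", "type": {"type": "long", "logicalType": "timestamp-micros"}},
    {"name": "amount", "type": {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}}
  ]
}"""
val logicalStruct = SchemaConverters
  .toSqlType(new Schema.Parser().parse(logicalJson))
  .dataType.asInstanceOf[StructType]
// Expected roughly: day → DateType, ts → TimestampType, amount → DecimalType(10,2)
logicalStruct.fields.foreach(f => println(s"${f.name}: ${f.dataType}"))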
Handling schema evolution and compatibility between different schema versions.
// Schema evolution example
val v1Schema = """{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"}
]
}"""
val v2Schema = """{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": null}
]
}"""
// Reading evolved data with original schema
val df = spark.read
.format("avro")
.option("avroSchema", v1Schema) // Use older schema
.load("path/to/v2_data.avro") // Reading newer dataSchema conversion error handling and common issues.
Schema conversion error handling and common issues.
class IncompatibleSchemaException(msg: String, ex: Throwable = null) extends Exception(msg, ex)
Common Error Scenarios:
// Unsupported type conversion
try {
val unsupportedType = MapType(IntegerType, StringType) // Avro MAP requires string keys
SchemaConverters.toAvroType(unsupportedType)
} catch {
case e: IncompatibleSchemaException =>
println(s"Conversion failed: ${e.getMessage}")
}
// Recursive schema detection
val recursiveSchema = """{
"type": "record",
"name": "Node",
"fields": [
{"name": "value", "type": "string"},
{"name": "child", "type": "Node"}
]
}"""
try {
SchemaConverters.toSqlType(new Schema.Parser().parse(recursiveSchema))
} catch {
case e: IncompatibleSchemaException =>
println("Recursive schemas not supported")
}