Schema Conversion

The Schema Conversion API provides developer-level utilities for converting between Avro schemas and Spark SQL data types. This API is marked as @DeveloperApi and is primarily used for advanced schema manipulation and custom data source implementations.
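
These converters live in the external spark-avro module, so the examples below assume that dependency is on the classpath. A minimal sketch (the version shown is illustrative; use the spark-avro version that matches your Spark release):

// build.sbt -- version is an assumption; align it with your Spark release
libraryDependencies += "org.apache.spark" %% "spark-avro" % "3.5.1"

// application code
import org.apache.spark.sql.avro.SchemaConverters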

Core API

SchemaConverters Object

object SchemaConverters {
  def toSqlType(avroSchema: org.apache.avro.Schema): SchemaType
  def toSqlType(avroSchema: org.apache.avro.Schema, useStableIdForUnionType: Boolean, 
                stableIdPrefixForUnionType: String, recursiveFieldMaxDepth: Int): SchemaType
  def toAvroType(catalystType: DataType, nullable: Boolean, recordName: String, nameSpace: String): org.apache.avro.Schema
}

SchemaType Case Class

case class SchemaType(dataType: DataType, nullable: Boolean)

Represents the result of converting an Avro schema to a Spark SQL data type, including nullability information.
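
A minimal sketch of how the two fields are typically inspected (the primitive schema here is purely illustrative):

import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.avro.SchemaConverters.SchemaType

val stringSchema = Schema.create(Schema.Type.STRING)
val result: SchemaType = SchemaConverters.toSqlType(stringSchema)

println(result.dataType)  // StringType
println(result.nullable)  // false -- a bare (non-union) Avro type is not nullable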

Avro to Spark SQL Conversion

Basic Conversion

Converts an Avro schema to a Spark SQL DataType:

def toSqlType(avroSchema: org.apache.avro.Schema): SchemaType

Usage Example:

import org.apache.spark.sql.avro.SchemaConverters
import org.apache.avro.Schema

val avroSchemaJson = """{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}"""

val avroSchema = new Schema.Parser().parse(avroSchemaJson)
val sparkSchema = SchemaConverters.toSqlType(avroSchema)

println(sparkSchema.dataType)
// StructType(
//   StructField(id,LongType,false),
//   StructField(name,StringType,false),
//   StructField(email,StringType,true))

Advanced Conversion

Converts with advanced options for union type handling and recursion control:

def toSqlType(avroSchema: org.apache.avro.Schema, 
              useStableIdForUnionType: Boolean, 
              stableIdPrefixForUnionType: String, 
              recursiveFieldMaxDepth: Int): SchemaType

Parameters:

  • avroSchema: The Avro schema to convert
  • useStableIdForUnionType: When true, struct field names for non-null union members are derived from the member's type name rather than its position
  • stableIdPrefixForUnionType: Prefix prepended to those stable union field names
  • recursiveFieldMaxDepth: Maximum depth to which recursive record references are expanded

Usage Example:

val complexSchema = SchemaConverters.toSqlType(
  avroSchema,
  useStableIdForUnionType = true,
  stableIdPrefixForUnionType = "union_",
  recursiveFieldMaxDepth = 10
)

Spark SQL to Avro Conversion

Type Conversion

Converts a Spark SQL DataType to an Avro schema:

def toAvroType(catalystType: DataType, nullable: Boolean, recordName: String, nameSpace: String): org.apache.avro.Schema

Parameters:

  • catalystType: Spark SQL DataType to convert
  • nullable: Whether the field can be null
  • recordName: Name for generated record types
  • nameSpace: Namespace for generated schemas

Usage Example:

import org.apache.spark.sql.types._

val sparkSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = false),
  StructField("score", DoubleType, nullable = true)
))

val avroSchema = SchemaConverters.toAvroType(
  sparkSchema,
  nullable = false,
  recordName = "UserRecord",
  nameSpace = "com.example"
)

println(avroSchema.toString(true))
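
As a quick sanity check, the generated Avro schema can be converted back with toSqlType; the round trip is expected to reproduce the original structure (sketch, continuing from the avroSchema value above):

val roundTrip = SchemaConverters.toSqlType(avroSchema)
println(roundTrip.dataType)
// Expected to match the original StructType:
//   id (non-nullable), name (non-nullable), score (nullable)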

Type Mapping

Primitive Types

| Avro Type | Spark SQL Type | Notes |
|-----------|----------------|-------|
| boolean | BooleanType | Direct mapping |
| int | IntegerType | 32-bit signed integer |
| long | LongType | 64-bit signed integer |
| float | FloatType | 32-bit IEEE 754 |
| double | DoubleType | 64-bit IEEE 754 |
| bytes | BinaryType | Variable-length byte array |
| string | StringType | UTF-8 encoded string |

Complex Types

| Avro Type | Spark SQL Type | Notes |
|-----------|----------------|-------|
| record | StructType | Named fields with types |
| array | ArrayType | Homogeneous element type |
| map | MapType | String keys, typed values |
| union | StructType or nullable field | Depends on union content |
| enum | StringType | Enum values as strings |
| fixed | BinaryType | Fixed-length byte array |
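
As a quick illustration of the complex-type rules, a record containing an array and a map converts to nested Spark types (sketch; the field names are arbitrary):

val nestedJson = """{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "tags", "type": {"type": "array", "items": "string"}},
    {"name": "attributes", "type": {"type": "map", "values": "long"}}
  ]
}"""

val nested = SchemaConverters.toSqlType(new Schema.Parser().parse(nestedJson))
println(nested.dataType)
// Approximately:
//   tags:       ArrayType(StringType, containsNull = false)
//   attributes: MapType(StringType, LongType, valueContainsNull = false)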

Logical Types

| Avro Logical Type | Spark SQL Type | Notes |
|-------------------|----------------|-------|
| decimal | DecimalType | Precision and scale preserved |
| date | DateType | Days since Unix epoch |
| time-millis | IntegerType | Milliseconds since midnight |
| time-micros | LongType | Microseconds since midnight |
| timestamp-millis | TimestampType | Milliseconds since Unix epoch |
| timestamp-micros | TimestampType | Microseconds since Unix epoch |
| duration | CalendarIntervalType | 12-byte fixed value (months, days, milliseconds) |
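
A sketch showing a couple of logical-type conversions (the decimal here is backed by bytes; fixed-backed decimals behave the same way):

val logicalJson = """{
  "type": "record",
  "name": "Payment",
  "fields": [
    {"name": "amount", "type": {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}},
    {"name": "paidOn", "type": {"type": "int", "logicalType": "date"}},
    {"name": "createdAt", "type": {"type": "long", "logicalType": "timestamp-micros"}}
  ]
}"""

val logical = SchemaConverters.toSqlType(new Schema.Parser().parse(logicalJson))
println(logical.dataType)
// amount    -> DecimalType(10, 2)
// paidOn    -> DateType
// createdAt -> TimestampType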

Advanced Features

Union Type Handling

Avro unions are handled differently based on their composition:

// Simple nullable field: ["null", "string"] -> StringType(nullable=true)
val nullableString = """{"name": "optional_field", "type": ["null", "string"]}"""

// Complex union: ["string", "int"] -> StructType whose fields correspond to the union branches
// (positional member names such as member0, member1 by default; type-based names when useStableIdForUnionType is enabled)
val complexUnion = """{"name": "mixed_field", "type": ["string", "int"]}"""
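
A runnable sketch of the complex-union case, reusing the imports from the earlier examples (the exact member field names depend on the useStableIdForUnionType setting):

val unionJson = """{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "payload", "type": ["string", "int"]}
  ]
}"""

val unionResult = SchemaConverters.toSqlType(new Schema.Parser().parse(unionJson))
println(unionResult.dataType)
// payload becomes a StructType whose fields correspond to the union branches
// (positional member names by default; type-based names when useStableIdForUnionType is set)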

Recursive Schema Support

The API handles recursive schemas with configurable depth limits:

val recursiveSchema = """{
  "type": "record",
  "name": "Node",
  "fields": [
    {"name": "value", "type": "string"},
    {"name": "children", "type": {"type": "array", "items": "Node"}}
  ]
}"""

val converted = SchemaConverters.toSqlType(
  new Schema.Parser().parse(recursiveSchema),
  useStableIdForUnionType = false,
  stableIdPrefixForUnionType = "",
  recursiveFieldMaxDepth = 5  // Limit recursion to prevent stack overflow
)

Schema Evolution Compatibility

The conversion API supports schema evolution patterns:

// Reader schema (newer version)
val readerSchema = """{
  "type": "record",
  "name": "User", 
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string", "default": ""}
  ]
}"""

// Writer schema (older version) - missing email field
val writerSchema = """{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"}
  ]
}"""

// Both schemas convert to compatible Spark SQL types
val readerSparkSchema = SchemaConverters.toSqlType(new Schema.Parser().parse(readerSchema))
val writerSparkSchema = SchemaConverters.toSqlType(new Schema.Parser().parse(writerSchema))
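
The two converted schemas can be compared directly; the only expected difference is the extra email field on the reader side (sketch continuing from the values above):

import org.apache.spark.sql.types.StructType

println(readerSparkSchema.dataType.asInstanceOf[StructType].fieldNames.mkString(", "))
// id, name, email
println(writerSparkSchema.dataType.asInstanceOf[StructType].fieldNames.mkString(", "))
// id, name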

Error Handling

The conversion API throws exceptions for unsupported schema patterns:

  • UnsupportedAvroTypeException: Thrown for unsupported Avro types
  • IncompatibleSchemaException: Thrown for incompatible schema conversions
  • IllegalArgumentException: Thrown for invalid parameters

Usage Example:

try {
  val converted = SchemaConverters.toSqlType(complexAvroSchema)
} catch {
  case e: UnsupportedAvroTypeException => 
    println(s"Unsupported Avro type: ${e.getMessage}")
  case e: IncompatibleSchemaException =>
    println(s"Schema incompatibility: ${e.getMessage}")
}