Schema Conversion

Utilities for converting between Apache Avro schemas and Spark SQL schemas. Supports complex nested types, logical types, and bidirectional conversion with proper nullability handling.

Capabilities

Avro to Spark SQL Conversion

Convert Avro schemas to Spark SQL DataType structures with nullability information.

object SchemaConverters {
  /**
   * Converts an Avro schema to Spark SQL schema type
   * @param avroSchema the Avro schema to convert
   * @return SchemaType containing DataType and nullability information
   */
  def toSqlType(avroSchema: Schema): SchemaType
}

case class SchemaType(dataType: DataType, nullable: Boolean)

Usage Examples:

import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

// Parse Avro schema from JSON
val avroSchemaJson = """{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null},
    {"name": "addresses", "type": {
      "type": "array",
      "items": {
        "type": "record",
        "name": "Address",
        "fields": [
          {"name": "street", "type": "string"},
          {"name": "city", "type": "string"}
        ]
      }
    }}
  ]
}"""

val avroSchema = new Schema.Parser().parse(avroSchemaJson)

// Convert to Spark SQL schema
val schemaType = SchemaConverters.toSqlType(avroSchema)
val sparkSqlSchema = schemaType.dataType.asInstanceOf[StructType]

// Use in DataFrame operations
val df = spark.read
  .schema(sparkSqlSchema)
  .format("avro")
  .load("path/to/user_data.avro")

// Print schema information
println(s"Nullable: ${schemaType.nullable}")
println(s"Spark Schema: ${sparkSqlSchema.prettyJson}")

Spark SQL to Avro Conversion

Convert Spark SQL DataType structures to Avro schemas with configurable naming.

object SchemaConverters {
  /**
   * Converts a Spark SQL DataType to Avro schema
   * @param catalystType the Spark SQL DataType to convert
   * @param nullable whether the type should be nullable
   * @param recordName name for record types (default: "topLevelRecord")
   * @param nameSpace namespace for record types (default: "")
   * @return Avro Schema
   * @throws IncompatibleSchemaException if the DataType cannot be converted
   */
  def toAvroType(
    catalystType: DataType, 
    nullable: Boolean = false,
    recordName: String = "topLevelRecord", 
    nameSpace: String = ""
  ): Schema
}

/**
 * Exception thrown when schema conversion fails
 */
class IncompatibleSchemaException(msg: String, ex: Throwable = null) extends Exception(msg, ex)

Usage Examples:

import org.apache.spark.sql.types._
import org.apache.spark.sql.avro.SchemaConverters

// Create Spark SQL schema
val sparkSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = false),
  StructField("email", StringType, nullable = true),
  StructField("scores", ArrayType(DoubleType, containsNull = false), nullable = false)
))

// Convert to Avro schema
val avroSchema = SchemaConverters.toAvroType(
  sparkSchema, 
  nullable = false,
  recordName = "UserProfile",
  nameSpace = "com.example.data"
)

// Use in write operations
val df = spark.table("user_data")
df.write
  .format("avro")
  .option("avroSchema", avroSchema.toString)
  .save("path/to/output")

// Print schema JSON
println(avroSchema.toString(true))

Supported Type Mappings

Comprehensive type conversion support between Avro and Spark SQL types.

Avro to Spark SQL Type Mappings

// Primitive types
NULL → NullType
BOOLEAN → BooleanType
INT → IntegerType
LONG → LongType
FLOAT → FloatType
DOUBLE → DoubleType
BYTES → BinaryType
STRING → StringType
ENUM → StringType

// Logical types
INT with date logical type → DateType
LONG with timestamp-millis logical type → TimestampType
LONG with timestamp-micros logical type → TimestampType
BYTES/FIXED with decimal logical type → DecimalType

// Complex types
RECORD → StructType
ARRAY → ArrayType
MAP → MapType(StringType, valueType)
UNION → Special handling (see below)
FIXED → BinaryType
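
A minimal sketch of these mappings in action (the schema contents below are illustrative, not part of the API), exercising the ENUM, MAP, and FIXED cases:

import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

// Hypothetical record covering enum, map, and fixed fields
val mappingSchemaJson = """{
  "type": "record",
  "name": "MappingDemo",
  "fields": [
    {"name": "status", "type": {"type": "enum", "name": "Status", "symbols": ["OK", "FAIL"]}},
    {"name": "labels", "type": {"type": "map", "values": "int"}},
    {"name": "digest", "type": {"type": "fixed", "name": "Digest", "size": 16}}
  ]
}"""

val mappingSchema = new Schema.Parser().parse(mappingSchemaJson)
val mapped = SchemaConverters.toSqlType(mappingSchema).dataType.asInstanceOf[StructType]

// Expected: status -> string, labels -> map<string,int>, digest -> binary
mapped.fields.foreach(f => println(s"${f.name}: ${f.dataType.simpleString}"))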

Spark SQL to Avro Type Mappings

// Primitive types
BooleanType → BOOLEAN
ByteType → INT
ShortType → INT
IntegerType → INT
LongType → LONG
FloatType → FLOAT
DoubleType → DOUBLE
StringType → STRING
BinaryType → BYTES

// Special types
DateType → INT with date logical type
TimestampType → LONG with timestamp-micros logical type
DecimalType → FIXED with decimal logical type

// Complex types
StructType → RECORD
ArrayType → ARRAY
MapType → MAP (only with StringType keys)
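
A short sketch of the reverse direction for the special types (field names and the record name here are made up for illustration):

import org.apache.spark.sql.types._
import org.apache.spark.sql.avro.SchemaConverters

// Hypothetical schema with date, timestamp, and decimal columns
val specialTypes = StructType(Seq(
  StructField("event_date", DateType, nullable = false),
  StructField("event_time", TimestampType, nullable = false),
  StructField("amount", DecimalType(10, 2), nullable = false)
))

val specialAvro = SchemaConverters.toAvroType(specialTypes, recordName = "SpecialTypes")

// Expected logical types: date, timestamp-micros, and decimal(10, 2) on a fixed field
println(specialAvro.toString(true))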

UNION Type Handling

Special handling for Avro UNION types with multiple conversion strategies.

// Union type conversion strategies:

// 1. Nullable unions (union with null)
["null", "string"] → StringType with nullable = true

// 2. Simple type promotion unions
["int", "long"] → LongType
["float", "double"] → DoubleType

// 3. Complex unions (converted to struct with member fields)
["string", "int", "record"] → StructType with fields:
  - member0: StringType (nullable = true)
  - member1: IntegerType (nullable = true) 
  - member2: StructType (nullable = true)

Union Handling Examples:

// Nullable field union
val nullableUnion = """["null", "string"]"""
// Converts to: StringType, nullable = true

// Type promotion union
val promotionUnion = """["int", "long"]"""
// Converts to: LongType, nullable = false

// Complex union
val complexUnion = """[
  "string",
  {"type": "record", "name": "Address", "fields": [
    {"name": "street", "type": "string"}
  ]}
]"""
// Converts to: StructType with member0 (string) and member1 (Address record)
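
The complex-union case can be checked end to end with a small runnable sketch (the union contents are illustrative):

import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

// Illustrative complex union: a string or an Address record
val unionJson = """[
  "string",
  {"type": "record", "name": "Address", "fields": [
    {"name": "street", "type": "string"}
  ]}
]"""

val unionStruct = SchemaConverters.toSqlType(new Schema.Parser().parse(unionJson))
  .dataType.asInstanceOf[StructType]

// Expected: member0 (string) and member1 (Address struct), both nullable
unionStruct.fields.foreach(f => println(s"${f.name}: ${f.dataType.simpleString}, nullable = ${f.nullable}"))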

Logical Types Support

Full support for Avro logical types with proper Spark SQL mapping.

Date and Timestamp Logical Types

// Date logical type (days since epoch)
{
  "type": "int",
  "logicalType": "date"
} → DateType

// Timestamp logical types
{
  "type": "long", 
  "logicalType": "timestamp-millis"
} → TimestampType

{
  "type": "long",
  "logicalType": "timestamp-micros"  
} → TimestampType
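
Both timestamp granularities land on the same Spark type, which a quick sketch confirms:

import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters

val micros = new Schema.Parser().parse("""{"type": "long", "logicalType": "timestamp-micros"}""")
val millis = new Schema.Parser().parse("""{"type": "long", "logicalType": "timestamp-millis"}""")

println(SchemaConverters.toSqlType(micros).dataType)  // TimestampType
println(SchemaConverters.toSqlType(millis).dataType)  // TimestampType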

Decimal Logical Type

// Decimal as BYTES
{
  "type": "bytes",
  "logicalType": "decimal",
  "precision": 10,
  "scale": 2
} → DecimalType(10, 2)

// Decimal as FIXED
{
  "type": "fixed",
  "name": "DecimalFixed",
  "size": 8,
  "logicalType": "decimal", 
  "precision": 18,
  "scale": 4
} → DecimalType(18, 4)
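
A minimal check of the decimal mapping (precision and scale values are illustrative):

import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters

// Decimal carried as BYTES with precision 10, scale 2
val decimalBytes = new Schema.Parser().parse(
  """{"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}""")

println(SchemaConverters.toSqlType(decimalBytes).dataType)  // DecimalType(10,2)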

Schema Evolution Support

Handling schema evolution and compatibility between different schema versions.

// Schema evolution example
val v1Schema = """{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"}
  ]
}"""

val v2Schema = """{
  "type": "record", 
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}"""

// Reading evolved data with original schema
val df = spark.read
  .format("avro")
  .option("avroSchema", v1Schema)  // Use older schema
  .load("path/to/v2_data.avro")    // Reading newer data
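
The reverse direction is a hedged sketch of the same idea: reading files written with v1Schema (the path below is hypothetical) while supplying the newer v2 schema as the reader schema, so the added email field is populated from its Avro default.

// Reading older data with the evolved schema (path is illustrative)
val evolvedDf = spark.read
  .format("avro")
  .option("avroSchema", v2Schema)   // newer reader schema
  .load("path/to/v1_data.avro")     // data written with v1Schema

evolvedDf.printSchema()  // includes the nullable email column, filled with its default (null)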

Error Handling

Schema conversion error handling and common issues.

class IncompatibleSchemaException(msg: String, ex: Throwable = null) extends Exception(msg, ex)

Common Error Scenarios:

// Unsupported type conversion
try {
  val unsupportedType = MapType(IntegerType, StringType) // Avro MAP requires string keys
  SchemaConverters.toAvroType(unsupportedType)
} catch {
  case e: IncompatibleSchemaException =>
    println(s"Conversion failed: ${e.getMessage}")
}

// Recursive schema detection
val recursiveSchema = """{
  "type": "record",
  "name": "Node", 
  "fields": [
    {"name": "value", "type": "string"},
    {"name": "child", "type": "Node"} 
  ]
}"""

try {
  SchemaConverters.toSqlType(new Schema.Parser().parse(recursiveSchema))
} catch {
  case e: IncompatibleSchemaException =>
    println("Recursive schemas not supported")
}