Avro Functions

The Avro functions provide high-level operations for converting between binary Avro data and Spark SQL columns. These functions handle schema-based serialization and deserialization with comprehensive type mapping and error handling.

Available in both Scala and Python APIs with equivalent functionality.

Core Functions

from_avro

Converts binary Avro data to Catalyst rows using a specified Avro schema.

Scala:  def from_avro(data: Column, jsonFormatSchema: String): Column
Python: def from_avro(data: ColumnOrName, jsonFormatSchema: str, options: Optional[Dict[str, str]] = None) -> Column

Parameters:

  • data: Column containing binary Avro data
  • jsonFormatSchema: Avro schema in JSON string format
  • options (Python signature; Scala has a separate overload, documented below): Optional dictionary of parsing options

Returns: Column with deserialized Catalyst data matching the Avro schema

Usage Example:

Scala:

import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions.col

val avroSchema = """{"type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}"""
// from_avro expects raw Avro bytes in a binary column; note that reading with
// format("avro") already decodes records, so load a source that keeps the bytes,
// e.g. the value column of a Kafka source
val df = spark.read.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "users")
  .load()
val decodedDf = df.select(from_avro(col("value"), avroSchema).as("user"))

Python:

from pyspark.sql.avro.functions import from_avro
from pyspark.sql.functions import col

avro_schema = '{"type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}'
# from_avro expects raw Avro bytes in a binary column, e.g. the value column of a Kafka source
df = (spark.read.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "users")
      .load())
decoded_df = df.select(from_avro(col("value"), avro_schema).alias("user"))
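
Because from_avro yields a struct column, nested fields can be pulled out with dot notation. A minimal Scala sketch continuing the example above:

// flatten the decoded struct into top-level columns
val users = decodedDf.select(col("user.name").as("name"), col("user.age").as("age"))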

from_avro (with options)

Converts binary Avro data to Catalyst rows with additional parsing options.

def from_avro(data: Column, jsonFormatSchema: String, options: java.util.Map[String, String]): Column

Parameters:

  • data: Column containing binary Avro data
  • jsonFormatSchema: Avro schema in JSON string format
  • options: Map of parsing options (mode, dateTimeRebaseMode, etc.)

Returns: Column with deserialized Catalyst data

Usage Example:

import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._

val options: JMap[String, String] = Map(
  "mode" -> "PERMISSIVE",
  "dateTimeRebaseMode" -> "CORRECTED"
).asJava

val decodedDf = df.select(from_avro(col("binary_data"), avroSchema, options).as("user"))
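
In this example, mode controls corrupt-record handling (see Error Handling below), while dateTimeRebaseMode controls how dates and timestamps written in the legacy hybrid Julian/Gregorian calendar are rebased to the Proleptic Gregorian calendar used by Spark 3+; its accepted values are EXCEPTION, CORRECTED, and LEGACY.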

to_avro

Converts Catalyst data to binary Avro format using the data's inferred schema.

Scala:  def to_avro(data: Column): Column
Python: def to_avro(data: ColumnOrName, jsonFormatSchema: str = "") -> Column

Parameters:

  • data: Column containing Catalyst data to serialize
  • jsonFormatSchema (Python): Optional output Avro schema in JSON string format

Returns: Column with binary Avro data

Usage Example:

Scala:

val encodedDf = df.select(to_avro(col("user_struct")).as("avro_data"))

Python:

from pyspark.sql.avro.functions import to_avro
from pyspark.sql.functions import col

encoded_df = df.select(to_avro(col("user_struct")).alias("avro_data"))
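
to_avro accepts any Catalyst column, so a common pattern is to pack several top-level columns into one struct before encoding. A minimal Scala sketch, assuming df has name and age columns:

import org.apache.spark.sql.avro.functions.to_avro
import org.apache.spark.sql.functions.{col, struct}

// pack two top-level columns into a struct, then serialize the struct to Avro bytes
val encodedDf = df.select(to_avro(struct(col("name"), col("age"))).as("avro_data"))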

to_avro (with schema)

Converts Catalyst data to binary Avro format using a specified output schema.

def to_avro(data: Column, jsonFormatSchema: String): Column

Parameters:

  • data: Column containing Catalyst data to serialize
  • jsonFormatSchema: Target Avro schema in JSON string format

Returns: Column with binary Avro data conforming to the specified schema

Usage Example:

val outputSchema = """{"type":"record","name":"UserOutput","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}"""
val encodedDf = df.select(to_avro(col("user_struct"), outputSchema).as("avro_data"))
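
Using the same schema on both sides gives a consistent round trip, since from_avro needs the schema the bytes were written with. A sketch, assuming name and age are non-null so they satisfy the non-nullable output schema:

import org.apache.spark.sql.functions.struct

val roundTrip = df
  .select(to_avro(struct(col("name"), col("age")), outputSchema).as("avro_data"))
  .select(from_avro(col("avro_data"), outputSchema).as("user"))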

schema_of_avro

Returns the DDL-formatted Spark SQL schema corresponding to an Avro schema.

def schema_of_avro(jsonFormatSchema: String): Column

Parameters:

  • jsonFormatSchema: Avro schema in JSON string format

Returns: Column containing the DDL schema string

Usage Example:

val schemaDf = spark.sql(s"SELECT schema_of_avro('$avroSchema') AS spark_schema")
schemaDf.show(false)
// +--------------------------------------------------+
// |spark_schema                                      |
// +--------------------------------------------------+
// |struct<name:string,age:int>                       |
// +--------------------------------------------------+
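
The DDL string can be converted back into a StructType for use elsewhere, e.g. when defining a DataFrame schema. A sketch using Spark's DataType.fromDDL helper:

import org.apache.spark.sql.types.{DataType, StructType}

// pull the DDL string out of the result and parse it into a Spark schema
val ddl = schemaDf.first().getString(0) // "struct<name:string,age:int>"
val sparkSchema = DataType.fromDDL(ddl).asInstanceOf[StructType]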

schema_of_avro (with options)

Returns the DDL-formatted Spark SQL schema with additional parsing options.

def schema_of_avro(jsonFormatSchema: String, options: java.util.Map[String, String]): Column

Parameters:

  • jsonFormatSchema: Avro schema in JSON string format
  • options: Map of schema conversion options

Returns: Column containing the DDL schema string

Usage Example:

val options: JMap[String, String] = Map(
  "enableStableIdentifiersForUnionType" -> "true",
  "recursiveFieldMaxDepth" -> "10"
).asJava

val schemaDf = spark.range(1)
  .select(schema_of_avro(avroSchema, options).as("spark_schema"))

Note: The schema_of_avro functions are only available in the Scala API. Python users should go through Spark SQL instead:

# Python - use SQL to access schema_of_avro
schema_df = spark.sql(f"SELECT schema_of_avro('{avro_schema}') AS spark_schema")

Deprecated Functions

Legacy Package Functions

The following functions are deprecated and maintained for backward compatibility. Use the functions from org.apache.spark.sql.avro.functions instead.

// DEPRECATED: Use org.apache.spark.sql.avro.functions.from_avro instead
@deprecated("Please use 'org.apache.spark.sql.avro.functions.from_avro' instead.", "3.0.0")
def from_avro(data: Column, jsonFormatSchema: String): Column

// DEPRECATED: Use org.apache.spark.sql.avro.functions.to_avro instead  
@deprecated("Please use 'org.apache.spark.sql.avro.functions.to_avro' instead.", "3.0.0")
def to_avro(data: Column): Column
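
Migrating off the deprecated functions is an import change only; the signatures are identical:

// before (deprecated since 3.0.0)
import org.apache.spark.sql.avro.from_avro

// after
import org.apache.spark.sql.avro.functions.from_avro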

Error Handling

from_avro supports configurable handling of corrupt records through the mode option:

  • PERMISSIVE (default): Sets corrupt records to null and continues processing
  • FAILFAST: Throws an exception on the first corrupt record

(DROPMALFORMED, offered by some Spark file sources, is not supported by from_avro.) For example, to fail on the first corrupt record:
val options: JMap[String, String] = Map("mode" -> "FAILFAST").asJava
val strictDf = df.select(from_avro(col("binary_data"), avroSchema, options).as("user"))
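
In PERMISSIVE mode, corrupt records come back with null fields rather than failing the job, so they can be filtered out after decoding. A minimal sketch, assuming name is a required field in the schema:

// rows that failed to parse have null struct fields; drop them via a required field
val parsedDf = df.select(from_avro(col("binary_data"), avroSchema).as("user"))
val validDf = parsedDf.filter(col("user.name").isNotNull)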

Type Mapping

The functions automatically handle conversion between Avro types and Spark SQL types:

  • Avro primitive types → Spark SQL primitive types (string, int, long, float, double, boolean, bytes)
  • Avro records → Spark SQL structs
  • Avro arrays → Spark SQL arrays
  • Avro maps → Spark SQL maps
  • Avro unions → nullable Spark SQL types for unions of null with one other type; other unions become structs with one field per branch (illustrated below)
  • Avro enums → Spark SQL strings
  • Avro logical types → Appropriate Spark SQL types (timestamps, dates, decimals)
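
As an illustration of the union and logical-type rules, the following sketch runs schema_of_avro (see above) over a schema with a nullable long and a decimal logical type; the output shown is the expected mapping, not a captured result:

val itemSchema = """{"type":"record","name":"Item","fields":[{"name":"id","type":["null","long"]},{"name":"price","type":{"type":"bytes","logicalType":"decimal","precision":10,"scale":2}}]}"""
spark.sql(s"SELECT schema_of_avro('$itemSchema') AS spark_schema").show(false)
// expected: struct<id:bigint,price:decimal(10,2)>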

Exception Handling

The Avro functions may throw specific exceptions for schema and type incompatibilities:

Exception Classes

// Schema incompatibility errors (internal)
class IncompatibleSchemaException extends RuntimeException

// Unsupported Avro type errors (internal) 
class UnsupportedAvroTypeException extends RuntimeException

Common Error Scenarios:

  • IncompatibleSchemaException: Thrown when provided schema is incompatible with data
  • UnsupportedAvroTypeException: Thrown when encountering unsupported Avro types
  • AnalysisException: Thrown for invalid schema JSON or malformed parameters
  • SparkException: Thrown for runtime errors during processing

Error Handling Best Practices:

import org.apache.spark.sql.AnalysisException
import org.apache.spark.SparkException

try {
  // schema problems surface when the plan is analyzed; runtime errors at the collect() action
  val result = df.select(from_avro(col("data"), invalidSchema).as("parsed"))
  result.collect()
} catch {
  case e: AnalysisException => 
    println(s"Schema validation failed: ${e.getMessage}")
  case e: SparkException =>
    println(s"Runtime processing error: ${e.getMessage}")
  case e: Exception =>
    println(s"Unexpected error: ${e.getMessage}")
}