The Avro functions provide high-level operations for converting between binary Avro data and Spark SQL columns. These functions handle schema-based serialization and deserialization with comprehensive type mapping and error handling.
Available in both the Scala and Python APIs with largely equivalent functionality; schema_of_avro is exposed to Python only through Spark SQL, as noted below.
Converts binary Avro data to Catalyst rows using a specified Avro schema.
Scala: def from_avro(data: Column, jsonFormatSchema: String): Column
Python: def from_avro(data: ColumnOrName, jsonFormatSchema: str, options: Optional[Dict[str, str]] = None) -> Column
Parameters:
data: Column containing binary Avro data
jsonFormatSchema: Avro schema in JSON string format
options (Python only): Optional dictionary of parsing options
Returns: Column with deserialized Catalyst data matching the Avro schema
Usage Example:
Scala:
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions.col
val avroSchema = """{"type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}"""
// users.avro is assumed to contain a bytes field named "binary_data" holding Avro-encoded
// records; reading an Avro file otherwise yields already-decoded columns
val df = spark.read.format("avro").load("users.avro")
val decodedDf = df.select(from_avro(col("binary_data"), avroSchema).as("user"))
Python:
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.functions import col
avro_schema = '{"type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}'
# users.avro is assumed to contain a bytes field named "binary_data" holding Avro-encoded
# records; reading an Avro file otherwise yields already-decoded columns
df = spark.read.format("avro").load("users.avro")
decoded_df = df.select(from_avro(col("binary_data"), avro_schema).alias("user"))
Converts binary Avro data to Catalyst rows with additional parsing options.
def from_avro(data: Column, jsonFormatSchema: String, options: java.util.Map[String, String]): Column
Parameters:
data: Column containing binary Avro data
jsonFormatSchema: Avro schema in JSON string format
options: Map of parsing options (mode, datetimeRebaseMode, etc.)
Returns: Column with deserialized Catalyst data
Usage Example:
import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._
val options: JMap[String, String] = Map(
  "mode" -> "PERMISSIVE",
  "datetimeRebaseMode" -> "CORRECTED"
).asJava
val decodedDf = df.select(from_avro(col("binary_data"), avroSchema, options).as("user"))
Converts Catalyst data to binary Avro format using the data's inferred schema.
Scala: def to_avro(data: Column): Column
Python: def to_avro(data: ColumnOrName, jsonFormatSchema: str = "") -> Column
Parameters:
data: Column containing Catalyst data to serialize
jsonFormatSchema (Python): Optional output Avro schema in JSON string format
Returns: Column with binary Avro data
Usage Example:
Scala:
val encodedDf = df.select(to_avro(col("user_struct")).as("avro_data"))
Python:
from pyspark.sql.avro.functions import to_avro
from pyspark.sql.functions import col
encoded_df = df.select(to_avro(col("user_struct")).alias("avro_data"))
Converts Catalyst data to binary Avro format using a specified output schema.
def to_avro(data: Column, jsonFormatSchema: String): Column
Parameters:
data: Column containing Catalyst data to serialize
jsonFormatSchema: Target Avro schema in JSON string format
Returns: Column with binary Avro data conforming to the specified schema
Usage Example:
val outputSchema = """{"type":"record","name":"UserOutput","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}"""
val encodedDf = df.select(to_avro(col("user_struct"), outputSchema).as("avro_data"))
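Because to_avro and from_avro are inverses, a round trip is a quick way to sanity-check a schema. The sketch below assumes the avroSchema value defined earlier; the column construction is hypothetical and chosen so the inferred fields are non-nullable and line up with that schema:
import org.apache.spark.sql.avro.functions.{from_avro, to_avro}
import org.apache.spark.sql.functions.{col, concat, lit, struct}

// Build a struct with non-nullable name (string) and age (int) fields
val users = spark.range(2).select(
  concat(lit("user_"), col("id").cast("string")).as("name"),
  (col("id") + 20).cast("int").as("age")
)

val roundTrip = users
  .select(to_avro(struct(col("name"), col("age"))).as("avro_data")) // encode to Avro binary
  .select(from_avro(col("avro_data"), avroSchema).as("user"))       // decode back to a struct

roundTrip.select("user.name", "user.age").show()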
Returns the DDL-formatted Spark SQL schema corresponding to an Avro schema.
def schema_of_avro(jsonFormatSchema: String): Column
Parameters:
jsonFormatSchema: Avro schema in JSON string format
Returns: Column containing the DDL schema string
Usage Example:
val schemaDf = spark.sql("SELECT schema_of_avro('"+avroSchema+"') AS spark_schema")
schemaDf.show(false)
// +--------------------------------------------------+
// |spark_schema |
// +--------------------------------------------------+
// |struct<name:string,age:int> |
// +--------------------------------------------------+
Returns the DDL-formatted Spark SQL schema with additional parsing options.
def schema_of_avro(jsonFormatSchema: String, options: java.util.Map[String, String]): Column
Parameters:
jsonFormatSchema: Avro schema in JSON string format
options: Map of schema conversion options
Returns: Column containing the DDL schema string
Usage Example:
val options: JMap[String, String] = Map(
  "enableStableIdentifiersForUnionType" -> "true",
  "recursiveFieldMaxDepth" -> "10"
).asJava
val schemaDf = spark.range(1).select(schema_of_avro(avroSchema, options).as("spark_schema"))
// Equivalent via SQL, passing options as a map literal:
// spark.sql("SELECT schema_of_avro('" + avroSchema + "', map('enableStableIdentifiersForUnionType', 'true')) AS spark_schema")
Note: the schema_of_avro functions are only available in the Scala API. Python users should use Spark SQL to access these functions:
# Python - use SQL to access schema_of_avro
schema_df = spark.sql(f"SELECT schema_of_avro('{avro_schema}') AS spark_schema")
The following functions are deprecated and maintained for backward compatibility. Use the functions from org.apache.spark.sql.avro.functions instead.
// DEPRECATED: Use org.apache.spark.sql.avro.functions.from_avro instead
@deprecated("Please use 'org.apache.spark.sql.avro.functions.from_avro' instead.", "3.0.0")
def from_avro(data: Column, jsonFormatSchema: String): Column
// DEPRECATED: Use org.apache.spark.sql.avro.functions.to_avro instead
@deprecated("Please use 'org.apache.spark.sql.avro.functions.to_avro' instead.", "3.0.0")
def to_avro(data: Column): Column
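Migration is typically just an import change; based on the deprecation messages above, the old variants are assumed to come from the org.apache.spark.sql.avro package, while the current ones live in org.apache.spark.sql.avro.functions. A sketch:
// Old (deprecated since 3.0.0), assuming the package-level import:
// import org.apache.spark.sql.avro.{from_avro, to_avro}

// New: import from the functions object; call sites are unchanged
import org.apache.spark.sql.avro.functions.{from_avro, to_avro}
import org.apache.spark.sql.functions.col

val decoded = df.select(from_avro(col("binary_data"), avroSchema).as("user"))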
All functions support different error handling modes through the mode option:
val options: JMap[String, String] = Map("mode" -> "FAILFAST").asJava
val strictDf = df.select(from_avro(col("binary_data"), avroSchema, options).as("user"))
The functions automatically handle conversion between Avro and Spark SQL types (for example, Avro string to StringType, int to IntegerType, long to LongType, bytes to BinaryType, record to StructType, array to ArrayType, and map to MapType).
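As an illustration, schema_of_avro can be used to inspect how a given Avro schema maps to Spark SQL types (a sketch; the schema below is hypothetical and the exact DDL formatting may vary by Spark version):
import org.apache.spark.sql.avro.functions.schema_of_avro

// Hypothetical schema exercising several Avro types
val mappingSchema =
  """{"type":"record","name":"Sample","fields":[
    |{"name":"flag","type":"boolean"},
    |{"name":"count","type":"long"},
    |{"name":"ratio","type":"double"},
    |{"name":"payload","type":"bytes"},
    |{"name":"tags","type":{"type":"array","items":"string"}}]}""".stripMargin

spark.range(1).select(schema_of_avro(mappingSchema).as("spark_schema")).show(false)
// Expected DDL along the lines of:
// struct<flag:boolean,count:bigint,ratio:double,payload:binary,tags:array<string>>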
The Avro functions may throw specific exceptions for schema and type incompatibilities:
// Schema incompatibility errors (internal)
class IncompatibleSchemaException extends RuntimeException
// Unsupported Avro type errors (internal)
class UnsupportedAvroTypeException extends RuntimeException
Common Error Scenarios: malformed or truncated Avro binary data, a mismatch between the provided Avro schema and the actual structure of the data, and Avro constructs with no supported Spark SQL equivalent.
Error Handling Best Practices:
import org.apache.spark.sql.AnalysisException
import org.apache.spark.SparkException
try {
  // invalidSchema is a placeholder for a schema that does not match the data
  val result = df.select(from_avro(col("data"), invalidSchema).as("parsed"))
  result.collect()
} catch {
  case e: AnalysisException =>
    println(s"Schema validation failed: ${e.getMessage}")
  case e: SparkException =>
    println(s"Runtime processing error: ${e.getMessage}")
  case e: Exception =>
    println(s"Unexpected error: ${e.getMessage}")
}
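For pipelines that should tolerate bad records rather than fail, the mode option can instead be set to PERMISSIVE, in which case records that cannot be decoded come back as a null result. A sketch, where corruptDf is a hypothetical DataFrame whose binary_data column may contain malformed Avro payloads and avroSchema is the schema defined earlier:
import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.functions.col
import scala.jdk.CollectionConverters._

val permissiveOptions = Map("mode" -> "PERMISSIVE").asJava

// Records that cannot be decoded yield a null "user" value rather than failing the job
val tolerantDf = corruptDf.select(from_avro(col("binary_data"), avroSchema, permissiveOptions).as("user"))
val failedCount = tolerantDf.where(col("user").isNull).count()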