Core functions for working with Avro binary data in DataFrame columns. These functions enable conversion between Spark's internal Catalyst format and Avro binary format within SQL queries and DataFrame operations.
Converts a binary column containing Avro-format data into its corresponding Catalyst (Spark SQL) value. The specified schema must match the actual schema of the serialized data; otherwise decoding may fail or return arbitrary results.
/**
* Converts a binary column of avro format into its corresponding catalyst value.
*
* @param data the binary column containing Avro data
* @param jsonFormatSchema the avro schema in JSON string format
* @return Column with decoded Catalyst data
* @since 2.4.0
*/
@Experimental
def from_avro(data: Column, jsonFormatSchema: String): Column

Usage Examples:
import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.col
// Define Avro schema as JSON string
val userSchema = """{
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"},
{"name": "email", "type": ["null", "string"], "default": null}
]
}"""
// Convert binary Avro data to structured columns
val df = spark.table("avro_data")
val decodedDF = df.select(
  from_avro(col("avro_bytes"), userSchema).as("user_data")
)
// Access nested fields from decoded data
val expandedDF = decodedDF.select(
  col("user_data.name").as("user_name"),
  col("user_data.age").as("user_age"),
  col("user_data.email").as("user_email")
)
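A common streaming pattern is decoding Avro-encoded Kafka message values with from_avro. A sketch, assuming a Kafka topic whose values are User records matching userSchema above (broker and topic names are placeholders):

import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.col
// Placeholder broker/topic; assumes message values are Avro-encoded
// User records matching userSchema
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "users")
  .load()
val userStreamDF = kafkaDF.select(
  from_avro(col("value"), userSchema).as("user")
)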
Converts a column into binary Avro format. The input column is serialized according to an Avro schema derived from its Catalyst data type.
/**
* Converts a column into binary of avro format.
*
* @param data the data column to convert
* @return Column with Avro binary data
* @since 2.4.0
*/
@Experimental
def to_avro(data: Column): Column

Usage Examples:
import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.{col, struct}
// Convert structured data to Avro binary
val df = spark.table("users")
val avroDF = df.select(
  to_avro(struct(
    col("name"),
    col("age"),
    col("email")
  )).as("avro_data")
)
// Store binary Avro data for later processing
avroDF.write
  .format("parquet")
  .save("path/to/avro_binary_data")
// Convert entire row to Avro
val rowAsAvroDF = df.select(
  to_avro(struct(col("*"))).as("row_as_avro")
)
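Because to_avro derives its Avro schema from the input Catalyst type, the two functions compose into a round trip. A minimal sketch, assuming a hand-written reader schema that structurally matches what to_avro derives (Spark names the generated top-level record topLevelRecord, though any structurally identical schema decodes the same bytes):

import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.{col, struct}
// Reader schema matching what to_avro derives for struct(col("id"))
// on spark.range: a single non-null long field
val idSchema = """{
  "type": "record",
  "name": "topLevelRecord",
  "fields": [{"name": "id", "type": "long"}]
}"""
val roundTripDF = spark.range(3)
  .select(to_avro(struct(col("id"))).as("avro_bytes"))
  .select(from_avro(col("avro_bytes"), idSchema).as("decoded"))
  .select(col("decoded.id"))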
Both functions surface schema mismatches and malformed data as runtime errors:

- from_avro: throws runtime exceptions for schema mismatches or corrupted binary data
- to_avro: may throw IncompatibleSchemaException for unsupported data type conversions

Common Error Scenarios:
// Schema mismatch - will throw runtime exception
val wrongSchema = """{"type": "string"}"""
val df = spark.table("complex_avro_data")
// This will fail if avro_bytes contains record data
val failingDF = df.select(from_avro(col("avro_bytes"), wrongSchema))
// Unsupported type conversion (hypothetical table and column names)
val complexDF = spark.table("complex_data")
val unsupportedDF = complexDF.select(
  // This may fail for complex nested structures not supported by Avro
  to_avro(col("some_complex_column"))
)
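Because these failures otherwise surface at run time inside executor tasks, one defensive option is to parse the JSON schema eagerly on the driver with Avro's own parser before building the query. A small sketch:

import org.apache.avro.Schema
// Schema.Parser throws SchemaParseException for malformed schema strings,
// so invalid schemas fail fast on the driver
val parsedSchema: Schema = new Schema.Parser().parse(userSchema)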
The from_avro function is implemented as a code-generated Catalyst expression:

case class AvroDataToCatalyst(child: Expression, jsonFormatSchema: String)
  extends UnaryExpression with ExpectsInputTypes {

  override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)
  override lazy val dataType: DataType = SchemaConverters.toSqlType(avroSchema).dataType
  override def nullable: Boolean = true
  override def prettyName: String = "from_avro"
}
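Evaluation parses the JSON schema once and reuses the Avro reader and decoder across rows. A simplified sketch of that path, close to the Spark 2.4 sources (member names may differ across versions; AvroDeserializer is Spark's internal Avro-to-Catalyst converter):

import org.apache.avro.Schema
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.io.{BinaryDecoder, DecoderFactory}

// Inside AvroDataToCatalyst (simplified)
@transient private lazy val avroSchema = new Schema.Parser().parse(jsonFormatSchema)
@transient private lazy val reader = new GenericDatumReader[Any](avroSchema)
@transient private lazy val deserializer = new AvroDeserializer(avroSchema, dataType)
@transient private var decoder: BinaryDecoder = _
@transient private var result: Any = _

override def nullSafeEval(input: Any): Any = {
  val binary = input.asInstanceOf[Array[Byte]]
  // Reuse the decoder and datum instance to avoid per-row allocation
  decoder = DecoderFactory.get().binaryDecoder(binary, 0, binary.length, decoder)
  result = reader.read(result, decoder)
  // Map the decoded Avro datum to Catalyst's internal representation
  deserializer.deserialize(result)
}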
The to_avro function is implemented as a code-generated Catalyst expression:

case class CatalystDataToAvro(child: Expression) extends UnaryExpression {
  override def dataType: DataType = BinaryType
  override def prettyName: String = "to_avro"
}
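Evaluation mirrors the decode path: the Avro schema is derived from the child's Catalyst type, and the writer, encoder, and output buffer are reused across rows. A simplified sketch, again close to the Spark 2.4 sources (AvroSerializer is Spark's internal Catalyst-to-Avro converter):

import java.io.ByteArrayOutputStream
import org.apache.avro.generic.GenericDatumWriter
import org.apache.avro.io.{BinaryEncoder, EncoderFactory}

// Inside CatalystDataToAvro (simplified)
@transient private lazy val avroType =
  SchemaConverters.toAvroType(child.dataType, child.nullable)
@transient private lazy val serializer =
  new AvroSerializer(child.dataType, avroType, child.nullable)
@transient private lazy val writer = new GenericDatumWriter[Any](avroType)
@transient private var encoder: BinaryEncoder = _
@transient private lazy val out = new ByteArrayOutputStream

override def nullSafeEval(input: Any): Any = {
  out.reset()
  // Reuse the encoder and output buffer across rows
  encoder = EncoderFactory.get().directBinaryEncoder(out, encoder)
  // Convert the Catalyst value into an Avro datum, then write it as binary
  val avroData = serializer.serialize(input)
  writer.write(avroData, encoder)
  encoder.flush()
  out.toByteArray
}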