Column Functions

Core functions for working with Avro binary data in DataFrame columns. These functions enable conversion between Spark's internal Catalyst format and Avro binary format within SQL queries and DataFrame operations.

Capabilities

from_avro Function

Converts a binary column containing Avro-format data into its corresponding Catalyst (Spark SQL) value. The schema passed as jsonFormatSchema must match the schema the data was written with; otherwise decoding fails or yields incorrect values.

/**
 * Converts a binary column of avro format into its corresponding catalyst value.
 * 
 * @param data the binary column containing Avro data
 * @param jsonFormatSchema the avro schema in JSON string format
 * @return Column with decoded Catalyst data
 * @since 2.4.0
 */
@Experimental
def from_avro(data: Column, jsonFormatSchema: String): Column

Usage Examples:

import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.col

// Define Avro schema as JSON string
val userSchema = """{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}"""

// Convert binary Avro data to structured columns
val df = spark.table("avro_data")
val decodedDF = df.select(
  from_avro(col("avro_bytes"), userSchema).as("user_data")
)

// Access nested fields from decoded data
val expandedDF = decodedDF.select(
  col("user_data.name").as("user_name"),
  col("user_data.age").as("user_age"),
  col("user_data.email").as("user_email")
)

to_avro Function

Converts a column into binary Avro format. The Avro schema used for serialization is derived from the column's Catalyst data type; the signature below takes no explicit schema argument.

/**
 * Converts a column into binary of avro format.
 * 
 * @param data the data column to convert
 * @return Column with Avro binary data
 * @since 2.4.0
 */
@Experimental
def to_avro(data: Column): Column

Usage Examples:

import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.{col, struct}

// Convert structured data to Avro binary
val df = spark.table("users")
val avroDF = df.select(
  to_avro(struct(
    col("name"),
    col("age"), 
    col("email")
  )).as("avro_data")
)

// Store binary Avro data for later processing
avroDF.write
  .format("parquet")
  .save("path/to/avro_binary_data")

// Convert entire row to Avro
val rowAsAvroDF = df.select(
  to_avro(struct(col("*"))).as("row_as_avro")
)
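
The two functions act as inverses when the schema given to from_avro describes exactly what to_avro wrote. The following is a minimal round-trip sketch, not a definitive recipe. It assumes a local SparkSession with the spark-sql and spark-avro modules on the classpath. The column names (id, str, record, payload) and the record name Rec are illustrative and do not come from the original examples. The source columns are deliberately non-nullable so that they map to the plain Avro types long and string. The import follows this document; on Spark 3.0 and later the same functions are also exposed from org.apache.spark.sql.avro.functions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.{col, struct}

// Local session for illustration only
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("avro-round-trip")
  .getOrCreate()

// Non-nullable source columns: id is a long, str is its string form
val source = spark.range(3).select(
  struct(col("id"), col("id").cast("string").as("str")).as("record")
)

// Encode the struct column to Avro binary
val encoded = source.select(to_avro(col("record")).as("payload"))

// Schema describing the bytes written above (names are illustrative)
val recordSchema = """{
  "type": "record",
  "name": "Rec",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "str", "type": "string"}
  ]
}"""

// Decode back to a struct column and flatten it into top-level columns
val decoded = encoded
  .select(from_avro(col("payload"), recordSchema).as("record"))
  .select(col("record.id"), col("record.str"))

// Collect the decoded values as (id, str) pairs, ordered by id
val rows = decoded.orderBy("id").collect().map(r => (r.getLong(0), r.getString(1)))
```

If the source columns were nullable, the written data would include that nullability, and the schema passed to from_avro would have to account for it.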

Error Handling

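Neither function recovers from schema mismatches or malformed data. Failures surface as exceptions when the query executes:

  • from_avro: Throws a runtime exception when the binary data cannot be decoded with the given schema (schema mismatch or corrupted bytes)
  • to_avro: May throw IncompatibleSchemaException for unsupported data type conversions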

Common Error Scenarios:

// Schema mismatch: Avro binary carries no schema, so decoding with the wrong one
// typically throws at runtime (or yields incorrect values)
val wrongSchema = """{"type": "string"}"""
val df = spark.table("complex_avro_data")
// The failure surfaces when an action runs, if avro_bytes contains record data
val failingDF = df.select(from_avro(col("avro_bytes"), wrongSchema))

// Unsupported type conversion
// "complex_data" and "some_complex_column" are placeholder names
val complexDF = spark.table("complex_data")
val unsupportedDF = complexDF.select(
  // This may fail for complex nested structures not supported by Avro
  to_avro(col("some_complex_column"))
)
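
Because DataFrame transformations are lazy, a decoding failure only surfaces once an action runs. The sketch below shows one way to capture such a failure with scala.util.Try. It assumes a local SparkSession with spark-avro on the classpath. The empty byte array stands in for truncated input, and the column names are illustrative. The exact exception type differs between Spark versions, so the sketch only reports that the action failed rather than matching on a specific class.

```scala
import scala.util.{Failure, Success, Try}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.col

// Local session for illustration only
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("avro-error-handling")
  .getOrCreate()
import spark.implicits._

// An empty payload cannot contain an Avro long, so decoding is expected to fail
val broken = Seq(Array.empty[Byte]).toDF("avro_bytes")
val decoded = broken.select(
  from_avro(col("avro_bytes"), """{"type": "long"}""").as("value")
)

// Building `decoded` does not decode anything; the failure appears in collect()
val outcome = Try(decoded.collect())

outcome match {
  case Success(rows) => println(s"Decoded ${rows.length} row(s)")
  case Failure(e)    => println(s"Decoding failed: ${e.getClass.getName}: ${e.getMessage}")
}
```

Wrapping the action rather than the select call is the design point here: the select only builds a query plan, so a Try around it would not observe the decoding error.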

Expression Implementation Details

AvroDataToCatalyst Expression

The from_avro function is implemented as a code-generated Catalyst expression:

// Abridged: evaluation and code-generation methods are omitted
case class AvroDataToCatalyst(child: Expression, jsonFormatSchema: String)
  extends UnaryExpression with ExpectsInputTypes {

  override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)
  // avroSchema (not shown) is the Avro schema parsed from jsonFormatSchema
  override lazy val dataType: DataType = SchemaConverters.toSqlType(avroSchema).dataType
  override def nullable: Boolean = true
  override def prettyName: String = "from_avro"
}

CatalystDataToAvro Expression

The to_avro function is implemented as a code-generated Catalyst expression:

case class CatalystDataToAvro(child: Expression) extends UnaryExpression {
  override def dataType: DataType = BinaryType
  override def prettyName: String = "to_avro"
}
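
The properties declared by these expressions can be observed in the schema of the resulting columns without running a job. This is a minimal sketch, assuming a local SparkSession with spark-avro on the classpath and illustrative column and record names. The from_avro result takes its type from the Avro schema and is reported as nullable, while the to_avro result is BinaryType.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.{BinaryType, StructType}

// Local session for illustration only
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("avro-expression-types")
  .getOrCreate()

// Illustrative Avro schema matching the struct built below
val recordSchema = """{
  "type": "record",
  "name": "Rec",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "str", "type": "string"}
  ]
}"""

val source = spark.range(1).select(
  struct(col("id"), col("id").cast("string").as("str")).as("record")
)
val encoded = source.select(to_avro(col("record")).as("avro_bytes"))
val decoded = encoded.select(from_avro(col("avro_bytes"), recordSchema).as("user_data"))

// Schema inspection only needs query analysis; no data is decoded here
val encodedField = encoded.schema("avro_bytes")
val decodedField = decoded.schema("user_data")

// Field names of the decoded struct come from the Avro schema
val decodedFieldNames = decodedField.dataType match {
  case s: StructType => s.fieldNames.toSeq
  case _             => Seq.empty[String]
}

encoded.printSchema()
decoded.printSchema()
```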