The Apache Spark Avro connector provides seamless integration between Spark SQL and the Apache Avro data format, with support for automatic schema evolution and built-in compression.
—
The Spark Avro connector provides SQL functions for converting between binary Avro data and Spark SQL structures. These functions enable processing of Avro-encoded columns within DataFrames, useful for working with message queues, event streams, and embedded binary Avro data.
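As a quick orientation, the two functions invert each other. The following is a minimal round-trip sketch (not from the reference below; it assumes a running SparkSession named spark with the spark-avro package on the classpath):

```scala
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions._
import spark.implicits._

// Minimal Avro schema matching the demo rows below
val schema = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"}
  ]
}
"""

val users = Seq((1L, "Ada"), (2L, "Grace")).toDF("id", "name")

// Encode each row to binary Avro, then decode it back with the same schema
val roundTrip = users
  .select(to_avro(struct(col("id"), col("name"))).as("avro_bytes"))
  .select(from_avro(col("avro_bytes"), schema).as("user"))
  .select(col("user.id"), col("user.name"))
```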
Converts binary Avro data to Spark SQL structures using a provided schema.
def from_avro(data: Column, jsonFormatSchema: String): Column
def from_avro(
data: Column,
jsonFormatSchema: String,
options: java.util.Map[String, String]
): Column

Parameters:
- data: column containing binary Avro data (BinaryType)
- jsonFormatSchema: Avro schema in JSON string format
- options: additional options for deserialization (optional)

Returns: a column with the deserialized Spark SQL data structure
Basic Usage:
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions._
val schema = """
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": null}
]
}
"""
val df = spark.table("kafka_topic")
val decodedDF = df.select(
from_avro(col("value"), schema).as("user_data")
)
decodedDF.select(
col("user_data.id"),
col("user_data.name"),
col("user_data.email")
).show()

Usage with Options:
import scala.collection.JavaConverters._
val options = Map(
"mode" -> "PERMISSIVE",
"datetimeRebaseMode" -> "CORRECTED"
).asJava
val decodedDF = df.select(
from_avro(col("avro_bytes"), schema, options).as("parsed_data")
)

Converts Spark SQL data structures to binary Avro format.
def to_avro(data: Column): Column
def to_avro(data: Column, jsonFormatSchema: String): Column

Parameters:
- data: column with a Spark SQL data structure
- jsonFormatSchema: target Avro schema in JSON format (optional)

Returns: a column with binary Avro data (BinaryType)
Basic Usage:
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions._
val df = spark.table("users")
// Convert entire row to Avro
val avroDF = df.select(
to_avro(struct(col("*"))).as("avro_data")
)
// Convert specific columns to Avro
val avroDF2 = df.select(
col("id"),
to_avro(struct(
col("name"),
col("email"),
col("created_at")
)).as("user_avro")
)

Usage with Custom Schema:
val outputSchema = """
{
"type": "record",
"name": "UserOutput",
"namespace": "com.example",
"fields": [
{"name": "user_id", "type": "long"},
{"name": "full_name", "type": "string"},
{"name": "contact_email", "type": ["null", "string"]}
]
}
"""
val avroDF = df.select(
to_avro(
struct(
col("id").as("user_id"),
col("name").as("full_name"),
col("email").as("contact_email")
),
outputSchema
).as("formatted_avro")
)

Converting data from Avro to Spark SQL structures, processing, and back to Avro:
val schema = """
{
"type": "record",
"name": "Event",
"fields": [
{"name": "event_id", "type": "string"},
{"name": "timestamp", "type": "long"},
{"name": "user_id", "type": "long"},
{"name": "properties", "type": {"type": "map", "values": "string"}}
]
}
"""
val processedDF = df
.select(from_avro(col("event_data"), schema).as("event"))
.select(
col("event.event_id"),
col("event.timestamp"),
col("event.user_id"),
col("event.properties")
)
.filter(col("timestamp") > unix_timestamp() - 3600) // Last hour only
.withColumn("processed_at", current_timestamp())
.select(
to_avro(struct(
col("event_id"),
col("timestamp"),
col("user_id"),
col("properties"),
col("processed_at")
)).as("processed_event")
)

Common pattern for processing Avro messages from Kafka:
val kafkaDF = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "avro-events")
.load()
val schema = """
{
"type": "record",
"name": "KafkaEvent",
"fields": [
{"name": "event_type", "type": "string"},
{"name": "payload", "type": "string"},
{"name": "metadata", "type": {"type": "map", "values": "string"}}
]
}
"""
val decodedDF = kafkaDF.select(
col("key").cast("string").as("message_key"),
from_avro(col("value"), schema).as("event_data"),
col("timestamp").as("kafka_timestamp")
).select(
col("message_key"),
col("event_data.event_type"),
col("event_data.payload"),
col("event_data.metadata"),
col("kafka_timestamp")
)

Working with complex nested types:
val nestedSchema = """
{
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "customer", "type": {
"type": "record",
"name": "Customer",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "address", "type": {
"type": "record",
"name": "Address",
"fields": [
{"name": "street", "type": "string"},
{"name": "city", "type": "string"},
{"name": "country", "type": "string"}
]
}}
]
}},
{"name": "items", "type": {
"type": "array",
"items": {
"type": "record",
"name": "Item",
"fields": [
{"name": "sku", "type": "string"},
{"name": "quantity", "type": "int"},
{"name": "price", "type": "double"}
]
}
}}
]
}
"""
val orderDF = df.select(
from_avro(col("order_data"), nestedSchema).as("order")
).select(
col("order.order_id"),
col("order.customer.name").as("customer_name"),
col("order.customer.address.city").as("city"),
size(col("order.items")).as("item_count"),
expr("aggregate(order.items, 0.0, (acc, item) -> acc + item.price * item.quantity)").as("total_amount")
)

When using the three-parameter version of from_avro, the following options are supported:
The mode option controls how malformed records are handled:

- FAILFAST (default): throw an exception on the first corrupt record
- PERMISSIVE: set the entire record to null when the data is corrupt

val options = Map("mode" -> "PERMISSIVE").asJava
val cleanDF = df.select(
  from_avro(col("data"), schema, options).as("parsed")
).filter(col("parsed").isNotNull)

The datetimeRebaseMode option controls handling of DATE and TIMESTAMP values written with legacy calendars:
- EXCEPTION: fail when the operation encounters dates or timestamps that are ambiguous between the two calendars
- LEGACY: rebase values from the legacy hybrid Julian/Gregorian calendar
- CORRECTED: read values as-is in the Proleptic Gregorian calendar (no rebasing)

val options = Map("datetimeRebaseMode" -> "CORRECTED").asJava
val df = sourceDF.select(
from_avro(col("event_data"), schema, options).as("event")
)

The avroSchema option allows specifying an evolved reader schema that differs from the schema the data was encoded with:
val evolvedSchema = """
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string"},
{"name": "phone", "type": ["null", "string"], "default": null}
]
}
"""
val options = Map("avroSchema" -> evolvedSchema).asJava
val df = oldDataDF.select(
from_avro(col("user_bytes"), originalSchema, options).as("user")
)

When schemas don't match the binary data:
import org.apache.spark.sql.AnalysisException
try {
val result = df.select(
from_avro(col("data"), invalidSchema).as("parsed")
).collect()
} catch {
case e: AnalysisException =>
println(s"Schema validation failed: ${e.getMessage}")
}

Binary functions handle null input gracefully:
// null binary data results in null output
val dfWithNulls = df.select(
when(col("data").isNull, lit(null))
.otherwise(from_avro(col("data"), schema))
.as("parsed_data")
)

When repeatedly accessing parsed Avro data, consider caching:
val parsedDF = df.select(
from_avro(col("avro_data"), schema).as("parsed")
).cache()
// Multiple operations on the same parsed data
val aggregated = parsedDF.groupBy("parsed.category").count()
val filtered = parsedDF.filter(col("parsed.amount") > 100)

For production use with schema registries:
// Pseudo-code for schema registry integration
def getSchemaFromRegistry(schemaId: Int): String = {
// Fetch schema from Confluent Schema Registry or similar
schemaRegistry.getSchemaById(schemaId).toString
}
val schemaId = 42
val schema = getSchemaFromRegistry(schemaId)
val decodedDF = kafkaDF.select(
from_avro(col("value"), schema).as("event_data")
)

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-avro-2-12
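A caveat for the Kafka and schema-registry examples above: messages produced with Confluent's Avro serializers carry a 5-byte wire-format prefix (a magic byte plus a 4-byte schema id) that from_avro does not understand, so it must be stripped before decoding. A hedged sketch, reusing the kafkaDF and schema names from the Kafka example above:

```scala
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions._

// Confluent wire format: 1 magic byte + 4-byte schema id, then raw Avro.
// Spark's substring is 1-based, so bytes 6..end are the Avro payload.
val avroPayload = expr("substring(value, 6, length(value) - 5)")

val decodedDF = kafkaDF.select(
  from_avro(avroPayload, schema).as("event_data")
)
```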