
tessl/maven-org-apache-spark--spark-avro-2-12

The Apache Spark Avro connector integrates Apache Spark SQL with the Apache Avro data format, with support for schema evolution and built-in compression.


Binary Data Functions

The Spark Avro connector provides SQL functions for converting between binary Avro data and Spark SQL structures. These functions enable processing of Avro-encoded columns within DataFrames, useful for working with message queues, event streams, and embedded binary Avro data.

Core Functions

from_avro Function

Converts binary Avro data to Spark SQL structures using a provided schema.

def from_avro(data: Column, jsonFormatSchema: String): Column

def from_avro(
  data: Column, 
  jsonFormatSchema: String, 
  options: java.util.Map[String, String]
): Column

Parameters:

  • data: Column containing binary Avro data (BinaryType)
  • jsonFormatSchema: Avro schema in JSON string format
  • options: Additional options for deserialization (optional)

Returns: Column with deserialized Spark SQL data structure

Basic Usage:

import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions._

val schema = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}
"""

val df = spark.table("kafka_topic")
val decodedDF = df.select(
  from_avro(col("value"), schema).as("user_data")
)

decodedDF.select(
  col("user_data.id"),
  col("user_data.name"),
  col("user_data.email")
).show()

Usage with Options:

import scala.collection.JavaConverters._

val options = Map(
  "mode" -> "PERMISSIVE",
  "datetimeRebaseMode" -> "CORRECTED"
).asJava

val decodedDF = df.select(
  from_avro(col("avro_bytes"), schema, options).as("parsed_data")
)

to_avro Function

Converts Spark SQL data structures to binary Avro format.

def to_avro(data: Column): Column

def to_avro(data: Column, jsonFormatSchema: String): Column

Parameters:

  • data: Column with Spark SQL data structure
  • jsonFormatSchema: Target Avro schema in JSON format (optional)

Returns: Column with binary Avro data (BinaryType)

Basic Usage:

import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions._

val df = spark.table("users")

// Convert entire row to Avro
val avroDF = df.select(
  to_avro(struct(col("*"))).as("avro_data")
)

// Convert specific columns to Avro
val avroDF2 = df.select(
  col("id"),
  to_avro(struct(
    col("name"), 
    col("email"), 
    col("created_at")
  )).as("user_avro")
)
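When no jsonFormatSchema is given, to_avro derives the Avro schema from the column's Catalyst type. The same mapping is exposed by the connector's SchemaConverters, which is useful when the derived schema is needed later for decoding. A sketch (the field names here are illustrative):

```scala
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types._

// to_avro with no explicit schema encodes using the Avro schema derived
// from the Catalyst type; SchemaConverters reproduces that derivation.
val userType = new StructType()
  .add("name", StringType)
  .add("email", StringType)

val derivedSchema = SchemaConverters.toAvroType(userType, nullable = false)
// derivedSchema.toString is a JSON record schema that can be passed to
// from_avro to decode bytes produced by the schema-less to_avro call
```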

Usage with Custom Schema:

val outputSchema = """
{
  "type": "record",
  "name": "UserOutput",
  "namespace": "com.example",
  "fields": [
    {"name": "user_id", "type": "long"},
    {"name": "full_name", "type": "string"},
    {"name": "contact_email", "type": ["null", "string"]}
  ]
}
"""

val avroDF = df.select(
  to_avro(
    struct(
      col("id").as("user_id"),
      col("name").as("full_name"),
      col("email").as("contact_email")
    ),
    outputSchema
  ).as("formatted_avro")
)

Advanced Usage Patterns

Roundtrip Processing

Converting data from Avro to Spark SQL structures, processing, and back to Avro:

val schema = """
{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "event_id", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "user_id", "type": "long"},
    {"name": "properties", "type": {"type": "map", "values": "string"}}
  ]
}
"""

val processedDF = df
  .select(from_avro(col("event_data"), schema).as("event"))
  .select(
    col("event.event_id"),
    col("event.timestamp"),
    col("event.user_id"),
    col("event.properties")
  )
  .filter(col("timestamp") > unix_timestamp() - 3600) // Last hour only
  .withColumn("processed_at", current_timestamp())
  .select(
    to_avro(struct(
      col("event_id"),
      col("timestamp"),
      col("user_id"),
      col("properties"),
      col("processed_at")
    )).as("processed_event")
  )

Kafka Integration

Common pattern for processing Avro messages from Kafka:

val kafkaDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "avro-events")
  .load()

val schema = """
{
  "type": "record",
  "name": "KafkaEvent",
  "fields": [
    {"name": "event_type", "type": "string"},
    {"name": "payload", "type": "string"},
    {"name": "metadata", "type": {"type": "map", "values": "string"}}
  ]
}
"""

val decodedDF = kafkaDF.select(
  col("key").cast("string").as("message_key"),
  from_avro(col("value"), schema).as("event_data"),
  col("timestamp").as("kafka_timestamp")
).select(
  col("message_key"),
  col("event_data.event_type"),
  col("event_data.payload"),
  col("event_data.metadata"),
  col("kafka_timestamp")
)
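The decoded stream can be re-encoded with to_avro and written back to Kafka. A sketch building on decodedDF above, where the output topic name and checkpoint path are illustrative assumptions:

```scala
// The Kafka sink expects "key" and "value" columns; to_avro produces the
// binary value from the processed event fields.
val outputDF = decodedDF.select(
  col("message_key").as("key"),
  to_avro(struct(
    col("event_type"),
    col("payload"),
    col("metadata")
  )).as("value")
)

outputDF.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "avro-events-out")        // assumed output topic
  .option("checkpointLocation", "/tmp/ckpt") // assumed checkpoint path
  .start()
```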

Nested Schema Handling

Working with complex nested types:

val nestedSchema = """
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer", "type": {
      "type": "record",
      "name": "Customer",
      "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"},
        {"name": "address", "type": {
          "type": "record",
          "name": "Address",
          "fields": [
            {"name": "street", "type": "string"},
            {"name": "city", "type": "string"},
            {"name": "country", "type": "string"}
          ]
        }}
      ]
    }},
    {"name": "items", "type": {
      "type": "array",
      "items": {
        "type": "record",
        "name": "Item",
        "fields": [
          {"name": "sku", "type": "string"},
          {"name": "quantity", "type": "int"},
          {"name": "price", "type": "double"}
        ]
      }
    }}
  ]
}
"""

val orderDF = df.select(
  from_avro(col("order_data"), nestedSchema).as("order")
).select(
  col("order.order_id"),
  col("order.customer.name").as("customer_name"),
  col("order.customer.address.city").as("city"),
  size(col("order.items")).as("item_count"),
  expr("aggregate(order.items, 0.0, (acc, item) -> acc + item.price * item.quantity)").as("total_amount")
)
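Individual line items in the nested array can be flattened with explode, producing one row per element. A sketch building on nestedSchema above:

```scala
// One output row per order item; the parent order_id repeats per row.
val itemsDF = df.select(
  from_avro(col("order_data"), nestedSchema).as("order")
).select(
  col("order.order_id"),
  explode(col("order.items")).as("item")
).select(
  col("order_id"),
  col("item.sku"),
  col("item.quantity"),
  (col("item.price") * col("item.quantity")).as("line_total")
)
```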

Options for from_avro

When using the three-parameter version of from_avro, the following options are supported:

Mode Options

  • FAILFAST (default): Throw an exception on the first record that cannot be deserialized
  • PERMISSIVE: Replace records that cannot be deserialized with null

val options = Map("mode" -> "PERMISSIVE").asJava
val cleanDF = df.select(
  from_avro(col("data"), schema, options).as("parsed")
).filter(col("parsed").isNotNull)

DateTime Rebase Mode

Controls handling of DATE and TIMESTAMP values:

  • EXCEPTION: Throw exception for dates that need rebasing
  • LEGACY: Use legacy Julian calendar handling
  • CORRECTED: Apply proleptic Gregorian calendar correction

val options = Map("datetimeRebaseMode" -> "CORRECTED").asJava
val df = sourceDF.select(
  from_avro(col("event_data"), schema, options).as("event")
)

Schema Evolution Support

The avroSchema option lets from_avro deserialize with an evolved reader schema that differs from the schema the data was written with; jsonFormatSchema remains the original writer schema:

val evolvedSchema = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "phone", "type": ["null", "string"], "default": null}
  ]
}
"""

val options = Map("avroSchema" -> evolvedSchema).asJava
val df = oldDataDF.select(
  from_avro(col("user_bytes"), originalSchema, options).as("user")
)
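Records written before the phone field existed carry no value for it, so under the evolved reader schema they resolve to the declared default. A quick check, building on the df above:

```scala
// Old records resolve the new "phone" field to its default (null);
// fields present in both schemas read through unchanged.
df.select(
  col("user.id"),
  col("user.name"),
  col("user.phone") // null for records written with the original schema
).show()
```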

Error Handling

Schema Mismatch Errors

When the binary data cannot be deserialized with the given schema, the failure surfaces at action time (under the default FAILFAST mode) as a SparkException, so wrap the action rather than the transformation. An invalid schema string itself fails eagerly with an org.apache.avro.SchemaParseException.

import org.apache.spark.SparkException

try {
  val result = df.select(
    from_avro(col("data"), schema).as("parsed")
  ).collect()
} catch {
  case e: SparkException =>
    println(s"Avro deserialization failed: ${e.getMessage}")
}

Null Handling

Binary functions propagate null input, so a null binary value deserializes to a null result; the explicit guard below is optional but makes the intent clear:

// null binary data results in null output
val dfWithNulls = df.select(
  when(col("data").isNull, lit(null))
    .otherwise(from_avro(col("data"), schema))
    .as("parsed_data")
)

Performance Considerations

Caching Parsed Data

When repeatedly accessing parsed Avro data:

val parsedDF = df.select(
  from_avro(col("avro_data"), schema).as("parsed")
).cache()

// Multiple operations on the same parsed data
val aggregated = parsedDF.groupBy("parsed.category").count()
val filtered = parsedDF.filter(col("parsed.amount") > 100)

Schema Registry Integration

For production use with schema registries:

// Pseudo-code for schema registry integration
def getSchemaFromRegistry(schemaId: Int): String = {
  // Fetch schema from Confluent Schema Registry or similar
  schemaRegistry.getSchemaById(schemaId).toString
}

val schemaId = 42
val schema = getSchemaFromRegistry(schemaId)

val decodedDF = kafkaDF.select(
  from_avro(col("value"), schema).as("event_data")
)
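Note that messages produced with Confluent's Avro serializer are not raw Avro: they carry a 5-byte wire-format header (a zero magic byte plus a 4-byte big-endian schema id) that from_avro cannot parse. A sketch of handling the framing; the commented column expression at the end mirrors the same stripping in Spark SQL:

```scala
import java.nio.ByteBuffer

// Confluent wire format: byte 0 is the magic byte (0x0), bytes 1-4 are the
// big-endian schema id, and the Avro payload starts at byte 5.
def confluentSchemaId(message: Array[Byte]): Int = {
  require(message.length > 5 && message(0) == 0, "not a Confluent-framed message")
  ByteBuffer.wrap(message, 1, 4).getInt
}

def confluentPayload(message: Array[Byte]): Array[Byte] =
  message.drop(5)

// In Spark SQL the header can be stripped on the binary column itself
// (substring is 1-based and works on BinaryType):
// val payload = expr("substring(value, 6, length(value) - 5)")
// val decoded = kafkaDF.select(from_avro(payload, schema).as("event"))
```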

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-avro-2-12
