Apache Spark Avro Connector

The Apache Spark Avro connector integrates Apache Spark SQL with the Apache Avro data format, enabling efficient reading, writing, and processing of Avro files, with support for automatic schema evolution and built-in compression.

Package Information

  • Package Name: spark-avro_2.12
  • Package Type: Maven
  • Coordinate: org.apache.spark:spark-avro_2.12
  • Version: 3.5.6
  • Language: Scala (with Java interoperability)
  • Installation: Add the dependency to your pom.xml (Maven) or build.sbt (sbt):

Maven:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-avro_2.12</artifactId>
  <version>3.5.6</version>
</dependency>

SBT:

libraryDependencies += "org.apache.spark" %% "spark-avro" % "3.5.6"
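
Because spark-avro is an external module that does not ship with the Spark runtime, it can also be resolved at launch time, for example with spark-submit's --packages flag. A minimal sketch of the equivalent session-level configuration (this takes effect only when the SparkSession itself creates the underlying SparkContext; the application name is hypothetical):

import org.apache.spark.sql.SparkSession

// Resolve the connector from Maven Central at startup; equivalent to
// spark-submit --packages org.apache.spark:spark-avro_2.12:3.5.6
val spark = SparkSession.builder()
  .appName("AvroPackagesExample")
  .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.5.6")
  .getOrCreate()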

Core Imports

import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame

For the DataSource API:

import org.apache.spark.sql.avro.AvroOptions

Basic Usage

Reading Avro Files

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("AvroExample")
  .getOrCreate()

// Read Avro files using DataSource API
val df = spark.read
  .format("avro")
  .load("path/to/avro/files")

df.show()

Writing Avro Files

// Write DataFrame to Avro format
df.write
  .format("avro")
  .option("compression", "snappy")
  .save("path/to/output")

Processing Binary Avro Data

import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions._

// Convert binary Avro column to structured data
val schema = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}
"""

val decodedDF = df.select(
  from_avro(col("avro_bytes"), schema).as("user_data")
)

// Convert structured data to binary Avro
val encodedDF = df.select(
  to_avro(struct(col("name"), col("age"))).as("avro_bytes")
)

Architecture

The Spark Avro connector is built around several key components:

  • DataSource Integration: Native Spark SQL DataSource v1 and v2 implementations for seamless file I/O
  • Function API: SQL functions for binary Avro data transformation (from_avro, to_avro)
  • Schema Conversion: Bidirectional schema mapping between Avro and Spark SQL data types
  • Serialization Engine: High-performance Catalyst expression-based serializers and deserializers
  • Configuration System: Comprehensive options for compression, schema evolution, and compatibility settings

Capabilities

File I/O Operations

DataSource API integration for reading and writing Avro files directly through Spark SQL with automatic schema inference and support for partitioned datasets.

// DataSource read/write through format("avro")
spark.read.format("avro"): DataFrameReader
DataFrame.write.format("avro"): DataFrameWriter[Row]

See docs/file-operations.md for details.

Binary Data Processing

Core functions for converting between binary Avro data and Spark SQL structures, enabling processing of Avro-encoded columns within DataFrames.

def from_avro(data: Column, jsonFormatSchema: String): Column
def from_avro(data: Column, jsonFormatSchema: String, options: java.util.Map[String, String]): Column
def to_avro(data: Column): Column
def to_avro(data: Column, jsonFormatSchema: String): Column
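
The options map controls parsing behavior; notably, the mode key accepts FAILFAST (the default, which raises an error on malformed records) or PERMISSIVE (which maps malformed records to null results). A minimal sketch, reusing the schema string from the Basic Usage section:

import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions._
import java.util.Collections

// PERMISSIVE turns malformed Avro payloads into nulls instead of failing the job
val decoded = df.select(
  from_avro(col("avro_bytes"), schema, Collections.singletonMap("mode", "PERMISSIVE")).as("user_data")
)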

See docs/binary-functions.md for details.

Schema Conversion

Utilities for converting between Avro schemas and Spark SQL schemas, supporting complex nested types and logical type mappings.

object SchemaConverters {
  case class SchemaType(dataType: DataType, nullable: Boolean)
  
  def toSqlType(avroSchema: Schema): SchemaType
  def toSqlType(avroSchema: Schema, options: Map[String, String]): SchemaType
  def toAvroType(
    catalystType: DataType, 
    nullable: Boolean, 
    recordName: String, 
    nameSpace: String
  ): Schema
}
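
A minimal sketch of converting the User record schema from the Basic Usage section into a Spark SQL type:

import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters

val avroSchema = new Schema.Parser().parse(schema) // JSON Avro schema string
val sqlType = SchemaConverters.toSqlType(avroSchema)
// sqlType.dataType is a StructType with fields name: string and age: int;
// top-level records convert with nullable = false
println(sqlType.dataType)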

See docs/schema-conversion.md for details.

Configuration Options

Comprehensive configuration system for controlling Avro processing behavior including compression, schema evolution, field matching, and error handling.

object AvroOptions {
  val COMPRESSION: String
  val RECORD_NAME: String
  val RECORD_NAMESPACE: String
  val AVRO_SCHEMA: String
  val AVRO_SCHEMA_URL: String
  val POSITIONAL_FIELD_MATCHING: String
  val DATETIME_REBASE_MODE: String
  
  def apply(parameters: Map[String, String]): AvroOptions
}
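
These constants name the string keys that can also be passed through the DataFrameReader/Writer option API. A sketch of typical usage (the namespace value is hypothetical):

// Reader: supply an explicit Avro schema to decode with (enables schema evolution)
val evolved = spark.read
  .format("avro")
  .option("avroSchema", schema) // JSON Avro schema string
  .load("path/to/avro/files")

// Writer: control the top-level record name and namespace of the output
df.write
  .format("avro")
  .option("recordName", "User")
  .option("recordNamespace", "com.example")
  .save("path/to/output")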

See docs/configuration.md for details.

Utility Functions

Core utility functions for schema inference, type validation, and write preparation. These live on the internal AvroUtils object and are not part of the connector's public API.

object AvroUtils {
  def inferSchema(
    spark: SparkSession,
    options: Map[String, String],
    files: Seq[FileStatus]
  ): Option[StructType]
  
  def supportsDataType(dataType: DataType): Boolean
  
  def prepareWrite(
    sqlConf: SQLConf,
    job: Job,
    options: Map[String, String],
    dataSchema: StructType
  ): OutputWriterFactory
}

Types

Core Expression Types

// Internal Catalyst expressions (private to Spark SQL)
case class AvroDataToCatalyst(
  child: Expression,
  jsonFormatSchema: String,
  options: Map[String, String]
) extends UnaryExpression

case class CatalystDataToAvro(
  child: Expression,
  jsonFormatSchema: Option[String]
) extends UnaryExpression
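
These expressions are not constructed directly in user code: from_avro returns a Column wrapping AvroDataToCatalyst, and to_avro returns a Column wrapping CatalystDataToAvro.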

Option Types

class AvroOptions(
  parameters: CaseInsensitiveMap[String],
  conf: Configuration
) extends FileSourceOptions(parameters) {
  val schema: Option[Schema]
  val positionalFieldMatching: Boolean
  val recordName: String
  val recordNamespace: String
  val compression: String
  val parseMode: ParseMode
  val datetimeRebaseModeInRead: String
}

Utility Types

// Note: Part of AvroUtils object
case class AvroMatchedField(
  catalystField: StructField,
  catalystPosition: Int,
  avroField: Schema.Field
)

class AvroSchemaHelper(
  avroSchema: Schema,
  catalystSchema: StructType,
  avroPath: Seq[String],
  catalystPath: Seq[String],
  positionalFieldMatch: Boolean
) {
  val matchedFields: Seq[AvroMatchedField]
  def getAvroField(fieldName: String, catalystPos: Int): Option[Schema.Field]
  def validateNoExtraCatalystFields(ignoreNullable: Boolean): Unit
  def validateNoExtraAvroFields(): Unit
}

Error Handling

The connector throws specific exceptions for schema incompatibilities and processing errors:

// Private to avro package
private[avro] class IncompatibleSchemaException(
  msg: String, 
  ex: Throwable = null
) extends Exception(msg, ex)

private[avro] class UnsupportedAvroTypeException(msg: String) extends Exception(msg)

Common error scenarios:

  • Schema mismatch between Avro and Spark SQL schemas
  • Invalid JSON schema format
  • Unsupported data type conversions
  • Corrupt or unreadable Avro files
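
Because the exception types are package-private, applications usually address these scenarios through configuration rather than by catching them. A sketch using the standard Spark SQL setting for skipping unreadable files:

// Skip corrupt or unreadable files instead of failing the whole job
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

val df = spark.read.format("avro").load("path/to/avro/files")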

Deprecated Functionality

Package Object Functions (Deprecated since 3.0.0)

The package object provides deprecated convenience functions that delegate to the main functions API:

package org.apache.spark.sql.avro {
  @deprecated("Please use 'org.apache.spark.sql.avro.functions.from_avro' instead.", "3.0.0")
  def from_avro(data: Column, jsonFormatSchema: String): Column
  
  @deprecated("Please use 'org.apache.spark.sql.avro.functions.to_avro' instead.", "3.0.0")
  def to_avro(data: Column): Column
}

Migration Example:

// Old deprecated usage
import org.apache.spark.sql.avro._
val result = from_avro(col("data"), schema)

// New recommended usage
import org.apache.spark.sql.avro.functions._
val result = from_avro(col("data"), schema)

Java Integration Components

For Java-based MapReduce integrations:

class SparkAvroKeyOutputFormat extends OutputFormat[AvroKey[GenericRecord], NullWritable]
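
This output format is used internally by the connector's write path to produce Avro container files through Hadoop's OutputFormat API; applications do not normally instantiate it directly.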