Apache Spark SQL connector for reading and writing Avro data with comprehensive serialization capabilities
```
npx @tessl/cli install tessl/maven-org-apache-spark--spark-avro_2-11@2.4.0
```

Apache Spark SQL connector for reading and writing Avro data. The spark-avro library provides serialization, deserialization, and schema conversion for working with the Apache Avro format in Spark SQL applications, aimed at big data pipelines where Avro's schema evolution and compact binary encoding are essential.
With spark-submit or spark-shell, include it via `--packages org.apache.spark:spark-avro_2.11:2.4.8`.

Maven Dependency:

```xml
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_2.11</artifactId>
<version>2.4.8</version>
</dependency>
```

Note: The spark-avro module is external and is not included in spark-submit or spark-shell by default.
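For sbt builds, an equivalent dependency line would look like the following (a sketch, not from the upstream docs; `%%` resolves the `_2.11` artifact suffix from your project's `scalaVersion`):

```scala
// sbt: mirrors the Maven coordinates above. Assumes scalaVersion := "2.11.x"
// so that %% resolves to the spark-avro_2.11 artifact.
libraryDependencies += "org.apache.spark" %% "spark-avro" % "2.4.8"
```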
Imports:

```scala
import org.apache.spark.sql.avro._
```

For DataFrame operations:

```scala
import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions.col
```

A complete example:

```scala
import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.col
// Initialize SparkSession
val spark = SparkSession.builder()
  .appName("Spark Avro Example")
  .getOrCreate()
import spark.implicits._
// Read from Avro files
val df = spark.read
.format("avro")
.load("path/to/avro/files")
// Write to Avro files
df.write
.format("avro")
.mode("overwrite")
.save("path/to/output")
// Convert binary Avro data in columns
val avroSchema = """{"type": "record", "name": "User", "fields": [
  {"name": "name", "type": "string"},
  {"name": "age", "type": "int"}
]}"""
val transformedDF = df.select(
  from_avro(col("avroData"), avroSchema).as("userData"),
  to_avro(col("structData")).as("binaryAvro")
)
```

Spark Avro is built around several key components:
- `AvroFileFormat` implements Spark's FileFormat and DataSourceRegister interfaces
- `SchemaConverters` provides bidirectional mapping between Avro and Spark SQL schemas
- `AvroSerializer` and `AvroDeserializer` handle data conversion between Catalyst and Avro formats
- `from_avro` and `to_avro` expressions work with binary Avro data in DataFrame columns
- `AvroOptions` provides configuration for read/write operations

These core functions for working with Avro binary data in DataFrame columns enable conversion between the Catalyst internal format and the Avro binary format within SQL queries and DataFrame operations:
```scala
def from_avro(data: Column, jsonFormatSchema: String): Column
def to_avro(data: Column): Column
```
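As a quick sanity check, `to_avro` followed by `from_avro` should reproduce the original struct. A minimal sketch, reusing the SparkSession from the example above; the column names and reader schema here are illustrative:

```scala
import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.struct
import spark.implicits._

// Reader schema for from_avro. Nullability must mirror what to_avro wrote:
// Spark treats the String column as nullable (hence the union) and the Int
// column as non-nullable. Record names are not checked during binary decoding.
val userSchema = """{"type": "record", "name": "topLevelRecord", "fields": [
  {"name": "name", "type": ["string", "null"]},
  {"name": "age", "type": "int"}
]}"""

val people = Seq(("alice", 30), ("bob", 25)).toDF("name", "age")

// Pack both columns into a struct, encode to Avro binary, then decode back.
val roundTrip = people
  .select(to_avro(struct($"name", $"age")).as("avroBytes"))
  .select(from_avro($"avroBytes", userSchema).as("user"))
  .select("user.name", "user.age")

roundTrip.show() // should print the original rows
```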
The file-based data source reads from and writes to Avro files through Spark's standard DataFrame read/write API, with automatic schema inference and configurable options:

```scala
// Reading
spark.read.format("avro").option(key, value).load(path)

// Writing
df.write.format("avro").mode(saveMode).option(key, value).save(path)
```
SchemaConverters provides utilities for converting between Apache Avro schemas and Spark SQL schemas, supporting complex nested types, logical types, and schema evolution patterns:

```scala
object SchemaConverters {
  def toSqlType(avroSchema: Schema): SchemaType
  def toAvroType(
      catalystType: DataType,
      nullable: Boolean = false,
      recordName: String = "topLevelRecord",
      nameSpace: String = ""): Schema
}

case class SchemaType(dataType: DataType, nullable: Boolean)
```
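A short sketch of converting in both directions, assuming `SchemaConverters` from `org.apache.spark.sql.avro` is visible on your classpath:

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Avro record schema -> Spark SQL type.
val avroSchema = new Schema.Parser().parse(
  """{"type": "record", "name": "User", "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]}""")
val schemaType = SchemaConverters.toSqlType(avroSchema)
// schemaType.dataType is StructType(name: string, age: int)

// Spark SQL type -> Avro record schema.
val catalystType = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)))
val avroOut = SchemaConverters.toAvroType(catalystType, nullable = false, recordName = "User")
println(avroOut.toString(true)) // pretty-printed Avro JSON schema
```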
AvroOptions is the configuration holder for customizing Avro read and write operations, covering schema specification, compression settings, and file handling:

```scala
class AvroOptions(parameters: Map[String, String], conf: Configuration) {
  val schema: Option[String]
  val recordName: String
  val recordNamespace: String
  val ignoreExtension: Boolean
  val compression: String
}
```
Supporting types referenced by this API:

```scala
// Core Spark SQL types
import org.apache.spark.sql.types._
import org.apache.spark.sql.Column
// Avro schema types
import org.apache.avro.Schema
// Configuration and options
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
// Exception types
class IncompatibleSchemaException(msg: String, ex: Throwable = null) extends Exception(msg, ex)
```
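For example, asking for an Avro mapping of a Catalyst type with no Avro counterpart raises this exception. A sketch; the catch is written against `Exception` because the exception class's visibility differs across Spark builds:

```scala
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.CalendarIntervalType

// CalendarIntervalType has no Avro equivalent, so toAvroType rejects it.
try {
  SchemaConverters.toAvroType(CalendarIntervalType)
} catch {
  case e: Exception => println(s"Schema conversion failed: ${e.getMessage}")
}
```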