Avro data source for Apache Spark SQL that provides functionality to read from and write to Avro files with DataFrames and Datasets
```
npx @tessl/cli install tessl/maven-org-apache-spark--spark-avro_2-13@4.0.0
```

Apache Spark Avro provides comprehensive support for reading from and writing to Apache Avro files in Spark SQL. It enables seamless conversion between the Avro binary format and Spark DataFrames/Datasets, with built-in functions for data transformation and robust schema handling.
Maven:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-avro_2.13</artifactId>
  <version>4.0.0</version>
</dependency>
```

SBT:

```scala
libraryDependencies += "org.apache.spark" %% "spark-avro" % "4.0.0"
```

Core imports:

```scala
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions.col
```

For deprecated functions (not recommended):

```scala
import org.apache.spark.sql.avro.{from_avro, to_avro}
```

Schema conversion utilities:

```scala
import org.apache.spark.sql.avro.SchemaConverters
```

Python API:

```python
from pyspark.sql.avro.functions import from_avro, to_avro
```

Scala example:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.functions._
val spark = SparkSession.builder()
  .appName("AvroExample")
  .getOrCreate()
// Reading Avro files
val df = spark.read.format("avro").load("path/to/avro/files")
// Writing Avro files
df.write.format("avro").save("path/to/output")
// Converting binary Avro data to DataFrame columns
val avroSchema = """{"type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}"""
val decodedDf = df.select(from_avro(col("avro_data"), avroSchema).as("user"))
// Converting DataFrame columns to binary Avro
val encodedDf = df.select(to_avro(col("user_struct")).as("avro_data"))
// Getting schema information
val schemaDf = spark.sql("SELECT schema_of_avro('" + avroSchema + "') AS avro_schema")
```

Python example:

```python
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro, to_avro
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("AvroExample").getOrCreate()
# Reading Avro files
df = spark.read.format("avro").load("path/to/avro/files")
# Writing Avro files
df.write.format("avro").save("path/to/output")
# Converting binary Avro data to DataFrame columns
avro_schema = '{"type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}'
decoded_df = df.select(from_avro(col("avro_data"), avro_schema).alias("user"))
# Converting DataFrame columns to binary Avro
encoded_df = df.select(to_avro(col("user_struct")).alias("avro_data"))
```

Apache Spark Avro is built around several key components:

- Built-in SQL functions (from_avro, to_avro, schema_of_avro) for data conversion and schema operations
- A native Avro data source for reading and writing Avro files
- SchemaConverters, a developer API for translating between Avro and Spark SQL schemas
- Configuration options and supporting data types such as SchemaType and AvroCompressionCodec

Avro functions: core functions for converting between binary Avro data and Spark SQL columns. These functions handle schema-based serialization and deserialization with comprehensive type mapping.
Scala:

```scala
def from_avro(data: Column, jsonFormatSchema: String): Column
def from_avro(data: Column, jsonFormatSchema: String, options: java.util.Map[String, String]): Column
def to_avro(data: Column): Column
def to_avro(data: Column, jsonFormatSchema: String): Column
def schema_of_avro(jsonFormatSchema: String): Column
def schema_of_avro(jsonFormatSchema: String, options: java.util.Map[String, String]): Column
```

Python:

```python
def from_avro(data: ColumnOrName, jsonFormatSchema: str, options: Optional[Dict[str, str]] = None) -> Column
def to_avro(data: ColumnOrName, jsonFormatSchema: str = "") -> Column
```
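A minimal sketch of the options overload, assuming a DataFrame `df` that already has a binary column named `avro_data` (the column name and schema are illustrative):

```scala
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions.col

val userSchema = """{"type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}"""

// FAILFAST raises an error on malformed records instead of yielding null.
val parsed = df.select(
  from_avro(col("avro_data"), userSchema, Map("mode" -> "FAILFAST").asJava).as("user"))

// Round-trip: re-encode the decoded struct back to Avro binary.
val reencoded = parsed.select(to_avro(col("user")).as("avro_data"))
```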
Avro data source: a native Spark DataSource V2 implementation for reading and writing Avro files, with optimized performance and advanced features such as schema inference and predicate pushdown.

```scala
// Reading
spark.read.format("avro").load(path)
spark.read.format("avro").option("avroSchema", schema).load(path)

// Writing
df.write.format("avro").save(path)
df.write.format("avro").option("compression", "snappy").save(path)
```
Schema converters: a developer API for converting between Avro schemas and Spark SQL data types, supporting complex nested structures and logical types.

```scala
object SchemaConverters {
  def toSqlType(avroSchema: org.apache.avro.Schema): SchemaType
  def toSqlType(avroSchema: org.apache.avro.Schema, useStableIdForUnionType: Boolean,
      stableIdPrefixForUnionType: String, recursiveFieldMaxDepth: Int): SchemaType
  def toAvroType(catalystType: DataType, nullable: Boolean, recordName: String, nameSpace: String): org.apache.avro.Schema
}

case class SchemaType(dataType: DataType, nullable: Boolean)
```
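A minimal round-trip sketch of these converters (the record name and namespace passed to toAvroType are illustrative):

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types._

// Parse an Avro schema and convert it to a Spark SQL type.
val avroSchema = new Schema.Parser().parse(
  """{"type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}""")
val sqlType = SchemaConverters.toSqlType(avroSchema)
// sqlType.dataType is a StructType with fields name: string, age: int

// Convert a Catalyst StructType back into an Avro record schema.
val catalystType = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)))
val backToAvro = SchemaConverters.toAvroType(
  catalystType, nullable = false, recordName = "User", nameSpace = "com.example")
```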
Configuration: comprehensive options for controlling Avro processing behavior, including compression, schema handling, and parsing modes.

```scala
// Common options
Map(
  "compression" -> "snappy",
  "avroSchema" -> jsonSchema,
  "mode" -> "PERMISSIVE",
  "ignoreExtension" -> "true"
)
```
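A sketch of passing such options on a read and a write; the paths and `jsonSchema` are placeholders:

```scala
// Per-read options: avroSchema forces an explicit reader schema, and
// ignoreExtension also picks up files without the .avro extension.
val df = spark.read
  .format("avro")
  .option("avroSchema", jsonSchema)
  .option("ignoreExtension", "true")
  .load("path/to/avro/files")

// Per-write compression override.
df.write
  .format("avro")
  .option("compression", "deflate")
  .save("path/to/output")
```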
```scala
// Schema conversion result
case class SchemaType(dataType: DataType, nullable: Boolean)
```

```java
// Compression codec enumeration
public enum AvroCompressionCodec {
    UNCOMPRESSED("null", false),
    DEFLATE("deflate", true),
    SNAPPY("snappy", false),
    BZIP2("bzip2", false),
    XZ("xz", true),
    ZSTANDARD("zstandard", true);

    public String getCodecName();
    public boolean getSupportCompressionLevel();
    public String lowerCaseName();
    public static AvroCompressionCodec fromString(String s);
}
```
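For illustration, the codec names above can also be applied session-wide through Spark SQL configuration rather than per write; a minimal sketch, assuming the spark.sql.avro.compression.codec config key from the Spark SQL docs:

```scala
// Session-wide default codec for Avro output.
spark.conf.set("spark.sql.avro.compression.codec", "zstandard")

// A per-write "compression" option still takes precedence over the session default.
df.write.format("avro").option("compression", "snappy").save("path/to/output")
```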