Avro data source for Apache Spark SQL that provides functionality to read from and write to Avro files with DataFrames and Datasets
Apache Spark Avro provides comprehensive support for reading from and writing to Apache Avro files in Spark SQL. It enables seamless conversion between Avro binary format and Spark DataFrames/Datasets, with built-in functions for data transformation and robust schema handling.
Maven:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-avro_2.13</artifactId>
  <version>4.0.0</version>
</dependency>
```

SBT:

```scala
libraryDependencies += "org.apache.spark" %% "spark-avro" % "4.0.0"
```

Core Scala imports:

```scala
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions.col
```

For deprecated functions (not recommended):

```scala
import org.apache.spark.sql.avro.{from_avro, to_avro}
```

Schema conversion utilities:

```scala
import org.apache.spark.sql.avro.SchemaConverters
```

Python API:

```python
from pyspark.sql.avro.functions import from_avro, to_avro
```

Scala Example:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.functions._
val spark = SparkSession.builder()
  .appName("AvroExample")
  .getOrCreate()
// Reading Avro files
val df = spark.read.format("avro").load("path/to/avro/files")
// Writing Avro files
df.write.format("avro").save("path/to/output")
// Converting binary Avro data to DataFrame columns
val avroSchema = """{"type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}"""
val decodedDf = df.select(from_avro(col("avro_data"), avroSchema).as("user"))
// Converting DataFrame columns to binary Avro
val encodedDf = df.select(to_avro(col("user_struct")).as("avro_data"))
// Getting schema information
val schemaDf = spark.sql("SELECT schema_of_avro('" + avroSchema + "') AS avro_schema")
```

Python Example:

```python
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro, to_avro
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("AvroExample").getOrCreate()
# Reading Avro files
df = spark.read.format("avro").load("path/to/avro/files")
# Writing Avro files
df.write.format("avro").save("path/to/output")
# Converting binary Avro data to DataFrame columns
avro_schema = '{"type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}'
decoded_df = df.select(from_avro(col("avro_data"), avro_schema).alias("user"))
# Converting DataFrame columns to binary Avro
encoded_df = df.select(to_avro(col("user_struct")).alias("avro_data"))
```

Apache Spark Avro is built around several key components:
- Avro functions (from_avro, to_avro, schema_of_avro) for data conversion and schema operations
- A native DataSource V2 implementation for reading and writing Avro files
- SchemaConverters, a developer API for mapping between Avro schemas and Spark SQL data types
- Configuration options and compression codecs that control Avro processing behavior

Core functions for converting between binary Avro data and Spark SQL columns. These functions handle schema-based serialization and deserialization with comprehensive type mapping.

Scala:

```scala
def from_avro(data: Column, jsonFormatSchema: String): Column
def from_avro(data: Column, jsonFormatSchema: String, options: java.util.Map[String, String]): Column
def to_avro(data: Column): Column
def to_avro(data: Column, jsonFormatSchema: String): Column
def schema_of_avro(jsonFormatSchema: String): Column
def schema_of_avro(jsonFormatSchema: String, options: java.util.Map[String, String]): Column
```

Python:

```python
def from_avro(data: ColumnOrName, jsonFormatSchema: str, options: Optional[Dict[str, str]] = None) -> Column
def to_avro(data: ColumnOrName, jsonFormatSchema: str = "") -> Column
```
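As a minimal round-trip sketch of how these overloads compose: it assumes a DataFrame `df` with a binary `payload` column (both names are illustrative), and uses PERMISSIVE mode so malformed records decode to null instead of failing the query (the default is FAILFAST):

```scala
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions.col

val userSchema =
  """{"type":"record","name":"User","fields":[
    |  {"name":"name","type":"string"},
    |  {"name":"age","type":"int"}]}""".stripMargin

// Assumption: tolerate corrupt records rather than failing the query.
val parseOptions = java.util.Map.of("mode", "PERMISSIVE")

val roundTrip = df
  .select(from_avro(col("payload"), userSchema, parseOptions).as("user")) // binary -> struct
  .select(to_avro(col("user"), userSchema).as("payload"))                 // struct -> binary
```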
Native Spark DataSource V2 implementation for reading and writing Avro files, with optimized performance and advanced features such as schema inference and predicate pushdown.

```scala
// Reading
spark.read.format("avro").load(path)
spark.read.format("avro").option("avroSchema", schema).load(path)
// Writing
df.write.format("avro").save(path)
df.write.format("avro").option("compression", "snappy").save(path)Developer API for converting between Avro schemas and Spark SQL data types, supporting complex nested structures and logical types.
Developer API for converting between Avro schemas and Spark SQL data types, supporting complex nested structures and logical types.

```scala
object SchemaConverters {
  def toSqlType(avroSchema: org.apache.avro.Schema): SchemaType
  def toSqlType(avroSchema: org.apache.avro.Schema, useStableIdForUnionType: Boolean,
      stableIdPrefixForUnionType: String, recursiveFieldMaxDepth: Int): SchemaType
  def toAvroType(catalystType: DataType, nullable: Boolean, recordName: String, nameSpace: String): org.apache.avro.Schema
}
case class SchemaType(dataType: DataType, nullable: Boolean)
```
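A short schema round-trip sketch; the record name and namespace passed to toAvroType are illustrative:

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

val avroSchema = new Schema.Parser().parse(
  """{"type":"record","name":"User","fields":[
    |  {"name":"name","type":"string"},
    |  {"name":"age","type":"int"}]}""".stripMargin)

// Avro -> Catalyst: SchemaType pairs the DataType with top-level nullability.
val sqlType = SchemaConverters.toSqlType(avroSchema)
val structType = sqlType.dataType.asInstanceOf[StructType]

// Catalyst -> Avro: rebuild a named record schema from the Spark SQL type.
val rebuilt = SchemaConverters.toAvroType(
  structType, nullable = false, recordName = "User", nameSpace = "com.example")
```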
Comprehensive configuration options for controlling Avro processing behavior, including compression, schema handling, and parsing modes.

```scala
// Common options
Map(
  "compression" -> "snappy",
  "avroSchema" -> jsonSchema,
  "mode" -> "PERMISSIVE",
  "ignoreExtension" -> "true"
)
```
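A sketch of threading these options through a read and a write; the paths are hypothetical, and jsonSchema stands in for the Avro schema string referenced in the map above:

```scala
val jsonSchema =
  """{"type":"record","name":"User","fields":[
    |  {"name":"name","type":"string"},
    |  {"name":"age","type":"int"}]}""".stripMargin

// Read with an explicit reader schema, also picking up files without a .avro extension.
val users = spark.read
  .format("avro")
  .option("avroSchema", jsonSchema)
  .option("ignoreExtension", "true")
  .load("/data/users")

// Write with a per-query codec instead of the session default.
users.write.format("avro").option("compression", "snappy").save("/data/users-snappy")
```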
Key supporting types:

```scala
// Schema conversion result
case class SchemaType(dataType: DataType, nullable: Boolean)
```

```java
// Compression codec enumeration
public enum AvroCompressionCodec {
  UNCOMPRESSED("null", false),
  DEFLATE("deflate", true),
  SNAPPY("snappy", false),
  BZIP2("bzip2", false),
  XZ("xz", true),
  ZSTANDARD("zstandard", true);

  public String getCodecName();
  public boolean getSupportCompressionLevel();
  public String lowerCaseName();
  public static AvroCompressionCodec fromString(String s);
}
```
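Beyond the per-write compression option, the codec can also be chosen session-wide through Spark SQL configuration. This sketch assumes the spark.sql.avro.compression.codec and spark.sql.avro.deflate.level settings; the level applies only to codecs whose getSupportCompressionLevel is true:

```scala
// Session-wide default codec for Avro writes.
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
// Compression level for level-aware codecs such as deflate.
spark.conf.set("spark.sql.avro.deflate.level", "5")

df.write.format("avro").save("/data/out-deflate")  // hypothetical path and DataFrame
```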
Install with Tessl CLI:

```bash
npx tessl i tessl/maven-org-apache-spark--spark-avro-2-13
```