The Apache Spark Avro connector provides seamless integration between Apache Spark SQL and the Apache Avro data format, enabling efficient reading, writing, and processing of Avro files with automatic schema evolution support and built-in compression capabilities.

npx @tessl/cli install tessl/maven-org-apache-spark--spark-avro-2-12@3.5.0
Maven:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_2.12</artifactId>
<version>3.5.6</version>
</dependency>

SBT:

libraryDependencies += "org.apache.spark" %% "spark-avro" % "3.5.6"

import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame

For the DataSource API:

import org.apache.spark.sql.avro.AvroOptions

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("AvroExample")
.getOrCreate()
// Read Avro files using DataSource API
val df = spark.read
.format("avro")
.load("path/to/avro/files")
df.show()

// Write DataFrame to Avro format
df.write
.format("avro")
.option("compression", "snappy")
.save("path/to/output")

import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions._
// Convert binary Avro column to structured data
val schema = """
{
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"}
]
}
"""
val decodedDF = df.select(
from_avro(col("avro_bytes"), schema).as("user_data")
)
// Convert structured data to binary Avro
val encodedDF = df.select(
to_avro(struct(col("name"), col("age"))).as("avro_bytes")
)

The Spark Avro connector is built around several key components: the DataSource API (format("avro")), the Avro SQL functions (from_avro, to_avro), schema conversion utilities, and a configuration option system.

DataSource API integration for reading and writing Avro files directly through Spark SQL with automatic schema inference and support for partitioned datasets.
// DataSource read/write through format("avro")
spark.read.format("avro"): DataFrameReader
DataFrame.write.format("avro"): DataFrameWriter[Row]
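Partitioned datasets work through the standard DataFrameWriter/DataFrameReader surface; a minimal sketch, assuming a hypothetical year column and placeholder paths:

// Write a partitioned Avro dataset; each distinct value of "year" becomes a directory
df.write
  .format("avro")
  .partitionBy("year")
  .mode("overwrite")
  .save("path/to/partitioned/output")

// Read it back; the schema (including the partition column) is inferred automatically
val partitionedDF = spark.read
  .format("avro")
  .load("path/to/partitioned/output")
partitionedDF.printSchema()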
Core functions for converting between binary Avro data and Spark SQL structures, enabling processing of Avro-encoded columns within DataFrames.

def from_avro(data: Column, jsonFormatSchema: String): Column
def from_avro(data: Column, jsonFormatSchema: String, options: java.util.Map[String, String]): Column
def to_avro(data: Column): Column
def to_avro(data: Column, jsonFormatSchema: String): Column
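A sketch of the overloads that take options and an explicit schema, reusing the avro_bytes column and the schema JSON from the example above; the mode option controls how malformed records are handled:

import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions._
import java.util.Collections

// Tolerate malformed records instead of failing the job (default parse mode is FAILFAST)
val parseOptions = Collections.singletonMap("mode", "PERMISSIVE")

val decodedDF = df.select(
  from_avro(col("avro_bytes"), schema, parseOptions).as("user_data")
)

// to_avro can also take an explicit Avro schema instead of deriving one from the input type
val encodedDF = df.select(
  to_avro(struct(col("name"), col("age")), schema).as("avro_bytes")
)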
Utilities for converting between Avro schemas and Spark SQL schemas, supporting complex nested types and logical type mappings.

object SchemaConverters {
case class SchemaType(dataType: DataType, nullable: Boolean)
def toSqlType(avroSchema: Schema): SchemaType
def toSqlType(avroSchema: Schema, options: Map[String, String]): SchemaType
def toAvroType(
catalystType: DataType,
nullable: Boolean,
recordName: String,
nameSpace: String
): Schema
}
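A small round-trip sketch: parse the schema JSON from the earlier example into an Avro Schema, map it to a Spark SQL type, then derive an Avro schema from a StructType (the record name and namespace here are arbitrary):

import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types._

// Avro -> Spark SQL
val avroSchema = new Schema.Parser().parse(schema)  // "schema" is the User record JSON above
val sqlType = SchemaConverters.toSqlType(avroSchema)
println(sqlType.dataType)   // StructType with name/age fields
println(sqlType.nullable)

// Spark SQL -> Avro
val catalystType = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))
val derivedAvroSchema = SchemaConverters.toAvroType(
  catalystType,
  nullable = false,
  recordName = "User",
  nameSpace = "com.example"
)
println(derivedAvroSchema.toString(true))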
Comprehensive configuration system for controlling Avro processing behavior including compression, schema evolution, field matching, and error handling.

object AvroOptions {
val COMPRESSION: String
val RECORD_NAME: String
val RECORD_NAMESPACE: String
val AVRO_SCHEMA: String
val AVRO_SCHEMA_URL: String
val POSITIONAL_FIELD_MATCHING: String
val DATETIME_REBASE_MODE: String
def apply(parameters: Map[String, String]): AvroOptions
}
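These constants name the option keys accepted by the reader and writer; a sketch using a few of them, with placeholder paths and the schema string from the earlier example:

// Write with an explicit record name/namespace and deflate compression
df.write
  .format("avro")
  .option("recordName", "User")
  .option("recordNamespace", "com.example")
  .option("compression", "deflate")
  .save("path/to/output")

// Read with a user-provided reader schema to drive schema evolution;
// fields missing from the files fall back to their Avro defaults
val evolvedDF = spark.read
  .format("avro")
  .option("avroSchema", schema)
  .load("path/to/avro/files")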
Core utility functions for schema inference, type validation, and write preparation.

object AvroUtils {
def inferSchema(
spark: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]
): Option[StructType]
def supportsDataType(dataType: DataType): Boolean
def prepareWrite(
sqlConf: SQLConf,
job: Job,
options: Map[String, String],
dataSchema: StructType
): OutputWriterFactory
}

Catalyst expressions that implement the from_avro and to_avro conversions internally:

class AvroDataToCatalyst(
child: Expression,
jsonFormatSchema: String,
options: Map[String, String]
) extends UnaryExpression
class CatalystDataToAvro(
child: Expression,
jsonFormatSchema: Option[String]
) extends UnaryExpression
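These expressions are not normally constructed by hand; they are what from_avro and to_avro build under the hood. A quick way to observe that wiring, assuming Spark 3.5 where Column.expr exposes the underlying Catalyst expression:

import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions._

val decodedCol = from_avro(col("avro_bytes"), schema)
val encodedCol = to_avro(struct(col("name"), col("age")))

// The returned Columns wrap the expression nodes shown above
println(decodedCol.expr.getClass.getSimpleName)  // AvroDataToCatalyst
println(encodedCol.expr.getClass.getSimpleName)  // CatalystDataToAvro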
The runtime options class constructed from the reader/writer parameters:

class AvroOptions(
parameters: CaseInsensitiveMap[String],
conf: Configuration
) extends FileSourceOptions(parameters) {
val schema: Option[Schema]
val positionalFieldMatching: Boolean
val recordName: String
val recordNamespace: String
val compression: String
val parseMode: ParseMode
val datetimeRebaseModeInRead: String
}

Internal helpers for matching Avro fields to Catalyst fields during schema resolution:

// Note: Part of AvroUtils object
case class AvroMatchedField(
catalystField: StructField,
catalystPosition: Int,
avroField: Schema.Field
)
class AvroSchemaHelper(
avroSchema: Schema,
catalystSchema: StructType,
avroPath: Seq[String],
catalystPath: Seq[String],
positionalFieldMatch: Boolean
) {
val matchedFields: Seq[AvroMatchedField]
def getAvroField(fieldName: String, catalystPos: Int): Option[Schema.Field]
def validateNoExtraCatalystFields(ignoreNullable: Boolean): Unit
def validateNoExtraAvroFields(): Unit
}

The connector throws specific exceptions for schema incompatibilities and processing errors:
// Private to avro package
private[avro] class IncompatibleSchemaException(
msg: String,
ex: Throwable = null
) extends Exception(msg, ex)
private[avro] class UnsupportedAvroTypeException(msg: String) extends Exception(msg)

Common error scenarios:

- IncompatibleSchemaException: the Avro schema cannot be reconciled with the Spark SQL schema, for example when a user-supplied avroSchema does not match the data being read or written.
- UnsupportedAvroTypeException: the data uses an Avro type that cannot be mapped to a Spark SQL type.
- Malformed records during from_avro decoding: handled according to the mode option (FAILFAST raises an error, PERMISSIVE produces null results).

The package object provides deprecated convenience functions that delegate to the main functions API:
package org.apache.spark.sql.avro {
@deprecated("Please use 'org.apache.spark.sql.avro.functions.from_avro' instead.", "3.0.0")
def from_avro(data: Column, jsonFormatSchema: String): Column
@deprecated("Please use 'org.apache.spark.sql.avro.functions.to_avro' instead.", "3.0.0")
def to_avro(data: Column): Column
}

Migration Example:
// Old deprecated usage
import org.apache.spark.sql.avro._
val result = from_avro(col("data"), schema)
// New recommended usage
import org.apache.spark.sql.avro.functions._
val result = from_avro(col("data"), schema)

For Java-based MapReduce integrations:
class SparkAvroKeyOutputFormat extends OutputFormat[AvroKey[GenericRecord], NullWritable]