The Apache Spark Avro connector provides seamless integration between Spark SQL and the Apache Avro data format, with support for schema evolution and built-in compression.

---
The Spark Avro connector provides DataSource API integration for reading and writing Avro files directly through Spark SQL with automatic schema inference, partitioning support, and optimized I/O performance.
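Before the API details, a minimal round trip may help orient: the sketch below writes a small DataFrame as Avro and reads it back with inferred schema. The local master, paths, and column names are illustrative, and the spark-avro module must be on the classpath (for example via --packages org.apache.spark:spark-avro_2.12:&lt;version&gt; when launching).

```scala
import org.apache.spark.sql.SparkSession

// Illustrative sketch: write a small DataFrame as Avro, then read it back.
val spark = SparkSession.builder()
  .appName("AvroQuickStart")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val users = Seq((1L, "alice"), (2L, "bob")).toDF("id", "name")
users.write.format("avro").mode("overwrite").save("/tmp/users-avro")

// The schema is inferred from the writer schema embedded in the files.
val roundTrip = spark.read.format("avro").load("/tmp/users-avro")
roundTrip.printSchema()
```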
```scala
spark.read.format("avro").load(path: String): DataFrame
spark.read.format("avro").load(paths: String*): DataFrame
```

Basic Usage:
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("AvroReader")
  .getOrCreate()

// Read a single file or directory
val df = spark.read.format("avro").load("path/to/file.avro")

// Read multiple paths
val df2 = spark.read.format("avro").load(
  "path/to/file1.avro",
  "path/to/file2.avro",
  "path/to/directory"
)
```

```scala
spark.read.format("avro").schema(schema: StructType): DataFrameReader
```

Usage with Custom Schema:
```scala
import org.apache.spark.sql.types._

val customSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = true),
  StructField("email", StringType, nullable = true)
))

val df = spark.read
  .format("avro")
  .schema(customSchema)
  .load("path/to/avro/files")
```

```scala
spark.read.format("avro").option(key: String, value: String): DataFrameReader
spark.read.format("avro").options(options: Map[String, String]): DataFrameReader
```

Available Options:

- avroSchema: an evolved Avro schema to use for reading
- avroSchemaUrl: URL to load the Avro schema from
- mode: parse mode for handling corrupt records (PERMISSIVE, DROPMALFORMED, FAILFAST)
- positionalFieldMatching: match fields by position instead of by name
- datetimeRebaseMode: rebase mode for DATE/TIMESTAMP values (EXCEPTION, LEGACY, CORRECTED)

Usage Example:
```scala
val df = spark.read
  .format("avro")
  .option("mode", "DROPMALFORMED")
  .option("avroSchema", evolvedSchemaJson)
  .option("positionalFieldMatching", "true")
  .load("path/to/avro/files")
```

```scala
DataFrame.write.format("avro").save(path: String): Unit
DataFrame.write.format("avro").save(): Unit
```

Basic Usage:
```scala
// Write DataFrame to Avro files
df.write
  .format("avro")
  .save("path/to/output")

// Write with an explicit save mode
df.write
  .format("avro")
  .mode("overwrite")
  .save("path/to/output")
```

```scala
DataFrame.write.format("avro").mode(saveMode: String): DataFrameWriter[Row]
DataFrame.write.format("avro").mode(saveMode: SaveMode): DataFrameWriter[Row]
```

Supported modes:

- overwrite: overwrite existing data
- append: append to existing data
- ignore: silently skip the write if data already exists
- error (default): throw an error if data already exists

```scala
DataFrame.write.format("avro").partitionBy(colNames: String*): DataFrameWriter[Row]
```

Usage Example:
```scala
df.write
  .format("avro")
  .partitionBy("year", "month")
  .save("path/to/partitioned/output")
```

```scala
DataFrame.write.format("avro").option(key: String, value: String): DataFrameWriter[Row]
DataFrame.write.format("avro").options(options: Map[String, String]): DataFrameWriter[Row]
```

Available Options:

- compression: compression codec (uncompressed, snappy, deflate, bzip2, xz, zstandard)
- recordName: top-level record name (default: "topLevelRecord")
- recordNamespace: record namespace (default: "")
- avroSchema: custom output Avro schema

Usage Example:
```scala
df.write
  .format("avro")
  .option("compression", "snappy")
  .option("recordName", "UserRecord")
  .option("recordNamespace", "com.example.avro")
  .save("path/to/compressed/output")
```

Reading with an evolved schema lets you process Avro files whose schemas are compatible but not identical:
```scala
val evolvedSchema = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "created_at", "type": "long", "default": 0}
  ]
}
"""

val df = spark.read
  .format("avro")
  .option("avroSchema", evolvedSchema)
  .load("path/to/older/avro/files")
```

The connector supports predicate pushdown for efficient querying:
```scala
val filteredDF = spark.read
  .format("avro")
  .load("path/to/large/dataset")
  .filter($"created_date" >= "2023-01-01")
  .filter($"status" === "active")
```

Compression is detected automatically on read and is configurable on write:
```scala
// Writing with different compression codecs
df.write.format("avro").option("compression", "snappy").save("snappy-output")
df.write.format("avro").option("compression", "deflate").save("deflate-output")
df.write.format("avro").option("compression", "bzip2").save("bzip2-output")
```

```scala
// Drop corrupt records
val cleanDF = spark.read
  .format("avro")
  .option("mode", "DROPMALFORMED")
  .load("path/to/files")

// Collect corrupt records in a column
val dfWithCorrupt = spark.read
  .format("avro")
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .load("path/to/files")
```

```scala
// Include files without the .avro extension
val df = spark.read
  .format("avro")
  .option("ignoreExtension", "true") // deprecated - use pathGlobFilter instead
  .load("path/to/mixed/files")

// Modern approach using pathGlobFilter
val df2 = spark.read
  .format("avro")
  .option("pathGlobFilter", "*.data")
  .load("path/to/files")
```

The connector also provides a DataSource V2 implementation for enhanced performance:
```scala
class AvroDataSourceV2 extends FileDataSourceV2 {
  def shortName(): String // Returns "avro"
  def getTable(options: CaseInsensitiveStringMap): Table
  def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table
}
```

The DataSource V2 API provides the same functionality with improved performance characteristics and better integration with Spark's Catalyst optimizer.
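Which of the two implementations a session actually uses is governed by the spark.sql.sources.useV1SourceList configuration, whose default in Spark 3.x includes "avro" (so the V1 path is taken unless the entry is removed). The sketch below assumes Spark 3.x and a local session; the path is illustrative.

```scala
import org.apache.spark.sql.SparkSession

// Sketch: opting the session into the V2 Avro implementation.
// Assumption: Spark 3.x, where spark.sql.sources.useV1SourceList defaults to
// a list that includes "avro"; clearing it selects the V2 code path.
val spark = SparkSession.builder()
  .appName("AvroV2")
  .master("local[*]")
  .config("spark.sql.sources.useV1SourceList", "")
  .getOrCreate()
import spark.implicits._

Seq((1L, "a")).toDF("id", "name")
  .write.format("avro").mode("overwrite").save("/tmp/avro-v2-demo")

val v2df = spark.read.format("avro").load("/tmp/avro-v2-demo")
```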
```scala
class AvroTable extends FileTable
class AvroScan extends FileScan
class AvroScanBuilder extends FileScanBuilder
class AvroPartitionReaderFactory extends FilePartitionReaderFactory
class AvroWrite extends FileWrite
```

These classes work together to provide the complete DataSource V2 implementation, handling table metadata, scan planning, partition reading, and write operations.
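Because the source registers the short name "avro", the format is also reachable directly from SQL. The sketch below assumes a local session with the default in-memory catalog; the table name and values are illustrative.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("AvroSql")
  .master("local[*]")
  .getOrCreate()

// The registered short name "avro" can be used in a USING clause.
spark.sql("CREATE TABLE IF NOT EXISTS events (id BIGINT, kind STRING) USING avro")
spark.sql("INSERT INTO events VALUES (1, 'click'), (2, 'view')")
val events = spark.sql("SELECT * FROM events ORDER BY id")
events.show()
```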
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-avro-2-12