The Avro DataSource provides native Spark integration for reading and writing Avro files through the DataSource V2 API. It offers optimized performance with features like predicate pushdown, column pruning, and automatic schema inference.
val df = spark.read.format("avro").load(path: String)
val df = spark.read.format("avro").load(paths: String*)Usage Example:
// Read single file
val df = spark.read.format("avro").load("path/to/file.avro")
// Read multiple files
val df = spark.read.format("avro").load("file1.avro", "file2.avro")
// Read directory of Avro files
val df = spark.read.format("avro").load("path/to/avro/directory")Specify a schema for consistent structure across files:
val df = spark.read.format("avro")
.option("avroSchema", jsonSchema)
.load(path)
Usage Example:
val userSchema = """{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": null}
]
}"""
val df = spark.read.format("avro")
.option("avroSchema", userSchema)
.load("users/*.avro")val df = spark.read.format("avro")
.option("ignoreExtension", "true")
.option("mode", "PERMISSIVE")
.load(path)
Common Read Options:
ignoreExtension: Read files regardless of the .avro extension (default: true; deprecated in favor of pathGlobFilter)
mode: Error handling mode (PERMISSIVE, FAILFAST)
avroSchema: Override schema for reading
recursiveFieldMaxDepth: Maximum recursion depth for recursive record fields
Write DataFrames to Avro files:
df.write.format("avro").save(path: String)
Usage Example:
val df = spark.range(1000).select($"id", ($"id" * 2).as("value"))
df.write.format("avro").save("output/numbers.avro")df.write.format("avro")
.option("compression", codec)
.save(path)
Available Compression Codecs:
uncompressed
snappy (default, via spark.sql.avro.compression.codec)
deflate
bzip2
xz
zstandard
Usage Example:
df.write.format("avro")
.option("compression", "snappy")
.save("output/compressed.avro")df.write.format("avro")
.option("avroSchema", jsonSchema)
.save(path)
Usage Example:
val outputSchema = """{
"type": "record",
"name": "Output",
"namespace": "com.example",
"fields": [
{"name": "id", "type": "long"},
{"name": "value", "type": "double"}
]
}"""
df.write.format("avro")
.option("avroSchema", outputSchema)
.save("output/custom-schema.avro")df.write.format("avro")
.partitionBy(columns: String*)
.save(path)
Usage Example:
val salesDf = spark.read.format("avro").load("sales.avro")
salesDf.write.format("avro")
.partitionBy("year", "month")
.option("compression", "snappy")
.save("partitioned-sales")The DataSource supports schema evolution when reading multiple files:
The DataSource supports schema evolution when reading multiple files:
// Reads files with different but compatible schemas
val df = spark.read.format("avro")
.option("mode", "PERMISSIVE")
.load("evolving-schema/*.avro")Automatically pushes down filter predicates to reduce I/O:
val filteredDf = spark.read.format("avro")
.load("large-dataset.avro")
.filter($"age" > 21) // Filter pushed down to file scanningOnly reads required columns from files:
val projectedDf = spark.read.format("avro")
.load("users.avro")
.select("name", "email") // Only reads name and email columnsUse Avro files directly in SQL queries:
// Create temporary view
spark.read.format("avro").load("users.avro").createOrReplaceTempView("users")
// Query with SQL
spark.sql("SELECT name, age FROM users WHERE age > 18")
// Create external table
spark.sql("""
CREATE TABLE user_data
USING avro
LOCATION 'path/to/avro/files'
""")Map(
"compression" -> "snappy", // Compression codec
"avroSchema" -> jsonSchema, // Custom output schema
"recordName" -> "MyRecord", // Record name for schema generation
"recordNamespace" -> "com.example" // Namespace for generated schema
)
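For instance, recordName and recordNamespace shape the top-level record of the Avro schema that is generated when no explicit avroSchema is provided; a sketch with illustrative values:
df.write.format("avro")
.option("recordName", "SaleRecord") // default: "topLevelRecord"
.option("recordNamespace", "com.example.sales") // default: ""
.save("output/named-records.avro")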
"ignoreExtension" -> "true", // Ignore file extensions
"mode" -> "PERMISSIVE", // Error handling mode
"avroSchema" -> jsonSchema, // Override input schema
"recursiveFieldMaxDepth" -> "5" // Max recursion depth
)
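These read options can be combined on a single load; a sketch (the path and depth value are illustrative, and recursiveFieldMaxDepth only matters for schemas with recursive record fields):
val events = spark.read.format("avro")
.option("mode", "FAILFAST") // fail immediately on corrupt records
.option("recursiveFieldMaxDepth", "3") // limit expansion of recursive record fields
.load("events/*.avro")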
val df1 = spark.read.format("avro").load("many-files/*.avro")
// Efficient - uses provided schema
val df2 = spark.read.format("avro")
.option("avroSchema", knownSchema)
.load("many-files/*.avro")