Comprehensive file-based data source functionality for reading from and writing to Avro files using Spark's standard DataFrame read/write API. Supports automatic schema inference, configurable compression, and efficient distributed processing of Avro data.
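A minimal end-to-end round trip, shown as a sketch; it assumes an existing SparkSession named spark, and the paths and the "value" column are placeholders:
// Read existing Avro data, filter it, and write the result back as Avro
val events = spark.read.format("avro").load("path/to/input")            // placeholder input path
val nonNull = events.filter("value IS NOT NULL")                        // assumes a "value" column exists
nonNull.write.format("avro").mode("overwrite").save("path/to/output")   // placeholder output path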
Read Avro files using Spark's DataFrameReader with automatic schema inference and configurable options.
// Basic read operations
spark.read.format("avro").load(path: String): DataFrame
spark.read.format("avro").load(paths: String*): DataFrame
// With options
spark.read.format("avro")
.option(key: String, value: String)
.load(path: String): DataFrame
Usage Examples:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("Avro Reader Example")
.getOrCreate()
// Read single file
val df1 = spark.read
.format("avro")
.load("path/to/data.avro")
// Read multiple files
val df2 = spark.read
.format("avro")
.load("path/to/dir1", "path/to/dir2")
// Read with glob pattern
val df3 = spark.read
.format("avro")
.load("path/to/data/*.avro")
// Read with custom schema
val df4 = spark.read
.format("avro")
.option("avroSchema", customSchemaJson)
.load("path/to/data.avro")
// Read ignoring file extensions
val df5 = spark.read
.format("avro")
.option("ignoreExtension", "true")
.load("path/to/files_without_extension")Write DataFrames to Avro files with configurable compression and schema options.
Write DataFrames to Avro files with configurable compression and schema options.
// Basic write operations
df.write.format("avro").save(path: String): Unit
df.write.format("avro").mode(saveMode: SaveMode).save(path: String): Unit
// With options
df.write.format("avro")
.option(key: String, value: String)
.mode(saveMode: SaveMode)
.save(path: String): Unit
Usage Examples:
import org.apache.spark.sql.SaveMode
val df = spark.table("source_data")
// Basic write
df.write
.format("avro")
.save("path/to/output")
// Write with compression
df.write
.format("avro")
.option("compression", "snappy")
.mode(SaveMode.Overwrite)
.save("path/to/compressed_output")
// Write with custom record name and namespace
df.write
.format("avro")
.option("recordName", "MyRecord")
.option("recordNamespace", "com.example.data")
.mode(SaveMode.Append)
.save("path/to/named_output")
// Write with custom schema
val customSchema = """{
"type": "record",
"name": "CustomRecord",
"fields": [
{"name": "id", "type": "long"},
{"name": "value", "type": "string"}
]
}"""
df.write
.format("avro")
.option("avroSchema", customSchema)
.save("path/to/custom_schema_output")avroSchema | None | Custom Avro schema in JSON format | Read/Write | \"{\\\"type\\\": \\\"record\\\", ...}\" |\n| recordName | \"topLevelRecord\" | Top-level record name in Avro schema | Write | \"UserRecord\" |\n| recordNamespace | \"\" | Record namespace in Avro schema | Write | \"com.example.data\" |\n| ignoreExtension | true | Ignore file extensions when reading | Read | \"false\" |\n| compression | \"snappy\" | Compression codec for write operations | Write | \"deflate\" |\n\n### Schema InferenceAutomatic schema detection from Avro files with support for schema evolution.
// Schema inference behavior
val inferredSchema = spark.read.format("avro").load(path).schema
Schema Inference Examples:
// Inspect inferred schema
val df = spark.read.format("avro").load("path/to/data.avro")
df.printSchema()
// Schema inference with multiple files (uses first readable file)
val multiFileDF = spark.read
.format("avro")
.load("path/to/multiple/*.avro")
// Handle corrupt files during schema inference
val robustDF = spark.read
.format("avro")
.option("ignoreCorruptFiles", "true")
.load("path/to/possibly_corrupt/*.avro")Avro files are splittable, enabling efficient distributed processing.
Avro files are splittable, enabling efficient distributed processing.
// Avro files support splitting for parallel processing
val parallelDF = spark.read
.format("avro")
.load("large_avro_file.avro")
// Spark automatically splits files across partitions
val partitionCount = parallelDF.rdd.getNumPartitions
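The number of input partitions follows Spark's general file-source settings; a sketch using the standard spark.sql.files.maxPartitionBytes setting (the value is illustrative):
// A smaller maximum partition size yields more, smaller input splits
spark.conf.set("spark.sql.files.maxPartitionBytes", "64MB")
val tunedDF = spark.read.format("avro").load("large_avro_file.avro")
println(s"Partitions after tuning: ${tunedDF.rdd.getNumPartitions}")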
Configuration options for customizing read behavior:
// Available read options
.option("avroSchema", jsonSchemaString) // Custom schema
.option("ignoreExtension", "true|false") // Ignore .avro extension requirement
.option("ignoreCorruptFiles", "true|false") // Skip corrupt files during processingConfiguration options for customizing write behavior:
// Available write options
.option("avroSchema", jsonSchemaString) // Custom output schema
.option("recordName", recordName) // Top-level record name
.option("recordNamespace", namespace) // Record namespace
.option("compression", compressionCodec) // Compression: snappy, deflate, bzip2, xz, uncompressedMultiple compression codecs are supported for write operations:
Supported Compression Codecs:
// Compression options
"snappy" // Default - balanced compression and speed
"deflate" // Good compression ratio, configurable level
"bzip2" // High compression ratio, slower
"xz" // High compression ratio, slower
"uncompressed" // No compressionCompression Examples:
// Snappy compression (default)
df.write.format("avro").save("snappy_output")
// Deflate with custom level (configured via Spark config)
spark.conf.set("spark.sql.avro.deflate.level", "6")
df.write
.format("avro")
.option("compression", "deflate")
.save("deflate_output")
// High compression with bzip2
df.write
.format("avro")
.option("compression", "bzip2")
.save("bzip2_output")Common error scenarios and handling strategies:
Common error scenarios and handling strategies:
// Handle missing files
try {
val df = spark.read.format("avro").load("missing_file.avro")
} catch {
case e: org.apache.spark.sql.AnalysisException =>
println(s"File not found: ${e.getMessage}")
}
// Handle schema evolution conflicts
try {
val df = spark.read
.format("avro")
.option("avroSchema", strictSchema)
.load("evolved_schema_files/*.avro")
} catch {
case e: Exception =>
println(s"Schema compatibility issue: ${e.getMessage}")
}
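As an alternative to catching the exception, the inferred schema can be checked against the expected fields before applying a strict schema; a sketch, where the expected field names are illustrative:
// Infer the schema first, then decide whether the strict read is safe
val inferred = spark.read.format("avro").load("evolved_schema_files/*.avro").schema
val expectedFields = Set("id", "value")  // illustrative expected field names
if (expectedFields.subsetOf(inferred.fieldNames.toSet)) {
  val strictDF = spark.read
    .format("avro")
    .option("avroSchema", strictSchema)
    .load("evolved_schema_files/*.avro")
} else {
  println(s"Unexpected schema, using inferred instead: ${inferred.simpleString}")
}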