The Apache Spark Avro connector provides seamless integration between Spark SQL and the Apache Avro data format, with support for schema evolution and built-in compression.
The Spark Avro connector provides comprehensive configuration options through the AvroOptions class and related constants. These options control various aspects of Avro processing including compression, schema handling, field matching, and error handling.
The main configuration class for Avro operations.
class AvroOptions(
  parameters: CaseInsensitiveMap[String],
  conf: Configuration
) extends FileSourceOptions(parameters) {
  val schema: Option[Schema]
  val positionalFieldMatching: Boolean
  val recordName: String
  val recordNamespace: String
  val compression: String
  val parseMode: ParseMode
  val datetimeRebaseModeInRead: String
  val useStableIdForUnionType: Boolean
}

object AvroOptions {
  def apply(parameters: Map[String, String]): AvroOptions
}

Usage Example:
import org.apache.spark.sql.avro.AvroOptions

val options = AvroOptions(Map(
  "compression" -> "snappy",
  "recordName" -> "MyRecord",
  "recordNamespace" -> "com.example.avro"
))

println(s"Compression: ${options.compression}")
println(s"Record Name: ${options.recordName}")
All configuration option keys are defined as constants in the AvroOptions companion object.

object AvroOptions {
  val COMPRESSION: String
  val RECORD_NAME: String
  val RECORD_NAMESPACE: String
  val AVRO_SCHEMA: String
  val AVRO_SCHEMA_URL: String
  val POSITIONAL_FIELD_MATCHING: String
  val DATETIME_REBASE_MODE: String
  val MODE: String
  val IGNORE_EXTENSION: String // Deprecated
  val STABLE_ID_FOR_UNION_TYPE: String
}

Specify a custom Avro schema for reading or writing.
Option Key: AvroOptions.AVRO_SCHEMA ("avroSchema")
Usage in Reading:
val customSchema = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}
"""

val df = spark.read
  .format("avro")
  .option(AvroOptions.AVRO_SCHEMA, customSchema)
  .load("path/to/avro/files")

Usage in Writing:
df.write
  .format("avro")
  .option(AvroOptions.AVRO_SCHEMA, customSchema)
  .save("path/to/output")

Load Avro schema from a URL or file path.
Option Key: AvroOptions.AVRO_SCHEMA_URL ("avroSchemaUrl")
val df = spark.read
  .format("avro")
  .option(AvroOptions.AVRO_SCHEMA_URL, "hdfs://cluster/schemas/user.avsc")
  .load("path/to/avro/files")

// Also works with local files
val df2 = spark.read
  .format("avro")
  .option(AvroOptions.AVRO_SCHEMA_URL, "file:///local/path/schema.avsc")
  .load("path/to/avro/files")

Set the top-level record name for writing Avro files.
Option Key: AvroOptions.RECORD_NAME ("recordName")
Default: "topLevelRecord"
df.write
  .format("avro")
  .option(AvroOptions.RECORD_NAME, "UserRecord")
  .save("path/to/output")

Set the namespace for the top-level record.
Option Key: AvroOptions.RECORD_NAMESPACE ("recordNamespace")
Default: "" (empty string)
df.write
  .format("avro")
  .option(AvroOptions.RECORD_NAME, "User")
  .option(AvroOptions.RECORD_NAMESPACE, "com.example.avro")
  .save("path/to/output")

Set the compression codec for writing Avro files.
Option Key: AvroOptions.COMPRESSION ("compression")
Default: Value from spark.sql.avro.compression.codec or "snappy"
Supported Codecs:
- uncompressed: No compression
- snappy: Snappy compression (default)
- deflate: Deflate compression
- bzip2: Bzip2 compression
- xz: XZ compression
- zstandard: Zstandard compression

// Different compression options
df.write.format("avro").option(AvroOptions.COMPRESSION, "uncompressed").save("uncompressed")
df.write.format("avro").option(AvroOptions.COMPRESSION, "snappy").save("snappy")
df.write.format("avro").option(AvroOptions.COMPRESSION, "deflate").save("deflate")
df.write.format("avro").option(AvroOptions.COMPRESSION, "bzip2").save("bzip2")
df.write.format("avro").option(AvroOptions.COMPRESSION, "xz").save("xz")
df.write.format("avro").option(AvroOptions.COMPRESSION, "zstandard").save("zstandard")Control how fields are matched between Spark and Avro schemas.
Option Key: AvroOptions.POSITIONAL_FIELD_MATCHING ("positionalFieldMatching")
Default: false
Values:
- false: Match fields by name (default)
- true: Match fields by position

// Match by position instead of name
val df = spark.read
  .format("avro")
  .option(AvroOptions.POSITIONAL_FIELD_MATCHING, "true")
  .load("path/to/avro/files")

// Useful when field names don't match but the structure is identical
import org.apache.spark.sql.types._

val sparkSchema = StructType(Seq(
  StructField("user_id", LongType),     // Position 0
  StructField("full_name", StringType), // Position 1
  StructField("email_addr", StringType) // Position 2
))

// The Avro schema has different field names but the same positions:
// {"name": "id", "type": "long"}      // Position 0
// {"name": "name", "type": "string"}  // Position 1
// {"name": "email", "type": "string"} // Position 2
Control how parsing errors are handled.
Option Key: AvroOptions.MODE ("mode")
Default: FAILFAST
Values:
- FAILFAST: Throw an exception on the first error (default)
- PERMISSIVE: Set corrupt records to null and continue processing
- DROPMALFORMED: Drop corrupt records and continue processing

// Drop malformed records
val df = spark.read
  .format("avro")
  .option(AvroOptions.MODE, "DROPMALFORMED")
  .load("path/to/avro/files")

// Keep corrupt records as null
val df2 = spark.read
  .format("avro")
  .option(AvroOptions.MODE, "PERMISSIVE")
  .load("path/to/avro/files")

Control rebasing of DATE and TIMESTAMP values between the Julian and Proleptic Gregorian calendars.
Option Key: AvroOptions.DATETIME_REBASE_MODE ("datetimeRebaseMode")
Default: Value from spark.sql.avro.datetimeRebaseModeInRead
Values:
- EXCEPTION: Fail if values are ambiguous between the two calendars
- LEGACY: Rebase values from the legacy hybrid Julian calendar
- CORRECTED: Read values as-is in the Proleptic Gregorian calendar, without rebasing

val df = spark.read
.format("avro")
.option(AvroOptions.DATETIME_REBASE_MODE, "CORRECTED")
.load("path/to/historical/avro/files")Control field naming for Avro union types when converting to Spark SQL.
Option Key: AvroOptions.STABLE_ID_FOR_UNION_TYPE ("enableStableIdentifiersForUnionType")
Default: false
Values:
- false: Use positional member names such as member0, member1 (default)
- true: Use stable field names derived from the Avro type names

val unionSchema = """
{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "data", "type": [
      {"type": "record", "name": "UserEvent", "fields": [{"name": "userId", "type": "long"}]},
      {"type": "record", "name": "SystemEvent", "fields": [{"name": "systemId", "type": "string"}]}
    ]}
  ]
}
"""

val df = spark.read
  .format("avro")
  .option(AvroOptions.AVRO_SCHEMA, unionSchema)
  .option(AvroOptions.STABLE_ID_FOR_UNION_TYPE, "true")
  .load("path/to/union/data")

// Results in stable field names like "data.member_userevent" and "data.member_systemevent"
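Given those stable names, nested union members can be selected directly; a small sketch using the DataFrame read above (field names follow the naming shown in the comment):

import org.apache.spark.sql.functions.col

// Select a union member through its stable field name
df.select(col("data.member_userevent.userId")).show()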
Control whether files without the .avro extension are picked up when reading.
Option Key: AvroOptions.IGNORE_EXTENSION ("ignoreExtension")
Status: Deprecated since Spark 3.0; use pathGlobFilter instead
// Deprecated way
val df = spark.read
  .format("avro")
  .option(AvroOptions.IGNORE_EXTENSION, "true")
  .load("path/to/files")

// Modern way
val df2 = spark.read
  .format("avro")
  .option("pathGlobFilter", "*.data") // Or another pattern
  .load("path/to/files")

Some Avro behavior can be controlled through Spark SQL configuration:
Default compression codec for Avro files.
spark.conf.set("spark.sql.avro.compression.codec", "zstandard")
// All subsequent Avro writes will use zstandard compression by default
df.write.format("avro").save("path/to/output")Compression level for deflate codec (1-9).
spark.conf.set("spark.sql.avro.deflate.level", "6")
df.write
  .format("avro")
  .option(AvroOptions.COMPRESSION, "deflate")
  .save("path/to/output")

Default datetime rebase mode for reading.
spark.conf.set("spark.sql.avro.datetimeRebaseModeInRead", "CORRECTED")Combining multiple options for complex scenarios:
Combining multiple options for complex scenarios:

val complexDF = spark.read
  .format("avro")
  .option(AvroOptions.AVRO_SCHEMA_URL, "hdfs://cluster/schemas/evolved-schema.avsc")
  .option(AvroOptions.MODE, "PERMISSIVE")
  .option(AvroOptions.POSITIONAL_FIELD_MATCHING, "false")
  .option(AvroOptions.DATETIME_REBASE_MODE, "CORRECTED")
  .option(AvroOptions.STABLE_ID_FOR_UNION_TYPE, "true")
  .option("pathGlobFilter", "*.avro")
  .load("path/to/avro/files")

complexDF.write
  .format("avro")
  .option(AvroOptions.COMPRESSION, "zstandard")
  .option(AvroOptions.RECORD_NAME, "ProcessedEvent")
  .option(AvroOptions.RECORD_NAMESPACE, "com.company.events")
  .mode("overwrite")
  .save("path/to/processed/output")

Configuration for reading old data with a new schema:
val evolvedSchema = """
{
  "type": "record",
  "name": "UserV2",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "phone", "type": ["null", "string"], "default": null},
    {"name": "created_at", "type": ["null", "long"], "default": null}
  ]
}
"""

val migratedDF = spark.read
  .format("avro")
  .option(AvroOptions.AVRO_SCHEMA, evolvedSchema)
  .option(AvroOptions.MODE, "PERMISSIVE") // Handle missing fields gracefully
  .option(AvroOptions.DATETIME_REBASE_MODE, "CORRECTED")
  .load("path/to/legacy/user/data")
df.write.format("avro").option(AvroOptions.COMPRESSION, "snappy").save("temp")
// Best compression ratio (good for archival)
df.write.format("avro").option(AvroOptions.COMPRESSION, "zstandard").save("archive")
// No compression (fastest write, largest files)
df.write.format("avro").option(AvroOptions.COMPRESSION, "uncompressed").save("staging")When repeatedly using the same schema:
// Cache the schema in a broadcast variable for reuse
val schemaBC = spark.sparkContext.broadcast(complexSchema)

// Use in multiple operations
df1.write.format("avro").option(AvroOptions.AVRO_SCHEMA, schemaBC.value).save("output1")
df2.write.format("avro").option(AvroOptions.AVRO_SCHEMA, schemaBC.value).save("output2")
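It can also help to validate the schema string once on the driver so malformed JSON fails before any job launches; a sketch using Avro's own parser, assuming complexSchema is the JSON string from above:

import org.apache.avro.Schema

// Parse once; throws SchemaParseException on malformed JSON
val parsed: Schema = new Schema.Parser().parse(complexSchema)
df1.write.format("avro").option(AvroOptions.AVRO_SCHEMA, parsed.toString).save("output1")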
Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-avro-2-12