Apache Spark Avro provides extensive configuration options for controlling Avro processing behavior, including compression settings, schema handling, parsing modes, and performance optimizations.
Configuration options for reading Avro files and deserializing Avro data.
val readOptions = Map(
"mode" -> "PERMISSIVE" // PERMISSIVE | DROPMALFORMED | FAILFAST
)

Available Modes:
- PERMISSIVE: continue processing and emit null for records that cannot be parsed
- DROPMALFORMED: skip records that cannot be parsed
- FAILFAST: fail immediately on the first malformed record
Usage Example:
// Strict parsing - fail on any malformed data
val strictDf = spark.read.format("avro")
.option("mode", "FAILFAST")
.load("sensitive-data.avro")
// Permissive parsing - continue with nulls for bad records
val lenientDf = spark.read.format("avro")
.option("mode", "PERMISSIVE")
.load("messy-data.avro")val schemaOptions = Map(
"avroSchema" -> jsonSchemaString, // Override input schema
"ignoreExtension" -> "true", // Read files regardless of extension
"recursiveFieldMaxDepth" -> "10" // Maximum recursion depth for nested fields
)

Schema Override Example:
val fixedSchema = """{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"}
]
}"""
val df = spark.read.format("avro")
.option("avroSchema", fixedSchema)
.option("ignoreExtension", "true")
.load("data-files/*")val unionOptions = Map(
"enableStableIdentifiersForUnionType" -> "true", // Use stable identifiers for union types
"stableIdentifierPrefixForUnionType" -> "union_" // Prefix for stable union identifiers
)

Usage Example:
import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.functions.col
val options: JMap[String, String] = Map(
"enableStableIdentifiersForUnionType" -> "true",
"stableIdentifierPrefixForUnionType" -> "variant_"
).asJava
val parsedDf = df.select(from_avro(col("union_data"), unionSchema, options).as("parsed"))

val datetimeOptions = Map(
"datetimeRebaseMode" -> "CORRECTED" // EXCEPTION | CORRECTED | LEGACY
)

Rebase Modes:
- EXCEPTION: fail if ancient dates or timestamps that are ambiguous between the Julian and Proleptic Gregorian calendars are encountered
- CORRECTED: read date and timestamp values as-is, in the Proleptic Gregorian calendar
- LEGACY: rebase values from the legacy hybrid Julian + Gregorian calendar to the Proleptic Gregorian calendar
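A minimal sketch of applying a rebase mode on read (the file path is illustrative):

// Files written by old Spark/Hive versions may use the legacy hybrid calendar
val historicalDf = spark.read.format("avro")
  .option("datetimeRebaseMode", "LEGACY")
  .load("historical-data.avro")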
Configuration options for writing Avro files and serializing data to Avro format.
val compressionOptions = Map(
"compression" -> "snappy" // uncompressed | deflate | snappy | bzip2 | xz | zstandard
)

Available Compression Codecs:
| Codec | Performance | Compression Ratio | Use Case |
|---|---|---|---|
| uncompressed | Fastest | None | Development, high-speed networks |
| snappy | Very Fast | Good | General purpose, balanced performance |
| deflate | Fast | Better | Standard compression |
| bzip2 | Slow | Best | Archival, storage-optimized |
| xz | Slowest | Excellent | Long-term storage |
| zstandard | Fast | Excellent | Modern high-performance compression |
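Beyond the per-write option, a session-wide default codec can be set through Spark SQL configuration; a brief sketch (the level value is illustrative):

// Session-wide default codec for Avro writes; the per-write "compression" option still overrides it
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
// deflate also accepts an explicit compression level
spark.conf.set("spark.sql.avro.deflate.level", "6")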
The compression codecs are defined as a Java enum with additional utility methods:
public enum AvroCompressionCodec {
UNCOMPRESSED, DEFLATE, SNAPPY, BZIP2, XZ, ZSTANDARD;
public String getCodecName();
public boolean getSupportCompressionLevel();
public String lowerCaseName();
public static AvroCompressionCodec fromString(String s);
}

Enum Methods:
- getCodecName(): Returns the Avro codec name constant
- getSupportCompressionLevel(): Returns true if the codec supports compression levels
- lowerCaseName(): Returns the lowercase name for the codec
- fromString(String): Parses a codec from a string (case-insensitive)
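A brief sketch of these helpers, assuming the enum is importable from the org.apache.spark.sql.avro package:

import org.apache.spark.sql.avro.AvroCompressionCodec

val codec = AvroCompressionCodec.fromString("ZSTANDARD") // case-insensitive lookup
val codecName = codec.lowerCaseName()                    // lowercase codec name, e.g. "zstandard"
val hasLevel = codec.getSupportCompressionLevel()        // true if a compression level can be configured

Usage Example: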
// High-performance compression
df.write.format("avro")
.option("compression", "snappy")
.save("fast-access-data")
// Maximum compression for archival
df.write.format("avro")
.option("compression", "zstandard")
.save("archived-data")val schemaGenOptions = Map(
"recordName" -> "MyRecord", // Name for generated record types
"recordNamespace" -> "com.example", // Namespace for generated schemas
"avroSchema" -> customSchemaJson // Use custom output schema
)

Custom Schema Example:
val outputSchema = """{
"type": "record",
"name": "ExportRecord",
"namespace": "com.company.exports",
"fields": [
{"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-micros"}},
{"name": "user_id", "type": "string"},
{"name": "metrics", "type": {"type": "map", "values": "double"}}
]
}"""
df.write.format("avro")
.option("avroSchema", outputSchema)
.option("compression", "snappy")
.save("structured-export")Configuration options for from_avro, to_avro, and schema_of_avro functions.
import scala.jdk.CollectionConverters._

val functionOptions: java.util.Map[String, String] = Map(
"mode" -> "PERMISSIVE",
"datetimeRebaseMode" -> "CORRECTED",
"enableStableIdentifiersForUnionType" -> "false",
"stableIdentifierPrefixForUnionType" -> "",
"recursiveFieldMaxDepth" -> "5"
).asJava

Complete Function Usage:
import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions.col
val parseOptions: JMap[String, String] = Map(
"mode" -> "FAILFAST",
"datetimeRebaseMode" -> "CORRECTED",
"enableStableIdentifiersForUnionType" -> "true",
"recursiveFieldMaxDepth" -> "10"
).asJava
val decodedDf = df.select(
from_avro(col("avro_binary"), avroSchema, parseOptions).as("decoded_data")
)
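Going the other way, to_avro re-serializes a column to Avro binary; a brief sketch reusing the decoded column above:

val encodedDf = decodedDf.select(
  to_avro(col("decoded_data"), avroSchema).as("avro_binary")
)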
val schemaOptions: JMap[String, String] = Map(
"enableStableIdentifiersForUnionType" -> "true",
"recursiveFieldMaxDepth" -> "8"
).asJava
val schemaDf = spark.sql(
  s"SELECT schema_of_avro('$complexSchema', map('enableStableIdentifiersForUnionType', 'true')) AS spark_schema"
)

// Control output file size through partitioning
df.coalesce(numPartitions)
.write.format("avro")
.option("compression", "snappy")
.save(path)
// Optimal partition size: 128MB - 1GB per file
// avgRowSizeBytes and targetPartitionSizeBytes are user-supplied estimates
val optimalPartitions = (df.count() * avgRowSizeBytes / targetPartitionSizeBytes).toInt

// Disable schema inference by providing explicit schema
val df = spark.read.format("avro")
.option("avroSchema", knownSchema) // Avoids scanning all files
.load("large-dataset/*.avro")val memoryOptions = Map(
"recursiveFieldMaxDepth" -> "3", // Limit nesting to control memory usage
"mode" -> "DROPMALFORMED" // Skip problematic records to maintain memory
)
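As a sketch of how the depth limit behaves with a self-referencing schema (the schema and file path below are illustrative):

// A record that refers to itself: each Node optionally holds a child Node
val recursiveSchema = """{
  "type": "record",
  "name": "Node",
  "fields": [
    {"name": "value", "type": "long"},
    {"name": "child", "type": ["null", "Node"], "default": null}
  ]
}"""

// Recursive references are expanded only up to the configured depth
val treeDf = spark.read.format("avro")
  .option("avroSchema", recursiveSchema)
  .option("recursiveFieldMaxDepth", "3")
  .load("tree-data.avro")

For type-safe configuration, use the predefined constants: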
// Common option keys (from AvroOptions)
object AvroOptionKeys {
val IGNORE_EXTENSION = "ignoreExtension"
val MODE = "mode"
val RECORD_NAME = "recordName"
val COMPRESSION = "compression"
val AVRO_SCHEMA = "avroSchema"
val AVRO_SCHEMA_URL = "avroSchemaUrl"
val RECORD_NAMESPACE = "recordNamespace"
val POSITIONAL_FIELD_MATCHING = "positionalFieldMatching"
val DATETIME_REBASE_MODE = "datetimeRebaseMode"
val STABLE_ID_FOR_UNION_TYPE = "enableStableIdentifiersForUnionType"
val STABLE_ID_PREFIX_FOR_UNION_TYPE = "stableIdentifierPrefixForUnionType"
val RECURSIVE_FIELD_MAX_DEPTH = "recursiveFieldMaxDepth"
}
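A brief sketch of using these keys instead of raw option strings (AvroOptionKeys is the helper object defined above; the path is illustrative):

val typedDf = spark.read.format("avro")
  .option(AvroOptionKeys.MODE, "FAILFAST")
  .option(AvroOptionKeys.RECURSIVE_FIELD_MAX_DEPTH, "5")
  .load("typed-config-data.avro")

val productionReadConfig = Map(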
"mode" -> "FAILFAST", // Strict error handling
"ignoreExtension" -> "false", // Validate file extensions
"recursiveFieldMaxDepth" -> "5", // Reasonable nesting limit
"datetimeRebaseMode" -> "CORRECTED" // Handle historical dates correctly
)
val df = spark.read.format("avro")
.options(productionReadConfig)
.load("production-data/*.avro")val highPerfWriteConfig = Map(
"compression" -> "snappy", // Fast compression
"recordName" -> "OptimizedRecord", // Descriptive record name
"recordNamespace" -> "com.company.data" // Proper namespace
)
df.coalesce(200) // Optimize partition count
.write.format("avro")
.options(highPerfWriteConfig)
.save("optimized-output")val devConfig = Map(
"mode" -> "PERMISSIVE", // Lenient error handling
"ignoreExtension" -> "true", // Flexible file reading
"compression" -> "uncompressed" // Fast writes for testing
)
val testDf = spark.read.format("avro")
.options(devConfig)
.load("test-data/*")