Configuration system for customizing Avro read and write operations. The AvroOptions class is the central configuration point: it handles option keys case-insensitively, applies sensible defaults, and integrates with the Hadoop configuration.
class AvroOptions(
    @transient val parameters: CaseInsensitiveMap[String],
    @transient val conf: Configuration
) extends Logging with Serializable {

  def this(parameters: Map[String, String], conf: Configuration) = {
    this(CaseInsensitiveMap(parameters), conf)
  }

  // Configuration properties
  val schema: Option[String]    // Optional user-provided schema in JSON format
  val recordName: String        // Top level record name (default: "topLevelRecord")
  val recordNamespace: String   // Record namespace (default: "")
  val ignoreExtension: Boolean  // Ignore file extensions in read operations
  val compression: String       // Compression codec for write operations
}
Usage Examples:
import org.apache.spark.sql.avro.AvroOptions
import org.apache.hadoop.conf.Configuration
val hadoopConf = new Configuration()
val options = Map(
  "avroSchema" -> customSchemaJson, // JSON schema string (see the schema example below)
  "recordName" -> "MyRecord",
  "compression" -> "snappy"
)
val avroOptions = new AvroOptions(options, hadoopConf)
// Access configuration values
val customSchema = avroOptions.schema
val recordName = avroOptions.recordName
val compressionCodec = avroOptions.compression
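Option keys are matched case-insensitively (via CaseInsensitiveMap), so mixed-case keys resolve to the same settings. A small sketch reusing the values defined above:
val mixedCaseOptions = new AvroOptions(
  Map("AVROSCHEMA" -> customSchemaJson, "Compression" -> "deflate"),
  hadoopConf
)
assert(mixedCaseOptions.schema.isDefined)          // "AVROSCHEMA" matches "avroSchema"
assert(mixedCaseOptions.compression == "deflate")  // "Compression" matches "compression"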
Options for specifying and customizing Avro schemas during read and write operations.
val schema: Option[String] // Optional user-provided schema in JSON format
Usage Examples:
val customSchema = """{
"type": "record",
"name": "CustomUser",
"namespace": "com.example",
"fields": [
{"name": "userId", "type": "long"},
{"name": "userName", "type": "string"},
{"name": "metadata", "type": ["null", "string"], "default": null}
]
}"""
// Read with custom schema
val df = spark.read
  .format("avro")
  .option("avroSchema", customSchema)
  .load("path/to/data.avro")
// Write with custom schema
df.write
  .format("avro")
  .option("avroSchema", customSchema)
  .save("path/to/output")
Options for naming the top-level record in the Avro schema generated during write operations.
val recordName: String // Top level record name (default: "topLevelRecord")
val recordNamespace: String // Record namespace (default: "")
Usage Examples:
// Configure record naming for write operations
df.write
  .format("avro")
  .option("recordName", "UserProfile")
  .option("recordNamespace", "com.company.data")
  .save("path/to/named_output")
// Example of generated Avro schema with custom naming
// Result schema will have:
// {
// "type": "record",
// "name": "UserProfile",
// "namespace": "com.company.data",
// "fields": [...]
// }
Options for controlling how Avro files are processed and interpreted.
val ignoreExtension: Boolean // Whether to ignore the .avro file extension requirement
Configuration Logic:
// Priority order for ignoreExtension:
// 1. Explicit option value
// 2. Hadoop config: avro.mapred.ignore.inputs.without.extension
// 3. Default: false (require .avro extension)
Usage Examples:
// Read files without .avro extension
val df1 = spark.read
  .format("avro")
  .option("ignoreExtension", "true")
  .load("path/to/files_without_extension")
// Use Hadoop configuration default
val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.setBoolean("avro.mapred.ignore.inputs.without.extension", true)
val df2 = spark.read
  .format("avro")
  .load("path/to/mixed_files")
Comprehensive compression options for write operations with multiple codec support.
val compression: String // Compression codec for write operations
Supported Compression Codecs:
"uncompressed" // No compression
"snappy" // Default - balanced compression and speed
"deflate" // Good compression ratio, configurable level
"bzip2" // High compression ratio, slower processing
"xz" // High compression ratio, slower processingConfiguration Priority:
// Priority order for compression:
// 1. Explicit option value
// 2. Spark config: spark.sql.avro.compression.codec
// 3. Default: "snappy"Usage Examples:
// Set compression via option
df.write
  .format("avro")
  .option("compression", "deflate")
  .save("path/to/compressed_output")
// Set global default compression
spark.conf.set("spark.sql.avro.compression.codec", "bzip2")
// Configure deflate compression level
spark.conf.set("spark.sql.avro.deflate.level", "9") // Max compression
df.write
  .format("avro")
  .option("compression", "deflate")
  .save("path/to/max_compressed_output")
Additional configuration options for specialized use cases.
// Handle corrupt files during read operations
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
val robustDF = spark.read
  .format("avro")
  .load("path/to/potentially_corrupt_files")
// Control file discovery behavior
val options = Map(
  "ignoreExtension" -> "true",
  "recursiveFileLookup" -> "true" // Spark SQL option
)
val df = spark.read
  .format("avro")
  .options(options)
  .load("path/to/nested_directories")
The AvroOptions class performs validation and provides helpful error messages for invalid configurations.
Validation Examples:
// Invalid compression codec
try {
  df.write
    .format("avro")
    .option("compression", "invalid_codec")
    .save("output")
} catch {
  case e: IllegalArgumentException =>
    println(s"Invalid compression codec: ${e.getMessage}")
}
// Schema validation during parse
try {
  val invalidSchema = """{"type": "invalid_type"}"""
  spark.read
    .format("avro")
    .option("avroSchema", invalidSchema)
    .load("data.avro")
} catch {
  case e: Exception =>
    println(s"Schema parse error: ${e.getMessage}")
}
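A user-supplied schema string can also be checked up front with the Avro parser before any read or write is attempted (a defensive sketch; the data source will fail fast on a bad schema anyway):
import org.apache.avro.Schema

def parseOrFail(json: String): Schema =
  try new Schema.Parser().parse(json)
  catch {
    case e: Exception =>
      throw new IllegalArgumentException(s"Invalid Avro schema: ${e.getMessage}", e)
  }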
AvroOptions integrates with Spark's configuration system for global defaults.
// Global Avro configuration
spark.conf.set("spark.sql.avro.compression.codec", "snappy")
spark.conf.set("spark.sql.avro.deflate.level", "6")
// File handling configuration
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")// Access Hadoop configuration
// Access Hadoop configuration
val hadoopConf = spark.sparkContext.hadoopConfiguration
// Set Avro-specific Hadoop properties
hadoopConf.setBoolean("avro.mapred.ignore.inputs.without.extension", true)
// Configuration is automatically used by AvroOptions
val df = spark.read.format("avro").load("data")Comprehensive example showing all configuration options:
import org.apache.spark.sql.{SparkSession, SaveMode}
val spark = SparkSession.builder()
  .appName("Avro Configuration Example")
  .config("spark.sql.avro.compression.codec", "snappy")
  .config("spark.sql.avro.deflate.level", "6")
  .getOrCreate()
val customSchema = """{
"type": "record",
"name": "ProcessedEvent",
"namespace": "com.company.events",
"fields": [
{"name": "eventId", "type": "string"},
{"name": "timestamp", "type": "long"},
{"name": "payload", "type": ["null", "string"], "default": null}
]
}"""
val readOptions = Map(
  "avroSchema" -> customSchema,
  "ignoreExtension" -> "true"
)
val writeOptions = Map(
  "avroSchema" -> customSchema,
  "recordName" -> "ProcessedEvent",
  "recordNamespace" -> "com.company.events",
  "compression" -> "deflate"
)
// Read with configuration
val inputDF = spark.read
  .format("avro")
  .options(readOptions)
  .load("path/to/input")
// Process data
import spark.implicits._ // for the $"col" syntax
val processedDF = inputDF.filter($"timestamp" > 1609459200000L)
// Write with configuration
processedDF.write
  .format("avro")
  .options(writeOptions)
  .mode(SaveMode.Overwrite)
  .save("path/to/output")