Configuration Options

A comprehensive configuration system for customizing Avro read and write operations. The AvroOptions class handles option names case-insensitively, supplies sensible defaults, and integrates with the Hadoop configuration.

Capabilities

AvroOptions Class

Central configuration class for Avro read and write operations with case-insensitive parameter handling.

class AvroOptions(
  @transient val parameters: CaseInsensitiveMap[String], 
  @transient val conf: Configuration
) extends Logging with Serializable {
  
  def this(parameters: Map[String, String], conf: Configuration) = {
    this(CaseInsensitiveMap(parameters), conf)
  }
  
  // Configuration properties
  val schema: Option[String]           // Optional user-provided schema in JSON format
  val recordName: String              // Top level record name (default: "topLevelRecord")
  val recordNamespace: String         // Record namespace (default: "")
  val ignoreExtension: Boolean        // Ignore file extensions in read operations
  val compression: String             // Compression codec for write operations
}

Usage Examples:

import org.apache.spark.sql.avro.AvroOptions
import org.apache.hadoop.conf.Configuration

val hadoopConf = new Configuration()
val options = Map(
  "avroSchema" -> customSchemaJson,
  "recordName" -> "MyRecord",
  "compression" -> "snappy"
)

val avroOptions = new AvroOptions(options, hadoopConf)

// Access configuration values
val customSchema = avroOptions.schema
val recordName = avroOptions.recordName
val compressionCodec = avroOptions.compression
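
Because the parameters are wrapped in a CaseInsensitiveMap, option names are matched without regard to case. A small illustration, reusing the customSchemaJson value from above:

// Mixed-case keys resolve to the same settings as their lowercase forms
val mixedCaseOptions = new AvroOptions(
  Map("AvroSchema" -> customSchemaJson, "COMPRESSION" -> "deflate"),
  hadoopConf
)

assert(mixedCaseOptions.compression == "deflate")
assert(mixedCaseOptions.schema.isDefined)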

Schema Configuration

Options for specifying and customizing Avro schemas during read and write operations.

Custom Schema Option

val schema: Option[String] // Optional user-provided schema in JSON format

Usage Examples:

val customSchema = """{
  "type": "record",
  "name": "CustomUser",
  "namespace": "com.example",
  "fields": [
    {"name": "userId", "type": "long"},
    {"name": "userName", "type": "string"},
    {"name": "metadata", "type": ["null", "string"], "default": null}
  ]
}"""

// Read with custom schema
val df = spark.read
  .format("avro")
  .option("avroSchema", customSchema)
  .load("path/to/data.avro")

// Write with custom schema
df.write
  .format("avro")
  .option("avroSchema", customSchema)
  .save("path/to/output")

Record Naming Options

val recordName: String      // Top level record name (default: "topLevelRecord")
val recordNamespace: String // Record namespace (default: "")

Usage Examples:

// Configure record naming for write operations
df.write
  .format("avro")
  .option("recordName", "UserProfile")
  .option("recordNamespace", "com.company.data")
  .save("path/to/named_output")

// Example of generated Avro schema with custom naming
// Result schema will have:
// {
//   "type": "record",
//   "name": "UserProfile",
//   "namespace": "com.company.data",
//   "fields": [...]
// }

File Handling Configuration

Options for controlling how Avro files are processed and interpreted.

File Extension Handling

val ignoreExtension: Boolean // Whether to ignore .avro file extension requirement

Configuration Logic:

// Priority order for ignoreExtension:
// 1. Explicit option value
// 2. Hadoop config: avro.mapred.ignore.inputs.without.extension  
// 3. Default: false (require .avro extension)

Usage Examples:

// Read files without .avro extension
val df1 = spark.read
  .format("avro")
  .option("ignoreExtension", "true")
  .load("path/to/files_without_extension")

// Use Hadoop configuration default
val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.setBoolean("avro.mapred.ignore.inputs.without.extension", true)

val df2 = spark.read
  .format("avro")
  .load("path/to/mixed_files")

Compression Configuration

Compression options for write operations, with support for multiple codecs.

Compression Codec Selection

val compression: String // Compression codec for write operations

Supported Compression Codecs:

"uncompressed" // No compression
"snappy"       // Default - balanced compression and speed
"deflate"      // Good compression ratio, configurable level
"bzip2"        // High compression ratio, slower processing
"xz"           // High compression ratio, slower processing

Configuration Priority:

// Priority order for compression:
// 1. Explicit option value
// 2. Spark config: spark.sql.avro.compression.codec
// 3. Default: "snappy"

Usage Examples:

// Set compression via option
df.write
  .format("avro")
  .option("compression", "deflate")
  .save("path/to/compressed_output")

// Set global default compression
spark.conf.set("spark.sql.avro.compression.codec", "bzip2")

// Configure deflate compression level
spark.conf.set("spark.sql.avro.deflate.level", "9") // Max compression
df.write
  .format("avro")
  .option("compression", "deflate")
  .save("path/to/max_compressed_output")

Advanced Configuration Options

Additional configuration options for specialized use cases.

Corrupt File Handling

// Handle corrupt files during read operations
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

val robustDF = spark.read
  .format("avro")
  .load("path/to/potentially_corrupt_files")

File Discovery Options

// Control file discovery behavior
val options = Map(
  "ignoreExtension" -> "true",
  "recursiveFileLookup" -> "true"  // Spark SQL option
)

val df = spark.read
  .format("avro")
  .options(options)
  .load("path/to/nested_directories")

Configuration Validation

Invalid configuration values are rejected with descriptive error messages, either when the options are parsed or when the read or write operation runs.

Validation Examples:

// Invalid compression codec
try {
  df.write
    .format("avro")
    .option("compression", "invalid_codec")
    .save("output")
} catch {
  case e: IllegalArgumentException => 
    println(s"Invalid compression codec: ${e.getMessage}")
}

// Schema validation during parse
try {
  val invalidSchema = """{"type": "invalid_type"}"""
  spark.read
    .format("avro")
    .option("avroSchema", invalidSchema)
    .load("data.avro")
} catch {
  case e: Exception =>
    println(s"Schema parse error: ${e.getMessage}")
}
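
Options can also be validated up front, before starting a read or write:

// Check a compression codec against the supported set
val supportedCodecs = Set("uncompressed", "snappy", "deflate", "bzip2", "xz")
val requestedCodec = "gzip" // not a supported codec

if (!supportedCodecs.contains(requestedCodec)) {
  throw new IllegalArgumentException(s"Unsupported compression codec: $requestedCodec")
}

// Check that a schema string is parseable Avro JSON
import org.apache.avro.Schema

val schemaJson = """invalid json"""
try {
  new Schema.Parser().parse(schemaJson)
} catch {
  case e: Exception =>
    throw new IllegalArgumentException(s"Invalid Avro schema: ${e.getMessage}")
}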

Integration with Spark Configuration

AvroOptions integrates with Spark's configuration system for global defaults.

Spark Configuration Properties

// Global Avro configuration
spark.conf.set("spark.sql.avro.compression.codec", "snappy")
spark.conf.set("spark.sql.avro.deflate.level", "6")

// File handling configuration  
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")

Hadoop Configuration Integration

// Access Hadoop configuration
val hadoopConf = spark.sparkContext.hadoopConfiguration

// Set Avro-specific Hadoop properties
hadoopConf.setBoolean("avro.mapred.ignore.inputs.without.extension", true)

// Configuration is automatically used by AvroOptions
val df = spark.read.format("avro").load("data")
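
The same Hadoop configuration can be passed when constructing AvroOptions directly:

import org.apache.spark.sql.avro.AvroOptions

val avroOptions = new AvroOptions(
  Map("compression" -> "snappy", "recordName" -> "MyRecord"),
  hadoopConf
)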

Complete Configuration Example

A complete example combining the main configuration options:

import org.apache.spark.sql.{SparkSession, SaveMode}

val spark = SparkSession.builder()
  .appName("Avro Configuration Example")
  .config("spark.sql.avro.compression.codec", "snappy")
  .config("spark.sql.avro.deflate.level", "6")
  .getOrCreate()

val customSchema = """{
  "type": "record",
  "name": "ProcessedEvent",
  "namespace": "com.company.events",
  "fields": [
    {"name": "eventId", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "payload", "type": ["null", "string"], "default": null}
  ]
}"""

val readOptions = Map(
  "avroSchema" -> customSchema,
  "ignoreExtension" -> "true"
)

val writeOptions = Map(
  "avroSchema" -> customSchema,
  "recordName" -> "ProcessedEvent", 
  "recordNamespace" -> "com.company.events",
  "compression" -> "deflate"
)

// Read with configuration
val inputDF = spark.read
  .format("avro")
  .options(readOptions)
  .load("path/to/input")

// Process data
import spark.implicits._ // required for the $"column" syntax
val processedDF = inputDF.filter($"timestamp" > 1609459200000L)

// Write with configuration
processedDF.write
  .format("avro")
  .options(writeOptions)
  .mode(SaveMode.Overwrite)
  .save("path/to/output")