Configuration Options

Apache Spark Avro provides extensive configuration options for controlling Avro processing behavior, including compression settings, schema handling, parsing modes, and performance optimizations.

Read Options

Configuration options for reading Avro files and deserializing Avro data.

Error Handling

val readOptions = Map(
  "mode" -> "PERMISSIVE"  // PERMISSIVE | DROPMALFORMED | FAILFAST
)

Available Modes:

  • PERMISSIVE (default): Sets corrupt records to null and continues processing
  • DROPMALFORMED: Ignores corrupt records completely
  • FAILFAST: Throws exception on first corrupt record

Usage Example:

// Strict parsing - fail on any malformed data
val strictDf = spark.read.format("avro")
  .option("mode", "FAILFAST")
  .load("sensitive-data.avro")

// Permissive parsing - continue with nulls for bad records
val lenientDf = spark.read.format("avro")
  .option("mode", "PERMISSIVE")
  .load("messy-data.avro")

Schema Options

val schemaOptions = Map(
  "avroSchema" -> jsonSchemaString,           // Override input schema
  "ignoreExtension" -> "true",                // Read files regardless of extension
  "recursiveFieldMaxDepth" -> "10"            // Maximum recursion depth for nested fields
)

Schema Override Example:

val fixedSchema = """{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"}
  ]
}"""

val df = spark.read.format("avro")
  .option("avroSchema", fixedSchema)
  .option("ignoreExtension", "true")
  .load("data-files/*")

Union Type Handling

val unionOptions = Map(
  "enableStableIdentifiersForUnionType" -> "true",        // Use stable identifiers for union types
  "stableIdentifierPrefixForUnionType" -> "union_"    // Prefix for stable union identifiers
)

Usage Example:

import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.avro.functions.from_avro

val options: JMap[String, String] = Map(
  "enableStableIdentifiersForUnionType" -> "true",
  "stableIdentifierPrefixForUnionType" -> "variant_"
).asJava

// Parse the Avro-encoded union column into a new DataFrame
val parsedDf = df.select(from_avro(col("union_data"), unionSchema, options).as("parsed"))

DateTime Handling

val datetimeOptions = Map(
  "datetimeRebaseMode" -> "CORRECTED"  // EXCEPTION | CORRECTED | LEGACY
)

Rebase Modes:

  • EXCEPTION: Fail on dates/timestamps that are ambiguous between the legacy Julian and Proleptic Gregorian calendars (values before 1582-10-15)
  • CORRECTED: Read dates/timestamps as-is under the Proleptic Gregorian calendar, without rebasing
  • LEGACY: Rebase values from the legacy hybrid Julian calendar to the Proleptic Gregorian calendar
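
Usage Example:

A minimal read sketch showing how the rebase mode is set as an option; the input path here is illustrative.

// Read dates/timestamps as-is, without rebasing from the legacy calendar
val historicalDf = spark.read.format("avro")
  .option("datetimeRebaseMode", "CORRECTED")
  .load("historical-events.avro")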

Write Options

Configuration options for writing Avro files and serializing data to Avro format.

Compression

val compressionOptions = Map(
  "compression" -> "snappy"  // uncompressed | deflate | snappy | bzip2 | xz | zstandard
)

Available Compression Codecs:

Codec          Performance   Compression Ratio   Use Case
uncompressed   Fastest       None                Development, high-speed networks
snappy         Very Fast     Good                General purpose, balanced performance
deflate        Fast          Better              Standard compression
bzip2          Slow          Best                Archival, storage-optimized
xz             Slowest       Excellent           Long-term storage
zstandard      Fast          Excellent           Modern high-performance compression

AvroCompressionCodec Java Enum

The compression codecs are defined as a Java enum with additional utility methods:

public enum AvroCompressionCodec {
  UNCOMPRESSED, DEFLATE, SNAPPY, BZIP2, XZ, ZSTANDARD;
  
  public String getCodecName();
  public boolean getSupportCompressionLevel();
  public String lowerCaseName();
  public static AvroCompressionCodec fromString(String s);
}

Enum Methods:

  • getCodecName(): Returns the Avro codec name constant
  • getSupportCompressionLevel(): Returns true if codec supports compression levels
  • lowerCaseName(): Returns lowercase name for the codec
  • fromString(String): Parses codec from string (case-insensitive)
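
A small sketch of these methods, assuming the enum is importable as org.apache.spark.sql.avro.AvroCompressionCodec:

import org.apache.spark.sql.avro.AvroCompressionCodec

// Parse a codec name case-insensitively and inspect its properties
val codec = AvroCompressionCodec.fromString("ZSTANDARD")
println(codec.lowerCaseName())               // e.g. "zstandard"
println(codec.getSupportCompressionLevel())  // whether a compression level can be configured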

Usage Example:

// High-performance compression
df.write.format("avro")
  .option("compression", "snappy")
  .save("fast-access-data")

// Maximum compression for archival
df.write.format("avro")
  .option("compression", "zstandard")
  .save("archived-data")

Schema Generation

val schemaGenOptions = Map(
  "recordName" -> "MyRecord",              // Name for generated record types
  "recordNamespace" -> "com.example",      // Namespace for generated schemas
  "avroSchema" -> customSchemaJson         // Use custom output schema
)
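
For the record name and namespace options on their own, a minimal write sketch (the output path is illustrative):

// Name and namespace the generated Avro record schema explicitly
df.write.format("avro")
  .option("recordName", "MyRecord")
  .option("recordNamespace", "com.example")
  .save("named-record-output")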

Custom Schema Example:

val outputSchema = """{
  "type": "record",
  "name": "ExportRecord",
  "namespace": "com.company.exports",
  "fields": [
    {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-micros"}},
    {"name": "user_id", "type": "string"},
    {"name": "metrics", "type": {"type": "map", "values": "double"}}
  ]
}"""

df.write.format("avro")
  .option("avroSchema", outputSchema)
  .option("compression", "snappy")
  .save("structured-export")

Function Options

Configuration options for from_avro, to_avro, and schema_of_avro functions.

Parsing Options

import scala.jdk.CollectionConverters._

val functionOptions: java.util.Map[String, String] = Map(
  "mode" -> "PERMISSIVE",
  "datetimeRebaseMode" -> "CORRECTED",
  "enableStableIdentifiersForUnionType" -> "false",
  "stableIdentifierPrefixForUnionType" -> "",
  "recursiveFieldMaxDepth" -> "5"
).asJava

Complete Function Usage:

import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.avro.functions._

val parseOptions: JMap[String, String] = Map(
  "mode" -> "FAILFAST",
  "datetimeRebaseMode" -> "CORRECTED",
  "enableStableIdentifiersForUnionType" -> "true",
  "recursiveFieldMaxDepth" -> "10"
).asJava

val decodedDf = df.select(
  from_avro(col("avro_binary"), avroSchema, parseOptions).as("decoded_data")
)

// For schema_of_avro in SQL, options are passed as a SQL map literal
val schemaDf = spark.sql(
  s"SELECT schema_of_avro('$complexSchema', " +
  "map('enableStableIdentifiersForUnionType', 'true', 'recursiveFieldMaxDepth', '8')) AS spark_schema"
)
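
to_avro is the serialization counterpart; a minimal sketch re-encoding the decoded column from the example above (column names follow that example):

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.avro.functions.to_avro

// Serialize the decoded struct back to Avro binary
val encodedDf = decodedDf.select(
  to_avro(col("decoded_data")).as("avro_binary")
)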

Performance Tuning Options

File Size Optimization

// Estimate a partition count that targets roughly 128MB - 1GB per output file
val optimalPartitions = (df.count() * avgRowSizeBytes / targetPartitionSizeBytes).toInt

// Control output file size through partitioning
df.coalesce(optimalPartitions)
  .write.format("avro")
  .option("compression", "snappy")
  .save(path)

Schema Inference Control

// Disable schema inference by providing explicit schema
val df = spark.read.format("avro")
  .option("avroSchema", knownSchema)  // Avoids scanning all files
  .load("large-dataset/*.avro")

Memory Management

val memoryOptions = Map(
  "recursiveFieldMaxDepth" -> "3",      // Limit nesting depth to bound memory usage
  "mode" -> "DROPMALFORMED"             // Drop malformed records instead of retaining corrupt rows
)
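
A read sketch applying these options (the input path is illustrative):

val deepDf = spark.read.format("avro")
  .options(memoryOptions)
  .load("deeply-nested-data/*.avro")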

Option Constants

For type-safe configuration, use the predefined constants:

// Common option keys (from AvroOptions)
object AvroOptionKeys {
  val IGNORE_EXTENSION = "ignoreExtension"
  val MODE = "mode"
  val RECORD_NAME = "recordName"
  val COMPRESSION = "compression"
  val AVRO_SCHEMA = "avroSchema"
  val AVRO_SCHEMA_URL = "avroSchemaUrl"
  val RECORD_NAMESPACE = "recordNamespace"
  val POSITIONAL_FIELD_MATCHING = "positionalFieldMatching"
  val DATETIME_REBASE_MODE = "datetimeRebaseMode"
  val STABLE_ID_FOR_UNION_TYPE = "enableStableIdentifiersForUnionType"
  val STABLE_ID_PREFIX_FOR_UNION_TYPE = "stableIdentifierPrefixForUnionType"
  val RECURSIVE_FIELD_MAX_DEPTH = "recursiveFieldMaxDepth"
}
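
Using such a keys object keeps option names consistent across jobs; a small sketch, where knownSchema is assumed to hold an Avro schema JSON string:

val df = spark.read.format("avro")
  .option(AvroOptionKeys.MODE, "FAILFAST")
  .option(AvroOptionKeys.AVRO_SCHEMA, knownSchema)
  .load("validated-data/*.avro")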

Configuration Examples

Production Reading Configuration

val productionReadConfig = Map(
  "mode" -> "FAILFAST",                    // Strict error handling
  "ignoreExtension" -> "false",            // Validate file extensions
  "recursiveFieldMaxDepth" -> "5",         // Reasonable nesting limit
  "datetimeRebaseMode" -> "CORRECTED"      // Handle historical dates correctly
)

val df = spark.read.format("avro")
  .options(productionReadConfig)
  .load("production-data/*.avro")

High-Performance Writing Configuration

val highPerfWriteConfig = Map(
  "compression" -> "snappy",               // Fast compression
  "recordName" -> "OptimizedRecord",       // Descriptive record name
  "recordNamespace" -> "com.company.data"  // Proper namespace
)

df.coalesce(200)  // Optimize partition count
  .write.format("avro")
  .options(highPerfWriteConfig)
  .save("optimized-output")

Development/Testing Configuration

val devConfig = Map(
  "mode" -> "PERMISSIVE",                  // Lenient error handling
  "ignoreExtension" -> "true",             // Flexible file reading
  "compression" -> "uncompressed"          // Fast writes for testing
)

val testDf = spark.read.format("avro")
  .options(devConfig)
  .load("test-data/*")