File DataSource

The Avro DataSource provides native Spark integration for reading and writing Avro files through the DataSource V2 API. It offers optimized performance with features like predicate pushdown, column pruning, and automatic schema inference.
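
As a quick orientation, the sketch below round-trips a small DataFrame through Avro; it assumes an existing SparkSession bound to spark, and the output path is only illustrative.

// Minimal round trip: write a DataFrame as Avro, then read it back
import spark.implicits._

val people = Seq((1L, "Alice"), (2L, "Bob")).toDF("id", "name")
people.write.format("avro").save("/tmp/people-avro")

val reloaded = spark.read.format("avro").load("/tmp/people-avro")
reloaded.show()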

Reading Avro Files

Basic Reading

val df = spark.read.format("avro").load(path: String)
val df = spark.read.format("avro").load(paths: String*)

Usage Example:

// Read single file
val df = spark.read.format("avro").load("path/to/file.avro")

// Read multiple files
val df = spark.read.format("avro").load("file1.avro", "file2.avro")

// Read directory of Avro files
val df = spark.read.format("avro").load("path/to/avro/directory")

Reading with Schema

Specify a schema for consistent structure across files:

val df = spark.read.format("avro")
  .option("avroSchema", jsonSchema)
  .load(path)

Usage Example:

val userSchema = """{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}"""

val df = spark.read.format("avro")
  .option("avroSchema", userSchema)
  .load("users/*.avro")

Reading with Options

val df = spark.read.format("avro")
  .option("ignoreExtension", "true")
  .option("mode", "PERMISSIVE")
  .load(path)

Common Read Options (see the combined example after this list):

  • ignoreExtension: Read files regardless of extension (default: false)
  • mode: Error handling mode (PERMISSIVE, DROPMALFORMED, FAILFAST)
  • avroSchema: Override schema for reading
  • recursiveFieldMaxDepth: Maximum recursion depth for nested fields
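
As a rough sketch of combining these options, the example below fails fast on malformed records while forcing a reader schema; it reuses the userSchema string from the earlier example, and the input path is a placeholder.

// Combine several read options (values are illustrative)
val strictDf = spark.read.format("avro")
  .option("ignoreExtension", "true")   // also pick up files without the .avro extension
  .option("mode", "FAILFAST")          // abort on malformed records rather than tolerating them
  .option("avroSchema", userSchema)    // reader schema defined in the earlier example
  .load("landing-zone/")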

Writing Avro Files

Basic Writing

df.write.format("avro").save(path: String)

Usage Example:

val df = spark.range(1000).select($"id", ($"id" * 2).as("value"))
df.write.format("avro").save("output/numbers.avro")

Writing with Compression

df.write.format("avro")
  .option("compression", codec)
  .save(path)

Available Compression Codecs:

  • uncompressed (default)
  • snappy
  • deflate
  • bzip2
  • xz
  • zstandard

Usage Example:

df.write.format("avro")
  .option("compression", "snappy")
  .save("output/compressed.avro")

Writing with Custom Schema

df.write.format("avro")
  .option("avroSchema", jsonSchema)
  .save(path)

Usage Example:

val outputSchema = """{
  "type": "record",
  "name": "Output",
  "namespace": "com.example",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "value", "type": "double"}
  ]
}"""

df.write.format("avro")
  .option("avroSchema", outputSchema)
  .save("output/custom-schema.avro")

Partitioned Writing

df.write.format("avro")
  .partitionBy(columns: String*)
  .save(path)

Usage Example:

val salesDf = spark.read.format("avro").load("sales.avro")
salesDf.write.format("avro")
  .partitionBy("year", "month")
  .option("compression", "snappy")
  .save("partitioned-sales")

Advanced Features

Schema Evolution

The DataSource supports schema evolution when reading multiple files:

// Reads files with different but compatible schemas
val df = spark.read.format("avro")
  .option("mode", "PERMISSIVE")
  .load("evolving-schema/*.avro")

Predicate Pushdown

Automatically pushes down filter predicates to reduce I/O:

val filteredDf = spark.read.format("avro")
  .load("large-dataset.avro")
  .filter($"age" > 21)  // Filter pushed down to file scanning

Column Pruning

Only reads required columns from files:

val projectedDf = spark.read.format("avro")
  .load("users.avro")
  .select("name", "email")  // Only reads name and email columns

SQL Integration

Use Avro files directly in SQL queries:

// Create temporary view
spark.read.format("avro").load("users.avro").createOrReplaceTempView("users")

// Query with SQL
spark.sql("SELECT name, age FROM users WHERE age > 18")

// Create external table
spark.sql("""
  CREATE TABLE user_data
  USING avro
  LOCATION 'path/to/avro/files'
""")

DataSource Configuration

Write Options

Map(
  "compression" -> "snappy",           // Compression codec
  "avroSchema" -> jsonSchema,          // Custom output schema
  "recordName" -> "MyRecord",          // Record name for schema generation
  "recordNamespace" -> "com.example"   // Namespace for generated schema  
)

Read Options

Map(
  "ignoreExtension" -> "true",         // Ignore file extensions
  "mode" -> "PERMISSIVE",              // Error handling mode
  "avroSchema" -> jsonSchema,          // Override input schema
  "recursiveFieldMaxDepth" -> "5"      // Max recursion depth
)

Performance Considerations

Optimal File Sizes

  • Target roughly 128 MB-1 GB per file for best performance (see the sketch after this list)
  • Use partitioning for large datasets
  • Enable compression for storage efficiency
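
One way to steer output file sizes, sketched under the assumption that the rough data volume is known, is to fix the number of output files with repartition before writing; the target of 8 files is only a guess for illustration.

// Control the number (and therefore the rough size) of output files
df.repartition(8)                       // illustrative target; size it from your data volume
  .write.format("avro")
  .option("compression", "snappy")
  .save("output/right-sized")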

Schema Inference

// Expensive - scans all files
val df1 = spark.read.format("avro").load("many-files/*.avro")

// Efficient - uses provided schema
val df2 = spark.read.format("avro")
  .option("avroSchema", knownSchema)
  .load("many-files/*.avro")

Compression Trade-offs

  • Snappy: Fast compression and decompression, moderate compression ratio
  • Deflate: Better compression ratio than Snappy, slower
  • Zstandard: Strong compression ratio with good speed, often the best overall balance
  • Bzip2 / XZ: Higher compression ratios still, but the slowest to write and read
  • Uncompressed: Fastest I/O, largest storage footprint
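
To compare these codecs on your own data, one rough sketch follows; it assumes a DataFrame bound to df, and the codec list and output paths are illustrative.

// Write the same DataFrame with several codecs and compare output sizes and timings
val codecs = Seq("uncompressed", "snappy", "deflate", "zstandard")
codecs.foreach { codec =>
  df.write.format("avro")
    .option("compression", codec)
    .mode("overwrite")
    .save(s"codec-comparison/$codec")
}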