Data Source Operations

Comprehensive file-based data source functionality for reading from and writing to Avro files using Spark's standard DataFrame read/write API. Supports automatic schema inference, configurable compression, and efficient distributed processing of Avro data.
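As a quick orientation before the detailed sections, here is a minimal round trip that reads Avro data and writes it back out. The session setup and paths are illustrative placeholders, and the sketch assumes the Avro data source is available on the Spark classpath.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Avro Quick Start")
  .getOrCreate()

// Read Avro data; the schema is inferred automatically
val events = spark.read.format("avro").load("path/to/events.avro")

// Write it back out with snappy compression (the default codec)
events.write
  .format("avro")
  .option("compression", "snappy")
  .save("path/to/events_copy")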

Capabilities

Reading Avro Files

Read Avro files using Spark's DataFrameReader with automatic schema inference and configurable options.

// Basic read operations
spark.read.format("avro").load(path: String): DataFrame
spark.read.format("avro").load(paths: String*): DataFrame

// With options
spark.read.format("avro")
  .option(key: String, value: String)
  .load(path: String): DataFrame

Usage Examples:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Avro Reader Example")
  .getOrCreate()

// Read single file
val df1 = spark.read
  .format("avro")
  .load("path/to/data.avro")

// Read multiple paths (directories or files)
val df2 = spark.read
  .format("avro")
  .load("path/to/dir1", "path/to/dir2")

// Read with glob pattern
val df3 = spark.read
  .format("avro")
  .load("path/to/data/*.avro")

// Read with custom schema
val df4 = spark.read
  .format("avro")
  .option("avroSchema", customSchemaJson)
  .load("path/to/data.avro")

// Read ignoring file extensions
val df5 = spark.read
  .format("avro")
  .option("ignoreExtension", "true")
  .load("path/to/files_without_extension")

Writing Avro Files

Write DataFrames to Avro files with configurable compression and schema options.

// Basic write operations
df.write.format("avro").save(path: String): Unit
df.write.format("avro").mode(saveMode: SaveMode).save(path: String): Unit

// With options
df.write.format("avro")
  .option(key: String, value: String)
  .mode(saveMode: SaveMode)
  .save(path: String): Unit

Usage Examples:

import org.apache.spark.sql.SaveMode

val df = spark.table("source_data")

// Basic write
df.write
  .format("avro")
  .save("path/to/output")

// Write with compression
df.write
  .format("avro")
  .option("compression", "snappy")
  .mode(SaveMode.Overwrite)
  .save("path/to/compressed_output")

// Write with custom record name and namespace
df.write
  .format("avro")
  .option("recordName", "MyRecord")
  .option("recordNamespace", "com.example.data")
  .mode(SaveMode.Append)
  .save("path/to/named_output")

// Write with custom schema
val customSchema = """{
  "type": "record",
  "name": "CustomRecord",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "value", "type": "string"}
  ]
}"""

df.write
  .format("avro")
  .option("avroSchema", customSchema)
  .save("path/to/custom_schema_output")

Data Source Options

Comprehensive list of options available for Avro read and write operations.

Available Options Table

| Option | Default | Description | Scope | Example |
|--------|---------|-------------|-------|---------|
| avroSchema | None | Custom Avro schema in JSON format | Read/Write | "{\"type\": \"record\", ...}" |
| recordName | "topLevelRecord" | Top-level record name in the Avro schema | Write | "UserRecord" |
| recordNamespace | "" | Record namespace in the Avro schema | Write | "com.example.data" |
| ignoreExtension | true | Ignore file extensions when reading | Read | "false" |
| compression | "snappy" | Compression codec for write operations | Write | "deflate" |
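As a hedged sketch of combining several of the options above in a single job, the schema string, record name, and paths below are illustrative placeholders:

import org.apache.spark.sql.SaveMode

// Illustrative Avro schema; any valid record schema works here
val userSchema = """{
  "type": "record",
  "name": "UserRecord",
  "namespace": "com.example.data",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"}
  ]
}"""

// Read with an explicit schema, then write with a named record and deflate compression
val users = spark.read
  .format("avro")
  .option("avroSchema", userSchema)
  .load("path/to/users.avro")

users.write
  .format("avro")
  .option("recordName", "UserRecord")
  .option("recordNamespace", "com.example.data")
  .option("compression", "deflate")
  .mode(SaveMode.Overwrite)
  .save("path/to/users_out")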

Schema Inference

Automatic schema detection from Avro files with support for schema evolution.

// Schema inference behavior
val inferredSchema = spark.read.format("avro").load(path).schema

Schema Inference Examples:

// Inspect inferred schema
val df = spark.read.format("avro").load("path/to/data.avro")
df.printSchema()

// Schema inference with multiple files (uses first readable file)
val multiFileDF = spark.read
  .format("avro")
  .load("path/to/multiple/*.avro")

// Handle corrupt files during schema inference
val robustDF = spark.read
  .format("avro")
  .option("ignoreCorruptFiles", "true")
  .load("path/to/possibly_corrupt/*.avro")

File Splitting and Parallelism

Avro files are splittable, enabling efficient distributed processing.

// Avro files support splitting for parallel processing
val parallelDF = spark.read
  .format("avro")
  .load("large_avro_file.avro")
  
// Spark automatically splits files across partitions
val partitionCount = parallelDF.rdd.getNumPartitions
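
Parallelism for large files can also be tuned with Spark's standard file-source settings. The sketch below uses the generic spark.sql.files.maxPartitionBytes configuration with an illustrative value; it is not an Avro-specific option.

// A smaller maxPartitionBytes yields more, smaller input splits per Avro file
spark.conf.set("spark.sql.files.maxPartitionBytes", (32 * 1024 * 1024).toString)

val moreParallelDF = spark.read
  .format("avro")
  .load("large_avro_file.avro")

println(moreParallelDF.rdd.getNumPartitions)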

Supported Read Options

Configuration options for customizing read behavior:

// Available read options
.option("avroSchema", jsonSchemaString)        // Custom schema
.option("ignoreExtension", "true|false")       // Ignore .avro extension requirement
.option("ignoreCorruptFiles", "true|false")    // Skip corrupt files during processing

Supported Write Options

Configuration options for customizing write behavior:

// Available write options
.option("avroSchema", jsonSchemaString)          // Custom output schema
.option("recordName", recordName)                // Top-level record name
.option("recordNamespace", namespace)            // Record namespace
.option("compression", compressionCodec)         // Compression: snappy, deflate, bzip2, xz, uncompressed

Compression Support

Multiple compression codecs are supported for write operations:

Supported Compression Codecs:

// Compression options
"snappy"      // Default - balanced compression and speed
"deflate"     // Good compression ratio, configurable level
"bzip2"       // High compression ratio, slower
"xz"          // High compression ratio, slower
"uncompressed" // No compression

Compression Examples:

// Snappy compression (default)
df.write.format("avro").save("snappy_output")

// Deflate with custom level (configured via Spark config)
spark.conf.set("spark.sql.avro.deflate.level", "6")
df.write
  .format("avro")
  .option("compression", "deflate")
  .save("deflate_output")

// High compression with bzip2
df.write
  .format("avro")
  .option("compression", "bzip2")
  .save("bzip2_output")

Error Handling

Common error scenarios and handling strategies:

// Handle missing files
try {
  val df = spark.read.format("avro").load("missing_file.avro")
} catch {
  case e: org.apache.spark.sql.AnalysisException => 
    println(s"File not found: ${e.getMessage}")
}

// Handle schema evolution conflicts
try {
  val df = spark.read
    .format("avro")
    .option("avroSchema", strictSchema)
    .load("evolved_schema_files/*.avro")
} catch {
  case e: Exception => 
    println(s"Schema compatibility issue: ${e.getMessage}")
}