
tessl/maven-org-apache-spark--spark-avro-2-12

The Apache Spark Avro connector provides seamless integration between Spark SQL and the Apache Avro data format, with automatic schema evolution support and built-in compression.


File Operations

The Spark Avro connector provides DataSource API integration for reading and writing Avro files directly through Spark SQL with automatic schema inference, partitioning support, and optimized I/O performance.

Reading Avro Files

Basic File Reading

spark.read.format("avro").load(path: String): DataFrame
spark.read.format("avro").load(paths: String*): DataFrame

Basic Usage:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("AvroReader")
  .getOrCreate()

// Read single file or directory
val df = spark.read.format("avro").load("path/to/file.avro")

// Read multiple paths
val df2 = spark.read.format("avro").load(
  "path/to/file1.avro",
  "path/to/file2.avro",
  "path/to/directory"
)

Schema Specification

spark.read.format("avro").schema(schema: StructType): DataFrameReader

Usage with Custom Schema:

import org.apache.spark.sql.types._

val customSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = true),
  StructField("email", StringType, nullable = true)
))

val df = spark.read
  .format("avro")
  .schema(customSchema)
  .load("path/to/avro/files")

Reading Options

spark.read.format("avro").option(key: String, value: String): DataFrameReader
spark.read.format("avro").options(options: Map[String, String]): DataFrameReader

Available Options:

  • avroSchema: JSON string of an alternative reader schema, used for schema evolution
  • avroSchemaUrl: URL from which to load the reader schema
  • mode: Parse mode for corrupt records (PERMISSIVE, FAILFAST); applies to the from_avro function rather than the file reader
  • positionalFieldMatching: Match fields by position instead of by name when avroSchema is provided
  • datetimeRebaseMode: Rebasing of ancient DATE/TIMESTAMP values between calendars (EXCEPTION, LEGACY, CORRECTED)

Usage Example:

val df = spark.read
  .format("avro")
  .option("avroSchema", evolvedSchemaJson)
  .option("positionalFieldMatching", "true")
  .load("path/to/avro/files")

Writing Avro Files

Basic File Writing

DataFrame.write.format("avro").save(path: String): Unit
DataFrame.write.format("avro").save(): Unit  // path supplied via option("path", ...)

Basic Usage:

// Write DataFrame to Avro files
df.write
  .format("avro")
  .save("path/to/output")

// Write with mode specification
df.write
  .format("avro")
  .mode("overwrite")
  .save("path/to/output")

Write Modes

DataFrame.write.format("avro").mode(saveMode: String): DataFrameWriter[Row]
DataFrame.write.format("avro").mode(saveMode: SaveMode): DataFrameWriter[Row]

Supported modes:

  • overwrite: Overwrite existing data
  • append: Append to existing data
  • ignore: Ignore if data exists
  • error / errorifexists (default): Throw an error if data already exists
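Modes can be given either as strings (as above) or via the SaveMode enum; a minimal runnable sketch, with placeholder paths and data:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .appName("WriteModes")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val df = Seq((1L, "alice"), (2L, "bob")).toDF("id", "name")

// Replace any existing output, then append a second batch
df.write.format("avro").mode(SaveMode.Overwrite).save("/tmp/avro-modes")
df.write.format("avro").mode(SaveMode.Append).save("/tmp/avro-modes")

// Ignore is a no-op here because the path already holds data
df.write.format("avro").mode(SaveMode.Ignore).save("/tmp/avro-modes")
```

SaveMode.ErrorIfExists corresponds to the string error / errorifexists and is the default when no mode is set.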

Partitioning

DataFrame.write.format("avro").partitionBy(colNames: String*): DataFrameWriter[Row]

Usage Example:

df.write
  .format("avro")
  .partitionBy("year", "month")
  .save("path/to/partitioned/output")
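partitionBy encodes the chosen columns in the directory layout (e.g. year=2023/month=1/); on read they are restored as columns by partition discovery, and filters on them prune whole directories. A minimal sketch with placeholder data and paths:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("AvroPartitioning")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val df = Seq((1L, 2023, 1), (2L, 2023, 2), (3L, 2024, 1)).toDF("id", "year", "month")

// Partition columns are moved out of the data files and into the paths
df.write.format("avro")
  .partitionBy("year", "month")
  .mode("overwrite")
  .save("/tmp/avro-partitioned")

// Partition discovery restores year/month; this filter only scans year=2023 directories
val y2023 = spark.read.format("avro").load("/tmp/avro-partitioned").filter($"year" === 2023)
```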

Writing Options

DataFrame.write.format("avro").option(key: String, value: String): DataFrameWriter[Row]
DataFrame.write.format("avro").options(options: Map[String, String]): DataFrameWriter[Row]

Available Options:

  • compression: Compression codec (uncompressed, snappy, deflate, bzip2, xz, zstandard)
  • recordName: Top-level record name (default: "topLevelRecord")
  • recordNamespace: Record namespace (default: "")
  • avroSchema: Custom output Avro schema

Usage Example:

df.write
  .format("avro")
  .option("compression", "snappy")
  .option("recordName", "UserRecord")
  .option("recordNamespace", "com.example.avro")
  .save("path/to/compressed/output")

Advanced Features

Schema Evolution

Supplying an evolved reader schema via avroSchema lets you process older Avro files whose writer schemas are compatible; fields added in the reader schema must declare defaults:

val evolvedSchema = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "created_at", "type": "long", "default": 0}
  ]
}
"""

val df = spark.read
  .format("avro")
  .option("avroSchema", evolvedSchema)
  .load("path/to/older/avro/files")

Predicate Pushdown

The connector supports partition pruning and, on the DataSource V2 path (Spark 3.1+), row-level filter pushdown, so supported predicates are evaluated during the scan:

import spark.implicits._  // enables the $"col" column syntax

val filteredDF = spark.read
  .format("avro")
  .load("path/to/large/dataset")
  .filter($"created_date" >= "2023-01-01")
  .filter($"status" === "active")

Compression Support

Automatic compression detection on read and configurable compression on write:

// Writing with different compression codecs
df.write.format("avro").option("compression", "snappy").save("snappy-output")
df.write.format("avro").option("compression", "deflate").save("deflate-output")
df.write.format("avro").option("compression", "bzip2").save("bzip2-output")

Error Handling

Corrupt Record Handling

The mode option applies to the from_avro deserialization function rather than the file reader, and supports FAILFAST (throw an exception on a corrupt record) and PERMISSIVE (emit null for a corrupt record). Assuming a DataFrame binaryDF with an Avro-encoded binary value column and a schemaJson string:

import org.apache.spark.sql.avro.functions.from_avro
import scala.jdk.CollectionConverters._

// PERMISSIVE: corrupt records deserialize to null instead of failing the query
val parsed = binaryDF.select(
  from_avro($"value", schemaJson, Map("mode" -> "PERMISSIVE").asJava).as("user")
)

// FAILFAST: throw as soon as a corrupt record is encountered
val strict = binaryDF.select(
  from_avro($"value", schemaJson, Map("mode" -> "FAILFAST").asJava).as("user")
)

File Extension Handling

// Include files without .avro extension
val df = spark.read
  .format("avro")
  .option("ignoreExtension", "true")  // deprecated - use pathGlobFilter instead
  .load("path/to/mixed/files")

// Modern approach using pathGlobFilter
val df2 = spark.read
  .format("avro")
  .option("pathGlobFilter", "*.data")
  .load("path/to/files")

DataSource V2 Integration

The connector also provides DataSource V2 implementation for enhanced performance:

class AvroDataSourceV2 extends FileDataSourceV2 {
  def shortName(): String  // Returns "avro"
  def getTable(options: CaseInsensitiveStringMap): Table
  def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table
}

The DataSource V2 API provides the same functionality with improved performance characteristics and better integration with Spark's Catalyst optimizer.
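Which implementation is used is governed by the spark.sql.sources.useV1SourceList configuration, whose default value includes avro. A hedged sketch of opting into the V2 path by removing avro from that list:

```scala
import org.apache.spark.sql.SparkSession

// Dropping "avro" from the V1 fallback list routes reads/writes through AvroDataSourceV2
val spark = SparkSession.builder()
  .appName("AvroV2")
  .master("local[*]")
  .config("spark.sql.sources.useV1SourceList", "csv,json,kafka,orc,parquet,text")
  .getOrCreate()
```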

Supporting V2 Classes

class AvroTable extends FileTable
class AvroScan extends FileScan  
class AvroScanBuilder extends FileScanBuilder
class AvroPartitionReaderFactory extends FilePartitionReaderFactory
class AvroWrite extends FileWrite

These classes work together to provide the complete DataSource V2 implementation, handling table metadata, scan planning, partition reading, and write operations.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-avro-2-12
