tessl/maven-org-apache-spark--spark-avro-2-12

The Apache Spark Avro connector integrates Apache Spark SQL with the Apache Avro data format, with automatic schema evolution support and built-in compression.


Configuration Options

The Spark Avro connector provides comprehensive configuration options through the AvroOptions class and related constants. These options control various aspects of Avro processing including compression, schema handling, field matching, and error handling.

AvroOptions Class

The main configuration class for Avro operations.

class AvroOptions(
  parameters: CaseInsensitiveMap[String],
  conf: Configuration
) extends FileSourceOptions(parameters) {
  
  val schema: Option[Schema]
  val positionalFieldMatching: Boolean
  val recordName: String
  val recordNamespace: String
  val compression: String
  val parseMode: ParseMode
  val datetimeRebaseModeInRead: String
  val useStableIdForUnionType: Boolean
}

Factory Method

object AvroOptions {
  def apply(parameters: Map[String, String]): AvroOptions
}

Usage Example:

import org.apache.spark.sql.avro.AvroOptions

val options = AvroOptions(Map(
  "compression" -> "snappy",
  "recordName" -> "MyRecord",
  "recordNamespace" -> "com.example.avro"
))

println(s"Compression: ${options.compression}")
println(s"Record Name: ${options.recordName}")

Configuration Constants

All configuration option keys are defined as constants in the AvroOptions companion object.

object AvroOptions {
  val COMPRESSION: String
  val RECORD_NAME: String  
  val RECORD_NAMESPACE: String
  val AVRO_SCHEMA: String
  val AVRO_SCHEMA_URL: String
  val POSITIONAL_FIELD_MATCHING: String
  val DATETIME_REBASE_MODE: String
  val MODE: String
  val IGNORE_EXTENSION: String  // Deprecated
  val STABLE_ID_FOR_UNION_TYPE: String
}
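
Using these constants rather than raw option-key strings guards against typos. As a minimal sketch (the record name and namespace values here are illustrative), a reusable options map can be applied in one call with DataFrameWriter.options:

```scala
// Build a reusable write-options map once; the AvroOptions constants
// avoid typos in the raw option keys ("compression", "recordName", ...)
val avroWriteOptions: Map[String, String] = Map(
  AvroOptions.COMPRESSION      -> "zstandard",
  AvroOptions.RECORD_NAME      -> "ProcessedEvent",     // illustrative value
  AvroOptions.RECORD_NAMESPACE -> "com.company.events"  // illustrative value
)

// options(Map) applies all entries at once
df.write.format("avro").options(avroWriteOptions).save("path/to/output")
```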

Schema Configuration

avroSchema

Specify a custom Avro schema for reading or writing.

Option Key: AvroOptions.AVRO_SCHEMA ("avroSchema")

Usage in Reading:

val customSchema = """
{
  "type": "record",
  "name": "User", 
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}
"""

val df = spark.read
  .format("avro")
  .option(AvroOptions.AVRO_SCHEMA, customSchema)
  .load("path/to/avro/files")

Usage in Writing:

df.write
  .format("avro")
  .option(AvroOptions.AVRO_SCHEMA, customSchema)
  .save("path/to/output")

avroSchemaUrl

Load the Avro schema from a URL or file path. If both avroSchema and avroSchemaUrl are set, avroSchema takes precedence.

Option Key: AvroOptions.AVRO_SCHEMA_URL ("avroSchemaUrl")

val df = spark.read
  .format("avro")
  .option(AvroOptions.AVRO_SCHEMA_URL, "hdfs://cluster/schemas/user.avsc")
  .load("path/to/avro/files")

// Also works with local files
val df2 = spark.read
  .format("avro") 
  .option(AvroOptions.AVRO_SCHEMA_URL, "file:///local/path/schema.avsc")
  .load("path/to/avro/files")

Record Configuration

recordName

Set the top-level record name for writing Avro files.

Option Key: AvroOptions.RECORD_NAME ("recordName")
Default: "topLevelRecord"

df.write
  .format("avro")
  .option(AvroOptions.RECORD_NAME, "UserRecord")
  .save("path/to/output")

recordNamespace

Set the namespace for the top-level record.

Option Key: AvroOptions.RECORD_NAMESPACE ("recordNamespace")
Default: "" (empty string)

df.write
  .format("avro")
  .option(AvroOptions.RECORD_NAME, "User")
  .option(AvroOptions.RECORD_NAMESPACE, "com.example.avro")
  .save("path/to/output")

Compression Configuration

compression

Set the compression codec for writing Avro files.

Option Key: AvroOptions.COMPRESSION ("compression")
Default: Value from spark.sql.avro.compression.codec or "snappy"

Supported Codecs:

  • uncompressed: No compression
  • snappy: Snappy compression (default)
  • deflate: Deflate compression
  • bzip2: Bzip2 compression
  • xz: XZ compression
  • zstandard: Zstandard compression

// Different compression options
df.write.format("avro").option(AvroOptions.COMPRESSION, "uncompressed").save("uncompressed")
df.write.format("avro").option(AvroOptions.COMPRESSION, "snappy").save("snappy") 
df.write.format("avro").option(AvroOptions.COMPRESSION, "deflate").save("deflate")
df.write.format("avro").option(AvroOptions.COMPRESSION, "bzip2").save("bzip2")
df.write.format("avro").option(AvroOptions.COMPRESSION, "xz").save("xz")
df.write.format("avro").option(AvroOptions.COMPRESSION, "zstandard").save("zstandard")

Field Matching Configuration

positionalFieldMatching

Control how fields are matched between Spark and Avro schemas.

Option Key: AvroOptions.POSITIONAL_FIELD_MATCHING ("positionalFieldMatching")
Default: false

Values:

  • false: Match fields by name (default)
  • true: Match fields by position

// Match by position instead of name, applying an explicit Spark schema
import org.apache.spark.sql.types._

val sparkSchema = StructType(Seq(
  StructField("user_id", LongType),      // Position 0
  StructField("full_name", StringType),  // Position 1
  StructField("email_addr", StringType)  // Position 2
))

val df = spark.read
  .format("avro")
  .schema(sparkSchema)
  .option(AvroOptions.POSITIONAL_FIELD_MATCHING, "true")
  .load("path/to/avro/files")

// Useful when field names don't match but the structure is identical.
// The Avro schema has different field names but the same positions:
// {"name": "id", "type": "long"}        // Position 0
// {"name": "name", "type": "string"}    // Position 1
// {"name": "email", "type": "string"}   // Position 2

Error Handling Configuration

mode

Control how parsing errors are handled.

Option Key: AvroOptions.MODE ("mode")
Default: FAILFAST

Values:

  • FAILFAST: Throw exception on first error (default)
  • PERMISSIVE: Set corrupt records to null, continue processing
  • DROPMALFORMED: Drop corrupt records, continue processing

// Drop malformed records
val df = spark.read
  .format("avro")
  .option(AvroOptions.MODE, "DROPMALFORMED") 
  .load("path/to/avro/files")

// Keep corrupt records as null
val df2 = spark.read
  .format("avro")
  .option(AvroOptions.MODE, "PERMISSIVE")
  .load("path/to/avro/files")

Date/Time Configuration

datetimeRebaseMode

Control rebasing of DATE and TIMESTAMP values between Julian and Proleptic Gregorian calendars.

Option Key: AvroOptions.DATETIME_REBASE_MODE ("datetimeRebaseMode")
Default: Value from spark.sql.avro.datetimeRebaseModeInRead

Values:

  • EXCEPTION: Throw exception for dates requiring rebasing
  • LEGACY: Use legacy Julian calendar behavior
  • CORRECTED: Apply Proleptic Gregorian calendar correction

val df = spark.read
  .format("avro")
  .option(AvroOptions.DATETIME_REBASE_MODE, "CORRECTED")
  .load("path/to/historical/avro/files")

Union Type Configuration

enableStableIdentifiersForUnionType

Control field naming for Avro union types when converting to Spark SQL.

Option Key: AvroOptions.STABLE_ID_FOR_UNION_TYPE ("enableStableIdentifiersForUnionType")
Default: false

Values:

  • false: Use dynamic field names (default)
  • true: Use stable, consistent field names based on type

val unionSchema = """
{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "data", "type": [
      {"type": "record", "name": "UserEvent", "fields": [{"name": "userId", "type": "long"}]},
      {"type": "record", "name": "SystemEvent", "fields": [{"name": "systemId", "type": "string"}]}
    ]}
  ]  
}
"""

val df = spark.read
  .format("avro")
  .option(AvroOptions.AVRO_SCHEMA, unionSchema)
  .option(AvroOptions.STABLE_ID_FOR_UNION_TYPE, "true")
  .load("path/to/union/data")

// Results in stable field names like "data.member_userevent" and "data.member_systemevent"

Deprecated Options

ignoreExtension (Deprecated)

Option Key: AvroOptions.IGNORE_EXTENSION ("ignoreExtension")
Status: Deprecated in Spark 3.0, use pathGlobFilter instead

// Deprecated way
val df = spark.read
  .format("avro")
  .option(AvroOptions.IGNORE_EXTENSION, "true")
  .load("path/to/files")

// Modern way
val df2 = spark.read
  .format("avro")
  .option("pathGlobFilter", "*.data")  // Or other pattern
  .load("path/to/files")

Global Configuration

Some Avro behavior can be controlled through Spark SQL configuration:

spark.sql.avro.compression.codec

Default compression codec for Avro files.

spark.conf.set("spark.sql.avro.compression.codec", "zstandard")

// All subsequent Avro writes will use zstandard compression by default
df.write.format("avro").save("path/to/output")

spark.sql.avro.deflate.level

Compression level for the deflate codec: 1-9, or -1 for the codec's default level.

spark.conf.set("spark.sql.avro.deflate.level", "6")

df.write
  .format("avro")
  .option(AvroOptions.COMPRESSION, "deflate")
  .save("path/to/output")

spark.sql.avro.datetimeRebaseModeInRead

Default datetime rebase mode for reading.

spark.conf.set("spark.sql.avro.datetimeRebaseModeInRead", "CORRECTED")
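
Per-query options take precedence over these session-level settings. A sketch of the interaction (paths are illustrative):

```scala
// Session default applies to writes that do not set compression explicitly
spark.conf.set("spark.sql.avro.compression.codec", "deflate")

// This write overrides the session default for this query only
df.write
  .format("avro")
  .option(AvroOptions.COMPRESSION, "snappy")
  .save("path/to/snappy/output")

// This write falls back to the session default (deflate)
df2.write.format("avro").save("path/to/deflate/output")
```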

Option Combinations

Complete Example

Combining multiple options for complex scenarios:

val complexDF = spark.read
  .format("avro")
  .option(AvroOptions.AVRO_SCHEMA_URL, "hdfs://cluster/schemas/evolved-schema.avsc")
  .option(AvroOptions.MODE, "PERMISSIVE")
  .option(AvroOptions.POSITIONAL_FIELD_MATCHING, "false")
  .option(AvroOptions.DATETIME_REBASE_MODE, "CORRECTED")
  .option(AvroOptions.STABLE_ID_FOR_UNION_TYPE, "true")
  .option("pathGlobFilter", "*.avro")
  .load("path/to/avro/files")

complexDF.write
  .format("avro")
  .option(AvroOptions.COMPRESSION, "zstandard")
  .option(AvroOptions.RECORD_NAME, "ProcessedEvent")
  .option(AvroOptions.RECORD_NAMESPACE, "com.company.events")
  .mode("overwrite")
  .save("path/to/processed/output")

Schema Evolution Scenario

Configuration for reading old data with new schema:

val evolvedSchema = """
{
  "type": "record",
  "name": "UserV2",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "phone", "type": ["null", "string"], "default": null},
    {"name": "created_at", "type": ["null", "long"], "default": null}
  ]
}
"""

val migratedDF = spark.read
  .format("avro")
  .option(AvroOptions.AVRO_SCHEMA, evolvedSchema)
  .option(AvroOptions.MODE, "PERMISSIVE")  // Handle missing fields gracefully
  .option(AvroOptions.DATETIME_REBASE_MODE, "CORRECTED")
  .load("path/to/legacy/user/data")

Performance Considerations

Compression Trade-offs

// Fastest compression (good for temporary data)
df.write.format("avro").option(AvroOptions.COMPRESSION, "snappy").save("temp")

// Best compression ratio (good for archival)  
df.write.format("avro").option(AvroOptions.COMPRESSION, "zstandard").save("archive")

// No compression (fastest write, largest files)
df.write.format("avro").option(AvroOptions.COMPRESSION, "uncompressed").save("staging")

Schema Reuse

Options such as avroSchema are resolved on the driver, so there is no need to broadcast the schema string. When repeatedly using the same schema, define it once and reuse it:

// Define the schema string once and reuse it across writes
val sharedSchema: String = complexSchema

df1.write.format("avro").option(AvroOptions.AVRO_SCHEMA, sharedSchema).save("output1")
df2.write.format("avro").option(AvroOptions.AVRO_SCHEMA, sharedSchema).save("output2")
