tessl/maven-org-apache-spark--spark-avro-2-12

The Apache Spark Avro connector provides integration between Apache Spark SQL and the Apache Avro data format, with automatic schema-evolution support and built-in compression.


Schema Conversion

The Spark Avro connector provides utilities for converting between Avro schemas and Spark SQL schemas through the SchemaConverters object. This enables seamless interoperability between Avro and Spark data type systems, supporting complex nested types and logical type mappings.

SchemaConverters Object

The SchemaConverters object provides bidirectional schema conversion capabilities.

@DeveloperApi
object SchemaConverters {
  case class SchemaType(dataType: DataType, nullable: Boolean)
  
  def toSqlType(avroSchema: Schema): SchemaType
  def toSqlType(avroSchema: Schema, options: Map[String, String]): SchemaType
  def toAvroType(
    catalystType: DataType,
    nullable: Boolean = false,
    recordName: String = "topLevelRecord",
    nameSpace: String = ""
  ): Schema
}
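A minimal round trip illustrates the two directions composing (the record name and namespace below are arbitrary):

import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types._

val sparkSchema = StructType(Seq(StructField("id", LongType, nullable = false)))

// Spark SQL -> Avro
val avroSchema = SchemaConverters.toAvroType(
  sparkSchema, nullable = false, recordName = "Id", nameSpace = "com.example")

// Avro -> Spark SQL: recovers a StructType with a non-nullable long field
val roundTrip = SchemaConverters.toSqlType(avroSchema)
println(roundTrip.dataType)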

Converting Avro to Spark SQL Schema

Basic Conversion

def toSqlType(avroSchema: Schema): SchemaType

Parameters:

  • avroSchema: The Apache Avro schema to convert

Returns: SchemaType containing the Spark SQL DataType and nullability

Usage Example:

import org.apache.spark.sql.avro.SchemaConverters
import org.apache.avro.Schema

val avroSchemaJson = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null},
    {"name": "age", "type": "int"},
    {"name": "active", "type": "boolean"}
  ]
}
"""

val avroSchema = new Schema.Parser().parse(avroSchemaJson)
val schemaType = SchemaConverters.toSqlType(avroSchema)

println(s"Spark SQL DataType: ${schemaType.dataType}")
println(s"Is Nullable: ${schemaType.nullable}")

// Result:
// Spark SQL DataType: StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(email,StringType,true), StructField(age,IntegerType,false), StructField(active,BooleanType,false))
// Is Nullable: false

Conversion with Options

def toSqlType(avroSchema: Schema, options: Map[String, String]): SchemaType

Additional Parameters:

  • options: Configuration options affecting conversion behavior

Supported Options:

  • enableStableIdentifiersForUnionType: when set to "true", the struct fields generated for union members are named after their member types (stable across member reordering) instead of positionally (member0, member1)

Usage Example:

val unionSchema = """
{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "data", "type": [
      {"type": "record", "name": "UserEvent", "fields": [{"name": "userId", "type": "long"}]},
      {"type": "record", "name": "SystemEvent", "fields": [{"name": "systemId", "type": "string"}]}
    ]}
  ]
}
"""

val schema = new Schema.Parser().parse(unionSchema)
val options = Map("enableStableIdentifiersForUnionType" -> "true")
val schemaType = SchemaConverters.toSqlType(schema, options)

Converting Spark SQL to Avro Schema

Basic Conversion

def toAvroType(
  catalystType: DataType,
  nullable: Boolean = false,
  recordName: String = "topLevelRecord",
  nameSpace: String = ""
): Schema

Parameters:

  • catalystType: Spark SQL DataType to convert
  • nullable: Whether the root type should be nullable
  • recordName: Name for the top-level record
  • nameSpace: Namespace for the Avro schema

Returns: Apache Avro Schema object

Usage Example:

import org.apache.spark.sql.types._

val sparkSchema = StructType(Seq(
  StructField("user_id", LongType, nullable = false),
  StructField("username", StringType, nullable = false),  
  StructField("email", StringType, nullable = true),
  StructField("created_at", TimestampType, nullable = false),
  StructField("preferences", MapType(StringType, StringType), nullable = true)
))

val avroSchema = SchemaConverters.toAvroType(
  catalystType = sparkSchema,
  nullable = false,
  recordName = "UserRecord",
  nameSpace = "com.example.avro"
)

println(avroSchema.toString(true))

Complex Type Conversion

Working with nested structures and arrays:

val complexSchema = StructType(Seq(
  StructField("order_id", StringType, nullable = false),
  StructField("customer", StructType(Seq(
    StructField("id", LongType, nullable = false),
    StructField("name", StringType, nullable = false),
    StructField("addresses", ArrayType(StructType(Seq(
      StructField("street", StringType, nullable = false),
      StructField("city", StringType, nullable = false),
      StructField("zip", StringType, nullable = true)
    )), containsNull = false), nullable = true)
  )), nullable = false),
  StructField("items", ArrayType(StructType(Seq(
    StructField("sku", StringType, nullable = false),
    StructField("quantity", IntegerType, nullable = false),
    StructField("price", DecimalType(10, 2), nullable = false)
  )), containsNull = false), nullable = false)
))

val avroSchema = SchemaConverters.toAvroType(
  complexSchema,
  nullable = false,
  recordName = "Order",
  nameSpace = "com.ecommerce.orders"
)

Type Mapping Reference

Primitive Types

| Spark SQL Type | Avro Type | Notes          |
| -------------- | --------- | -------------- |
| BooleanType    | boolean   | Direct mapping |
| IntegerType    | int       | Direct mapping |
| LongType       | long      | Direct mapping |
| FloatType      | float     | Direct mapping |
| DoubleType     | double    | Direct mapping |
| StringType     | string    | Direct mapping |
| BinaryType     | bytes     | Direct mapping |

Complex Types

| Spark SQL Type | Avro Type | Notes                              |
| -------------- | --------- | ---------------------------------- |
| StructType     | record    | Field names and types preserved    |
| ArrayType      | array     | Element type converted recursively |
| MapType        | map       | Key must be string; value type converted |
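The string-key restriction on MapType can be checked directly: a string-keyed map converts to an Avro map, while other key types are rejected (record names and namespace below are illustrative):

import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types._

// String-keyed map converts to an Avro map schema
val ok = SchemaConverters.toAvroType(
  MapType(StringType, LongType), nullable = false,
  recordName = "Counts", nameSpace = "com.example")
println(ok) // prints the Avro map schema

// Non-string keys have no Avro equivalent and fail to convert
try {
  SchemaConverters.toAvroType(
    MapType(IntegerType, LongType), nullable = false,
    recordName = "Bad", nameSpace = "com.example")
} catch {
  case e: Exception => println(s"Rejected: ${e.getMessage}")
}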

Special Types

| Spark SQL Type | Avro Type                | Notes                                   |
| -------------- | ------------------------ | --------------------------------------- |
| TimestampType  | long (timestamp-micros)  | Logical type for microsecond precision  |
| DateType       | int (date)               | Logical type for date values            |
| DecimalType    | fixed (decimal)          | Logical type with precision/scale; toAvroType emits fixed, and bytes-encoded decimals are also readable |

Nullable Types

Spark SQL nullable fields are converted to Avro union types:

// Spark SQL: StructField("email", StringType, nullable = true)
// Avro: {"name": "email", "type": ["null", "string"], "default": null}
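A quick way to see this mapping is to convert a single nullable field and print the resulting Avro schema (record name and namespace are illustrative):

import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("email", StringType, nullable = true)))

val avro = SchemaConverters.toAvroType(
  schema, nullable = false, recordName = "Contact", nameSpace = "com.example")

// The email field appears as a union of null and string in the printed schema
println(avro.toString(true))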

Working with Logical Types

Decimal Types

import org.apache.spark.sql.types.DecimalType

val decimalSchema = StructType(Seq(
  StructField("price", DecimalType(10, 2), nullable = false),
  StructField("tax", DecimalType(5, 4), nullable = true)
))

val avroSchema = SchemaConverters.toAvroType(
  decimalSchema,
  nullable = false,
  recordName = "PriceInfo",
  nameSpace = "com.example"
)

// Results in Avro schema with decimal logical types

Timestamp and Date Types

val timeSchema = StructType(Seq(
  StructField("created_at", TimestampType, nullable = false),
  StructField("event_date", DateType, nullable = false),
  StructField("updated_at", TimestampType, nullable = true)
))

val avroSchema = SchemaConverters.toAvroType(
  timeSchema,
  nullable = false,
  recordName = "TimeRecord", 
  nameSpace = "com.example"
)

// Results in:
// - TimestampType -> long with timestamp-micros logical type
// - DateType -> int with date logical type

Schema Validation and Compatibility

Checking Data Type Support

Before conversion, you can verify that a schema contains only supported data types. Note that Spark's AvroUtils.supportsDataType helper is internal (private[sql]) and not callable from application code, so a practical alternative is to attempt the conversion and treat failure as unsupported:

import scala.util.Try

def validateSchema(schema: StructType): Boolean =
  Try(SchemaConverters.toAvroType(
    schema, nullable = false, recordName = "Probe", nameSpace = "com.example")).isSuccess

if (!validateSchema(yourSparkSchema)) {
  throw new IllegalArgumentException("Schema contains unsupported data types")
}

Schema Evolution Considerations

When converting schemas for evolution scenarios:

// Original schema
val v1Schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = false)
))

// Evolved schema (backward compatible)
val v2Schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = false),
  StructField("email", StringType, nullable = true), // New optional field
  StructField("created_at", TimestampType, nullable = true) // New optional field
))

val v1Avro = SchemaConverters.toAvroType(v1Schema, false, "User", "com.example")
val v2Avro = SchemaConverters.toAvroType(v2Schema, false, "User", "com.example") 

// Provided the new optional fields carry null defaults, v2Avro can read data written with v1Avro
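To exercise this compatibility from Spark, the evolved schema can be passed as the reader schema when loading older files (the path below is illustrative). Avro schema resolution requires the added fields to carry null defaults, so inspect the generated schema with toString(true) first and add defaults if they are missing:

// Read files written with the v1 schema using the evolved v2 schema as
// the reader schema; missing optional fields should resolve to null
val users = spark.read
  .format("avro")
  .option("avroSchema", v2Avro.toString)
  .load("path/to/v1-data")

users.printSchema()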

Error Handling

Unsupported Type Conversion

import org.apache.spark.sql.types.CalendarIntervalType

val unsupportedSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("interval", CalendarIntervalType, nullable = true) // Not supported
))

try {
  val avroSchema = SchemaConverters.toAvroType(
    unsupportedSchema,
    nullable = false,
    recordName = "Test",
    nameSpace = "com.example"
  )
} catch {
  // The converter signals unsupported types with an IncompatibleSchemaException,
  // which is not public API, so catch a generic Exception here
  case e: Exception =>
    println(s"Conversion failed: ${e.getMessage}")
}

Schema Parse Errors

val invalidAvroJson = """{"type": "invalid"}"""

try {
  val avroSchema = new Schema.Parser().parse(invalidAvroJson)
  val sparkType = SchemaConverters.toSqlType(avroSchema)
} catch {
  case e: org.apache.avro.SchemaParseException =>
    println(s"Invalid Avro schema: ${e.getMessage}")
}

Practical Examples

DataFrame Schema Conversion

Converting existing DataFrame schema to Avro for writing:

val df = spark.table("users")
val sparkSchema = df.schema

val avroSchema = SchemaConverters.toAvroType(
  sparkSchema,
  nullable = false,
  recordName = "UserRecord",
  nameSpace = "com.company.users"
)

// Use the schema for writing
df.write
  .format("avro")
  .option("avroSchema", avroSchema.toString)
  .save("users_with_schema")

Schema Registry Integration

Using converted schemas with external schema registries:

def registerSchema(subject: String, sparkSchema: StructType): Int = {
  // Avro record names may not contain hyphens, so derive a legal name
  // from subjects like "user-events" -> "UserEventsRecord"
  val recordName = subject.split("-").map(_.capitalize).mkString + "Record"

  val avroSchema = SchemaConverters.toAvroType(
    sparkSchema,
    nullable = false,
    recordName = recordName,
    nameSpace = "com.company.schemas"
  )

  // Register with schema registry (pseudo-code)
  schemaRegistry.register(subject, avroSchema)
}

val schemaId = registerSchema("user-events", df.schema)

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-avro-2-12
