Apache Spark Avro connector provides seamless integration between Apache Spark SQL and Apache Avro data format with automatic schema evolution support and built-in compression capabilities
—
The Spark Avro connector provides utilities for converting between Avro schemas and Spark SQL schemas through the SchemaConverters object. This enables seamless interoperability between Avro and Spark data type systems, supporting complex nested types and logical type mappings.
The SchemaConverters object provides bidirectional schema conversion capabilities.
@DeveloperApi
object SchemaConverters {
case class SchemaType(dataType: DataType, nullable: Boolean)
def toSqlType(avroSchema: Schema): SchemaType
def toSqlType(avroSchema: Schema, options: Map[String, String]): SchemaType
def toAvroType(
catalystType: DataType,
nullable: Boolean,
recordName: String,
nameSpace: String
): Schema
}

def toSqlType(avroSchema: Schema): SchemaType

Parameters:
avroSchema: The Apache Avro schema to convert

Returns: SchemaType containing the Spark SQL DataType and nullability
Usage Example:
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.avro.Schema
val avroSchemaJson = """
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": null},
{"name": "age", "type": "int"},
{"name": "active", "type": "boolean"}
]
}
"""
val avroSchema = new Schema.Parser().parse(avroSchemaJson)
val schemaType = SchemaConverters.toSqlType(avroSchema)
println(s"Spark SQL DataType: ${schemaType.dataType}")
println(s"Is Nullable: ${schemaType.nullable}")
// Result:
// Spark SQL DataType: StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(email,StringType,true), StructField(age,IntegerType,false), StructField(active,BooleanType,false))
// Is Nullable: false

def toSqlType(avroSchema: Schema, options: Map[String, String]): SchemaType

Additional Parameters:
options: Configuration options affecting conversion behavior

Supported Options:
enableStableIdentifiersForUnionType: Use stable field names for union types

Usage Example:
val unionSchema = """
{
"type": "record",
"name": "Event",
"fields": [
{"name": "data", "type": [
{"type": "record", "name": "UserEvent", "fields": [{"name": "userId", "type": "long"}]},
{"type": "record", "name": "SystemEvent", "fields": [{"name": "systemId", "type": "string"}]}
]}
]
}
"""
val schema = new Schema.Parser().parse(unionSchema)
val options = Map("enableStableIdentifiersForUnionType" -> "true")
val schemaType = SchemaConverters.toSqlType(schema, options)

def toAvroType(
catalystType: DataType,
nullable: Boolean,
recordName: String,
nameSpace: String
): Schema

Parameters:
catalystType: Spark SQL DataType to convert
nullable: Whether the root type should be nullable
recordName: Name for the top-level record
nameSpace: Namespace for the Avro schema

Returns: Apache Avro Schema object
Usage Example:
import org.apache.spark.sql.types._
val sparkSchema = StructType(Seq(
StructField("user_id", LongType, nullable = false),
StructField("username", StringType, nullable = false),
StructField("email", StringType, nullable = true),
StructField("created_at", TimestampType, nullable = false),
StructField("preferences", MapType(StringType, StringType), nullable = true)
))
val avroSchema = SchemaConverters.toAvroType(
catalystType = sparkSchema,
nullable = false,
recordName = "UserRecord",
nameSpace = "com.example.avro"
)
println(avroSchema.toString(true))

Working with nested structures and arrays:
val complexSchema = StructType(Seq(
StructField("order_id", StringType, nullable = false),
StructField("customer", StructType(Seq(
StructField("id", LongType, nullable = false),
StructField("name", StringType, nullable = false),
StructField("addresses", ArrayType(StructType(Seq(
StructField("street", StringType, nullable = false),
StructField("city", StringType, nullable = false),
StructField("zip", StringType, nullable = true)
)), containsNull = false), nullable = true)
)), nullable = false),
StructField("items", ArrayType(StructType(Seq(
StructField("sku", StringType, nullable = false),
StructField("quantity", IntegerType, nullable = false),
StructField("price", DecimalType(10, 2), nullable = false)
)), containsNull = false), nullable = false)
))
val avroSchema = SchemaConverters.toAvroType(
complexSchema,
nullable = false,
recordName = "Order",
nameSpace = "com.ecommerce.orders"
)

Primitive type mappings:

| Spark SQL Type | Avro Type | Notes |
|---|---|---|
| BooleanType | boolean | Direct mapping |
| IntegerType | int | Direct mapping |
| LongType | long | Direct mapping |
| FloatType | float | Direct mapping |
| DoubleType | double | Direct mapping |
| StringType | string | Direct mapping |
| BinaryType | bytes | Direct mapping |
Complex type mappings:

| Spark SQL Type | Avro Type | Notes |
|---|---|---|
| StructType | record | Field names and types preserved |
| ArrayType | array | Element type converted recursively |
| MapType | map | Key must be string, value type converted |
Logical type mappings:

| Spark SQL Type | Avro Type | Notes |
|---|---|---|
| TimestampType | long (timestamp-micros) | Logical type for microsecond precision |
| DateType | int (date) | Logical type for date values |
| DecimalType | bytes (decimal) | Logical type with precision/scale |
Spark SQL nullable fields are converted to Avro union types:
// Spark SQL: StructField("email", StringType, nullable = true)
// Avro: {"name": "email", "type": ["null", "string"], "default": null}

Working with decimal types:
import org.apache.spark.sql.types.DecimalType
val decimalSchema = StructType(Seq(
StructField("price", DecimalType(10, 2), nullable = false),
StructField("tax", DecimalType(5, 4), nullable = true)
))
val avroSchema = SchemaConverters.toAvroType(
decimalSchema,
nullable = false,
recordName = "PriceInfo",
nameSpace = "com.example"
)
// Results in Avro schema with decimal logical types

Working with date and timestamp types:
val timeSchema = StructType(Seq(
StructField("created_at", TimestampType, nullable = false),
StructField("event_date", DateType, nullable = false),
StructField("updated_at", TimestampType, nullable = true)
))
val avroSchema = SchemaConverters.toAvroType(
timeSchema,
nullable = false,
recordName = "TimeRecord",
nameSpace = "com.example"
)
// Results in:
// - TimestampType -> long with timestamp-micros logical type
// - DateType -> int with date logical type

Before conversion, verify that all data types are supported:
import org.apache.spark.sql.avro.AvroUtils
def validateSchema(schema: StructType): Boolean = {
schema.fields.forall(field => AvroUtils.supportsDataType(field.dataType))
}
val isSupported = validateSchema(yourSparkSchema)
if (!isSupported) {
throw new IllegalArgumentException("Schema contains unsupported data types")
}

When converting schemas for evolution scenarios:
// Original schema
val v1Schema = StructType(Seq(
StructField("id", LongType, nullable = false),
StructField("name", StringType, nullable = false)
))
// Evolved schema (backward compatible)
val v2Schema = StructType(Seq(
StructField("id", LongType, nullable = false),
StructField("name", StringType, nullable = false),
StructField("email", StringType, nullable = true), // New optional field
StructField("created_at", TimestampType, nullable = true) // New optional field
))
val v1Avro = SchemaConverters.toAvroType(v1Schema, false, "User", "com.example")
val v2Avro = SchemaConverters.toAvroType(v2Schema, false, "User", "com.example")
// v2Avro can read data written with v1Avro schema

Handling unsupported data types:
import org.apache.spark.sql.types.CalendarIntervalType
val unsupportedSchema = StructType(Seq(
StructField("id", LongType, nullable = false),
StructField("interval", CalendarIntervalType, nullable = true) // Not supported
))
try {
val avroSchema = SchemaConverters.toAvroType(
unsupportedSchema,
nullable = false,
recordName = "Test",
nameSpace = "com.example"
)
} catch {
case e: IllegalArgumentException =>
println(s"Conversion failed: ${e.getMessage}")
}

Handling invalid Avro schemas:
val invalidAvroJson = """{"type": "invalid"}"""
try {
val avroSchema = new Schema.Parser().parse(invalidAvroJson)
val sparkType = SchemaConverters.toSqlType(avroSchema)
} catch {
case e: org.apache.avro.SchemaParseException =>
println(s"Invalid Avro schema: ${e.getMessage}")
}

Converting existing DataFrame schema to Avro for writing:
val df = spark.table("users")
val sparkSchema = df.schema
val avroSchema = SchemaConverters.toAvroType(
sparkSchema,
nullable = false,
recordName = "UserRecord",
nameSpace = "com.company.users"
)
// Use the schema for writing
df.write
.format("avro")
.option("avroSchema", avroSchema.toString)
.save("users_with_schema")

Using converted schemas with external schema registries:
def registerSchema(subject: String, sparkSchema: StructType): Int = {
val avroSchema = SchemaConverters.toAvroType(
sparkSchema,
nullable = false,
recordName = subject.capitalize + "Record",
nameSpace = "com.company.schemas"
)
// Register with schema registry (pseudo-code)
schemaRegistry.register(subject, avroSchema)
}
val schemaId = registerSchema("user-events", df.schema)

Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-spark--spark-avro-2-12