Apache Spark Avro connector provides seamless integration between Apache Spark SQL and Apache Avro data format with automatic schema evolution support and built-in compression capabilities
npx @tessl/cli install tessl/maven-org-apache-spark--spark-avro-2-12@3.5.00
# Apache Spark Avro Connector

Apache Spark Avro connector provides seamless integration between Apache Spark SQL and the Apache Avro data format, enabling efficient reading, writing, and processing of Avro files with automatic schema evolution support and built-in compression capabilities.

## Package Information

- **Package Name**: spark-avro_2.12
- **Package Type**: Maven
- **Coordinate**: org.apache.spark:spark-avro_2.12
- **Version**: 3.5.6
- **Language**: Scala (with Java interoperability)
- **Installation**: Add to your build.gradle or pom.xml:

Maven:
```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-avro_2.12</artifactId>
  <version>3.5.6</version>
</dependency>
```

SBT:
```scala
libraryDependencies += "org.apache.spark" %% "spark-avro" % "3.5.6"
```

## Core Imports

```scala
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame
```

For the DataSource API options:
```scala
import org.apache.spark.sql.avro.AvroOptions
```

## Basic Usage

### Reading Avro Files

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("AvroExample")
  .getOrCreate()

// Read Avro files using the DataSource API
val df = spark.read
  .format("avro")
  .load("path/to/avro/files")

df.show()
```

### Writing Avro Files

```scala
// Write DataFrame to Avro format with Snappy compression
df.write
  .format("avro")
  .option("compression", "snappy")
  .save("path/to/output")
```

### Processing Binary Avro Data

```scala
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions._

// Avro schema in JSON format
val schema = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}
"""

// Convert a binary Avro column to structured data
val decodedDF = df.select(
  from_avro(col("avro_bytes"), schema).as("user_data")
)

// Convert structured data to binary Avro
val encodedDF = df.select(
  to_avro(struct(col("name"), col("age"))).as("avro_bytes")
)
```

## Architecture

The Spark Avro connector is built around several key components:

- **DataSource Integration**: Native Spark SQL DataSource V1 and V2 implementations for seamless file I/O
- **Function API**: SQL functions for binary Avro data transformation (`from_avro`, `to_avro`)
- **Schema Conversion**: Bidirectional schema mapping between Avro and Spark SQL data types
- **Serialization Engine**: High-performance Catalyst expression-based serializers and deserializers
- **Configuration System**: Comprehensive options for compression, schema evolution, and compatibility settings

## Capabilities

### File I/O Operations

DataSource API integration for reading and writing Avro files directly through Spark SQL, with automatic schema inference and support for partitioned datasets.

```scala { .api }
// DataSource read/write through format("avro")
spark.read.format("avro"): DataFrameReader
DataFrame.write.format("avro"): DataFrameWriter[Row]
```

[File Operations](./file-operations.md)
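
The partitioned-dataset support mentioned above can be sketched as follows. This is a minimal, self-contained example; the `year`/`action` column names and the output path are illustrative assumptions, not part of the connector's API:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("AvroPartitionedWrite")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Small example dataset; column names are illustrative
val events = Seq((2023, "login"), (2024, "logout")).toDF("year", "action")

// Writes one subdirectory per distinct "year" value (year=2023/, year=2024/)
events.write
  .format("avro")
  .mode("overwrite")
  .partitionBy("year")
  .save("/tmp/avro-events")

// Reading the root directory recovers "year" as a partition column
val readBack = spark.read.format("avro").load("/tmp/avro-events")
```

Partition pruning applies as usual: a filter such as `readBack.where($"year" === 2023)` only scans the matching subdirectory.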

### Binary Data Processing

Core functions for converting between binary Avro data and Spark SQL structures, enabling processing of Avro-encoded columns within DataFrames.

```scala { .api }
def from_avro(data: Column, jsonFormatSchema: String): Column
def from_avro(data: Column, jsonFormatSchema: String, options: java.util.Map[String, String]): Column
def to_avro(data: Column): Column
def to_avro(data: Column, jsonFormatSchema: String): Column
```

[Binary Data Functions](./binary-functions.md)


### Schema Conversion

Utilities for converting between Avro schemas and Spark SQL schemas, supporting complex nested types and logical type mappings.

```scala { .api }
object SchemaConverters {
  case class SchemaType(dataType: DataType, nullable: Boolean)

  def toSqlType(avroSchema: Schema): SchemaType
  def toSqlType(avroSchema: Schema, options: Map[String, String]): SchemaType
  def toAvroType(
    catalystType: DataType,
    nullable: Boolean,
    recordName: String,
    nameSpace: String
  ): Schema
}
```

[Schema Conversion](./schema-conversion.md)
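
A sketch of both conversion directions, assuming only the `SchemaConverters` API shown above; the `User` record and its fields are illustrative:

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.{StringType, StructType}

// Parse an Avro record schema (field names are illustrative)
val avroSchema = new Schema.Parser().parse(
  """{"type": "record", "name": "User", "fields": [
    |  {"name": "name", "type": "string"},
    |  {"name": "age", "type": ["null", "int"], "default": null}
    |]}""".stripMargin)

// Avro -> Spark SQL: a union with "null" maps to a nullable column
val sqlType = SchemaConverters.toSqlType(avroSchema)
val struct = sqlType.dataType.asInstanceOf[StructType]

// Spark SQL -> Avro: record name and namespace must be supplied
val roundTrip = SchemaConverters.toAvroType(
  struct, nullable = false, recordName = "User", nameSpace = "com.example")
```

Note that the round trip is not always identity: Spark SQL has no union type, so nullable columns come back as `["null", T]` unions regardless of the original schema's shape.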

### Configuration Options

Comprehensive configuration system for controlling Avro processing behavior, including compression, schema evolution, field matching, and error handling.

```scala { .api }
object AvroOptions {
  val COMPRESSION: String
  val RECORD_NAME: String
  val RECORD_NAMESPACE: String
  val AVRO_SCHEMA: String
  val AVRO_SCHEMA_URL: String
  val POSITIONAL_FIELD_MATCHING: String
  val DATETIME_REBASE_MODE: String

  def apply(parameters: Map[String, String]): AvroOptions
}
```

[Configuration](./configuration.md)
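
A sketch of how several of these options flow through the reader/writer `option()` calls. The dataset, paths, and the extra `email` field are illustrative assumptions; the schema-evolution behavior (a reader-schema field absent from the files is filled from its default) follows standard Avro schema resolution:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("AvroOptionsExample")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val users = Seq(("alice", 34), ("bob", 28)).toDF("name", "age")

// Control output compression and the generated record name/namespace
users.write
  .format("avro")
  .option("compression", "deflate")         // AvroOptions.COMPRESSION
  .option("recordName", "User")             // AvroOptions.RECORD_NAME
  .option("recordNamespace", "com.example") // AvroOptions.RECORD_NAMESPACE
  .mode("overwrite")
  .save("/tmp/avro-users")

// Supply an explicit reader schema; the extra nullable "email" field
// demonstrates schema evolution: missing values come from the default
val readerSchema =
  """{"type": "record", "name": "User", "namespace": "com.example", "fields": [
    |  {"name": "name", "type": "string"},
    |  {"name": "age", "type": "int"},
    |  {"name": "email", "type": ["null", "string"], "default": null}
    |]}""".stripMargin

val evolved = spark.read
  .format("avro")
  .option("avroSchema", readerSchema)       // AvroOptions.AVRO_SCHEMA
  .load("/tmp/avro-users")
```

The record name and namespace are set on write so that Avro's name-based schema resolution accepts the reader schema on the subsequent read.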

### Utility Functions

Core utility functions for schema inference, type validation, and write preparation.

```scala { .api }
object AvroUtils {
  def inferSchema(
    spark: SparkSession,
    options: Map[String, String],
    files: Seq[FileStatus]
  ): Option[StructType]

  def supportsDataType(dataType: DataType): Boolean

  def prepareWrite(
    sqlConf: SQLConf,
    job: Job,
    options: Map[String, String],
    dataSchema: StructType
  ): OutputWriterFactory
}
```


## Types

### Core Expression Types

```scala { .api }
class AvroDataToCatalyst(
  child: Expression,
  jsonFormatSchema: String,
  options: Map[String, String]
) extends UnaryExpression

class CatalystDataToAvro(
  child: Expression,
  jsonFormatSchema: Option[String]
) extends UnaryExpression
```


### Option Types

```scala { .api }
class AvroOptions(
  parameters: CaseInsensitiveMap[String],
  conf: Configuration
) extends FileSourceOptions(parameters) {
  val schema: Option[Schema]
  val positionalFieldMatching: Boolean
  val recordName: String
  val recordNamespace: String
  val compression: String
  val parseMode: ParseMode
  val datetimeRebaseModeInRead: String
}
```


### Utility Types

```scala { .api }
// Note: part of the AvroUtils object
case class AvroMatchedField(
  catalystField: StructField,
  catalystPosition: Int,
  avroField: Schema.Field
)

class AvroSchemaHelper(
  avroSchema: Schema,
  catalystSchema: StructType,
  avroPath: Seq[String],
  catalystPath: Seq[String],
  positionalFieldMatch: Boolean
) {
  val matchedFields: Seq[AvroMatchedField]
  def getAvroField(fieldName: String, catalystPos: Int): Option[Schema.Field]
  def validateNoExtraCatalystFields(ignoreNullable: Boolean): Unit
  def validateNoExtraAvroFields(): Unit
}
```


## Error Handling

The connector throws specific exceptions for schema incompatibilities and processing errors:

```scala { .api }
// Private to the avro package
private[avro] class IncompatibleSchemaException(
  msg: String,
  ex: Throwable = null
) extends Exception(msg, ex)

private[avro] class UnsupportedAvroTypeException(msg: String) extends Exception(msg)
```

Common error scenarios:

- Schema mismatch between Avro and Spark SQL schemas
- Invalid JSON schema format
- Unsupported data type conversions
- Corrupt or unreadable Avro files
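
For malformed binary records, the `mode` option of `from_avro` chooses between failing and tolerating. A minimal sketch, assuming a deliberately corrupt payload and illustrative column names; `FAILFAST` (the default) throws, while `PERMISSIVE` yields null fields instead:

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
  .appName("AvroErrorHandling")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val schema =
  """{"type": "record", "name": "User", "fields": [
    |  {"name": "name", "type": "string"}]}""".stripMargin

// A deliberately corrupt binary payload (not valid Avro for this schema)
val df = Seq(Array[Byte](0x1, 0x2)).toDF("avro_bytes")

// PERMISSIVE keeps the row, replacing the unparsable record with nulls;
// with the default FAILFAST mode this select would throw on action
val tolerant = df.select(
  from_avro(col("avro_bytes"), schema,
    Map("mode" -> "PERMISSIVE").asJava).as("user")
)
```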

## Deprecated Functionality

### Package Object Functions (Deprecated since 3.0.0)

The package object provides deprecated convenience functions that delegate to the main functions API:

```scala { .api }
package org.apache.spark.sql.avro {
  @deprecated("Please use 'org.apache.spark.sql.avro.functions.from_avro' instead.", "3.0.0")
  def from_avro(data: Column, jsonFormatSchema: String): Column

  @deprecated("Please use 'org.apache.spark.sql.avro.functions.to_avro' instead.", "3.0.0")
  def to_avro(data: Column): Column
}
```

**Migration Example:**
```scala
// Old, deprecated usage
import org.apache.spark.sql.avro._
val result = from_avro(col("data"), schema)

// New, recommended usage
import org.apache.spark.sql.avro.functions._
val result = from_avro(col("data"), schema)
```


### Java Integration Components

For Java-based MapReduce integrations:

```scala { .api }
class SparkAvroKeyOutputFormat extends OutputFormat[AvroKey[GenericRecord], NullWritable]
```