# Spark Avro

Apache Spark SQL connector for reading and writing Avro data. The spark-avro library lets Spark SQL applications serialize, deserialize, and convert schemas for the Apache Avro format, making it well suited to big data pipelines that rely on Avro's schema evolution and compact binary encoding.

## Package Information

- **Package Name**: spark-avro_2.11
- **Package Type**: maven
- **Language**: Scala (with Java support classes)
- **Maven Coordinates**: `org.apache.spark:spark-avro_2.11:2.4.8`
- **Installation**:
  - Maven: Add the dependency to `pom.xml`
  - Spark Submit: Use `--packages org.apache.spark:spark-avro_2.11:2.4.8`
  - Spark Shell: Use `--packages org.apache.spark:spark-avro_2.11:2.4.8`

**Maven Dependency:**

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-avro_2.11</artifactId>
  <version>2.4.8</version>
</dependency>
```

**Note:** The spark-avro module is external and not included in `spark-submit` or `spark-shell` by default.

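For sbt builds, the equivalent coordinates are as follows — assuming the project's `scalaVersion` is 2.11.x, so that `%%` resolves to the `_2.11` artifact:

```scala
// build.sbt: %% appends the Scala binary version (2.11 here) to the artifact name
libraryDependencies += "org.apache.spark" %% "spark-avro" % "2.4.8"
```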
## Core Imports

```scala
import org.apache.spark.sql.avro._
```

For DataFrame operations:

```scala
import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions.col
```

## Basic Usage

```scala
import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.col

// Initialize SparkSession (no special extensions are required for Avro support)
val spark = SparkSession.builder()
  .appName("Spark Avro Example")
  .getOrCreate()

import spark.implicits._

// Read from Avro files
val df = spark.read
  .format("avro")
  .load("path/to/avro/files")

// Write to Avro files
df.write
  .format("avro")
  .mode("overwrite")
  .save("path/to/output")

// Convert binary Avro data in columns
// (assumes df has a binary `avroData` column and a struct `structData` column)
val avroSchema = """{"type": "record", "name": "User", "fields": [
  {"name": "name", "type": "string"},
  {"name": "age", "type": "int"}
]}"""

val transformedDF = df.select(
  from_avro(col("avroData"), avroSchema).as("userData"),
  to_avro(col("structData")).as("binaryAvro")
)
```

## Architecture

Spark Avro is built around several key components:

- **Data Source Integration**: `AvroFileFormat` implements Spark's `FileFormat` and `DataSourceRegister` interfaces
- **Schema Conversion**: `SchemaConverters` provides bidirectional mapping between Avro and Spark SQL schemas
- **Serialization Layer**: `AvroSerializer` and `AvroDeserializer` handle data conversion between Catalyst and Avro formats
- **Column Functions**: `from_avro` and `to_avro` expressions for working with binary Avro data in DataFrame columns
- **Configuration**: `AvroOptions` centralizes configuration for read and write operations
- **Output Management**: Custom output writers with metadata support for efficient Avro file generation

## Capabilities

### Column Functions

Functions for converting between Catalyst's internal format and Avro binary data within DataFrame columns, usable in both SQL queries and DataFrame operations.

```scala { .api }
def from_avro(data: Column, jsonFormatSchema: String): Column
def to_avro(data: Column): Column
```

[Column Functions](./column-functions.md)

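A minimal round-trip sketch using these two functions, assuming a local SparkSession; the Avro schema passed to `from_avro` is derived from the struct column's Catalyst type via `SchemaConverters`, so the reader and writer schemas are guaranteed to agree:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.{col, struct}

val spark = SparkSession.builder().master("local[*]").appName("avro-columns").getOrCreate()
import spark.implicits._

// A DataFrame with a struct column to serialize
val df = Seq(("alice", 30), ("bob", 25)).toDF("name", "age")
  .select(struct(col("name"), col("age")).as("user"))

// Derive a matching Avro schema (JSON string) from the column's Catalyst type
val avroSchema = SchemaConverters.toAvroType(df.schema("user").dataType).toString

// struct -> Avro binary -> struct
val roundTrip = df
  .select(to_avro(col("user")).as("bytes"))
  .select(from_avro(col("bytes"), avroSchema).as("user"))
```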
### Data Source Operations

File-based data source support for reading from and writing to Avro files through Spark's standard DataFrame read/write API, with automatic schema inference and configurable options.

```scala { .api }
// Reading
spark.read.format("avro").option(key, value).load(path)

// Writing
df.write.format("avro").mode(saveMode).option(key, value).save(path)
```

[Data Source Operations](./data-source.md)

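A self-contained sketch of the read/write API with options, assuming a local Spark build with spark-avro on the classpath (the temp directory and sample rows are illustrative only):

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("avro-io").getOrCreate()
import spark.implicits._

val out = Files.createTempDirectory("avro-demo").resolve("users").toString

// Write with deflate compression (default codec is snappy) and a custom record name
Seq(("alice", 30), ("bob", 25)).toDF("name", "age")
  .write
  .format("avro")
  .mode("overwrite")
  .option("compression", "deflate")
  .option("recordName", "User")
  .save(out)

// Read it back; the schema is inferred from the Avro files
val users = spark.read.format("avro").load(out)
```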
### Schema Conversion

Utilities for converting between Apache Avro schemas and Spark SQL schemas, supporting complex nested types, logical types, and schema evolution patterns.

```scala { .api }
object SchemaConverters {
  def toSqlType(avroSchema: Schema): SchemaType
  def toAvroType(catalystType: DataType, nullable: Boolean = false,
                 recordName: String = "topLevelRecord", nameSpace: String = ""): Schema
}

case class SchemaType(dataType: DataType, nullable: Boolean)
```

[Schema Conversion](./schema-conversion.md)

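A sketch of both directions of the conversion; the `User` record below is an illustrative schema, not one from the library:

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types._

// Avro -> Spark SQL: parse a record schema and convert it to a Catalyst type
val avroSchema = new Schema.Parser().parse(
  """{"type": "record", "name": "User", "fields": [
       {"name": "name", "type": "string"},
       {"name": "age",  "type": "int"}
     ]}""")
val sqlType = SchemaConverters.toSqlType(avroSchema)
// sqlType.dataType is a StructType with name/age fields

// Spark SQL -> Avro: derive a record schema from a Catalyst StructType
val catalystType = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = false)))
val backToAvro = SchemaConverters.toAvroType(catalystType, recordName = "User")
```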
### Configuration Options

Configuration for customizing Avro read and write operations, including schema specification, compression settings, and file handling options.

```scala { .api }
class AvroOptions(parameters: Map[String, String], conf: Configuration) {
  val schema: Option[String]
  val recordName: String
  val recordNamespace: String
  val ignoreExtension: Boolean
  val compression: String
}
```

[Configuration](./configuration.md)

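In practice these options are set through the public read/write API and session configuration rather than by constructing `AvroOptions` directly. A sketch, assuming a local SparkSession (the read path is a placeholder):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("avro-config").getOrCreate()

// Session-wide defaults; a per-write "compression" option overrides these
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

// Per-read options are passed like any other data source option, e.g.:
// spark.read.format("avro").option("ignoreExtension", "true").load("path/to/data")
```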
## Types

```scala { .api }
// Core Spark SQL types
import org.apache.spark.sql.types._
import org.apache.spark.sql.Column

// Avro schema types
import org.apache.avro.Schema

// Configuration and options
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

// Exception types
case class IncompatibleSchemaException(msg: String) extends Exception(msg)
```
```