Avro data source for Apache Spark SQL that provides functionality to read from and write to Avro files with DataFrames and Datasets
npx @tessl/cli install tessl/maven-org-apache-spark--spark-avro_2-13@4.0.0
# Apache Spark Avro

Apache Spark Avro provides comprehensive support for reading from and writing to Apache Avro files in Spark SQL. It enables seamless conversion between Avro binary format and Spark DataFrames/Datasets, with built-in functions for data transformation and robust schema handling.

## Package Information

- **Package Name**: spark-avro_2.13
- **Package Type**: maven
- **Language**: Scala
- **Installation**: Add to your Maven POM or SBT build file:

**Maven:**
```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-avro_2.13</artifactId>
  <version>4.0.0</version>
</dependency>
```

**SBT:**
```scala
libraryDependencies += "org.apache.spark" %% "spark-avro" % "4.0.0"
```

## Core Imports

```scala
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions.col
```

For the deprecated package-level functions (not recommended for new code):

```scala
import org.apache.spark.sql.avro.{from_avro, to_avro}
```

Schema conversion utilities:

```scala
import org.apache.spark.sql.avro.SchemaConverters
```

Python API:

```python
from pyspark.sql.avro.functions import from_avro, to_avro
```

## Basic Usage

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .appName("AvroExample")
  .getOrCreate()

// Reading Avro files
val df = spark.read.format("avro").load("path/to/avro/files")

// Writing Avro files
df.write.format("avro").save("path/to/output")

// Converting binary Avro data to DataFrame columns
val avroSchema = """{"type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}"""
val decodedDf = df.select(from_avro(col("avro_data"), avroSchema).as("user"))

// Converting DataFrame columns to binary Avro
val encodedDf = df.select(to_avro(col("user_struct")).as("avro_data"))

// Getting schema information
val schemaDf = spark.sql(s"SELECT schema_of_avro('$avroSchema') AS avro_schema")
```

**Python Example:**

```python
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro, to_avro
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("AvroExample").getOrCreate()

# Reading Avro files
df = spark.read.format("avro").load("path/to/avro/files")

# Writing Avro files
df.write.format("avro").save("path/to/output")

# Converting binary Avro data to DataFrame columns
avro_schema = '{"type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}'
decoded_df = df.select(from_avro(col("avro_data"), avro_schema).alias("user"))

# Converting DataFrame columns to binary Avro
encoded_df = df.select(to_avro(col("user_struct")).alias("avro_data"))
```

## Architecture

Apache Spark Avro is built around several key components:

- **Function API**: High-level functions (`from_avro`, `to_avro`, `schema_of_avro`) for data conversion and schema operations
- **DataSource V2**: Native Spark integration for reading/writing Avro files with predicate pushdown and column pruning
- **Schema Conversion**: Bidirectional conversion between Avro schemas and Spark SQL data types
- **Expression System**: Internal Catalyst expressions that power the public functions
- **Compression Support**: Multiple compression codecs (Snappy, Deflate, XZ, ZStandard, etc.)
- **Configuration Options**: Extensive options for controlling parsing behavior, schema handling, and optimization

## Capabilities

### Avro Functions

Core functions for converting between binary Avro data and Spark SQL columns. These functions handle schema-based serialization and deserialization with comprehensive type mapping.

```scala { .api }
def from_avro(data: Column, jsonFormatSchema: String): Column
def from_avro(data: Column, jsonFormatSchema: String, options: java.util.Map[String, String]): Column
def to_avro(data: Column): Column
def to_avro(data: Column, jsonFormatSchema: String): Column
def schema_of_avro(jsonFormatSchema: String): Column
def schema_of_avro(jsonFormatSchema: String, options: java.util.Map[String, String]): Column
```

```python { .api }
def from_avro(data: ColumnOrName, jsonFormatSchema: str, options: Optional[Dict[str, str]] = None) -> Column
def to_avro(data: ColumnOrName, jsonFormatSchema: str = "") -> Column
```

[Avro Functions](./functions.md)
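
To illustrate how these functions compose, here is a round-trip sketch: a struct column is encoded to binary Avro with `to_avro` and decoded back with `from_avro`, using the `mode=FAILFAST` parse option so malformed records fail loudly instead of becoming nulls. The local-mode session, sample data, and record schema are illustrative assumptions, not part of the library; note the nullable `name` field is declared as an Avro union so the same schema works for both encoding and decoding.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions.{col, struct}
import scala.jdk.CollectionConverters._

val spark = SparkSession.builder().appName("AvroRoundTrip").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical record schema matching the struct built below; `name` is a
// nullable union because DataFrame string columns are nullable by default.
val userSchema =
  """{"type":"record","name":"User","fields":[
    |{"name":"name","type":["string","null"]},{"name":"age","type":"int"}]}""".stripMargin

val df = Seq(("alice", 30), ("bob", 25)).toDF("name", "age")

// Encode the two columns as a single binary Avro column...
val encoded = df.select(to_avro(struct(col("name"), col("age")), userSchema).as("avro"))

// ...and decode it back with the same schema; FAILFAST surfaces bad records as errors.
val decoded = encoded.select(
  from_avro(col("avro"), userSchema, Map("mode" -> "FAILFAST").asJava).as("user"))
```

Because binary Avro carries no embedded schema, the schema passed to `from_avro` must match the one the data was written with.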

### File DataSource

Native Spark DataSource V2 implementation for reading and writing Avro files with optimized performance and advanced features like schema inference and predicate pushdown.

```scala { .api }
// Reading
spark.read.format("avro").load(path)
spark.read.format("avro").option("avroSchema", schema).load(path)

// Writing
df.write.format("avro").save(path)
df.write.format("avro").option("compression", "snappy").save(path)
```

[File DataSource](./datasource.md)
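
As a concrete sketch of the reader/writer pair, the following writes a small DataFrame as Snappy-compressed, partitioned Avro to a temporary directory and reads it back; the local-mode session, temp path, and column names are illustrative assumptions:

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("AvroIO").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical output location for the demo.
val out = Files.createTempDirectory("avro-demo").resolve("events").toString

// Write Snappy-compressed Avro, partitioned into kind=... subdirectories.
val events = Seq((1L, "click"), (2L, "view"), (3L, "click")).toDF("id", "kind")
events.write.format("avro").option("compression", "snappy").partitionBy("kind").save(out)

// Read it back; the partition column is reconstructed from the directory layout.
val back = spark.read.format("avro").load(out)
```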

### Schema Conversion

Developer API for converting between Avro schemas and Spark SQL data types, supporting complex nested structures and logical types.

```scala { .api }
object SchemaConverters {
  def toSqlType(avroSchema: org.apache.avro.Schema): SchemaType
  def toSqlType(avroSchema: org.apache.avro.Schema, useStableIdForUnionType: Boolean,
      stableIdPrefixForUnionType: String, recursiveFieldMaxDepth: Int): SchemaType
  def toAvroType(catalystType: DataType, nullable: Boolean, recordName: String, nameSpace: String): org.apache.avro.Schema
}

case class SchemaType(dataType: DataType, nullable: Boolean)
```

[Schema Conversion](./schema-conversion.md)
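
A brief round-trip sketch of these converters: an Avro record schema is mapped to a Spark `StructType` and back. The schema text, record name, and namespace are illustrative assumptions.

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

// Hypothetical Avro record schema.
val avroSchema = new Schema.Parser().parse(
  """{"type":"record","name":"User","fields":[
    |{"name":"name","type":"string"},{"name":"age","type":"int"}]}""".stripMargin)

// Avro record -> Spark SQL StructType (wrapped in SchemaType with nullability info)
val schemaType = SchemaConverters.toSqlType(avroSchema)
val sparkType = schemaType.dataType.asInstanceOf[StructType]

// Spark SQL type -> Avro record; record name and namespace are caller-chosen
val roundTripped = SchemaConverters.toAvroType(
  sparkType, nullable = false, recordName = "User", nameSpace = "com.example")
```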

### Configuration Options

Comprehensive configuration options for controlling Avro processing behavior, including compression, schema handling, and parsing modes.

```scala { .api }
// Common options
Map(
  "compression" -> "snappy",
  "avroSchema" -> jsonSchema,
  "mode" -> "PERMISSIVE",
  "ignoreExtension" -> "true"
)
```

[Configuration Options](./configuration.md)
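
Compression can also be configured session-wide through SQL confs rather than per-write options; a minimal sketch, assuming an existing `SparkSession` named `spark` and a DataFrame `df` (both placeholders):

```scala
// Session-wide defaults; a per-write .option("compression", ...) overrides them.
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
// Level-aware codecs such as Deflate accept a level via a codec-specific conf.
spark.conf.set("spark.sql.avro.deflate.level", "5")

df.write.format("avro").save("path/to/output")
```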

## Types

```scala { .api }
// Schema conversion result
case class SchemaType(dataType: DataType, nullable: Boolean)
```

```java { .api }
// Compression codec enumeration
public enum AvroCompressionCodec {
  UNCOMPRESSED("null", false),
  DEFLATE("deflate", true),
  SNAPPY("snappy", false),
  BZIP2("bzip2", false),
  XZ("xz", true),
  ZSTANDARD("zstandard", true);

  public String getCodecName();
  public boolean getSupportCompressionLevel();
  public String lowerCaseName();
  public static AvroCompressionCodec fromString(String s);
}
```
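
As a small illustration of the enum above (assuming spark-avro is on the classpath), the codecs that accept a compression level can be listed programmatically:

```scala
import org.apache.spark.sql.avro.AvroCompressionCodec

// Codecs whose getSupportCompressionLevel flag is true, per the listing above.
val levelAware = AvroCompressionCodec.values()
  .filter(_.getSupportCompressionLevel)
  .map(_.lowerCaseName())
  .toSet
// Per the enum definition these are deflate, xz, and zstandard.
```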