# Avro Functions

The Avro functions provide high-level operations for converting between binary Avro data and Spark SQL columns. They handle schema-based serialization and deserialization with comprehensive type mapping and error handling.

Most functions are available in both the Scala and Python APIs with equivalent functionality; exceptions are noted below.

## Core Functions

### from_avro

Converts binary Avro data to Catalyst rows using a specified Avro schema.

```scala { .api }
def from_avro(data: Column, jsonFormatSchema: String): Column
```

```python { .api }
def from_avro(data: ColumnOrName, jsonFormatSchema: str, options: Optional[Dict[str, str]] = None) -> Column
```

**Parameters:**

- `data`: Column containing binary Avro data
- `jsonFormatSchema`: Avro schema in JSON string format
- `options` (Python only): Optional dictionary of parsing options

**Returns:** Column with deserialized Catalyst data matching the Avro schema

**Usage Example:**

Scala:
```scala
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions.col

val avroSchema = """{"type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}"""
val df = spark.read.format("avro").load("users.avro")
val decodedDf = df.select(from_avro(col("binary_data"), avroSchema).as("user"))
```

Python:
```python
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.functions import col

avro_schema = '{"type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}'
df = spark.read.format("avro").load("users.avro")
decoded_df = df.select(from_avro(col("binary_data"), avro_schema).alias("user"))
```

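Rather than hand-writing the `jsonFormatSchema` string, it can be assembled from a plain dictionary and serialized with the standard `json` module. A minimal, Spark-independent Python sketch (the record and field names simply mirror the `User` example above):

```python
import json

# Assemble the Avro record schema as a plain dict, then serialize it.
# The record/field names mirror the "User" example above.
user_schema = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"},
    ],
}

# The resulting string is what from_avro/to_avro expect as jsonFormatSchema.
avro_schema = json.dumps(user_schema)
print(avro_schema)
```

Building the schema this way avoids quoting mistakes in long hand-written JSON strings.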
### from_avro (with options)

Converts binary Avro data to Catalyst rows with additional parsing options.

```scala { .api }
def from_avro(data: Column, jsonFormatSchema: String, options: java.util.Map[String, String]): Column
```

**Parameters:**

- `data`: Column containing binary Avro data
- `jsonFormatSchema`: Avro schema in JSON string format
- `options`: Map of parsing options (`mode`, `datetimeRebaseMode`, etc.)

**Returns:** Column with deserialized Catalyst data

**Usage Example:**

```scala
import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._

val options: JMap[String, String] = Map(
  "mode" -> "PERMISSIVE",
  "datetimeRebaseMode" -> "CORRECTED"
).asJava

val decodedDf = df.select(from_avro(col("binary_data"), avroSchema, options).as("user"))
```

### to_avro

Converts Catalyst data to binary Avro format using a schema inferred from the data.

```scala { .api }
def to_avro(data: Column): Column
```

```python { .api }
def to_avro(data: ColumnOrName, jsonFormatSchema: str = "") -> Column
```

**Parameters:**

- `data`: Column containing Catalyst data to serialize
- `jsonFormatSchema` (Python only): Optional output Avro schema in JSON string format

**Returns:** Column with binary Avro data

**Usage Example:**

Scala:
```scala
val encodedDf = df.select(to_avro(col("user_struct")).as("avro_data"))
```

Python:
```python
from pyspark.sql.avro.functions import to_avro
from pyspark.sql.functions import col

encoded_df = df.select(to_avro(col("user_struct")).alias("avro_data"))
```

### to_avro (with schema)

Converts Catalyst data to binary Avro format using a specified output schema.

```scala { .api }
def to_avro(data: Column, jsonFormatSchema: String): Column
```

**Parameters:**

- `data`: Column containing Catalyst data to serialize
- `jsonFormatSchema`: Target Avro schema in JSON string format

**Returns:** Column with binary Avro data conforming to the specified schema

**Usage Example:**

```scala
val outputSchema = """{"type":"record","name":"UserOutput","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}"""
val encodedDf = df.select(to_avro(col("user_struct"), outputSchema).as("avro_data"))
```

### schema_of_avro

Returns the DDL-formatted Spark SQL schema corresponding to an Avro schema.

```scala { .api }
def schema_of_avro(jsonFormatSchema: String): Column
```

**Parameters:**

- `jsonFormatSchema`: Avro schema in JSON string format

**Returns:** Column containing the DDL schema string

**Usage Example:**

```scala
val schemaDf = spark.sql(s"SELECT schema_of_avro('$avroSchema') AS spark_schema")
schemaDf.show(false)
// +---------------------------+
// |spark_schema               |
// +---------------------------+
// |struct<name:string,age:int>|
// +---------------------------+
```

### schema_of_avro (with options)

Returns the DDL-formatted Spark SQL schema with additional parsing options.

```scala { .api }
def schema_of_avro(jsonFormatSchema: String, options: java.util.Map[String, String]): Column
```

**Parameters:**

- `jsonFormatSchema`: Avro schema in JSON string format
- `options`: Map of schema conversion options

**Returns:** Column containing the DDL schema string

**Usage Example:**

```scala
import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._

val options: JMap[String, String] = Map(
  "enableStableIdentifiersForUnionType" -> "true",
  "recursiveFieldMaxDepth" -> "10"
).asJava

val schemaDf = spark.range(1).select(schema_of_avro(avroSchema, options).as("spark_schema"))
```

**Note:** The `schema_of_avro` functions are only available in the Scala API. Python users can access them through Spark SQL:

```python
# Python - use SQL to access schema_of_avro
schema_df = spark.sql(f"SELECT schema_of_avro('{avro_schema}') AS spark_schema")
```

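When splicing the JSON schema into a SQL string as in the Python note above, any single quote inside the schema (for example in a `doc` attribute) would break the SQL literal. A small, Spark-independent sketch of one way to escape it; the `sql_quote` helper is our own illustration, not part of any Spark API, and assumes the default Spark SQL parser, which accepts backslash-escaped quotes:

```python
def sql_quote(literal: str) -> str:
    """Escape a string for use as a single-quoted SQL literal (backslash-escaping)."""
    return "'" + literal.replace("\\", "\\\\").replace("'", "\\'") + "'"

# A schema whose doc string contains a single quote.
avro_schema = '{"type":"record","name":"User","doc":"user\'s profile","fields":[{"name":"name","type":"string"}]}'
query = f"SELECT schema_of_avro({sql_quote(avro_schema)}) AS spark_schema"
print(query)
```

Without the escaping, the apostrophe in `user's` would terminate the SQL string early.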
## Deprecated Functions

### Legacy Package Functions

The following functions are deprecated and maintained only for backward compatibility. Use the equivalents from `org.apache.spark.sql.avro.functions` instead.

```scala { .api }
// DEPRECATED: Use org.apache.spark.sql.avro.functions.from_avro instead
@deprecated("Please use 'org.apache.spark.sql.avro.functions.from_avro' instead.", "3.0.0")
def from_avro(data: Column, jsonFormatSchema: String): Column

// DEPRECATED: Use org.apache.spark.sql.avro.functions.to_avro instead
@deprecated("Please use 'org.apache.spark.sql.avro.functions.to_avro' instead.", "3.0.0")
def to_avro(data: Column): Column
```

## Error Handling

`from_avro` supports different error handling modes through the `mode` option:

- **FAILFAST** (default): Throws an exception on the first corrupt record
- **PERMISSIVE**: Sets corrupt records to null and continues processing

```scala
import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._

val options: JMap[String, String] = Map("mode" -> "PERMISSIVE").asJava
val lenientDf = df.select(from_avro(col("binary_data"), avroSchema, options).as("user"))
```

## Type Mapping

The functions automatically handle conversion between Avro types and Spark SQL types:

- **Avro primitive types** → Spark SQL primitive types (string, int, long, float, double, boolean, bytes)
- **Avro records** → Spark SQL structs
- **Avro arrays** → Spark SQL arrays
- **Avro maps** → Spark SQL maps
- **Avro unions** → nullable types for `[null, X]` unions; other unions become Spark SQL structs with nullable member fields
- **Avro enums** → Spark SQL strings
- **Avro logical types** → appropriate Spark SQL types (timestamps, dates, decimals)

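As an illustration of the primitive mappings listed above, here is a small lookup table in Python. This is a documentation sketch of the correspondence, not Spark's actual schema converter; the dictionary and function names are our own:

```python
# Illustrative lookup for the Avro primitive -> Spark SQL DDL mappings above.
# This is a documentation sketch, not Spark's internal converter.
AVRO_TO_SPARK_PRIMITIVES = {
    "string": "string",
    "int": "int",
    "long": "bigint",    # Avro long maps to Spark's LongType (DDL name: bigint)
    "float": "float",
    "double": "double",
    "boolean": "boolean",
    "bytes": "binary",   # Avro bytes maps to Spark's BinaryType
    "enum": "string",    # Avro enums map to strings
}

def spark_type_for(avro_type: str) -> str:
    """Return the Spark SQL DDL type name for an Avro primitive type."""
    return AVRO_TO_SPARK_PRIMITIVES[avro_type]

print(spark_type_for("long"))
```

Compound types (records, arrays, maps, unions) are converted recursively, which a flat lookup like this cannot express.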
## Exception Handling

The Avro functions may throw specific exceptions for schema and type incompatibilities:

### Exception Classes

```scala { .api }
// Schema incompatibility errors (internal)
class IncompatibleSchemaException extends RuntimeException

// Unsupported Avro type errors (internal)
class UnsupportedAvroTypeException extends RuntimeException
```

**Common Error Scenarios:**

- **IncompatibleSchemaException**: Thrown when the provided schema is incompatible with the data
- **UnsupportedAvroTypeException**: Thrown when an unsupported Avro type is encountered
- **AnalysisException**: Thrown for invalid schema JSON or malformed parameters
- **SparkException**: Thrown for runtime errors during processing

**Error Handling Best Practices:**

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.SparkException

try {
  val result = df.select(from_avro(col("data"), invalidSchema).as("parsed"))
  result.collect()
} catch {
  case e: AnalysisException =>
    println(s"Schema validation failed: ${e.getMessage}")
  case e: SparkException =>
    println(s"Runtime processing error: ${e.getMessage}")
  case e: Exception =>
    println(s"Unexpected error: ${e.getMessage}")
}
```