# File Operations

This document covers reading and writing Avro files using the Spark Avro data source.

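As a quick orientation before the details, a minimal round trip looks like this (paths, column names, and the app name are illustrative, not part of the API):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("avro-roundtrip")
  .getOrCreate()
import spark.implicits._

// Write a small DataFrame as Avro, then read it back
val input = Seq((1L, "alice"), (2L, "bob")).toDF("id", "name")
input.write.format("avro").mode("overwrite").save("/tmp/users-avro")

val roundTrip = spark.read.format("avro").load("/tmp/users-avro")
roundTrip.show()
```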
## Reading Avro Files

### Basic Reading

```scala
val df = spark.read
  .format("avro")
  .load("path/to/avro/files")
```

### Reading with Schema Evolution

```scala
val evolvedSchema = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}
"""

val df = spark.read
  .format("avro")
  .option("avroSchema", evolvedSchema)
  .load("path/to/files")
```

### Reading Options

```scala
val df = spark.read
  .format("avro")
  .option("mode", "PERMISSIVE") // FAILFAST, PERMISSIVE, or DROPMALFORMED
  .option("ignoreCorruptFiles", "true")
  .option("recursiveFileLookup", "true")
  .load("path/to/files")
```

## Writing Avro Files

### Basic Writing

```scala { .api }
def write: DataFrameWriter[Row]
def format(source: String): DataFrameWriter[Row]
def option(key: String, value: String): DataFrameWriter[Row]
def save(path: String): Unit
```

```scala
df.write
  .format("avro")
  .save("path/to/output")
```

### Writing with Custom Schema

```scala
val outputSchema = """
{
  "type": "record",
  "name": "OutputRecord",
  "namespace": "com.example",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "data", "type": "string"}
  ]
}
"""

df.write
  .format("avro")
  .option("avroSchema", outputSchema)
  .save("path/to/output")
```

### Writing with Compression

```scala
df.write
  .format("avro")
  .option("compression", "snappy") // snappy, deflate, bzip2, xz, zstandard, uncompressed
  .save("path/to/output")
```

### Writing with Custom Record Naming

```scala
df.write
  .format("avro")
  .option("recordName", "MyRecord")
  .option("recordNamespace", "com.example.data")
  .save("path/to/output")
```

## Configuration Options

### Read Options

```scala { .api }
// Schema-related options
option("avroSchema", "JSON schema string")     // Custom schema for reading
option("avroSchemaUrl", "path/to/schema.avsc") // Schema file location

// Parse mode options
option("mode", "FAILFAST|PERMISSIVE|DROPMALFORMED") // Error handling mode

// Field matching options
option("positionalFieldMatching", "true|false") // Match fields by position instead of name

// DateTime handling
option("datetimeRebaseMode", "EXCEPTION|LEGACY|CORRECTED") // Calendar rebase mode

// Union type handling
option("enableStableIdentifiersForUnionType", "true|false") // Stable union field names

// File handling (deprecated)
option("ignoreExtension", "true|false") // Ignore the .avro extension requirement
```

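For example, `avroSchemaUrl` lets you keep the schema in an external `.avsc` file rather than embedding the JSON inline (the paths below are placeholders for your environment):

```scala
// Read using a schema stored in an external .avsc file
val df = spark.read
  .format("avro")
  .option("avroSchemaUrl", "hdfs:///schemas/user.avsc")
  .load("path/to/files")
```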
### Write Options

```scala { .api }
// Schema options
option("avroSchema", "JSON schema string") // Output schema specification
option("recordName", "string")             // Top-level record name (default: "topLevelRecord")
option("recordNamespace", "string")        // Record namespace (default: "")

// Compression options
option("compression", "codec") // snappy, deflate, bzip2, xz, zstandard, uncompressed

// Field matching
option("positionalFieldMatching", "true|false") // Position-based field matching
```

## Schema Inference

The Avro data source automatically infers the schema from Avro file headers:

```scala
val inferredSchema = spark.read
  .format("avro")
  .load("path/to/files")
  .schema

println(inferredSchema.treeString)
```

### Schema Evolution Support

```scala
// Original schema: {id: long, name: string}
// The evolved schema adds an optional email field
val evolvedDF = spark.read
  .format("avro")
  .option("avroSchema", evolvedSchemaJson)
  .load("original-files")

// The new field is null for records written with the old schema
evolvedDF.select("id", "name", "email").show()
```

## Performance Considerations

### Partitioning

```scala
df.write
  .format("avro")
  .partitionBy("year", "month")
  .save("partitioned-output")
```

### Compression Performance

```scala
// Snappy: fast compression/decompression, moderate compression ratio
df.write.format("avro").option("compression", "snappy").save("output")

// Deflate: slower, but better compression ratio
df.write.format("avro").option("compression", "deflate").save("output")

// Zstandard: high compression ratio with good speed
df.write.format("avro").option("compression", "zstandard").save("output")
```

### Splittable Files

Avro files are splittable by default, enabling parallel processing:

```scala
val parallelDF = spark.read
  .format("avro")
  .load("large-avro-files/*") // Automatically split across partitions
```

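One way to observe the splitting is to check how many partitions the scan produced (a rough sketch; the actual count depends on file sizes and settings such as `spark.sql.files.maxPartitionBytes`):

```scala
val df = spark.read.format("avro").load("large-avro-files/*")

// Number of input partitions Spark created for the scan
println(df.rdd.getNumPartitions)
```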
## Error Handling

### Parse Modes

```scala
// FAILFAST: throw an exception on any parsing error
val strictDF = spark.read
  .format("avro")
  .option("mode", "FAILFAST")
  .load("path/to/files")

// PERMISSIVE: set malformed records to null and continue processing
val permissiveDF = spark.read
  .format("avro")
  .option("mode", "PERMISSIVE")
  .load("path/to/files")

// DROPMALFORMED: skip malformed records entirely
val droppedDF = spark.read
  .format("avro")
  .option("mode", "DROPMALFORMED")
  .load("path/to/files")
```

### Corrupt File Handling

```scala
val df = spark.read
  .format("avro")
  .option("ignoreCorruptFiles", "true") // Skip corrupt files instead of failing
  .load("path/to/files")
```

## SQL Integration

### Creating Tables

```sql
-- Create a table using the Avro data source
CREATE TABLE avro_table
USING avro
OPTIONS (path "path/to/avro/files");

-- Create a table with a custom schema
CREATE TABLE custom_avro_table
USING avro
OPTIONS (
  path "path/to/files",
  avroSchema '{
    "type": "record",
    "name": "Record",
    "fields": [
      {"name": "id", "type": "long"},
      {"name": "value", "type": "string"}
    ]
  }'
);
```

### Querying

```sql
SELECT * FROM avro_table WHERE id > 100;

INSERT INTO avro_table VALUES (1, 'example');
```
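Tables created this way are also reachable from the DataFrame API (the table name here is the one from the example above):

```scala
// Query the Avro-backed table via SQL or directly as a table
val results = spark.sql("SELECT * FROM avro_table WHERE id > 100")
val sameTable = spark.table("avro_table")
results.show()
```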