# Avro DataSource

The Avro DataSource provides native Spark integration for reading and writing Avro files through the DataSource V2 API. It offers optimized performance, with features such as predicate pushdown, column pruning, and automatic schema inference.

## Reading Avro Files

### Basic Reading

```scala { .api }
val df = spark.read.format("avro").load(path: String)
val df = spark.read.format("avro").load(paths: String*)
```

**Usage Example:**
```scala
// Read a single file
val df = spark.read.format("avro").load("path/to/file.avro")

// Read multiple files
val df2 = spark.read.format("avro").load("file1.avro", "file2.avro")

// Read a directory of Avro files
val df3 = spark.read.format("avro").load("path/to/avro/directory")
```

### Reading with Schema

Specify a schema to enforce a consistent structure across files:

```scala { .api }
val df = spark.read.format("avro")
  .option("avroSchema", jsonSchema)
  .load(path)
```

**Usage Example:**
```scala
val userSchema = """{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}"""

val df = spark.read.format("avro")
  .option("avroSchema", userSchema)
  .load("users/*.avro")
```

### Reading with Options

```scala { .api }
val df = spark.read.format("avro")
  .option("ignoreExtension", "true")
  .option("mode", "PERMISSIVE")
  .load(path)
```

**Common Read Options:**
- `ignoreExtension`: Read files regardless of extension (deprecated; prefer the general `pathGlobFilter` option for filtering file names)
- `mode`: Error handling mode (`PERMISSIVE`, `FAILFAST`)
- `avroSchema`: Override the schema used for reading
- `recursiveFieldMaxDepth`: Maximum recursion depth for recursive schema fields

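Following the pattern above, several read options can be combined in a single read. A minimal sketch, assuming a `SparkSession` named `spark` is in scope; the `events/*.avro` path is hypothetical:

```scala
// Hypothetical input path; combines error handling with a recursion bound
val events = spark.read.format("avro")
  .option("mode", "PERMISSIVE")          // tolerate malformed records
  .option("recursiveFieldMaxDepth", "3") // bound recursive record types
  .load("events/*.avro")
```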
## Writing Avro Files

### Basic Writing

```scala { .api }
df.write.format("avro").save(path: String)
```

**Usage Example:**
```scala
val df = spark.range(1000).select($"id", ($"id" * 2).as("value"))
// Note: save() writes a directory of part files, not a single file
df.write.format("avro").save("output/numbers.avro")
```

### Writing with Compression

```scala { .api }
df.write.format("avro")
  .option("compression", codec)
  .save(path)
```

**Available Compression Codecs:**
- `uncompressed`
- `snappy` (default)
- `deflate`
- `bzip2`
- `xz`
- `zstandard`

**Usage Example:**
```scala
df.write.format("avro")
  .option("compression", "snappy")
  .save("output/compressed.avro")
```

### Writing with Custom Schema

```scala { .api }
df.write.format("avro")
  .option("avroSchema", jsonSchema)
  .save(path)
```

**Usage Example:**
```scala
val outputSchema = """{
  "type": "record",
  "name": "Output",
  "namespace": "com.example",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "value", "type": "double"}
  ]
}"""

df.write.format("avro")
  .option("avroSchema", outputSchema)
  .save("output/custom-schema.avro")
```

### Partitioned Writing

```scala { .api }
df.write.format("avro")
  .partitionBy(columns: String*)
  .save(path)
```

**Usage Example:**
```scala
val salesDf = spark.read.format("avro").load("sales.avro")
salesDf.write.format("avro")
  .partitionBy("year", "month")
  .option("compression", "snappy")
  .save("partitioned-sales")
```

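Partition columns become directory levels (e.g. `year=2024/month=01`), so filters on them prune whole directories at read time. A sketch using a hypothetical `partitioned-sales` path:

```scala
// Only the year=2024 subdirectories are listed and scanned
val recent = spark.read.format("avro")
  .load("partitioned-sales")
  .filter($"year" === 2024)
```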
## Advanced Features

### Schema Evolution

The DataSource supports schema evolution when reading multiple files:

```scala
// Reads files with different but compatible schemas
val df = spark.read.format("avro")
  .option("mode", "PERMISSIVE")
  .load("evolving-schema/*.avro")
```

### Predicate Pushdown

Filter predicates are automatically pushed down to the file scan to reduce I/O:

```scala
val filteredDf = spark.read.format("avro")
  .load("large-dataset.avro")
  .filter($"age" > 21) // Filter pushed down to the file scan
```

### Column Pruning

Only the required columns are read from the files:

```scala
val projectedDf = spark.read.format("avro")
  .load("users.avro")
  .select("name", "email") // Only the name and email columns are read
```

## SQL Integration

Use Avro files directly in SQL queries:

```scala
// Create a temporary view
spark.read.format("avro").load("users.avro").createOrReplaceTempView("users")

// Query with SQL
spark.sql("SELECT name, age FROM users WHERE age > 18")

// Create an external table
spark.sql("""
  CREATE TABLE user_data
  USING avro
  LOCATION 'path/to/avro/files'
""")
```

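Spark SQL can also query Avro files in place, without registering a view first, using the `format.`path`` syntax; a sketch with the same hypothetical `users.avro` path:

```scala
// Query the files directly, no temporary view needed
spark.sql("SELECT name, age FROM avro.`users.avro` WHERE age > 18")
```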
## DataSource Configuration

### Write Options

```scala { .api }
Map(
  "compression" -> "snappy",         // Compression codec
  "avroSchema" -> jsonSchema,        // Custom output schema
  "recordName" -> "MyRecord",        // Record name for schema generation
  "recordNamespace" -> "com.example" // Namespace for the generated schema
)
```

### Read Options

```scala { .api }
Map(
  "ignoreExtension" -> "true",     // Ignore file extensions (deprecated)
  "mode" -> "PERMISSIVE",          // Error handling mode
  "avroSchema" -> jsonSchema,      // Override input schema
  "recursiveFieldMaxDepth" -> "5"  // Max recursion depth
)
```

## Performance Considerations

### Optimal File Sizes

- Target roughly 128 MB to 1 GB per file for best performance
- Use partitioning for large datasets
- Enable compression for storage efficiency

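One way to steer output file sizes toward that range is to control the partition count before writing, since each partition produces one part file. A sketch; the target count of 8 is illustrative and should be sized from your data volume:

```scala
// Fewer, larger output files: one part file per partition
df.repartition(8)
  .write.format("avro")
  .option("compression", "snappy")
  .save("output/right-sized")
```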
### Schema Inference

```scala
// Slower planning: the schema must be inferred by opening data files
val df1 = spark.read.format("avro").load("many-files/*.avro")

// Faster: uses the provided schema, skipping inference
val df2 = spark.read.format("avro")
  .option("avroSchema", knownSchema)
  .load("many-files/*.avro")
```

### Compression Trade-offs

- **Snappy**: Fast compression/decompression, moderate compression ratio
- **Deflate**: Better compression ratio, slower than Snappy
- **ZStandard**: Best compression ratio with good performance
- **Uncompressed**: Fastest I/O, largest storage footprint
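For `deflate`, the trade-off can be tuned further via the `spark.sql.avro.deflate.level` session setting (1 is fastest, 9 is smallest); a sketch:

```scala
// Favor compression ratio over write speed for this session
spark.conf.set("spark.sql.avro.deflate.level", "9")
df.write.format("avro")
  .option("compression", "deflate")
  .save("output/max-compressed")
```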