# File Operations

The Spark Avro connector provides DataSource API integration for reading and writing Avro files directly through Spark SQL, with automatic schema inference, partitioning support, and optimized I/O performance.

## Reading Avro Files

### Basic File Reading

```scala { .api }
spark.read.format("avro").load(path: String): DataFrame
spark.read.format("avro").load(paths: String*): DataFrame
```

**Basic Usage:**

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("AvroReader")
  .getOrCreate()

// Read a single file or directory
val df = spark.read.format("avro").load("path/to/file.avro")

// Read multiple paths
val df2 = spark.read.format("avro").load(
  "path/to/file1.avro",
  "path/to/file2.avro",
  "path/to/directory"
)
```

### Schema Specification

```scala { .api }
spark.read.format("avro").schema(schema: StructType): DataFrameReader
```

**Usage with Custom Schema:**

```scala
import org.apache.spark.sql.types._

val customSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = true),
  StructField("email", StringType, nullable = true)
))

val df = spark.read
  .format("avro")
  .schema(customSchema)
  .load("path/to/avro/files")
```

### Reading Options

```scala { .api }
spark.read.format("avro").option(key: String, value: String): DataFrameReader
spark.read.format("avro").options(options: Map[String, String]): DataFrameReader
```

**Available Options:**

- `avroSchema`: Specify an evolved Avro schema (as JSON) for reading
- `avroSchemaUrl`: URL to load the Avro schema from
- `mode`: Parse mode for handling corrupt records (`PERMISSIVE`, `DROPMALFORMED`, `FAILFAST`)
- `positionalFieldMatching`: Match fields by position instead of by name
- `datetimeRebaseMode`: Rebase mode for DATE/TIMESTAMP values (`EXCEPTION`, `LEGACY`, `CORRECTED`)

**Usage Example:**

```scala
val df = spark.read
  .format("avro")
  .option("mode", "DROPMALFORMED")
  .option("avroSchema", evolvedSchemaJson)
  .option("positionalFieldMatching", "true")
  .load("path/to/avro/files")
```

## Writing Avro Files

### Basic File Writing

```scala { .api }
DataFrame.write.format("avro").save(path: String): Unit
DataFrame.write.format("avro").save(): Unit
```

**Basic Usage:**

```scala
// Write DataFrame to Avro files
df.write
  .format("avro")
  .save("path/to/output")

// Write with an explicit save mode
df.write
  .format("avro")
  .mode("overwrite")
  .save("path/to/output")
```

### Write Modes

```scala { .api }
DataFrame.write.format("avro").mode(saveMode: String): DataFrameWriter[Row]
DataFrame.write.format("avro").mode(saveMode: SaveMode): DataFrameWriter[Row]
```

Supported modes:

- `overwrite`: Overwrite existing data
- `append`: Append to existing data
- `ignore`: Silently skip the write if data already exists
- `error` or `errorifexists` (default): Throw an error if data already exists
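
The string and `SaveMode` enum forms are interchangeable. A minimal, self-contained sketch (the local master, sample data, and output path are illustrative, and `spark-avro` must be on the classpath):

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .appName("WriteModesDemo")
  .master("local[1]") // local mode, for illustration only
  .getOrCreate()
import spark.implicits._

val df = Seq((1L, "alice"), (2L, "bob")).toDF("id", "name")
val out = "/tmp/avro-write-modes" // illustrative path

// String form and enum form request the same semantics
df.write.format("avro").mode("overwrite").save(out)
df.write.format("avro").mode(SaveMode.Append).save(out)

spark.stop()
```

Using the `SaveMode` enum gives compile-time checking of the mode name, while the string form is convenient when the mode comes from configuration.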
118
119
### Partitioning
120
121
```scala { .api }
122
DataFrame.write.format("avro").partitionBy(colNames: String*): DataFrameWriter[Row]
123
```
124
125
**Usage Example:**
126
127
```scala
128
df.write
129
.format("avro")
130
.partitionBy("year", "month")
131
.save("path/to/partitioned/output")
132
```

### Writing Options

```scala { .api }
DataFrame.write.format("avro").option(key: String, value: String): DataFrameWriter[Row]
DataFrame.write.format("avro").options(options: Map[String, String]): DataFrameWriter[Row]
```

**Available Options:**

- `compression`: Compression codec (`uncompressed`, `snappy`, `deflate`, `bzip2`, `xz`, `zstandard`)
- `recordName`: Top-level record name in the output schema (default: `topLevelRecord`)
- `recordNamespace`: Record namespace in the output schema (default: `""`)
- `avroSchema`: Custom Avro schema to use for the output

**Usage Example:**

```scala
df.write
  .format("avro")
  .option("compression", "snappy")
  .option("recordName", "UserRecord")
  .option("recordNamespace", "com.example.avro")
  .save("path/to/compressed/output")
```

## Advanced Features

### Schema Evolution

Reading with an evolved schema lets you process Avro files whose schemas differ from, but remain compatible with, the schema you supply. Fields added in the evolved schema must declare defaults so that older files can still be resolved against it:

```scala
val evolvedSchema = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "created_at", "type": "long", "default": 0}
  ]
}
"""

val df = spark.read
  .format("avro")
  .option("avroSchema", evolvedSchema)
  .load("path/to/older/avro/files")
```

### Predicate Pushdown

The connector supports predicate pushdown, so filters can be evaluated during the scan rather than after loading:

```scala
import spark.implicits._ // required for the $"colName" syntax

val filteredDF = spark.read
  .format("avro")
  .load("path/to/large/dataset")
  .filter($"created_date" >= "2023-01-01")
  .filter($"status" === "active")
```

### Compression Support

Compression is detected automatically on read and is configurable on write:

```scala
// Writing with different compression codecs
df.write.format("avro").option("compression", "snappy").save("snappy-output")
df.write.format("avro").option("compression", "deflate").save("deflate-output")
df.write.format("avro").option("compression", "bzip2").save("bzip2-output")
```

## Error Handling

### Corrupt Record Handling

```scala
// Drop corrupt records
val cleanDF = spark.read
  .format("avro")
  .option("mode", "DROPMALFORMED")
  .load("path/to/files")

// Collect corrupt records in a dedicated column
val dfWithCorrupt = spark.read
  .format("avro")
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .load("path/to/files")
```

### File Extension Handling

```scala
// Include files without the .avro extension
val df = spark.read
  .format("avro")
  .option("ignoreExtension", "true") // deprecated - use pathGlobFilter instead
  .load("path/to/mixed/files")

// Modern approach using pathGlobFilter
val df2 = spark.read
  .format("avro")
  .option("pathGlobFilter", "*.data")
  .load("path/to/files")
```

## DataSource V2 Integration

The connector also provides a DataSource V2 implementation for enhanced performance:

```scala { .api }
class AvroDataSourceV2 extends FileDataSourceV2 {
  def shortName(): String // Returns "avro"
  def getTable(options: CaseInsensitiveStringMap): Table
  def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table
}
```

The DataSource V2 API provides the same functionality with improved performance characteristics and tighter integration with Spark's Catalyst optimizer.
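
Which path a read actually takes is governed by Spark's internal `spark.sql.sources.useV1SourceList` configuration, whose default includes `avro` (meaning reads fall back to the V1 source). A hedged sketch, assuming a Spark build where this internal config is settable; the user-facing API is identical either way:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("AvroV2Demo")
  // Omit "avro" from the V1 fallback list so the V2 path is exercised
  .config("spark.sql.sources.useV1SourceList", "csv,json,kafka,orc,parquet,text")
  .getOrCreate()

// Same format("avro") API regardless of which implementation handles the scan
val df = spark.read.format("avro").load("path/to/files")
```

Because this is an internal configuration, its name and default may change between Spark releases; most applications should simply use `format("avro")` and let Spark choose the implementation.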

### Supporting V2 Classes

```scala { .api }
class AvroTable extends FileTable
class AvroScan extends FileScan
class AvroScanBuilder extends FileScanBuilder
class AvroPartitionReaderFactory extends FilePartitionReaderFactory
class AvroWrite extends FileWrite
```

These classes work together to provide the complete DataSource V2 implementation, handling table metadata, scan planning, partition reading, and write operations.