# Data Source Operations

Comprehensive file-based data source functionality for reading from and writing to Avro files using Spark's standard DataFrame read/write API. Supports automatic schema inference, configurable compression, and efficient distributed processing of Avro data.

## Capabilities

### Reading Avro Files

Read Avro files using Spark's DataFrameReader with automatic schema inference and configurable options.

```scala { .api }
// Basic read operations
spark.read.format("avro").load(path: String): DataFrame
spark.read.format("avro").load(paths: String*): DataFrame

// With options
spark.read.format("avro")
  .option(key: String, value: String)
  .load(path: String): DataFrame
```

**Usage Examples:**

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Avro Reader Example")
  .getOrCreate()

// Read single file
val df1 = spark.read
  .format("avro")
  .load("path/to/data.avro")

// Read multiple files
val df2 = spark.read
  .format("avro")
  .load("path/to/dir1", "path/to/dir2")

// Read with glob pattern
val df3 = spark.read
  .format("avro")
  .load("path/to/data/*.avro")

// Read with custom schema
val df4 = spark.read
  .format("avro")
  .option("avroSchema", customSchemaJson)
  .load("path/to/data.avro")

// Read ignoring file extensions
val df5 = spark.read
  .format("avro")
  .option("ignoreExtension", "true")
  .load("path/to/files_without_extension")
```

### Writing Avro Files

Write DataFrames to Avro files with configurable compression and schema options.

```scala { .api }
// Basic write operations
df.write.format("avro").save(path: String): Unit
df.write.format("avro").mode(saveMode: SaveMode).save(path: String): Unit

// With options
df.write.format("avro")
  .option(key: String, value: String)
  .mode(saveMode: SaveMode)
  .save(path: String): Unit
```

**Usage Examples:**

```scala
import org.apache.spark.sql.SaveMode

val df = spark.table("source_data")

// Basic write
df.write
  .format("avro")
  .save("path/to/output")

// Write with compression
df.write
  .format("avro")
  .option("compression", "snappy")
  .mode(SaveMode.Overwrite)
  .save("path/to/compressed_output")

// Write with custom record name and namespace
df.write
  .format("avro")
  .option("recordName", "MyRecord")
  .option("recordNamespace", "com.example.data")
  .mode(SaveMode.Append)
  .save("path/to/named_output")

// Write with custom schema
val customSchema = """{
  "type": "record",
  "name": "CustomRecord",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "value", "type": "string"}
  ]
}"""

df.write
  .format("avro")
  .option("avroSchema", customSchema)
  .save("path/to/custom_schema_output")
```
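
The Avro writer also composes with Spark's generic `DataFrameWriter` features such as `partitionBy`. A minimal sketch, assuming `df` contains `year` and `month` columns (those names are illustrative placeholders, not part of the examples above):

```scala
import org.apache.spark.sql.SaveMode

// Partition output by column values; each (year, month) combination
// becomes a directory containing its own Avro files.
// "year" and "month" are hypothetical column names.
df.write
  .format("avro")
  .partitionBy("year", "month")
  .mode(SaveMode.Overwrite)
  .save("path/to/partitioned_output")
```

Partition columns are encoded in the directory layout rather than in the Avro records themselves, so subsequent reads can prune partitions by filtering on those columns.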

## Data Source Options

Comprehensive list of options available for Avro read and write operations.

### Available Options Table

| Option | Default | Description | Scope | Example |
|--------|---------|-------------|-------|---------|
| `avroSchema` | None | Custom Avro schema in JSON format | Read/Write | `"{\"type\": \"record\", ...}"` |
| `recordName` | `"topLevelRecord"` | Top-level record name in Avro schema | Write | `"UserRecord"` |
| `recordNamespace` | `""` | Record namespace in Avro schema | Write | `"com.example.data"` |
| `ignoreExtension` | `true` | Ignore file extensions when reading | Read | `"false"` |
| `compression` | `"snappy"` | Compression codec for write operations | Write | `"deflate"` |

### Schema Inference

Automatic schema detection from Avro files with support for schema evolution.

```scala { .api }
// Schema inference behavior
val inferredSchema = spark.read.format("avro").load(path).schema
```

**Schema Inference Examples:**

```scala
// Inspect inferred schema
val df = spark.read.format("avro").load("path/to/data.avro")
df.printSchema()

// Schema inference with multiple files (uses the first readable file)
val multiFileDF = spark.read
  .format("avro")
  .load("path/to/multiple/*.avro")

// Handle corrupt files during schema inference
val robustDF = spark.read
  .format("avro")
  .option("ignoreCorruptFiles", "true")
  .load("path/to/possibly_corrupt/*.avro")
```
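
A common schema-evolution pattern is to supply a reader schema via `avroSchema` that adds a field with a default value, so older files written without that field remain readable. A sketch, assuming hypothetical paths and field names:

```scala
// Reader schema adds an optional "email" field with a null default.
// Files written before the field existed resolve it to the default.
val evolvedSchema = """{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}"""

val evolvedDF = spark.read
  .format("avro")
  .option("avroSchema", evolvedSchema)
  .load("path/to/old_user_files/*.avro")
```

This relies on standard Avro schema resolution: a new field is only backward-compatible for reads if it carries a default.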

### File Splitting and Parallelism

Avro files are splittable, enabling efficient distributed processing.

```scala { .api }
// Avro files support splitting for parallel processing
val parallelDF = spark.read
  .format("avro")
  .load("large_avro_file.avro")

// Spark automatically splits files across partitions
val partitionCount = parallelDF.rdd.getNumPartitions
```
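
Split sizing is governed by Spark's generic file-source settings rather than anything Avro-specific. A sketch of tuning read parallelism and consolidating output files (the 32 MB and 8-partition figures are illustrative, not recommendations):

```scala
// Smaller max partition size => more input splits => more read tasks.
spark.conf.set("spark.sql.files.maxPartitionBytes", (32L * 1024 * 1024).toString)

val tunedDF = spark.read
  .format("avro")
  .load("large_avro_file.avro")

// Each write task produces one file per partition, so coalesce
// before writing to limit the number of output files.
tunedDF.coalesce(8)
  .write
  .format("avro")
  .save("path/to/consolidated_output")
```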

### Supported Read Options

Configuration options for customizing read behavior:

```scala { .api }
// Available read options
.option("avroSchema", jsonSchemaString)      // Custom schema
.option("ignoreExtension", "true|false")     // Ignore .avro extension requirement
.option("ignoreCorruptFiles", "true|false")  // Skip corrupt files during processing
```

### Supported Write Options

Configuration options for customizing write behavior:

```scala { .api }
// Available write options
.option("avroSchema", jsonSchemaString)   // Custom output schema
.option("recordName", recordName)         // Top-level record name
.option("recordNamespace", namespace)     // Record namespace
.option("compression", compressionCodec)  // Compression: snappy, deflate, bzip2, xz, uncompressed
```

### Compression Support

Multiple compression codecs are supported for write operations:

**Supported Compression Codecs:**

```scala
// Compression options
"snappy"        // Default - balanced compression and speed
"deflate"       // Good compression ratio, configurable level
"bzip2"         // High compression ratio, slower
"xz"            // High compression ratio, slower
"uncompressed"  // No compression
```

**Compression Examples:**

```scala
// Snappy compression (default)
df.write.format("avro").save("snappy_output")

// Deflate with custom level (configured via Spark config)
spark.conf.set("spark.sql.avro.deflate.level", "6")
df.write
  .format("avro")
  .option("compression", "deflate")
  .save("deflate_output")

// High compression with bzip2
df.write
  .format("avro")
  .option("compression", "bzip2")
  .save("bzip2_output")
```

### Error Handling

Common error scenarios and handling strategies:

```scala
// Handle missing files
try {
  val df = spark.read.format("avro").load("missing_file.avro")
} catch {
  case e: org.apache.spark.sql.AnalysisException =>
    println(s"File not found: ${e.getMessage}")
}

// Handle schema evolution conflicts
try {
  val df = spark.read
    .format("avro")
    .option("avroSchema", strictSchema)
    .load("evolved_schema_files/*.avro")
} catch {
  case e: Exception =>
    println(s"Schema compatibility issue: ${e.getMessage}")
}
```
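
A more idiomatic Scala alternative to try/catch is `scala.util.Try`, which makes the fallback path explicit. A sketch, reusing the `strictSchema` value and paths from the examples above:

```scala
import scala.util.{Failure, Success, Try}

// Attempt a strict-schema read; fall back to schema inference on failure.
val strictRead = Try {
  spark.read
    .format("avro")
    .option("avroSchema", strictSchema)
    .load("evolved_schema_files/*.avro")
}

val safeDF = strictRead match {
  case Success(df) => df
  case Failure(e) =>
    println(s"Strict read failed (${e.getMessage}); falling back to inference")
    spark.read.format("avro").load("evolved_schema_files/*.avro")
}
```

Note that because DataFrame reads are lazy, some incompatibilities only surface when an action runs, so wrapping the `load` alone may not catch every failure.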