# Configuration Options

The Spark Avro connector provides comprehensive configuration options through the `AvroOptions` class and related constants. These options control various aspects of Avro processing, including compression, schema handling, field matching, and error handling.

## AvroOptions Class

The main configuration class for Avro operations.

```scala { .api }
class AvroOptions(
    parameters: CaseInsensitiveMap[String],
    conf: Configuration
) extends FileSourceOptions(parameters) {

  val schema: Option[Schema]
  val positionalFieldMatching: Boolean
  val recordName: String
  val recordNamespace: String
  val compression: String
  val parseMode: ParseMode
  val datetimeRebaseModeInRead: String
  val useStableIdForUnionType: Boolean
}
```
### Factory Method

```scala { .api }
object AvroOptions {
  def apply(parameters: Map[String, String]): AvroOptions
}
```

**Usage Example:**

```scala
import org.apache.spark.sql.avro.AvroOptions

val options = AvroOptions(Map(
  "compression" -> "snappy",
  "recordName" -> "MyRecord",
  "recordNamespace" -> "com.example.avro"
))

println(s"Compression: ${options.compression}")
println(s"Record Name: ${options.recordName}")
```
## Configuration Constants

All configuration option keys are defined as constants in the `AvroOptions` companion object.

```scala { .api }
object AvroOptions {
  val COMPRESSION: String
  val RECORD_NAME: String
  val RECORD_NAMESPACE: String
  val AVRO_SCHEMA: String
  val AVRO_SCHEMA_URL: String
  val POSITIONAL_FIELD_MATCHING: String
  val DATETIME_REBASE_MODE: String
  val MODE: String
  val IGNORE_EXTENSION: String // Deprecated
  val STABLE_ID_FOR_UNION_TYPE: String
}
```
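
These constants are plain option-key strings, so they can be mixed freely with literal keys. A small sketch of bulk configuration (`readerOptions` is a hypothetical name; the literal keys shown are the documented values of the corresponding constants):

```scala
// Each literal key matches a constant above, e.g. "mode" for
// AvroOptions.MODE and "datetimeRebaseMode" for AvroOptions.DATETIME_REBASE_MODE.
val readerOptions = Map(
  "avroSchemaUrl" -> "hdfs://cluster/schemas/user.avsc",
  "mode" -> "PERMISSIVE",
  "datetimeRebaseMode" -> "CORRECTED"
)

// The whole map can be passed in one call:
// spark.read.format("avro").options(readerOptions).load("path/to/avro/files")
```

`DataFrameReader.options(Map)` applies every entry at once, which keeps multi-option reads tidy.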
## Schema Configuration

### avroSchema

Specify a custom Avro schema for reading or writing.

**Option Key:** `AvroOptions.AVRO_SCHEMA` (`"avroSchema"`)

**Usage in Reading:**

```scala
val customSchema = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}
"""

val df = spark.read
  .format("avro")
  .option(AvroOptions.AVRO_SCHEMA, customSchema)
  .load("path/to/avro/files")
```
**Usage in Writing:**

```scala
df.write
  .format("avro")
  .option(AvroOptions.AVRO_SCHEMA, customSchema)
  .save("path/to/output")
```

### avroSchemaUrl

Load the Avro schema from a URL or file path.

**Option Key:** `AvroOptions.AVRO_SCHEMA_URL` (`"avroSchemaUrl"`)

```scala
val df = spark.read
  .format("avro")
  .option(AvroOptions.AVRO_SCHEMA_URL, "hdfs://cluster/schemas/user.avsc")
  .load("path/to/avro/files")

// Also works with local files
val df2 = spark.read
  .format("avro")
  .option(AvroOptions.AVRO_SCHEMA_URL, "file:///local/path/schema.avsc")
  .load("path/to/avro/files")
```
## Record Configuration

### recordName

Set the top-level record name used when writing Avro files.

**Option Key:** `AvroOptions.RECORD_NAME` (`"recordName"`)
**Default:** `"topLevelRecord"`

```scala
df.write
  .format("avro")
  .option(AvroOptions.RECORD_NAME, "UserRecord")
  .save("path/to/output")
```

### recordNamespace

Set the namespace for the top-level record.

**Option Key:** `AvroOptions.RECORD_NAMESPACE` (`"recordNamespace"`)
**Default:** `""` (empty string)

```scala
df.write
  .format("avro")
  .option(AvroOptions.RECORD_NAME, "User")
  .option(AvroOptions.RECORD_NAMESPACE, "com.example.avro")
  .save("path/to/output")
```
## Compression Configuration

### compression

Set the compression codec used when writing Avro files.

**Option Key:** `AvroOptions.COMPRESSION` (`"compression"`)
**Default:** Value of `spark.sql.avro.compression.codec`, which itself defaults to `"snappy"`

**Supported Codecs:**

- `uncompressed`: No compression
- `snappy`: Snappy compression (default)
- `deflate`: Deflate compression
- `bzip2`: Bzip2 compression
- `xz`: XZ compression
- `zstandard`: Zstandard compression

```scala
// Different compression options
df.write.format("avro").option(AvroOptions.COMPRESSION, "uncompressed").save("uncompressed")
df.write.format("avro").option(AvroOptions.COMPRESSION, "snappy").save("snappy")
df.write.format("avro").option(AvroOptions.COMPRESSION, "deflate").save("deflate")
df.write.format("avro").option(AvroOptions.COMPRESSION, "bzip2").save("bzip2")
df.write.format("avro").option(AvroOptions.COMPRESSION, "xz").save("xz")
df.write.format("avro").option(AvroOptions.COMPRESSION, "zstandard").save("zstandard")
```
## Field Matching Configuration

### positionalFieldMatching

Control how fields are matched between the Spark and Avro schemas.

**Option Key:** `AvroOptions.POSITIONAL_FIELD_MATCHING` (`"positionalFieldMatching"`)
**Default:** `false`

**Values:**

- `false`: Match fields by name (default)
- `true`: Match fields by position

```scala
import org.apache.spark.sql.types._

// Match by position instead of name
val df = spark.read
  .format("avro")
  .option(AvroOptions.POSITIONAL_FIELD_MATCHING, "true")
  .load("path/to/avro/files")

// Useful when field names don't match but the structure is identical
val sparkSchema = StructType(Seq(
  StructField("user_id", LongType),      // Position 0
  StructField("full_name", StringType),  // Position 1
  StructField("email_addr", StringType)  // Position 2
))

// Avro schema has different field names but the same positions:
// {"name": "id", "type": "long"}        // Position 0
// {"name": "name", "type": "string"}    // Position 1
// {"name": "email", "type": "string"}   // Position 2
```
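
The effect of positional matching can be pictured as zipping the two field lists by index. A plain-Scala sketch using the field names from the comment above (illustrative only):

```scala
// Spark-side and Avro-side field names, in declaration order
val sparkFields = Seq("user_id", "full_name", "email_addr")
val avroFields  = Seq("id", "name", "email")

// With positionalFieldMatching = true, fields pair up by index, not by name
val pairing = sparkFields.zip(avroFields)
// pairing: Seq((user_id,id), (full_name,name), (email_addr,email))
```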
## Error Handling Configuration

### mode

Control how parsing errors are handled.

**Option Key:** `AvroOptions.MODE` (`"mode"`)
**Default:** `FAILFAST`

**Values:**

- `FAILFAST`: Throw an exception on the first error (default)
- `PERMISSIVE`: Set corrupt records to null, continue processing
- `DROPMALFORMED`: Drop corrupt records, continue processing

```scala
// Drop malformed records
val df = spark.read
  .format("avro")
  .option(AvroOptions.MODE, "DROPMALFORMED")
  .load("path/to/avro/files")

// Keep corrupt records as null
val df2 = spark.read
  .format("avro")
  .option(AvroOptions.MODE, "PERMISSIVE")
  .load("path/to/avro/files")
```
## Date/Time Configuration

### datetimeRebaseMode

Control rebasing of DATE and TIMESTAMP values between the legacy hybrid (Julian + Gregorian) calendar and the Proleptic Gregorian calendar.

**Option Key:** `AvroOptions.DATETIME_REBASE_MODE` (`"datetimeRebaseMode"`)
**Default:** Value of `spark.sql.avro.datetimeRebaseModeInRead`

**Values:**

- `EXCEPTION`: Throw an exception when reading ancient values that are ambiguous between the two calendars
- `LEGACY`: Rebase values from the legacy hybrid calendar to the Proleptic Gregorian calendar
- `CORRECTED`: Read values as-is, without rebasing

```scala
val df = spark.read
  .format("avro")
  .option(AvroOptions.DATETIME_REBASE_MODE, "CORRECTED")
  .load("path/to/historical/avro/files")
```
## Union Type Configuration

### enableStableIdentifiersForUnionType

Control field naming for Avro union types when converting to Spark SQL.

**Option Key:** `AvroOptions.STABLE_ID_FOR_UNION_TYPE` (`"enableStableIdentifiersForUnionType"`)
**Default:** `false`

**Values:**

- `false`: Use positional field names such as `member0`, `member1` (default)
- `true`: Use stable, consistent field names derived from the member type names

```scala
val unionSchema = """
{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "data", "type": [
      {"type": "record", "name": "UserEvent", "fields": [{"name": "userId", "type": "long"}]},
      {"type": "record", "name": "SystemEvent", "fields": [{"name": "systemId", "type": "string"}]}
    ]}
  ]
}
"""

val df = spark.read
  .format("avro")
  .option(AvroOptions.AVRO_SCHEMA, unionSchema)
  .option(AvroOptions.STABLE_ID_FOR_UNION_TYPE, "true")
  .load("path/to/union/data")

// Results in stable field names like "data.member_userevent" and "data.member_systemevent"
```
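
The stable names follow a `member_<type name>` pattern. A plain-Scala sketch of that derivation (illustrative; the casing shown mirrors the comment above rather than the Spark source):

```scala
// Union branch record names from the schema above
val unionBranches = Seq("UserEvent", "SystemEvent")

// Stable identifiers follow a member_<type name> scheme
val memberNames = unionBranches.map(t => s"member_${t.toLowerCase}")
// memberNames: Seq(member_userevent, member_systemevent)
```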
## Deprecated Options

### ignoreExtension (Deprecated)

**Option Key:** `AvroOptions.IGNORE_EXTENSION` (`"ignoreExtension"`)
**Status:** Deprecated since Spark 3.0; use the general `pathGlobFilter` data source option instead

```scala
// Deprecated way
val df = spark.read
  .format("avro")
  .option(AvroOptions.IGNORE_EXTENSION, "true")
  .load("path/to/files")

// Modern way
val df2 = spark.read
  .format("avro")
  .option("pathGlobFilter", "*.data") // Or another pattern
  .load("path/to/files")
```
## Global Configuration

Some Avro behavior can be controlled through Spark SQL configuration:

### spark.sql.avro.compression.codec

Default compression codec for Avro files.

```scala
spark.conf.set("spark.sql.avro.compression.codec", "zstandard")

// All subsequent Avro writes will use zstandard compression by default
df.write.format("avro").save("path/to/output")
```

### spark.sql.avro.deflate.level

Compression level for the deflate codec: `-1` (the codec's default level) or `1`-`9`.

```scala
spark.conf.set("spark.sql.avro.deflate.level", "6")

df.write
  .format("avro")
  .option(AvroOptions.COMPRESSION, "deflate")
  .save("path/to/output")
```

### spark.sql.avro.datetimeRebaseModeInRead

Default datetime rebase mode for reading.

```scala
spark.conf.set("spark.sql.avro.datetimeRebaseModeInRead", "CORRECTED")
```
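
The per-read `datetimeRebaseMode` option takes precedence over this session-level default, so the two can be combined (a sketch; paths are placeholders):

```scala
// Session-wide default for all Avro reads
spark.conf.set("spark.sql.avro.datetimeRebaseModeInRead", "CORRECTED")

// A per-read option still overrides the session default for that read
val legacyDF = spark.read
  .format("avro")
  .option("datetimeRebaseMode", "LEGACY")
  .load("path/to/very/old/files")
```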
## Option Combinations

### Complete Example

Combining multiple options for complex scenarios:

```scala
val complexDF = spark.read
  .format("avro")
  .option(AvroOptions.AVRO_SCHEMA_URL, "hdfs://cluster/schemas/evolved-schema.avsc")
  .option(AvroOptions.MODE, "PERMISSIVE")
  .option(AvroOptions.POSITIONAL_FIELD_MATCHING, "false")
  .option(AvroOptions.DATETIME_REBASE_MODE, "CORRECTED")
  .option(AvroOptions.STABLE_ID_FOR_UNION_TYPE, "true")
  .option("pathGlobFilter", "*.avro")
  .load("path/to/avro/files")

complexDF.write
  .format("avro")
  .option(AvroOptions.COMPRESSION, "zstandard")
  .option(AvroOptions.RECORD_NAME, "ProcessedEvent")
  .option(AvroOptions.RECORD_NAMESPACE, "com.company.events")
  .mode("overwrite")
  .save("path/to/processed/output")
```

### Schema Evolution Scenario

Configuration for reading old data with a new schema:

```scala
val evolvedSchema = """
{
  "type": "record",
  "name": "UserV2",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "phone", "type": ["null", "string"], "default": null},
    {"name": "created_at", "type": ["null", "long"], "default": null}
  ]
}
"""

val migratedDF = spark.read
  .format("avro")
  .option(AvroOptions.AVRO_SCHEMA, evolvedSchema)
  .option(AvroOptions.MODE, "PERMISSIVE") // Handle missing fields gracefully
  .option(AvroOptions.DATETIME_REBASE_MODE, "CORRECTED")
  .load("path/to/legacy/user/data")
```
## Performance Considerations

### Compression Trade-offs

```scala
// Fastest compression (good for temporary data)
df.write.format("avro").option(AvroOptions.COMPRESSION, "snappy").save("temp")

// Best compression ratio (good for archival)
df.write.format("avro").option(AvroOptions.COMPRESSION, "zstandard").save("archive")

// No compression (fastest write, largest files)
df.write.format("avro").option(AvroOptions.COMPRESSION, "uncompressed").save("staging")
```

### Schema Caching

When repeatedly using the same schema:

```scala
// Options are plain strings resolved on the driver, so a broadcast variable
// is unnecessary here; define the schema once and reuse the same value.
val sharedSchema: String = complexSchema

// Use in multiple operations
df1.write.format("avro").option(AvroOptions.AVRO_SCHEMA, sharedSchema).save("output1")
df2.write.format("avro").option(AvroOptions.AVRO_SCHEMA, sharedSchema).save("output2")
```