# Configuration Options

This document covers all configuration options available for the Spark Avro connector across reading, writing, and binary conversion operations.

## Read Configuration Options

### Schema Options

```scala { .api }
// Schema specification options
option("avroSchema", "JSON schema string")             // Custom Avro schema for reading
option("avroSchemaUrl", "file:///path/to/schema.avsc") // Schema file location
```

**Usage Examples:**

```scala
// Using an inline schema
val customSchema = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}
"""

val df = spark.read
  .format("avro")
  .option("avroSchema", customSchema)
  .load("path/to/files")

// Using a schema file
val df2 = spark.read
  .format("avro")
  .option("avroSchemaUrl", "file:///schemas/user.avsc")
  .load("path/to/files")

// Using an HDFS schema location
val df3 = spark.read
  .format("avro")
  .option("avroSchemaUrl", "hdfs://namenode:port/schemas/user.avsc")
  .load("path/to/files")
```

### Parse Mode Options

```scala { .api }
// Error handling during parsing
option("mode", "FAILFAST|PERMISSIVE|DROPMALFORMED")
```

**Mode Descriptions:**
- **FAILFAST**: Throw an exception on any parsing error (default)
- **PERMISSIVE**: Set malformed records to null and continue processing
- **DROPMALFORMED**: Skip malformed records entirely

```scala
// Strict parsing - fail on any error
val strictDF = spark.read
  .format("avro")
  .option("mode", "FAILFAST")
  .load("path/to/files")

// Lenient parsing - continue with nulls for bad records
val lenientDF = spark.read
  .format("avro")
  .option("mode", "PERMISSIVE")
  .load("path/to/files")

// Skip bad records entirely
val filteredDF = spark.read
  .format("avro")
  .option("mode", "DROPMALFORMED")
  .load("path/to/files")
```

### Field Matching Options

```scala { .api }
// Field matching strategy
option("positionalFieldMatching", "true|false") // Default: false
```

```scala
// Match fields by name (default behavior)
val nameMatchedDF = spark.read
  .format("avro")
  .option("positionalFieldMatching", "false")
  .load("path/to/files")

// Match fields by position - useful when field names differ
// between the provided Avro schema and the DataFrame schema
val positionMatchedDF = spark.read
  .format("avro")
  .option("positionalFieldMatching", "true")
  .load("path/to/files")
```

### DateTime Handling Options

```scala { .api }
// Calendar rebase mode for date/timestamp values
option("datetimeRebaseMode", "EXCEPTION|LEGACY|CORRECTED")
```

**Rebase Modes:**
- **EXCEPTION**: Throw an exception when rebasing between calendars would be needed (safest)
- **LEGACY**: Rebase to the legacy hybrid Julian/Gregorian calendar
- **CORRECTED**: Read values as-is using the proleptic Gregorian calendar (recommended)

```scala
val df = spark.read
  .format("avro")
  .option("datetimeRebaseMode", "CORRECTED")
  .load("path/to/files")
```

### Union Type Options

```scala { .api }
// Union type field naming strategy
option("enableStableIdentifiersForUnionType", "true|false") // Default: false
```

```scala
// Default union field names: member0, member1, etc.
val defaultUnionDF = spark.read
  .format("avro")
  .option("enableStableIdentifiersForUnionType", "false")
  .load("path/to/files")

// Stable union field names based on type: member_string, member_int, etc.
val stableUnionDF = spark.read
  .format("avro")
  .option("enableStableIdentifiersForUnionType", "true")
  .load("path/to/files")
```

### File Handling Options

```scala { .api }
// File extension handling (deprecated - use pathGlobFilter instead)
option("ignoreExtension", "true|false") // Default: true

// Standard Spark options also supported
option("pathGlobFilter", "*.avro")              // File pattern matching
option("recursiveFileLookup", "true|false")     // Recursive directory scanning
option("ignoreCorruptFiles", "true|false")      // Skip corrupt files
option("modifiedBefore", "2023-01-01T00:00:00") // Only files modified before this timestamp
option("modifiedAfter", "2022-01-01T00:00:00")  // Only files modified after this timestamp
```

```scala
// Modern approach using pathGlobFilter
val filteredDF = spark.read
  .format("avro")
  .option("pathGlobFilter", "*.avro")
  .option("recursiveFileLookup", "true")
  .load("path/to/nested/directories")

// Legacy approach (deprecated)
val legacyDF = spark.read
  .format("avro")
  .option("ignoreExtension", "true")
  .load("path/to/files")
```
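
The `modifiedBefore` and `modifiedAfter` options can be combined to read only files written within a time window. A sketch, assuming hypothetical paths; the timestamps use the `YYYY-MM-DDTHH:mm:ss` form and are interpreted in the session time zone:

```scala
// Read only files whose modification time falls within 2022
val windowedDF = spark.read
  .format("avro")
  .option("modifiedAfter", "2022-01-01T00:00:00")
  .option("modifiedBefore", "2023-01-01T00:00:00")
  .load("path/to/files")
```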

## Write Configuration Options

### Schema Options

```scala { .api }
// Output schema specification
option("avroSchema", "JSON schema string") // Custom output schema
option("recordName", "string")             // Top-level record name
option("recordNamespace", "string")        // Record namespace
```

```scala
val outputSchema = """
{
  "type": "record",
  "name": "OutputRecord",
  "namespace": "com.example.data",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "value", "type": "string"}
  ]
}
"""

df.write
  .format("avro")
  .option("avroSchema", outputSchema)
  .save("path/to/output")

// Alternative: specify record name and namespace separately
df.write
  .format("avro")
  .option("recordName", "MyRecord")
  .option("recordNamespace", "com.example")
  .save("path/to/output")
```

### Compression Options

```scala { .api }
// Compression codec selection
option("compression", "uncompressed|snappy|deflate|bzip2|xz|zstandard")
```

**Compression Codec Characteristics:**
- **snappy**: Fast compression/decompression, moderate ratio (default)
- **deflate**: Good compression ratio, configurable level
- **bzip2**: High compression ratio, slower performance
- **xz**: High compression ratio, slower performance
- **zstandard**: Best balance of speed and compression ratio
- **uncompressed**: No compression, fastest I/O

```scala
// Fast compression (default)
df.write
  .format("avro")
  .option("compression", "snappy")
  .save("path/to/output")

// Best compression ratio
df.write
  .format("avro")
  .option("compression", "zstandard")
  .save("path/to/output")

// No compression
df.write
  .format("avro")
  .option("compression", "uncompressed")
  .save("path/to/output")
```
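
Because `deflate` is the one codec with a configurable level, the level is set through a Spark SQL conf rather than a write option. A sketch:

```scala
// Deflate level 1-9: higher compresses smaller but slower (session-wide setting)
spark.conf.set("spark.sql.avro.deflate.level", "9")

df.write
  .format("avro")
  .option("compression", "deflate")
  .save("path/to/output")
```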

### Field Matching Options

```scala { .api }
// Field matching strategy for writing
option("positionalFieldMatching", "true|false") // Default: false
```

```scala
// Write with positional field matching
df.write
  .format("avro")
  .option("positionalFieldMatching", "true")
  .save("path/to/output")
```

## Binary Conversion Options

Options used with the `from_avro()` and `to_avro()` functions:

```scala { .api }
import scala.collection.JavaConverters._

// Available options for from_avro() (DROPMALFORMED is not supported here)
val fromAvroOptions: java.util.Map[String, String] = Map(
  "mode" -> "FAILFAST|PERMISSIVE",
  "datetimeRebaseMode" -> "EXCEPTION|LEGACY|CORRECTED",
  "enableStableIdentifiersForUnionType" -> "true|false"
).asJava
```

```scala
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions.col
import scala.collection.JavaConverters._

val options = Map(
  "mode" -> "PERMISSIVE",
  "datetimeRebaseMode" -> "CORRECTED",
  "enableStableIdentifiersForUnionType" -> "true"
).asJava

val decodedDF = df.select(
  from_avro(col("avro_data"), avroSchema, options).as("decoded")
)
```
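
`to_avro()` is the encoding counterpart of `from_avro()`. A round-trip sketch, assuming `df` has `id` and `name` columns and `avroSchema` holds the matching JSON schema string:

```scala
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions.{col, struct}

// Encode selected columns into a single Avro binary column...
val encodedDF = df.select(to_avro(struct(col("id"), col("name"))).as("avro_data"))

// ...and decode it back with the corresponding reader schema
val roundTripDF = encodedDF.select(from_avro(col("avro_data"), avroSchema).as("decoded"))
```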

## Global Configuration

### Spark SQL Configuration

```scala { .api }
// Set global defaults via Spark configuration
spark.conf.set("spark.sql.avro.compression.codec", "snappy")
spark.conf.set("spark.sql.avro.deflate.level", "6")
spark.conf.set("spark.sql.avro.datetimeRebaseModeInRead", "CORRECTED")
```

```scala
// Configure global defaults
spark.conf.set("spark.sql.avro.compression.codec", "zstandard")
spark.conf.set("spark.sql.avro.datetimeRebaseModeInRead", "CORRECTED")

// These defaults are used when options are not explicitly specified
val df = spark.read.format("avro").load("path/to/files")
```

### Hadoop Configuration

```scala
// For advanced use cases, configure Hadoop-level settings
val hadoopConf = spark.sparkContext.hadoopConfiguration

// Avro-specific Hadoop configurations
hadoopConf.setBoolean("avro.mapred.ignore.inputs.without.extension", false)
hadoopConf.set("avro.schema.input.key", customSchemaJson)
```

## Performance Tuning Options

### File Size Optimization

```scala
// Control output file size
df.coalesce(10) // Reduce the number of output files
  .write
  .format("avro")
  .option("compression", "snappy")
  .save("path/to/output")

// For large datasets, use repartition for better parallelism
df.repartition(100)
  .write
  .format("avro")
  .save("path/to/output")
```

### Memory Management

```scala
import org.apache.spark.storage.StorageLevel

// For large schema conversions, consider persisting the decoded result
val convertedDF = df.select(
  from_avro(col("large_avro_data"), complexSchema).as("converted")
)

// Persist decoded data once for multiple downstream operations
// (use persist with an explicit level rather than cache(); a DataFrame's
// storage level cannot be changed after it has been set)
convertedDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
```

## Error Handling Configuration

### Comprehensive Error Handling Setup

```scala
import org.apache.spark.sql.functions.col

val robustReadConfig = spark.read
  .format("avro")
  .option("mode", "PERMISSIVE")              // Continue on parse errors
  .option("ignoreCorruptFiles", "true")      // Skip corrupt files
  .option("datetimeRebaseMode", "CORRECTED") // Handle date/time properly
  .option("pathGlobFilter", "*.avro")        // Only process .avro files

val df = robustReadConfig.load("path/to/potentially/problematic/files")

// In PERMISSIVE mode malformed records surface as nulls; filter them out
// on a column that should never be null (here, a hypothetical "id")
val cleanDF = df.filter(col("id").isNotNull)
```

### Write with Error Recovery

```scala
import org.apache.spark.sql.AnalysisException

try {
  df.write
    .format("avro")
    .mode("overwrite")
    .option("compression", "snappy")
    .save("path/to/output")
} catch {
  case e: AnalysisException if e.getMessage.contains("schema") =>
    // Handle schema-related errors
    println(s"Schema error: ${e.getMessage}")
    // Retry with a different schema or different options
  case e: Exception =>
    // Handle other errors
    println(s"Write failed: ${e.getMessage}")
}
```

## Configuration Best Practices

### Production Configuration Template

```scala
import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, Row, SparkSession}

// Robust production configuration for reading
def createRobustAvroReader(spark: SparkSession): DataFrameReader = {
  spark.read
    .format("avro")
    .option("mode", "PERMISSIVE")
    .option("ignoreCorruptFiles", "true")
    .option("datetimeRebaseMode", "CORRECTED")
    .option("pathGlobFilter", "*.avro")
    .option("recursiveFileLookup", "true")
}

// Optimized production configuration for writing
def createOptimizedAvroWriter(df: DataFrame): DataFrameWriter[Row] = {
  df.coalesce(math.max(1, df.rdd.getNumPartitions / 4)) // Reduce small files
    .write
    .format("avro")
    .option("compression", "zstandard") // Best compression
    .mode("overwrite")
}
```
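
The two helpers above compose naturally in a job. A usage sketch with hypothetical paths:

```scala
// Read with the robust settings, then write with the optimized settings
val input = createRobustAvroReader(spark).load("path/to/input")
createOptimizedAvroWriter(input).save("path/to/output")
```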

### Schema Evolution Configuration

```scala
// Configuration for schema evolution scenarios
def createEvolutionSafeReader(
  spark: SparkSession,
  readerSchema: String
): DataFrameReader = {
  spark.read
    .format("avro")
    .option("avroSchema", readerSchema)                   // Use evolved schema
    .option("positionalFieldMatching", "false")           // Match by name
    .option("mode", "PERMISSIVE")                         // Handle missing fields gracefully
    .option("enableStableIdentifiersForUnionType", "true") // Stable union handling
}
```
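
Calling the helper with an evolved reader schema loaded from disk might look like this (a sketch; the schema path is hypothetical):

```scala
// Load the evolved .avsc as a JSON string and read old files through it
val evolvedSchema = scala.io.Source.fromFile("schemas/user_v2.avsc").mkString
val dfEvolved = createEvolutionSafeReader(spark, evolvedSchema).load("path/to/files")
```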

### Streaming Configuration

```scala
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions.col
import scala.collection.JavaConverters._

// Configuration for streaming Avro data (e.g. Avro-encoded Kafka values)
val streamingAvroDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "avro-topic")
  .load()
  .select(
    from_avro(
      col("value"),
      avroSchema,
      Map(
        "mode" -> "PERMISSIVE",
        "datetimeRebaseMode" -> "CORRECTED"
      ).asJava
    ).as("data")
  )
```