# Configuration Options

Apache Spark Avro provides extensive configuration options for controlling Avro processing behavior, including compression settings, schema handling, parsing modes, and performance optimizations.

## Read Options

Configuration options for reading Avro files and deserializing Avro data.

### Error Handling

```scala { .api }
val readOptions = Map(
  "mode" -> "PERMISSIVE" // PERMISSIVE | DROPMALFORMED | FAILFAST
)
```

**Available Modes:**
- **PERMISSIVE** (default): Sets corrupt records to null and continues processing
- **DROPMALFORMED**: Ignores corrupt records completely
- **FAILFAST**: Throws an exception on the first corrupt record

**Usage Example:**
```scala
// Strict parsing - fail on any malformed data
val strictDf = spark.read.format("avro")
  .option("mode", "FAILFAST")
  .load("sensitive-data.avro")

// Permissive parsing - continue with nulls for bad records
val lenientDf = spark.read.format("avro")
  .option("mode", "PERMISSIVE")
  .load("messy-data.avro")
```

### Schema Options

```scala { .api }
val schemaOptions = Map(
  "avroSchema" -> jsonSchemaString,  // Override input schema
  "ignoreExtension" -> "true",       // Read files regardless of extension
  "recursiveFieldMaxDepth" -> "10"   // Maximum recursion depth for nested fields
)
```

**Schema Override Example:**
```scala
val fixedSchema = """{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"}
  ]
}"""

val df = spark.read.format("avro")
  .option("avroSchema", fixedSchema)
  .option("ignoreExtension", "true")
  .load("data-files/*")
```

### Union Type Handling

```scala { .api }
val unionOptions = Map(
  "enableStableIdentifiersForUnionType" -> "true", // Use stable identifiers for union types
  "stableIdentifierPrefixForUnionType" -> "union_" // Prefix for stable union identifiers
)
```

**Usage Example:**
```scala
import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._

val options: JMap[String, String] = Map(
  "enableStableIdentifiersForUnionType" -> "true",
  "stableIdentifierPrefixForUnionType" -> "variant_"
).asJava

val parsedDf = df.select(from_avro(col("union_data"), unionSchema, options).as("parsed"))
```

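To sketch what these options change: with stable identifiers enabled, union members are named after their Avro type with the configured prefix, rather than positionally. The real field names come from Spark's schema converter; `fieldNamesFor` below is a hypothetical pure-Scala helper that only mimics that naming convention for illustration.

```scala
// Illustrates the naming convention only; not part of the Spark API.
// Stable identifiers OFF: union members get positional names (member0, member1, ...).
// Stable identifiers ON: members are named by type with the configured prefix.
object UnionNaming {
  def fieldNamesFor(memberTypes: Seq[String],
                    stable: Boolean,
                    prefix: String = "member_"): Seq[String] =
    if (stable) memberTypes.map(t => s"$prefix$t")
    else memberTypes.indices.map(i => s"member$i")
}
```

For example, `UnionNaming.fieldNamesFor(Seq("int", "string"), stable = true, prefix = "variant_")` yields `variant_int` and `variant_string`, matching the `variant_` prefix configured in the example above.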
### DateTime Handling

```scala { .api }
val datetimeOptions = Map(
  "datetimeRebaseMode" -> "CORRECTED" // EXCEPTION | CORRECTED | LEGACY
)
```

**Rebase Modes:**
- **EXCEPTION**: Throw an exception when reading dates/timestamps that are ambiguous between the two calendars (before 1582-10-15)
- **CORRECTED**: Read dates/timestamps as-is, using the Proleptic Gregorian calendar without rebasing
- **LEGACY**: Rebase dates/timestamps from the hybrid Julian + Gregorian calendar used by legacy writers

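The calendar difference behind these modes is visible with `java.time`, which uses the proleptic Gregorian calendar (the interpretation CORRECTED assumes); the hybrid Julian + Gregorian calendar that LEGACY rebases from skips 1582-10-05 through 1582-10-14 entirely. A minimal illustration:

```scala
import java.time.LocalDate

// java.time is proleptic Gregorian: the day after 1582-10-04 is 1582-10-05.
val next = LocalDate.of(1582, 10, 4).plusDays(1)
// In the hybrid Julian + Gregorian calendar, by contrast, 1582-10-04 is
// followed directly by 1582-10-15, which is why ancient dates written by
// legacy systems need rebasing (LEGACY) or a deliberate as-is read (CORRECTED).
```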
## Write Options

Configuration options for writing Avro files and serializing data to Avro format.

### Compression

```scala { .api }
val compressionOptions = Map(
  "compression" -> "snappy" // uncompressed | deflate | snappy | bzip2 | xz | zstandard
)
```

**Available Compression Codecs:**

| Codec | Performance | Compression Ratio | Use Case |
|-------|-------------|-------------------|----------|
| uncompressed | Fastest | None | Development, high-speed networks |
| snappy | Very Fast | Good | General purpose, balanced performance |
| deflate | Fast | Better | Standard compression |
| bzip2 | Slow | Best | Archival, storage-optimized |
| xz | Slowest | Excellent | Long-term storage |
| zstandard | Fast | Excellent | Modern high-performance compression |

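When codec selection is driven by workload, the trade-offs in the table can be wrapped in a small helper. `codecFor` is a hypothetical convenience, not part of the spark-avro API; it simply returns strings accepted by the `compression` option:

```scala
// Hypothetical helper mapping a workload profile to a codec string
// from the table above; not part of spark-avro itself.
object CodecChooser {
  def codecFor(profile: String): String = profile match {
    case "development" => "uncompressed" // fastest writes, no compression
    case "balanced"    => "snappy"       // good ratio, very fast
    case "archival"    => "bzip2"        // best ratio, slowest
    case "modern"      => "zstandard"    // excellent ratio, still fast
    case other => throw new IllegalArgumentException(s"unknown profile: $other")
  }
}
```

It would plug into a writer as `df.write.format("avro").option("compression", CodecChooser.codecFor("balanced"))`.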
#### AvroCompressionCodec Java Enum

The compression codecs are defined as a Java enum with additional utility methods:

```java { .api }
public enum AvroCompressionCodec {
    UNCOMPRESSED, DEFLATE, SNAPPY, BZIP2, XZ, ZSTANDARD;

    public String getCodecName();
    public boolean getSupportCompressionLevel();
    public String lowerCaseName();
    public static AvroCompressionCodec fromString(String s);
}
```

**Enum Methods:**
- `getCodecName()`: Returns the Avro codec name constant
- `getSupportCompressionLevel()`: Returns true if the codec supports compression levels
- `lowerCaseName()`: Returns the lowercase name for the codec
- `fromString(String)`: Parses a codec from a string (case-insensitive)

**Usage Example:**
```scala
// High-performance compression
df.write.format("avro")
  .option("compression", "snappy")
  .save("fast-access-data")

// Strong, fast compression for archival
df.write.format("avro")
  .option("compression", "zstandard")
  .save("archived-data")
```

### Schema Generation

```scala { .api }
val schemaGenOptions = Map(
  "recordName" -> "MyRecord",         // Name for generated record types
  "recordNamespace" -> "com.example", // Namespace for generated schemas
  "avroSchema" -> customSchemaJson    // Use custom output schema
)
```

**Custom Schema Example:**
```scala
val outputSchema = """{
  "type": "record",
  "name": "ExportRecord",
  "namespace": "com.company.exports",
  "fields": [
    {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-micros"}},
    {"name": "user_id", "type": "string"},
    {"name": "metrics", "type": {"type": "map", "values": "double"}}
  ]
}"""

df.write.format("avro")
  .option("avroSchema", outputSchema)
  .option("compression", "snappy")
  .save("structured-export")
```

## Function Options

Configuration options for `from_avro`, `to_avro`, and `schema_of_avro` functions.

### Parsing Options

```scala { .api }
import scala.jdk.CollectionConverters._

val functionOptions: java.util.Map[String, String] = Map(
  "mode" -> "PERMISSIVE",
  "datetimeRebaseMode" -> "CORRECTED",
  "enableStableIdentifiersForUnionType" -> "false",
  "stableIdentifierPrefixForUnionType" -> "",
  "recursiveFieldMaxDepth" -> "5"
).asJava
```

**Complete Function Usage:**
```scala
import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.avro.functions._

val parseOptions: JMap[String, String] = Map(
  "mode" -> "FAILFAST",
  "datetimeRebaseMode" -> "CORRECTED",
  "enableStableIdentifiersForUnionType" -> "true",
  "recursiveFieldMaxDepth" -> "10"
).asJava

val decodedDf = df.select(
  from_avro(col("avro_binary"), avroSchema, parseOptions).as("decoded_data")
)

// In SQL, options are passed to schema_of_avro as a map literal
val schemaDf = spark.sql(
  s"SELECT schema_of_avro('$complexSchema', map('enableStableIdentifiersForUnionType', 'true')) AS spark_schema"
)
```

## Performance Tuning Options

### File Size Optimization

```scala { .api }
// Control output file size through partitioning
df.coalesce(numPartitions)
  .write.format("avro")
  .option("compression", "snappy")
  .save(path)

// Optimal partition size: 128MB - 1GB per file
val optimalPartitions = (df.count() * avgRowSizeBytes / targetPartitionSizeBytes).toInt
```
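The `optimalPartitions` arithmetic can be made concrete with a ceiling division so no file overshoots the target. All sizes here are illustrative assumptions; in practice, estimate the average row size from a sample:

```scala
// Estimate a partition count so each output file lands near a target size.
object PartitionEstimator {
  def optimalPartitions(rowCount: Long,
                        avgRowSizeBytes: Long,
                        targetPartitionSizeBytes: Long): Int = {
    val totalBytes = rowCount * avgRowSizeBytes
    // Ceiling division so no file exceeds the target; never fewer than 1 partition.
    math.max(1, ((totalBytes + targetPartitionSizeBytes - 1) / targetPartitionSizeBytes).toInt)
  }
}
```

For example, 100 million rows at roughly 100 bytes each, targeted at 256 MB files, gives 38 partitions.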

### Schema Inference Control

```scala { .api }
// Disable schema inference by providing an explicit schema
val df = spark.read.format("avro")
  .option("avroSchema", knownSchema) // Avoids scanning all files
  .load("large-dataset/*.avro")
```

### Memory Management

```scala { .api }
val memoryOptions = Map(
  "recursiveFieldMaxDepth" -> "3", // Limit nesting to control memory usage
  "mode" -> "DROPMALFORMED"        // Skip problematic records to bound memory usage
)
```

## Option Constants

For type-safe configuration, use the predefined constants:

```scala { .api }
// Common option keys (from AvroOptions)
object AvroOptionKeys {
  val IGNORE_EXTENSION = "ignoreExtension"
  val MODE = "mode"
  val RECORD_NAME = "recordName"
  val COMPRESSION = "compression"
  val AVRO_SCHEMA = "avroSchema"
  val AVRO_SCHEMA_URL = "avroSchemaUrl"
  val RECORD_NAMESPACE = "recordNamespace"
  val POSITIONAL_FIELD_MATCHING = "positionalFieldMatching"
  val DATETIME_REBASE_MODE = "datetimeRebaseMode"
  val STABLE_ID_FOR_UNION_TYPE = "enableStableIdentifiersForUnionType"
  val STABLE_ID_PREFIX_FOR_UNION_TYPE = "stableIdentifierPrefixForUnionType"
  val RECURSIVE_FIELD_MAX_DEPTH = "recursiveFieldMaxDepth"
}
```

## Configuration Examples

### Production Reading Configuration

```scala
val productionReadConfig = Map(
  "mode" -> "FAILFAST",               // Strict error handling
  "ignoreExtension" -> "false",       // Validate file extensions
  "recursiveFieldMaxDepth" -> "5",    // Reasonable nesting limit
  "datetimeRebaseMode" -> "CORRECTED" // Handle historical dates correctly
)

val df = spark.read.format("avro")
  .options(productionReadConfig)
  .load("production-data/*.avro")
```

### High-Performance Writing Configuration

```scala
val highPerfWriteConfig = Map(
  "compression" -> "snappy",              // Fast compression
  "recordName" -> "OptimizedRecord",      // Descriptive record name
  "recordNamespace" -> "com.company.data" // Proper namespace
)

df.coalesce(200) // Optimize partition count
  .write.format("avro")
  .options(highPerfWriteConfig)
  .save("optimized-output")
```

### Development/Testing Configuration

```scala
val devConfig = Map(
  "mode" -> "PERMISSIVE",         // Lenient error handling
  "ignoreExtension" -> "true",    // Flexible file reading
  "compression" -> "uncompressed" // Fast writes for testing
)

val testDf = spark.read.format("avro")
  .options(devConfig)
  .load("test-data/*")
```