# Configuration Options

Comprehensive configuration system for customizing Avro read and write operations. The AvroOptions class provides case-insensitive parameter handling with intelligent defaults and integration with the Hadoop configuration.

## Capabilities

### AvroOptions Class

Central configuration class for Avro read and write operations with case-insensitive parameter handling.

```scala { .api }
class AvroOptions(
    @transient val parameters: CaseInsensitiveMap[String],
    @transient val conf: Configuration)
  extends Logging with Serializable {

  def this(parameters: Map[String, String], conf: Configuration) = {
    this(CaseInsensitiveMap(parameters), conf)
  }

  // Configuration properties
  val schema: Option[String]   // Optional user-provided schema in JSON format
  val recordName: String       // Top-level record name (default: "topLevelRecord")
  val recordNamespace: String  // Record namespace (default: "")
  val ignoreExtension: Boolean // Ignore file extensions in read operations
  val compression: String      // Compression codec for write operations
}
```
**Usage Examples:**

```scala
import org.apache.spark.sql.avro.AvroOptions
import org.apache.hadoop.conf.Configuration

val hadoopConf = new Configuration()
val options = Map(
  "avroSchema" -> customSchemaJson, // an Avro schema as a JSON string
  "recordName" -> "MyRecord",
  "compression" -> "snappy"
)

val avroOptions = new AvroOptions(options, hadoopConf)

// Access configuration values
val customSchema = avroOptions.schema
val recordName = avroOptions.recordName
val compressionCodec = avroOptions.compression
```
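Because parameters are wrapped in CaseInsensitiveMap, option keys match regardless of casing. A minimal, self-contained sketch of that lookup behaviour (`CIMap` here is a hypothetical stand-in, not Spark's actual CaseInsensitiveMap):

```scala
// Hypothetical stand-in illustrating case-insensitive option lookup
class CIMap(underlying: Map[String, String]) {
  // Normalize all keys to lower case once, at construction time
  private val lower = underlying.map { case (k, v) => k.toLowerCase -> v }
  def get(key: String): Option[String] = lower.get(key.toLowerCase)
}

val opts = new CIMap(Map("AvroSchema" -> "{...}", "COMPRESSION" -> "snappy"))

// Lookups succeed regardless of key casing
assert(opts.get("avroschema").isDefined)
assert(opts.get("Compression").contains("snappy"))
```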
### Schema Configuration

Options for specifying and customizing Avro schemas during read and write operations.

#### Custom Schema Option

```scala { .api }
val schema: Option[String] // Optional user-provided schema in JSON format
```

**Usage Examples:**

```scala
val customSchema = """{
  "type": "record",
  "name": "CustomUser",
  "namespace": "com.example",
  "fields": [
    {"name": "userId", "type": "long"},
    {"name": "userName", "type": "string"},
    {"name": "metadata", "type": ["null", "string"], "default": null}
  ]
}"""

// Read with custom schema
val df = spark.read
  .format("avro")
  .option("avroSchema", customSchema)
  .load("path/to/data.avro")

// Write with custom schema
df.write
  .format("avro")
  .option("avroSchema", customSchema)
  .save("path/to/output")
```
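Because a malformed `avroSchema` string only surfaces once the job runs, it can be worth parsing the JSON up front with Avro's own parser (`org.apache.avro.Schema.Parser`, available on the classpath wherever spark-avro is). A minimal sketch:

```scala
import org.apache.avro.Schema

// Fail fast if the option value is not a valid Avro schema document;
// Parser.parse throws SchemaParseException on invalid input
val parsed: Schema = new Schema.Parser().parse(customSchema)
println(parsed.getFullName) // fully qualified record name, e.g. "com.example.CustomUser"
```

Parsing first gives a clear, immediate error instead of a failure buried in a Spark task stack trace.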
#### Record Naming Options

```scala { .api }
val recordName: String      // Top-level record name (default: "topLevelRecord")
val recordNamespace: String // Record namespace (default: "")
```

**Usage Examples:**

```scala
// Configure record naming for write operations
df.write
  .format("avro")
  .option("recordName", "UserProfile")
  .option("recordNamespace", "com.company.data")
  .save("path/to/named_output")

// Example of generated Avro schema with custom naming
// Result schema will have:
// {
//   "type": "record",
//   "name": "UserProfile",
//   "namespace": "com.company.data",
//   "fields": [...]
// }
```

### Advanced Configuration Patterns

#### Dynamic Configuration

Configuring Avro options dynamically based on data characteristics.

```scala
// Dynamic compression based on data size
val dataSize = spark.table("source_data").count()
val compressionCodec = if (dataSize > 1000000) "bzip2" else "snappy"

df.write
  .format("avro")
  .option("compression", compressionCodec)
  .save("path/to/output")

// Dynamic schema derived from the DataFrame structure
val avroSchema = SchemaConverters.toAvroType(
  df.schema,
  recordName = s"${tableName}Record",
  nameSpace = "com.company.generated"
)

df.write
  .format("avro")
  .option("avroSchema", avroSchema.toString)
  .save("path/to/schema_output")
```

#### Performance Tuning Configuration

Suggested configuration settings for different use cases.

```scala
// High-throughput configuration (large files, streaming)
df.write
  .format("avro")
  .option("compression", "snappy") // fast compression
  .option("recordName", "StreamRecord")
  .mode("append")
  .save("path/to/streaming_output")

// Storage-optimized configuration (archival, cold storage)
df.write
  .format("avro")
  .option("compression", "bzip2") // high compression ratio
  .option("recordName", "ArchivalRecord")
  .option("recordNamespace", "com.company.archive")
  .mode("overwrite")
  .save("path/to/archived_output")

// Schema evolution configuration
df.write
  .format("avro")
  .option("compression", "deflate")
  .option("avroSchema", evolutionCompatibleSchema) // a schema string kept compatible across versions
  .mode("append")
  .save("path/to/versioned_output")
```
### File Handling Configuration

Options for controlling how Avro files are processed and interpreted.

#### File Extension Handling

```scala { .api }
val ignoreExtension: Boolean // Whether to ignore the .avro file extension requirement
```

**Configuration Logic:**

```scala
// Priority order for ignoreExtension:
// 1. Explicit option value
// 2. Hadoop config: avro.mapred.ignore.inputs.without.extension
// 3. Default: false (require the .avro extension)
```

**Usage Examples:**

```scala
// Read files without the .avro extension
val df1 = spark.read
  .format("avro")
  .option("ignoreExtension", "true")
  .load("path/to/files_without_extension")

// Use the Hadoop configuration default
val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.setBoolean("avro.mapred.ignore.inputs.without.extension", true)

val df2 = spark.read
  .format("avro")
  .load("path/to/mixed_files")
```
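The priority order above can be sketched as a small, self-contained resolution helper (hypothetical names; the real logic lives inside AvroOptions and reads the Hadoop Configuration directly):

```scala
// Hypothetical sketch of ignoreExtension resolution. hadoopDefault stands in
// for Configuration.getBoolean("avro.mapred.ignore.inputs.without.extension", _)
def resolveIgnoreExtension(
    options: Map[String, String],
    hadoopDefault: Option[Boolean]): Boolean = {
  // Case-insensitive lookup mirrors CaseInsensitiveMap behaviour
  val explicit = options.collectFirst {
    case (k, v) if k.equalsIgnoreCase("ignoreExtension") => v.toBoolean
  }
  // 1. explicit option, 2. Hadoop config, 3. default false
  explicit.orElse(hadoopDefault).getOrElse(false)
}

assert(resolveIgnoreExtension(Map("IGNOREEXTENSION" -> "true"), Some(false)))
assert(resolveIgnoreExtension(Map.empty, Some(true)))
assert(!resolveIgnoreExtension(Map.empty, None))
```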
### Compression Configuration

Comprehensive compression options for write operations with multiple codec support.

#### Compression Codec Selection

```scala { .api }
val compression: String // Compression codec for write operations
```

**Supported Compression Codecs:**

```scala
"uncompressed" // No compression
"snappy"       // Default; balanced compression and speed
"deflate"      // Good compression ratio, configurable level
"bzip2"        // High compression ratio, slower processing
"xz"           // High compression ratio, slower processing
```

**Configuration Priority:**

```scala
// Priority order for compression:
// 1. Explicit option value
// 2. Spark config: spark.sql.avro.compression.codec
// 3. Default: "snappy"
```

**Usage Examples:**

```scala
// Set compression via an option
df.write
  .format("avro")
  .option("compression", "deflate")
  .save("path/to/compressed_output")

// Set the global default compression
spark.conf.set("spark.sql.avro.compression.codec", "bzip2")

// Configure the deflate compression level
spark.conf.set("spark.sql.avro.deflate.level", "9") // max compression
df.write
  .format("avro")
  .option("compression", "deflate")
  .save("path/to/max_compressed_output")
```
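When choosing a codec empirically, one option is to write the same DataFrame once per supported codec and compare the resulting directory sizes (an illustrative sketch; paths are arbitrary and `df` plus an active Spark session are assumed):

```scala
// Write one copy of the data per codec, into sibling directories
val codecs = Seq("uncompressed", "snappy", "deflate", "bzip2", "xz")
codecs.foreach { codec =>
  df.write
    .format("avro")
    .option("compression", codec)
    .mode("overwrite")
    .save(s"path/to/codec_comparison/$codec")
}
// Compare the on-disk size of each output directory against write time
// to pick the right trade-off for your data
```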
### Advanced Configuration Options

Additional configuration options for specialized use cases.

#### Corrupt File Handling

```scala
// Handle corrupt files during read operations
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

val robustDF = spark.read
  .format("avro")
  .load("path/to/potentially_corrupt_files")
```

#### File Discovery Options

```scala
// Control file discovery behavior
val options = Map(
  "ignoreExtension" -> "true",
  "recursiveFileLookup" -> "true" // generic Spark SQL file-source option
)

val df = spark.read
  .format("avro")
  .options(options)
  .load("path/to/nested_directories")
```
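On Spark 3.0 and later, the generic file-source option `pathGlobFilter` can be combined with `recursiveFileLookup` to restrict recursive discovery to files matching a glob, as an alternative to extension handling via `ignoreExtension`:

```scala
// Recursively discover files, but only read those ending in .avro
// (pathGlobFilter matches file names, not directory names)
val avroOnly = spark.read
  .format("avro")
  .option("recursiveFileLookup", "true")
  .option("pathGlobFilter", "*.avro")
  .load("path/to/nested_directories")
```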
### Configuration Validation

The AvroOptions class performs validation and provides helpful error messages for invalid configurations.

**Validation Examples:**

```scala
// Invalid compression codec
try {
  df.write
    .format("avro")
    .option("compression", "invalid_codec")
    .save("output")
} catch {
  case e: IllegalArgumentException =>
    println(s"Invalid compression codec: ${e.getMessage}")
}

// Schema validation during parsing
try {
  val invalidSchema = """{"type": "invalid_type"}"""
  spark.read
    .format("avro")
    .option("avroSchema", invalidSchema)
    .load("data.avro")
} catch {
  case e: Exception =>
    println(s"Schema parse error: ${e.getMessage}")
}
```
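The checks above can be bundled into a reusable pre-flight helper that rejects bad options before any Spark job is launched (a hypothetical helper, not part of the spark-avro API):

```scala
// Hypothetical pre-flight validator for Avro write options
val supportedCodecs = Set("uncompressed", "snappy", "deflate", "bzip2", "xz")

def validateAvroWriteOptions(options: Map[String, String]): Either[String, Unit] = {
  // Case-insensitive lookup, mirroring AvroOptions' parameter handling
  val codec = options.collectFirst {
    case (k, v) if k.equalsIgnoreCase("compression") => v.toLowerCase
  }
  codec match {
    case Some(c) if !supportedCodecs.contains(c) =>
      Left(s"Unsupported compression codec: $c")
    case _ => Right(()) // no codec set, or a supported one
  }
}

assert(validateAvroWriteOptions(Map("compression" -> "snappy")).isRight)
assert(validateAvroWriteOptions(Map("Compression" -> "gzip")).isLeft)
```

Returning `Either` keeps the helper side-effect free; callers can throw, log, or fall back to a default as they see fit.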
### Integration with Spark Configuration

AvroOptions integrates with Spark's configuration system for global defaults.

#### Spark Configuration Properties

```scala
// Global Avro configuration
spark.conf.set("spark.sql.avro.compression.codec", "snappy")
spark.conf.set("spark.sql.avro.deflate.level", "6")

// File handling configuration
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")
```

#### Hadoop Configuration Integration

```scala
// Access the Hadoop configuration
val hadoopConf = spark.sparkContext.hadoopConfiguration

// Set Avro-specific Hadoop properties
hadoopConf.setBoolean("avro.mapred.ignore.inputs.without.extension", true)

// The configuration is picked up automatically by AvroOptions
val df = spark.read.format("avro").load("data")
```
### Complete Configuration Example

Comprehensive example showing all configuration options:

```scala
import org.apache.spark.sql.{SparkSession, SaveMode}

val spark = SparkSession.builder()
  .appName("Avro Configuration Example")
  .config("spark.sql.avro.compression.codec", "snappy")
  .config("spark.sql.avro.deflate.level", "6")
  .getOrCreate()

import spark.implicits._ // enables the $"colName" column syntax

val customSchema = """{
  "type": "record",
  "name": "ProcessedEvent",
  "namespace": "com.company.events",
  "fields": [
    {"name": "eventId", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "payload", "type": ["null", "string"], "default": null}
  ]
}"""

val readOptions = Map(
  "avroSchema" -> customSchema,
  "ignoreExtension" -> "true"
)

val writeOptions = Map(
  "avroSchema" -> customSchema,
  "recordName" -> "ProcessedEvent",
  "recordNamespace" -> "com.company.events",
  "compression" -> "deflate"
)

// Read with configuration
val inputDF = spark.read
  .format("avro")
  .options(readOptions)
  .load("path/to/input")

// Process data
val processedDF = inputDF.filter($"timestamp" > 1609459200000L) // 2021-01-01 UTC, in ms

// Write with configuration
processedDF.write
  .format("avro")
  .options(writeOptions)
  .mode(SaveMode.Overwrite)
  .save("path/to/output")
```