# Configuration Options

Comprehensive configuration system for customizing Avro read and write operations. The AvroOptions class provides case-insensitive parameter handling with intelligent defaults and integration with Hadoop configuration.

## Capabilities

### AvroOptions Class

Central configuration class for Avro read and write operations with case-insensitive parameter handling.

```scala { .api }
class AvroOptions(
    @transient val parameters: CaseInsensitiveMap[String],
    @transient val conf: Configuration
) extends Logging with Serializable {

  def this(parameters: Map[String, String], conf: Configuration) = {
    this(CaseInsensitiveMap(parameters), conf)
  }

  // Configuration properties
  val schema: Option[String]    // Optional user-provided schema in JSON format
  val recordName: String        // Top-level record name (default: "topLevelRecord")
  val recordNamespace: String   // Record namespace (default: "")
  val ignoreExtension: Boolean  // Ignore file extensions in read operations
  val compression: String       // Compression codec for write operations
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.avro.AvroOptions
import org.apache.hadoop.conf.Configuration

val hadoopConf = new Configuration()
val options = Map(
  "avroSchema" -> customSchemaJson,
  "recordName" -> "MyRecord",
  "compression" -> "snappy"
)

val avroOptions = new AvroOptions(options, hadoopConf)

// Access configuration values
val customSchema = avroOptions.schema
val recordName = avroOptions.recordName
val compressionCodec = avroOptions.compression
```
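
Option keys pass through `CaseInsensitiveMap`, so lookups ignore key casing. A minimal sketch of that behavior (note: `CaseInsensitiveMap` lives in `org.apache.spark.sql.catalyst.util` and is internal Spark API, used here only for illustration):

```scala
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

val params = CaseInsensitiveMap(Map("avroSchema" -> "{...}", "Compression" -> "snappy"))

// Any casing resolves to the same entry
params.get("AVROSCHEMA")  // Some("{...}")
params.get("compression") // Some("snappy")
```

This is why `.option("AvroSchema", ...)` and `.option("avroschema", ...)` behave identically in the examples below.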

### Schema Configuration

Options for specifying and customizing Avro schemas during read and write operations.

#### Custom Schema Option

```scala { .api }
val schema: Option[String] // Optional user-provided schema in JSON format
```

**Usage Examples:**

```scala
val customSchema = """{
  "type": "record",
  "name": "CustomUser",
  "namespace": "com.example",
  "fields": [
    {"name": "userId", "type": "long"},
    {"name": "userName", "type": "string"},
    {"name": "metadata", "type": ["null", "string"], "default": null}
  ]
}"""

// Read with custom schema
val df = spark.read
  .format("avro")
  .option("avroSchema", customSchema)
  .load("path/to/data.avro")

// Write with custom schema
df.write
  .format("avro")
  .option("avroSchema", customSchema)
  .save("path/to/output")
```

#### Record Naming Options

```scala { .api }
val recordName: String // Top-level record name (default: "topLevelRecord")
val recordNamespace: String // Record namespace (default: "")
```

**Usage Examples:**

```scala
// Configure record naming for write operations
df.write
  .format("avro")
  .option("recordName", "UserProfile")
  .option("recordNamespace", "com.company.data")
  .save("path/to/named_output")

// The generated Avro schema will have:
// {
//   "type": "record",
//   "name": "UserProfile",
//   "namespace": "com.company.data",
//   "fields": [...]
// }
```

### File Handling Configuration

Options for controlling how Avro files are processed and interpreted.

#### File Extension Handling

```scala { .api }
val ignoreExtension: Boolean // Whether to ignore the .avro file extension requirement
```

**Configuration Logic:**

```scala
// Priority order for ignoreExtension:
// 1. Explicit option value
// 2. Hadoop config: avro.mapred.ignore.inputs.without.extension
// 3. Default: false (require .avro extension)
```
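
The priority order above can be sketched in plain Scala. This is a hypothetical helper for illustration only, not the actual `AvroOptions` implementation:

```scala
// `hadoopSetting` stands in for the value of
// avro.mapred.ignore.inputs.without.extension read from the Hadoop conf.
def resolveIgnoreExtension(
    explicitOption: Option[String],
    hadoopSetting: Option[Boolean]): Boolean =
  explicitOption.map(_.toBoolean) // 1. explicit "ignoreExtension" option
    .orElse(hadoopSetting)        // 2. Hadoop configuration
    .getOrElse(false)             // 3. default: require .avro extension
```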

**Usage Examples:**

```scala
// Read files without the .avro extension
val df1 = spark.read
  .format("avro")
  .option("ignoreExtension", "true")
  .load("path/to/files_without_extension")

// Use the Hadoop configuration default
val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.setBoolean("avro.mapred.ignore.inputs.without.extension", true)

val df2 = spark.read
  .format("avro")
  .load("path/to/mixed_files")
```

155

156

### Compression Configuration

157

158

Comprehensive compression options for write operations with multiple codec support.

159

160

#### Compression Codec Selection

161

162

```scala { .api }

163

val compression: String // Compression codec for write operations

164

```

165

166

**Supported Compression Codecs:**

167

168

```scala

169

"uncompressed" // No compression

170

"snappy" // Default - balanced compression and speed

171

"deflate" // Good compression ratio, configurable level

172

"bzip2" // High compression ratio, slower processing

173

"xz" // High compression ratio, slower processing

174

```

175

176

**Configuration Priority:**

177

178

```scala

179

// Priority order for compression:

180

// 1. Explicit option value

181

// 2. Spark config: spark.sql.avro.compression.codec

182

// 3. Default: "snappy"

183

```
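
The same resolution pattern applies here. A hypothetical sketch of the lookup order, for illustration rather than the real implementation (which reads `SQLConf` directly):

```scala
// `sessionDefault` stands in for the value of spark.sql.avro.compression.codec.
def resolveCompression(
    explicitOption: Option[String],
    sessionDefault: Option[String]): String =
  explicitOption              // 1. explicit "compression" option
    .orElse(sessionDefault)   // 2. spark.sql.avro.compression.codec
    .getOrElse("snappy")      // 3. default codec
```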

184

185

**Usage Examples:**

186

187

```scala

188

// Set compression via option

189

df.write

190

.format("avro")

191

.option("compression", "deflate")

192

.save("path/to/compressed_output")

193

194

// Set global default compression

195

spark.conf.set("spark.sql.avro.compression.codec", "bzip2")

196

197

// Configure deflate compression level

198

spark.conf.set("spark.sql.avro.deflate.level", "9") // Max compression

199

df.write

200

.format("avro")

201

.option("compression", "deflate")

202

.save("path/to/max_compressed_output")

203

```
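
To see what the deflate level trades off, you can compress the same bytes at levels 1 and 9 with `java.util.zip` directly. This plain JVM sketch is independent of Spark and only illustrates the size/CPU trade-off behind `spark.sql.avro.deflate.level`:

```scala
import java.util.zip.Deflater

// Compress `data` at the given deflate level and return the compressed size.
def deflatedSize(data: Array[Byte], level: Int): Int = {
  val deflater = new Deflater(level)
  deflater.setInput(data)
  deflater.finish()
  val buf = new Array[Byte](8192)
  var total = 0
  while (!deflater.finished()) total += deflater.deflate(buf)
  deflater.end()
  total
}

val sample = Array.fill(100000)("spark avro ").mkString.getBytes("UTF-8")
val fast  = deflatedSize(sample, 1) // larger output, less CPU
val small = deflatedSize(sample, 9) // smaller output, more CPU
```

Higher levels generally shrink the output at the cost of write-time CPU; for hot write paths the lower levels (or snappy) are usually the better fit.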

204

205

### Advanced Configuration Options

206

207

Additional configuration options for specialized use cases.

208

209

#### Corrupt File Handling

210

211

```scala

212

// Handle corrupt files during read operations

213

spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

214

215

val robustDF = spark.read

216

.format("avro")

217

.load("path/to/potentially_corrupt_files")

218

```

219

220

#### File Discovery Options

221

222

```scala

223

// Control file discovery behavior

224

val options = Map(

225

"ignoreExtension" -> "true",

226

"recursiveFileLookup" -> "true" // Spark SQL option

227

)

228

229

val df = spark.read

230

.format("avro")

231

.options(options)

232

.load("path/to/nested_directories")

233

```

### Configuration Validation

The AvroOptions class validates its inputs and reports helpful error messages for invalid configurations.

**Validation Examples:**

```scala
// Invalid compression codec
try {
  df.write
    .format("avro")
    .option("compression", "invalid_codec")
    .save("output")
} catch {
  case e: IllegalArgumentException =>
    println(s"Invalid compression codec: ${e.getMessage}")
}

// Schema validation during parsing
try {
  val invalidSchema = """{"type": "invalid_type"}"""
  spark.read
    .format("avro")
    .option("avroSchema", invalidSchema)
    .load("data.avro")
} catch {
  case e: Exception =>
    println(s"Schema parse error: ${e.getMessage}")
}
```

### Integration with Spark Configuration

AvroOptions integrates with Spark's configuration system for global defaults.

#### Spark Configuration Properties

```scala
// Global Avro configuration
spark.conf.set("spark.sql.avro.compression.codec", "snappy")
spark.conf.set("spark.sql.avro.deflate.level", "6")

// File handling configuration
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")
```

#### Hadoop Configuration Integration

```scala
// Access the Hadoop configuration
val hadoopConf = spark.sparkContext.hadoopConfiguration

// Set Avro-specific Hadoop properties
hadoopConf.setBoolean("avro.mapred.ignore.inputs.without.extension", true)

// The configuration is picked up automatically by AvroOptions
val df = spark.read.format("avro").load("data")
```

### Complete Configuration Example

A comprehensive example combining the configuration options above:

```scala
import org.apache.spark.sql.{SparkSession, SaveMode}

val spark = SparkSession.builder()
  .appName("Avro Configuration Example")
  .config("spark.sql.avro.compression.codec", "snappy")
  .config("spark.sql.avro.deflate.level", "6")
  .getOrCreate()

import spark.implicits._ // needed for the $"..." column syntax below

val customSchema = """{
  "type": "record",
  "name": "ProcessedEvent",
  "namespace": "com.company.events",
  "fields": [
    {"name": "eventId", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "payload", "type": ["null", "string"], "default": null}
  ]
}"""

val readOptions = Map(
  "avroSchema" -> customSchema,
  "ignoreExtension" -> "true"
)

val writeOptions = Map(
  "avroSchema" -> customSchema,
  "recordName" -> "ProcessedEvent",
  "recordNamespace" -> "com.company.events",
  "compression" -> "deflate"
)

// Read with configuration
val inputDF = spark.read
  .format("avro")
  .options(readOptions)
  .load("path/to/input")

// Process the data
val processedDF = inputDF.filter($"timestamp" > 1609459200000L)

// Write with configuration
processedDF.write
  .format("avro")
  .options(writeOptions)
  .mode(SaveMode.Overwrite)
  .save("path/to/output")
```