
# Configuration Options

Apache Spark Avro provides extensive configuration options for controlling Avro processing behavior, including compression settings, schema handling, parsing modes, and performance optimizations.

## Read Options

Configuration options for reading Avro files and deserializing Avro data.

### Error Handling

```scala { .api }
val readOptions = Map(
  "mode" -> "PERMISSIVE" // PERMISSIVE | DROPMALFORMED | FAILFAST
)
```

**Available Modes:**

- **PERMISSIVE** (default): Sets corrupt records to null and continues processing
- **DROPMALFORMED**: Ignores corrupt records completely
- **FAILFAST**: Throws an exception on the first corrupt record

**Usage Example:**

```scala
// Strict parsing - fail on any malformed data
val strictDf = spark.read.format("avro")
  .option("mode", "FAILFAST")
  .load("sensitive-data.avro")

// Permissive parsing - continue with nulls for bad records
val lenientDf = spark.read.format("avro")
  .option("mode", "PERMISSIVE")
  .load("messy-data.avro")
```

### Schema Options

```scala { .api }
val schemaOptions = Map(
  "avroSchema" -> jsonSchemaString, // Override input schema
  "ignoreExtension" -> "true", // Read files regardless of extension
  "recursiveFieldMaxDepth" -> "10" // Maximum recursion depth for nested fields
)
```

**Schema Override Example:**

```scala
val fixedSchema = """{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"}
  ]
}"""

val df = spark.read.format("avro")
  .option("avroSchema", fixedSchema)
  .option("ignoreExtension", "true")
  .load("data-files/*")
```

### Union Type Handling

```scala { .api }
val unionOptions = Map(
  "enableStableIdentifiersForUnionType" -> "true", // Use stable identifiers for union types
  "stableIdentifierPrefixForUnionType" -> "union_" // Prefix for stable union identifiers
)
```

**Usage Example:**

```scala
import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._

val options: JMap[String, String] = Map(
  "enableStableIdentifiersForUnionType" -> "true",
  "stableIdentifierPrefixForUnionType" -> "variant_"
).asJava

val parsedDf = df.select(from_avro(col("union_data"), unionSchema, options).as("parsed"))
```

### DateTime Handling

```scala { .api }
val datetimeOptions = Map(
  "datetimeRebaseMode" -> "CORRECTED" // EXCEPTION | CORRECTED | LEGACY
)
```

**Rebase Modes:**

- **EXCEPTION**: Fail on dates/timestamps that are ambiguous between the Julian and Proleptic Gregorian calendars (before 1582-10-15)
- **CORRECTED**: Load values as-is, interpreting them in the Proleptic Gregorian calendar (no rebasing)
- **LEGACY**: Rebase values from the legacy hybrid Julian/Gregorian calendar used by Spark 2.x
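As a sketch of when rebasing matters (the input path below is illustrative, not from this project): LEGACY mode is typically chosen when reading Avro files that were written by Spark 2.x with the hybrid calendar.

```scala
// Sketch: Spark 2.x wrote dates/timestamps in the hybrid Julian/Gregorian
// calendar; LEGACY rebases those ancient values on read so they keep the
// same wall-clock meaning. Path is illustrative.
val rebasedDf = spark.read.format("avro")
  .option("datetimeRebaseMode", "LEGACY")
  .load("spark2-era-data.avro")
```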

## Write Options

Configuration options for writing Avro files and serializing data to Avro format.

### Compression

```scala { .api }
val compressionOptions = Map(
  "compression" -> "snappy" // uncompressed | deflate | snappy | bzip2 | xz | zstandard
)
```

**Available Compression Codecs:**

| Codec | Performance | Compression Ratio | Use Case |
|-------|-------------|-------------------|----------|
| uncompressed | Fastest | None | Development, high-speed networks |
| snappy | Very Fast | Good | General purpose, balanced performance |
| deflate | Fast | Better | Standard compression |
| bzip2 | Slow | Best | Archival, storage-optimized |
| xz | Slowest | Excellent | Long-term storage |
| zstandard | Fast | Excellent | Modern high-performance compression |

#### AvroCompressionCodec Java Enum

The compression codecs are defined as a Java enum with additional utility methods:

```java { .api }
public enum AvroCompressionCodec {
    UNCOMPRESSED, DEFLATE, SNAPPY, BZIP2, XZ, ZSTANDARD;

    public String getCodecName();
    public boolean getSupportCompressionLevel();
    public String lowerCaseName();
    public static AvroCompressionCodec fromString(String s);
}
```

**Enum Methods:**

- `getCodecName()`: Returns the Avro codec name constant
- `getSupportCompressionLevel()`: Returns true if the codec supports compression levels
- `lowerCaseName()`: Returns the lowercase name for the codec
- `fromString(String)`: Parses a codec from a string (case-insensitive)
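A brief sketch of using the enum to validate a user-supplied codec name before writing (the import path assumes the enum's location in recent Spark releases):

```scala
// Assumes the enum lives at org.apache.spark.sql.avro.AvroCompressionCodec.
import org.apache.spark.sql.avro.AvroCompressionCodec

// Parse the codec name case-insensitively (throws on unknown codecs),
// then pass the lowercase name as the write option.
val codec = AvroCompressionCodec.fromString("Snappy")
df.write.format("avro")
  .option("compression", codec.lowerCaseName())
  .save("compressed-output")
```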

**Usage Example:**

```scala
// High-performance compression
df.write.format("avro")
  .option("compression", "snappy")
  .save("fast-access-data")

// High-ratio modern compression for archival
df.write.format("avro")
  .option("compression", "zstandard")
  .save("archived-data")
```

### Schema Generation

```scala { .api }
val schemaGenOptions = Map(
  "recordName" -> "MyRecord", // Name for generated record types
  "recordNamespace" -> "com.example", // Namespace for generated schemas
  "avroSchema" -> customSchemaJson // Use custom output schema
)
```

**Custom Schema Example:**

```scala
val outputSchema = """{
  "type": "record",
  "name": "ExportRecord",
  "namespace": "com.company.exports",
  "fields": [
    {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-micros"}},
    {"name": "user_id", "type": "string"},
    {"name": "metrics", "type": {"type": "map", "values": "double"}}
  ]
}"""

df.write.format("avro")
  .option("avroSchema", outputSchema)
  .option("compression", "snappy")
  .save("structured-export")
```

## Function Options

Configuration options for `from_avro`, `to_avro`, and `schema_of_avro` functions.

### Parsing Options

```scala { .api }
val functionOptions: java.util.Map[String, String] = Map(
  "mode" -> "PERMISSIVE",
  "datetimeRebaseMode" -> "CORRECTED",
  "enableStableIdentifiersForUnionType" -> "false",
  "stableIdentifierPrefixForUnionType" -> "",
  "recursiveFieldMaxDepth" -> "5"
).asJava
```

**Complete Function Usage:**

```scala
import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.avro.functions._

val parseOptions: JMap[String, String] = Map(
  "mode" -> "FAILFAST",
  "datetimeRebaseMode" -> "CORRECTED",
  "enableStableIdentifiersForUnionType" -> "true",
  "recursiveFieldMaxDepth" -> "10"
).asJava

val decodedDf = df.select(
  from_avro(col("avro_binary"), avroSchema, parseOptions).as("decoded_data")
)

// For schema_of_avro in SQL, options are passed as a SQL map literal
val schemaDf = spark.sql(
  s"SELECT schema_of_avro('$complexSchema', map('enableStableIdentifiersForUnionType', 'true')) AS spark_schema"
)
```

## Performance Tuning Options

### File Size Optimization

```scala { .api }
// Control output file size through partitioning
df.coalesce(numPartitions)
  .write.format("avro")
  .option("compression", "snappy")
  .save(path)

// Optimal partition size: 128MB - 1GB per file
val optimalPartitions = (df.count() * avgRowSizeBytes / targetPartitionSizeBytes).toInt
```
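The one-liner above assumes `avgRowSizeBytes` and `targetPartitionSizeBytes` are already known; a small self-contained helper (the name is ours, not a Spark API) makes the arithmetic explicit and guards against a zero partition count:

```scala
// Sketch: derive a coalesce target from an estimated total data size and a
// target bytes-per-file. Defaults to 256 MB per partition; rounds up and
// never returns fewer than one partition.
def estimatePartitions(
    rowCount: Long,
    avgRowSizeBytes: Long,
    targetPartitionSizeBytes: Long = 256L * 1024 * 1024): Int = {
  val totalBytes = rowCount.toDouble * avgRowSizeBytes
  math.max(1, math.ceil(totalBytes / targetPartitionSizeBytes).toInt)
}
```

It would then be called as `df.coalesce(estimatePartitions(df.count(), estimatedRowSize))` before writing.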

### Schema Inference Control

```scala { .api }
// Disable schema inference by providing an explicit schema
val df = spark.read.format("avro")
  .option("avroSchema", knownSchema) // Avoids scanning all files
  .load("large-dataset/*.avro")
```

### Memory Management

```scala { .api }
val memoryOptions = Map(
  "recursiveFieldMaxDepth" -> "3", // Limit nesting to control memory usage
  "mode" -> "DROPMALFORMED" // Skip problematic records to reduce memory pressure
)
```

## Option Constants

For type-safe configuration, use the predefined constants:

```scala { .api }
// Common option keys (from AvroOptions)
object AvroOptionKeys {
  val IGNORE_EXTENSION = "ignoreExtension"
  val MODE = "mode"
  val RECORD_NAME = "recordName"
  val COMPRESSION = "compression"
  val AVRO_SCHEMA = "avroSchema"
  val AVRO_SCHEMA_URL = "avroSchemaUrl"
  val RECORD_NAMESPACE = "recordNamespace"
  val POSITIONAL_FIELD_MATCHING = "positionalFieldMatching"
  val DATETIME_REBASE_MODE = "datetimeRebaseMode"
  val STABLE_ID_FOR_UNION_TYPE = "enableStableIdentifiersForUnionType"
  val STABLE_ID_PREFIX_FOR_UNION_TYPE = "stableIdentifierPrefixForUnionType"
  val RECURSIVE_FIELD_MAX_DEPTH = "recursiveFieldMaxDepth"
}
```
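A short sketch of building an options map from those constants (using the `AvroOptionKeys` object as defined above; Spark itself keeps these keys on `AvroOptions`) so option names are checked at compile time rather than retyped as string literals:

```scala
// Build write options from the key constants to avoid typos in literals.
val writeOpts = Map(
  AvroOptionKeys.COMPRESSION -> "snappy",
  AvroOptionKeys.RECORD_NAME -> "Event",
  AvroOptionKeys.RECORD_NAMESPACE -> "com.example.events"
)

df.write.format("avro").options(writeOpts).save("typed-config-output")
```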

## Configuration Examples

### Production Reading Configuration

```scala
val productionReadConfig = Map(
  "mode" -> "FAILFAST", // Strict error handling
  "ignoreExtension" -> "false", // Validate file extensions
  "recursiveFieldMaxDepth" -> "5", // Reasonable nesting limit
  "datetimeRebaseMode" -> "CORRECTED" // Handle historical dates correctly
)

val df = spark.read.format("avro")
  .options(productionReadConfig)
  .load("production-data/*.avro")
```

### High-Performance Writing Configuration

```scala
val highPerfWriteConfig = Map(
  "compression" -> "snappy", // Fast compression
  "recordName" -> "OptimizedRecord", // Descriptive record name
  "recordNamespace" -> "com.company.data" // Proper namespace
)

df.coalesce(200) // Optimize partition count
  .write.format("avro")
  .options(highPerfWriteConfig)
  .save("optimized-output")
```

### Development/Testing Configuration

```scala
val devConfig = Map(
  "mode" -> "PERMISSIVE", // Lenient error handling
  "ignoreExtension" -> "true", // Flexible file reading
  "compression" -> "uncompressed" // Fast writes for testing (ignored on read)
)

val testDf = spark.read.format("avro")
  .options(devConfig)
  .load("test-data/*")
```