# Configuration Options

The Spark Avro connector provides comprehensive configuration options through the `AvroOptions` class and related constants. These options control various aspects of Avro processing, including compression, schema handling, field matching, and error handling.

## AvroOptions Class

The main configuration class for Avro operations.

```scala { .api }
class AvroOptions(
    parameters: CaseInsensitiveMap[String],
    conf: Configuration
) extends FileSourceOptions(parameters) {
  val schema: Option[Schema]
  val positionalFieldMatching: Boolean
  val recordName: String
  val recordNamespace: String
  val compression: String
  val parseMode: ParseMode
  val datetimeRebaseModeInRead: String
  val useStableIdForUnionType: Boolean
}
```

### Factory Method

```scala { .api }
object AvroOptions {
  def apply(parameters: Map[String, String]): AvroOptions
}
```

**Usage Example:**

```scala
import org.apache.spark.sql.avro.AvroOptions

val options = AvroOptions(Map(
  "compression" -> "snappy",
  "recordName" -> "MyRecord",
  "recordNamespace" -> "com.example.avro"
))

println(s"Compression: ${options.compression}")
println(s"Record Name: ${options.recordName}")
```

## Configuration Constants

All configuration option keys are defined as constants in the `AvroOptions` companion object.

```scala { .api }
object AvroOptions {
  val COMPRESSION: String
  val RECORD_NAME: String
  val RECORD_NAMESPACE: String
  val AVRO_SCHEMA: String
  val AVRO_SCHEMA_URL: String
  val POSITIONAL_FIELD_MATCHING: String
  val DATETIME_REBASE_MODE: String
  val MODE: String
  val IGNORE_EXTENSION: String // Deprecated
  val STABLE_ID_FOR_UNION_TYPE: String
}
```
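
Each constant resolves to the plain string key shown throughout this page, so a constant and its literal key are interchangeable in `.option()` calls. A minimal sketch (assuming a `df` DataFrame and the output paths are illustrative):

```scala
// Using the companion-object constant (typo-safe, discoverable in an IDE)
df.write.format("avro").option(AvroOptions.COMPRESSION, "snappy").save("out-constant")

// Equivalent call using the literal string key
df.write.format("avro").option("compression", "snappy").save("out-literal")
```

Preferring the constants catches misspelled option keys at compile time rather than silently ignoring them at runtime.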

## Schema Configuration

### avroSchema

Specify a custom Avro schema for reading or writing.

**Option Key:** `AvroOptions.AVRO_SCHEMA` (`"avroSchema"`)

**Usage in Reading:**

```scala
val customSchema = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}
"""

val df = spark.read
  .format("avro")
  .option(AvroOptions.AVRO_SCHEMA, customSchema)
  .load("path/to/avro/files")
```

**Usage in Writing:**

```scala
df.write
  .format("avro")
  .option(AvroOptions.AVRO_SCHEMA, customSchema)
  .save("path/to/output")
```

### avroSchemaUrl

Load an Avro schema from a URL or file path.

**Option Key:** `AvroOptions.AVRO_SCHEMA_URL` (`"avroSchemaUrl"`)

```scala
val df = spark.read
  .format("avro")
  .option(AvroOptions.AVRO_SCHEMA_URL, "hdfs://cluster/schemas/user.avsc")
  .load("path/to/avro/files")

// Also works with local files
val df2 = spark.read
  .format("avro")
  .option(AvroOptions.AVRO_SCHEMA_URL, "file:///local/path/schema.avsc")
  .load("path/to/avro/files")
```

## Record Configuration

### recordName

Set the top-level record name when writing Avro files.

**Option Key:** `AvroOptions.RECORD_NAME` (`"recordName"`)

**Default:** `"topLevelRecord"`

```scala
df.write
  .format("avro")
  .option(AvroOptions.RECORD_NAME, "UserRecord")
  .save("path/to/output")
```

### recordNamespace

Set the namespace for the top-level record.

**Option Key:** `AvroOptions.RECORD_NAMESPACE` (`"recordNamespace"`)

**Default:** `""` (empty string)

```scala
df.write
  .format("avro")
  .option(AvroOptions.RECORD_NAME, "User")
  .option(AvroOptions.RECORD_NAMESPACE, "com.example.avro")
  .save("path/to/output")
```

## Compression Configuration

### compression

Set the compression codec for writing Avro files.

**Option Key:** `AvroOptions.COMPRESSION` (`"compression"`)

**Default:** Value of `spark.sql.avro.compression.codec`, which itself defaults to `"snappy"`

**Supported Codecs:**

- `uncompressed`: No compression
- `snappy`: Snappy compression (default)
- `deflate`: Deflate compression
- `bzip2`: Bzip2 compression
- `xz`: XZ compression
- `zstandard`: Zstandard compression

```scala
// Different compression options
df.write.format("avro").option(AvroOptions.COMPRESSION, "uncompressed").save("uncompressed")
df.write.format("avro").option(AvroOptions.COMPRESSION, "snappy").save("snappy")
df.write.format("avro").option(AvroOptions.COMPRESSION, "deflate").save("deflate")
df.write.format("avro").option(AvroOptions.COMPRESSION, "bzip2").save("bzip2")
df.write.format("avro").option(AvroOptions.COMPRESSION, "xz").save("xz")
df.write.format("avro").option(AvroOptions.COMPRESSION, "zstandard").save("zstandard")
```

## Field Matching Configuration

### positionalFieldMatching

Control how fields are matched between Spark and Avro schemas.

**Option Key:** `AvroOptions.POSITIONAL_FIELD_MATCHING` (`"positionalFieldMatching"`)

**Default:** `false`

**Values:**

- `false`: Match fields by name (default)
- `true`: Match fields by position

```scala
// Match by position instead of name
val df = spark.read
  .format("avro")
  .option(AvroOptions.POSITIONAL_FIELD_MATCHING, "true")
  .load("path/to/avro/files")

// Useful when field names don't match but the structure is identical
val sparkSchema = StructType(Seq(
  StructField("user_id", LongType),     // Position 0
  StructField("full_name", StringType), // Position 1
  StructField("email_addr", StringType) // Position 2
))

// The Avro schema has different field names but the same positions:
// {"name": "id", "type": "long"}      // Position 0
// {"name": "name", "type": "string"}  // Position 1
// {"name": "email", "type": "string"} // Position 2
```

## Error Handling Configuration

### mode

Control how parsing errors are handled.

**Option Key:** `AvroOptions.MODE` (`"mode"`)

**Default:** `FAILFAST`

**Values:**

- `FAILFAST`: Throw an exception on the first error (default)
- `PERMISSIVE`: Set corrupt records to null and continue processing
- `DROPMALFORMED`: Drop corrupt records and continue processing

```scala
// Drop malformed records
val df = spark.read
  .format("avro")
  .option(AvroOptions.MODE, "DROPMALFORMED")
  .load("path/to/avro/files")

// Keep corrupt records as null
val df2 = spark.read
  .format("avro")
  .option(AvroOptions.MODE, "PERMISSIVE")
  .load("path/to/avro/files")
```

## Date/Time Configuration

### datetimeRebaseMode

Control rebasing of DATE and TIMESTAMP values between the Julian and Proleptic Gregorian calendars.

**Option Key:** `AvroOptions.DATETIME_REBASE_MODE` (`"datetimeRebaseMode"`)

**Default:** Value of `spark.sql.avro.datetimeRebaseModeInRead`

**Values:**

- `EXCEPTION`: Throw an exception for dates that require rebasing
- `LEGACY`: Rebase values from the legacy hybrid (Julian + Gregorian) calendar
- `CORRECTED`: Read values as-is, without rebasing (Proleptic Gregorian)

```scala
val df = spark.read
  .format("avro")
  .option(AvroOptions.DATETIME_REBASE_MODE, "CORRECTED")
  .load("path/to/historical/avro/files")
```

## Union Type Configuration

### enableStableIdentifiersForUnionType

Control field naming for Avro union types when converting to Spark SQL.

**Option Key:** `AvroOptions.STABLE_ID_FOR_UNION_TYPE` (`"enableStableIdentifiersForUnionType"`)

**Default:** `false`

**Values:**

- `false`: Use positional field names such as `member0`, `member1` (default)
- `true`: Use stable field names derived from the member's type name

```scala
val unionSchema = """
{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "data", "type": [
      {"type": "record", "name": "UserEvent", "fields": [{"name": "userId", "type": "long"}]},
      {"type": "record", "name": "SystemEvent", "fields": [{"name": "systemId", "type": "string"}]}
    ]}
  ]
}
"""

val df = spark.read
  .format("avro")
  .option(AvroOptions.AVRO_SCHEMA, unionSchema)
  .option(AvroOptions.STABLE_ID_FOR_UNION_TYPE, "true")
  .load("path/to/union/data")

// Results in stable field names like "data.member_UserEvent" and "data.member_SystemEvent"
```

## Deprecated Options

### ignoreExtension (Deprecated)

**Option Key:** `AvroOptions.IGNORE_EXTENSION` (`"ignoreExtension"`)

**Status:** Deprecated since Spark 3.0; use the general-purpose `pathGlobFilter` option instead

```scala
// Deprecated way
val df = spark.read
  .format("avro")
  .option(AvroOptions.IGNORE_EXTENSION, "true")
  .load("path/to/files")

// Modern way
val df2 = spark.read
  .format("avro")
  .option("pathGlobFilter", "*.data") // Or another pattern
  .load("path/to/files")
```

## Global Configuration

Some Avro behavior can also be controlled through Spark SQL configuration:

### spark.sql.avro.compression.codec

Default compression codec for Avro files.

```scala
spark.conf.set("spark.sql.avro.compression.codec", "zstandard")

// All subsequent Avro writes use zstandard compression by default
df.write.format("avro").save("path/to/output")
```

### spark.sql.avro.deflate.level

Compression level for the deflate codec (1-9).

```scala
spark.conf.set("spark.sql.avro.deflate.level", "6")

df.write
  .format("avro")
  .option(AvroOptions.COMPRESSION, "deflate")
  .save("path/to/output")
```

### spark.sql.avro.datetimeRebaseModeInRead

Default datetime rebase mode for reading.

```scala
spark.conf.set("spark.sql.avro.datetimeRebaseModeInRead", "CORRECTED")
```
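
Per-operation options take precedence over these session-level defaults. A quick sketch (the path is illustrative):

```scala
// Session-wide default for all Avro reads
spark.conf.set("spark.sql.avro.datetimeRebaseModeInRead", "EXCEPTION")

// The per-read option overrides the session default for this load only
val legacyDF = spark.read
  .format("avro")
  .option(AvroOptions.DATETIME_REBASE_MODE, "LEGACY")
  .load("path/to/pre-gregorian/avro/files")
```

This makes the session configuration a safe baseline while individual reads opt in to different behavior.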

## Option Combinations

### Complete Example

Combining multiple options for complex scenarios:

```scala
val complexDF = spark.read
  .format("avro")
  .option(AvroOptions.AVRO_SCHEMA_URL, "hdfs://cluster/schemas/evolved-schema.avsc")
  .option(AvroOptions.MODE, "PERMISSIVE")
  .option(AvroOptions.POSITIONAL_FIELD_MATCHING, "false")
  .option(AvroOptions.DATETIME_REBASE_MODE, "CORRECTED")
  .option(AvroOptions.STABLE_ID_FOR_UNION_TYPE, "true")
  .option("pathGlobFilter", "*.avro")
  .load("path/to/avro/files")

complexDF.write
  .format("avro")
  .option(AvroOptions.COMPRESSION, "zstandard")
  .option(AvroOptions.RECORD_NAME, "ProcessedEvent")
  .option(AvroOptions.RECORD_NAMESPACE, "com.company.events")
  .mode("overwrite")
  .save("path/to/processed/output")
```

### Schema Evolution Scenario

Configuration for reading old data with a newer schema:

```scala
val evolvedSchema = """
{
  "type": "record",
  "name": "UserV2",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "phone", "type": ["null", "string"], "default": null},
    {"name": "created_at", "type": ["null", "long"], "default": null}
  ]
}
"""

val migratedDF = spark.read
  .format("avro")
  .option(AvroOptions.AVRO_SCHEMA, evolvedSchema)
  .option(AvroOptions.MODE, "PERMISSIVE") // Handle missing fields gracefully
  .option(AvroOptions.DATETIME_REBASE_MODE, "CORRECTED")
  .load("path/to/legacy/user/data")
```

## Performance Considerations

### Compression Trade-offs

```scala
// Fast compression and decompression (good for temporary data)
df.write.format("avro").option(AvroOptions.COMPRESSION, "snappy").save("temp")

// Better compression ratio (good for archival)
df.write.format("avro").option(AvroOptions.COMPRESSION, "zstandard").save("archive")

// No compression (fastest write, largest files)
df.write.format("avro").option(AvroOptions.COMPRESSION, "uncompressed").save("staging")
```

### Schema Reuse

When repeatedly using the same schema, define it once and reuse the value. Option values are resolved on the driver, so a plain `val` is sufficient; a broadcast variable is unnecessary here.

```scala
// Define the schema string once and reuse it across writes
val sharedSchema: String = complexSchema

df1.write.format("avro").option(AvroOptions.AVRO_SCHEMA, sharedSchema).save("output1")
df2.write.format("avro").option(AvroOptions.AVRO_SCHEMA, sharedSchema).save("output2")
```