or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

binary-conversion.md configuration.md data-types.md file-operations.md index.md schema-conversion.md

docs/configuration.md

0

# Configuration Options

1

2

This document covers all configuration options available for the Spark Avro connector across reading, writing, and binary conversion operations.

3

4

## Read Configuration Options

5

6

### Schema Options

7

8

```scala { .api }

9

// Schema specification options

10

option("avroSchema", "JSON schema string") // Custom Avro schema for reading

11

option("avroSchemaUrl", "file:///path/to/schema.avsc") // Schema file location

12

```

13

14

**Usage Examples:**

15

```scala

16

// Using inline schema

17

val customSchema = """

18

{

19

"type": "record",

20

"name": "User",

21

"fields": [

22

{"name": "id", "type": "long"},

23

{"name": "name", "type": "string"},

24

{"name": "email", "type": ["null", "string"], "default": null}

25

]

26

}

27

"""

28

29

val df = spark.read

30

.format("avro")

31

.option("avroSchema", customSchema)

32

.load("path/to/files")

33

34

// Using schema file

35

val df2 = spark.read

36

.format("avro")

37

.option("avroSchemaUrl", "file:///schemas/user.avsc")

38

.load("path/to/files")

39

40

// Using HDFS schema location

41

val df3 = spark.read

42

.format("avro")

43

.option("avroSchemaUrl", "hdfs://namenode:port/schemas/user.avsc")

44

.load("path/to/files")

45

```

46

47

### Parse Mode Options

48

49

```scala { .api }

50

// Error handling during parsing

51

option("mode", "FAILFAST|PERMISSIVE|DROPMALFORMED")

52

```

53

54

**Mode Descriptions:**

55

- **FAILFAST**: Throw exception on any parsing error (default)

56

- **PERMISSIVE**: Set malformed records to null, continue processing

57

- **DROPMALFORMED**: Skip malformed records entirely

58

59

```scala

60

// Strict parsing - fail on any error

61

val strictDF = spark.read

62

.format("avro")

63

.option("mode", "FAILFAST")

64

.load("path/to/files")

65

66

// Lenient parsing - continue with nulls for bad records

67

val lenientDF = spark.read

68

.format("avro")

69

.option("mode", "PERMISSIVE")

70

.load("path/to/files")

71

72

// Skip bad records entirely

73

val filteredDF = spark.read

74

.format("avro")

75

.option("mode", "DROPMALFORMED")

76

.load("path/to/files")

77

```

78

79

### Field Matching Options

80

81

```scala { .api }

82

// Field matching strategy

83

option("positionalFieldMatching", "true|false") // Default: false

84

```

85

86

```scala

87

// Match fields by name (default behavior)

88

val nameMatchedDF = spark.read

89

.format("avro")

90

.option("positionalFieldMatching", "false")

91

.load("path/to/files")

92

93

// Match fields by position - useful for schema evolution

94

val positionMatchedDF = spark.read

95

.format("avro")

96

.option("positionalFieldMatching", "true")

97

.load("path/to/files")

98

```

99

100

### DateTime Handling Options

101

102

```scala { .api }

103

// Calendar rebase mode for date/timestamp values

104

option("datetimeRebaseMode", "EXCEPTION|LEGACY|CORRECTED")

105

```

106

107

**Rebase Modes:**

108

- **EXCEPTION**: Throw exception when rebasing is needed (safest)

109

- **LEGACY**: Use the legacy hybrid (Julian + Gregorian) calendar behavior

110

- **CORRECTED**: Use proleptic Gregorian calendar (recommended)

111

112

```scala

113

val df = spark.read

114

.format("avro")

115

.option("datetimeRebaseMode", "CORRECTED")

116

.load("path/to/files")

117

```

118

119

### Union Type Options

120

121

```scala { .api }

122

// Union type field naming strategy

123

option("enableStableIdentifiersForUnionType", "true|false") // Default: false

124

```

125

126

```scala

127

// Default union field names: member0, member1, etc.

128

val defaultUnionDF = spark.read

129

.format("avro")

130

.option("enableStableIdentifiersForUnionType", "false")

131

.load("path/to/files")

132

133

// Stable union field names based on type: member_string, member_int, etc.

134

val stableUnionDF = spark.read

135

.format("avro")

136

.option("enableStableIdentifiersForUnionType", "true")

137

.load("path/to/files")

138

```

139

140

### File Handling Options

141

142

```scala { .api }

143

// File extension handling (deprecated - use pathGlobFilter instead)

144

option("ignoreExtension", "true|false") // Default: true

145

146

// Standard Spark options also supported

147

option("pathGlobFilter", "*.avro") // File pattern matching

148

option("recursiveFileLookup", "true|false") // Recursive directory scanning

149

option("ignoreCorruptFiles", "true|false") // Skip corrupt files

150

option("modifiedBefore", "2023-01-01T00:00:00") // Filter by modification time (timestamp format: YYYY-MM-DDTHH:mm:ss)

151

option("modifiedAfter", "2022-01-01T00:00:00") // Filter by modification time (timestamp format: YYYY-MM-DDTHH:mm:ss)

152

```

153

154

```scala

155

// Modern approach using pathGlobFilter

156

val filteredDF = spark.read

157

.format("avro")

158

.option("pathGlobFilter", "*.avro")

159

.option("recursiveFileLookup", "true")

160

.load("path/to/nested/directories")

161

162

// Legacy approach (deprecated)

163

val legacyDF = spark.read

164

.format("avro")

165

.option("ignoreExtension", "true")

166

.load("path/to/files")

167

```

168

169

## Write Configuration Options

170

171

### Schema Options

172

173

```scala { .api }

174

// Output schema specification

175

option("avroSchema", "JSON schema string") // Custom output schema

176

option("recordName", "string") // Top-level record name

177

option("recordNamespace", "string") // Record namespace

178

```

179

180

```scala

181

val outputSchema = """

182

{

183

"type": "record",

184

"name": "OutputRecord",

185

"namespace": "com.example.data",

186

"fields": [

187

{"name": "id", "type": "long"},

188

{"name": "value", "type": "string"}

189

]

190

}

191

"""

192

193

df.write

194

.format("avro")

195

.option("avroSchema", outputSchema)

196

.save("path/to/output")

197

198

// Alternative: specify record name and namespace separately

199

df.write

200

.format("avro")

201

.option("recordName", "MyRecord")

202

.option("recordNamespace", "com.example")

203

.save("path/to/output")

204

```

205

206

### Compression Options

207

208

```scala { .api }

209

// Compression codec selection

210

option("compression", "uncompressed|snappy|deflate|bzip2|xz|zstandard")

211

```

212

213

**Compression Codec Characteristics:**

214

- **snappy**: Fast compression/decompression, moderate ratio (default)

215

- **deflate**: Good compression ratio, configurable level

216

- **bzip2**: High compression ratio, slower performance

217

- **xz**: High compression ratio, slower performance

218

- **zstandard**: Best balance of speed and compression ratio

219

- **uncompressed**: No compression, fastest I/O

220

221

```scala

222

// Fast compression (default)

223

df.write

224

.format("avro")

225

.option("compression", "snappy")

226

.save("path/to/output")

227

228

// Best compression ratio

229

df.write

230

.format("avro")

231

.option("compression", "zstandard")

232

.save("path/to/output")

233

234

// No compression

235

df.write

236

.format("avro")

237

.option("compression", "uncompressed")

238

.save("path/to/output")

239

```

240

241

### Field Matching Options

242

243

```scala { .api }

244

// Field matching strategy for writing

245

option("positionalFieldMatching", "true|false") // Default: false

246

```

247

248

```scala

249

// Write with positional field matching

250

df.write

251

.format("avro")

252

.option("positionalFieldMatching", "true")

253

.save("path/to/output")

254

```

255

256

## Binary Conversion Options

257

258

Options used with `from_avro()` and `to_avro()` functions:

259

260

```scala { .api }

261

import scala.collection.JavaConverters._

262

263

// Available options for from_avro()

264

val fromAvroOptions: java.util.Map[String, String] = Map(

265

"mode" -> "FAILFAST|PERMISSIVE", // from_avro supports only FAILFAST and PERMISSIVE parse modes

266

"datetimeRebaseMode" -> "EXCEPTION|LEGACY|CORRECTED",

267

"enableStableIdentifiersForUnionType" -> "true|false"

268

).asJava

269

```

270

271

```scala

272

import org.apache.spark.sql.avro.functions._

273

import scala.collection.JavaConverters._

274

275

val options = Map(

276

"mode" -> "PERMISSIVE",

277

"datetimeRebaseMode" -> "CORRECTED",

278

"enableStableIdentifiersForUnionType" -> "true"

279

).asJava

280

281

val decodedDF = df.select(

282

from_avro(col("avro_data"), avroSchema, options).as("decoded")

283

)

284

```

285

286

## Global Configuration

287

288

### Spark SQL Configuration

289

290

```scala { .api }

291

// Set global defaults via Spark configuration

292

spark.conf.set("spark.sql.avro.compression.codec", "snappy")

293

spark.conf.set("spark.sql.avro.deflate.level", "6")

294

spark.conf.set("spark.sql.avro.datetimeRebaseModeInRead", "CORRECTED")

295

```

296

297

```scala

298

// Configure global defaults

299

spark.conf.set("spark.sql.avro.compression.codec", "zstandard")

300

spark.conf.set("spark.sql.avro.datetimeRebaseModeInRead", "CORRECTED")

301

302

// These defaults will be used when options are not explicitly specified

303

val df = spark.read.format("avro").load("path/to/files")

304

```

305

306

### Hadoop Configuration

307

308

```scala

309

// For advanced use cases, configure Hadoop-level settings

310

val hadoopConf = spark.sparkContext.hadoopConfiguration

311

312

// Avro-specific Hadoop configurations

313

hadoopConf.setBoolean("avro.mapred.ignore.inputs.without.extension", false)

314

hadoopConf.set("avro.schema.input.key", customSchemaJson)

315

```

316

317

## Performance Tuning Options

318

319

### File Size Optimization

320

321

```scala

322

// Control output file size

323

df.coalesce(10) // Reduce number of output files

324

.write

325

.format("avro")

326

.option("compression", "snappy")

327

.save("path/to/output")

328

329

// For large datasets, use repartition for better parallelism

330

df.repartition(100)

331

.write

332

.format("avro")

333

.save("path/to/output")

334

```

335

336

### Memory Management

337

338

```scala

339

// For large schema conversions, consider caching

340

val convertedDF = df.select(

341

from_avro(col("large_avro_data"), complexSchema).as("converted")

342

)

343

344

// Persist decoded data for multiple operations

345

convertedDF.persist(StorageLevel.MEMORY_AND_DISK_SER) // requires: import org.apache.spark.storage.StorageLevel

346

```

347

348

## Error Handling Configuration

349

350

### Comprehensive Error Handling Setup

351

352

```scala

353

val robustReadConfig = spark.read

354

.format("avro")

355

.option("mode", "PERMISSIVE") // Continue on parse errors

356

.option("ignoreCorruptFiles", "true") // Skip corrupt files

357

.option("datetimeRebaseMode", "CORRECTED") // Handle date/time properly

358

.option("pathGlobFilter", "*.avro") // Only process .avro files

359

360

val df = robustReadConfig.load("path/to/potentially/problematic/files")

361

362

// Filter out rows that failed to parse

363

val cleanDF = df.filter(col("decoded_data").isNotNull)

364

```

365

366

### Write with Error Recovery

367

368

```scala

369

try {

370

df.write

371

.format("avro")

372

.mode("overwrite")

373

.option("compression", "snappy")

374

.save("path/to/output")

375

} catch {

376

case e: AnalysisException if e.getMessage.contains("schema") =>

377

// Handle schema-related errors

378

println(s"Schema error: ${e.getMessage}")

379

// Retry with different schema or options

380

case e: Exception =>

381

// Handle other errors

382

println(s"Write failed: ${e.getMessage}")

383

}

384

```

385

386

## Configuration Best Practices

387

388

### Production Configuration Template

389

390

```scala

391

// Robust production configuration for reading

392

def createRobustAvroReader(spark: SparkSession): DataFrameReader = {

393

spark.read

394

.format("avro")

395

.option("mode", "PERMISSIVE")

396

.option("ignoreCorruptFiles", "true")

397

.option("datetimeRebaseMode", "CORRECTED")

398

.option("pathGlobFilter", "*.avro")

399

.option("recursiveFileLookup", "true")

400

}

401

402

// Optimized production configuration for writing

403

def createOptimizedAvroWriter(df: DataFrame): DataFrameWriter[Row] = {

404

df.coalesce(math.max(1, df.rdd.getNumPartitions / 4)) // Reduce small files

405

.write

406

.format("avro")

407

.option("compression", "zstandard") // Best compression

408

.mode("overwrite")

409

}

410

```

411

412

### Schema Evolution Configuration

413

414

```scala

415

// Configuration for schema evolution scenarios

416

def createEvolutionSafeReader(

417

spark: SparkSession,

418

readerSchema: String

419

): DataFrameReader = {

420

spark.read

421

.format("avro")

422

.option("avroSchema", readerSchema) // Use evolved schema

423

.option("positionalFieldMatching", "false") // Match by name

424

.option("mode", "PERMISSIVE") // Handle missing fields gracefully

425

.option("enableStableIdentifiersForUnionType", "true") // Stable union handling

426

}

427

```

428

429

### Streaming Configuration

430

431

```scala

432

// Configuration for streaming Avro data

433

val streamingAvroDF = spark.readStream

434

.format("kafka")

435

.option("kafka.bootstrap.servers", "localhost:9092")

436

.option("subscribe", "avro-topic")

437

.load()

438

.select(

439

from_avro(

440

col("value"),

441

avroSchema,

442

Map(

443

"mode" -> "PERMISSIVE",

444

"datetimeRebaseMode" -> "CORRECTED"

445

).asJava

446

).as("data")

447

)

448

```