# Schema Conversion

This document covers converting between Avro schemas and Spark SQL schemas using the SchemaConverters utility.

## Core API

```scala { .api }
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.avro.Schema

/**
 * Wrapper for SQL data type and nullability.
 */
case class SchemaType(dataType: DataType, nullable: Boolean)

object SchemaConverters {

  /**
   * Converts an Avro schema to a corresponding Spark SQL schema.
   * @param avroSchema the Avro schema to convert
   * @return SchemaType containing the converted DataType and nullability
   */
  def toSqlType(avroSchema: Schema): SchemaType

  /**
   * Converts an Avro schema with union type options.
   * @param avroSchema the Avro schema to convert
   * @param useStableIdForUnionType whether to use stable identifiers for union types
   * @return SchemaType containing the converted DataType and nullability
   */
  def toSqlType(avroSchema: Schema, useStableIdForUnionType: Boolean): SchemaType

  /**
   * Converts an Avro schema with parsing options.
   * @param avroSchema the Avro schema to convert
   * @param options conversion options map
   * @return SchemaType containing the converted DataType and nullability
   */
  def toSqlType(avroSchema: Schema, options: Map[String, String]): SchemaType

  /**
   * Converts a Spark SQL schema to a corresponding Avro schema.
   * @param catalystType the Spark SQL DataType to convert
   * @param nullable whether the type should be nullable
   * @param recordName the name for the top-level record (default: "topLevelRecord")
   * @param nameSpace the namespace for the record (default: "")
   * @return the corresponding Avro Schema
   */
  def toAvroType(
      catalystType: DataType,
      nullable: Boolean = false,
      recordName: String = "topLevelRecord",
      nameSpace: String = ""
  ): Schema
}
```

## Avro to Spark SQL Conversion

### Basic Type Conversion

```scala
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.avro.Schema

// Simple primitive type
val avroIntSchema = Schema.create(Schema.Type.INT)
val sparkType = SchemaConverters.toSqlType(avroIntSchema)
println(sparkType.dataType) // IntegerType
println(sparkType.nullable) // false

// String type
val avroStringSchema = Schema.create(Schema.Type.STRING)
val stringType = SchemaConverters.toSqlType(avroStringSchema)
println(stringType.dataType) // StringType
```

### Record Type Conversion

```scala
val avroRecordJson = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null},
    {"name": "age", "type": ["null", "int"], "default": null}
  ]
}
"""

val avroSchema = new Schema.Parser().parse(avroRecordJson)
val schemaType = SchemaConverters.toSqlType(avroSchema)

schemaType.dataType match {
  case structType: StructType =>
    println(structType.treeString)
    // root
    //  |-- id: long (nullable = false)
    //  |-- name: string (nullable = false)
    //  |-- email: string (nullable = true)
    //  |-- age: integer (nullable = true)
}
```

### Array and Map Conversion

```scala
// Array type
val avroArrayJson = """
{
  "type": "array",
  "items": "string"
}
"""
val arraySchema = new Schema.Parser().parse(avroArrayJson)
val arrayType = SchemaConverters.toSqlType(arraySchema)
println(arrayType.dataType) // ArrayType(StringType, containsNull=false)

// Map type
val avroMapJson = """
{
  "type": "map",
  "values": "int"
}
"""
val mapSchema = new Schema.Parser().parse(avroMapJson)
val mapType = SchemaConverters.toSqlType(mapSchema)
println(mapType.dataType) // MapType(StringType, IntegerType, valueContainsNull=false)
```

### Union Type Conversion

```scala
// Standard union handling
val unionJson = """
{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "data", "type": [
      {"type": "record", "name": "TextEvent", "fields": [{"name": "message", "type": "string"}]},
      {"type": "record", "name": "NumericEvent", "fields": [{"name": "value", "type": "double"}]}
    ]}
  ]
}
"""

val unionSchema = new Schema.Parser().parse(unionJson)

// Default union conversion (positional field names)
val defaultUnion = SchemaConverters.toSqlType(unionSchema)

// Stable union conversion (type-based field names)
val stableUnion = SchemaConverters.toSqlType(unionSchema, useStableIdForUnionType = true)

// With options
val options = Map("enableStableIdentifiersForUnionType" -> "true")
val optionsUnion = SchemaConverters.toSqlType(unionSchema, options)
```

## Spark SQL to Avro Conversion

### Basic Type Conversion

```scala
import org.apache.spark.sql.types._

// Convert basic types
val intSchema = SchemaConverters.toAvroType(IntegerType)
println(intSchema.toString) // "int"

val stringSchema = SchemaConverters.toAvroType(StringType, nullable = true)
println(stringSchema.toString) // ["null","string"]
```

### Struct Type Conversion

```scala
val sparkSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = false),
  StructField("email", StringType, nullable = true),
  StructField("balance", DecimalType(10, 2), nullable = true)
))

val avroSchema = SchemaConverters.toAvroType(
  sparkSchema,
  nullable = false,
  recordName = "Customer",
  nameSpace = "com.example"
)

println(avroSchema.toString(true))
```

### Complex Type Conversion

```scala
// Array type
val arrayType = ArrayType(StringType, containsNull = true)
val avroArray = SchemaConverters.toAvroType(arrayType)

// Map type
val mapType = MapType(StringType, IntegerType, valueContainsNull = false)
val avroMap = SchemaConverters.toAvroType(mapType)

// Nested struct
val nestedSchema = StructType(Seq(
  StructField("orderId", LongType, nullable = false),
  StructField("customer", StructType(Seq(
    StructField("id", LongType, nullable = false),
    StructField("name", StringType, nullable = false)
  )), nullable = false),
  StructField("items", ArrayType(StructType(Seq(
    StructField("productId", LongType, nullable = false),
    StructField("quantity", IntegerType, nullable = false),
    StructField("price", DoubleType, nullable = false)
  ))), nullable = false)
))

val nestedAvroSchema = SchemaConverters.toAvroType(
  nestedSchema,
  recordName = "Order",
  nameSpace = "com.example.orders"
)
```

## Logical Type Support

### Date and Timestamp Types

```scala
// Date type conversion
val dateType = DateType
val avroDateSchema = SchemaConverters.toAvroType(dateType)
// Results in int type with date logical type

// Timestamp type conversion
val timestampType = TimestampType
val avroTimestampSchema = SchemaConverters.toAvroType(timestampType)
// Results in long type with timestamp-micros logical type

// TimestampNTZ (no timezone) conversion
val timestampNTZType = TimestampNTZType
val avroLocalTimestampSchema = SchemaConverters.toAvroType(timestampNTZType)
// Results in long type with local-timestamp-micros logical type
```

### Decimal Type Conversion

```scala
val decimalType = DecimalType(precision = 10, scale = 2)
val avroDecimalSchema = SchemaConverters.toAvroType(decimalType)
// Results in fixed type with decimal logical type
```

### Interval Types

```scala
// Year-month interval
val yearMonthInterval = YearMonthIntervalType()
val avroYMSchema = SchemaConverters.toAvroType(yearMonthInterval)
// Results in int type with catalyst type property

// Day-time interval
val dayTimeInterval = DayTimeIntervalType()
val avroDTSchema = SchemaConverters.toAvroType(dayTimeInterval)
// Results in long type with catalyst type property
```

## Schema Evolution Patterns

### Adding Optional Fields

```scala
// Original schema
val originalSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = false)
))

// Evolved schema with new optional field
val evolvedSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = false),
  StructField("email", StringType, nullable = true) // New optional field
))

val evolvedAvroSchema = SchemaConverters.toAvroType(evolvedSchema)
```

### Handling Schema Compatibility

```scala
def isSchemaCompatible(readerSchema: Schema, writerSchema: Schema): Boolean = {
  try {
    // Convert both schemas to Spark SQL format
    val readerSqlType = SchemaConverters.toSqlType(readerSchema)
    val writerSqlType = SchemaConverters.toSqlType(writerSchema)

    // Check structural compatibility
    DataType.equalsIgnoreCompatibleNullability(
      readerSqlType.dataType,
      writerSqlType.dataType
    )
  } catch {
    case _: Exception => false
  }
}
```

## Custom Type Properties

### Preserving Catalyst Type Information

```scala
// Some Spark SQL types are preserved using custom properties
val intervalSchema = SchemaConverters.toAvroType(YearMonthIntervalType())

// The generated Avro schema includes catalyst type property:
// {"type": "int", "spark.sql.catalyst.type": "interval year to month"}

// When converting back to Spark SQL, the property is used to restore the exact type
val restoredType = SchemaConverters.toSqlType(intervalSchema)
// Results in YearMonthIntervalType instead of IntegerType
```

## Exception Types

```scala { .api }
/**
 * Exception thrown when schema conversion fails due to incompatibility.
 */
class IncompatibleSchemaException(msg: String, ex: Throwable = null) extends Exception(msg, ex)

/**
 * Exception thrown when an unsupported Avro type is encountered.
 */
class UnsupportedAvroTypeException(msg: String) extends Exception(msg)
```

## Error Handling

### Schema Conversion Errors

```scala
import org.apache.spark.sql.avro.IncompatibleSchemaException

try {
  // Invalid recursive schema
  val recursiveJson = """
  {
    "type": "record",
    "name": "Node",
    "fields": [
      {"name": "value", "type": "int"},
      {"name": "child", "type": "Node"}
    ]
  }
  """
  val recursiveSchema = new Schema.Parser().parse(recursiveJson)
  val result = SchemaConverters.toSqlType(recursiveSchema)
} catch {
  case e: IncompatibleSchemaException =>
    println(s"Schema conversion failed: ${e.getMessage}")
}
```

### Unsupported Type Handling

```scala
try {
  // Attempt to convert unsupported Spark SQL type
  val unsupportedType = CalendarIntervalType
  val avroSchema = SchemaConverters.toAvroType(unsupportedType)
} catch {
  case e: IncompatibleSchemaException =>
    println(s"Unsupported type conversion: ${e.getMessage}")
}
```

## Practical Examples

### Schema Registry Integration

```scala
// Example of using with Confluent Schema Registry
def convertSchemaRegistryToSpark(schemaRegistrySchema: String): StructType = {
  val avroSchema = new Schema.Parser().parse(schemaRegistrySchema)
  val schemaType = SchemaConverters.toSqlType(avroSchema)
  schemaType.dataType.asInstanceOf[StructType]
}

def convertSparkToSchemaRegistry(sparkSchema: StructType, name: String): String = {
  val avroSchema = SchemaConverters.toAvroType(
    sparkSchema,
    recordName = name,
    nameSpace = "com.example"
  )
  avroSchema.toString(true) // Pretty-printed JSON
}
```

### Dynamic Schema Handling

```scala
def processWithDynamicSchema(df: DataFrame, avroSchemaJson: String): DataFrame = {
  val avroSchema = new Schema.Parser().parse(avroSchemaJson)
  val schemaType = SchemaConverters.toSqlType(avroSchema)

  // Verify DataFrame schema compatibility
  val dfSchema = df.schema
  if (DataType.equalsIgnoreCompatibleNullability(dfSchema, schemaType.dataType)) {
    // Schemas are compatible, proceed with processing
    df.select("*")
  } else {
    throw new IllegalArgumentException("DataFrame schema incompatible with Avro schema")
  }
}
```