or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

binary-functions.md · configuration.md · file-operations.md · index.md · schema-conversion.md

docs/schema-conversion.md

# Schema Conversion

The Spark Avro connector provides utilities for converting between Avro schemas and Spark SQL schemas through the `SchemaConverters` object. This enables seamless interoperability between Avro and Spark data type systems, supporting complex nested types and logical type mappings.

## SchemaConverters Object

The `SchemaConverters` object provides bidirectional schema conversion capabilities.

```scala { .api }
@DeveloperApi
object SchemaConverters {
  case class SchemaType(dataType: DataType, nullable: Boolean)

  def toSqlType(avroSchema: Schema): SchemaType
  def toSqlType(avroSchema: Schema, options: Map[String, String]): SchemaType
  def toAvroType(
    catalystType: DataType,
    nullable: Boolean,
    recordName: String,
    nameSpace: String
  ): Schema
}
```

## Converting Avro to Spark SQL Schema

### Basic Conversion

```scala { .api }
def toSqlType(avroSchema: Schema): SchemaType
```

**Parameters:**

- `avroSchema`: The Apache Avro schema to convert

**Returns:** `SchemaType` containing the Spark SQL DataType and nullability

**Usage Example:**

```scala
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.avro.Schema

val avroSchemaJson = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null},
    {"name": "age", "type": "int"},
    {"name": "active", "type": "boolean"}
  ]
}
"""

val avroSchema = new Schema.Parser().parse(avroSchemaJson)
val schemaType = SchemaConverters.toSqlType(avroSchema)

println(s"Spark SQL DataType: ${schemaType.dataType}")
println(s"Is Nullable: ${schemaType.nullable}")

// Result:
// Spark SQL DataType: StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(email,StringType,true), StructField(age,IntegerType,false), StructField(active,BooleanType,false))
// Is Nullable: false
```

### Conversion with Options

```scala { .api }
def toSqlType(avroSchema: Schema, options: Map[String, String]): SchemaType
```

**Additional Parameters:**

- `options`: Configuration options affecting conversion behavior

**Supported Options:**

- `enableStableIdentifiersForUnionType`: Use stable field names for union types

**Usage Example:**

```scala
val unionSchema = """
{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "data", "type": [
      {"type": "record", "name": "UserEvent", "fields": [{"name": "userId", "type": "long"}]},
      {"type": "record", "name": "SystemEvent", "fields": [{"name": "systemId", "type": "string"}]}
    ]}
  ]
}
"""

val schema = new Schema.Parser().parse(unionSchema)
val options = Map("enableStableIdentifiersForUnionType" -> "true")
val schemaType = SchemaConverters.toSqlType(schema, options)
```

## Converting Spark SQL to Avro Schema

### Basic Conversion

```scala { .api }
def toAvroType(
  catalystType: DataType,
  nullable: Boolean,
  recordName: String,
  nameSpace: String
): Schema
```

**Parameters:**

- `catalystType`: Spark SQL DataType to convert
- `nullable`: Whether the root type should be nullable
- `recordName`: Name for the top-level record
- `nameSpace`: Namespace for the Avro schema

**Returns:** Apache Avro Schema object

**Usage Example:**

```scala
import org.apache.spark.sql.types._

val sparkSchema = StructType(Seq(
  StructField("user_id", LongType, nullable = false),
  StructField("username", StringType, nullable = false),
  StructField("email", StringType, nullable = true),
  StructField("created_at", TimestampType, nullable = false),
  StructField("preferences", MapType(StringType, StringType), nullable = true)
))

val avroSchema = SchemaConverters.toAvroType(
  catalystType = sparkSchema,
  nullable = false,
  recordName = "UserRecord",
  nameSpace = "com.example.avro"
)

println(avroSchema.toString(true))
```

### Complex Type Conversion

Working with nested structures and arrays:

```scala
val complexSchema = StructType(Seq(
  StructField("order_id", StringType, nullable = false),
  StructField("customer", StructType(Seq(
    StructField("id", LongType, nullable = false),
    StructField("name", StringType, nullable = false),
    StructField("addresses", ArrayType(StructType(Seq(
      StructField("street", StringType, nullable = false),
      StructField("city", StringType, nullable = false),
      StructField("zip", StringType, nullable = true)
    )), containsNull = false), nullable = true)
  )), nullable = false),
  StructField("items", ArrayType(StructType(Seq(
    StructField("sku", StringType, nullable = false),
    StructField("quantity", IntegerType, nullable = false),
    StructField("price", DecimalType(10, 2), nullable = false)
  )), containsNull = false), nullable = false)
))

val avroSchema = SchemaConverters.toAvroType(
  complexSchema,
  nullable = false,
  recordName = "Order",
  nameSpace = "com.ecommerce.orders"
)
```

## Type Mapping Reference

### Primitive Types

| Spark SQL Type | Avro Type | Notes |
|----------------|-----------|-------|
| BooleanType | boolean | Direct mapping |
| IntegerType | int | Direct mapping |
| LongType | long | Direct mapping |
| FloatType | float | Direct mapping |
| DoubleType | double | Direct mapping |
| StringType | string | Direct mapping |
| BinaryType | bytes | Direct mapping |

### Complex Types

| Spark SQL Type | Avro Type | Notes |
|----------------|-----------|-------|
| StructType | record | Field names and types preserved |
| ArrayType | array | Element type converted recursively |
| MapType | map | Key must be string, value type converted |

### Special Types

| Spark SQL Type | Avro Type | Notes |
|----------------|-----------|-------|
| TimestampType | long (timestamp-micros) | Logical type for microsecond precision |
| DateType | int (date) | Logical type for date values |
| DecimalType | bytes (decimal) | Logical type with precision/scale |

### Nullable Types

Spark SQL nullable fields are converted to Avro union types:

```scala
// Spark SQL: StructField("email", StringType, nullable = true)
// Avro: {"name": "email", "type": ["null", "string"], "default": null}
```

## Working with Logical Types

### Decimal Types

```scala
import org.apache.spark.sql.types.DecimalType

val decimalSchema = StructType(Seq(
  StructField("price", DecimalType(10, 2), nullable = false),
  StructField("tax", DecimalType(5, 4), nullable = true)
))

val avroSchema = SchemaConverters.toAvroType(
  decimalSchema,
  nullable = false,
  recordName = "PriceInfo",
  nameSpace = "com.example"
)

// Results in Avro schema with decimal logical types
```

### Timestamp and Date Types

```scala
val timeSchema = StructType(Seq(
  StructField("created_at", TimestampType, nullable = false),
  StructField("event_date", DateType, nullable = false),
  StructField("updated_at", TimestampType, nullable = true)
))

val avroSchema = SchemaConverters.toAvroType(
  timeSchema,
  nullable = false,
  recordName = "TimeRecord",
  nameSpace = "com.example"
)

// Results in:
// - TimestampType -> long with timestamp-micros logical type
// - DateType -> int with date logical type
```

## Schema Validation and Compatibility

### Checking Data Type Support

Before conversion, verify that all data types are supported:

```scala
import org.apache.spark.sql.avro.AvroUtils

def validateSchema(schema: StructType): Boolean = {
  schema.fields.forall(field => AvroUtils.supportsDataType(field.dataType))
}

val isSupported = validateSchema(yourSparkSchema)
if (!isSupported) {
  throw new IllegalArgumentException("Schema contains unsupported data types")
}
```

### Schema Evolution Considerations

When converting schemas for evolution scenarios:

```scala
// Original schema
val v1Schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = false)
))

// Evolved schema (backward compatible)
val v2Schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = false),
  StructField("email", StringType, nullable = true), // New optional field
  StructField("created_at", TimestampType, nullable = true) // New optional field
))

val v1Avro = SchemaConverters.toAvroType(v1Schema, false, "User", "com.example")
val v2Avro = SchemaConverters.toAvroType(v2Schema, false, "User", "com.example")

// v2Avro can read data written with v1Avro schema
```

## Error Handling

### Unsupported Type Conversion

```scala
import org.apache.spark.sql.types.CalendarIntervalType

val unsupportedSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("interval", CalendarIntervalType, nullable = true) // Not supported
))

try {
  val avroSchema = SchemaConverters.toAvroType(
    unsupportedSchema,
    nullable = false,
    recordName = "Test",
    nameSpace = "com.example"
  )
} catch {
  case e: IllegalArgumentException =>
    println(s"Conversion failed: ${e.getMessage}")
}
```

### Schema Parse Errors

```scala
val invalidAvroJson = """{"type": "invalid"}"""

try {
  val avroSchema = new Schema.Parser().parse(invalidAvroJson)
  val sparkType = SchemaConverters.toSqlType(avroSchema)
} catch {
  case e: org.apache.avro.SchemaParseException =>
    println(s"Invalid Avro schema: ${e.getMessage}")
}
```

## Practical Examples

### DataFrame Schema Conversion

Converting existing DataFrame schema to Avro for writing:

```scala
val df = spark.table("users")
val sparkSchema = df.schema

val avroSchema = SchemaConverters.toAvroType(
  sparkSchema,
  nullable = false,
  recordName = "UserRecord",
  nameSpace = "com.company.users"
)

// Use the schema for writing
df.write
  .format("avro")
  .option("avroSchema", avroSchema.toString)
  .save("users_with_schema")
```

### Schema Registry Integration

Using converted schemas with external schema registries:

```scala
def registerSchema(subject: String, sparkSchema: StructType): Int = {
  val avroSchema = SchemaConverters.toAvroType(
    sparkSchema,
    nullable = false,
    recordName = subject.capitalize + "Record",
    nameSpace = "com.company.schemas"
  )

  // Register with schema registry (pseudo-code)
  schemaRegistry.register(subject, avroSchema)
}

val schemaId = registerSchema("user-events", df.schema)
```