or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

column-functions.md · configuration.md · data-source.md · index.md · schema-conversion.md

docs/schema-conversion.md

0

# Schema Conversion

1

2

Utilities for converting between Apache Avro schemas and Spark SQL schemas. Supports complex nested types, logical types, and bidirectional conversion with proper nullability handling.

3

4

## Capabilities

5

6

### Avro to Spark SQL Conversion

7

8

Convert Avro schemas to Spark SQL DataType structures with nullability information.

9

10

```scala { .api }

11

object SchemaConverters {

12

/**

13

* Converts an Avro schema to Spark SQL schema type

14

* @param avroSchema the Avro schema to convert

15

* @return SchemaType containing DataType and nullability information

16

*/

17

def toSqlType(avroSchema: Schema): SchemaType

18

}

19

20

case class SchemaType(dataType: DataType, nullable: Boolean)

21

```

22

23

**Usage Examples:**

24

25

```scala

26

import org.apache.avro.Schema

27

import org.apache.spark.sql.avro.SchemaConverters

28

29

// Parse Avro schema from JSON

30

val avroSchemaJson = """{

31

"type": "record",

32

"name": "User",

33

"fields": [

34

{"name": "id", "type": "long"},

35

{"name": "name", "type": "string"},

36

{"name": "email", "type": ["null", "string"], "default": null},

37

{"name": "addresses", "type": {

38

"type": "array",

39

"items": {

40

"type": "record",

41

"name": "Address",

42

"fields": [

43

{"name": "street", "type": "string"},

44

{"name": "city", "type": "string"}

45

]

46

}

47

}}

48

]

49

}"""

50

51

val avroSchema = new Schema.Parser().parse(avroSchemaJson)

52

53

// Convert to Spark SQL schema

54

val schemaType = SchemaConverters.toSqlType(avroSchema)

55

val sparkSqlSchema = schemaType.dataType.asInstanceOf[StructType]

56

57

// Use in DataFrame operations

58

val df = spark.read

59

.schema(sparkSqlSchema)

60

.format("avro")

61

.load("path/to/user_data.avro")

62

63

// Print schema information

64

println(s"Nullable: ${schemaType.nullable}")

65

println(s"Spark Schema: ${sparkSqlSchema.prettyJson}")

66

```

67

68

### Spark SQL to Avro Conversion

69

70

Convert Spark SQL DataType structures to Avro schemas with configurable naming.

71

72

```scala { .api }

73

object SchemaConverters {

74

/**

75

* Converts a Spark SQL DataType to Avro schema

76

* @param catalystType the Spark SQL DataType to convert

77

* @param nullable whether the type should be nullable

78

* @param recordName name for record types (default: "topLevelRecord")

79

* @param nameSpace namespace for record types (default: "")

80

* @return Avro Schema

81

* @throws IncompatibleSchemaException if the DataType cannot be converted

82

*/

83

def toAvroType(

84

catalystType: DataType,

85

nullable: Boolean = false,

86

recordName: String = "topLevelRecord",

87

nameSpace: String = ""

88

): Schema

89

90

/**

91

* Exception thrown when schema conversion fails

92

*/

93

class IncompatibleSchemaException(msg: String, ex: Throwable = null) extends Exception(msg, ex)

94

}

95

```

96

97

**Usage Examples:**

98

99

```scala

100

import org.apache.spark.sql.types._

101

import org.apache.spark.sql.avro.SchemaConverters

102

103

// Create Spark SQL schema

104

val sparkSchema = StructType(Seq(

105

StructField("id", LongType, nullable = false),

106

StructField("name", StringType, nullable = false),

107

StructField("email", StringType, nullable = true),

108

StructField("scores", ArrayType(DoubleType, containsNull = false), nullable = false)

109

))

110

111

// Convert to Avro schema

112

val avroSchema = SchemaConverters.toAvroType(

113

sparkSchema,

114

nullable = false,

115

recordName = "UserProfile",

116

nameSpace = "com.example.data"

117

)

118

119

// Use in write operations

120

val df = spark.table("user_data")

121

df.write

122

.format("avro")

123

.option("avroSchema", avroSchema.toString)

124

.save("path/to/output")

125

126

// Print schema JSON

127

println(avroSchema.toString(true))

128

129

// Alternative: convert using named arguments with a different record name and namespace
val userRecordSchema = SchemaConverters.toAvroType(
  catalystType = sparkSchema,
  nullable = false,
  recordName = "UserRecord",
  nameSpace = "com.example"
)

// Use in write operations
val usersDf = spark.table("users")
usersDf.write
  .format("avro")
  .option("avroSchema", userRecordSchema.toString)
  .save("path/to/output")

143

```

144

145

### Supported Type Mappings

146

147

Comprehensive type conversion support between Avro and Spark SQL types.

148

149

#### Avro to Spark SQL Type Mappings

150

151

```scala { .api }

152

// Primitive types

153

NULL → NullType

154

BOOLEAN → BooleanType

155

INT → IntegerType

156

LONG → LongType

157

FLOAT → FloatType

158

DOUBLE → DoubleType

159

BYTES → BinaryType

160

STRING → StringType

161

ENUM → StringType

162

163

// Logical types

164

INT with date logical type → DateType

165

LONG with timestamp-millis logical type → TimestampType

166

LONG with timestamp-micros logical type → TimestampType

167

BYTES/FIXED with decimal logical type → DecimalType

168

169

// Complex types

170

RECORD → StructType

171

ARRAY → ArrayType

172

MAP → MapType(StringType, valueType)

173

UNION → Special handling (see below)

174

FIXED → BinaryType

175

```

176

177

#### Spark SQL to Avro Type Mappings

178

179

```scala { .api }

180

// Primitive types

181

BooleanType → BOOLEAN

182

ByteType → INT

183

ShortType → INT

184

IntegerType → INT

185

LongType → LONG

186

FloatType → FLOAT

187

DoubleType → DOUBLE

188

StringType → STRING

189

BinaryType → BYTES

190

191

// Special types

192

DateType → INT with date logical type

193

TimestampType → LONG with timestamp-micros logical type

194

DecimalType → FIXED with decimal logical type

195

196

// Complex types

197

StructType → RECORD

198

ArrayType → ARRAY

199

MapType → MAP (only with StringType keys)

200

```

201

202

### UNION Type Handling

203

204

Special handling for Avro UNION types with multiple conversion strategies.

205

206

```scala { .api }

207

// Union type conversion strategies:

208

209

// 1. Nullable unions (union with null)

210

["null", "string"] → StringType with nullable = true

211

212

// 2. Simple type promotion unions

213

["int", "long"] → LongType

214

["float", "double"] → DoubleType

215

216

// 3. Complex unions (converted to struct with member fields)

217

["string", "int", "record"] → StructType with fields:

218

- member0: StringType (nullable = true)

219

- member1: IntegerType (nullable = true)

220

- member2: StructType (nullable = true)

221

```

222

223

**Union Handling Examples:**

224

225

```scala

226

// Nullable field union

227

val nullableUnion = """["null", "string"]"""

228

// Converts to: StringType, nullable = true

229

230

// Type promotion union

231

val promotionUnion = """["int", "long"]"""

232

// Converts to: LongType, nullable = false

233

234

// Complex union

235

val complexUnion = """[

236

"string",

237

{"type": "record", "name": "Address", "fields": [

238

{"name": "street", "type": "string"}

239

]}

240

]"""

241

// Converts to: StructType with member0 (string) and member1 (Address record)

242

```

243

244

### Logical Types Support

245

246

Full support for Avro logical types with proper Spark SQL mapping.

247

248

#### Date and Timestamp Logical Types

249

250

```scala { .api }

251

// Date logical type (days since epoch)

252

{

253

"type": "int",

254

"logicalType": "date"

255

} → DateType

256

257

// Timestamp logical types

258

{

259

"type": "long",

260

"logicalType": "timestamp-millis"

261

} → TimestampType

262

263

{

264

"type": "long",

265

"logicalType": "timestamp-micros"

266

} → TimestampType

267

```

268

269

#### Decimal Logical Type

270

271

```scala { .api }

272

// Decimal as BYTES

273

{

274

"type": "bytes",

275

"logicalType": "decimal",

276

"precision": 10,

277

"scale": 2

278

} → DecimalType(10, 2)

279

280

// Decimal as FIXED

281

{

282

"type": "fixed",

283

"name": "DecimalFixed",

284

"size": 8,

285

"logicalType": "decimal",

286

"precision": 18,

287

"scale": 4

288

} → DecimalType(18, 4)

289

```

290

291

### Schema Evolution Support

292

293

Handling schema evolution and compatibility between different schema versions.

294

295

```scala

296

// Schema evolution example

297

val v1Schema = """{

298

"type": "record",

299

"name": "User",

300

"fields": [

301

{"name": "id", "type": "long"},

302

{"name": "name", "type": "string"}

303

]

304

}"""

305

306

val v2Schema = """{

307

"type": "record",

308

"name": "User",

309

"fields": [

310

{"name": "id", "type": "long"},

311

{"name": "name", "type": "string"},

312

{"name": "email", "type": ["null", "string"], "default": null}

313

]

314

}"""

315

316

// Reading evolved data with original schema

317

val df = spark.read

318

.format("avro")

319

.option("avroSchema", v1Schema) // Use older schema

320

.load("path/to/v2_data.avro") // Reading newer data

321

```

322

323

### Error Handling

324

325

Schema conversion error handling and common issues.

326

327

```scala { .api }

328

class IncompatibleSchemaException(msg: String, ex: Throwable = null) extends Exception(msg, ex)

329

```

330

331

**Common Error Scenarios:**

332

333

```scala

334

// Unsupported type conversion

335

try {

336

val unsupportedType = MapType(IntegerType, StringType) // Avro MAP requires string keys

337

SchemaConverters.toAvroType(unsupportedType)

338

} catch {

339

case e: IncompatibleSchemaException =>

340

println(s"Conversion failed: ${e.getMessage}")

341

}

342

343

// Recursive schema detection

344

val recursiveSchema = """{

345

"type": "record",

346

"name": "Node",

347

"fields": [

348

{"name": "value", "type": "string"},

349

{"name": "child", "type": "Node"}

350

]

351

}"""

352

353

try {

354

SchemaConverters.toSqlType(new Schema.Parser().parse(recursiveSchema))

355

} catch {

356

case e: IncompatibleSchemaException =>

357

println("Recursive schemas not supported")

358

}

359

```