
# Schema Conversion

The Schema Conversion API provides developer-level utilities for converting between Avro schemas and Spark SQL data types. This API is marked as `@DeveloperApi` and is primarily used for advanced schema manipulation and custom data source implementations.

## Core API

### SchemaConverters Object

```scala { .api }
object SchemaConverters {
  def toSqlType(avroSchema: org.apache.avro.Schema): SchemaType
  def toSqlType(avroSchema: org.apache.avro.Schema, useStableIdForUnionType: Boolean,
      stableIdPrefixForUnionType: String, recursiveFieldMaxDepth: Int): SchemaType
  def toAvroType(catalystType: DataType, nullable: Boolean, recordName: String, nameSpace: String): org.apache.avro.Schema
}
```

### SchemaType Case Class

```scala { .api }
case class SchemaType(dataType: DataType, nullable: Boolean)
```

Represents the result of converting an Avro schema to a Spark SQL data type, including nullability information.
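Because `SchemaType` is a plain case class, the result can be destructured directly. A minimal sketch (assuming `avroSchema` is a parsed Avro record schema, so the converted type is a `StructType`, and that `SchemaType` is the case class nested in `SchemaConverters` as shown above):

```scala
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.avro.SchemaConverters.SchemaType
import org.apache.spark.sql.types.StructType

// Destructure the conversion result into its type and nullability parts.
val SchemaType(dataType, nullable) = SchemaConverters.toSqlType(avroSchema)

// Record schemas convert to StructType, usable directly as a DataFrame schema.
val structSchema = dataType.asInstanceOf[StructType]
println(s"nullable=$nullable, fields=${structSchema.fieldNames.mkString(", ")}")
```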

## Avro to Spark SQL Conversion

## Avro to Spark SQL Conversion

### Basic Conversion

Converts an Avro schema to a Spark SQL `DataType`:

```scala { .api }
def toSqlType(avroSchema: org.apache.avro.Schema): SchemaType
```

**Usage Example:**

```scala
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.avro.Schema

val avroSchemaJson = """{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}"""

val avroSchema = new Schema.Parser().parse(avroSchemaJson)
val sparkSchema = SchemaConverters.toSqlType(avroSchema)

println(sparkSchema.dataType)
// StructType(List(
//   StructField("id", LongType, false),
//   StructField("name", StringType, false),
//   StructField("email", StringType, true)
// ))
```

### Advanced Conversion

Converts with advanced options for union type handling and recursion control:

```scala { .api }
def toSqlType(avroSchema: org.apache.avro.Schema,
    useStableIdForUnionType: Boolean,
    stableIdPrefixForUnionType: String,
    recursiveFieldMaxDepth: Int): SchemaType
```

**Parameters:**

- `avroSchema`: The Avro schema to convert
- `useStableIdForUnionType`: Use stable identifiers for union types
- `stableIdPrefixForUnionType`: Prefix for stable union identifiers
- `recursiveFieldMaxDepth`: Maximum depth for recursive field resolution

**Usage Example:**

```scala
val complexSchema = SchemaConverters.toSqlType(
  avroSchema,
  useStableIdForUnionType = true,
  stableIdPrefixForUnionType = "union_",
  recursiveFieldMaxDepth = 10
)
```

## Spark SQL to Avro Conversion

### Type Conversion

Converts a Spark SQL `DataType` to an Avro schema:

```scala { .api }
def toAvroType(catalystType: DataType, nullable: Boolean, recordName: String, nameSpace: String): org.apache.avro.Schema
```

**Parameters:**

- `catalystType`: Spark SQL DataType to convert
- `nullable`: Whether the field can be null
- `recordName`: Name for generated record types
- `nameSpace`: Namespace for generated schemas

**Usage Example:**

```scala
import org.apache.spark.sql.types._

val sparkSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = false),
  StructField("score", DoubleType, nullable = true)
))

val avroSchema = SchemaConverters.toAvroType(
  sparkSchema,
  nullable = false,
  recordName = "UserRecord",
  nameSpace = "com.example"
)

println(avroSchema.toString(true))
```

## Type Mapping

### Primitive Types

| Avro Type | Spark SQL Type | Notes |
|-----------|----------------|-------|
| boolean | BooleanType | Direct mapping |
| int | IntegerType | 32-bit signed integer |
| long | LongType | 64-bit signed integer |
| float | FloatType | 32-bit IEEE 754 |
| double | DoubleType | 64-bit IEEE 754 |
| bytes | BinaryType | Variable-length byte array |
| string | StringType | UTF-8 encoded string |

### Complex Types

| Avro Type | Spark SQL Type | Notes |
|-----------|----------------|-------|
| record | StructType | Named fields with types |
| array | ArrayType | Homogeneous element type |
| map | MapType | String keys, typed values |
| union | StructType or nullable field | Depends on union content |
| enum | StringType | Enum values as strings |
| fixed | BinaryType | Fixed-length byte array |

### Logical Types

| Avro Logical Type | Spark SQL Type | Notes |
|-------------------|----------------|-------|
| decimal | DecimalType | Precision and scale preserved |
| date | DateType | Days since Unix epoch |
| time-millis | IntegerType | Milliseconds since midnight |
| time-micros | LongType | Microseconds since midnight |
| timestamp-millis | TimestampType | Milliseconds since Unix epoch |
| timestamp-micros | TimestampType | Microseconds since Unix epoch |
| duration | CalendarIntervalType | 12-byte fixed encoding months, days, milliseconds |
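As an illustrative sketch of the logical-type mapping (the exact rendering of the printed type may vary by Spark version), a `decimal(10, 2)` field carries its precision and scale through conversion:

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters

// A bytes field carrying the decimal(10, 2) logical type.
val decimalSchema = new Schema.Parser().parse("""{
  "type": "record",
  "name": "Payment",
  "fields": [
    {"name": "amount", "type": {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}}
  ]
}""")

// The converted StructType should carry DecimalType(10, 2) for "amount".
println(SchemaConverters.toSqlType(decimalSchema).dataType)
```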

## Advanced Features

### Union Type Handling

Avro unions are handled differently based on their composition:

```scala
// Simple nullable field: ["null", "string"] -> StringType(nullable=true)
val nullableString = """{"name": "optional_field", "type": ["null", "string"]}"""

// Complex union: ["string", "int"] -> StructType with member_0, member_1 fields
val complexUnion = """{"name": "mixed_field", "type": ["string", "int"]}"""
```
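To see the complex-union mapping concretely, the sketch below wraps the union in a record and converts it (the generated member field names depend on the stable-ID options; with defaults they are positional):

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters

val unionRecord = new Schema.Parser().parse("""{
  "type": "record",
  "name": "Mixed",
  "fields": [
    {"name": "mixed_field", "type": ["string", "int"]}
  ]
}""")

// With default (positional) naming, the union becomes a struct of member_N fields.
val converted = SchemaConverters.toSqlType(unionRecord)
println(converted.dataType)
// Expect mixed_field to be a StructType with member_0 (string) and member_1 (int)
```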

### Recursive Schema Support

The API handles recursive schemas with configurable depth limits:

```scala
val recursiveSchema = """{
  "type": "record",
  "name": "Node",
  "fields": [
    {"name": "value", "type": "string"},
    {"name": "children", "type": {"type": "array", "items": "Node"}}
  ]
}"""

val converted = SchemaConverters.toSqlType(
  new Schema.Parser().parse(recursiveSchema),
  useStableIdForUnionType = false,
  stableIdPrefixForUnionType = "",
  recursiveFieldMaxDepth = 5 // Limit recursion to prevent stack overflow
)
```

### Schema Evolution Compatibility

The conversion API supports schema evolution patterns:

```scala
// Reader schema (newer version)
val readerSchema = """{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string", "default": ""}
  ]
}"""

// Writer schema (older version) - missing email field
val writerSchema = """{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"}
  ]
}"""

// Both schemas convert to compatible Spark SQL types
val readerSparkSchema = SchemaConverters.toSqlType(new Schema.Parser().parse(readerSchema))
val writerSparkSchema = SchemaConverters.toSqlType(new Schema.Parser().parse(writerSchema))
```

## Error Handling

The conversion API throws exceptions for unsupported schema patterns:

- **UnsupportedAvroTypeException**: Thrown for unsupported Avro types
- **IncompatibleSchemaException**: Thrown for incompatible schema conversions
- **IllegalArgumentException**: Thrown for invalid parameters

```scala
try {
  val converted = SchemaConverters.toSqlType(complexAvroSchema)
} catch {
  case e: UnsupportedAvroTypeException =>
    println(s"Unsupported Avro type: ${e.getMessage}")
  case e: IncompatibleSchemaException =>
    println(s"Schema incompatibility: ${e.getMessage}")
}
```