docs/schema-utilities.md

# Schema Utilities

Utilities for protobuf descriptor management, schema conversion, type registry operations, and field matching between protobuf and Spark SQL schemas.

## Capabilities

### ProtobufUtils Object

Core utilities for building protobuf descriptors and type registries.

```scala { .api }
object ProtobufUtils {
  /**
   * Builds Protobuf message descriptor from message name and optional binary descriptor set
   * @param messageName protobuf message name or Java class name
   * @param binaryFileDescriptorSet optional binary FileDescriptorSet
   * @return Protobuf Descriptor instance
   */
  def buildDescriptor(
      messageName: String,
      binaryFileDescriptorSet: Option[Array[Byte]]
  ): Descriptor

  /**
   * Loads protobuf descriptor from Java class using reflection
   * @param protobufClassName fully qualified Java class name
   * @return Protobuf Descriptor instance
   */
  def buildDescriptorFromJavaClass(protobufClassName: String): Descriptor

  /**
   * Builds descriptor from binary descriptor set and message name
   * @param binaryFileDescriptorSet binary FileDescriptorSet data
   * @param messageName message name to find in descriptor set
   * @return Protobuf Descriptor instance
   */
  def buildDescriptor(binaryFileDescriptorSet: Array[Byte], messageName: String): Descriptor

  /**
   * Builds TypeRegistry with all messages found in descriptor set
   * @param descriptorBytes binary FileDescriptorSet data
   * @return TypeRegistry for Any field processing
   */
  def buildTypeRegistry(descriptorBytes: Array[Byte]): TypeRegistry

  /**
   * Builds TypeRegistry with descriptor and others from same proto file
   * @param descriptor message descriptor
   * @return TypeRegistry for Any field processing
   */
  def buildTypeRegistry(descriptor: Descriptor): TypeRegistry

  /**
   * Converts field name sequence to human-readable string
   * @param names sequence of hierarchical field names
   * @return readable field path string
   */
  def toFieldStr(names: Seq[String]): String
}
```

### SchemaConverters Object

Utilities for converting between protobuf schemas and Spark SQL schemas.

```scala { .api }
object SchemaConverters {
  /**
   * Converts protobuf schema to corresponding Spark SQL schema
   * @param descriptor protobuf message descriptor
   * @param protobufOptions configuration options affecting conversion
   * @return SchemaType with DataType and nullability information
   */
  def toSqlType(
      descriptor: Descriptor,
      protobufOptions: ProtobufOptions = ProtobufOptions(Map.empty)
  ): SchemaType
}
```

### Schema Helper Classes

Classes for managing field matching and validation between protobuf and Catalyst schemas.

```scala { .api }
/**
 * Wrapper for matched field pair between Catalyst and Protobuf schemas
 * @param catalystField Spark SQL field definition
 * @param catalystPosition position in Catalyst schema
 * @param fieldDescriptor protobuf field descriptor
 */
case class ProtoMatchedField(
    catalystField: StructField,
    catalystPosition: Int,
    fieldDescriptor: FieldDescriptor
)

/**
 * Helper class for field lookup and matching between protobuf and Catalyst schemas
 * @param descriptor protobuf message descriptor
 * @param catalystSchema Catalyst StructType schema
 * @param protoPath sequence of parent field names leading to protobuf schema
 * @param catalystPath sequence of parent field names leading to Catalyst schema
 */
class ProtoSchemaHelper(
    descriptor: Descriptor,
    catalystSchema: StructType,
    protoPath: Seq[String],
    catalystPath: Seq[String]
) {
  /** Fields with matching equivalents in both protobuf and Catalyst schemas */
  val matchedFields: Seq[ProtoMatchedField]

  /**
   * Validates no extra Catalyst fields exist that don't match protobuf fields
   * @param ignoreNullable whether to ignore nullable Catalyst fields in validation
   */
  def validateNoExtraCatalystFields(ignoreNullable: Boolean): Unit

  /**
   * Validates no extra required protobuf fields exist that don't match Catalyst fields
   */
  def validateNoExtraRequiredProtoFields(): Unit

  /**
   * Extracts protobuf field by name with case sensitivity handling
   * @param name field name to search for
   * @return Some(FieldDescriptor) if found, None otherwise
   */
  def getFieldByName(name: String): Option[FieldDescriptor]
}
```

### Data Types

Schema-related data types used throughout the utilities.

```scala { .api }
/**
 * Wrapper for SQL data type and nullability information
 * @param dataType Spark SQL DataType
 * @param nullable whether the field can contain null values
 */
case class SchemaType(dataType: DataType, nullable: Boolean)
```

## Usage Examples

### Building Descriptors

```scala
import org.apache.spark.sql.protobuf.utils.ProtobufUtils

// Build descriptor from Java class
val descriptor1 = ProtobufUtils.buildDescriptorFromJavaClass(
  "com.example.protos.PersonMessage"
)

// Build descriptor from binary descriptor set
val descriptorBytes = Files.readAllBytes(Paths.get("/path/to/messages.desc"))
val descriptor2 = ProtobufUtils.buildDescriptor("PersonMessage", Some(descriptorBytes))

// Build descriptor with automatic detection
val descriptor3 = ProtobufUtils.buildDescriptor("PersonMessage", None) // Uses Java class
val descriptor4 = ProtobufUtils.buildDescriptor("PersonMessage", Some(descriptorBytes)) // Uses descriptor set
```

### Schema Conversion

```scala
import org.apache.spark.sql.protobuf.utils.{SchemaConverters, ProtobufOptions}

val options = ProtobufOptions(Map(
  "emit.default.values" -> "true",
  "enums.as.ints" -> "false"
))

val schemaType = SchemaConverters.toSqlType(descriptor, options)
println(s"Spark SQL schema: ${schemaType.dataType}")
println(s"Is nullable: ${schemaType.nullable}")
```

### Type Registry Creation

```scala
// Create type registry for Any field processing
val typeRegistry1 = ProtobufUtils.buildTypeRegistry(descriptorBytes)

// Create type registry from single descriptor
val typeRegistry2 = ProtobufUtils.buildTypeRegistry(descriptor)

// Use in protobuf deserialization with Any fields
val options = Map("convert.any.fields.to.json" -> "true").asJava
val deserializedDF = binaryDF.select(
  from_protobuf(col("content"), "MessageWithAny", descriptorBytes, options) as "data"
)
```

### Schema Validation

```scala
import org.apache.spark.sql.protobuf.utils.ProtobufUtils.ProtoSchemaHelper
import org.apache.spark.sql.types._

val catalystSchema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false),
  StructField("email", StringType, nullable = true)
))

val helper = new ProtoSchemaHelper(
  descriptor = personDescriptor,
  catalystSchema = catalystSchema,
  protoPath = Seq.empty,
  catalystPath = Seq.empty
)

// Get matched fields
val matchedFields = helper.matchedFields
println(s"Found ${matchedFields.size} matching fields")

// Validate schemas
try {
  helper.validateNoExtraCatalystFields(ignoreNullable = false)
  helper.validateNoExtraRequiredProtoFields()
  println("Schema validation passed")
} catch {
  case e: AnalysisException =>
    println(s"Schema validation failed: ${e.getMessage}")
}
```

### Field Lookup

```scala
val helper = new ProtoSchemaHelper(descriptor, catalystSchema, Seq.empty, Seq.empty)

// Case-sensitive field lookup
helper.getFieldByName("userName") match {
  case Some(field) => println(s"Found field: ${field.getName}")
  case None => println("Field not found")
}

// Field matching respects Spark's case sensitivity configuration
val matchedFields = helper.matchedFields
matchedFields.foreach { matched =>
  println(s"Catalyst field '${matched.catalystField.name}' matches " +
    s"protobuf field '${matched.fieldDescriptor.getName}'")
}
```

### Error Message Formatting

```scala
// Convert field paths to readable strings
val fieldPath = Seq("person", "address", "street")
val readableField = ProtobufUtils.toFieldStr(fieldPath)
println(readableField) // "field 'person.address.street'"

val topLevel = ProtobufUtils.toFieldStr(Seq.empty)
println(topLevel) // "top-level record"
```

### Custom Schema Processing

```scala
def processProtoSchema(descriptor: Descriptor): Unit = {
  val fields = descriptor.getFields.asScala

  fields.foreach { field =>
    val fieldType = field.getJavaType
    val isRepeated = field.isRepeated
    val isRequired = field.isRequired

    println(s"Field: ${field.getName}")
    println(s"  Type: ${fieldType}")
    println(s"  Repeated: ${isRepeated}")
    println(s"  Required: ${isRequired}")

    if (field.getJavaType == FieldDescriptor.JavaType.MESSAGE) {
      println(s"  Message type: ${field.getMessageType.getFullName}")
    }
  }
}

processProtoSchema(descriptor)
```