# Schema Utilities

Utilities for protobuf descriptor management, schema conversion, type registry operations, and field matching between protobuf and Spark SQL schemas.

## Capabilities

### ProtobufUtils Object

Core utilities for building protobuf descriptors and type registries.

```scala { .api }
object ProtobufUtils {
  /**
   * Builds Protobuf message descriptor from message name and optional binary descriptor set
   * @param messageName protobuf message name or Java class name
   * @param binaryFileDescriptorSet optional binary FileDescriptorSet
   * @return Protobuf Descriptor instance
   */
  def buildDescriptor(
      messageName: String,
      binaryFileDescriptorSet: Option[Array[Byte]]
  ): Descriptor

  /**
   * Loads protobuf descriptor from Java class using reflection
   * @param protobufClassName fully qualified Java class name
   * @return Protobuf Descriptor instance
   */
  def buildDescriptorFromJavaClass(protobufClassName: String): Descriptor

  /**
   * Builds descriptor from binary descriptor set and message name
   * @param binaryFileDescriptorSet binary FileDescriptorSet data
   * @param messageName message name to find in descriptor set
   * @return Protobuf Descriptor instance
   */
  def buildDescriptor(binaryFileDescriptorSet: Array[Byte], messageName: String): Descriptor

  /**
   * Builds TypeRegistry with all messages found in descriptor set
   * @param descriptorBytes binary FileDescriptorSet data
   * @return TypeRegistry for Any field processing
   */
  def buildTypeRegistry(descriptorBytes: Array[Byte]): TypeRegistry

  /**
   * Builds TypeRegistry with descriptor and others from same proto file
   * @param descriptor message descriptor
   * @return TypeRegistry for Any field processing
   */
  def buildTypeRegistry(descriptor: Descriptor): TypeRegistry

  /**
   * Converts field name sequence to human-readable string
   * @param names sequence of hierarchical field names
   * @return readable field path string
   */
  def toFieldStr(names: Seq[String]): String
}
```

### SchemaConverters Object

Utilities for converting between protobuf schemas and Spark SQL schemas.

```scala { .api }
object SchemaConverters {
  /**
   * Converts protobuf schema to corresponding Spark SQL schema
   * @param descriptor protobuf message descriptor
   * @param protobufOptions configuration options affecting conversion
   * @return SchemaType with DataType and nullability information
   */
  def toSqlType(
      descriptor: Descriptor,
      protobufOptions: ProtobufOptions = ProtobufOptions(Map.empty)
  ): SchemaType
}
```

### Schema Helper Classes

Classes for managing field matching and validation between protobuf and Catalyst schemas.

```scala { .api }
/**
 * Wrapper for matched field pair between Catalyst and Protobuf schemas
 * @param catalystField Spark SQL field definition
 * @param catalystPosition position in Catalyst schema
 * @param fieldDescriptor protobuf field descriptor
 */
case class ProtoMatchedField(
    catalystField: StructField,
    catalystPosition: Int,
    fieldDescriptor: FieldDescriptor
)

/**
 * Helper class for field lookup and matching between protobuf and Catalyst schemas
 * @param descriptor protobuf message descriptor
 * @param catalystSchema Catalyst StructType schema
 * @param protoPath sequence of parent field names leading to protobuf schema
 * @param catalystPath sequence of parent field names leading to Catalyst schema
 */
class ProtoSchemaHelper(
    descriptor: Descriptor,
    catalystSchema: StructType,
    protoPath: Seq[String],
    catalystPath: Seq[String]
) {
  /** Fields with matching equivalents in both protobuf and Catalyst schemas */
  val matchedFields: Seq[ProtoMatchedField]

  /**
   * Validates no extra Catalyst fields exist that don't match protobuf fields
   * @param ignoreNullable whether to ignore nullable Catalyst fields in validation
   */
  def validateNoExtraCatalystFields(ignoreNullable: Boolean): Unit

  /**
   * Validates no extra required protobuf fields exist that don't match Catalyst fields
   */
  def validateNoExtraRequiredProtoFields(): Unit

  /**
   * Extracts protobuf field by name with case sensitivity handling
   * @param name field name to search for
   * @return Some(FieldDescriptor) if found, None otherwise
   */
  def getFieldByName(name: String): Option[FieldDescriptor]
}
```

### Data Types

Schema-related data types used throughout the utilities.

```scala { .api }
/**
 * Wrapper for SQL data type and nullability information
 * @param dataType Spark SQL DataType
 * @param nullable whether the field can contain null values
 */
case class SchemaType(dataType: DataType, nullable: Boolean)
```

## Usage Examples

### Building Descriptors

```scala
import org.apache.spark.sql.protobuf.utils.ProtobufUtils

// Build descriptor from Java class
val descriptor1 = ProtobufUtils.buildDescriptorFromJavaClass(
  "com.example.protos.PersonMessage"
)

// Build descriptor from binary descriptor set
val descriptorBytes = Files.readAllBytes(Paths.get("/path/to/messages.desc"))
val descriptor2 = ProtobufUtils.buildDescriptor("PersonMessage", Some(descriptorBytes))

// Build descriptor with automatic detection
val descriptor3 = ProtobufUtils.buildDescriptor("PersonMessage", None) // Uses Java class
val descriptor4 = ProtobufUtils.buildDescriptor("PersonMessage", Some(descriptorBytes)) // Uses descriptor set
```

### Schema Conversion

```scala
import org.apache.spark.sql.protobuf.utils.{SchemaConverters, ProtobufOptions}

val options = ProtobufOptions(Map(
  "emit.default.values" -> "true",
  "enums.as.ints" -> "false"
))

val schemaType = SchemaConverters.toSqlType(descriptor, options)
println(s"Spark SQL schema: ${schemaType.dataType}")
println(s"Is nullable: ${schemaType.nullable}")
```

### Type Registry Creation

```scala
// Create type registry for Any field processing
val typeRegistry1 = ProtobufUtils.buildTypeRegistry(descriptorBytes)

// Create type registry from single descriptor
val typeRegistry2 = ProtobufUtils.buildTypeRegistry(descriptor)

// Use in protobuf deserialization with Any fields
val options = Map("convert.any.fields.to.json" -> "true").asJava
val deserializedDF = binaryDF.select(
  from_protobuf(col("content"), "MessageWithAny", descriptorBytes, options) as "data"
)
```

### Schema Validation

```scala
import org.apache.spark.sql.protobuf.utils.ProtobufUtils.ProtoSchemaHelper
import org.apache.spark.sql.types._

val catalystSchema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false),
  StructField("email", StringType, nullable = true)
))

val helper = new ProtoSchemaHelper(
  descriptor = personDescriptor,
  catalystSchema = catalystSchema,
  protoPath = Seq.empty,
  catalystPath = Seq.empty
)

// Get matched fields
val matchedFields = helper.matchedFields
println(s"Found ${matchedFields.size} matching fields")

// Validate schemas
try {
  helper.validateNoExtraCatalystFields(ignoreNullable = false)
  helper.validateNoExtraRequiredProtoFields()
  println("Schema validation passed")
} catch {
  case e: AnalysisException =>
    println(s"Schema validation failed: ${e.getMessage}")
}
```

### Field Lookup

```scala
val helper = new ProtoSchemaHelper(descriptor, catalystSchema, Seq.empty, Seq.empty)

// Case-sensitive field lookup
helper.getFieldByName("userName") match {
  case Some(field) => println(s"Found field: ${field.getName}")
  case None => println("Field not found")
}

// Field matching respects Spark's case sensitivity configuration
val matchedFields = helper.matchedFields
matchedFields.foreach { matched =>
  println(s"Catalyst field '${matched.catalystField.name}' matches " +
    s"protobuf field '${matched.fieldDescriptor.getName}'")
}
```

### Error Message Formatting

```scala
// Convert field paths to readable strings
val fieldPath = Seq("person", "address", "street")
val readableField = ProtobufUtils.toFieldStr(fieldPath)
println(readableField) // "field 'person.address.street'"

val topLevel = ProtobufUtils.toFieldStr(Seq.empty)
println(topLevel) // "top-level record"
```

### Custom Schema Processing

```scala
def processProtoSchema(descriptor: Descriptor): Unit = {
  val fields = descriptor.getFields.asScala

  fields.foreach { field =>
    val fieldType = field.getJavaType
    val isRepeated = field.isRepeated
    val isRequired = field.isRequired

    println(s"Field: ${field.getName}")
    println(s"  Type: ${fieldType}")
    println(s"  Repeated: ${isRepeated}")
    println(s"  Required: ${isRequired}")

    if (field.getJavaType == FieldDescriptor.JavaType.MESSAGE) {
      println(s"  Message type: ${field.getMessageType.getFullName}")
    }
  }
}

processProtoSchema(descriptor)
```