Utilities for protobuf descriptor management, schema conversion, type registry operations, and field matching between protobuf and Spark SQL schemas.
Core utilities for building protobuf descriptors and type registries.
object ProtobufUtils {
  /**
   * Builds a Protobuf message descriptor from a message name and an optional binary descriptor set
   * @param messageName protobuf message name or Java class name
   * @param binaryFileDescriptorSet optional binary FileDescriptorSet
   * @return Protobuf Descriptor instance
   */
  def buildDescriptor(
      messageName: String,
      binaryFileDescriptorSet: Option[Array[Byte]]
  ): Descriptor

  /**
   * Loads a protobuf descriptor from a Java class using reflection
   * @param protobufClassName fully qualified Java class name
   * @return Protobuf Descriptor instance
   */
  def buildDescriptorFromJavaClass(protobufClassName: String): Descriptor

  /**
   * Builds a descriptor for the given message name from a binary descriptor set
   * @param binaryFileDescriptorSet binary FileDescriptorSet data
   * @param messageName message name to find in the descriptor set
   * @return Protobuf Descriptor instance
   */
  def buildDescriptor(binaryFileDescriptorSet: Array[Byte], messageName: String): Descriptor

  /**
   * Builds a TypeRegistry containing all messages found in the descriptor set
   * @param descriptorBytes binary FileDescriptorSet data
   * @return TypeRegistry for Any field processing
   */
  def buildTypeRegistry(descriptorBytes: Array[Byte]): TypeRegistry

  /**
   * Builds a TypeRegistry containing the given descriptor and the other messages defined in the same proto file
   * @param descriptor message descriptor
   * @return TypeRegistry for Any field processing
   */
  def buildTypeRegistry(descriptor: Descriptor): TypeRegistry

  /**
   * Converts a sequence of field names to a human-readable string
   * @param names sequence of hierarchical field names
   * @return readable field path string
   */
  def toFieldStr(names: Seq[String]): String
}

Utilities for converting between protobuf schemas and Spark SQL schemas.
object SchemaConverters {
  /**
   * Converts a protobuf schema to the corresponding Spark SQL schema
   * @param descriptor protobuf message descriptor
   * @param protobufOptions configuration options affecting conversion
   * @return SchemaType with DataType and nullability information
   */
  def toSqlType(
      descriptor: Descriptor,
      protobufOptions: ProtobufOptions = ProtobufOptions(Map.empty)
  ): SchemaType
}

Classes for managing field matching and validation between protobuf and Catalyst schemas.
/**
 * Wrapper for a matched field pair between the Catalyst and Protobuf schemas
 * @param catalystField Spark SQL field definition
 * @param catalystPosition position in the Catalyst schema
 * @param fieldDescriptor protobuf field descriptor
 */
case class ProtoMatchedField(
    catalystField: StructField,
    catalystPosition: Int,
    fieldDescriptor: FieldDescriptor
)
/**
 * Helper class for field lookup and matching between protobuf and Catalyst schemas
 * @param descriptor protobuf message descriptor
 * @param catalystSchema Catalyst StructType schema
 * @param protoPath sequence of parent field names leading to the protobuf schema
 * @param catalystPath sequence of parent field names leading to the Catalyst schema
 */
class ProtoSchemaHelper(
    descriptor: Descriptor,
    catalystSchema: StructType,
    protoPath: Seq[String],
    catalystPath: Seq[String]
) {
  /** Fields with matching equivalents in both the protobuf and Catalyst schemas */
  val matchedFields: Seq[ProtoMatchedField]

  /**
   * Validates that no extra Catalyst fields exist that do not match a protobuf field
   * @param ignoreNullable whether to ignore nullable Catalyst fields during validation
   */
  def validateNoExtraCatalystFields(ignoreNullable: Boolean): Unit

  /**
   * Validates that no extra required protobuf fields exist that do not match a Catalyst field
   */
  def validateNoExtraRequiredProtoFields(): Unit

  /**
   * Looks up a protobuf field by name, respecting case-sensitivity settings
   * @param name field name to search for
   * @return Some(FieldDescriptor) if found, None otherwise
   */
  def getFieldByName(name: String): Option[FieldDescriptor]
}

Schema-related data types used throughout the utilities.
/**
 * Wrapper for a SQL data type and its nullability information
 * @param dataType Spark SQL DataType
 * @param nullable whether the field can contain null values
 */
case class SchemaType(dataType: DataType, nullable: Boolean)

import java.nio.file.{Files, Paths}

import org.apache.spark.sql.protobuf.utils.ProtobufUtils
// Build descriptor from Java class
val descriptor1 = ProtobufUtils.buildDescriptorFromJavaClass(
  "com.example.protos.PersonMessage"
)
// Build descriptor from binary descriptor set
val descriptorBytes = Files.readAllBytes(Paths.get("/path/to/messages.desc"))
val descriptor2 = ProtobufUtils.buildDescriptor("PersonMessage", Some(descriptorBytes))
// Build descriptor with automatic detection
val descriptor3 = ProtobufUtils.buildDescriptor("PersonMessage", None) // Uses Java class
val descriptor4 = ProtobufUtils.buildDescriptor("PersonMessage", Some(descriptorBytes)) // Uses descriptor set

import org.apache.spark.sql.protobuf.utils.{SchemaConverters, ProtobufOptions}
val options = ProtobufOptions(Map(
  "emit.default.values" -> "true",
  "enums.as.ints" -> "false"
))
val schemaType = SchemaConverters.toSqlType(descriptor, options)
println(s"Spark SQL schema: ${schemaType.dataType}")
println(s"Is nullable: ${schemaType.nullable}")// Create type registry for Any field processing
// Create type registry for Any field processing
val typeRegistry1 = ProtobufUtils.buildTypeRegistry(descriptorBytes)
// Create type registry from single descriptor
val typeRegistry2 = ProtobufUtils.buildTypeRegistry(descriptor)
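The result is a plain protobuf TypeRegistry, so it can also be handed to protobuf's JSON printer to resolve google.protobuf.Any payloads; a sketch, assuming protobuf-java-util is on the classpath and typeRegistry1 was built as above:

import com.google.protobuf.util.JsonFormat

// Sketch: a printer that can expand Any payloads using the registry
val anyAwarePrinter = JsonFormat.printer().usingTypeRegistry(typeRegistry1)
// anyAwarePrinter.print(messageWithAnyField) would then render the packed message as JSON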
// Use in protobuf deserialization with Any fields
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions.from_protobuf

val options = Map("convert.any.fields.to.json" -> "true").asJava
val deserializedDF = binaryDF.select(
  from_protobuf(col("content"), "MessageWithAny", descriptorBytes, options) as "data"
)

import org.apache.spark.sql.protobuf.utils.ProtobufUtils.ProtoSchemaHelper
import org.apache.spark.sql.types._
val catalystSchema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false),
  StructField("email", StringType, nullable = true)
))
val helper = new ProtoSchemaHelper(
  descriptor = personDescriptor,
  catalystSchema = catalystSchema,
  protoPath = Seq.empty,
  catalystPath = Seq.empty
)
// Get matched fields
val matchedFields = helper.matchedFields
println(s"Found ${matchedFields.size} matching fields")
import org.apache.spark.sql.AnalysisException

// Validate schemas
try {
  helper.validateNoExtraCatalystFields(ignoreNullable = false)
  helper.validateNoExtraRequiredProtoFields()
  println("Schema validation passed")
} catch {
  case e: AnalysisException =>
    println(s"Schema validation failed: ${e.getMessage}")
}
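If extra nullable Catalyst columns should be tolerated rather than rejected, the same check can be relaxed via its ignoreNullable flag:

// Nullable Catalyst fields without a protobuf counterpart are ignored instead of failing
helper.validateNoExtraCatalystFields(ignoreNullable = true)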
val helper = new ProtoSchemaHelper(descriptor, catalystSchema, Seq.empty, Seq.empty)

// Case-sensitive field lookup
helper.getFieldByName("userName") match {
  case Some(field) => println(s"Found field: ${field.getName}")
  case None => println("Field not found")
}
// Field matching respects Spark's case sensitivity configuration
val matchedFields = helper.matchedFields
matchedFields.foreach { matched =>
  println(s"Catalyst field '${matched.catalystField.name}' matches " +
    s"protobuf field '${matched.fieldDescriptor.getName}'")
}
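Whether a lookup such as getFieldByName("userName") also matches a protobuf field named username is governed by Spark's SQL case-sensitivity setting; a sketch of toggling it, assuming a SparkSession named spark is in scope:

// Field matching follows spark.sql.caseSensitive (case-insensitive by default)
spark.conf.set("spark.sql.caseSensitive", "true")  // require exact-case matches
spark.conf.set("spark.sql.caseSensitive", "false") // restore case-insensitive matching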
// Convert field paths to readable strings
val fieldPath = Seq("person", "address", "street")
val readableField = ProtobufUtils.toFieldStr(fieldPath)
println(readableField) // "field 'person.address.street'"
val topLevel = ProtobufUtils.toFieldStr(Seq.empty)
println(topLevel) // "top-level record"

import com.google.protobuf.Descriptors.{Descriptor, FieldDescriptor}
import scala.jdk.CollectionConverters._

def processProtoSchema(descriptor: Descriptor): Unit = {
  val fields = descriptor.getFields.asScala
  fields.foreach { field =>
    val fieldType = field.getJavaType
    val isRepeated = field.isRepeated
    val isRequired = field.isRequired
    println(s"Field: ${field.getName}")
    println(s"  Type: ${fieldType}")
    println(s"  Repeated: ${isRepeated}")
    println(s"  Required: ${isRequired}")
    if (field.getJavaType == FieldDescriptor.JavaType.MESSAGE) {
      println(s"  Message type: ${field.getMessageType.getFullName}")
    }
  }
}
processProtoSchema(descriptor)
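For nested messages the same introspection can be applied recursively. A minimal sketch that reuses the imports above and caps the recursion depth so self-referential message definitions do not loop forever:

def printProtoSchemaTree(descriptor: Descriptor, depth: Int = 0, maxDepth: Int = 5): Unit = {
  descriptor.getFields.asScala.foreach { field =>
    println(s"${"  " * depth}${field.getName}: ${field.getJavaType}")
    // Recurse into nested message types, stopping at maxDepth to handle recursive protos
    if (field.getJavaType == FieldDescriptor.JavaType.MESSAGE && depth < maxDepth) {
      printProtoSchemaTree(field.getMessageType, depth + 1, maxDepth)
    }
  }
}

printProtoSchemaTree(descriptor)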