Schema Utilities

Utilities for protobuf descriptor management, schema conversion, type registry operations, and field matching between protobuf and Spark SQL schemas.

Capabilities

ProtobufUtils Object

Core utilities for building protobuf descriptors and type registries.

object ProtobufUtils {
  /**
   * Builds Protobuf message descriptor from message name and optional binary descriptor set
   * @param messageName protobuf message name or Java class name
   * @param binaryFileDescriptorSet optional binary FileDescriptorSet
   * @return Protobuf Descriptor instance
   */
  def buildDescriptor(
    messageName: String, 
    binaryFileDescriptorSet: Option[Array[Byte]]
  ): Descriptor

  /**
   * Loads protobuf descriptor from Java class using reflection
   * @param protobufClassName fully qualified Java class name  
   * @return Protobuf Descriptor instance
   */
  def buildDescriptorFromJavaClass(protobufClassName: String): Descriptor

  /**
   * Builds descriptor from binary descriptor set and message name
   * @param binaryFileDescriptorSet binary FileDescriptorSet data
   * @param messageName message name to find in descriptor set
   * @return Protobuf Descriptor instance
   */
  def buildDescriptor(binaryFileDescriptorSet: Array[Byte], messageName: String): Descriptor

  /**
   * Builds TypeRegistry with all messages found in descriptor set
   * @param descriptorBytes binary FileDescriptorSet data
   * @return TypeRegistry for Any field processing
   */
  def buildTypeRegistry(descriptorBytes: Array[Byte]): TypeRegistry

  /**
   * Builds TypeRegistry containing the given descriptor and the other messages defined in the same proto file
   * @param descriptor message descriptor
   * @return TypeRegistry for Any field processing
   */
  def buildTypeRegistry(descriptor: Descriptor): TypeRegistry

  /**
   * Converts field name sequence to human-readable string
   * @param names sequence of hierarchical field names
   * @return readable field path string
   */
  def toFieldStr(names: Seq[String]): String
}

SchemaConverters Object

Utilities for converting between protobuf schemas and Spark SQL schemas.

object SchemaConverters {
  /**
   * Converts protobuf schema to corresponding Spark SQL schema
   * @param descriptor protobuf message descriptor
   * @param protobufOptions configuration options affecting conversion
   * @return SchemaType with DataType and nullability information
   */
  def toSqlType(
    descriptor: Descriptor, 
    protobufOptions: ProtobufOptions = ProtobufOptions(Map.empty)
  ): SchemaType
}

Schema Helper Classes

Classes for managing field matching and validation between protobuf and Catalyst schemas.

/**
 * Wrapper for matched field pair between Catalyst and Protobuf schemas
 * @param catalystField Spark SQL field definition
 * @param catalystPosition position in Catalyst schema
 * @param fieldDescriptor protobuf field descriptor
 */
case class ProtoMatchedField(
  catalystField: StructField,
  catalystPosition: Int,
  fieldDescriptor: FieldDescriptor
)

/**
 * Helper class for field lookup and matching between protobuf and Catalyst schemas
 * @param descriptor protobuf message descriptor
 * @param catalystSchema Catalyst StructType schema
 * @param protoPath sequence of parent field names leading to protobuf schema
 * @param catalystPath sequence of parent field names leading to Catalyst schema
 */
class ProtoSchemaHelper(
  descriptor: Descriptor,
  catalystSchema: StructType,
  protoPath: Seq[String],
  catalystPath: Seq[String]
) {
  /** Fields with matching equivalents in both protobuf and Catalyst schemas */
  val matchedFields: Seq[ProtoMatchedField]
  
  /**
   * Validates that no Catalyst fields exist without a matching protobuf field
   * @param ignoreNullable whether to ignore nullable Catalyst fields in validation
   */
  def validateNoExtraCatalystFields(ignoreNullable: Boolean): Unit
  
  /**
   * Validates that no required protobuf fields exist without a matching Catalyst field
   */
  def validateNoExtraRequiredProtoFields(): Unit
  
  /**
   * Looks up a protobuf field by name, honoring Spark's case-sensitivity configuration
   * @param name field name to search for
   * @return Some(FieldDescriptor) if found, None otherwise
   */
  def getFieldByName(name: String): Option[FieldDescriptor]
}

Data Types

Schema-related data types used throughout the utilities.

/**
 * Wrapper for SQL data type and nullability information
 * @param dataType Spark SQL DataType
 * @param nullable whether the field can contain null values
 */
case class SchemaType(dataType: DataType, nullable: Boolean)

Usage Examples

Building Descriptors

import org.apache.spark.sql.protobuf.utils.ProtobufUtils
import java.nio.file.{Files, Paths}

// Build descriptor from Java class
val descriptor1 = ProtobufUtils.buildDescriptorFromJavaClass(
  "com.example.protos.PersonMessage"
)

// Build descriptor from binary descriptor set
val descriptorBytes = Files.readAllBytes(Paths.get("/path/to/messages.desc"))
val descriptor2 = ProtobufUtils.buildDescriptor("PersonMessage", Some(descriptorBytes))

// Build descriptor with automatic detection based on whether a descriptor set is provided
val descriptor3 = ProtobufUtils.buildDescriptor("com.example.protos.PersonMessage", None) // Falls back to Java class loading
val descriptor4 = ProtobufUtils.buildDescriptor("PersonMessage", Some(descriptorBytes))   // Resolves from the descriptor set

Schema Conversion

import org.apache.spark.sql.protobuf.utils.{SchemaConverters, ProtobufOptions}

val options = ProtobufOptions(Map(
  "emit.default.values" -> "true",
  "enums.as.ints" -> "false"
))

val schemaType = SchemaConverters.toSqlType(descriptor, options)
println(s"Spark SQL schema: ${schemaType.dataType}")
println(s"Is nullable: ${schemaType.nullable}")

Type Registry Creation

// Create type registry for Any field processing
val typeRegistry1 = ProtobufUtils.buildTypeRegistry(descriptorBytes)

// Create type registry from single descriptor
val typeRegistry2 = ProtobufUtils.buildTypeRegistry(descriptor)

// Use in protobuf deserialization with Any fields
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions.from_protobuf
import scala.jdk.CollectionConverters._

val options = Map("convert.any.fields.to.json" -> "true").asJava
val deserializedDF = binaryDF.select(
  from_protobuf(col("content"), "MessageWithAny", descriptorBytes, options) as "data"
)
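
Outside of from_protobuf, the registry itself can be queried by full protobuf message name; this sketch (reusing the hypothetical type name from the descriptor examples above) simply checks that a message was registered:

// TypeRegistry.find returns the descriptor for a registered full message
// name, or null if the type is unknown ("com.example.protos.PersonMessage"
// is a hypothetical name)
Option(typeRegistry1.find("com.example.protos.PersonMessage")) match {
  case Some(found) => println(s"Registered message: ${found.getFullName}")
  case None        => println("Message type not found in registry")
}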

Schema Validation

import org.apache.spark.sql.protobuf.utils.ProtobufUtils.ProtoSchemaHelper
import org.apache.spark.sql.types._
import org.apache.spark.sql.AnalysisException

val catalystSchema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false),
  StructField("email", StringType, nullable = true)
))

val helper = new ProtoSchemaHelper(
  descriptor = personDescriptor,
  catalystSchema = catalystSchema,
  protoPath = Seq.empty,
  catalystPath = Seq.empty
)

// Get matched fields
val matchedFields = helper.matchedFields
println(s"Found ${matchedFields.size} matching fields")

// Validate schemas
try {
  helper.validateNoExtraCatalystFields(ignoreNullable = false)
  helper.validateNoExtraRequiredProtoFields()
  println("Schema validation passed")
} catch {
  case e: AnalysisException => 
    println(s"Schema validation failed: ${e.getMessage}")
}
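
For contrast, a Catalyst field without a protobuf counterpart is expected to make validation fail (the exact exception and message depend on the Spark version); "nickname" below is a hypothetical extra field:

// Add a non-nullable Catalyst field that has no matching protobuf field
val extraSchema = catalystSchema.add(StructField("nickname", StringType, nullable = false))
val strictHelper = new ProtoSchemaHelper(personDescriptor, extraSchema, Seq.empty, Seq.empty)

// With ignoreNullable = false this should throw, because "nickname"
// cannot be populated from the protobuf message
strictHelper.validateNoExtraCatalystFields(ignoreNullable = false)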

Field Lookup

val helper = new ProtoSchemaHelper(descriptor, catalystSchema, Seq.empty, Seq.empty)

// Case-sensitive field lookup
helper.getFieldByName("userName") match {
  case Some(field) => println(s"Found field: ${field.getName}")
  case None => println("Field not found")
}

// Field matching respects Spark's case sensitivity configuration
val matchedFields = helper.matchedFields
matchedFields.foreach { matched =>
  println(s"Catalyst field '${matched.catalystField.name}' matches " +
          s"protobuf field '${matched.fieldDescriptor.getName}'")
}

Error Message Formatting

// Convert field paths to readable strings
val fieldPath = Seq("person", "address", "street")
val readableField = ProtobufUtils.toFieldStr(fieldPath)
println(readableField) // "field 'person.address.street'"

val topLevel = ProtobufUtils.toFieldStr(Seq.empty)
println(topLevel) // "top-level record"

Custom Schema Processing

import com.google.protobuf.Descriptors.{Descriptor, FieldDescriptor}
import scala.jdk.CollectionConverters._

def processProtoSchema(descriptor: Descriptor): Unit = {
  val fields = descriptor.getFields.asScala
  
  fields.foreach { field =>
    val fieldType = field.getJavaType
    val isRepeated = field.isRepeated
    val isRequired = field.isRequired
    
    println(s"Field: ${field.getName}")
    println(s"  Type: ${fieldType}")  
    println(s"  Repeated: ${isRepeated}")
    println(s"  Required: ${isRequired}")
    
    if (field.getJavaType == FieldDescriptor.JavaType.MESSAGE) {
      println(s"  Message type: ${field.getMessageType.getFullName}")
    }
  }
}

processProtoSchema(descriptor)