tessl/maven-org-apache-spark--spark-protobuf_2-13

tessl install tessl/maven-org-apache-spark--spark-protobuf_2-13@3.5.0

Apache Spark connector for the Protocol Buffers (protobuf) data format, providing SQL functions that convert between binary protobuf data and Catalyst data structures for processing structured data in distributed analytics pipelines.

docs/schema-conversion.md

Schema Conversion

Utilities for converting Protobuf message descriptors to Spark SQL schemas and for matching fields between the two, including type mapping, recursive message handling, and configurable conversion options.

Capabilities

Schema Converters Object

Main utility object providing schema conversion functionality between Protobuf and Spark SQL.

/**
 * Utility object for converting Protobuf schemas to Spark SQL schemas
 * @since 3.4.0
 */
@DeveloperApi
object SchemaConverters {
  /**
   * Converts a Protobuf schema to corresponding Spark SQL schema
   * @param descriptor - The Protobuf message descriptor
   * @param protobufOptions - Configuration options for schema conversion
   * @return SchemaType containing the converted DataType and nullability
   * @since 3.4.0
   */
  def toSqlType(
    descriptor: Descriptor,
    protobufOptions: ProtobufOptions = ProtobufOptions(Map.empty)
  ): SchemaType
}

Schema Type Wrapper

Internal wrapper for SQL data type and nullability information.

/**
 * Internal wrapper for SQL data type and nullability
 * @param dataType - The Spark SQL DataType
 * @param nullable - Whether the type can contain null values
 * @since 3.4.0
 */
case class SchemaType(dataType: DataType, nullable: Boolean)
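For a top-level message descriptor the converted dataType is a StructType, so callers typically narrow it before use. A minimal sketch, assuming a descriptor built as in the examples later on this page:

import org.apache.spark.sql.types.StructType

val schemaType = SchemaConverters.toSqlType(descriptor)

// Narrow the generic DataType to a StructType before using it as a DataFrame schema
val structSchema: StructType = schemaType.dataType match {
  case s: StructType => s
  case other => throw new IllegalArgumentException(s"Expected a struct, got ${other.typeName}")
}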

Proto Schema Helper

Helper class for performing field lookup and matching between Protobuf and Catalyst schemas.

/**
 * Helper class for field lookup/matching on Protobuf schemas
 * @param descriptor - The Protobuf descriptor to search for fields
 * @param catalystSchema - The Catalyst schema to use for matching
 * @param protoPath - Sequence of parent field names leading to protoSchema
 * @param catalystPath - Sequence of parent field names leading to catalystSchema
 */
class ProtoSchemaHelper(
  descriptor: Descriptor,
  catalystSchema: StructType,
  protoPath: Seq[String],
  catalystPath: Seq[String]
) {
  /** The fields which have matching equivalents in both Protobuf and Catalyst schemas */
  val matchedFields: Seq[ProtoMatchedField]
  
  /** Validate that there are no extra Catalyst fields without Protobuf matches */
  def validateNoExtraCatalystFields(ignoreNullable: Boolean): Unit
  
  /** Validate that there are no extra required Protobuf fields without Catalyst matches */
  def validateNoExtraRequiredProtoFields(): Unit
  
  /** Extract a field from the Protobuf schema by name with case sensitivity handling */
  def getFieldByName(name: String): Option[FieldDescriptor]
}

Proto Matched Field

Wrapper for paired Catalyst and Protobuf fields.

/**
 * Wrapper for a pair of matched fields, one Catalyst and one corresponding Protobuf field
 * @param catalystField - The Catalyst StructField
 * @param catalystPosition - Position of the field in the Catalyst schema
 * @param fieldDescriptor - The corresponding Protobuf FieldDescriptor
 */
case class ProtoMatchedField(
  catalystField: StructField,
  catalystPosition: Int,
  fieldDescriptor: FieldDescriptor
)

Protobuf Utilities Object

Core utility functions for building descriptors and working with protobuf schemas.

/**
 * Utility object providing protobuf descriptor building and management functions
 */
object ProtobufUtils {
  /**
   * Builds Protobuf message descriptor from Java class or serialized descriptor
   * @param messageName - Protobuf message name or Java class name (when binaryFileDescriptorSet is None)
   * @param binaryFileDescriptorSet - When provided, descriptor and dependencies are read from it
   * @return The built Protobuf Descriptor
   */
  def buildDescriptor(
    messageName: String, 
    binaryFileDescriptorSet: Option[Array[Byte]]
  ): Descriptor
  
  /**
   * Builds descriptor from binary FileDescriptorSet and message name
   * @param binaryFileDescriptorSet - Serialized FileDescriptorSet bytes
   * @param messageName - The protobuf message name to find
   * @return The built Protobuf Descriptor
   */
  def buildDescriptor(
    binaryFileDescriptorSet: Array[Byte], 
    messageName: String
  ): Descriptor
  
  /**
   * Builds descriptor from Java class using reflection
   * @param protobufClassName - Full Java class name (e.g., com.example.MyMessage)
   * @return The built Protobuf Descriptor
   */
  def buildDescriptorFromJavaClass(protobufClassName: String): Descriptor
  
  /**
   * Reads protobuf descriptor file content from filesystem
   * @param filePath - Path to the descriptor file
   * @return Byte array containing the descriptor file content
   */
  def readDescriptorFileContent(filePath: String): Array[Byte]
  
  /**
   * Builds TypeRegistry from descriptor bytes for Any field support
   * @param descriptorBytes - Serialized descriptor bytes
   * @return TypeRegistry for protobuf Any type resolution
   */
  def buildTypeRegistry(descriptorBytes: Array[Byte]): TypeRegistry
  
  /**
   * Builds TypeRegistry from descriptor for Any field support
   * @param descriptor - Protobuf message descriptor
   * @return TypeRegistry for protobuf Any type resolution
   */
  def buildTypeRegistry(descriptor: Descriptor): TypeRegistry
}

Usage Examples

Basic Schema Conversion

import org.apache.spark.sql.protobuf.utils.{SchemaConverters, ProtobufUtils, ProtobufOptions}
import com.google.protobuf.{Descriptors, TypeRegistry}
import com.google.protobuf.Descriptors.Descriptor

// Build descriptor from file
val descriptor: Descriptor = ProtobufUtils.buildDescriptor(
  "PersonMessage", 
  Some(ProtobufUtils.readDescriptorFileContent("/path/to/person.desc"))
)

// Convert to Spark SQL schema
val schemaType = SchemaConverters.toSqlType(descriptor)
val sparkSchema = schemaType.dataType

println(s"Converted schema: ${sparkSchema.prettyJson}")
println(s"Is nullable: ${schemaType.nullable}")

Schema Conversion with Options

import scala.collection.JavaConverters._

// Configure schema conversion options
val options = ProtobufOptions(Map(
  "recursive.fields.max.depth" -> "3",
  "enums.as.ints" -> "true",
  "convert.any.fields.to.json" -> "true"
))

val schemaWithOptions = SchemaConverters.toSqlType(descriptor, options)

// The resulting schema will have:
// - Recursive fields limited to depth 3
// - Enums as IntegerType instead of StringType  
// - Any fields as StringType for JSON conversion
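To confirm the options took effect, walk the resulting struct and print each field's type; a short sketch:

import org.apache.spark.sql.types.StructType

// Print the field types produced under the configured options
schemaWithOptions.dataType match {
  case struct: StructType =>
    struct.fields.foreach { field =>
      println(s"${field.name}: ${field.dataType.typeName} (nullable=${field.nullable})")
    }
  case other =>
    println(s"Top-level type: ${other.typeName}")
}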

Schema Validation and Matching

import org.apache.spark.sql.types._
import org.apache.spark.sql.protobuf.utils.ProtobufUtils.ProtoSchemaHelper

// Define expected Catalyst schema
val catalystSchema = StructType(Array(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false),
  StructField("email", StringType, nullable = true)
))

// Create schema helper for validation
val schemaHelper = new ProtoSchemaHelper(
  descriptor = descriptor,
  catalystSchema = catalystSchema,
  protoPath = Seq.empty,
  catalystPath = Seq.empty
)

// Get matched fields
val matchedFields = schemaHelper.matchedFields
matchedFields.foreach { matched =>
  println(s"Matched: ${matched.catalystField.name} -> ${matched.fieldDescriptor.getName}")
}

// Validate schema compatibility
try {
  schemaHelper.validateNoExtraCatalystFields(ignoreNullable = false)
  schemaHelper.validateNoExtraRequiredProtoFields()
  println("Schema validation passed")
} catch {
  case e: Exception =>
    println(s"Schema validation failed: ${e.getMessage}")
}
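Individual fields can also be looked up by name through getFieldByName, which applies the same case-sensitivity handling documented above; a short sketch:

// Look up a single protobuf field by its Catalyst-facing name
schemaHelper.getFieldByName("email") match {
  case Some(fieldDescriptor) =>
    println(s"Found protobuf field '${fieldDescriptor.getName}' (${fieldDescriptor.getJavaType})")
  case None =>
    println("No protobuf field matches 'email'")
}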

Complex Type Mapping Examples

// Example protobuf message:
// message ComplexMessage {
//   string name = 1;
//   repeated int32 numbers = 2;
//   map<string, string> metadata = 3;
//   google.protobuf.Timestamp created_at = 4;
//   google.protobuf.Duration timeout = 5;
//   NestedMessage nested = 6;
//   optional string description = 7;
// }

// descriptorBytes read with ProtobufUtils.readDescriptorFileContent, as shown above
val complexDescriptor = ProtobufUtils.buildDescriptor("ComplexMessage", Some(descriptorBytes))
val complexSchema = SchemaConverters.toSqlType(complexDescriptor)

// Resulting Spark schema structure:
// StructType(Array(
//   StructField("name", StringType, false),
//   StructField("numbers", ArrayType(IntegerType), false),
//   StructField("metadata", MapType(StringType, StringType), false),
//   StructField("created_at", TimestampType, false),
//   StructField("timeout", DayTimeIntervalType.defaultConcreteType, false),
//   StructField("nested", StructType(...), false),
//   StructField("description", StringType, true)  // optional field
// ))

Recursive Field Handling

// Example with recursive message:
// message Person {
//   string name = 1;
//   Person friend = 2;  // recursive field
// }

val recursiveOptions = ProtobufOptions(Map(
  "recursive.fields.max.depth" -> "2"
))

// personDescriptor: descriptor built for the Person message, as shown in the descriptor examples
val recursiveSchema = SchemaConverters.toSqlType(personDescriptor, recursiveOptions)

// With depth 2, the schema becomes:
// StructType(Array(
//   StructField("name", StringType, false),
//   StructField("friend", StructType(Array(
//     StructField("name", StringType, false),
//     StructField("friend", StructType(Array(
//       StructField("name", StringType, false)
//       // friend field truncated at depth 2
//     )), true)
//   )), true)
// ))
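One way to see the truncation at work is to count how many nested friend levels remain in the converted schema; a small sketch using only the Spark SQL type API:

import org.apache.spark.sql.types.{DataType, StructType}

// Count how many nested "friend" struct levels survive after depth truncation
def friendDepth(dataType: DataType, depth: Int = 0): Int = dataType match {
  case struct: StructType =>
    struct.fields.find(_.name == "friend") match {
      case Some(field) => friendDepth(field.dataType, depth + 1)
      case None        => depth
    }
  case _ => depth
}

println(s"Nested friend levels: ${friendDepth(recursiveSchema.dataType)}")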

Enum Type Handling

// Example protobuf with enum:
// enum Status {
//   UNKNOWN = 0;
//   ACTIVE = 1;
//   INACTIVE = 2;
// }
// message StatusMessage {
//   Status status = 1;
// }

// statusDescriptor: descriptor built for StatusMessage, as shown in the descriptor examples
// Default behavior (enums as strings)
val enumAsStringSchema = SchemaConverters.toSqlType(statusDescriptor)
// Results in: StructField("status", StringType, false)

// Enums as integers
val enumAsIntOptions = ProtobufOptions(Map("enums.as.ints" -> "true"))
val enumAsIntSchema = SchemaConverters.toSqlType(statusDescriptor, enumAsIntOptions)
// Results in: StructField("status", IntegerType, false)

Schema Inspection Utilities

import org.apache.spark.sql.types._

def inspectSchema(schemaType: SchemaType): Unit = {
  def printSchema(dataType: DataType, indent: String = ""): Unit = {
    dataType match {
      case struct: StructType =>
        println(s"${indent}StructType:")
        struct.fields.foreach { field =>
          println(s"${indent}  ${field.name}: ${field.dataType.typeName} (nullable: ${field.nullable})")
          if (field.dataType.isInstanceOf[StructType] || field.dataType.isInstanceOf[ArrayType] || field.dataType.isInstanceOf[MapType]) {
            printSchema(field.dataType, indent + "    ")
          }
        }
      case array: ArrayType =>
        println(s"${indent}ArrayType of ${array.elementType.typeName}")
        printSchema(array.elementType, indent + "  ")
      case map: MapType =>
        println(s"${indent}MapType: ${map.keyType.typeName} -> ${map.valueType.typeName}")
        printSchema(map.valueType, indent + "  ")
      case other =>
        println(s"${indent}${other.typeName}")
    }
  }
  
  printSchema(schemaType.dataType)
}

// Usage
val schema = SchemaConverters.toSqlType(descriptor)
inspectSchema(schema)

Descriptor Building Examples

import org.apache.spark.sql.protobuf.utils.ProtobufUtils
import com.google.protobuf.{Descriptors, TypeRegistry}

// Build descriptor from descriptor file
val descriptorBytes = ProtobufUtils.readDescriptorFileContent("/path/to/messages.desc")
val descriptor = ProtobufUtils.buildDescriptor("PersonMessage", Some(descriptorBytes))

// Build descriptor directly from bytes and message name
val descriptor2 = ProtobufUtils.buildDescriptor(descriptorBytes, "PersonMessage")

// Build descriptor from Java class (requires shaded protobuf classes)
val javaDescriptor = ProtobufUtils.buildDescriptorFromJavaClass(
  "com.example.protos.PersonMessage"
)

// Build TypeRegistry for Any field support
val typeRegistry = ProtobufUtils.buildTypeRegistry(descriptorBytes)
val typeRegistryFromDescriptor = ProtobufUtils.buildTypeRegistry(descriptor)
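The registry can then be queried by a message's full protobuf name (the name below is hypothetical); TypeRegistry.find returns null when the message is not registered:

// Look up a registered descriptor by full protobuf name ("example.PersonMessage" is illustrative)
val registered: Descriptors.Descriptor = typeRegistry.find("example.PersonMessage")
if (registered != null) {
  println(s"Registry contains ${registered.getFullName} with ${registered.getFields.size} fields")
} else {
  println("Message not found in registry")
}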

Advanced Descriptor Management

// Reading multiple descriptor files
val descriptorFiles = List(
  "/path/to/person.desc",
  "/path/to/address.desc",
  "/path/to/common.desc"
)

val allDescriptors = descriptorFiles.map { filePath =>
  val bytes = ProtobufUtils.readDescriptorFileContent(filePath)
  // Note: in practice you'd combine these into a single FileDescriptorSet (see the merging sketch below)
  bytes
}

// Build descriptors for different message types
val personDescriptor = ProtobufUtils.buildDescriptor("Person", Some(allDescriptors.head))
val addressDescriptor = ProtobufUtils.buildDescriptor("Address", Some(allDescriptors(1)))

// Create schemas for each
val personSchema = SchemaConverters.toSqlType(personDescriptor)
val addressSchema = SchemaConverters.toSqlType(addressDescriptor)
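To follow the note above about combining descriptor files, the individual serialized sets can be merged into one FileDescriptorSet with the protobuf-java API; a sketch, assuming each .desc file is itself a valid FileDescriptorSet (for example, produced by protoc --descriptor_set_out --include_imports):

import com.google.protobuf.DescriptorProtos.FileDescriptorSet

// Merge the serialized FileDescriptorSets read above into a single set
// (duplicate file entries across sets may need de-duplication before building)
val mergedDescriptorSet: Array[Byte] = {
  val builder = FileDescriptorSet.newBuilder()
  allDescriptors.foreach { bytes =>
    builder.addAllFile(FileDescriptorSet.parseFrom(bytes).getFileList)
  }
  builder.build().toByteArray
}

// Any message from the merged set can now be resolved from one byte array
val mergedPersonDescriptor = ProtobufUtils.buildDescriptor("Person", Some(mergedDescriptorSet))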

Error Handling with Descriptor Building

import scala.util.{Try, Success, Failure}

def safelyBuildDescriptor(messageName: String, filePath: String): Option[Descriptor] = {
  Try {
    val bytes = ProtobufUtils.readDescriptorFileContent(filePath)
    ProtobufUtils.buildDescriptor(messageName, Some(bytes))
  } match {
    case Success(descriptor) => Some(descriptor)
    case Failure(exception) =>
      println(s"Failed to build descriptor for $messageName: ${exception.getMessage}")
      None
  }
}

// Usage
safelyBuildDescriptor("PersonMessage", "/path/to/person.desc") match {
  case Some(descriptor) =>
    val schema = SchemaConverters.toSqlType(descriptor)
    println(s"Successfully built schema: ${schema.dataType.prettyJson}")
  case None =>
    println("Failed to build descriptor")
}

Type Mapping Reference

Complete mapping between Protobuf and Spark SQL types:

| Protobuf Type | Spark SQL Type | Notes |
| --- | --- | --- |
| int32, sint32, sfixed32 | IntegerType | 32-bit signed integer |
| int64, sint64, sfixed64 | LongType | 64-bit signed integer |
| uint32, fixed32 | LongType | Unsigned 32-bit stored as signed 64-bit |
| uint64, fixed64 | DecimalType | Unsigned 64-bit stored as decimal |
| float | FloatType | 32-bit floating point |
| double | DoubleType | 64-bit floating point |
| bool | BooleanType | Boolean value |
| string | StringType | UTF-8 string |
| bytes | BinaryType | Byte array |
| enum | StringType / IntegerType | Configurable via enums.as.ints |
| message | StructType | Nested structure |
| repeated T | ArrayType(T) | Array of type T |
| map<K,V> | MapType(K,V) | Map with key type K and value type V |
| google.protobuf.Timestamp | TimestampType | Spark TimestampType (microsecond precision) |
| google.protobuf.Duration | DayTimeIntervalType | Time interval |
| google.protobuf.Any | StringType | JSON string when convert.any.fields.to.json is enabled |

Error Handling

Common schema conversion errors and solutions:

// Handle schema conversion errors
try {
  val schema = SchemaConverters.toSqlType(descriptor, options)
  // Use schema
} catch {
  case e: IllegalArgumentException if e.getMessage.contains("recursive") =>
    println("Recursive field depth exceeded, increase recursive.fields.max.depth")
  case e: IllegalStateException if e.getMessage.contains("descriptor") =>
    println("Invalid protobuf descriptor")
  case e: Exception =>
    println(s"Schema conversion error: ${e.getMessage}")
}