```bash
tessl install tessl/maven-org-apache-spark--spark-protobuf_2-13@3.5.0
```

Apache Spark connector for Protocol Buffer (protobuf) data format support, providing SQL functions to convert between binary protobuf data and Catalyst data structures for processing structured data in distributed big data analytics pipelines.
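
For orientation, a minimal sketch of the SQL-function round trip mentioned above; the DataFrame, the binary column name, the message name, and the descriptor path are placeholder assumptions, and the schema utilities documented below support this conversion path.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions.{from_protobuf, to_protobuf}

// Sketch: `df` is assumed to have a BinaryType column "value" holding serialized
// PersonMessage records; the message name and the .desc path are placeholders.
def roundTrip(df: DataFrame): DataFrame = {
  val parsed = df.select(
    from_protobuf(col("value"), "PersonMessage", "/path/to/person.desc").as("person"))
  parsed.select(
    to_protobuf(col("person"), "PersonMessage", "/path/to/person.desc").as("value"))
}
```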
Tools for converting between Protobuf descriptors and Spark SQL schemas with advanced type mapping and schema inference capabilities.
Main utility object providing schema conversion functionality between Protobuf and Spark SQL.

```scala
/**
* Utility object for converting Protobuf schemas to Spark SQL schemas
* @since 3.4.0
*/
@DeveloperApi
object SchemaConverters {
/**
* Converts a Protobuf schema to corresponding Spark SQL schema
* @param descriptor - The Protobuf message descriptor
* @param protobufOptions - Configuration options for schema conversion
* @return SchemaType containing the converted DataType and nullability
* @since 3.4.0
*/
def toSqlType(
descriptor: Descriptor,
protobufOptions: ProtobufOptions = ProtobufOptions(Map.empty)
): SchemaType
}
```

Internal wrapper for SQL data type and nullability information.

```scala
/**
* Internal wrapper for SQL data type and nullability
* @param dataType - The Spark SQL DataType
* @param nullable - Whether the type can contain null values
* @since 3.4.0
*/
case class SchemaType(dataType: DataType, nullable: Boolean)
```

Helper class for performing field lookup and matching between Protobuf and Catalyst schemas.

```scala
/**
* Helper class for field lookup/matching on Protobuf schemas
* @param descriptor - The Protobuf descriptor to search for fields
* @param catalystSchema - The Catalyst schema to use for matching
* @param protoPath - Sequence of parent field names leading to protoSchema
* @param catalystPath - Sequence of parent field names leading to catalystSchema
*/
class ProtoSchemaHelper(
descriptor: Descriptor,
catalystSchema: StructType,
protoPath: Seq[String],
catalystPath: Seq[String]
) {
/** The fields which have matching equivalents in both Protobuf and Catalyst schemas */
val matchedFields: Seq[ProtoMatchedField]
/** Validate that there are no extra Catalyst fields without Protobuf matches */
def validateNoExtraCatalystFields(ignoreNullable: Boolean): Unit
/** Validate that there are no extra required Protobuf fields without Catalyst matches */
def validateNoExtraRequiredProtoFields(): Unit
/** Extract a field from the Protobuf schema by name with case sensitivity handling */
def getFieldByName(name: String): Option[FieldDescriptor]
}
```

Wrapper for paired Catalyst and Protobuf fields.

```scala
/**
* Wrapper for a pair of matched fields, one Catalyst and one corresponding Protobuf field
* @param catalystField - The Catalyst StructField
* @param catalystPosition - Position of the field in the Catalyst schema
* @param fieldDescriptor - The corresponding Protobuf FieldDescriptor
*/
case class ProtoMatchedField(
catalystField: StructField,
catalystPosition: Int,
fieldDescriptor: FieldDescriptor
)
```

Core utility functions for building descriptors and working with protobuf schemas.

```scala
/**
* Utility object providing protobuf descriptor building and management functions
*/
object ProtobufUtils {
/**
* Builds Protobuf message descriptor from Java class or serialized descriptor
* @param messageName - Protobuf message name or Java class name (when binaryFileDescriptorSet is None)
* @param binaryFileDescriptorSet - When provided, descriptor and dependencies are read from it
* @return The built Protobuf Descriptor
*/
def buildDescriptor(
messageName: String,
binaryFileDescriptorSet: Option[Array[Byte]]
): Descriptor
/**
* Builds descriptor from binary FileDescriptorSet and message name
* @param binaryFileDescriptorSet - Serialized FileDescriptorSet bytes
* @param messageName - The protobuf message name to find
* @return The built Protobuf Descriptor
*/
def buildDescriptor(
binaryFileDescriptorSet: Array[Byte],
messageName: String
): Descriptor
/**
* Builds descriptor from Java class using reflection
* @param protobufClassName - Full Java class name (e.g., com.example.MyMessage)
* @return The built Protobuf Descriptor
*/
def buildDescriptorFromJavaClass(protobufClassName: String): Descriptor
/**
* Reads protobuf descriptor file content from filesystem
* @param filePath - Path to the descriptor file
* @return Byte array containing the descriptor file content
*/
def readDescriptorFileContent(filePath: String): Array[Byte]
/**
* Builds TypeRegistry from descriptor bytes for Any field support
* @param descriptorBytes - Serialized descriptor bytes
* @return TypeRegistry for protobuf Any type resolution
*/
def buildTypeRegistry(descriptorBytes: Array[Byte]): TypeRegistry
/**
* Builds TypeRegistry from descriptor for Any field support
* @param descriptor - Protobuf message descriptor
* @return TypeRegistry for protobuf Any type resolution
*/
def buildTypeRegistry(descriptor: Descriptor): TypeRegistry
}
```

Basic usage: build a descriptor from a compiled descriptor file and convert it to a Spark SQL schema.

```scala
import org.apache.spark.sql.protobuf.utils.{SchemaConverters, ProtobufUtils, ProtobufOptions}
import com.google.protobuf.{Descriptors, TypeRegistry}
import com.google.protobuf.Descriptors.Descriptor
// Build descriptor from file
val descriptor: Descriptor = ProtobufUtils.buildDescriptor(
"PersonMessage",
Some(ProtobufUtils.readDescriptorFileContent("/path/to/person.desc"))
)
// Convert to Spark SQL schema
val schemaType = SchemaConverters.toSqlType(descriptor)
val sparkSchema = schemaType.dataType
println(s"Converted schema: ${sparkSchema.prettyJson}")
println(s"Is nullable: ${schemaType.nullable}")import scala.collection.JavaConverters._
// Configure schema conversion options
val options = ProtobufOptions(Map(
"recursive.fields.max.depth" -> "3",
"enums.as.ints" -> "true",
"convert.any.fields.to.json" -> "true"
))
val schemaWithOptions = SchemaConverters.toSqlType(descriptor, options)
// The resulting schema will have:
// - Recursive fields limited to depth 3
// - Enums as IntegerType instead of StringType
// - Any fields as StringType for JSON conversion
```
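
The enum and recursion options are illustrated later; for the Any option, a hedged sketch (the AnyMessage definition and anyMessageDescriptor are hypothetical):

```scala
// Hypothetical message:
// message AnyMessage {
//   string id = 1;
//   google.protobuf.Any payload = 2;
// }
val anyOptions = ProtobufOptions(Map("convert.any.fields.to.json" -> "true"))
val anySchema = SchemaConverters.toSqlType(anyMessageDescriptor, anyOptions)
// With the option enabled, the payload field maps to StringType and holds the
// packed message rendered as JSON (see the type mapping table below)
```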

Use ProtoSchemaHelper to match and validate fields between a Protobuf descriptor and an expected Catalyst schema:

```scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.protobuf.utils.ProtobufUtils.ProtoSchemaHelper
// Define expected Catalyst schema
val catalystSchema = StructType(Array(
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = false),
StructField("email", StringType, nullable = true)
))
// Create schema helper for validation
val schemaHelper = new ProtoSchemaHelper(
descriptor = descriptor,
catalystSchema = catalystSchema,
protoPath = Seq.empty,
catalystPath = Seq.empty
)
// Get matched fields
val matchedFields = schemaHelper.matchedFields
matchedFields.foreach { matched =>
println(s"Matched: ${matched.catalystField.name} -> ${matched.fieldDescriptor.getName}")
}
// Validate schema compatibility
try {
schemaHelper.validateNoExtraCatalystFields(ignoreNullable = false)
schemaHelper.validateNoExtraRequiredProtoFields()
println("Schema validation passed")
} catch {
case e: Exception =>
println(s"Schema validation failed: ${e.getMessage}")
}
```

Complex Protobuf types such as repeated fields, maps, and well-known types map to corresponding Spark SQL types:

```scala
// Example protobuf message:
// message ComplexMessage {
// string name = 1;
// repeated int32 numbers = 2;
// map<string, string> metadata = 3;
// google.protobuf.Timestamp created_at = 4;
// google.protobuf.Duration timeout = 5;
// NestedMessage nested = 6;
// optional string description = 7;
// }
val complexDescriptor = ProtobufUtils.buildDescriptor("ComplexMessage", Some(descriptorBytes))
val complexSchema = SchemaConverters.toSqlType(complexDescriptor)
// Resulting Spark schema structure:
// StructType(Array(
// StructField("name", StringType, false),
// StructField("numbers", ArrayType(IntegerType), false),
// StructField("metadata", MapType(StringType, StringType), false),
// StructField("created_at", TimestampType, false),
// StructField("timeout", DayTimeIntervalType.defaultConcreteType, false),
// StructField("nested", StructType(...), false),
// StructField("description", StringType, true) // optional field
// ))
```

Recursive messages require recursive.fields.max.depth; the option bounds how deeply the recursion is expanded:

```scala
// Example with recursive message:
// message Person {
// string name = 1;
// Person friend = 2; // recursive field
// }
val recursiveOptions = ProtobufOptions(Map(
"recursive.fields.max.depth" -> "2"
))
val recursiveSchema = SchemaConverters.toSqlType(personDescriptor, recursiveOptions)
// With depth 2, the schema becomes:
// StructType(Array(
// StructField("name", StringType, false),
// StructField("friend", StructType(Array(
// StructField("name", StringType, false),
// StructField("friend", StructType(Array(
// StructField("name", StringType, false)
// // friend field truncated at depth 2
// )), true)
// )), true)
// ))
```

Enum fields convert to StringType by default, or to IntegerType when enums.as.ints is enabled:

```scala
// Example protobuf with enum:
// enum Status {
// UNKNOWN = 0;
// ACTIVE = 1;
// INACTIVE = 2;
// }
// message StatusMessage {
// Status status = 1;
// }
// Default behavior (enums as strings)
val enumAsStringSchema = SchemaConverters.toSqlType(statusDescriptor)
// Results in: StructField("status", StringType, false)
// Enums as integers
val enumAsIntOptions = ProtobufOptions(Map("enums.as.ints" -> "true"))
val enumAsIntSchema = SchemaConverters.toSqlType(statusDescriptor, enumAsIntOptions)
// Results in: StructField("status", IntegerType, false)import org.apache.spark.sql.types._
def inspectSchema(schemaType: SchemaType): Unit = {
def printSchema(dataType: DataType, indent: String = ""): Unit = {
dataType match {
case struct: StructType =>
println(s"${indent}StructType:")
struct.fields.foreach { field =>
println(s"${indent} ${field.name}: ${field.dataType.typeName} (nullable: ${field.nullable})")
if (field.dataType.isInstanceOf[StructType] || field.dataType.isInstanceOf[ArrayType]) {
printSchema(field.dataType, indent + " ")
}
}
case array: ArrayType =>
println(s"${indent}ArrayType of ${array.elementType.typeName}")
printSchema(array.elementType, indent + " ")
case map: MapType =>
println(s"${indent}MapType: ${map.keyType.typeName} -> ${map.valueType.typeName}")
printSchema(map.valueType, indent + " ")
case other =>
println(s"${indent}${other.typeName}")
}
}
printSchema(schemaType.dataType)
}
// Usage
val schema = SchemaConverters.toSqlType(descriptor)
inspectSchema(schema)
```

Descriptors and TypeRegistry instances can be built from descriptor files, raw bytes, or compiled Java classes:

```scala
import org.apache.spark.sql.protobuf.utils.ProtobufUtils
import com.google.protobuf.{Descriptors, TypeRegistry}
// Build descriptor from descriptor file
val descriptorBytes = ProtobufUtils.readDescriptorFileContent("/path/to/messages.desc")
val descriptor = ProtobufUtils.buildDescriptor("PersonMessage", Some(descriptorBytes))
// Build descriptor directly from bytes and message name
val descriptor2 = ProtobufUtils.buildDescriptor(descriptorBytes, "PersonMessage")
// Build descriptor from Java class (requires shaded protobuf classes)
val javaDescriptor = ProtobufUtils.buildDescriptorFromJavaClass(
"com.example.protos.PersonMessage"
)
// Build TypeRegistry for Any field support
val typeRegistry = ProtobufUtils.buildTypeRegistry(descriptorBytes)
val typeRegistryFromDescriptor = ProtobufUtils.buildTypeRegistry(descriptor)
```
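
As a hedged aside on what the TypeRegistry is for (this assumes protobuf-java-util is on the classpath, which this module's documentation does not state): the registry lets JSON printing resolve google.protobuf.Any payloads.

```scala
import com.google.protobuf.MessageOrBuilder
import com.google.protobuf.util.JsonFormat

// Sketch only: a TypeRegistry lets JsonFormat resolve google.protobuf.Any
// payloads when printing a message as JSON (requires protobuf-java-util).
val anyAwarePrinter = JsonFormat.printer().usingTypeRegistry(typeRegistry)

def toJson(message: MessageOrBuilder): String = anyAwarePrinter.print(message)
```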

Working with multiple descriptor files:

```scala
import org.apache.spark.sql.protobuf.utils.{ProtobufUtils, SchemaConverters}

// Reading multiple descriptor files
val descriptorFiles = List(
"/path/to/person.desc",
"/path/to/address.desc",
"/path/to/common.desc"
)
val allDescriptors = descriptorFiles.map { filePath =>
val bytes = ProtobufUtils.readDescriptorFileContent(filePath)
// Note: In practice, you'd combine these into a single FileDescriptorSet
// (see the sketch after this block)
bytes
}
// Build descriptors for different message types
val personDescriptor = ProtobufUtils.buildDescriptor("Person", Some(allDescriptors.head))
val addressDescriptor = ProtobufUtils.buildDescriptor("Address", Some(allDescriptors(1)))
// Create schemas for each
val personSchema = SchemaConverters.toSqlType(personDescriptor)
val addressSchema = SchemaConverters.toSqlType(addressDescriptor)
```
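
The note above about combining descriptor files, sketched under the assumption that protobuf-java's DescriptorProtos classes are available: protobuf merge semantics concatenate the repeated file field, so several serialized sets fold into one byte array.

```scala
import com.google.protobuf.DescriptorProtos.FileDescriptorSet

// Sketch: fold several serialized FileDescriptorSets into one, so a single
// byte array carries every message definition and its dependencies.
val mergedDescriptorBytes: Array[Byte] = descriptorFiles
  .foldLeft(FileDescriptorSet.newBuilder()) { (builder, path) =>
    builder.mergeFrom(ProtobufUtils.readDescriptorFileContent(path))
  }
  .build()
  .toByteArray

val mergedPersonDescriptor =
  ProtobufUtils.buildDescriptor("Person", Some(mergedDescriptorBytes))
```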

Wrap descriptor building in Try to handle missing files or unknown message names gracefully:

```scala
import scala.util.{Try, Success, Failure}
import com.google.protobuf.Descriptors.Descriptor
import org.apache.spark.sql.protobuf.utils.{ProtobufUtils, SchemaConverters}

def safelyBuildDescriptor(messageName: String, filePath: String): Option[Descriptor] = {
Try {
val bytes = ProtobufUtils.readDescriptorFileContent(filePath)
ProtobufUtils.buildDescriptor(messageName, Some(bytes))
} match {
case Success(descriptor) => Some(descriptor)
case Failure(exception) =>
println(s"Failed to build descriptor for $messageName: ${exception.getMessage}")
None
}
}
// Usage
safelyBuildDescriptor("PersonMessage", "/path/to/person.desc") match {
case Some(descriptor) =>
val schema = SchemaConverters.toSqlType(descriptor)
println(s"Successfully built schema: ${schema.dataType.prettyJson}")
case None =>
println("Failed to build descriptor")
}
```

Complete mapping between Protobuf and Spark SQL types:

| Protobuf Type | Spark SQL Type | Notes |
|---|---|---|
| int32, sint32, sfixed32 | IntegerType | 32-bit signed integer |
| int64, sint64, sfixed64 | LongType | 64-bit signed integer |
| uint32, fixed32 | LongType | Unsigned 32-bit as signed 64-bit |
| uint64, fixed64 | DecimalType | Unsigned 64-bit as decimal |
| float | FloatType | 32-bit floating point |
| double | DoubleType | 64-bit floating point |
| bool | BooleanType | Boolean value |
| string | StringType | UTF-8 string |
| bytes | BinaryType | Byte array |
| enum | StringType / IntegerType | Configurable via enums.as.ints |
| message | StructType | Nested structure |
| repeated T | ArrayType(T) | Array of type T |
| map<K,V> | MapType(K,V) | Map with key type K, value type V |
| google.protobuf.Timestamp | TimestampType | Protobuf nanoseconds truncated to Spark's microsecond precision |
| google.protobuf.Duration | DayTimeIntervalType | Time interval |
| google.protobuf.Any | StringType | JSON string with convert.any.fields.to.json |
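
A quick way to spot-check a few of these mappings, reusing the ComplexMessage descriptor from the earlier example (field names come from that sketch):

```scala
import org.apache.spark.sql.types._

// Spot-check a few rows of the mapping table against the converted schema.
val complexStruct =
  SchemaConverters.toSqlType(complexDescriptor).dataType.asInstanceOf[StructType]

assert(complexStruct("name").dataType == StringType)              // string    -> StringType
assert(complexStruct("numbers").dataType.isInstanceOf[ArrayType]) // repeated  -> ArrayType
assert(complexStruct("metadata").dataType.isInstanceOf[MapType])  // map       -> MapType
assert(complexStruct("created_at").dataType == TimestampType)     // Timestamp -> TimestampType
```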
Common schema conversion errors and solutions:

```scala
// Handle schema conversion errors
try {
val schema = SchemaConverters.toSqlType(descriptor, options)
// Use schema
} catch {
case e: IllegalArgumentException if e.getMessage.contains("recursive") =>
println("Recursive field depth exceeded, increase recursive.fields.max.depth")
case e: IllegalStateException if e.getMessage.contains("descriptor") =>
println("Invalid protobuf descriptor")
case e: Exception =>
println(s"Schema conversion error: ${e.getMessage}")
}
```