tessl install tessl/maven-org-apache-spark--spark-protobuf_2-13@3.5.0
Apache Spark connector for Protocol Buffer (protobuf) data format support, providing SQL functions to convert between binary protobuf data and Catalyst data structures for processing structured data in distributed big data analytics pipelines.
maven:org.apache.spark:spark-protobuf_2.13:3.5.6

import org.apache.spark.sql.protobuf.functions._

For utility classes:
import org.apache.spark.sql.protobuf.utils.{ProtobufOptions, SchemaConverters}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.protobuf.functions._
val spark = SparkSession.builder().appName("ProtobufExample").getOrCreate()
import spark.implicits._
// Convert binary protobuf data to Spark DataFrame
val df = Seq(protobufBinaryData).toDF("binary_data")
// Using descriptor file
val decoded = df.select(
  from_protobuf($"binary_data", "MessageName", "/path/to/descriptor.desc").as("decoded")
)
// Convert DataFrame back to protobuf binary
val encoded = decoded.select(
  to_protobuf($"decoded", "MessageName", "/path/to/descriptor.desc").as("binary")
)
The Spark Protobuf connector is built around several key components: SQL functions (from_protobuf, to_protobuf) for data conversion, SchemaConverters for mapping between protobuf descriptors and Spark SQL schemas, and ProtobufOptions for configuring processing behavior.

from_protobuf: Convert binary protobuf data to Spark SQL data structures with schema inference and type safety.
// Using descriptor file
def from_protobuf(data: Column, messageName: String, descFilePath: String): Column
// Using descriptor file with options
def from_protobuf(
  data: Column,
  messageName: String,
  descFilePath: String,
  options: java.util.Map[String, String]
): Column
// Using binary FileDescriptorSet
def from_protobuf(
  data: Column,
  messageName: String,
  binaryFileDescriptorSet: Array[Byte]
): Column
// Using Java class
def from_protobuf(data: Column, messageClassName: String): Column
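A brief sketch of the binary FileDescriptorSet overload, where df is the DataFrame with a binary column from the quick start; the descriptor path, column name, and message name are placeholders for a descriptor set compiled with protoc --descriptor_set_out.

import java.nio.file.{Files, Paths}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions.from_protobuf

// Read the compiled descriptor set once on the driver and pass the bytes directly
val descriptorBytes: Array[Byte] = Files.readAllBytes(Paths.get("/path/to/descriptor.desc"))

val parsed = df.select(
  from_protobuf(col("binary_data"), "MessageName", descriptorBytes).as("decoded")
)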
to_protobuf: Convert Spark SQL data structures to binary protobuf format for output and storage.

// Using descriptor file
def to_protobuf(data: Column, messageName: String, descFilePath: String): Column
// Using descriptor file with options
def to_protobuf(
  data: Column,
  messageName: String,
  descFilePath: String,
  options: java.util.Map[String, String]
): Column
// Using binary FileDescriptorSet
def to_protobuf(
  data: Column,
  messageName: String,
  binaryFileDescriptorSet: Array[Byte]
): Column
// Using Java class
def to_protobuf(data: Column, messageClassName: String): Column
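A sketch of the Java-class overloads for a round trip without a descriptor file. The class name com.example.protos.Event is a hypothetical generated protobuf Java class that must be on the driver and executor classpath, and the struct field names are assumed to match the message's field names (here, an assumed int64 id and string action).

import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.protobuf.functions.{from_protobuf, to_protobuf}

// Build a struct column matching the message's fields (uses the implicits from the quick start)
val rows = Seq((1L, "click")).toDF("id", "action")

// Encode the struct to binary protobuf via the generated class...
val encodedViaClass = rows.select(
  to_protobuf(struct(col("id"), col("action")), "com.example.protos.Event").as("binary")
)

// ...and decode it back to a struct column
val decodedViaClass = encodedViaClass.select(
  from_protobuf(col("binary"), "com.example.protos.Event").as("event")
)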
SchemaConverters: Tools for converting between Protobuf descriptors and Spark SQL schemas with advanced type mapping.

object SchemaConverters {
  def toSqlType(
    descriptor: Descriptor,
    protobufOptions: ProtobufOptions = ProtobufOptions(Map.empty)
  ): SchemaType
}
case class SchemaType(dataType: DataType, nullable: Boolean)
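A minimal sketch of deriving a Spark schema from a protobuf Descriptor, assuming the utility classes are accessible as imported at the top of this document; com.example.protos.EventProtos is a hypothetical generated protobuf Java class.

import org.apache.spark.sql.protobuf.utils.{ProtobufOptions, SchemaConverters}

// Obtain a Descriptor from a (hypothetical) generated class and inspect the Catalyst schema
val descriptor = com.example.protos.EventProtos.Event.getDescriptor
val schemaType = SchemaConverters.toSqlType(descriptor)

println(schemaType.dataType)   // typically a StructType mirroring the message fields
println(schemaType.nullable)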
ProtobufOptions: Comprehensive configuration system for controlling protobuf processing behavior, error handling, and advanced features.

class ProtobufOptions(
  parameters: CaseInsensitiveMap[String],
  conf: Configuration
)
object ProtobufOptions {
  def apply(parameters: Map[String, String]): ProtobufOptions
}
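Options are normally passed through the options map of from_protobuf/to_protobuf. The sketch below shows that pattern; the key names are assumed to mirror the configuration fields documented in the type reference below, and all column, message, and path values are placeholders.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions.from_protobuf
import scala.jdk.CollectionConverters._

// Assumed option keys, matching the ProtobufOptions fields listed further down
val options = Map(
  "mode" -> "FAILFAST",                   // parseMode: fail on malformed records
  "recursive.fields.max.depth" -> "2",    // recursiveFieldMaxDepth: bound recursive messages
  "convert.any.fields.to.json" -> "true", // convertAnyFieldsToJson: google.protobuf.Any as JSON string
  "emit.default.values" -> "true",        // emitDefaultValues: materialize proto3 default values
  "enums.as.ints" -> "true"               // enumsAsInts: enums as integers instead of strings
).asJava

val decodedStrict = df.select(
  from_protobuf(col("binary_data"), "MessageName", "/path/to/descriptor.desc", options).as("decoded")
)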
// Core Spark types
import org.apache.spark.sql.Column
import org.apache.spark.sql.types.{DataType, StructType, StructField}
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, ParseMode, FailFastMode, PermissiveMode}
import org.apache.spark.sql.catalyst.FileSourceOptions
import org.apache.hadoop.conf.Configuration
// Protobuf types
import com.google.protobuf.Descriptors.{Descriptor, FieldDescriptor}
import com.google.protobuf.TypeRegistry
// Utility types
case class ProtoMatchedField(
  catalystField: StructField,
  catalystPosition: Int,
  fieldDescriptor: FieldDescriptor
)
case class SchemaType(dataType: DataType, nullable: Boolean)
class ProtoSchemaHelper(
  descriptor: Descriptor,
  catalystSchema: StructType,
  protoPath: Seq[String],
  catalystPath: Seq[String]
) {
  val matchedFields: Seq[ProtoMatchedField]
  def validateNoExtraCatalystFields(ignoreNullable: Boolean): Unit
  def validateNoExtraRequiredProtoFields(): Unit
  def getFieldByName(name: String): Option[FieldDescriptor]
}
class ProtobufOptions(
  parameters: CaseInsensitiveMap[String],
  conf: Configuration
) extends FileSourceOptions(parameters) {
  val parseMode: ParseMode
  val recursiveFieldMaxDepth: Int
  val convertAnyFieldsToJson: Boolean
  val emitDefaultValues: Boolean
  val enumsAsInts: Boolean
}
object ProtobufOptions {
  def apply(parameters: Map[String, String]): ProtobufOptions
  val CONVERT_ANY_FIELDS_TO_JSON_CONFIG: String
}
object ProtobufUtils {
  def buildDescriptor(messageName: String, binaryFileDescriptorSet: Option[Array[Byte]]): Descriptor
  def buildDescriptor(binaryFileDescriptorSet: Array[Byte], messageName: String): Descriptor
  def buildDescriptorFromJavaClass(protobufClassName: String): Descriptor
  def readDescriptorFileContent(filePath: String): Array[Byte]
  def buildTypeRegistry(descriptorBytes: Array[Byte]): TypeRegistry
  def buildTypeRegistry(descriptor: Descriptor): TypeRegistry
}
object SchemaConverters {
  def toSqlType(descriptor: Descriptor, protobufOptions: ProtobufOptions = ProtobufOptions(Map.empty)): SchemaType
}
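To close, a sketch of how the two parse modes listed above are expected to surface in practice; column, message, and descriptor names are placeholders, and the null-on-error behavior under PERMISSIVE is the usual expectation rather than something verified here.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions.from_protobuf
import scala.jdk.CollectionConverters._

// PERMISSIVE: malformed binary records are expected to surface as null structs
val permissive = df.select(
  from_protobuf(col("binary_data"), "MessageName", "/path/to/descriptor.desc",
    Map("mode" -> "PERMISSIVE").asJava).as("decoded")
)
val malformed = permissive.filter(col("decoded").isNull)

// FAILFAST: the first malformed record fails the query instead
val strict = df.select(
  from_protobuf(col("binary_data"), "MessageName", "/path/to/descriptor.desc",
    Map("mode" -> "FAILFAST").asJava).as("decoded")
)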