Apache Spark connector for the Protocol Buffers data source, enabling seamless protobuf serialization and deserialization in Spark SQL.
```
npx @tessl/cli install tessl/maven-org-apache-spark--spark-protobuf_2-13@4.0.0
```

The Apache Spark Protobuf Connector enables seamless serialization and deserialization of Protocol Buffers data within Apache Spark SQL. It provides bidirectional conversion between protobuf binary data and Spark's Catalyst data structures through specialized SQL functions and comprehensive configuration options.
```
org.apache.spark:spark-protobuf_2.13:4.0.0
```

```scala
import org.apache.spark.sql.protobuf.functions._
```

For Java compatibility:

```java
import static org.apache.spark.sql.protobuf.functions.*;
```

Basic usage:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions._

// Path to a compiled descriptor set (placeholder; produce one with `protoc --descriptor_set_out`)
val descriptorFile = "/path/to/descriptor.desc"

// Convert binary protobuf data to Spark rows
val df: DataFrame = spark.read.format("binaryFile").load("protobuf-data")
// Deserialize protobuf binary to structured data
val deserializedDF = df.select(
  from_protobuf(col("content"), "MessageName", descriptorFile) as "data"
)
// Serialize structured data back to protobuf binary
val serializedDF = deserializedDF.select(
  to_protobuf(col("data"), "MessageName", descriptorFile) as "protobuf_binary"
)
```

The Spark Protobuf Connector is built around several key components:
- from_protobuf and to_protobuf functions for DataFrame operations

Protobuf to Catalyst Conversion

Convert binary protobuf data to Spark SQL structured types using the from_protobuf functions, with support for various descriptor sources and configuration options.

```scala
def from_protobuf(
    data: Column,
    messageName: String,
    descFilePath: String,
    options: java.util.Map[String, String]
): Column

def from_protobuf(
    data: Column,
    messageName: String,
    binaryFileDescriptorSet: Array[Byte],
    options: java.util.Map[String, String]
): Column

def from_protobuf(
    data: Column,
    messageClassName: String,
    options: java.util.Map[String, String]
): Column
```
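As an illustration of the binary-descriptor-set overload, the sketch below assumes a DataFrame df with a binary content column, an Event message compiled into events.desc, and a "mode" option for handling malformed records; the file name, message name, and option value are placeholders.

```scala
import java.nio.file.{Files, Paths}
import scala.jdk.CollectionConverters._

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions.from_protobuf

// Load a compiled FileDescriptorSet (e.g. produced by `protoc --descriptor_set_out=events.desc`).
val descriptorBytes: Array[Byte] = Files.readAllBytes(Paths.get("/path/to/events.desc"))

// Options travel as a java.util.Map; "mode" (PERMISSIVE or FAILFAST) controls malformed-record handling.
val options = Map("mode" -> "PERMISSIVE").asJava

// Deserialize the binary column into a struct column matching the Event schema.
val parsed = df.select(
  from_protobuf(col("content"), "Event", descriptorBytes, options) as "event"
)
```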
Catalyst to Protobuf Conversion

Convert Spark SQL structured types to binary protobuf data using the to_protobuf functions, with comprehensive serialization options.

```scala
def to_protobuf(
    data: Column,
    messageName: String,
    descFilePath: String,
    options: java.util.Map[String, String]
): Column

def to_protobuf(
    data: Column,
    messageName: String,
    binaryFileDescriptorSet: Array[Byte],
    options: java.util.Map[String, String]
): Column

def to_protobuf(
    data: Column,
    messageClassName: String,
    options: java.util.Map[String, String]
): Column
```
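A sketch of serializing a struct column back to protobuf bytes, continuing the parsed DataFrame from the previous example; the message name, descriptor path, and the commented-out Java-class variant are illustrative.

```scala
import scala.jdk.CollectionConverters._

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions.to_protobuf

// Re-encode the struct column as protobuf binary using the same descriptor file.
val serialized = parsed.select(
  to_protobuf(col("event"), "Event", "/path/to/events.desc") as "value"
)

// Variant using a generated protobuf Java class available on the classpath
// (class name is a placeholder):
// val serializedFromClass = parsed.select(
//   to_protobuf(col("event"), "com.example.protos.Event", Map.empty[String, String].asJava) as "value"
// )
```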
Comprehensive configuration system for controlling protobuf processing behavior, including parse modes, type conversions, and schema handling.

```scala
class ProtobufOptions(
    parameters: Map[String, String],
    conf: Configuration
)

object ProtobufOptions {
  def apply(parameters: Map[String, String]): ProtobufOptions
  val CONVERT_ANY_FIELDS_TO_JSON_CONFIG: String
}
```
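These options are surfaced through the option maps accepted by from_protobuf and to_protobuf. The keys below are illustrative of commonly documented settings (parse mode, recursion depth, Any-to-JSON conversion); confirm the exact names against the Spark version in use.

```scala
import scala.jdk.CollectionConverters._

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions.from_protobuf

// Assumed option keys; verify against the target Spark release.
val options = Map(
  "mode" -> "FAILFAST",                  // fail on malformed records instead of yielding null rows
  "recursive.fields.max.depth" -> "2",   // permit bounded recursion in recursive message types
  "convert.any.fields.to.json" -> "true" // render google.protobuf.Any fields as JSON strings
).asJava

val parsed = df.select(
  from_protobuf(col("content"), "Event", "/path/to/events.desc", options) as "event"
)
```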
Utilities for protobuf descriptor management, schema conversion, and type registry operations.

```scala
object ProtobufUtils {
  def buildDescriptor(
      messageName: String,
      binaryFileDescriptorSet: Option[Array[Byte]]
  ): Descriptor

  def buildDescriptorFromJavaClass(protobufClassName: String): Descriptor

  def buildTypeRegistry(descriptorBytes: Array[Byte]): TypeRegistry
}

object SchemaConverters {
  def toSqlType(
      descriptor: Descriptor,
      protobufOptions: ProtobufOptions
  ): SchemaType
}

case class SchemaType(dataType: DataType, nullable: Boolean)
case class ProtoMatchedField(
    catalystField: StructField,
    catalystPosition: Int,
    fieldDescriptor: FieldDescriptor
)

class ProtoSchemaHelper(
    descriptor: Descriptor,
    catalystSchema: StructType,
    protoPath: Seq[String],
    catalystPath: Seq[String]
) {
  val matchedFields: Seq[ProtoMatchedField]
  def validateNoExtraCatalystFields(ignoreNullable: Boolean): Unit
  def validateNoExtraRequiredProtoFields(): Unit
  def getFieldByName(name: String): Option[FieldDescriptor]
}
```
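These helpers are internal to the connector (they are private[sql] in Spark), so the sketch below only illustrates how a descriptor could be resolved and mapped to a Catalyst schema inside the connector; the package path, descriptor path, and message name are assumptions, not a supported public API.

```scala
// Illustrative only: ProtobufUtils, SchemaConverters and ProtobufOptions are internal to Spark,
// so application code cannot normally call them; this mirrors what the connector does internally.
import java.nio.file.{Files, Paths}

import org.apache.spark.sql.protobuf.utils.{ProtobufOptions, ProtobufUtils, SchemaConverters}

// Read a compiled FileDescriptorSet (placeholder path).
val descriptorBytes = Files.readAllBytes(Paths.get("/path/to/events.desc"))

// Resolve the message descriptor, then map it to a Spark SQL type.
val descriptor = ProtobufUtils.buildDescriptor("Event", Some(descriptorBytes))
val schemaType = SchemaConverters.toSqlType(descriptor, ProtobufOptions(Map.empty[String, String]))

println(schemaType.dataType.sql) // Catalyst struct derived from the Event message
println(schemaType.nullable)
```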