Apache Spark connector for Protocol Buffers data source enabling seamless protobuf serialization and deserialization in Spark SQL.
Functions for converting binary Protocol Buffer data to Spark SQL structured types with comprehensive configuration options and multiple descriptor source support.
Convert binary protobuf data using a descriptor file path on the filesystem.
/**
 * Converts a binary column of Protobuf format into its corresponding catalyst value.
 *
 * The message descriptor is resolved from a descriptor file on the filesystem,
 * produced with `protoc --descriptor_set_out`.
 *
 * @param data the binary column containing protobuf data
 * @param messageName the protobuf message name to look for in descriptor file
 * @param descFilePath the protobuf descriptor file path (created with protoc --descriptor_set_out)
 * @param options configuration options for deserialization behavior; keys used in the
 *                examples on this page include "mode" ("PERMISSIVE" turns malformed
 *                records into null, "FAILFAST" fails the job), "recursive.fields.max.depth",
 *                "emit.default.values", "enums.as.ints", and "convert.any.fields.to.json"
 * @return Column with deserialized Spark SQL data
 */
def from_protobuf(
data: Column,
messageName: String,
descFilePath: String,
options: java.util.Map[String, String]
): Column
/**
 * Converts a binary column of Protobuf format into its corresponding catalyst value.
 *
 * Convenience overload with default deserialization options; the message descriptor
 * is resolved from a descriptor file produced with `protoc --descriptor_set_out`.
 *
 * @param data the binary column containing protobuf data
 * @param messageName the protobuf message name to look for in descriptor file
 * @param descFilePath the protobuf descriptor file path (created with protoc --descriptor_set_out)
 * @return Column with deserialized Spark SQL data
 */
def from_protobuf(
data: Column,
messageName: String,
descFilePath: String
): Column

Convert binary protobuf data using a pre-loaded binary FileDescriptorSet.
/**
 * Converts a binary column of Protobuf format into its corresponding catalyst value.
 *
 * The message descriptor is resolved from an already-loaded, serialized
 * `FileDescriptorSet` passed as a byte array (no filesystem access needed).
 *
 * @param data the binary column containing protobuf data
 * @param messageName the protobuf MessageName to look for in the descriptor set
 * @param binaryFileDescriptorSet serialized protobuf descriptor (FileDescriptorSet)
 * @param options configuration options for deserialization behavior; keys used in the
 *                examples on this page include "mode" ("PERMISSIVE"/"FAILFAST"),
 *                "recursive.fields.max.depth", "emit.default.values", "enums.as.ints",
 *                and "convert.any.fields.to.json"
 * @return Column with deserialized Spark SQL data
 */
def from_protobuf(
data: Column,
messageName: String,
binaryFileDescriptorSet: Array[Byte],
options: java.util.Map[String, String]
): Column
/**
 * Converts a binary column of Protobuf format into its corresponding catalyst value.
 *
 * Convenience overload with default deserialization options; the message descriptor
 * is resolved from an already-loaded, serialized `FileDescriptorSet` byte array.
 *
 * @param data the binary column containing protobuf data
 * @param messageName the protobuf MessageName to look for in the descriptor set
 * @param binaryFileDescriptorSet serialized protobuf descriptor (FileDescriptorSet)
 * @return Column with deserialized Spark SQL data
 */
def from_protobuf(
data: Column,
messageName: String,
binaryFileDescriptorSet: Array[Byte]
): Column

Convert binary protobuf data using a Java class name (requires shaded protobuf classes).
/**
 * Converts a binary column of Protobuf format into its corresponding catalyst value.
 *
 * The message schema is taken from a compiled Protobuf Java class on the classpath.
 * The jar containing Java class should be shaded with com.google.protobuf.*
 * relocated to org.sparkproject.spark_protobuf.protobuf.*
 *
 * @param data the binary column containing protobuf data
 * @param messageClassName the full name for Protobuf Java class
 *                         (e.g. "com.example.protos.PersonMessage")
 * @param options configuration options for deserialization behavior; keys used in the
 *                examples on this page include "mode" ("PERMISSIVE"/"FAILFAST"),
 *                "recursive.fields.max.depth", "emit.default.values", "enums.as.ints",
 *                and "convert.any.fields.to.json"
 * @return Column with deserialized Spark SQL data
 */
def from_protobuf(
data: Column,
messageClassName: String,
options: java.util.Map[String, String]
): Column
/**
 * Converts a binary column of Protobuf format into its corresponding catalyst value.
 *
 * Convenience overload with default deserialization options; the message schema is
 * taken from a compiled Protobuf Java class on the classpath.
 * The jar containing Java class should be shaded with com.google.protobuf.*
 * relocated to org.sparkproject.spark_protobuf.protobuf.*
 *
 * @param data the binary column containing protobuf data
 * @param messageClassName the full name for Protobuf Java class
 *                         (e.g. "com.example.protos.PersonMessage")
 * @return Column with deserialized Spark SQL data
 */
def from_protobuf(
data: Column,
messageClassName: String
): Column

import org.apache.spark.sql.protobuf.functions._
import org.apache.spark.sql.functions.col
// Read binary protobuf data
// (the binaryFile source exposes each file's raw bytes in a "content" column)
val binaryDF = spark.read.format("binaryFile").load("/path/to/protobuf/files")
// Deserialize using descriptor file
// NOTE(review): "PersonMessage" must name a message defined in person.desc — verify
val deserializedDF = binaryDF.select(
from_protobuf(col("content"), "PersonMessage", "/path/to/person.desc") as "person_data"
)
deserializedDF.show(false)

import scala.jdk.CollectionConverters._
// Deserialization options: a Scala Map converted to java.util.Map via .asJava
// (from_protobuf's options parameter takes java.util.Map[String, String])
val options = Map(
"mode" -> "PERMISSIVE",
"recursive.fields.max.depth" -> "3",
"emit.default.values" -> "true",
"enums.as.ints" -> "false"
).asJava
val deserializedDF = binaryDF.select(
from_protobuf(col("content"), "PersonMessage", descriptorBytes, options) as "person_data"
)

// Requires shaded protobuf JAR on classpath
val deserializedDF = binaryDF.select(
from_protobuf(col("content"), "com.example.protos.PersonMessage") as "person_data"
)

val options = Map("mode" -> "PERMISSIVE").asJava
// PERMISSIVE mode: malformed records become null
// (filter out the nulls afterwards to keep only successfully parsed rows)
val tolerantDF = binaryDF.select(
from_protobuf(col("content"), "PersonMessage", descriptorFile, options) as "person_data"
).filter(col("person_data").isNotNull)
// FAILFAST mode: malformed records cause job failure
val strictOptions = Map("mode" -> "FAILFAST").asJava
val strictDF = binaryDF.select(
from_protobuf(col("content"), "PersonMessage", descriptorFile, strictOptions) as "person_data"
)

val options = Map("convert.any.fields.to.json" -> "true").asJava
// Deserialize a message containing google.protobuf.Any fields
// with "convert.any.fields.to.json" enabled (see options above)
val deserializedDF = binaryDF.select(
from_protobuf(col("content"), "MessageWithAny", descriptorBytes, options) as "data"
)
// Any fields will be converted to JSON strings instead of binary

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-protobuf-2-13