Functions for converting binary Protocol Buffer data into Spark SQL structured types. Deserialization behavior can be tuned with configuration options, and the message schema can come from three descriptor sources: a descriptor file on the filesystem, a pre-loaded binary FileDescriptorSet, or a compiled Protobuf Java class.
Convert binary protobuf data using a descriptor file path on the filesystem.
/**
* Converts a binary column of Protobuf format into its corresponding catalyst value.
* @param data the binary column containing protobuf data
* @param messageName the protobuf message name to look for in the descriptor file
* @param descFilePath the protobuf descriptor file path (created with protoc --descriptor_set_out)
* @param options configuration options for deserialization behavior
* @return Column with deserialized Spark SQL data
*/
def from_protobuf(
    data: Column,
    messageName: String,
    descFilePath: String,
    options: java.util.Map[String, String]
): Column
/**
* Converts a binary column of Protobuf format into its corresponding catalyst value.
* @param data the binary column containing protobuf data
* @param messageName the protobuf message name to look for in the descriptor file
* @param descFilePath the protobuf descriptor file path (created with protoc --descriptor_set_out)
* @return Column with deserialized Spark SQL data
*/
def from_protobuf(
    data: Column,
    messageName: String,
    descFilePath: String
): Column

Convert binary protobuf data using a pre-loaded binary FileDescriptorSet.
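The descriptor set is typically generated ahead of time with protoc and loaded into a byte array before calling from_protobuf. A minimal sketch, assuming a descriptor file at a placeholder path:

import java.nio.file.{Files, Paths}

// Descriptor set produced beforehand, e.g. with:
//   protoc --include_imports --descriptor_set_out=person.desc person.proto
val descriptorBytes: Array[Byte] = Files.readAllBytes(Paths.get("/path/to/person.desc"))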
/**
* Converts a binary column of Protobuf format into its corresponding catalyst value.
* @param data the binary column containing protobuf data
* @param messageName the protobuf MessageName to look for in the descriptor set
* @param binaryFileDescriptorSet serialized protobuf descriptor (FileDescriptorSet)
* @param options configuration options for deserialization behavior
* @return Column with deserialized Spark SQL data
*/
def from_protobuf(
    data: Column,
    messageName: String,
    binaryFileDescriptorSet: Array[Byte],
    options: java.util.Map[String, String]
): Column
/**
* Converts a binary column of Protobuf format into its corresponding catalyst value.
* @param data the binary column containing protobuf data
* @param messageName the protobuf MessageName to look for in the descriptor set
* @param binaryFileDescriptorSet serialized protobuf descriptor (FileDescriptorSet)
* @return Column with deserialized Spark SQL data
*/
def from_protobuf(
    data: Column,
    messageName: String,
    binaryFileDescriptorSet: Array[Byte]
): Column

Convert binary protobuf data using a Java class name (requires shaded protobuf classes).
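Because Spark ships a shaded copy of protobuf, the compiled message classes must be relocated to the package noted in the Scaladoc below. A minimal sketch of that relocation using sbt-assembly (assuming that plugin is in use; Maven's shade plugin works equally well):

// build.sbt: relocate com.google.protobuf.* to the package spark-protobuf expects
assembly / assemblyShadeRules := Seq(
  ShadeRule.rename("com.google.protobuf.**" -> "org.sparkproject.spark_protobuf.protobuf.@1").inAll
)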
/**
* Converts a binary column of Protobuf format into its corresponding catalyst value.
* The jar containing the Java class should be shaded, with com.google.protobuf.*
* relocated to org.sparkproject.spark_protobuf.protobuf.*
* @param data the binary column containing protobuf data
* @param messageClassName the full name of the Protobuf Java class
* @param options configuration options for deserialization behavior
* @return Column with deserialized Spark SQL data
*/
def from_protobuf(
    data: Column,
    messageClassName: String,
    options: java.util.Map[String, String]
): Column
/**
* Converts a binary column of Protobuf format into its corresponding catalyst value.
* The jar containing the Java class should be shaded, with com.google.protobuf.*
* relocated to org.sparkproject.spark_protobuf.protobuf.*
* @param data the binary column containing protobuf data
* @param messageClassName the full name of the Protobuf Java class
* @return Column with deserialized Spark SQL data
*/
def from_protobuf(
    data: Column,
    messageClassName: String
): Column

Example: read binary protobuf files and deserialize them using a descriptor file path.

import org.apache.spark.sql.protobuf.functions._
import org.apache.spark.sql.functions.col
// Read binary protobuf data
val binaryDF = spark.read.format("binaryFile").load("/path/to/protobuf/files")
// Deserialize using descriptor file
val deserializedDF = binaryDF.select(
  from_protobuf(col("content"), "PersonMessage", "/path/to/person.desc") as "person_data"
)
deserializedDF.show(false)

Example: deserialization with configuration options and a pre-loaded descriptor set.

import scala.jdk.CollectionConverters._
val options = Map(
  "mode" -> "PERMISSIVE",
  "recursive.fields.max.depth" -> "3",
  "emit.default.values" -> "true",
  "enums.as.ints" -> "false"
).asJava
// descriptorBytes is an Array[Byte] containing a serialized FileDescriptorSet
val deserializedDF = binaryDF.select(
  from_protobuf(col("content"), "PersonMessage", descriptorBytes, options) as "person_data"
)

Example: deserialization using the Protobuf Java class name.

// Requires shaded protobuf JAR on classpath
val deserializedDF = binaryDF.select(
  from_protobuf(col("content"), "com.example.protos.PersonMessage") as "person_data"
)

Example: controlling how malformed records are handled.

val descriptorFile = "/path/to/person.desc"
val options = Map("mode" -> "PERMISSIVE").asJava
// PERMISSIVE mode: malformed records become null
val tolerantDF = binaryDF.select(
  from_protobuf(col("content"), "PersonMessage", descriptorFile, options) as "person_data"
).filter(col("person_data").isNotNull)
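A small follow-up sketch, reusing the same options and descriptorFile as above: under PERMISSIVE mode the records that failed to parse can also be counted rather than silently filtered away.

// Count records that failed to parse (they deserialize to null under PERMISSIVE mode)
val malformedCount = binaryDF.select(
  from_protobuf(col("content"), "PersonMessage", descriptorFile, options) as "person_data"
).filter(col("person_data").isNull).count()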
// FAILFAST mode: malformed records cause job failure
val strictOptions = Map("mode" -> "FAILFAST").asJava
val strictDF = binaryDF.select(
  from_protobuf(col("content"), "PersonMessage", descriptorFile, strictOptions) as "person_data"
)

Example: converting protobuf Any fields to JSON strings.

val options = Map("convert.any.fields.to.json" -> "true").asJava
val deserializedDF = binaryDF.select(
  from_protobuf(col("content"), "MessageWithAny", descriptorBytes, options) as "data"
)
// Any fields will be converted to JSON strings instead of binary
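A minimal sketch of working with the result; printSchema is standard Spark, while the payload field name inside MessageWithAny is a placeholder:

import org.apache.spark.sql.functions.get_json_object

// Any fields now show up as STRING (JSON) in the schema instead of binary
deserializedDF.printSchema()

// Hypothetical nested field access; "payload" and "$.value" stand in for the actual field names
deserializedDF.select(
  get_json_object(col("data.payload"), "$.value") as "payload_value"
).show(false)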