tessl install tessl/maven-org-apache-spark--spark-protobuf_2-13@3.5.0
Apache Spark connector for the Protocol Buffers (protobuf) data format, providing SQL functions to convert between binary protobuf data and Catalyst data structures for processing structured data in distributed big data analytics pipelines.
Convert binary protobuf data to Spark SQL data structures with automatic schema inference and comprehensive type mapping.
Convert protobuf binary data using a protobuf descriptor file created with protoc.
/**
* Converts binary protobuf column to Catalyst value using descriptor file
* @param data - The binary column containing protobuf data
* @param messageName - The protobuf message name to look for in descriptor file
* @param descFilePath - Path to protobuf descriptor file (usually created with protoc --descriptor_set_out)
* @return Column with converted Catalyst data structure
* @since 3.4.0
*/
@Experimental
def from_protobuf(data: Column, messageName: String, descFilePath: String): Column
/**
* Converts binary protobuf column to Catalyst value using descriptor file with options
* @param data - The binary column containing protobuf data
* @param messageName - The protobuf message name to look for in descriptor file
* @param descFilePath - Path to protobuf descriptor file
* @param options - Configuration options for protobuf processing
* @return Column with converted Catalyst data structure
* @since 3.4.0
*/
@Experimental
def from_protobuf(
data: Column,
messageName: String,
descFilePath: String,
options: java.util.Map[String, String]
): Column
Convert protobuf binary data using a serialized FileDescriptorSet byte array.
/**
* Converts binary protobuf column to Catalyst value using FileDescriptorSet
* @param data - The binary column containing protobuf data
* @param messageName - The protobuf message name to look for in the descriptor set
* @param binaryFileDescriptorSet - Serialized FileDescriptorSet (typically from protoc --descriptor_set_out)
* @return Column with converted Catalyst data structure
* @since 3.5.0
*/
@Experimental
def from_protobuf(
data: Column,
messageName: String,
binaryFileDescriptorSet: Array[Byte]
): Column
/**
* Converts binary protobuf column to Catalyst value using FileDescriptorSet with options
* @param data - The binary column containing protobuf data
* @param messageName - The protobuf message name to look for in the descriptor set
* @param binaryFileDescriptorSet - Serialized FileDescriptorSet
* @param options - Configuration options for protobuf processing
* @return Column with converted Catalyst data structure
* @since 3.5.0
*/
@Experimental
def from_protobuf(
data: Column,
messageName: String,
binaryFileDescriptorSet: Array[Byte],
options: java.util.Map[String, String]
): Column
Convert protobuf binary data using a Java class. The jar containing the Java class must be shaded, with com.google.protobuf.* relocated to org.sparkproject.spark_protobuf.protobuf.*.
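The relocation can be done with any shading tool. As a minimal sketch, assuming an sbt build with the sbt-assembly plugin (the Maven Shade plugin's relocation configuration achieves the same thing):
// build.sbt sketch: relocate the protobuf runtime packages inside the fat jar
// so the generated classes reference the package that spark-protobuf expects
assembly / assemblyShadeRules := Seq(
  ShadeRule.rename("com.google.protobuf.**" -> "org.sparkproject.spark_protobuf.protobuf.@1").inAll
)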
/**
* Converts binary protobuf column to Catalyst value using Java class
* @param data - The binary column containing protobuf data
* @param messageClassName - Full name of protobuf Java class (e.g., com.example.protos.ExampleEvent)
* @return Column with converted Catalyst data structure
* @since 3.4.0
* @note The jar with protobuf classes needs to be shaded
*/
@Experimental
def from_protobuf(data: Column, messageClassName: String): Column
/**
* Converts binary protobuf column to Catalyst value using Java class with options
* @param data - The binary column containing protobuf data
* @param messageClassName - Full name of protobuf Java class
* @param options - Configuration options for protobuf processing
* @return Column with converted Catalyst data structure
* @since 3.4.0
*/
@Experimental
def from_protobuf(
data: Column,
messageClassName: String,
options: java.util.Map[String, String]
): Column
Example: decode binary protobuf files using a descriptor file created with protoc.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.protobuf.functions._
val spark = SparkSession.builder().appName("ProtobufDeserialization").getOrCreate()
import spark.implicits._
// Load binary protobuf data
val binaryData = spark.read.format("binaryFile").load("/path/to/protobuf/files")
// Convert using descriptor file
val decodedDF = binaryData.select(
from_protobuf($"content", "PersonMessage", "/path/to/person.desc").as("person")
)
decodedDF.show()
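The same call works on streaming sources. A minimal sketch, assuming a Kafka topic whose binary value column carries serialized PersonMessage records (the bootstrap server and topic name here are placeholders):
// Read the raw Kafka stream; the protobuf payload arrives in the binary `value` column
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "person-events")
  .load()
// Decode each record with the same descriptor file as the batch example
val decodedStream = kafkaStream.select(
  from_protobuf($"value", "PersonMessage", "/path/to/person.desc").as("person")
)
Options can be passed to control how records are decoded:
import java.util.{Map => JMap}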
import scala.collection.JavaConverters._
// Configure options for handling recursive fields and enums
val options: JMap[String, String] = Map(
"recursive.fields.max.depth" -> "2",
"enums.as.ints" -> "true",
"emit.default.values" -> "true"
).asJava
val decodedWithOptions = binaryData.select(
from_protobuf($"content", "PersonMessage", "/path/to/person.desc", options).as("person")
)
Example: decode using a serialized FileDescriptorSet loaded into memory.
import org.apache.spark.sql.protobuf.utils.ProtobufUtils
// Read descriptor set content
val descriptorSetBytes = ProtobufUtils.readDescriptorFileContent("/path/to/messages.desc")
val decodedFromDescSet = binaryData.select(
from_protobuf($"content", "PersonMessage", descriptorSetBytes).as("person")
)
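Alternatively, as a minimal sketch using only standard Java I/O, the descriptor set can be read as raw bytes, since this overload just needs the serialized FileDescriptorSet:
import java.nio.file.{Files, Paths}
// Read the protoc-generated descriptor set file directly into a byte array
val rawDescriptorBytes = Files.readAllBytes(Paths.get("/path/to/messages.desc"))
val decodedFromRawBytes = binaryData.select(
  from_protobuf($"content", "PersonMessage", rawDescriptorBytes).as("person")
)
// Using shaded protobuf Java class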
val decodedFromClass = binaryData.select(
from_protobuf($"content", "com.example.protos.PersonMessage").as("person")
)
// With options
val decodedFromClassWithOptions = binaryData.select(
from_protobuf(
$"content",
"com.example.protos.PersonMessage",
Map("mode" -> "PERMISSIVE").asJava
).as("person")
)
// For messages with nested structures and Any fields
val complexOptions: JMap[String, String] = Map(
"convert.any.fields.to.json" -> "true",
"recursive.fields.max.depth" -> "3",
"mode" -> "PERMISSIVE"
).asJava
val decodedComplex = binaryData.select(
from_protobuf($"content", "ComplexMessage", "/path/to/complex.desc", complexOptions)
.as("complex_data")
)
// Access nested fields
decodedComplex.select(
$"complex_data.header.timestamp",
$"complex_data.payload.data",
$"complex_data.metadata"
).show()
The from_protobuf function automatically maps protobuf types to Spark SQL types: messages become structs, repeated fields become arrays, and map fields become maps. Protobuf enums are decoded as strings by default (or as integers with the enums.as.ints option), and google.protobuf.Any fields can be rendered as JSON strings with the convert.any.fields.to.json option.
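To see the mapping for a concrete message, inspect the inferred Catalyst schema; a small sketch reusing the decodedDF frame from the descriptor-file example above:
// Print the Spark SQL schema inferred from the protobuf message definition
decodedDF.printSchema()
// The decoded column is a struct; flatten it to inspect the mapped field types
decodedDF.select($"person.*").printSchema()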
The function supports different error handling modes:
val descriptorPath = "/path/to/person.desc" // a protoc-generated descriptor set
val permissiveOptions = Map("mode" -> "PERMISSIVE").asJava
// PERMISSIVE replaces records that fail to parse with null instead of failing the query
val decodedPermissive = binaryData.select(
  from_protobuf($"content", "MessageType", descriptorPath, permissiveOptions).as("data")
)
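The default mode is FAILFAST, which raises an error as soon as a record cannot be parsed. A short sketch for contrast:
val failFastOptions = Map("mode" -> "FAILFAST").asJava
val decodedStrict = binaryData.select(
  from_protobuf($"content", "MessageType", descriptorPath, failFastOptions).as("data")
)
// Any malformed record fails the query at execution time, surfacing bad data early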