Describes: mavenpkg:maven/org.apache.spark/spark-protobuf_2.13@3.5.x

tessl/maven-org-apache-spark--spark-protobuf_2-13

tessl install tessl/maven-org-apache-spark--spark-protobuf_2-13@3.5.0

Apache Spark connector for the Protocol Buffers (protobuf) data format, providing SQL functions to convert between binary protobuf data and Catalyst data structures for processing structured data in distributed analytics pipelines.

docs/deserialization.md

Protobuf Deserialization

Convert binary protobuf data to Spark SQL data structures with automatic schema inference and comprehensive type mapping.

Capabilities

From Protobuf with Descriptor File

Convert protobuf binary data using a protobuf descriptor file created with protoc.

/**
 * Converts binary protobuf column to Catalyst value using descriptor file
 * @param data - The binary column containing protobuf data
 * @param messageName - The protobuf message name to look for in descriptor file
 * @param descFilePath - Path to protobuf descriptor file (usually created with protoc --descriptor_set_out)
 * @return Column with converted Catalyst data structure
 * @since 3.4.0
 */
@Experimental
def from_protobuf(data: Column, messageName: String, descFilePath: String): Column

/**
 * Converts binary protobuf column to Catalyst value using descriptor file with options
 * @param data - The binary column containing protobuf data
 * @param messageName - The protobuf message name to look for in descriptor file  
 * @param descFilePath - Path to protobuf descriptor file
 * @param options - Configuration options for protobuf processing
 * @return Column with converted Catalyst data structure
 * @since 3.4.0
 */
@Experimental
def from_protobuf(
  data: Column,
  messageName: String, 
  descFilePath: String,
  options: java.util.Map[String, String]
): Column

From Protobuf with FileDescriptorSet

Convert protobuf binary data using a serialized FileDescriptorSet byte array.

/**
 * Converts binary protobuf column to Catalyst value using FileDescriptorSet
 * @param data - The binary column containing protobuf data
 * @param messageName - The protobuf message name to look for in the descriptor set
 * @param binaryFileDescriptorSet - Serialized FileDescriptorSet (typically from protoc --descriptor_set_out)
 * @return Column with converted Catalyst data structure  
 * @since 3.5.0
 */
@Experimental
def from_protobuf(
  data: Column,
  messageName: String,
  binaryFileDescriptorSet: Array[Byte]
): Column

/**
 * Converts binary protobuf column to Catalyst value using FileDescriptorSet with options
 * @param data - The binary column containing protobuf data
 * @param messageName - The protobuf message name to look for in the descriptor set
 * @param binaryFileDescriptorSet - Serialized FileDescriptorSet
 * @param options - Configuration options for protobuf processing
 * @return Column with converted Catalyst data structure
 * @since 3.5.0
 */
@Experimental  
def from_protobuf(
  data: Column,
  messageName: String,
  binaryFileDescriptorSet: Array[Byte],
  options: java.util.Map[String, String]
): Column

From Protobuf with Java Class

Convert protobuf binary data using a Java class. The jar containing the Java class must be shaded with com.google.protobuf.* relocated to org.sparkproject.spark_protobuf.protobuf.*.

/**
 * Converts binary protobuf column to Catalyst value using Java class
 * @param data - The binary column containing protobuf data
 * @param messageClassName - Full name of protobuf Java class (e.g., com.example.protos.ExampleEvent)
 * @return Column with converted Catalyst data structure
 * @since 3.4.0
 * @note The jar with protobuf classes needs to be shaded
 */
@Experimental
def from_protobuf(data: Column, messageClassName: String): Column

/**
 * Converts binary protobuf column to Catalyst value using Java class with options
 * @param data - The binary column containing protobuf data
 * @param messageClassName - Full name of protobuf Java class  
 * @param options - Configuration options for protobuf processing
 * @return Column with converted Catalyst data structure
 * @since 3.4.0
 */
@Experimental
def from_protobuf(
  data: Column,
  messageClassName: String,
  options: java.util.Map[String, String]
): Column
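
A minimal sketch of how the shading requirement above can be met at build time using sbt-assembly shade rules; the plugin and build layout are assumptions, not part of spark-protobuf itself:

// build.sbt (sketch): relocate the bundled protobuf runtime in the jar that
// contains the generated message classes, so from_protobuf can load them.
assembly / assemblyShadeRules := Seq(
  ShadeRule
    .rename("com.google.protobuf.**" -> "org.sparkproject.spark_protobuf.protobuf.@1")
    .inAll
)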

Usage Examples

Basic Deserialization with Descriptor File

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.protobuf.functions._

val spark = SparkSession.builder().appName("ProtobufDeserialization").getOrCreate()
import spark.implicits._

// Load binary protobuf data
val binaryData = spark.read.format("binaryFile").load("/path/to/protobuf/files")

// Convert using descriptor file
val decodedDF = binaryData.select(
  from_protobuf($"content", "PersonMessage", "/path/to/person.desc").as("person")
)

decodedDF.show()
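
The decoded value arrives as a single struct column, so it is often expanded into top-level columns before further processing. A short follow-up, assuming PersonMessage carries fields such as name and age (illustrative names only):

// Expand the decoded struct into ordinary top-level columns
val flattened = decodedDF.select($"person.*")

// Each PersonMessage field (e.g. name, age) is now a regular column
flattened.printSchema()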

Deserialization with Configuration Options

import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._

// Configure options for handling recursive fields and enums
val options: JMap[String, String] = Map(
  "recursive.fields.max.depth" -> "2",
  "enums.as.ints" -> "true",
  "emit.default.values" -> "true"
).asJava

val decodedWithOptions = binaryData.select(
  from_protobuf($"content", "PersonMessage", "/path/to/person.desc", options).as("person")
)

Deserialization with FileDescriptorSet

import java.nio.file.{Files, Paths}

// Read the serialized FileDescriptorSet (produced by protoc --descriptor_set_out) as raw bytes
val descriptorSetBytes: Array[Byte] = Files.readAllBytes(Paths.get("/path/to/messages.desc"))

val decodedFromDescSet = binaryData.select(
  from_protobuf($"content", "PersonMessage", descriptorSetBytes).as("person")
)

Deserialization with Java Class

// Using shaded protobuf Java class
val decodedFromClass = binaryData.select(
  from_protobuf($"content", "com.example.protos.PersonMessage").as("person")
)

// With options
val decodedFromClassWithOptions = binaryData.select(
  from_protobuf(
    $"content", 
    "com.example.protos.PersonMessage",
    Map("mode" -> "PERMISSIVE").asJava
  ).as("person")
)

Handling Complex Nested Messages

// For messages with nested structures and Any fields
val complexOptions: JMap[String, String] = Map(
  "convert.any.fields.to.json" -> "true",
  "recursive.fields.max.depth" -> "3",
  "mode" -> "PERMISSIVE"
).asJava

val decodedComplex = binaryData.select(
  from_protobuf($"content", "ComplexMessage", "/path/to/complex.desc", complexOptions)
    .as("complex_data")
)

// Access nested fields
decodedComplex.select(
  $"complex_data.header.timestamp",
  $"complex_data.payload.data",
  $"complex_data.metadata"
).show()
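
With convert.any.fields.to.json enabled, google.protobuf.Any fields arrive as JSON strings and can be inspected with Spark's JSON functions. A sketch, assuming metadata is such a field and contains a source attribute (both names are illustrative):

import org.apache.spark.sql.functions.get_json_object

// Pull a single attribute out of the JSON rendering of the Any field
decodedComplex.select(
  get_json_object($"complex_data.metadata", "$.source").as("metadata_source")
).show()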

Type Mapping

The from_protobuf function automatically maps protobuf types to Spark SQL types (see the worked sketch after this list):

  • INT32/SINT32/SFIXED32 → IntegerType
  • INT64/SINT64/SFIXED64 → LongType
  • FLOAT → FloatType
  • DOUBLE → DoubleType
  • BOOL → BooleanType
  • STRING → StringType
  • BYTES → BinaryType
  • ENUM → StringType (or IntegerType with enums.as.ints option)
  • MESSAGE → StructType (with field mapping)
  • REPEATED → ArrayType
  • MAP → MapType
  • google.protobuf.Timestamp → TimestampType
  • google.protobuf.Duration → DayTimeIntervalType
  • google.protobuf.Any → StringType (with convert.any.fields.to.json option)
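
As a worked sketch of this mapping, consider a hypothetical Person message; the .proto definition and field names below are illustrative, not part of the library:

// Hypothetical message, compiled into /path/to/person.desc with
// protoc --descriptor_set_out:
//
//   message Person {
//     string name                          = 1;
//     int32  age                           = 2;
//     repeated string tags                 = 3;
//     google.protobuf.Timestamp created_at = 4;
//   }

val people = binaryData.select(
  from_protobuf($"content", "Person", "/path/to/person.desc").as("person")
)

people.printSchema()
// Roughly:
// root
//  |-- person: struct (nullable = true)
//  |    |-- name: string (nullable = true)          // STRING    -> StringType
//  |    |-- age: integer (nullable = true)          // INT32     -> IntegerType
//  |    |-- tags: array (nullable = true)           // REPEATED  -> ArrayType(StringType)
//  |    |-- created_at: timestamp (nullable = true) // Timestamp -> TimestampType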

Error Handling

The function supports different error handling modes:

  • FAILFAST (default): Fails immediately on any parsing error
  • PERMISSIVE: Continues processing, setting invalid records to null

val permissiveOptions = Map("mode" -> "PERMISSIVE").asJava

val decodedPermissive = binaryData.select(
  from_protobuf($"content", "MessageType", descriptorPath, permissiveOptions).as("data")
)
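
Because PERMISSIVE mode surfaces malformed records as nulls rather than failures, it can be useful to separate them out after decoding. A short follow-up along those lines:

// Records that failed to parse come back as null structs in PERMISSIVE mode
val malformedCount = decodedPermissive.filter($"data".isNull).count()
val validRecords = decodedPermissive.filter($"data".isNotNull)

println(s"Skipped $malformedCount malformed protobuf records")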