Describes: mavenpkg:maven/org.apache.spark/spark-protobuf_2.13@3.5.x

docs/
  configuration.md
  deserialization.md
  index.md
  schema-conversion.md
  serialization.md
tile.json

tessl/maven-org-apache-spark--spark-protobuf_2-13

tessl install tessl/maven-org-apache-spark--spark-protobuf_2-13@3.5.0

Apache Spark connector for the Protocol Buffers (protobuf) data format, providing SQL functions to convert between binary protobuf data and Catalyst data structures in distributed analytics pipelines

docs/serialization.md

Protobuf Serialization

Convert Spark SQL data structures to binary protobuf format for output, storage, and data interchange with external systems.

Capabilities

To Protobuf with Descriptor File

Convert Catalyst data structures to protobuf binary using a protobuf descriptor file.

/**
 * Converts column to binary protobuf format using descriptor file
 * @param data - The data column to convert
 * @param messageName - The protobuf message name to look for in descriptor file
 * @param descFilePath - Path to protobuf descriptor file (created with protoc --descriptor_set_out)
 * @return Column with binary protobuf data
 * @since 3.4.0
 */
@Experimental
def to_protobuf(data: Column, messageName: String, descFilePath: String): Column

/**
 * Converts column to binary protobuf format using descriptor file with options
 * @param data - The data column to convert
 * @param messageName - The protobuf message name to look for in descriptor file
 * @param descFilePath - Path to protobuf descriptor file
 * @param options - Configuration options for protobuf processing
 * @return Column with binary protobuf data
 * @since 3.4.0
 */
@Experimental
def to_protobuf(
  data: Column,
  messageName: String,
  descFilePath: String,
  options: java.util.Map[String, String]
): Column

To Protobuf with FileDescriptorSet

Convert Catalyst data structures to protobuf binary using a serialized FileDescriptorSet.

/**
 * Converts column to binary protobuf format using FileDescriptorSet
 * @param data - The data column to convert
 * @param messageName - The protobuf message name to look for in the descriptor set
 * @param binaryFileDescriptorSet - Serialized FileDescriptorSet (from protoc --descriptor_set_out)
 * @return Column with binary protobuf data
 * @since 3.5.0
 */
@Experimental
def to_protobuf(
  data: Column,
  messageName: String,
  binaryFileDescriptorSet: Array[Byte]
): Column

/**
 * Converts column to binary protobuf format using FileDescriptorSet with options
 * @param data - The data column to convert
 * @param messageName - The protobuf message name to look for in the descriptor set
 * @param binaryFileDescriptorSet - Serialized FileDescriptorSet
 * @param options - Configuration options for protobuf processing
 * @return Column with binary protobuf data
 * @since 3.5.0
 */
@Experimental
def to_protobuf(
  data: Column,
  messageName: String,
  binaryFileDescriptorSet: Array[Byte],
  options: java.util.Map[String, String]
): Column

To Protobuf with Java Class

Convert Catalyst data structures to protobuf binary using a compiled protobuf Java class. The jar containing the generated classes must be shaded; see the sketch after the signatures below.

/**
 * Converts column to binary protobuf format using Java class
 * @param data - The data column to convert
 * @param messageClassName - Full name of protobuf Java class (e.g., com.example.protos.ExampleEvent)
 * @return Column with binary protobuf data
 * @since 3.4.0
 * @note The jar with protobuf classes needs to be shaded
 */
@Experimental
def to_protobuf(data: Column, messageClassName: String): Column

/**
 * Converts column to binary protobuf format using Java class with options
 * @param data - The data column to convert
 * @param messageClassName - Full name of protobuf Java class
 * @param options - Configuration options for protobuf processing
 * @return Column with binary protobuf data
 * @since 3.4.0
 */
@Experimental
def to_protobuf(
  data: Column,
  messageClassName: String,
  options: java.util.Map[String, String]
): Column
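
A minimal sketch of the required shading with sbt-assembly, assuming the relocation described in Spark's documentation (com.google.protobuf.* moved to org.sparkproject.spark_protobuf.protobuf.*); plugin setup and the rest of build.sbt are omitted:

// build.sbt: relocate the protobuf runtime inside the application jar
// so the generated classes match what spark-protobuf expects
assembly / assemblyShadeRules := Seq(
  ShadeRule.rename("com.google.protobuf.**" -> "org.sparkproject.spark_protobuf.protobuf.@1").inAll
)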

Usage Examples

Basic Serialization with Descriptor File

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.protobuf.functions._
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("ProtobufSerialization").getOrCreate()
import spark.implicits._

// Create structured data
val peopleDF = Seq(
  ("Alice", 25, "Engineer"),
  ("Bob", 30, "Manager"),
  ("Charlie", 35, "Designer")
).toDF("name", "age", "role")

// Convert to protobuf binary
val protobufData = peopleDF.select(
  to_protobuf(
    struct($"name", $"age", $"role"),
    "PersonMessage",
    "/path/to/person.desc"
  ).as("protobuf_data")
)

// Write to file
protobufData.write.mode("overwrite").parquet("/path/to/output")

Serialization with Configuration Options

import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._

// Create complex nested data
val complexDF = spark.sql("""
  SELECT 
    struct(
      'user_123' as user_id,
      struct('John' as first_name, 'Doe' as last_name) as name,
      array(
        struct('email' as type, 'john@example.com' as value),
        struct('phone' as type, '+1234567890' as value)
      ) as contacts
    ) as user_data
""")

// Configure serialization options
val options: JMap[String, String] = Map(
  "mode" -> "PERMISSIVE"
).asJava

val serialized = complexDF.select(
  to_protobuf($"user_data", "UserMessage", "/path/to/user.desc", options).as("binary")
)

Serialization with FileDescriptorSet

import java.nio.file.{Files, Paths}

// Load the serialized FileDescriptorSet (generated with protoc --descriptor_set_out)
val descriptorSetBytes = Files.readAllBytes(Paths.get("/path/to/messages.desc"))

val serializedFromDescSet = peopleDF.select(
  to_protobuf(
    struct($"name", $"age", $"role"),
    "PersonMessage",
    descriptorSetBytes
  ).as("protobuf_binary")
)
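
The four-argument overload takes the same descriptor bytes plus an options map; a minimal sketch reusing the bytes loaded above:

val serializedWithOpts = peopleDF.select(
  to_protobuf(
    struct($"name", $"age", $"role"),
    "PersonMessage",
    descriptorSetBytes,
    Map("mode" -> "PERMISSIVE").asJava
  ).as("protobuf_binary")
)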

Serialization with Java Class

// Using shaded protobuf Java class
val serializedFromClass = peopleDF.select(
  to_protobuf(
    struct($"name", $"age", $"role"),
    "com.example.protos.PersonMessage"
  ).as("protobuf_binary")
)

// With options
val serializedWithOptions = peopleDF.select(
  to_protobuf(
    struct($"name", $"age", $"role"),
    "com.example.protos.PersonMessage",
    Map("mode" -> "FAILFAST").asJava
  ).as("protobuf_binary")
)

Round-trip Conversion Example

// Original structured data
val originalDF = Seq(
  ("Alice", 25, Array("reading", "hiking")),
  ("Bob", 30, Array("gaming", "cooking"))
).toDF("name", "age", "hobbies")

// Convert to protobuf
val protobufDF = originalDF.select(
  to_protobuf(
    struct($"name", $"age", $"hobbies"),
    "PersonWithHobbies",
    "/path/to/person.desc"
  ).as("protobuf_data")
)

// Convert back to structured data
val roundTripDF = protobufDF.select(
  from_protobuf($"protobuf_data", "PersonWithHobbies", "/path/to/person.desc").as("person")
)

// Access original fields
roundTripDF.select(
  $"person.name",
  $"person.age", 
  $"person.hobbies"
).show()
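
With the sample rows above, the final select should display output along these lines:

// +-----+---+------------------+
// | name|age|           hobbies|
// +-----+---+------------------+
// |Alice| 25| [reading, hiking]|
// |  Bob| 30| [gaming, cooking]|
// +-----+---+------------------+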

Stream Processing with Protobuf Output

// Process streaming data and output as protobuf
val streamingDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input-topic")
  .load()

// Parse JSON input and convert to protobuf
// (inputSchema is the StructType of the incoming JSON, defined elsewhere)
val processedStream = streamingDF
  .select(from_json($"value".cast("string"), inputSchema).as("data"))
  .select(
    to_protobuf($"data", "EventMessage", "/path/to/event.desc").as("value")
  )

// Write back to Kafka as protobuf; the Kafka sink reads the "value" column
processedStream.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/path/to/checkpoint")
  .start()
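
For local debugging it can help to inspect the stream before it reaches Kafka; a small sketch that prints the serialized payload size per record to the console:

// Independent query writing to the console sink
processedStream
  .select(length($"value").as("payload_bytes"))
  .writeStream
  .format("console")
  .start()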

Type Mapping

The to_protobuf function maps Spark SQL types to corresponding protobuf types:

  • IntegerType → INT32
  • LongType → INT64
  • FloatType → FLOAT
  • DoubleType → DOUBLE
  • BooleanType → BOOL
  • StringType → STRING
  • BinaryType → BYTES
  • StructType → MESSAGE
  • ArrayType → REPEATED field
  • MapType → MAP field
  • TimestampType → google.protobuf.Timestamp
  • DayTimeIntervalType → google.protobuf.Duration
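
As a minimal sketch of the timestamp mapping, a TimestampType column can be serialized into a message whose field is declared as google.protobuf.Timestamp (the EventWithTime message name, field names, and path below are illustrative):

// created_at (TimestampType) maps to a google.protobuf.Timestamp field
val events = Seq(
  ("evt-1", java.sql.Timestamp.valueOf("2024-01-01 00:00:00"))
).toDF("id", "created_at")

val eventBinary = events.select(
  to_protobuf(struct($"id", $"created_at"), "EventWithTime", "/path/to/event.desc")
    .as("protobuf_data")
)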

Schema Validation

The function validates that the input Catalyst schema matches the expected protobuf message schema:

import org.apache.spark.sql.AnalysisException

// Example of schema validation error handling
try {
  val result = dataDF.select(
    to_protobuf($"data", "MessageType", descriptorPath).as("binary")
  )
  result.collect()
} catch {
  case e: AnalysisException if e.getMessage.contains("Cannot find Catalyst type") =>
    println(s"Schema mismatch: ${e.getMessage}")
  case e: Exception =>
    println(s"Serialization error: ${e.getMessage}")
}
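
As a hypothetical illustration, a struct field with no counterpart in the target message should take the error path above (the nickname field is illustrative):

// "nickname" has no matching field in PersonMessage, so the query fails
val mismatched = peopleDF.select(
  to_protobuf(
    struct($"name", $"age", lit("n/a").as("nickname")),
    "PersonMessage",
    "/path/to/person.desc"
  ).as("binary")
)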

Performance Considerations

  • Descriptor Caching: Descriptors are cached per task for performance
  • Schema Inference: Schema conversion happens at query planning time
  • Lazy Evaluation: Serialization is performed only when actions are called
  • Memory Usage: Large descriptor sets are broadcast to avoid repeated transfers

Since serialization is lazy, a cached input can feed several to_protobuf projections without recomputing the source:

// Optimize for repeated serialization of the same data
val cachedDF = originalDF.cache()

val batch1 = cachedDF.select(
  to_protobuf(struct($"name", $"age", $"hobbies"), "Type1", descriptorPath)
)
val batch2 = cachedDF.select(
  to_protobuf(struct($"name", $"age", $"hobbies"), "Type2", descriptorPath)
)