tessl install tessl/maven-org-apache-spark--spark-protobuf_2-13@3.5.0

Apache Spark connector for the Protocol Buffers (protobuf) data format, providing SQL functions to convert between binary protobuf data and Catalyst data structures for processing structured data in distributed big data analytics pipelines.
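A minimal end-to-end sketch of the serialization path, shown before the detailed reference: it assumes a hypothetical AppEvent message compiled into /path/to/events.desc with protoc, so substitute your own message name and descriptor file. The sections below cover each variant in detail.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.protobuf.functions._

val spark = SparkSession.builder().appName("ProtobufQuickStart").getOrCreate()
import spark.implicits._

// Hypothetical event data; "AppEvent" and the .desc path are placeholders
val events = Seq(("click", 1700000000L), ("view", 1700000005L)).toDF("action", "ts")
val asProtobuf = events.select(
  to_protobuf(struct($"action", $"ts"), "AppEvent", "/path/to/events.desc").as("value")
)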
Convert Spark SQL data structures to binary protobuf format for output, storage, and data interchange with external systems.
Convert Catalyst data structures to protobuf binary using a protobuf descriptor file.
/**
* Converts column to binary protobuf format using descriptor file
* @param data - The data column to convert
* @param messageName - The protobuf message name to look for in descriptor file
* @param descFilePath - Path to protobuf descriptor file (created with protoc --descriptor_set_out)
* @return Column with binary protobuf data
* @since 3.4.0
*/
@Experimental
def to_protobuf(data: Column, messageName: String, descFilePath: String): Column
/**
* Converts column to binary protobuf format using descriptor file with options
* @param data - The data column to convert
* @param messageName - The protobuf message name to look for in descriptor file
* @param descFilePath - Path to protobuf descriptor file
* @param options - Configuration options for protobuf processing
* @return Column with binary protobuf data
* @since 3.4.0
*/
@Experimental
def to_protobuf(
data: Column,
messageName: String,
descFilePath: String,
options: java.util.Map[String, String]
): Column

Convert Catalyst data structures to protobuf binary using a serialized FileDescriptorSet.
/**
* Converts column to binary protobuf format using FileDescriptorSet
* @param data - The data column to convert
* @param messageName - The protobuf message name to look for in the descriptor set
* @param binaryFileDescriptorSet - Serialized FileDescriptorSet (from protoc --descriptor_set_out)
* @return Column with binary protobuf data
* @since 3.5.0
*/
@Experimental
def to_protobuf(
data: Column,
messageName: String,
binaryFileDescriptorSet: Array[Byte]
): Column
/**
* Converts column to binary protobuf format using FileDescriptorSet with options
* @param data - The data column to convert
* @param messageName - The protobuf message name to look for in the descriptor set
* @param binaryFileDescriptorSet - Serialized FileDescriptorSet
* @param options - Configuration options for protobuf processing
* @return Column with binary protobuf data
* @since 3.5.0
*/
@Experimental
def to_protobuf(
data: Column,
messageName: String,
binaryFileDescriptorSet: Array[Byte],
options: java.util.Map[String, String]
): Column

Convert Catalyst data structures to protobuf binary using a compiled protobuf Java class. The jar containing the protobuf classes must be shaded appropriately (see the shading sketch below).
/**
* Converts column to binary protobuf format using Java class
* @param data - The data column to convert
* @param messageClassName - Full name of protobuf Java class (e.g., com.example.protos.ExampleEvent)
* @return Column with binary protobuf data
* @since 3.4.0
* @note The jar with protobuf classes needs to be shaded
*/
@Experimental
def to_protobuf(data: Column, messageClassName: String): Column
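What "shaded" means in practice: the generated protobuf classes and the protobuf-java runtime are relocated so they do not conflict with the copy that Spark itself ships. A minimal sketch for sbt-assembly is shown below; the relocation prefix follows the convention used in Spark's protobuf documentation, and the exact rule should be adapted to your build tool.

// build.sbt (sketch): relocate protobuf classes when assembling the application jar
assembly / assemblyShadeRules := Seq(
  ShadeRule
    .rename("com.google.protobuf.**" -> "org.sparkproject.spark_protobuf.protobuf.@1")
    .inAll
)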
/**
* Converts column to binary protobuf format using Java class with options
* @param data - The data column to convert
* @param messageClassName - Full name of protobuf Java class
* @param options - Configuration options for protobuf processing
* @return Column with binary protobuf data
* @since 3.4.0
*/
@Experimental
def to_protobuf(
data: Column,
messageClassName: String,
options: java.util.Map[String, String]
): Column

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.protobuf.functions._
import org.apache.spark.sql.functions._
val spark = SparkSession.builder().appName("ProtobufSerialization").getOrCreate()
import spark.implicits._
// Create structured data
val peopleDF = Seq(
("Alice", 25, "Engineer"),
("Bob", 30, "Manager"),
("Charlie", 35, "Designer")
).toDF("name", "age", "role")
// Convert to protobuf binary
val protobufData = peopleDF.select(
to_protobuf(
struct($"name", $"age", $"role"),
"PersonMessage",
"/path/to/person.desc"
).as("protobuf_data")
)
// Write to file
protobufData.write.mode("overwrite").parquet("/path/to/output")

import java.util.{Map => JMap}
import scala.collection.JavaConverters._
// Create complex nested data
val complexDF = spark.sql("""
SELECT
struct(
'user_123' as user_id,
struct('John' as first_name, 'Doe' as last_name) as name,
array(
struct('email' as type, 'john@example.com' as value),
struct('phone' as type, '+1234567890' as value)
) as contacts
) as user_data
""")
// Configure serialization options
val options: JMap[String, String] = Map(
"mode" -> "PERMISSIVE"
).asJava
val serialized = complexDF.select(
to_protobuf($"user_data", "UserMessage", "/path/to/user.desc", options).as("binary")
)

import java.nio.file.{Files, Paths}

// Load the descriptor set bytes (generated with protoc --descriptor_set_out)
val descriptorSetBytes = Files.readAllBytes(Paths.get("/path/to/messages.desc"))
val serializedFromDescSet = peopleDF.select(
to_protobuf(
struct($"name", $"age", $"role"),
"PersonMessage",
descriptorSetBytes
).as("protobuf_binary")
)

// Using a shaded protobuf Java class
val serializedFromClass = peopleDF.select(
to_protobuf(
struct($"name", $"age", $"role"),
"com.example.protos.PersonMessage"
).as("protobuf_binary")
)
// With options
val serializedWithOptions = peopleDF.select(
to_protobuf(
struct($"name", $"age", $"role"),
"com.example.protos.PersonMessage",
Map("mode" -> "FAILFAST").asJava
).as("protobuf_binary")
)

// Original structured data
val originalDF = Seq(
("Alice", 25, Array("reading", "hiking")),
("Bob", 30, Array("gaming", "cooking"))
).toDF("name", "age", "hobbies")
// Convert to protobuf
val protobufDF = originalDF.select(
to_protobuf(
struct($"name", $"age", $"hobbies"),
"PersonWithHobbies",
"/path/to/person.desc"
).as("protobuf_data")
)
// Convert back to structured data
val roundTripDF = protobufDF.select(
from_protobuf($"protobuf_data", "PersonWithHobbies", "/path/to/person.desc").as("person")
)
// Access original fields
roundTripDF.select(
$"person.name",
$"person.age",
$"person.hobbies"
).show()

// Process streaming data and output as protobuf
val streamingDF = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "input-topic")
.load()
// Schema of the incoming JSON events (example fields; adjust to your topic)
import org.apache.spark.sql.types._
val inputSchema = new StructType()
  .add("event_id", StringType)
  .add("payload", StringType)

// Parse JSON input and convert to protobuf; the Kafka sink expects a column named "value"
val processedStream = streamingDF
  .select(from_json($"value".cast("string"), inputSchema).as("data"))
  .select(
    to_protobuf($"data", "EventMessage", "/path/to/event.desc").as("value")
  )

// Write back to Kafka as protobuf
processedStream.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/path/to/checkpoints")
  .start()

The to_protobuf function maps Spark SQL types to corresponding protobuf types.
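As an illustration (not an exhaustive reference), the struct below uses common Spark SQL types, with comments indicating the protobuf field types they are generally expected to correspond to. The TypedRecord message and descriptor path are hypothetical.

// Hypothetical message "TypedRecord"; comments show the expected protobuf counterparts
val typedDF = Seq(
  ("id-1", 42, 42L, 3.14, true, Array("a", "b"))
).toDF("id", "count", "total", "score", "active", "tags")

val typedBinary = typedDF.select(
  to_protobuf(
    struct(
      $"id",     // StringType  -> string
      $"count",  // IntegerType -> int32
      $"total",  // LongType    -> int64
      $"score",  // DoubleType  -> double
      $"active", // BooleanType -> bool
      $"tags"    // ArrayType   -> repeated string
    ),
    "TypedRecord",
    "/path/to/typed_record.desc"
  ).as("binary")
)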
The function validates that the input Catalyst schema matches the expected protobuf message schema:
import org.apache.spark.sql.AnalysisException

// Example of schema validation error handling
try {
val result = dataDF.select(
to_protobuf($"data", "MessageType", descriptorPath).as("binary")
)
result.collect()
} catch {
case e: AnalysisException if e.getMessage.contains("Cannot find Catalyst type") =>
println(s"Schema mismatch: ${e.getMessage}")
case e: Exception =>
println(s"Serialization error: ${e.getMessage}")
}

// Optimize for repeated serialization of the same cached input
val cachedDF = originalDF.cache()
val batch1 = cachedDF.select(to_protobuf(struct($"name", $"age", $"hobbies"), "Type1", descriptorPath))
val batch2 = cachedDF.select(to_protobuf(struct($"name", $"age", $"hobbies"), "Type2", descriptorPath))
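When many batches are serialized, it can also help to read the descriptor set once and pass the bytes directly instead of resolving a file path on every call. A sketch under the same assumptions (cached DataFrame, hypothetical descriptor path):

// Read the descriptor set a single time and reuse the bytes
val sharedDescriptorBytes =
  java.nio.file.Files.readAllBytes(java.nio.file.Paths.get("/path/to/messages.desc"))

val batch3 = cachedDF.select(
  to_protobuf(struct($"name", $"age", $"hobbies"), "Type1", sharedDescriptorBytes)
)

// Release the cached input once the batches are written
cachedDF.unpersist()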