Functions for converting Spark SQL structured types to binary Protocol Buffers format, with configurable serialization options and support for multiple descriptor sources: descriptor files, binary FileDescriptorSets, and compiled Protobuf Java classes.
Convert Spark SQL data to binary protobuf using a descriptor file path on the filesystem.
/**
* Converts a column into binary protobuf format using a descriptor file.
* @param data the data column containing Spark SQL structured data
* @param messageName the protobuf MessageName to look for in the descriptor file
* @param descFilePath the protobuf descriptor file path (created with protoc --descriptor_set_out)
* @param options configuration options for serialization behavior
* @return Column with serialized binary protobuf data
*/
def to_protobuf(
    data: Column,
    messageName: String,
    descFilePath: String,
    options: java.util.Map[String, String]): Column
/**
* Converts a column into binary protobuf format using a descriptor file.
* @param data the data column containing Spark SQL structured data
* @param messageName the protobuf MessageName to look for in the descriptor file
* @param descFilePath the protobuf descriptor file path (created with protoc --descriptor_set_out)
* @return Column with serialized binary protobuf data
*/
def to_protobuf(
    data: Column,
    messageName: String,
    descFilePath: String): Column
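The options overload above takes the same descriptor file path plus a serialization options map. A minimal sketch of that variant, assuming the person_struct column and the /path/to/person.desc descriptor used in the examples further down:

import scala.jdk.CollectionConverters._

// Hypothetical options map; "emit.default.values" is the option used in the
// binary-descriptor example later in this section
val fileOptions = Map("emit.default.values" -> "true").asJava
val serializedWithFileOptions = structuredDF.select(
  to_protobuf(col("person_struct"), "PersonMessage", "/path/to/person.desc", fileOptions) as "protobuf_binary"
)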
Convert Spark SQL data to binary protobuf using a pre-loaded binary FileDescriptorSet.
/**
* Converts a column into binary protobuf format using a FileDescriptorSet.
* @param data the data column containing Spark SQL structured data
* @param messageName the protobuf MessageName to look for in the descriptor set
* @param binaryFileDescriptorSet serialized protobuf descriptor (FileDescriptorSet)
* @param options configuration options for serialization behavior
* @return Column with serialized binary protobuf data
*/
def to_protobuf(
    data: Column,
    messageName: String,
    binaryFileDescriptorSet: Array[Byte],
    options: java.util.Map[String, String]): Column
/**
* Converts a column into binary protobuf format using a FileDescriptorSet.
* @param data the data column containing Spark SQL structured data
* @param messageName the protobuf MessageName to look for in the descriptor set
* @param binaryFileDescriptorSet serialized protobuf descriptor (FileDescriptorSet)
* @return Column with serialized binary protobuf data
*/
def to_protobuf(
    data: Column,
    messageName: String,
    binaryFileDescriptorSet: Array[Byte]): Column
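For reference, the FileDescriptorSet these overloads expect is typically produced with protoc and can be loaded directly into a byte array; a minimal sketch (file names are illustrative):

// Generated beforehand with: protoc --descriptor_set_out=/path/to/person.desc person.proto
import java.nio.file.{Files, Paths}

val binaryFileDescriptorSet: Array[Byte] =
  Files.readAllBytes(Paths.get("/path/to/person.desc"))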
Convert Spark SQL data to binary protobuf using a Java class name (requires shaded protobuf classes).
/**
* Converts a column into binary protobuf format using a Protobuf Java class.
* The JAR containing the Java class should be shaded: com.google.protobuf.* must be
* relocated to org.sparkproject.spark_protobuf.protobuf.*.
* @param data the data column containing Spark SQL structured data
* @param messageClassName the fully qualified name of the Protobuf Java class
* @param options configuration options for serialization behavior
* @return Column with serialized binary protobuf data
*/
def to_protobuf(
    data: Column,
    messageClassName: String,
    options: java.util.Map[String, String]): Column
/**
* Converts a column into binary protobuf format using a Protobuf Java class.
* The JAR containing the Java class should be shaded: com.google.protobuf.* must be
* relocated to org.sparkproject.spark_protobuf.protobuf.*.
* @param data the data column containing Spark SQL structured data
* @param messageClassName the fully qualified name of the Protobuf Java class
* @return Column with serialized binary protobuf data
*/
def to_protobuf(
    data: Column,
    messageClassName: String): Column
import org.apache.spark.sql.protobuf.functions._
import org.apache.spark.sql.functions.{col, struct}
import spark.implicits._ // for toDF on local Seqs (spark is the active SparkSession)
// Create structured data
val peopleDF = Seq(
  ("Alice", 25, true),
  ("Bob", 30, false),
  ("Charlie", 35, true)
).toDF("name", "age", "is_active")

// Convert to struct column for protobuf serialization
val structuredDF = peopleDF.select(
  struct(col("name"), col("age"), col("is_active")) as "person_struct"
)

// Serialize to binary protobuf
val serializedDF = structuredDF.select(
  to_protobuf(col("person_struct"), "PersonMessage", "/path/to/person.desc") as "protobuf_binary"
)
serializedDF.show(false)
import scala.jdk.CollectionConverters._

val options = Map(
  "emit.default.values" -> "true",
  "enums.as.ints" -> "false"
).asJava
// Load the FileDescriptorSet produced by protoc --descriptor_set_out as raw bytes
val descriptorBytes: Array[Byte] =
  java.nio.file.Files.readAllBytes(java.nio.file.Paths.get("/path/to/person.desc"))

val serializedDF = structuredDF.select(
  to_protobuf(col("person_struct"), "PersonMessage", descriptorBytes, options) as "protobuf_binary"
)
// Requires shaded protobuf JAR on classpath
val serializedDF = structuredDF.select(
  to_protobuf(col("person_struct"), "com.example.protos.PersonMessage") as "protobuf_binary"
)
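The companion overload of the Java-class variant also accepts an options map, like the descriptor-based forms. A minimal sketch (class name as above, options map hypothetical):

import scala.jdk.CollectionConverters._

val classOptions = Map("emit.default.values" -> "true").asJava
val serializedWithClassOptions = structuredDF.select(
  to_protobuf(col("person_struct"), "com.example.protos.PersonMessage", classOptions) as "protobuf_binary"
)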
// Serialize Spark data to protobuf
val descriptorFile = "/path/to/person.desc"
val serializedDF = structuredDF.select(
  to_protobuf(col("person_struct"), "PersonMessage", descriptorFile) as "protobuf_binary"
)
// Deserialize back to Spark data
val roundTripDF = serializedDF.select(
  from_protobuf(col("protobuf_binary"), "PersonMessage", descriptorFile) as "person_data"
)
roundTripDF.show(false)
// Handle nested structures, arrays, and maps
val complexDF = Seq(
  ("Alice", Array("reading", "swimming"), Map("city" -> "NYC", "country" -> "USA")),
  ("Bob", Array("running", "cycling"), Map("city" -> "LA", "country" -> "USA"))
).toDF("name", "hobbies", "address")

val complexStructDF = complexDF.select(
  struct(col("name"), col("hobbies"), col("address")) as "complex_struct"
)

val serializedComplexDF = complexStructDF.select(
  to_protobuf(col("complex_struct"), "ComplexPersonMessage", descriptorFile) as "protobuf_binary"
)
("Alice", Some(25), Some(true)),
("Bob", None, Some(false)),
("Charlie", Some(35), None)
).toDF("name", "age", "is_active")
val structWithNulls = dataWithNulls.select(
  struct(col("name"), col("age"), col("is_active")) as "person_struct"
)

// Protobuf serialization handles nulls according to the proto field definitions
val serializedWithNulls = structWithNulls.select(
  to_protobuf(col("person_struct"), "PersonMessage", descriptorFile) as "protobuf_binary"
)
// to_protobuf validates that the Spark (Catalyst) schema matches the protobuf schema.
// Extra fields in the Catalyst schema will cause validation errors.
// Missing required protobuf fields will cause validation errors.
import org.apache.spark.sql.AnalysisException

try {
  val serializedDF = structuredDF.select(
    to_protobuf(col("person_struct"), "PersonMessage", descriptorFile) as "protobuf_binary"
  )
  serializedDF.collect() // Triggers validation
} catch {
  case e: AnalysisException =>
    println(s"Schema validation failed: ${e.getMessage}")
}