Apache Spark connector for Protocol Buffers data source enabling seamless protobuf serialization and deserialization in Spark SQL.
Functions for converting Spark SQL structured types to binary Protocol Buffer format with comprehensive serialization options and multiple descriptor source support.
Convert Spark SQL data to binary protobuf using a descriptor file path on the filesystem.
/**
 * Converts a column into binary of protobuf format using a descriptor file.
 *
 * @param data the data column containing Spark SQL structured data (typically a struct column)
 * @param messageName the protobuf MessageName to look for in the descriptor file
 * @param descFilePath the protobuf descriptor file path (created with `protoc --descriptor_set_out`)
 * @param options configuration options for serialization behavior
 *                (e.g. "emit.default.values", "enums.as.ints")
 * @return Column with serialized binary protobuf data
 */
def to_protobuf(
data: Column,
messageName: String,
descFilePath: String,
options: java.util.Map[String, String]
): Column
/**
* Converts a column into binary of protobuf format using descriptor file.
* @param data the data column containing Spark SQL structured data
* @param messageName the protobuf MessageName to look for in descriptor file
* @param descFilePath the protobuf descriptor file path (created with protoc --descriptor_set_out)
* @return Column with serialized binary protobuf data
*/
def to_protobuf(
data: Column,
messageName: String,
descFilePath: String
): Column

Convert Spark SQL data to binary protobuf using a pre-loaded binary FileDescriptorSet.
/**
 * Converts a column into binary of protobuf format using a FileDescriptorSet.
 *
 * @param data the data column containing Spark SQL structured data (typically a struct column)
 * @param messageName the protobuf MessageName to look for in the descriptor set
 * @param binaryFileDescriptorSet serialized protobuf descriptor (the bytes of a
 *                                `FileDescriptorSet`, as produced by `protoc --descriptor_set_out`)
 * @param options configuration options for serialization behavior
 *                (e.g. "emit.default.values", "enums.as.ints")
 * @return Column with serialized binary protobuf data
 */
def to_protobuf(
data: Column,
messageName: String,
binaryFileDescriptorSet: Array[Byte],
options: java.util.Map[String, String]
): Column
/**
* Converts a column into binary of protobuf format using FileDescriptorSet.
* @param data the data column containing Spark SQL structured data
* @param messageName the protobuf MessageName to look for in the descriptor set
* @param binaryFileDescriptorSet serialized protobuf descriptor (FileDescriptorSet)
* @return Column with serialized binary protobuf data
*/
def to_protobuf(
data: Column,
messageName: String,
binaryFileDescriptorSet: Array[Byte]
): Column

Convert Spark SQL data to binary protobuf using a Java class name (requires shaded protobuf classes).
/**
 * Converts a column into binary of protobuf format using a compiled Protobuf Java class.
 *
 * The jar containing the Java class should be shaded, with `com.google.protobuf.*`
 * relocated to `org.sparkproject.spark_protobuf.protobuf.*`.
 *
 * @param data the data column containing Spark SQL structured data (typically a struct column)
 * @param messageClassName the fully-qualified name of the Protobuf Java class
 * @param options configuration options for serialization behavior
 *                (e.g. "emit.default.values", "enums.as.ints")
 * @return Column with serialized binary protobuf data
 */
def to_protobuf(
data: Column,
messageClassName: String,
options: java.util.Map[String, String]
): Column
/**
* Converts a column into binary of protobuf format using Java class.
* The jar containing Java class should be shaded with com.google.protobuf.*
* relocated to org.sparkproject.spark_protobuf.protobuf.*
* @param data the data column containing Spark SQL structured data
* @param messageClassName the full name for Protobuf Java class
* @return Column with serialized binary protobuf data
*/
def to_protobuf(
data: Column,
messageClassName: String
): Column

import org.apache.spark.sql.protobuf.functions._
import org.apache.spark.sql.functions.{col, struct}
// Create structured data
// NOTE(review): .toDF requires spark.implicits._ in scope — confirm the session import in the full example
val peopleDF = Seq(
("Alice", 25, true),
("Bob", 30, false),
("Charlie", 35, true)
).toDF("name", "age", "is_active")
// Convert to struct column for protobuf serialization
// to_protobuf expects a single struct column whose fields match the protobuf message schema
val structuredDF = peopleDF.select(
struct(col("name"), col("age"), col("is_active")) as "person_struct"
)
// Serialize to binary protobuf
// The descriptor file at /path/to/person.desc must define a message named "PersonMessage"
val serializedDF = structuredDF.select(
to_protobuf(col("person_struct"), "PersonMessage", "/path/to/person.desc") as "protobuf_binary"
)
serializedDF.show(false)

import scala.jdk.CollectionConverters._
val options = Map(
"emit.default.values" -> "true",
"enums.as.ints" -> "false"
).asJava
val serializedDF = structuredDF.select(
to_protobuf(col("person_struct"), "PersonMessage", descriptorBytes, options) as "protobuf_binary"
)

// Requires shaded protobuf JAR on classpath
val serializedDF = structuredDF.select(
to_protobuf(col("person_struct"), "com.example.protos.PersonMessage") as "protobuf_binary"
)

// Serialize Spark data to protobuf
// NOTE(review): descriptorFile is assumed to be a String path to a .desc descriptor file — defined earlier in the doc
val serializedDF = structuredDF.select(
to_protobuf(col("person_struct"), "PersonMessage", descriptorFile) as "protobuf_binary"
)
// Deserialize back to Spark data
// from_protobuf restores a struct column from the binary protobuf column (round trip)
val roundTripDF = serializedDF.select(
from_protobuf(col("protobuf_binary"), "PersonMessage", descriptorFile) as "person_data"
)
roundTripDF.show(false)

// Handle nested structures, arrays, and maps
val complexDF = Seq(
("Alice", Array("reading", "swimming"), Map("city" -> "NYC", "country" -> "USA")),
("Bob", Array("running", "cycling"), Map("city" -> "LA", "country" -> "USA"))
).toDF("name", "hobbies", "address")
val complexStructDF = complexDF.select(
struct(col("name"), col("hobbies"), col("address")) as "complex_struct"
)
val serializedComplexDF = complexStructDF.select(
to_protobuf(col("complex_struct"), "ComplexPersonMessage", descriptorFile) as "protobuf_binary"
)

val dataWithNulls = Seq(
("Alice", Some(25), Some(true)),
("Bob", None, Some(false)),
("Charlie", Some(35), None)
).toDF("name", "age", "is_active")
val structWithNulls = dataWithNulls.select(
struct(col("name"), col("age"), col("is_active")) as "person_struct"
)
// Protobuf serialization handles nulls according to proto field definitions
val serializedWithNulls = structWithNulls.select(
to_protobuf(col("person_struct"), "PersonMessage", descriptorFile) as "protobuf_binary"
)

// The to_protobuf function validates that Spark schema matches protobuf schema
// Extra fields in Catalyst schema will cause validation errors
// Missing required fields in protobuf schema will cause validation errors
try {
val serializedDF = structuredDF.select(
to_protobuf(col("person_struct"), "PersonMessage", descriptorFile) as "protobuf_binary"
)
serializedDF.collect() // Triggers validation
} catch {
case e: AnalysisException =>
println(s"Schema validation failed: ${e.getMessage}")
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-protobuf-2-13