# Catalyst to Protobuf Conversion

Functions for converting Spark SQL structured types to the binary Protocol Buffer format, with configurable serialization options and support for multiple descriptor sources (descriptor files, in-memory descriptor sets, and shaded Java classes).

## Capabilities

### to_protobuf with Descriptor File Path

Convert Spark SQL data to binary protobuf using a descriptor file path on the filesystem.

```scala { .api }
/**
 * Converts a column into binary of protobuf format using a descriptor file.
 * @param data the data column containing Spark SQL structured data
 * @param messageName the protobuf MessageName to look for in the descriptor file
 * @param descFilePath the protobuf descriptor file path (created with protoc --descriptor_set_out)
 * @param options configuration options for serialization behavior
 * @return Column with serialized binary protobuf data
 */
def to_protobuf(
    data: Column,
    messageName: String,
    descFilePath: String,
    options: java.util.Map[String, String]
): Column

/**
 * Converts a column into binary of protobuf format using a descriptor file.
 * @param data the data column containing Spark SQL structured data
 * @param messageName the protobuf MessageName to look for in the descriptor file
 * @param descFilePath the protobuf descriptor file path (created with protoc --descriptor_set_out)
 * @return Column with serialized binary protobuf data
 */
def to_protobuf(
    data: Column,
    messageName: String,
    descFilePath: String
): Column
```

### to_protobuf with Binary Descriptor Set

Convert Spark SQL data to binary protobuf using a pre-loaded binary FileDescriptorSet.

```scala { .api }
/**
 * Converts a column into binary of protobuf format using a FileDescriptorSet.
 * @param data the data column containing Spark SQL structured data
 * @param messageName the protobuf MessageName to look for in the descriptor set
 * @param binaryFileDescriptorSet serialized protobuf descriptor (FileDescriptorSet)
 * @param options configuration options for serialization behavior
 * @return Column with serialized binary protobuf data
 */
def to_protobuf(
    data: Column,
    messageName: String,
    binaryFileDescriptorSet: Array[Byte],
    options: java.util.Map[String, String]
): Column

/**
 * Converts a column into binary of protobuf format using a FileDescriptorSet.
 * @param data the data column containing Spark SQL structured data
 * @param messageName the protobuf MessageName to look for in the descriptor set
 * @param binaryFileDescriptorSet serialized protobuf descriptor (FileDescriptorSet)
 * @return Column with serialized binary protobuf data
 */
def to_protobuf(
    data: Column,
    messageName: String,
    binaryFileDescriptorSet: Array[Byte]
): Column
```
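
The `Array[Byte]` overloads expect the serialized `FileDescriptorSet` to already be in memory. As a minimal sketch of loading it with plain `java.nio` (a temp file with placeholder bytes stands in for a real descriptor set here, so the snippet is self-contained):

```scala
import java.nio.file.{Files, Path}

// Stand-in for a compiled descriptor set, which would normally be produced by:
//   protoc --descriptor_set_out=person.desc person.proto
val descPath: Path = Files.createTempFile("person", ".desc")
Files.write(descPath, Array[Byte](0x0a, 0x07))

// Read the whole descriptor set into memory, ready to pass as the
// binaryFileDescriptorSet argument of to_protobuf.
val descriptorBytes: Array[Byte] = Files.readAllBytes(descPath)
```

Loading the descriptor once on the driver this way avoids shipping a filesystem path that executors may not be able to resolve.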

### to_protobuf with Java Class Name

Convert Spark SQL data to binary protobuf using a Java class name (requires shaded protobuf classes).

```scala { .api }
/**
 * Converts a column into binary of protobuf format using a Protobuf Java class.
 * The jar containing the Java class should be shaded, with com.google.protobuf.*
 * relocated to org.sparkproject.spark_protobuf.protobuf.*
 * @param data the data column containing Spark SQL structured data
 * @param messageClassName the fully qualified name of the Protobuf Java class
 * @param options configuration options for serialization behavior
 * @return Column with serialized binary protobuf data
 */
def to_protobuf(
    data: Column,
    messageClassName: String,
    options: java.util.Map[String, String]
): Column

/**
 * Converts a column into binary of protobuf format using a Protobuf Java class.
 * The jar containing the Java class should be shaded, with com.google.protobuf.*
 * relocated to org.sparkproject.spark_protobuf.protobuf.*
 * @param data the data column containing Spark SQL structured data
 * @param messageClassName the fully qualified name of the Protobuf Java class
 * @return Column with serialized binary protobuf data
 */
def to_protobuf(
    data: Column,
    messageClassName: String
): Column
```

## Usage Examples

### Basic Serialization

```scala
import org.apache.spark.sql.protobuf.functions._
import org.apache.spark.sql.functions.{col, struct}

// Create structured data
val peopleDF = Seq(
  ("Alice", 25, true),
  ("Bob", 30, false),
  ("Charlie", 35, true)
).toDF("name", "age", "is_active")

// Convert to a struct column for protobuf serialization
val structuredDF = peopleDF.select(
  struct(col("name"), col("age"), col("is_active")) as "person_struct"
)

// Serialize to binary protobuf
val serializedDF = structuredDF.select(
  to_protobuf(col("person_struct"), "PersonMessage", "/path/to/person.desc") as "protobuf_binary"
)

serializedDF.show(false)
```
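
For orientation, a proto3 definition that would line up with the `name`/`age`/`is_active` struct above might look like this (an illustrative sketch, not a schema taken from the source):

```protobuf
// person.proto (hypothetical; field names must match the Catalyst struct fields)
syntax = "proto3";

message PersonMessage {
  string name = 1;
  int32 age = 2;
  bool is_active = 3;
}
```

Compiling it with `protoc --descriptor_set_out=person.desc person.proto` produces the descriptor file passed as `descFilePath` above.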

### Serialization with Options

```scala
import scala.jdk.CollectionConverters._

val options = Map(
  "emit.default.values" -> "true",
  "enums.as.ints" -> "false"
).asJava

val serializedDF = structuredDF.select(
  to_protobuf(col("person_struct"), "PersonMessage", descriptorBytes, options) as "protobuf_binary"
)
```

### Using Java Class Names

```scala
// Requires a shaded protobuf JAR on the classpath
val serializedDF = structuredDF.select(
  to_protobuf(col("person_struct"), "com.example.protos.PersonMessage") as "protobuf_binary"
)
```

### Round-trip Conversion

```scala
// Serialize Spark data to protobuf
val serializedDF = structuredDF.select(
  to_protobuf(col("person_struct"), "PersonMessage", descriptorFile) as "protobuf_binary"
)

// Deserialize back to Spark data
val roundTripDF = serializedDF.select(
  from_protobuf(col("protobuf_binary"), "PersonMessage", descriptorFile) as "person_data"
)

roundTripDF.show(false)
```

### Complex Data Types

```scala
// Handle nested structures, arrays, and maps
val complexDF = Seq(
  ("Alice", Array("reading", "swimming"), Map("city" -> "NYC", "country" -> "USA")),
  ("Bob", Array("running", "cycling"), Map("city" -> "LA", "country" -> "USA"))
).toDF("name", "hobbies", "address")

val complexStructDF = complexDF.select(
  struct(col("name"), col("hobbies"), col("address")) as "complex_struct"
)

val serializedComplexDF = complexStructDF.select(
  to_protobuf(col("complex_struct"), "ComplexPersonMessage", descriptorFile) as "protobuf_binary"
)
```

### Handling Null Values

```scala
val dataWithNulls = Seq(
  ("Alice", Some(25), Some(true)),
  ("Bob", None, Some(false)),
  ("Charlie", Some(35), None)
).toDF("name", "age", "is_active")

val structWithNulls = dataWithNulls.select(
  struct(col("name"), col("age"), col("is_active")) as "person_struct"
)

// Protobuf serialization handles nulls according to the proto field definitions
val serializedWithNulls = structWithNulls.select(
  to_protobuf(col("person_struct"), "PersonMessage", descriptorFile) as "protobuf_binary"
)
```

### Schema Validation

```scala
import org.apache.spark.sql.AnalysisException

// to_protobuf validates that the Catalyst schema matches the protobuf schema:
// extra fields in the Catalyst schema cause validation errors, as do required
// protobuf fields that are missing from the Catalyst schema.

try {
  val serializedDF = structuredDF.select(
    to_protobuf(col("person_struct"), "PersonMessage", descriptorFile) as "protobuf_binary"
  )
  serializedDF.collect() // Triggers validation
} catch {
  case e: AnalysisException =>
    println(s"Schema validation failed: ${e.getMessage}")
}
```