or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md, from-protobuf.md, index.md, schema-utilities.md, to-protobuf.md

docs/to-protobuf.md

# Catalyst to Protobuf Conversion

Functions for converting Spark SQL structured types to binary Protocol Buffer format with comprehensive serialization options and multiple descriptor source support.

## Capabilities

### to_protobuf with Descriptor File Path

Convert Spark SQL data to binary protobuf using a descriptor file path on the filesystem.

```scala { .api }
/**
 * Converts a column into binary of protobuf format using descriptor file.
 * @param data the data column containing Spark SQL structured data
 * @param messageName the protobuf MessageName to look for in descriptor file
 * @param descFilePath the protobuf descriptor file path (created with protoc --descriptor_set_out)
 * @param options configuration options for serialization behavior
 * @return Column with serialized binary protobuf data
 */
def to_protobuf(
  data: Column,
  messageName: String,
  descFilePath: String,
  options: java.util.Map[String, String]
): Column

/**
 * Converts a column into binary of protobuf format using descriptor file.
 * @param data the data column containing Spark SQL structured data
 * @param messageName the protobuf MessageName to look for in descriptor file
 * @param descFilePath the protobuf descriptor file path (created with protoc --descriptor_set_out)
 * @return Column with serialized binary protobuf data
 */
def to_protobuf(
  data: Column,
  messageName: String,
  descFilePath: String
): Column
```

### to_protobuf with Binary Descriptor Set

Convert Spark SQL data to binary protobuf using a pre-loaded binary FileDescriptorSet.

```scala { .api }
/**
 * Converts a column into binary of protobuf format using FileDescriptorSet.
 * @param data the data column containing Spark SQL structured data
 * @param messageName the protobuf MessageName to look for in the descriptor set
 * @param binaryFileDescriptorSet serialized protobuf descriptor (FileDescriptorSet)
 * @param options configuration options for serialization behavior
 * @return Column with serialized binary protobuf data
 */
def to_protobuf(
  data: Column,
  messageName: String,
  binaryFileDescriptorSet: Array[Byte],
  options: java.util.Map[String, String]
): Column

/**
 * Converts a column into binary of protobuf format using FileDescriptorSet.
 * @param data the data column containing Spark SQL structured data
 * @param messageName the protobuf MessageName to look for in the descriptor set
 * @param binaryFileDescriptorSet serialized protobuf descriptor (FileDescriptorSet)
 * @return Column with serialized binary protobuf data
 */
def to_protobuf(
  data: Column,
  messageName: String,
  binaryFileDescriptorSet: Array[Byte]
): Column
```

### to_protobuf with Java Class Name

Convert Spark SQL data to binary protobuf using a Java class name (requires shaded protobuf classes).

```scala { .api }
/**
 * Converts a column into binary of protobuf format using Java class.
 * The jar containing Java class should be shaded with com.google.protobuf.*
 * relocated to org.sparkproject.spark_protobuf.protobuf.*
 * @param data the data column containing Spark SQL structured data
 * @param messageClassName the full name for Protobuf Java class
 * @param options configuration options for serialization behavior
 * @return Column with serialized binary protobuf data
 */
def to_protobuf(
  data: Column,
  messageClassName: String,
  options: java.util.Map[String, String]
): Column

/**
 * Converts a column into binary of protobuf format using Java class.
 * The jar containing Java class should be shaded with com.google.protobuf.*
 * relocated to org.sparkproject.spark_protobuf.protobuf.*
 * @param data the data column containing Spark SQL structured data
 * @param messageClassName the full name for Protobuf Java class
 * @return Column with serialized binary protobuf data
 */
def to_protobuf(
  data: Column,
  messageClassName: String
): Column
```

## Usage Examples

### Basic Serialization

```scala
import org.apache.spark.sql.protobuf.functions._
import org.apache.spark.sql.functions.{col, struct}

// Create structured data
val peopleDF = Seq(
  ("Alice", 25, true),
  ("Bob", 30, false),
  ("Charlie", 35, true)
).toDF("name", "age", "is_active")

// Convert to struct column for protobuf serialization
val structuredDF = peopleDF.select(
  struct(col("name"), col("age"), col("is_active")) as "person_struct"
)

// Serialize to binary protobuf
val serializedDF = structuredDF.select(
  to_protobuf(col("person_struct"), "PersonMessage", "/path/to/person.desc") as "protobuf_binary"
)

serializedDF.show(false)
```

### Serialization with Options

```scala
import scala.jdk.CollectionConverters._

val options = Map(
  "emit.default.values" -> "true",
  "enums.as.ints" -> "false"
).asJava

val serializedDF = structuredDF.select(
  to_protobuf(col("person_struct"), "PersonMessage", descriptorBytes, options) as "protobuf_binary"
)
```

### Using Java Class Names

```scala
// Requires shaded protobuf JAR on classpath
val serializedDF = structuredDF.select(
  to_protobuf(col("person_struct"), "com.example.protos.PersonMessage") as "protobuf_binary"
)
```

### Round-trip Conversion

```scala
// Serialize Spark data to protobuf
val serializedDF = structuredDF.select(
  to_protobuf(col("person_struct"), "PersonMessage", descriptorFile) as "protobuf_binary"
)

// Deserialize back to Spark data
val roundTripDF = serializedDF.select(
  from_protobuf(col("protobuf_binary"), "PersonMessage", descriptorFile) as "person_data"
)

roundTripDF.show(false)
```

### Complex Data Types

```scala
// Handle nested structures, arrays, and maps
val complexDF = Seq(
  ("Alice", Array("reading", "swimming"), Map("city" -> "NYC", "country" -> "USA")),
  ("Bob", Array("running", "cycling"), Map("city" -> "LA", "country" -> "USA"))
).toDF("name", "hobbies", "address")

val complexStructDF = complexDF.select(
  struct(col("name"), col("hobbies"), col("address")) as "complex_struct"
)

val serializedComplexDF = complexStructDF.select(
  to_protobuf(col("complex_struct"), "ComplexPersonMessage", descriptorFile) as "protobuf_binary"
)
```

### Handling Null Values

```scala
val dataWithNulls = Seq(
  ("Alice", Some(25), Some(true)),
  ("Bob", None, Some(false)),
  ("Charlie", Some(35), None)
).toDF("name", "age", "is_active")

val structWithNulls = dataWithNulls.select(
  struct(col("name"), col("age"), col("is_active")) as "person_struct"
)

// Protobuf serialization handles nulls according to proto field definitions
val serializedWithNulls = structWithNulls.select(
  to_protobuf(col("person_struct"), "PersonMessage", descriptorFile) as "protobuf_binary"
)
```

### Schema Validation

```scala
// The to_protobuf function validates that Spark schema matches protobuf schema
// Extra fields in Catalyst schema will cause validation errors
// Missing required fields in protobuf schema will cause validation errors

try {
  val serializedDF = structuredDF.select(
    to_protobuf(col("person_struct"), "PersonMessage", descriptorFile) as "protobuf_binary"
  )
  serializedDF.collect() // Triggers validation
} catch {
  case e: AnalysisException =>
    println(s"Schema validation failed: ${e.getMessage}")
}
```