# Apache Spark Protobuf Connector

Apache Spark Protobuf Connector enables seamless serialization and deserialization of Protocol Buffers data within Apache Spark SQL. It provides bidirectional conversion between protobuf binary data and Spark's Catalyst data structures through specialized SQL functions and comprehensive configuration options.

## Package Information

- **Package Name**: spark-protobuf_2.13
- **Package Type**: maven
- **Language**: Scala
- **Maven Coordinates**: `org.apache.spark:spark-protobuf_2.13:4.0.0`
- **Installation**: Add the dependency to your Spark application
## Core Imports

```scala
import org.apache.spark.sql.protobuf.functions._
```

For Java compatibility:

```java
import static org.apache.spark.sql.protobuf.functions.*;
```
## Basic Usage

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions._

// Path to a descriptor set file, e.g. generated with `protoc --descriptor_set_out`
val descriptorFile = "/path/to/descriptor.desc"

// Read binary protobuf data as a DataFrame
val df: DataFrame = spark.read.format("binaryFile").load("protobuf-data")

// Deserialize protobuf binary to structured data
val deserializedDF = df.select(
  from_protobuf(col("content"), "MessageName", descriptorFile) as "data"
)

// Serialize structured data back to protobuf binary
val serializedDF = deserializedDF.select(
  to_protobuf(col("data"), "MessageName", descriptorFile) as "protobuf_binary"
)
```
## Architecture

The Spark Protobuf Connector is built around several key components:

- **SQL Functions**: `from_protobuf` and `to_protobuf` functions for DataFrame operations
- **Expression Layer**: Internal Catalyst expressions for efficient protobuf processing
- **Schema Conversion**: Automatic mapping between protobuf schemas and Spark SQL schemas
- **Configuration System**: Comprehensive options for controlling serialization/deserialization behavior
- **Type Registry**: Support for dynamic message handling and `Any` field processing
## Capabilities

### Protobuf to Catalyst Conversion

Convert binary protobuf data to Spark SQL structured types using the `from_protobuf` functions, with support for various descriptor sources and configuration options.

```scala { .api }
def from_protobuf(
    data: Column,
    messageName: String,
    descFilePath: String,
    options: java.util.Map[String, String]
): Column

def from_protobuf(
    data: Column,
    messageName: String,
    binaryFileDescriptorSet: Array[Byte],
    options: java.util.Map[String, String]
): Column

def from_protobuf(
    data: Column,
    messageClassName: String,
    options: java.util.Map[String, String]
): Column
```

[Protobuf to Catalyst Conversion](./from-protobuf.md)
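A minimal sketch of the options-taking overload, assuming a DataFrame `df` with a binary `content` column; the descriptor path and message name are illustrative, and the `mode` option controls how malformed records are handled.

```scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions.from_protobuf

// Hypothetical descriptor set, e.g. generated with `protoc --descriptor_set_out`
val descPath = "/path/to/descriptor.desc"

// PERMISSIVE mode yields null for malformed records instead of failing the query
val options = new java.util.HashMap[String, String]()
options.put("mode", "PERMISSIVE")

val parsed = df.select(
  from_protobuf(col("content"), "MessageName", descPath, options) as "event"
)
```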
### Catalyst to Protobuf Conversion

Convert Spark SQL structured types to binary protobuf data using the `to_protobuf` functions, with comprehensive serialization options.

```scala { .api }
def to_protobuf(
    data: Column,
    messageName: String,
    descFilePath: String,
    options: java.util.Map[String, String]
): Column

def to_protobuf(
    data: Column,
    messageName: String,
    binaryFileDescriptorSet: Array[Byte],
    options: java.util.Map[String, String]
): Column

def to_protobuf(
    data: Column,
    messageClassName: String,
    options: java.util.Map[String, String]
): Column
```

[Catalyst to Protobuf Conversion](./to-protobuf.md)
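A hedged sketch of the compiled-class overload, which resolves the descriptor from a generated protobuf Java class on the classpath rather than a descriptor file; `com.example.protos.Event` and `parsedDF` are illustrative names, not part of the connector.

```scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions.to_protobuf

// The generated protobuf Java class must be on the executor classpath.
// "com.example.protos.Event" is a hypothetical message class name.
val binaryDF = parsedDF.select(
  to_protobuf(col("event"), "com.example.protos.Event",
    new java.util.HashMap[String, String]()) as "value"
)
```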
### Configuration and Options

Comprehensive configuration system for controlling protobuf processing behavior, including parse modes, type conversions, and schema handling.

```scala { .api }
class ProtobufOptions(
    parameters: Map[String, String],
    conf: Configuration
)

object ProtobufOptions {
  def apply(parameters: Map[String, String]): ProtobufOptions
  val CONVERT_ANY_FIELDS_TO_JSON_CONFIG: String
}
```

[Configuration and Options](./configuration.md)
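The options are passed to `from_protobuf`/`to_protobuf` as a Java map. The keys shown below follow the connector's documented option names; the values are examples only.

```scala
import scala.jdk.CollectionConverters._

// Example option map; values are illustrative
val protobufOptions = Map(
  "mode" -> "PERMISSIVE",                 // null out malformed records (default: FAILFAST)
  "recursive.fields.max.depth" -> "2",    // permit bounded recursion in message schemas
  "convert.any.fields.to.json" -> "true"  // decode google.protobuf.Any fields to JSON strings
).asJava
```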
### Schema Utilities

Utilities for protobuf descriptor management, schema conversion, and type registry operations.

```scala { .api }
object ProtobufUtils {
  def buildDescriptor(
      messageName: String,
      binaryFileDescriptorSet: Option[Array[Byte]]
  ): Descriptor

  def buildDescriptorFromJavaClass(protobufClassName: String): Descriptor

  def buildTypeRegistry(descriptorBytes: Array[Byte]): TypeRegistry
}

object SchemaConverters {
  def toSqlType(
      descriptor: Descriptor,
      protobufOptions: ProtobufOptions
  ): SchemaType
}
```

[Schema Utilities](./schema-utilities.md)
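These utilities live in Spark's internal `org.apache.spark.sql.protobuf.utils` package and may not be accessible from application code, which normally goes through `from_protobuf`/`to_protobuf`; still, a sketch of deriving a Catalyst schema from a descriptor set file (the path is hypothetical) looks like this:

```scala
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.protobuf.utils.{ProtobufOptions, ProtobufUtils, SchemaConverters}

// Hypothetical descriptor set file produced by `protoc --descriptor_set_out`
val descriptorBytes = Files.readAllBytes(Paths.get("/path/to/descriptor.desc"))

// Resolve the message descriptor, then map it to a Spark SQL type
val descriptor  = ProtobufUtils.buildDescriptor("MessageName", Some(descriptorBytes))
val schemaType  = SchemaConverters.toSqlType(descriptor, ProtobufOptions(Map.empty))
println(schemaType.dataType)  // the derived Catalyst DataType
```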
## Types

```scala { .api }
case class SchemaType(dataType: DataType, nullable: Boolean)

case class ProtoMatchedField(
    catalystField: StructField,
    catalystPosition: Int,
    fieldDescriptor: FieldDescriptor
)

class ProtoSchemaHelper(
    descriptor: Descriptor,
    catalystSchema: StructType,
    protoPath: Seq[String],
    catalystPath: Seq[String]
) {
  val matchedFields: Seq[ProtoMatchedField]
  def validateNoExtraCatalystFields(ignoreNullable: Boolean): Unit
  def validateNoExtraRequiredProtoFields(): Unit
  def getFieldByName(name: String): Option[FieldDescriptor]
}
```