# Configuration and Options

Comprehensive configuration system for controlling protobuf processing behavior, including parse modes, type conversions, schema handling, and field processing options.

## Capabilities

### ProtobufOptions Class

Central configuration class for customizing protobuf serialization and deserialization behavior.

```scala { .api }
/**
 * Options for the Protobuf reader and writer, stored in a case-insensitive manner.
 * @param parameters configuration parameters as a case-insensitive map
 * @param conf Hadoop configuration object
 */
class ProtobufOptions(
    parameters: CaseInsensitiveMap[String],
    conf: Configuration
) extends FileSourceOptions(parameters) with Logging

/**
 * Secondary constructor accepting a regular Map.
 * @param parameters configuration parameters
 * @param conf Hadoop configuration object
 */
def this(parameters: Map[String, String], conf: Configuration)

object ProtobufOptions {
  /**
   * Factory method creating a ProtobufOptions instance with a default Hadoop configuration.
   * @param parameters configuration parameters
   * @return ProtobufOptions instance
   */
  def apply(parameters: Map[String, String]): ProtobufOptions

  /** Configuration key for converting Any fields to JSON */
  val CONVERT_ANY_FIELDS_TO_JSON_CONFIG: String = "convert.any.fields.to.json"
}
```
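Options are usually passed as a plain string map, but the class can also be constructed directly through the companion `apply`. A minimal sketch using the key constant shown above rather than a raw string literal:

```scala
import org.apache.spark.sql.protobuf.utils.ProtobufOptions

// Prefer the published constant over a raw string literal for the key.
val options = ProtobufOptions(Map(
  ProtobufOptions.CONVERT_ANY_FIELDS_TO_JSON_CONFIG -> "true"
))

// Typed accessors reflect the parsed configuration.
println(options.convertAnyFieldsToJson)
```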
### Parse Mode Configuration

Controls how malformed protobuf data is handled during deserialization.

```scala { .api }
/** Parse mode for handling malformed data (PERMISSIVE or FAILFAST) */
val parseMode: ParseMode
```

**Configuration Key**: `"mode"`
**Valid Values**:
- `"PERMISSIVE"` - Malformed records become null values
- `"FAILFAST"` - Malformed records cause job failure
### Recursive Field Configuration

Controls handling of recursive protobuf message types to prevent infinite schema expansion.

```scala { .api }
/** Maximum recursion depth for recursive fields (-1 to 10) */
val recursiveFieldMaxDepth: Int
```

**Configuration Key**: `"recursive.fields.max.depth"`
**Valid Values**: `-1` (no recursion), `1-10` (recursion depth limit)
**Default**: `-1`
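To make the depth semantics concrete, here is a sketch with a hypothetical self-referencing `Person` message; the exact schema shape may vary by Spark version:

```scala
// Hypothetical recursive message:
//   message Person { string name = 1; Person friend = 2; }
//
// With a depth limit of 2, the self-referencing field is expanded once and
// then dropped, so the inferred Spark schema is roughly:
//   struct<name: string, friend: struct<name: string>>
val recursiveOpts = Map("recursive.fields.max.depth" -> "2")
```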
### Type Conversion Options

Controls how protobuf types are converted to Spark SQL types.

```scala { .api }
/** Whether to render enum fields as their integer values instead of strings */
val enumsAsInts: Boolean

/** Whether to upcast unsigned integers to larger types to prevent overflow */
val upcastUnsignedInts: Boolean

/** Whether to unwrap well-known primitive wrapper types */
val unwrapWellKnownTypes: Boolean
```

**Configuration Keys**:
- `"enums.as.ints"` - Default: `false`
- `"upcast.unsigned.ints"` - Default: `false`
- `"unwrap.primitive.wrapper.types"` - Default: `false`
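As a rough guide to the resulting Spark SQL types (a sketch; the exact mappings may vary by Spark version):

```scala
// enums.as.ints = true        -> enum fields become IntegerType instead of StringType
// upcast.unsigned.ints = true -> uint32 is read as LongType and uint64 as DecimalType(20, 0),
//                                so large unsigned values do not overflow signed types
// unwrap.primitive.wrapper.types = true
//                             -> well-known wrappers such as google.protobuf.Int32Value map to
//                                their primitive type instead of a single-field struct
val typeOpts = Map("upcast.unsigned.ints" -> "true")
```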
### Default Value Configuration

Controls emission of default values for zero-value fields in proto3.

```scala { .api }
/** Whether to render fields with zero values when deserializing */
val emitDefaultValues: Boolean
```

**Configuration Key**: `"emit.default.values"`
**Default**: `false`
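The effect is easiest to see on proto3 scalar fields, which do not distinguish "unset" from "zero". A sketch of the expected behavior:

```scala
// Given a proto3 field `int32 age = 1` that was never set:
//   emit.default.values = false (default) -> the column value is null
//   emit.default.values = true            -> the column value is 0
val defaultOpts = Map("emit.default.values" -> "true")
```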
### Any Field Configuration

Controls conversion of protobuf Any fields to JSON strings.

```scala { .api }
/** Whether to convert protobuf Any fields to JSON strings */
val convertAnyFieldsToJson: Boolean
```

**Configuration Key**: `"convert.any.fields.to.json"`
**Default**: `false`
### Empty Message Configuration

Controls handling of empty protobuf message types in Spark schemas.

```scala { .api }
/** Whether to retain empty proto message types by inserting dummy fields */
val retainEmptyMessage: Boolean
```

**Configuration Key**: `"retain.empty.message.types"`
**Default**: `false`
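Spark SQL does not allow empty struct types, so without this option an empty message type cannot be represented as-is in the schema. A sketch of the intended behavior (the placeholder field name is an internal detail):

```scala
// Given `message Empty {}` referenced from another message:
//   retain.empty.message.types = false (default) -> the empty struct is not retained
//   retain.empty.message.types = true            -> the field is kept as a struct containing
//                                                   a single dummy placeholder column
val emptyOpts = Map("retain.empty.message.types" -> "true")
```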
## Usage Examples

### Basic Configuration

```scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions.from_protobuf
import scala.jdk.CollectionConverters._

val options = Map(
  "mode" -> "PERMISSIVE",
  "emit.default.values" -> "true"
).asJava

val deserializedDF = binaryDF.select(
  from_protobuf(col("content"), "PersonMessage", descriptorFile, options) as "person_data"
)
```
### Recursive Field Handling

```scala
// Allow 2 levels of recursion for recursive message types
val recursiveOptions = Map(
  "recursive.fields.max.depth" -> "2"
).asJava

val deserializedDF = binaryDF.select(
  from_protobuf(col("content"), "RecursiveMessage", descriptorFile, recursiveOptions) as "data"
)
```
### Type Conversion Options

```scala
val typeOptions = Map(
  "enums.as.ints" -> "true",                  // Render enums as integers
  "upcast.unsigned.ints" -> "true",           // Prevent unsigned integer overflow
  "unwrap.primitive.wrapper.types" -> "true"  // Unwrap wrapper types
).asJava

val deserializedDF = binaryDF.select(
  from_protobuf(col("content"), "TypeRichMessage", descriptorFile, typeOptions) as "data"
)
```
### Any Field JSON Conversion

```scala
val anyOptions = Map(
  "convert.any.fields.to.json" -> "true"
).asJava

// Any fields will be converted to JSON strings instead of binary structs
val deserializedDF = binaryDF.select(
  from_protobuf(col("content"), "MessageWithAny", descriptorFile, anyOptions) as "data"
)
```
### Empty Message Handling

```scala
val emptyMessageOptions = Map(
  "retain.empty.message.types" -> "true"
).asJava

// Empty protobuf messages will have dummy fields inserted to retain structure
val deserializedDF = binaryDF.select(
  from_protobuf(col("content"), "EmptyMessage", descriptorFile, emptyMessageOptions) as "data"
)
```
### Comprehensive Configuration

```scala
val comprehensiveOptions = Map(
  "mode" -> "PERMISSIVE",
  "recursive.fields.max.depth" -> "3",
  "emit.default.values" -> "true",
  "enums.as.ints" -> "false",
  "upcast.unsigned.ints" -> "true",
  "unwrap.primitive.wrapper.types" -> "true",
  "convert.any.fields.to.json" -> "true",
  "retain.empty.message.types" -> "false"
).asJava

val fullyConfiguredDF = binaryDF.select(
  from_protobuf(col("content"), "ComplexMessage", descriptorFile, comprehensiveOptions) as "data"
)
```
### Error Handling Strategies

```scala
// Strategy 1: Permissive mode with null filtering
val permissiveOptions = Map("mode" -> "PERMISSIVE").asJava
val tolerantDF = binaryDF.select(
  from_protobuf(col("content"), "PersonMessage", descriptorFile, permissiveOptions) as "data"
).filter(col("data").isNotNull)

// Strategy 2: Fail-fast mode for strict validation
val strictOptions = Map("mode" -> "FAILFAST").asJava
val strictDF = binaryDF.select(
  from_protobuf(col("content"), "PersonMessage", descriptorFile, strictOptions) as "data"
)
```
### Using ProtobufOptions Programmatically

```scala
import org.apache.spark.sql.protobuf.utils.ProtobufOptions

val programmaticOptions = ProtobufOptions(Map(
  "mode" -> "PERMISSIVE",
  "emit.default.values" -> "true"
))

// Access individual configuration values
println(s"Parse mode: ${programmaticOptions.parseMode}")
println(s"Emit defaults: ${programmaticOptions.emitDefaultValues}")
println(s"Recursion depth: ${programmaticOptions.recursiveFieldMaxDepth}")
```
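Since the options apply to both the reader and the writer, the same options map can also be passed on the serialization path. A sketch assuming the `to_protobuf` overload that takes a descriptor file and an options map (which options take effect depends on the direction of conversion):

```scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions.to_protobuf
import scala.jdk.CollectionConverters._

val writeOptions = Map("emit.default.values" -> "true").asJava

// Serialize a struct column back to protobuf binary with the same options map.
val serializedDF = structDF.select(
  to_protobuf(col("person_data"), "PersonMessage", descriptorFile, writeOptions) as "content"
)
```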