Comprehensive configuration system for controlling protobuf processing behavior, including parse modes, type conversions, schema handling, and field processing options. ProtobufOptions is the central configuration class for customizing protobuf serialization and deserialization behavior.
/**
* Options for the Protobuf Reader and Writer, stored in a case-insensitive manner.
* @param parameters configuration parameters as case-insensitive map
* @param conf Hadoop configuration object
*/
class ProtobufOptions(
parameters: CaseInsensitiveMap[String],
conf: Configuration
) extends FileSourceOptions(parameters) with Logging
/**
* Secondary constructor accepting regular Map
* @param parameters configuration parameters
* @param conf Hadoop configuration object
*/
def this(parameters: Map[String, String], conf: Configuration)
/**
* Factory method for creating ProtobufOptions with default Hadoop configuration
* @param parameters configuration parameters
* @return ProtobufOptions instance
*/
object ProtobufOptions {
def apply(parameters: Map[String, String]): ProtobufOptions
/** Configuration key for converting Any fields to JSON */
val CONVERT_ANY_FIELDS_TO_JSON_CONFIG: String = "convert.any.fields.to.json"
}
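A minimal sketch of building the options programmatically with the exposed key constant rather than the raw string (assuming the class and its companion object are accessible from calling code, as in the examples later in this section):

import org.apache.spark.sql.protobuf.utils.ProtobufOptions

// Build options using the exposed constant instead of the literal key string.
val opts = ProtobufOptions(Map(
  ProtobufOptions.CONVERT_ANY_FIELDS_TO_JSON_CONFIG -> "true"
))
println(opts.convertAnyFieldsToJson) // true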
Controls how malformed protobuf data is handled during deserialization.
/** Parse mode for handling malformed data (PERMISSIVE or FAILFAST) */
val parseMode: ParseMode
Configuration Key: "mode"
Valid Values:
"PERMISSIVE" - Malformed records become null values
"FAILFAST" - Malformed records cause job failure
Controls handling of recursive protobuf message types to prevent infinite schema expansion.
/** Maximum recursion depth for recursive fields (-1 to 10) */
val recursiveFieldMaxDepth: Int
Configuration Key: "recursive.fields.max.depth"
Valid Values: -1 (no recursion), 1-10 (recursion depth limit)
Default: -1
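A sketch for inspecting how the limit shapes the resulting Spark schema, assuming a hypothetical self-referential message type "TreeNode" (binaryDF and descriptorFile are placeholders, as in the later examples):

import scala.jdk.CollectionConverters._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions.from_protobuf

val depthOpts = Map("recursive.fields.max.depth" -> "3").asJava
val limited = binaryDF.select(
  from_protobuf(col("content"), "TreeNode", descriptorFile, depthOpts) as "node"
)
// The nested self-referential field appears only up to the configured depth.
limited.printSchema()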
Controls how protobuf types are converted to Spark SQL types.
/** Whether to render enum fields as their integer values instead of strings */
val enumsAsInts: Boolean
/** Whether to upcast unsigned integers to larger types to prevent overflow */
val upcastUnsignedInts: Boolean
/** Whether to unwrap well-known primitive wrapper types */
val unwrapWellKnownTypes: Boolean
Configuration Keys:
"enums.as.ints" - Default: false
"upcast.unsigned.ints" - Default: false
"unwrap.primitive.wrapper.types" - Default: false
Controls emission of default values for zero-value fields in proto3.
/** Whether to render fields with zero values when deserializing */
val emitDefaultValues: Boolean
Configuration Key: "emit.default.values"
Default: false
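For a hypothetical proto3 message Metric { string name = 1; int64 count = 2; }, a serialized record whose count is 0 deserializes with count = null under the default setting; enabling the option renders the zero value instead. A sketch:

import scala.jdk.CollectionConverters._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions.from_protobuf

val emitOpts = Map("emit.default.values" -> "true").asJava
val metrics = binaryDF.select(
  from_protobuf(col("content"), "Metric", descriptorFile, emitOpts) as "metric"
)
// With the option enabled, zero-valued proto3 fields are rendered explicitly
// (e.g. metric.count = 0) rather than coming back as null.
metrics.select(col("metric.count")).show()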
Controls conversion of protobuf Any fields to JSON strings.
/** Whether to convert protobuf Any fields to JSON strings */
val convertAnyFieldsToJson: Boolean
Configuration Key: "convert.any.fields.to.json"
Default: false
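Without this option a google.protobuf.Any field is exposed as a struct carrying the type URL and the raw serialized bytes, which is hard to query; with it enabled the same field becomes a string column holding a JSON rendering of the unpacked message. A sketch with a hypothetical Any-typed field named "payload":

import scala.jdk.CollectionConverters._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions.from_protobuf

val anyOpts = Map("convert.any.fields.to.json" -> "true").asJava
val parsed = binaryDF.select(
  from_protobuf(col("content"), "MessageWithAny", descriptorFile, anyOpts) as "data"
)
// "payload" (hypothetical) is now a StringType column containing JSON and can be
// post-processed with Spark's JSON functions such as get_json_object or from_json.
parsed.select(col("data.payload")).show(truncate = false)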
Controls handling of empty protobuf message types in Spark schemas.
/** Whether to retain empty proto message types by inserting dummy fields */
val retainEmptyMessage: Boolean
Configuration Key: "retain.empty.message.types"
Default: false
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions.from_protobuf
import org.apache.spark.sql.protobuf.utils.ProtobufOptions
import scala.jdk.CollectionConverters._
val options = Map(
"mode" -> "PERMISSIVE",
"emit.default.values" -> "true"
).asJava
val deserializedDF = binaryDF.select(
from_protobuf(col("content"), "PersonMessage", descriptorFile, options) as "person_data"
)

// Allow 2 levels of recursion for recursive message types
val recursiveOptions = Map(
"recursive.fields.max.depth" -> "2"
).asJava
val deserializedDF = binaryDF.select(
from_protobuf(col("content"), "RecursiveMessage", descriptorFile, recursiveOptions) as "data"
)

val typeOptions = Map(
"enums.as.ints" -> "true", // Render enums as integers
"upcast.unsigned.ints" -> "true", // Prevent unsigned integer overflow
"unwrap.primitive.wrapper.types" -> "true" // Unwrap wrapper types
).asJava
val deserializedDF = binaryDF.select(
from_protobuf(col("content"), "TypeRichMessage", descriptorFile, typeOptions) as "data"
)

val anyOptions = Map(
"convert.any.fields.to.json" -> "true"
).asJava
// Any fields will be converted to JSON strings instead of binary structs
val deserializedDF = binaryDF.select(
from_protobuf(col("content"), "MessageWithAny", descriptorFile, anyOptions) as "data"
)

val emptyMessageOptions = Map(
"retain.empty.message.types" -> "true"
).asJava
// Empty protobuf messages will have dummy fields inserted to retain structure
val deserializedDF = binaryDF.select(
from_protobuf(col("content"), "EmptyMessage", descriptorFile, emptyMessageOptions) as "data"
)

val comprehensiveOptions = Map(
"mode" -> "PERMISSIVE",
"recursive.fields.max.depth" -> "3",
"emit.default.values" -> "true",
"enums.as.ints" -> "false",
"upcast.unsigned.ints" -> "true",
"unwrap.primitive.wrapper.types" -> "true",
"convert.any.fields.to.json" -> "true",
"retain.empty.message.types" -> "false"
).asJava
val fullyConfiguredDF = binaryDF.select(
from_protobuf(col("content"), "ComplexMessage", descriptorFile, comprehensiveOptions) as "data"
)

// Strategy 1: Permissive mode with null filtering
val permissiveOptions = Map("mode" -> "PERMISSIVE").asJava
val tolerantDF = binaryDF.select(
from_protobuf(col("content"), "PersonMessage", descriptorFile, permissiveOptions) as "data"
).filter(col("data").isNotNull)
// Strategy 2: Fail-fast mode for strict validation
val strictOptions = Map("mode" -> "FAILFAST").asJava
val strictDF = binaryDF.select(
from_protobuf(col("content"), "PersonMessage", descriptorFile, strictOptions) as "data"
)

import org.apache.spark.sql.protobuf.utils.ProtobufOptions
val programmaticOptions = ProtobufOptions(Map(
"mode" -> "PERMISSIVE",
"emit.default.values" -> "true"
))
// Access individual configuration values
println(s"Parse mode: ${programmaticOptions.parseMode}")
println(s"Emit defaults: ${programmaticOptions.emitDefaultValues}")
println(s"Recursion depth: ${programmaticOptions.recursiveFieldMaxDepth}")