Apache Spark connector for the Protocol Buffers data source, enabling protobuf serialization and deserialization in Spark SQL.
Configuration system for controlling protobuf processing behavior, including parse modes, type conversions, schema handling, and field processing options.
Central configuration class for customizing protobuf serialization and deserialization behavior.
/**
* Options for Protobuf Reader and Writer stored in a case-insensitive manner.
* @param parameters configuration parameters as case-insensitive map
* @param conf Hadoop configuration object
*/
class ProtobufOptions(
parameters: CaseInsensitiveMap[String],
conf: Configuration
) extends FileSourceOptions(parameters) with Logging
/**
* Secondary constructor accepting regular Map
* @param parameters configuration parameters
* @param conf Hadoop configuration object
*/
def this(parameters: Map[String, String], conf: Configuration)
/**
* Factory method for creating ProtobufOptions with default Hadoop configuration
* @param parameters configuration parameters
* @return ProtobufOptions instance
*/
object ProtobufOptions {
def apply(parameters: Map[String, String]): ProtobufOptions
/** Configuration key for converting Any fields to JSON */
val CONVERT_ANY_FIELDS_TO_JSON_CONFIG: String = "convert.any.fields.to.json"
}
Controls how malformed protobuf data is handled during deserialization.
/** Parse mode for handling malformed data (PERMISSIVE or FAILFAST) */
val parseMode: ParseMode
Configuration Key: "mode"
Valid Values:
"PERMISSIVE" - Malformed records become null values
"FAILFAST" - Malformed records cause job failure
Controls handling of recursive protobuf message types to prevent infinite schema expansion.
/** Maximum recursion depth for recursive fields (-1 to 10) */
val recursiveFieldMaxDepth: Int
Configuration Key: "recursive.fields.max.depth"
Valid Values: -1 (no recursion), 1-10 (recursion depth limit)
Default: -1
Controls how protobuf types are converted to Spark SQL types.
/** Whether to render enum fields as their integer values instead of strings */
val enumsAsInts: Boolean
/** Whether to upcast unsigned integers to larger types to prevent overflow */
val upcastUnsignedInts: Boolean
/** Whether to unwrap well-known primitive wrapper types */
val unwrapWellKnownTypes: Boolean
Configuration Keys:
"enums.as.ints" - Default: false
"upcast.unsigned.ints" - Default: false
"unwrap.primitive.wrapper.types" - Default: false
Controls emission of default values for zero-value fields in proto3.
/** Whether to render fields with zero values when deserializing */
val emitDefaultValues: Boolean
Configuration Key: "emit.default.values"
Default: false
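For a proto3 scalar without explicit presence, an unset field is indistinguishable on the wire from its zero value; this option decides whether the deserialized row shows null or the zero value. A hypothetical illustration for an unset int32 age field:

// With "emit.default.values" -> "false" (default): {"name": "alice", "age": null}
// With "emit.default.values" -> "true":            {"name": "alice", "age": 0}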
Controls conversion of protobuf Any fields to JSON strings.
/** Whether to convert protobuf Any fields to JSON strings */
val convertAnyFieldsToJson: Boolean
Configuration Key: "convert.any.fields.to.json"
Default: false
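Without this option, a google.protobuf.Any field surfaces as a struct of its raw parts; with it enabled, the field is rendered as a JSON string. A rough sketch of the difference (field name and values are illustrative):

// convert.any.fields.to.json = false (default):
//   any_field: struct<type_url: string, value: binary>
// convert.any.fields.to.json = true:
//   any_field: string, e.g. {"@type":"type.googleapis.com/com.example.Person","name":"alice"}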
Controls handling of empty protobuf message types in Spark schemas.
/** Whether to retain empty proto message types by inserting dummy fields */
val retainEmptyMessage: Boolean
Configuration Key: "retain.empty.message.types"
Default: false
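The usage examples below assume a SparkSession named spark, a DataFrame binaryDF with a binary content column, and a descriptorFile path pointing to a compiled descriptor set; all three names are placeholders for illustration. A descriptor set is typically produced ahead of time with protoc, roughly like this:

// Hypothetical setup assumed by the examples below
// protoc --include_imports --descriptor_set_out=person.desc person.proto
val descriptorFile = "/path/to/person.desc" // placeholder path
// Any DataFrame with a binary column works; the binaryFile source exposes one named "content"
val binaryDF = spark.read.format("binaryFile").load("/path/to/protobuf/files").select("content")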
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions.from_protobuf
import org.apache.spark.sql.protobuf.utils.ProtobufOptions
import scala.jdk.CollectionConverters._
val options = Map(
"mode" -> "PERMISSIVE",
"emit.default.values" -> "true"
).asJava
val deserializedDF = binaryDF.select(
from_protobuf(col("content"), "PersonMessage", descriptorFile, options) as "person_data"
)
// Allow 2 levels of recursion for recursive message types
val recursiveOptions = Map(
"recursive.fields.max.depth" -> "2"
).asJava
val deserializedDF = binaryDF.select(
from_protobuf(col("content"), "RecursiveMessage", descriptorFile, recursiveOptions) as "data"
)
val typeOptions = Map(
"enums.as.ints" -> "true", // Render enums as integers
"upcast.unsigned.ints" -> "true", // Prevent unsigned integer overflow
"unwrap.primitive.wrapper.types" -> "true" // Unwrap wrapper types
).asJava
val deserializedDF = binaryDF.select(
from_protobuf(col("content"), "TypeRichMessage", descriptorFile, typeOptions) as "data"
)
val anyOptions = Map(
"convert.any.fields.to.json" -> "true"
).asJava
// Any fields will be converted to JSON strings instead of binary structs
val deserializedDF = binaryDF.select(
from_protobuf(col("content"), "MessageWithAny", descriptorFile, anyOptions) as "data"
)
val emptyMessageOptions = Map(
"retain.empty.message.types" -> "true"
).asJava
// Empty protobuf messages will have dummy fields inserted to retain structure
val deserializedDF = binaryDF.select(
from_protobuf(col("content"), "EmptyMessage", descriptorFile, emptyMessageOptions) as "data"
)
val comprehensiveOptions = Map(
"mode" -> "PERMISSIVE",
"recursive.fields.max.depth" -> "3",
"emit.default.values" -> "true",
"enums.as.ints" -> "false",
"upcast.unsigned.ints" -> "true",
"unwrap.primitive.wrapper.types" -> "true",
"convert.any.fields.to.json" -> "true",
"retain.empty.message.types" -> "false"
).asJava
val fullyConfiguredDF = binaryDF.select(
from_protobuf(col("content"), "ComplexMessage", descriptorFile, comprehensiveOptions) as "data"
)
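To verify how the conversion options reshape the resulting Spark schema, inspect it with printSchema(). The nested field names below are purely illustrative; with "upcast.unsigned.ints" a uint32 field surfaces as a long, and with "unwrap.primitive.wrapper.types" a google.protobuf.Int64Value field surfaces as a plain long instead of a struct:

// Inspect the Spark SQL types produced by the options above
fullyConfiguredDF.printSchema()
// root
//  |-- data: struct (nullable = true)
//  |    |-- some_uint32_field: long (nullable = true)
//  |    |-- some_wrapped_field: long (nullable = true)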
// Strategy 1: Permissive mode with null filtering
val permissiveOptions = Map("mode" -> "PERMISSIVE").asJava
val tolerantDF = binaryDF.select(
from_protobuf(col("content"), "PersonMessage", descriptorFile, permissiveOptions) as "data"
).filter(col("data").isNotNull)
// Strategy 2: Fail-fast mode for strict validation
val strictOptions = Map("mode" -> "FAILFAST").asJava
val strictDF = binaryDF.select(
from_protobuf(col("content"), "PersonMessage", descriptorFile, strictOptions) as "data"
)
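Because Spark evaluates lazily, a FAILFAST parse error only surfaces once an action runs on the strict DataFrame. A minimal sketch of containing that failure on the driver (the wrapping approach is ordinary Scala, not part of the connector API):

import scala.util.{Failure, Success, Try}

// The malformed-record error is thrown when the action executes, not when from_protobuf is called
Try(strictDF.count()) match {
  case Success(n)   => println(s"Parsed $n records")
  case Failure(err) => println(s"Malformed protobuf encountered: ${err.getMessage}")
}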
import org.apache.spark.sql.protobuf.utils.ProtobufOptions
val programmaticOptions = ProtobufOptions(Map(
"mode" -> "PERMISSIVE",
"emit.default.values" -> "true"
))
// Access individual configuration values
println(s"Parse mode: ${programmaticOptions.parseMode}")
println(s"Emit defaults: ${programmaticOptions.emitDefaultValues}")
println(s"Recursion depth: ${programmaticOptions.recursiveFieldMaxDepth}")
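The write path mirrors this: to_protobuf serializes a struct column back into protobuf-encoded bytes using the same descriptor file. A minimal sketch, assuming personDF is any DataFrame whose person_data struct column matches the PersonMessage schema (for example, the output of the first from_protobuf example above):

import org.apache.spark.sql.protobuf.functions.to_protobuf

// Serialize the struct column back into protobuf binary
val reserializedDF = personDF.select(
to_protobuf(col("person_data"), "PersonMessage", descriptorFile) as "content"
)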
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-protobuf-2-13