tessl install tessl/maven-org-apache-spark--spark-protobuf_2-13@3.5.0

Apache Spark connector for Protocol Buffer (protobuf) data format support, providing SQL functions to convert between binary protobuf data and Catalyst data structures for processing structured data in distributed big data analytics pipelines.
Comprehensive configuration system for controlling protobuf processing behavior, error handling, and advanced features.
Main configuration class providing case-insensitive parameter handling and Hadoop configuration integration.
/**
* Options for Protobuf Reader and Writer stored in case insensitive manner
* @param parameters - Case insensitive map of configuration parameters
* @param conf - Hadoop configuration for file system access
*/
class ProtobufOptions(
parameters: CaseInsensitiveMap[String],
conf: Configuration
) extends FileSourceOptions(parameters) with Logging {
/** Parse mode for error handling (FailFastMode by default) */
val parseMode: ParseMode
/** Maximum depth for recursive fields (default: -1, max: 10) */
val recursiveFieldMaxDepth: Int
/** Enable converting Protobuf Any fields to JSON (default: false) */
val convertAnyFieldsToJson: Boolean
/** Render fields with zero values when deserializing (default: false) */
val emitDefaultValues: Boolean
/** Render enum fields as integer values (default: false) */
val enumsAsInts: Boolean
}

Factory methods for creating ProtobufOptions instances.
/**
* Factory object for creating ProtobufOptions instances
*/
object ProtobufOptions {
/**
* Create ProtobufOptions from parameter map using active Spark session
* @param parameters - Configuration parameters as key-value pairs
* @return ProtobufOptions instance with Hadoop configuration
*/
def apply(parameters: Map[String, String]): ProtobufOptions
/** Configuration key for Any field JSON conversion */
val CONVERT_ANY_FIELDS_TO_JSON_CONFIG: String = "convert.any.fields.to.json"
}

Control error handling behavior during protobuf processing.
// Available parse modes
sealed trait ParseMode
case object FailFastMode extends ParseMode // Fail immediately on errors (default)
case object PermissiveMode extends ParseMode // Continue processing, set invalid records to null

Usage:
import java.util.{Map => JMap}
import scala.collection.JavaConverters._
val options: JMap[String, String] = Map(
"mode" -> "FAILFAST" // or "PERMISSIVE"
).asJava
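A minimal sketch of how the mode setting plays out at read time, reusing the options map above. It assumes binaryData is a DataFrame with a binary data column, descriptorPath points at a compiled descriptor file, and spark.implicits._ is in scope (all placeholders, as in the other examples):

import org.apache.spark.sql.protobuf.functions._

val parsed = binaryData.select(
  from_protobuf($"data", "MessageType", descriptorPath, options).as("parsed")
)
// FAILFAST: the first malformed record fails the query.
// PERMISSIVE: malformed records come back as null structs, so they can be counted or filtered.
val malformedCount = parsed.filter($"parsed".isNull).count()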
Controls handling of recursive message fields to prevent infinite schema expansion.

val recursiveOptions: JMap[String, String] = Map(
"recursive.fields.max.depth" -> "3" // Allow 3 levels of recursion
).asJava

Values:
-1 (default): Recursive fields not permitted
0: Drop recursive fields
1-10: Allow recursion up to the specified depth
>10: Not allowed, to avoid large schemas

Example with a recursive Person message:
// message Person { string name = 1; Person friend = 2; }
// Depth 0: struct<name: string>
// Depth 1: struct<name: string, friend: struct<name: string>>
// Depth 2: struct<name: string, friend: struct<name: string, friend: struct<name: string>>>
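A sketch applying the limit when reading, assuming the descriptor contains the recursive Person message above and reusing the binaryData and descriptorPath placeholders:

import org.apache.spark.sql.protobuf.functions._

val people = binaryData.select(
  from_protobuf($"data", "Person", descriptorPath, recursiveOptions).as("person")
)
// The nested `friend` field is expanded only up to the configured depth; anything deeper is truncated.
people.printSchema()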
Enable converting Protobuf google.protobuf.Any fields to JSON strings.

val anyFieldOptions: JMap[String, String] = Map(
"convert.any.fields.to.json" -> "true"
).asJava

Behavior:
false (default): Any fields as STRUCT<type_url: STRING, value: BINARY>
true: Any fields as JSON strings with type information

Requirements:
All message types that can appear inside Any fields must be available in the descriptor passed to from_protobuf so they can be resolved at runtime.
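A sketch of the effect on the resulting column, assuming a hypothetical EventWrapper message with a google.protobuf.Any field named payload (message and field names are placeholders):

import org.apache.spark.sql.protobuf.functions._

// Hypothetical message: EventWrapper { string id = 1; google.protobuf.Any payload = 2; }
val events = binaryData.select(
  from_protobuf($"data", "EventWrapper", descriptorPath, anyFieldOptions).as("event")
)
// With convert.any.fields.to.json = true, event.payload is a STRING column holding JSON;
// with the default (false), it is STRUCT<type_url: STRING, value: BINARY>.
events.select($"event.payload").printSchema()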
Control whether to render type-specific zero values for empty protobuf fields.
val defaultValueOptions: JMap[String, String] = Map(
"emit.default.values" -> "true"
).asJava

Behavior:
false (default): Empty fields become null
true: Empty fields get type-specific default values (empty string, 0, false, etc.)

Example:
// Proto3 message: Person { string name = 1; int64 age = 2; optional string middle_name = 3; optional int64 salary = 4; }
// Serialized as: Person(age=0, middle_name="")
// With emit.default.values = false:
// {"name": null, "age": null, "middle_name": "", "salary": null}
// With emit.default.values = true:
// {"name": "", "age": 0, "middle_name": "", "salary": null}Control whether enum fields are rendered as strings or integers.
Control whether enum fields are rendered as strings or integers.

val enumOptions: JMap[String, String] = Map(
"enums.as.ints" -> "true"
).asJava

Behavior:
false (default): Enums as string names ("ACTIVE")
true: Enums as integer values (1)

Schema Impact:
With enums.as.ints = true, enum fields appear as integer columns in the resulting Spark schema instead of string columns.
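A sketch of the schema effect, assuming a hypothetical Task message whose status field is a Protobuf enum (names are placeholders):

import org.apache.spark.sql.protobuf.functions._

// Hypothetical message: Task { string name = 1; Status status = 2; } where Status is an enum.
val tasks = binaryData.select(
  from_protobuf($"data", "Task", descriptorPath, enumOptions).as("task")
)
// enums.as.ints = false -> task.status is a string column (e.g. "ACTIVE")
// enums.as.ints = true  -> task.status is an integer column (e.g. 1)
tasks.select($"task.status").printSchema()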
import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._
val basicOptions: java.util.Map[String, String] = Map(
"mode" -> "PERMISSIVE"
).asJava
val df = binaryData.select(
from_protobuf($"data", "MessageType", descriptorPath, basicOptions).as("parsed")
)

val advancedOptions: java.util.Map[String, String] = Map(
"mode" -> "PERMISSIVE",
"recursive.fields.max.depth" -> "3",
"convert.any.fields.to.json" -> "true",
"emit.default.values" -> "true",
"enums.as.ints" -> "false"
).asJava
val processedDF = inputDF.select(
from_protobuf($"protobuf_data", "ComplexMessage", descriptorBytes, advancedOptions)
.as("complex_message")
)

// For messages with deep nesting
val deepNestingOptions = Map(
"recursive.fields.max.depth" -> "5",
"mode" -> "FAILFAST"
).asJava
// For messages with Any fields
val anyFieldOptions = Map(
"convert.any.fields.to.json" -> "true",
"mode" -> "PERMISSIVE" // Continue processing if Any field conversion fails
).asJava
// For legacy proto2 messages
val proto2Options = Map(
"emit.default.values" -> "false", // Preserve original proto2 behavior
"enums.as.ints" -> "false"
).asJava

import org.apache.spark.sql.protobuf.utils.{ProtobufOptions, SchemaConverters}
// Create options instance for schema conversion
val schemaOptions = ProtobufOptions(Map(
"recursive.fields.max.depth" -> "2",
"enums.as.ints" -> "true"
))
// Use with schema conversion
val schema = SchemaConverters.toSqlType(descriptor, schemaOptions)

def validateOptions(options: Map[String, String]): Map[String, String] = {
val validatedOptions = scala.collection.mutable.Map(options.toSeq: _*)
// Validate recursive depth
options.get("recursive.fields.max.depth").foreach { depth =>
val depthInt = depth.toInt
if (depthInt < -1 || depthInt > 10) {
throw new IllegalArgumentException(
s"recursive.fields.max.depth must be -1 to 10, got: $depthInt"
)
}
}
// Validate parse mode
options.get("mode").foreach { mode =>
if (!Set("FAILFAST", "PERMISSIVE").contains(mode.toUpperCase)) {
throw new IllegalArgumentException(
s"mode must be FAILFAST or PERMISSIVE, got: $mode"
)
}
}
// Validate boolean options
Seq("convert.any.fields.to.json", "emit.default.values", "enums.as.ints").foreach { key =>
options.get(key).foreach { value =>
if (!Set("true", "false").contains(value.toLowerCase)) {
throw new IllegalArgumentException(
s"$key must be true or false, got: $value"
)
}
}
}
validatedOptions.toMap
}
// Usage
try {
val validOptions = validateOptions(userOptions)
val processedData = inputDF.select(
from_protobuf($"data", "MessageType", descriptorPath, validOptions.asJava)
)
} catch {
case e: IllegalArgumentException =>
println(s"Invalid configuration: ${e.getMessage}")
}

import org.apache.spark.sql.SparkSession
// Development environment - permissive with detailed error info
val devOptions = Map(
"mode" -> "PERMISSIVE",
"emit.default.values" -> "true",
"convert.any.fields.to.json" -> "true"
).asJava
// Production environment - strict validation
val prodOptions = Map(
"mode" -> "FAILFAST",
"emit.default.values" -> "false",
"recursive.fields.max.depth" -> "3"
).asJava
// Configuration based on environment
val spark = SparkSession.builder().getOrCreate()
val environment = spark.conf.get("spark.app.environment", "development")
val options = environment match {
case "production" => prodOptions
case _ => devOptions
}

// Optimized for high-throughput processing
val performanceOptions = Map(
"mode" -> "PERMISSIVE", // Don't fail on individual record errors
"recursive.fields.max.depth" -> "2", // Limit deep nesting
"convert.any.fields.to.json" -> "false", // Avoid expensive JSON conversion
"emit.default.values" -> "false" // Skip unnecessary default processing
).asJava
// Optimized for data quality
val qualityOptions = Map(
"mode" -> "FAILFAST", // Strict validation
"emit.default.values" -> "true", // Complete data representation
"convert.any.fields.to.json" -> "true" // Full Any field processing
).asJava

object ProtobufOptions {
/** Configuration key for Any field JSON conversion */
val CONVERT_ANY_FIELDS_TO_JSON_CONFIG: String = "convert.any.fields.to.json"
// Other commonly used configuration key strings (listed here for reference; only
// CONVERT_ANY_FIELDS_TO_JSON_CONFIG is defined as a constant in Spark's ProtobufOptions)
val MODE_CONFIG = "mode"
val RECURSIVE_DEPTH_CONFIG = "recursive.fields.max.depth"
val EMIT_DEFAULTS_CONFIG = "emit.default.values"
val ENUMS_AS_INTS_CONFIG = "enums.as.ints"
}

Configuration-related errors and their solutions:
// Handle configuration errors
try {
val result = inputDF.select(
from_protobuf($"data", "MessageType", descriptorPath, options).as("parsed")
)
result.show()
} catch {
case e: IllegalArgumentException if e.getMessage.contains("recursive.fields.max.depth") =>
println("Invalid recursive depth configuration")
case e: IllegalArgumentException if e.getMessage.contains("mode") =>
println("Invalid parse mode configuration")
case e: RuntimeException if e.getMessage.contains("Any field") =>
println("Any field processing failed - check descriptor completeness")
case e: Exception =>
println(s"Configuration error: ${e.getMessage}")
}