Describes: mavenpkg:maven/org.apache.spark/spark-protobuf_2.13@3.5.x


tessl/maven-org-apache-spark--spark-protobuf_2-13

tessl install tessl/maven-org-apache-spark--spark-protobuf_2-13@3.5.0

Apache Spark connector for Protocol Buffer (protobuf) data format support, providing SQL functions to convert between binary protobuf data and Catalyst data structures for processing structured data in distributed big data analytics pipelines


Configuration Options

Options that control how protobuf data is parsed, how errors are handled, and how recursive fields, Any fields, default values, and enums are represented.

Capabilities

ProtobufOptions Class

Main configuration class providing case-insensitive parameter handling and Hadoop configuration integration.

/**
 * Options for Protobuf Reader and Writer stored in case insensitive manner
 * @param parameters - Case insensitive map of configuration parameters
 * @param conf - Hadoop configuration for file system access
 */
class ProtobufOptions(
  parameters: CaseInsensitiveMap[String],
  conf: Configuration
) extends FileSourceOptions(parameters) with Logging {
  
  /** Parse mode for error handling (FailFastMode by default) */
  val parseMode: ParseMode
  
  /** Maximum depth for recursive fields (default: -1, max: 10) */
  val recursiveFieldMaxDepth: Int
  
  /** Enable converting Protobuf Any fields to JSON (default: false) */
  val convertAnyFieldsToJson: Boolean
  
  /** Render fields with zero values when deserializing (default: false) */
  val emitDefaultValues: Boolean
  
  /** Render enum fields as integer values (default: false) */
  val enumsAsInts: Boolean
}
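
To make the case-insensitive lookup and the documented defaults concrete, here is a minimal plain-Scala sketch. `OptionsSketch` is a hypothetical stand-in written for this page; it is not Spark's `ProtobufOptions` and does not touch the Hadoop configuration. It only mirrors the option keys and defaults listed in this document.

```scala
import java.util.Locale

// Hypothetical stand-in for illustration only; NOT Spark's ProtobufOptions.
final case class OptionsSketch(raw: Map[String, String]) {
  // Normalize keys once so that lookups ignore case.
  private val params: Map[String, String] =
    raw.map { case (k, v) => k.toLowerCase(Locale.ROOT) -> v }

  // A missing boolean option falls back to false, as documented above.
  private def bool(key: String): Boolean =
    params.get(key).exists(_.toBoolean)

  val mode: String =
    params.getOrElse("mode", "FAILFAST").toUpperCase(Locale.ROOT)
  val recursiveFieldMaxDepth: Int =
    params.getOrElse("recursive.fields.max.depth", "-1").toInt
  val convertAnyFieldsToJson: Boolean = bool("convert.any.fields.to.json")
  val emitDefaultValues: Boolean = bool("emit.default.values")
  val enumsAsInts: Boolean = bool("enums.as.ints")
}

// Keys and values may be written in any case.
val sketch = OptionsSketch(Map("MODE" -> "permissive", "Enums.As.Ints" -> "TRUE"))
```

In this sketch, options that are not supplied simply take the defaults shown in the class listing above.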

ProtobufOptions Factory

Factory methods for creating ProtobufOptions instances.

/**
 * Factory object for creating ProtobufOptions instances
 */
object ProtobufOptions {
  /**
   * Create ProtobufOptions from parameter map using active Spark session
   * @param parameters - Configuration parameters as key-value pairs
   * @return ProtobufOptions instance with Hadoop configuration
   */
  def apply(parameters: Map[String, String]): ProtobufOptions
  
  /** Configuration key for Any field JSON conversion */
  val CONVERT_ANY_FIELDS_TO_JSON_CONFIG: String = "convert.any.fields.to.json"
}

Configuration Options Reference

Parse Mode Options

Control error handling behavior during protobuf processing.

// Available parse modes
sealed trait ParseMode
case object FailFastMode extends ParseMode  // Fail immediately on errors (default)
case object PermissiveMode extends ParseMode  // Continue processing, set invalid records to null

Usage:

import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._  // replaces the deprecated JavaConverters on Scala 2.13

val options: JMap[String, String] = Map(
  "mode" -> "FAILFAST"  // or "PERMISSIVE"
).asJava
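
The difference between the two modes can be modeled in a few lines of plain Scala. This is a toy sketch, not Spark's implementation: `Mode`, `parseWithMode`, and `toyParse` are hypothetical names, and the "parser" merely treats an empty payload as malformed.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical stand-ins for the two parse modes; not Spark's ParseMode types.
sealed trait Mode
case object FailFast extends Mode
case object Permissive extends Mode

// Applies `parse` to every record, reacting to failures according to `mode`.
def parseWithMode[A](records: Seq[Array[Byte]], mode: Mode)(
    parse: Array[Byte] => A): Seq[Option[A]] =
  records.map { bytes =>
    Try(parse(bytes)) match {
      case Success(value) => Some(value)
      case Failure(error) =>
        mode match {
          case FailFast   => throw error // abort on the first bad record
          case Permissive => None        // keep going; this record becomes null
        }
    }
  }

// Toy parser: treats an empty payload as malformed.
val toyParse: Array[Byte] => Int = bytes =>
  if (bytes.isEmpty) throw new IllegalArgumentException("malformed record")
  else bytes.length

val records = Seq(Array[Byte](1, 2), Array.emptyByteArray)

// One good record and one malformed record: Some(2) followed by None.
val permissive = parseWithMode(records, Permissive)(toyParse)
```

With `FailFast`, the same input throws on the second record instead of returning a result.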

Recursive Field Depth

Controls handling of recursive message fields to prevent infinite schema expansion.

val recursiveOptions: JMap[String, String] = Map(
  "recursive.fields.max.depth" -> "3"  // Allow 3 levels of recursion
).asJava

Values:

  • -1 (default): Recursive fields not permitted
  • 0: Drop recursive fields
  • 1-10: Allow recursion up to specified depth
  • >10: Not allowed to avoid large schemas

Example with recursive Person message:

// message Person { string name = 1; Person friend = 2; }

// Depth 0: struct<name: string>
// Depth 1: struct<name: string, friend: struct<name: string>>  
// Depth 2: struct<name: string, friend: struct<name: string, friend: struct<name: string>>>
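
The listing above follows a simple recursive pattern, which can be sketched in plain Scala. `personSchema` is a hypothetical helper that only reproduces the schema strings shown here for the `Person` message; it does not call Spark.

```scala
// Renders the struct type for the recursive Person message at a given depth,
// following the listing above: depth 0 drops the recursive `friend` field.
def personSchema(depth: Int): String = {
  require(depth >= 0 && depth <= 10, s"depth must be in 0..10, got: $depth")
  if (depth == 0) "struct<name: string>"
  else s"struct<name: string, friend: ${personSchema(depth - 1)}>"
}

val depth0 = personSchema(0)
val depth2 = personSchema(2)
```

Each extra level nests one more `friend` struct, which is why large depths produce large schemas.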

Any Field JSON Conversion

Enable converting Protobuf google.protobuf.Any fields to JSON strings.

val anyFieldOptions: JMap[String, String] = Map(
  "convert.any.fields.to.json" -> "true"
).asJava

Behavior:

  • false (default): Any fields as STRUCT<type_url: STRING, value: BINARY>
  • true: Any fields as JSON strings with type information

Requirements and trade-offs:

  • All possible protobuf types must be available in descriptor file
  • JSON conversion is less efficient than binary processing
  • Schema safety is reduced
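
One way to check the first requirement before enabling the option is to read the data with the default setting, collect the distinct `type_url` values, and compare them with the message names in the descriptor file. The sketch below covers only the comparison step in plain Scala. Both helper names are hypothetical, and the set of known names is assumed to have been extracted from the descriptor separately. It relies on the protobuf convention that the last path segment of a type URL is the fully qualified message name.

```scala
// Extracts the fully qualified message name from an Any type URL,
// e.g. "type.googleapis.com/google.protobuf.Duration".
def messageNameFromTypeUrl(typeUrl: String): Option[String] = {
  val name = typeUrl.substring(typeUrl.lastIndexOf('/') + 1)
  if (name.nonEmpty) Some(name) else None
}

// Returns the message names that appear in the data but are not in `known`
// (hypothetical: `known` holds the names found in your descriptor file).
def missingTypes(typeUrls: Seq[String], known: Set[String]): Seq[String] =
  typeUrls.flatMap(messageNameFromTypeUrl).distinct.filterNot(known)

val missing = missingTypes(
  Seq("type.googleapis.com/a.B", "type.googleapis.com/c.D", "type.googleapis.com/a.B"),
  known = Set("a.B")
)
```

A non-empty result suggests that JSON conversion may fail for some records until the descriptor file is extended.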

Default Value Emission

Control whether to render type-specific zero values for empty protobuf fields.

val defaultValueOptions: JMap[String, String] = Map(
  "emit.default.values" -> "true"
).asJava

Behavior:

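  • false (default): Unset fields become null, including proto3 fields without presence that hold their zero value
  • true: Such fields get type-specific default values (empty string, 0, false, etc.); unset optional fields stay null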

Example:

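// Proto3 message:
//   Person { string name = 1; int64 age = 2; optional string middle_name = 3; optional int64 salary = 4; }
// Serialized as: Person(age=0, middle_name="")

// With emit.default.values = false:
// {"name": null, "age": null, "middle_name": "", "salary": null}
// (age is null because proto3 does not write zero values of non-optional fields to the wire)

// With emit.default.values = true:
// {"name": "", "age": 0, "middle_name": "", "salary": null}
// (salary stays null because it is an optional field that was never set)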
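
The rule behind this example can be modeled in plain Scala. This is a simplified sketch, not Spark's code: `FieldSpec`, `render`, and `row` are hypothetical names, and `salary` is assumed to be an `optional int64` field because it appears in the output above.

```scala
// Simplified model of a scalar field; `hasPresence` is true for `optional` fields.
final case class FieldSpec(name: String, hasPresence: Boolean, zero: Any)

// Returns the rendered value, or None where the column would be null.
// A value found on the wire always wins; otherwise the zero value is emitted
// only for fields without presence, and only when emitDefaults is set.
def render(field: FieldSpec, onWire: Option[Any], emitDefaults: Boolean): Option[Any] =
  onWire.orElse(if (emitDefaults && !field.hasPresence) Some(field.zero) else None)

val fields = Seq(
  FieldSpec("name", hasPresence = false, zero = ""),
  FieldSpec("age", hasPresence = false, zero = 0L),
  FieldSpec("middle_name", hasPresence = true, zero = ""),
  FieldSpec("salary", hasPresence = true, zero = 0L)
)

// Person(age=0, middle_name=""): only middle_name reaches the wire, because
// proto3 omits zero values of fields without presence.
val wire: Map[String, Any] = Map("middle_name" -> "")

def row(emitDefaults: Boolean): Map[String, Option[Any]] =
  fields.map(f => f.name -> render(f, wire.get(f.name), emitDefaults)).toMap
```

Here `row(false)` and `row(true)` correspond to the two JSON results in the example, with `None` standing for null.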

Enum Value Representation

Control whether enum fields are rendered as strings or integers.

val enumOptions: JMap[String, String] = Map(
  "enums.as.ints" -> "true"
).asJava

Behavior:

  • false (default): Enums as string names ("ACTIVE")
  • true: Enums as integer values (1)

Schema Impact:

  • String mode: Results in StringType columns
  • Integer mode: Results in IntegerType columns

Usage Examples

Basic Configuration

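import org.apache.spark.sql.protobuf.functions._
import scala.jdk.CollectionConverters._
import spark.implicits._  // enables the $"col" syntax; assumes a SparkSession named `spark`

// `binaryData` is a DataFrame with a binary column `data`;
// `descriptorPath` is the path to a compiled protobuf descriptor file.
val basicOptions: java.util.Map[String, String] = Map(
  "mode" -> "PERMISSIVE"
).asJava

val df = binaryData.select(
  from_protobuf($"data", "MessageType", descriptorPath, basicOptions).as("parsed")
)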

Comprehensive Configuration

val advancedOptions: java.util.Map[String, String] = Map(
  "mode" -> "PERMISSIVE",
  "recursive.fields.max.depth" -> "3",
  "convert.any.fields.to.json" -> "true", 
  "emit.default.values" -> "true",
  "enums.as.ints" -> "false"
).asJava

val processedDF = inputDF.select(
  from_protobuf($"protobuf_data", "ComplexMessage", descriptorBytes, advancedOptions)
    .as("complex_message")
)
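
When all five options are set together, a small typed wrapper can reduce typos in the string keys. The sketch below is one possible shape for such a wrapper. `ProtobufReadConfig` is a hypothetical class that is not part of spark-protobuf; its defaults mirror the ones documented on this page.

```scala
import scala.jdk.CollectionConverters._

// Hypothetical typed wrapper; NOT part of spark-protobuf.
final case class ProtobufReadConfig(
    mode: String = "FAILFAST",
    recursiveFieldsMaxDepth: Int = -1,
    convertAnyFieldsToJson: Boolean = false,
    emitDefaultValues: Boolean = false,
    enumsAsInts: Boolean = false
) {
  // Produces the java.util.Map that from_protobuf expects as its options argument.
  def toOptions: java.util.Map[String, String] = Map(
    "mode" -> mode,
    "recursive.fields.max.depth" -> recursiveFieldsMaxDepth.toString,
    "convert.any.fields.to.json" -> convertAnyFieldsToJson.toString,
    "emit.default.values" -> emitDefaultValues.toString,
    "enums.as.ints" -> enumsAsInts.toString
  ).asJava
}

val typedOptions = ProtobufReadConfig(mode = "PERMISSIVE", recursiveFieldsMaxDepth = 3).toOptions
```

The resulting map can be passed wherever the hand-written option maps in this section are used.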

Schema-Specific Configuration

// For messages with deep nesting
val deepNestingOptions = Map(
  "recursive.fields.max.depth" -> "5",
  "mode" -> "FAILFAST"
).asJava

// For messages with Any fields
val anyFieldOptions = Map(
  "convert.any.fields.to.json" -> "true",
  "mode" -> "PERMISSIVE"  // Continue processing if Any field conversion fails
).asJava

// For legacy proto2 messages
val proto2Options = Map(
  "emit.default.values" -> "false",  // Preserve original proto2 behavior
  "enums.as.ints" -> "false"
).asJava

Runtime Configuration

import org.apache.spark.sql.protobuf.utils.{ProtobufOptions, SchemaConverters}

// Create options instance for schema conversion
val schemaOptions = ProtobufOptions(Map(
  "recursive.fields.max.depth" -> "2",
  "enums.as.ints" -> "true"
))

// Use with schema conversion
// (`descriptor` is the com.google.protobuf.Descriptors.Descriptor of the message)
val schema = SchemaConverters.toSqlType(descriptor, schemaOptions)

Configuration Validation

def validateOptions(options: Map[String, String]): Map[String, String] = {
  // Validate recursive depth: must parse as an integer between -1 and 10
  options.get("recursive.fields.max.depth").foreach { depth =>
    val depthInt = scala.util.Try(depth.trim.toInt).getOrElse(
      throw new IllegalArgumentException(
        s"recursive.fields.max.depth must be an integer, got: $depth"
      )
    )
    if (depthInt < -1 || depthInt > 10) {
      throw new IllegalArgumentException(
        s"recursive.fields.max.depth must be -1 to 10, got: $depthInt"
      )
    }
  }
  
  // Validate parse mode (case-insensitive)
  options.get("mode").foreach { mode =>
    if (!Set("FAILFAST", "PERMISSIVE").contains(mode.toUpperCase)) {
      throw new IllegalArgumentException(
        s"mode must be FAILFAST or PERMISSIVE, got: $mode"
      )
    }
  }
  
  // Validate boolean options
  Seq("convert.any.fields.to.json", "emit.default.values", "enums.as.ints").foreach { key =>
    options.get(key).foreach { value =>
      if (!Set("true", "false").contains(value.toLowerCase)) {
        throw new IllegalArgumentException(
          s"$key must be true or false, got: $value"
        )
      }
    }
  }
  
  // All checks passed; return the options unchanged
  options
}

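// Usage (`userOptions` is a Map[String, String] supplied by the caller;
// `.asJava` requires the collection converters import shown earlier)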
try {
  val validOptions = validateOptions(userOptions)
  val processedData = inputDF.select(
    from_protobuf($"data", "MessageType", descriptorPath, validOptions.asJava)
  )
} catch {
  case e: IllegalArgumentException =>
    println(s"Invalid configuration: ${e.getMessage}")
}
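
The validator above stops at the first problem it finds. When options come from user input, it can be more helpful to report every problem at once. The following is a sketch of that variant in plain Scala. `collectOptionErrors` is a hypothetical helper that applies the same rules but returns the messages instead of throwing.

```scala
import scala.util.Try

// Returns one message per invalid option; an empty result means the map is valid.
def collectOptionErrors(options: Map[String, String]): Seq[String] = {
  val depthError = options.get("recursive.fields.max.depth").flatMap { raw =>
    Try(raw.trim.toInt).toOption match {
      case Some(d) if d >= -1 && d <= 10 => None
      case _ =>
        Some(s"recursive.fields.max.depth must be an integer from -1 to 10, got: $raw")
    }
  }

  val modeError = options.get("mode").collect {
    case m if !Set("FAILFAST", "PERMISSIVE").contains(m.toUpperCase) =>
      s"mode must be FAILFAST or PERMISSIVE, got: $m"
  }

  val booleanErrors =
    Seq("convert.any.fields.to.json", "emit.default.values", "enums.as.ints").flatMap { key =>
      options.get(key).collect {
        case v if !Set("true", "false").contains(v.toLowerCase) =>
          s"$key must be true or false, got: $v"
      }
    }

  depthError.toSeq ++ modeError.toSeq ++ booleanErrors
}

val problems = collectOptionErrors(Map(
  "mode" -> "DROPMALFORMED",
  "recursive.fields.max.depth" -> "abc",
  "enums.as.ints" -> "yes"
))
```

In this example all three entries are invalid, so `problems` holds three messages.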

Environment-Specific Configuration

import org.apache.spark.sql.SparkSession

// Development environment - permissive: malformed records become null instead of failing the job
val devOptions = Map(
  "mode" -> "PERMISSIVE",
  "emit.default.values" -> "true",
  "convert.any.fields.to.json" -> "true"
).asJava

// Production environment - strict validation  
val prodOptions = Map(
  "mode" -> "FAILFAST",
  "emit.default.values" -> "false",
  "recursive.fields.max.depth" -> "3"
).asJava

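// Select options based on the environment
// ("spark.app.environment" is a custom, application-defined key, not a built-in Spark setting)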
val spark = SparkSession.builder().getOrCreate()
val environment = spark.conf.get("spark.app.environment", "development")

val options = environment match {
  case "production" => prodOptions
  case _ => devOptions
}

Performance Optimization Configuration

// Optimized for high-throughput processing
val performanceOptions = Map(
  "mode" -> "PERMISSIVE",  // Don't fail on individual record errors
  "recursive.fields.max.depth" -> "2",  // Limit deep nesting  
  "convert.any.fields.to.json" -> "false",  // Avoid expensive JSON conversion
  "emit.default.values" -> "false"  // Skip unnecessary default processing
).asJava

// Optimized for data quality
val qualityOptions = Map(
  "mode" -> "FAILFAST",  // Strict validation
  "emit.default.values" -> "true",  // Complete data representation
  "convert.any.fields.to.json" -> "true"  // Full Any field processing
).asJava

Configuration Constants

// Only CONVERT_ANY_FIELDS_TO_JSON_CONFIG is listed on the ProtobufOptions factory above.
// The other keys are gathered here in a user-defined helper object for convenience.
object ProtobufOptionKeys {
  /** Configuration key for Any field JSON conversion */
  val CONVERT_ANY_FIELDS_TO_JSON_CONFIG: String = "convert.any.fields.to.json"
  
  // Other commonly used configuration keys
  val MODE_CONFIG = "mode"
  val RECURSIVE_DEPTH_CONFIG = "recursive.fields.max.depth"
  val EMIT_DEFAULTS_CONFIG = "emit.default.values"
  val ENUMS_AS_INTS_CONFIG = "enums.as.ints"
}

Error Handling

Configuration-related errors and their solutions:

// Handle configuration errors
try {
  val result = inputDF.select(
    from_protobuf($"data", "MessageType", descriptorPath, options).as("parsed")
  )
  result.show()
} catch {
  // Option(...) guards against exceptions whose message is null
  case e: IllegalArgumentException if Option(e.getMessage).exists(_.contains("recursive.fields.max.depth")) =>
    println("Invalid recursive depth configuration")
  case e: IllegalArgumentException if Option(e.getMessage).exists(_.contains("mode")) =>
    println("Invalid parse mode configuration")
  case e: RuntimeException if Option(e.getMessage).exists(_.contains("Any field")) =>
    println("Any field processing failed - check descriptor completeness")
  case e: Exception =>
    println(s"Protobuf processing failed: ${e.getMessage}")
}