or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

configuration.md, from-protobuf.md, index.md, schema-utilities.md, to-protobuf.md
tile.json

docs/configuration.md

Configuration and Options

Comprehensive configuration system for controlling protobuf processing behavior including parse modes, type conversions, schema handling, and field processing options.

Capabilities

ProtobufOptions Class

Central configuration class for customizing protobuf serialization and deserialization behavior.

/**
 * Options for Protobuf Reader and Writer stored in case insensitive manner.
 * @param parameters configuration parameters as case-insensitive map
 * @param conf Hadoop configuration object
 */
class ProtobufOptions(
  parameters: CaseInsensitiveMap[String], 
  conf: Configuration
) extends FileSourceOptions(parameters) with Logging

/**
 * Secondary constructor accepting regular Map
 * @param parameters configuration parameters
 * @param conf Hadoop configuration object  
 */
def this(parameters: Map[String, String], conf: Configuration)

/**
 * Factory method for creating ProtobufOptions with default Hadoop configuration
 * @param parameters configuration parameters
 * @return ProtobufOptions instance
 */
object ProtobufOptions {
  def apply(parameters: Map[String, String]): ProtobufOptions
  
  /** Configuration key for converting Any fields to JSON */
  val CONVERT_ANY_FIELDS_TO_JSON_CONFIG: String = "convert.any.fields.to.json"
}

Parse Mode Configuration

Controls how malformed protobuf data is handled during deserialization.

/** Parse mode for handling malformed data (PERMISSIVE or FAILFAST) */
val parseMode: ParseMode

Configuration Key: "mode" Valid Values:

  • "PERMISSIVE" - Malformed records become null values
  • "FAILFAST" - Malformed records cause job failure

Recursive Field Configuration

Controls handling of recursive protobuf message types to prevent infinite schema expansion.

/** Maximum recursion depth for recursive fields (-1 to 10) */
val recursiveFieldMaxDepth: Int

Configuration Key: "recursive.fields.max.depth" Valid Values: -1 (recursive fields not permitted; an error is raised if one is encountered), 1 to 10 (maximum allowed recursion depth) Default: -1

Type Conversion Options

Controls how protobuf types are converted to Spark SQL types.

/** Whether to render enum fields as their integer values instead of strings */
val enumsAsInts: Boolean

/** Whether to upcast unsigned integers to larger types to prevent overflow */  
val upcastUnsignedInts: Boolean

/** Whether to unwrap well-known primitive wrapper types */
val unwrapWellKnownTypes: Boolean

Configuration Keys:

  • "enums.as.ints" - Default: false
  • "upcast.unsigned.ints" - Default: false
  • "unwrap.primitive.wrapper.types" - Default: false

Default Value Configuration

Controls emission of default values for zero-value fields in proto3.

/** Whether to render fields with zero values when deserializing */
val emitDefaultValues: Boolean

Configuration Key: "emit.default.values" Default: false

Any Field Configuration

Controls conversion of protobuf Any fields to JSON strings.

/** Whether to convert protobuf Any fields to JSON strings */
val convertAnyFieldsToJson: Boolean

Configuration Key: "convert.any.fields.to.json" Default: false

Empty Message Configuration

Controls handling of empty protobuf message types in Spark schemas.

/** Whether to retain empty proto message types by inserting dummy fields */
val retainEmptyMessage: Boolean

Configuration Key: "retain.empty.message.types" Default: false

Usage Examples

Basic Configuration

import org.apache.spark.sql.protobuf.utils.ProtobufOptions
import scala.jdk.CollectionConverters._

val options = Map(
  "mode" -> "PERMISSIVE",
  "emit.default.values" -> "true"
).asJava

val deserializedDF = binaryDF.select(
  from_protobuf(col("content"), "PersonMessage", descriptorFile, options) as "person_data"
)

Recursive Field Handling

// Allow 2 levels of recursion for recursive message types
val recursiveOptions = Map(
  "recursive.fields.max.depth" -> "2"
).asJava

val deserializedDF = binaryDF.select(
  from_protobuf(col("content"), "RecursiveMessage", descriptorFile, recursiveOptions) as "data"
)

Type Conversion Options

val typeOptions = Map(
  "enums.as.ints" -> "true",           // Render enums as integers
  "upcast.unsigned.ints" -> "true",    // Prevent unsigned integer overflow
  "unwrap.primitive.wrapper.types" -> "true"  // Unwrap wrapper types
).asJava

val deserializedDF = binaryDF.select(
  from_protobuf(col("content"), "TypeRichMessage", descriptorFile, typeOptions) as "data"
)

Any Field JSON Conversion

val anyOptions = Map(
  "convert.any.fields.to.json" -> "true"
).asJava

// Any fields will be converted to JSON strings instead of binary structs
val deserializedDF = binaryDF.select(
  from_protobuf(col("content"), "MessageWithAny", descriptorFile, anyOptions) as "data"
)

Empty Message Handling

val emptyMessageOptions = Map(
  "retain.empty.message.types" -> "true"
).asJava

// Empty protobuf messages will have dummy fields inserted to retain structure
val deserializedDF = binaryDF.select(
  from_protobuf(col("content"), "EmptyMessage", descriptorFile, emptyMessageOptions) as "data"
)

Comprehensive Configuration

val comprehensiveOptions = Map(
  "mode" -> "PERMISSIVE",
  "recursive.fields.max.depth" -> "3",
  "emit.default.values" -> "true",
  "enums.as.ints" -> "false",
  "upcast.unsigned.ints" -> "true",
  "unwrap.primitive.wrapper.types" -> "true",
  "convert.any.fields.to.json" -> "true",
  "retain.empty.message.types" -> "false"
).asJava

val fullyConfiguredDF = binaryDF.select(
  from_protobuf(col("content"), "ComplexMessage", descriptorFile, comprehensiveOptions) as "data"
)

Error Handling Strategies

// Strategy 1: Permissive mode with null filtering
val permissiveOptions = Map("mode" -> "PERMISSIVE").asJava
val tolerantDF = binaryDF.select(
  from_protobuf(col("content"), "PersonMessage", descriptorFile, permissiveOptions) as "data"
).filter(col("data").isNotNull)

// Strategy 2: Fail-fast mode for strict validation
val strictOptions = Map("mode" -> "FAILFAST").asJava
val strictDF = binaryDF.select(
  from_protobuf(col("content"), "PersonMessage", descriptorFile, strictOptions) as "data"
)

Using ProtobufOptions Programmatically

import org.apache.spark.sql.protobuf.utils.ProtobufOptions

val programmaticOptions = ProtobufOptions(Map(
  "mode" -> "PERMISSIVE",
  "emit.default.values" -> "true"
))

// Access individual configuration values
println(s"Parse mode: ${programmaticOptions.parseMode}")
println(s"Emit defaults: ${programmaticOptions.emitDefaultValues}")
println(s"Recursion depth: ${programmaticOptions.recursiveFieldMaxDepth}")