Apache Spark Protobuf Connector

An Apache Spark connector that adds Protocol Buffers (protobuf) support to Spark SQL. It provides functions for converting between binary protobuf data and Catalyst data structures, so structured protobuf payloads can be processed in distributed big data analytics pipelines.

Package Information

  • Package Name: spark-protobuf_2.13
  • Package Type: maven
  • Language: Scala
  • Installation: maven:org.apache.spark:spark-protobuf_2.13:3.5.6
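
For an sbt build, a minimal sketch of adding the dependency (the version should track your Spark version; 3.5.6 matches the coordinates above):

// build.sbt (sketch) -- resolves to the spark-protobuf_2.13 artifact when scalaVersion is 2.13.x
libraryDependencies += "org.apache.spark" %% "spark-protobuf" % "3.5.6"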

Core Imports

import org.apache.spark.sql.protobuf.functions._

For utility classes:

import org.apache.spark.sql.protobuf.utils.{ProtobufOptions, SchemaConverters}

Basic Usage

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.protobuf.functions._

val spark = SparkSession.builder().appName("ProtobufExample").getOrCreate()
import spark.implicits._

// Wrap binary protobuf data in a DataFrame; protobufBinaryData is assumed
// to be an Array[Byte] containing a serialized protobuf message
val df = Seq(protobufBinaryData).toDF("binary_data")

// Using descriptor file
val decoded = df.select(
  from_protobuf($"binary_data", "MessageName", "/path/to/descriptor.desc").as("decoded")
)

// Convert DataFrame back to protobuf binary
val encoded = decoded.select(
  to_protobuf($"decoded", "MessageName", "/path/to/descriptor.desc").as("binary")
)

Architecture

The Spark Protobuf connector is built around several key components:

  • Functions API: Primary SQL functions (from_protobuf, to_protobuf) for data conversion
  • Schema Conversion: Automatic translation between Protobuf schemas and Spark SQL data types
  • Multiple Input Formats: Support for descriptor files, FileDescriptorSet binaries, and Java classes
  • Configuration Options: Extensive options for handling recursive fields, Any types, enums, and default values
  • Error Handling: Robust error handling with different parse modes (FAILFAST, PERMISSIVE); a sketch of passing these options is shown after this list
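
A minimal sketch of the last two points, reusing df and spark.implicits._ from Basic Usage. The option keys ("mode", "recursive.fields.max.depth") are assumed names and should be checked against the Configuration Options documentation:

// Sketch: passing options to from_protobuf; keys shown are assumed names
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.protobuf.functions._

val parseOptions = Map(
  "mode" -> "PERMISSIVE",              // tolerate malformed records instead of failing the query
  "recursive.fields.max.depth" -> "2"  // bound expansion of self-referential message types
).asJava

val decodedPermissive = df.select(
  from_protobuf($"binary_data", "MessageName", "/path/to/descriptor.desc", parseOptions).as("decoded")
)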

Capabilities

Protobuf to Catalyst Conversion

Convert binary protobuf data to Spark SQL data structures with schema inference and type safety.

// Using descriptor file
def from_protobuf(data: Column, messageName: String, descFilePath: String): Column

// Using descriptor file with options
def from_protobuf(
  data: Column, 
  messageName: String, 
  descFilePath: String, 
  options: java.util.Map[String, String]
): Column

// Using binary FileDescriptorSet
def from_protobuf(
  data: Column, 
  messageName: String, 
  binaryFileDescriptorSet: Array[Byte]
): Column

// Using Java class
def from_protobuf(data: Column, messageClassName: String): Column
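
As an illustration of the Java-class variant, the message class name is the fully qualified name of a protobuf-generated Java class available on the executor classpath. A sketch, reusing df from Basic Usage; the class name is hypothetical:

// Sketch: decoding via a generated protobuf Java class
// "com.example.protos.Person" is a made-up class name for illustration
import org.apache.spark.sql.protobuf.functions._

val decodedFromClass = df.select(
  from_protobuf($"binary_data", "com.example.protos.Person").as("person")
)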

See also: Protobuf Deserialization

Catalyst to Protobuf Conversion

Convert Spark SQL data structures to binary protobuf format for output and storage.

// Using descriptor file  
def to_protobuf(data: Column, messageName: String, descFilePath: String): Column

// Using descriptor file with options
def to_protobuf(
  data: Column, 
  messageName: String, 
  descFilePath: String, 
  options: java.util.Map[String, String]
): Column

// Using binary FileDescriptorSet
def to_protobuf(
  data: Column, 
  messageName: String, 
  binaryFileDescriptorSet: Array[Byte]
): Column

// Using Java class
def to_protobuf(data: Column, messageClassName: String): Column
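
A sketch of the FileDescriptorSet variant, reusing decoded from Basic Usage: the descriptor set bytes are read once on the driver (for example, a file produced by protoc --descriptor_set_out; the path and message name are placeholders) and passed directly:

// Sketch: encoding with an in-memory FileDescriptorSet instead of a file path
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.protobuf.functions._

val descriptorSetBytes: Array[Byte] = Files.readAllBytes(Paths.get("/path/to/descriptor.desc"))

val encodedFromBytes = decoded.select(
  to_protobuf($"decoded", "MessageName", descriptorSetBytes).as("binary")
)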

See also: Protobuf Serialization

Schema Conversion Utilities

Tools for converting between Protobuf descriptors and Spark SQL schemas with advanced type mapping.

object SchemaConverters {
  def toSqlType(
    descriptor: Descriptor, 
    protobufOptions: ProtobufOptions = ProtobufOptions(Map.empty)
  ): SchemaType
}

case class SchemaType(dataType: DataType, nullable: Boolean)
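
A sketch of driver-side schema inspection using these utilities together with ProtobufUtils (listed under Types below); the generated class name is hypothetical:

// Sketch: deriving the Catalyst schema for a protobuf message
// "com.example.protos.Person" is a made-up generated class name
import org.apache.spark.sql.protobuf.utils.{ProtobufUtils, SchemaConverters}

val personDescriptor = ProtobufUtils.buildDescriptorFromJavaClass("com.example.protos.Person")
val personSchema = SchemaConverters.toSqlType(personDescriptor)

println(personSchema.dataType.prettyJson)  // the inferred Spark SQL DataType as JSON
println(personSchema.nullable)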

See also: Schema Conversion

Configuration and Options

Comprehensive configuration system for controlling protobuf processing behavior, error handling, and advanced features.

class ProtobufOptions(
  parameters: CaseInsensitiveMap[String], 
  conf: Configuration
)

object ProtobufOptions {
  def apply(parameters: Map[String, String]): ProtobufOptions
}
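
A minimal sketch of constructing ProtobufOptions directly and reading back the parsed settings; the option keys used here are assumed names, to be verified against the Configuration Options documentation:

// Sketch: building ProtobufOptions from a plain Scala Map (keys are assumed names)
import org.apache.spark.sql.protobuf.utils.ProtobufOptions

val protoOptions = ProtobufOptions(Map(
  "mode" -> "FAILFAST",
  "enums.as.ints" -> "true",
  "emit.default.values" -> "true"
))

println(protoOptions.parseMode)          // FailFastMode
println(protoOptions.enumsAsInts)        // true
println(protoOptions.emitDefaultValues)  // true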

See also: Configuration Options

Types

// Core Spark types
import org.apache.spark.sql.Column
import org.apache.spark.sql.types.{DataType, StructType, StructField}
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, ParseMode, FailFastMode, PermissiveMode}
import org.apache.spark.sql.catalyst.FileSourceOptions
import org.apache.hadoop.conf.Configuration

// Protobuf types
import com.google.protobuf.Descriptors.{Descriptor, FieldDescriptor}
import com.google.protobuf.TypeRegistry

// Utility types
case class ProtoMatchedField(
  catalystField: StructField,
  catalystPosition: Int, 
  fieldDescriptor: FieldDescriptor
)

case class SchemaType(dataType: DataType, nullable: Boolean)

class ProtoSchemaHelper(
  descriptor: Descriptor,
  catalystSchema: StructType,
  protoPath: Seq[String],
  catalystPath: Seq[String]
) {
  val matchedFields: Seq[ProtoMatchedField]
  def validateNoExtraCatalystFields(ignoreNullable: Boolean): Unit
  def validateNoExtraRequiredProtoFields(): Unit
  def getFieldByName(name: String): Option[FieldDescriptor]
}

class ProtobufOptions(
  parameters: CaseInsensitiveMap[String],
  conf: Configuration
) extends FileSourceOptions(parameters) {
  val parseMode: ParseMode
  val recursiveFieldMaxDepth: Int
  val convertAnyFieldsToJson: Boolean
  val emitDefaultValues: Boolean
  val enumsAsInts: Boolean
}

object ProtobufOptions {
  def apply(parameters: Map[String, String]): ProtobufOptions
  val CONVERT_ANY_FIELDS_TO_JSON_CONFIG: String
}

object ProtobufUtils {
  def buildDescriptor(messageName: String, binaryFileDescriptorSet: Option[Array[Byte]]): Descriptor
  def buildDescriptor(binaryFileDescriptorSet: Array[Byte], messageName: String): Descriptor
  def buildDescriptorFromJavaClass(protobufClassName: String): Descriptor
  def readDescriptorFileContent(filePath: String): Array[Byte]
  def buildTypeRegistry(descriptorBytes: Array[Byte]): TypeRegistry
  def buildTypeRegistry(descriptor: Descriptor): TypeRegistry
}

object SchemaConverters {
  def toSqlType(descriptor: Descriptor, protobufOptions: ProtobufOptions = ProtobufOptions(Map.empty)): SchemaType
}