tessl/maven-org-apache-spark--spark-protobuf_2-13

Apache Spark connector for the Protocol Buffers data source, enabling protobuf serialization and deserialization in Spark SQL.

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.spark/spark-protobuf_2.13@4.0.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-protobuf_2-13@4.0.0


Apache Spark Protobuf Connector

The Apache Spark Protobuf Connector enables serialization and deserialization of Protocol Buffers data within Apache Spark SQL. It provides bidirectional conversion between protobuf binary payloads and Spark's Catalyst rows through dedicated SQL functions and a rich set of configuration options.

Package Information

  • Package Name: spark-protobuf_2.13
  • Package Type: maven
  • Language: Scala
  • Maven Coordinates: org.apache.spark:spark-protobuf_2.13:4.0.0
  • Installation: add the Maven coordinates above as a dependency of your Spark application

Core Imports

import org.apache.spark.sql.protobuf.functions._

For Java compatibility:

import static org.apache.spark.sql.protobuf.functions.*;

Basic Usage

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions._

// Path to a descriptor set compiled with `protoc --descriptor_set_out`
val descriptorFile = "/path/to/messages.desc"

// Read binary protobuf payloads (assumes an active SparkSession `spark`)
val df: DataFrame = spark.read.format("binaryFile").load("protobuf-data")

// Deserialize protobuf binary to structured data
val deserializedDF = df.select(
  from_protobuf(col("content"), "MessageName", descriptorFile) as "data"
)

// Serialize structured data back to protobuf binary
val serializedDF = deserializedDF.select(
  to_protobuf(col("data"), "MessageName", descriptorFile) as "protobuf_binary"
)

Architecture

The Spark Protobuf Connector is built around several key components:

  • SQL Functions: from_protobuf and to_protobuf functions for DataFrame operations
  • Expression Layer: Internal Catalyst expressions for efficient protobuf processing
  • Schema Conversion: Automatic mapping between protobuf schemas and Spark SQL schemas
  • Configuration System: Comprehensive options for controlling serialization/deserialization behavior
  • Type Registry: Support for dynamic message handling and Any field processing

Capabilities

Protobuf to Catalyst Conversion

Convert binary protobuf data to Spark SQL structured types using from_protobuf functions with support for various descriptor sources and configuration options.

def from_protobuf(
  data: Column, 
  messageName: String, 
  descFilePath: String, 
  options: java.util.Map[String, String]
): Column

def from_protobuf(
  data: Column, 
  messageName: String, 
  binaryFileDescriptorSet: Array[Byte], 
  options: java.util.Map[String, String]
): Column

def from_protobuf(
  data: Column, 
  messageClassName: String, 
  options: java.util.Map[String, String]
): Column

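A minimal usage sketch for the descriptor-file overload. The message name Event, the input DataFrame rawDF, the descriptor path, and the mode option value are illustrative placeholders; the descriptor set would be generated with protoc --descriptor_set_out.

import scala.jdk.CollectionConverters._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions.from_protobuf

// rawDF, "Event", and the descriptor path are hypothetical placeholders
val parsed = rawDF.select(
  from_protobuf(
    col("value"),
    "Event",
    "/tmp/events.desc",
    Map("mode" -> "PERMISSIVE").asJava  // tolerate malformed records instead of failing
  ) as "event"
)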

Catalyst to Protobuf Conversion

Convert Spark SQL structured types to binary protobuf data using to_protobuf functions with comprehensive serialization options.

def to_protobuf(
  data: Column, 
  messageName: String, 
  descFilePath: String, 
  options: java.util.Map[String, String]
): Column

def to_protobuf(
  data: Column, 
  messageName: String, 
  binaryFileDescriptorSet: Array[Byte], 
  options: java.util.Map[String, String]
): Column

def to_protobuf(
  data: Column, 
  messageClassName: String, 
  options: java.util.Map[String, String]
): Column

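A sketch of the binary-descriptor-set overload, reserializing the struct produced in the previous sketch; paths and names remain placeholders.

import java.nio.file.{Files, Paths}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions.to_protobuf

// Load the compiled FileDescriptorSet as raw bytes (placeholder path)
val descBytes: Array[Byte] = Files.readAllBytes(Paths.get("/tmp/events.desc"))

// Serialize the "event" struct column back to protobuf binary
val reserialized = parsed.select(
  to_protobuf(col("event"), "Event", descBytes) as "value"
)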

Configuration and Options

Comprehensive configuration system for controlling protobuf processing behavior including parse modes, type conversions, and schema handling.

class ProtobufOptions(
  parameters: Map[String, String], 
  conf: Configuration
)

object ProtobufOptions {
  def apply(parameters: Map[String, String]): ProtobufOptions
  val CONVERT_ANY_FIELDS_TO_JSON_CONFIG: String
}

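Options are passed as a plain string map through the function overloads shown earlier. A sketch using a few options documented for this connector (mode, recursive.fields.max.depth, convert.any.fields.to.json); availability can vary by Spark version, and the DataFrame, message name, and path are placeholders.

import scala.jdk.CollectionConverters._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions.from_protobuf

val options = Map(
  "mode" -> "PERMISSIVE",                 // null out malformed records instead of throwing
  "recursive.fields.max.depth" -> "2",    // allow bounded recursion in message definitions
  "convert.any.fields.to.json" -> "true"  // decode google.protobuf.Any fields to JSON strings
).asJava

val parsed = df.select(
  from_protobuf(col("content"), "MessageName", "/tmp/messages.desc", options) as "data"
)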

Schema Utilities

Utilities for protobuf descriptor management, schema conversion, and type registry operations.

object ProtobufUtils {
  def buildDescriptor(
    messageName: String, 
    binaryFileDescriptorSet: Option[Array[Byte]]
  ): Descriptor
  
  def buildDescriptorFromJavaClass(protobufClassName: String): Descriptor
  
  def buildTypeRegistry(descriptorBytes: Array[Byte]): TypeRegistry
}

object SchemaConverters {
  def toSqlType(
    descriptor: Descriptor, 
    protobufOptions: ProtobufOptions
  ): SchemaType
}

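These utilities live in Spark's internal org.apache.spark.sql.protobuf.utils package and are not a stable public API. A sketch, assuming they are reachable from your build, that derives the Spark SQL schema for a message without reading any data; the path and message name are placeholders.

import java.nio.file.{Files, Paths}
import org.apache.spark.sql.protobuf.utils.{ProtobufOptions, ProtobufUtils, SchemaConverters}

val descBytes = Files.readAllBytes(Paths.get("/tmp/events.desc"))          // placeholder path
val descriptor = ProtobufUtils.buildDescriptor("Event", Some(descBytes))   // placeholder message name
val schemaType = SchemaConverters.toSqlType(descriptor, ProtobufOptions(Map.empty))
println(schemaType.dataType.prettyJson)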

Types

case class SchemaType(dataType: DataType, nullable: Boolean)

case class ProtoMatchedField(
  catalystField: StructField,
  catalystPosition: Int, 
  fieldDescriptor: FieldDescriptor
)

class ProtoSchemaHelper(
  descriptor: Descriptor,
  catalystSchema: StructType,
  protoPath: Seq[String],
  catalystPath: Seq[String]
) {
  val matchedFields: Seq[ProtoMatchedField]
  def validateNoExtraCatalystFields(ignoreNullable: Boolean): Unit
  def validateNoExtraRequiredProtoFields(): Unit
  def getFieldByName(name: String): Option[FieldDescriptor]
}
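
A sketch of how these types fit together when validating a schema before a write. ProtoSchemaHelper is internal (nested inside ProtobufUtils in Spark's source), so this is illustrative rather than a public entry point; descriptor and catalystSchema are assumed to come from the utilities above.

// Pair a protobuf descriptor with the Catalyst schema being serialized
val helper = new ProtoSchemaHelper(descriptor, catalystSchema, Seq.empty, Seq.empty)

// Fields present in both schemas, with positions and descriptors
helper.matchedFields.foreach { m =>
  println(s"${m.catalystField.name} -> ${m.fieldDescriptor.getFullName}")
}

// Fail fast on mismatches before any rows are converted
helper.validateNoExtraCatalystFields(ignoreNullable = true)
helper.validateNoExtraRequiredProtoFields()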