Encoders

Type-safe conversion between JVM objects and Spark SQL's internal representation for distributed serialization. Encoders serialize and deserialize objects efficiently while preserving type information for Spark's Catalyst optimizer.
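
For example, when a typed Dataset is created, Spark resolves an encoder for the element type and uses it to map each object to and from its internal row format. A minimal sketch, assuming a local SparkSession and a hypothetical Event case class:

import org.apache.spark.sql.SparkSession

case class Event(id: Long, name: String)

val spark = SparkSession.builder().master("local[*]").appName("encoder-demo").getOrCreate()
import spark.implicits._

// The implicitly derived Encoder[Event] supplies both the schema and the serializers
val events = Seq(Event(1L, "login"), Event(2L, "logout")).toDS()
events.printSchema()
// root
//  |-- id: long (nullable = false)
//  |-- name: string (nullable = true)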

Capabilities

Base Encoder Interface

Core encoder interface for type conversion.

/**
 * Converts JVM objects to/from Spark SQL representation
 * @tparam T The JVM type being encoded
 */
trait Encoder[T] extends Serializable {
  /** Returns the schema for the encoded representation */
  def schema: StructType
  
  /** ClassTag for type T */
  def clsTag: ClassTag[T]
}
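
In practice an Encoder instance is usually derived implicitly or obtained from the org.apache.spark.sql.Encoders factory; the sketch below uses Encoders.product with a hypothetical Purchase case class to inspect the two members defined above:

import org.apache.spark.sql.{Encoder, Encoders}

case class Purchase(id: Long, total: Double)

// Encoders.product derives an Encoder for a Product (case class) type
val purchaseEncoder: Encoder[Purchase] = Encoders.product[Purchase]

println(purchaseEncoder.schema)  // StructType with id: LongType and total: DoubleType fields
println(purchaseEncoder.clsTag)  // ClassTag for Purchase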

Agnostic Encoder Interface

Implementation-agnostic encoder with additional type information.

/**
 * Implementation-agnostic encoder
 * @tparam T The type being encoded
 */
trait AgnosticEncoder[T] extends Encoder[T] {
  /** Whether this encoder represents a primitive type */
  def isPrimitive: Boolean
  
  /** Whether the encoded type can be null */
  def nullable: Boolean
  
  /** The Spark SQL data type for this encoder */
  def dataType: DataType
  
  /** Whether serialization allows lenient type conversion */
  def lenientSerialization: Boolean
  
  /** Whether this encoder represents a struct type */
  def isStruct: Boolean
}
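
A minimal sketch of what an implementation of this trait can look like for a non-nullable Int value, written only against the members shown above (built-in encoders in the library may define more):

import scala.reflect.ClassTag
import org.apache.spark.sql.types._

// Hypothetical primitive encoder; the "value" field name is illustrative only
case object SimpleIntEncoder extends AgnosticEncoder[Int] {
  override def isPrimitive: Boolean = true
  override def nullable: Boolean = false
  override def dataType: DataType = IntegerType
  override def lenientSerialization: Boolean = false
  override def isStruct: Boolean = false
  override def schema: StructType = new StructType().add("value", dataType, nullable)
  override def clsTag: ClassTag[Int] = implicitly[ClassTag[Int]]
}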

Encoder Implementations

Specific encoder implementations for different data types.

/**
 * Encoder for Option types
 * @tparam E Element type
 */
case class OptionEncoder[E](elementEncoder: AgnosticEncoder[E]) extends AgnosticEncoder[Option[E]]

/**
 * Encoder for Array types  
 * @tparam E Element type encoder
 */
case class ArrayEncoder[E](
  elementEncoder: AgnosticEncoder[E],
  containsNull: Boolean = true
) extends AgnosticEncoder[Array[E]]

/**
 * Encoder for Iterable collections
 * @tparam C Collection type
 * @tparam E Element type
 */
case class IterableEncoder[C <: Iterable[E], E](
  elementEncoder: AgnosticEncoder[E],
  containsNull: Boolean = true
) extends AgnosticEncoder[C]

/**
 * Encoder for Map types
 * @tparam C Map collection type  
 * @tparam K Key type
 * @tparam V Value type
 */
case class MapEncoder[C <: Map[K, V], K, V](
  keyEncoder: AgnosticEncoder[K],
  valueEncoder: AgnosticEncoder[V],  
  valueContainsNull: Boolean = true
) extends AgnosticEncoder[C]

/**
 * Represents a field in a struct encoder
 */
case class EncoderField(
  name: String,
  enc: AgnosticEncoder[_],
  nullable: Boolean,
  metadata: Metadata = Metadata.empty,
  readMethod: Option[String] = None,
  writeMethod: Option[String] = None
) {
  /** Convert to StructField for schema representation */
  def structField: StructField = StructField(name, enc.dataType, nullable, metadata)
}
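
These case classes compose directly: given an existing element encoder, they can be nested to describe richer types. A sketch, assuming an AgnosticEncoder[String] is supplied as stringEncoder:

// Composing the encoders above around a supplied string encoder (hypothetical parameter)
def describeComposition(stringEncoder: AgnosticEncoder[String]): Unit = {
  val optionalName = OptionEncoder(stringEncoder)                      // encodes Option[String]
  val nameArray    = ArrayEncoder(stringEncoder, containsNull = true)  // encodes Array[String]
  val nameMap      = MapEncoder[Map[String, String], String, String](stringEncoder, stringEncoder)

  // EncoderField converts to a plain StructField for schema building
  val nameField = EncoderField("name", stringEncoder, nullable = false)
  println(nameField.structField)       // StructField(name, <string data type>, false)

  println(optionalName.nullable)       // Option fields are represented as nullable values
  println(nameArray.containsNull)
  println(nameMap.valueContainsNull)
}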

Row Encoder

Specialized encoder for Row objects.

/**
 * Encoder for Row objects
 */
object RowEncoder {
  /** Create encoder for the given schema */
  def apply(schema: StructType): Encoder[Row]
}

Usage Examples

Working with primitive encoders:

import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.types._

// Primitive type encoders are typically provided by the system
// but you can work with their properties

def analyzeEncoder[T](encoder: AgnosticEncoder[T]): Unit = {
  println(s"Is primitive: ${encoder.isPrimitive}")
  println(s"Is nullable: ${encoder.nullable}")
  println(s"Data type: ${encoder.dataType}")
  println(s"Schema: ${encoder.schema}")
  println(s"Is struct: ${encoder.isStruct}")
  println(s"Lenient serialization: ${encoder.lenientSerialization}")
}

Option encoder usage:

// Working with Option types in encoders
case class UserProfile(
  id: Long,
  name: String,
  email: Option[String],  // Optional field
  age: Option[Int]        // Optional field
)

// The encoder system handles Option types automatically
// Option[String] becomes nullable StringType
// Option[Int] becomes nullable IntegerType

def processOptionalFields[T](optionEncoder: OptionEncoder[T]): Unit = {
  val elementEncoder = optionEncoder.elementEncoder
  println(s"Element type: ${elementEncoder.dataType}")
  println(s"Option is nullable: ${optionEncoder.nullable}") // true
}
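
The nullable mapping can be checked directly against a derived schema. A quick verification using the standard Encoders.product factory on the UserProfile case class above:

import org.apache.spark.sql.Encoders

val profileSchema = Encoders.product[UserProfile].schema

// Option[String] and Option[Int] surface as nullable StringType / IntegerType fields
profileSchema.fields.foreach { f =>
  println(s"${f.name}: ${f.dataType} (nullable = ${f.nullable})")
}
// id: LongType (nullable = false)
// name: StringType (nullable = true)
// email: StringType (nullable = true)
// age: IntegerType (nullable = true)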

Array encoder usage:

// Working with Array types
case class Order(
  id: String,
  items: Array[String],           // Array of strings
  quantities: Array[Int],         // Array of integers  
  tags: Array[Option[String]]     // Array of optional strings
)

def processArrayEncoder[E](arrayEncoder: ArrayEncoder[E]): Unit = {
  val elementEncoder = arrayEncoder.elementEncoder
  println(s"Element type: ${elementEncoder.dataType}")
  println(s"Contains null: ${arrayEncoder.containsNull}")
  
  // Array schema will be ArrayType(elementType, containsNull)
  val arrayType = arrayEncoder.dataType.asInstanceOf[ArrayType]
  println(s"Array element type: ${arrayType.elementType}")
  println(s"Array contains null: ${arrayType.containsNull}")
}

Collection encoder usage:

import scala.collection.mutable

// Working with different collection types
case class Analytics(
  userIds: List[String],              // List collection
  scores: Vector[Double],             // Vector collection  
  tags: Set[String],                  // Set collection
  buffer: mutable.Buffer[Int]         // Mutable collection
)

def processIterableEncoder[C <: Iterable[E], E](iterableEncoder: IterableEncoder[C, E]): Unit = {
  val elementEncoder = iterableEncoder.elementEncoder
  println(s"Collection element type: ${elementEncoder.dataType}")
  println(s"Collection contains null: ${iterableEncoder.containsNull}")
}

Map encoder usage:

// Working with Map types
case class Configuration(
  settings: Map[String, String],           // String to String mapping
  counters: Map[String, Int],              // String to Int mapping
  metadata: Map[String, Option[String]]    // String to optional String
)

def processMapEncoder[C <: Map[K, V], K, V](mapEncoder: MapEncoder[C, K, V]): Unit = {
  val keyEncoder = mapEncoder.keyEncoder
  val valueEncoder = mapEncoder.valueEncoder
  
  println(s"Key type: ${keyEncoder.dataType}")
  println(s"Value type: ${valueEncoder.dataType}")
  println(s"Value contains null: ${mapEncoder.valueContainsNull}")
  
  // Map schema will be MapType(keyType, valueType, valueContainsNull)
  val mapType = mapEncoder.dataType.asInstanceOf[MapType]
  println(s"Map key type: ${mapType.keyType}")
  println(s"Map value type: ${mapType.valueType}")
  println(s"Map value contains null: ${mapType.valueContainsNull}")
}

Struct field encoding:

// Working with struct fields
case class Person(
  id: Long,
  name: String,
  email: Option[String],
  addresses: Array[Address],
  metadata: Map[String, String]
)

case class Address(
  street: String,
  city: String,
  zipCode: String
)

// Encoder fields represent the structure; longEncoder, stringEncoder, etc. below
// stand in for the corresponding AgnosticEncoder instances
val personFields = Array(
  EncoderField("id", longEncoder, nullable = false),
  EncoderField("name", stringEncoder, nullable = false), 
  EncoderField("email", optionStringEncoder, nullable = true),
  EncoderField("addresses", addressArrayEncoder, nullable = false),
  EncoderField("metadata", stringMapEncoder, nullable = false)
)

def processEncoderField(field: EncoderField): Unit = {
  println(s"Field name: ${field.name}")
  println(s"Field nullable: ${field.nullable}")
  println(s"Field type: ${field.enc.dataType}")
  
  // Access metadata if present
  if (field.metadata != Metadata.empty) {
    println(s"Field has metadata")
  }
}
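
Because each EncoderField knows how to convert itself to a StructField, a complete StructType for the struct encoder can be assembled from the field array above:

import org.apache.spark.sql.types.StructType

// Build the Person schema from the EncoderField definitions
val personSchema: StructType = StructType(personFields.map(_.structField))
println(personSchema.treeString)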

Row encoder usage:

import org.apache.spark.sql.{Row, Encoder}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._

// Create schema for Row encoder
val schema = StructType(Array(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = false),
  StructField("scores", ArrayType(DoubleType, containsNull = false), nullable = false),
  StructField("metadata", MapType(StringType, StringType, valueContainsNull = true), nullable = true)
))

// Create Row encoder
val rowEncoder: Encoder[Row] = RowEncoder(schema)

// Use encoder properties
println(s"Row encoder schema: ${rowEncoder.schema}")
println(s"Row encoder class tag: ${rowEncoder.clsTag}")

// Create rows that match the schema
val row1 = Row(1L, "Alice", Array(95.5, 87.2, 92.0), Map("department" -> "Engineering"))
val row2 = Row(2L, "Bob", Array(88.0, 91.5), null) // null metadata

// The encoder ensures type safety for these rows
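
The encoder can then be passed wherever an Encoder[Row] is expected, for example when building a Dataset from local rows. A sketch, assuming an active SparkSession named spark:

// Supply the Row encoder explicitly when creating a Dataset[Row]
val rowDataset = spark.createDataset(Seq(row1, row2))(rowEncoder)
rowDataset.printSchema()
rowDataset.show()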

Custom encoder patterns:

// Working with encoders in custom functions
def processEncodedData[T](data: T, encoder: Encoder[T]): Unit = {
  val schema = encoder.schema
  println(s"Processing data with schema: ${schema.treeString}")
  
  // Access schema fields
  schema.fields.foreach { field =>
    println(s"Field: ${field.name}, Type: ${field.dataType}, Nullable: ${field.nullable}")
  }
}

// Type-safe encoder operations
def validateEncoder[T](encoder: AgnosticEncoder[T]): Boolean = {
  val schema = encoder.schema

  // Sanity-check that the schema shape matches the encoder's kind
  schema.fields.length match {
    case 0 if encoder.isPrimitive => true
    case n if n > 0 && encoder.isStruct => true
    case _ => false
  }
}

Encoder composition patterns:

// Building complex encoders from simpler ones
case class NestedData(
  simple: String,
  optional: Option[Int],
  collection: List[Double],
  mapping: Map[String, Boolean],
  nested: InnerData
)

case class InnerData(value: String, count: Int)

// Encoders compose naturally:
// - NestedData encoder contains:
//   - String encoder for simple
//   - OptionEncoder[Int] for optional  
//   - IterableEncoder[List[Double], Double] for collection
//   - MapEncoder[Map[String, Boolean], String, Boolean] for mapping
//   - Struct encoder for nested InnerData

def analyzeNestedEncoder(encoder: AgnosticEncoder[NestedData]): Unit = {
  println(s"Nested data schema:")
  println(encoder.schema.treeString)
  
  // The schema will show the full nested structure
  // with appropriate nullability and types
}
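
To see the composed structure, print the schema of a derived encoder for NestedData; a sketch using the standard Encoders.product factory (InnerData and NestedData must be reachable, top-level case classes):

import org.apache.spark.sql.Encoders

// The derived schema mirrors the composition described above:
// simple/optional/collection/mapping/nested with their Spark SQL types and nullability
println(Encoders.product[NestedData].schema.treeString)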