Type-safe conversion between JVM objects and Spark SQL's internal representation for distributed serialization. Encoders serialize and deserialize data efficiently while preserving the type information used by Spark's Catalyst optimizer.
Core encoder interface for type conversion.
/**
* Converts JVM objects to/from Spark SQL representation
* @tparam T The JVM type being encoded
*/
trait Encoder[T] extends Serializable {
/** Returns the schema for the encoded representation */
def schema: StructType
/** ClassTag for type T */
def clsTag: ClassTag[T]
}
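Ready-made implementations of this trait are normally obtained from the public Encoders factory object rather than written by hand. A minimal sketch of inspecting the two members (the values in the output comments are illustrative):
import org.apache.spark.sql.{Encoder, Encoders}

// Built-in encoders come from the Encoders factory object
val stringEnc: Encoder[String] = Encoders.STRING
val pairEnc: Encoder[(Long, String)] = Encoders.tuple(Encoders.scalaLong, Encoders.STRING)

// Every encoder exposes the schema of its encoded form and a ClassTag for T
println(stringEnc.schema) // single StringType column
println(stringEnc.clsTag) // java.lang.String
println(pairEnc.schema)   // two-column struct for the tuple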
Implementation-agnostic encoder with additional type information.
/**
* Implementation-agnostic encoder
* @tparam T The type being encoded
*/
trait AgnosticEncoder[T] extends Encoder[T] {
/** Whether this encoder represents a primitive type */
def isPrimitive: Boolean
/** Whether the encoded type can be null */
def nullable: Boolean
/** The Spark SQL data type for this encoder */
def dataType: DataType
/** Whether serialization allows lenient type conversion */
def lenientSerialization: Boolean
/** Whether this encoder represents a struct type */
def isStruct: Boolean
}
Specific encoder implementations for different data types.
/**
* Encoder for Option types
* @tparam E Element type encoder
*/
case class OptionEncoder[E](elementEncoder: AgnosticEncoder[E]) extends AgnosticEncoder[Option[E]]
/**
* Encoder for Array types
* @tparam E Element type encoder
*/
case class ArrayEncoder[E](
elementEncoder: AgnosticEncoder[E],
containsNull: Boolean = true
) extends AgnosticEncoder[Array[E]]
/**
* Encoder for Iterable collections
* @tparam C Collection type
* @tparam E Element type
*/
case class IterableEncoder[C <: Iterable[E], E](
elementEncoder: AgnosticEncoder[E],
containsNull: Boolean = true
) extends AgnosticEncoder[C]
/**
* Encoder for Map types
* @tparam C Map collection type
* @tparam K Key type
* @tparam V Value type
*/
case class MapEncoder[C <: Map[K, V], K, V](
keyEncoder: AgnosticEncoder[K],
valueEncoder: AgnosticEncoder[V],
valueContainsNull: Boolean = true
) extends AgnosticEncoder[C]
/**
* Represents a field in a struct encoder
*/
case class EncoderField(
name: String,
enc: AgnosticEncoder[_],
nullable: Boolean,
metadata: Metadata = Metadata.empty,
readMethod: Option[String] = None,
writeMethod: Option[String] = None
) {
/** Convert to StructField for schema representation */
def structField: StructField = StructField(name, enc.dataType, nullable, metadata)
}
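The structField helper carries a field's name, data type, nullability, and metadata into a regular schema entry. A sketch, assuming stringEncoder is an AgnosticEncoder[String] already in scope (the accessor names are purely illustrative):
import org.apache.spark.sql.types.{Metadata, MetadataBuilder, StructField}

// stringEncoder is an assumed AgnosticEncoder[String]; readMethod/writeMethod
// name bean-style accessors and only matter for bean-backed encoders
val commentMeta: Metadata = new MetadataBuilder().putString("comment", "display name").build()

val nameField = EncoderField(
  name = "name",
  enc = stringEncoder,
  nullable = false,
  metadata = commentMeta,
  readMethod = Some("getName"),
  writeMethod = Some("setName")
)

// name, StringType, non-nullable, with the comment metadata attached
println(nameField.structField)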
Specialized encoder for Row objects.
/**
* Encoder for Row objects
*/
object RowEncoder {
/** Create encoder for the given schema */
def apply(schema: StructType): Encoder[Row]
}
Working with primitive encoders:
import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.types._
// Primitive type encoders are typically provided by the system
// but you can work with their properties
def analyzeEncoder[T](encoder: AgnosticEncoder[T]): Unit = {
println(s"Is primitive: ${encoder.isPrimitive}")
println(s"Is nullable: ${encoder.nullable}")
println(s"Data type: ${encoder.dataType}")
println(s"Schema: ${encoder.schema}")
println(s"Is struct: ${encoder.isStruct}")
println(s"Lenient serialization: ${encoder.lenientSerialization}")
}
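A hypothetical invocation against two of Spark's built-in agnostic encoders; the AgnosticEncoders object lives in the internal catalyst package, and its exact contents can vary between Spark versions:
// Assumes the wildcard catalyst.encoders import above brings AgnosticEncoders into scope
analyzeEncoder(AgnosticEncoders.StringEncoder)       // non-primitive, nullable string
analyzeEncoder(AgnosticEncoders.PrimitiveIntEncoder) // primitive, non-nullable int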
Option encoder usage:
// Working with Option types in encoders
case class UserProfile(
id: Long,
name: String,
email: Option[String], // Optional field
age: Option[Int] // Optional field
)
// The encoder system handles Option types automatically
// Option[String] becomes nullable StringType
// Option[Int] becomes nullable IntegerType
def processOptionalFields[T](optionEncoder: OptionEncoder[T]): Unit = {
val elementEncoder = optionEncoder.elementEncoder
println(s"Element type: ${elementEncoder.dataType}")
println(s"Option is nullable: ${optionEncoder.nullable}") // true
}
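A sketch of building an Option encoder directly; StringEncoder here is assumed to be Spark's built-in internal string encoder, whose location can differ between versions:
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder

// Wrap the element encoder; Option adds nullability on top of the element type
val optionalString = OptionEncoder(StringEncoder)
processOptionalFields(optionalString)
// Expected (roughly): Element type: StringType, Option is nullable: true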
Array encoder usage:
// Working with Array types
case class Order(
id: String,
items: Array[String], // Array of strings
quantities: Array[Int], // Array of integers
tags: Array[Option[String]] // Array of optional strings
)
def processArrayEncoder[E](arrayEncoder: ArrayEncoder[E]): Unit = {
val elementEncoder = arrayEncoder.elementEncoder
println(s"Element type: ${elementEncoder.dataType}")
println(s"Contains null: ${arrayEncoder.containsNull}")
// Array schema will be ArrayType(elementType, containsNull)
val arrayType = arrayEncoder.dataType.asInstanceOf[ArrayType]
println(s"Array element type: ${arrayType.elementType}")
println(s"Array contains null: ${arrayType.containsNull}")
}
Collection encoder usage:
import scala.collection.mutable
// Working with different collection types
case class Analytics(
userIds: List[String], // List collection
scores: Vector[Double], // Vector collection
tags: Set[String], // Set collection
buffer: mutable.Buffer[Int] // Mutable collection
)
def processIterableEncoder[C <: Iterable[E], E](iterableEncoder: IterableEncoder[C, E]): Unit = {
val elementEncoder = iterableEncoder.elementEncoder
println(s"Collection element type: ${elementEncoder.dataType}")
println(s"Collection contains null: ${iterableEncoder.containsNull}")
}
Map encoder usage:
// Working with Map types
case class Configuration(
settings: Map[String, String], // String to String mapping
counters: Map[String, Int], // String to Int mapping
metadata: Map[String, Option[String]] // String to optional String
)
def processMapEncoder[C <: Map[K, V], K, V](mapEncoder: MapEncoder[C, K, V]): Unit = {
val keyEncoder = mapEncoder.keyEncoder
val valueEncoder = mapEncoder.valueEncoder
println(s"Key type: ${keyEncoder.dataType}")
println(s"Value type: ${valueEncoder.dataType}")
println(s"Value contains null: ${mapEncoder.valueContainsNull}")
// Map schema will be MapType(keyType, valueType, valueContainsNull)
val mapType = mapEncoder.dataType.asInstanceOf[MapType]
println(s"Map key type: ${mapType.keyType}")
println(s"Map value type: ${mapType.valueType}")
println(s"Map value contains null: ${mapType.valueContainsNull}")
}
Struct field encoding:
// Working with struct fields
case class Person(
id: Long,
name: String,
email: Option[String],
addresses: Array[Address],
metadata: Map[String, String]
)
case class Address(
street: String,
city: String,
zipCode: String
)
// Encoder fields represent the structure; the element encoders referenced below
// (longEncoder, stringEncoder, optionStringEncoder, addressArrayEncoder,
// stringMapEncoder) are assumed to be AgnosticEncoder instances already in scope
val personFields = Array(
EncoderField("id", longEncoder, nullable = false),
EncoderField("name", stringEncoder, nullable = false),
EncoderField("email", optionStringEncoder, nullable = true),
EncoderField("addresses", addressArrayEncoder, nullable = false),
EncoderField("metadata", stringMapEncoder, nullable = false)
)
def processEncoderField(field: EncoderField): Unit = {
println(s"Field name: ${field.name}")
println(s"Field nullable: ${field.nullable}")
println(s"Field type: ${field.enc.dataType}")
// Access metadata if present
if (field.metadata != Metadata.empty) {
println(s"Field has metadata")
}
}
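Because every EncoderField maps onto a StructField, the full Person schema can be assembled from the fields above and each field inspected in turn:
import org.apache.spark.sql.types.StructType

// Assemble a schema from the encoder fields and walk through them
val personSchema = StructType(personFields.map(_.structField))
println(personSchema.treeString)

personFields.foreach(processEncoderField)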
Row encoder usage:
import org.apache.spark.sql.{Row, Encoder}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
// Create schema for Row encoder
val schema = StructType(Array(
StructField("id", LongType, nullable = false),
StructField("name", StringType, nullable = false),
StructField("scores", ArrayType(DoubleType, containsNull = false), nullable = false),
StructField("metadata", MapType(StringType, StringType, valueContainsNull = true), nullable = true)
))
// Create Row encoder
val rowEncoder: Encoder[Row] = RowEncoder(schema)
// Use encoder properties
println(s"Row encoder schema: ${rowEncoder.schema}")
println(s"Row encoder class tag: ${rowEncoder.clsTag}")
// Create rows that match the schema
val row1 = Row(1L, "Alice", Array(95.5, 87.2, 92.0), Map("department" -> "Engineering"))
val row2 = Row(2L, "Bob", Array(88.0, 91.5), null) // null metadata
// The encoder ensures type safety for these rows
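A sketch of putting the encoder to work against a local SparkSession (the session setup is illustrative):
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("row-encoder-demo").master("local[*]").getOrCreate()

// Build a DataFrame from the rows above using the same schema
val df = spark.createDataFrame(spark.sparkContext.parallelize(Seq(row1, row2)), schema)

// Passing the Row encoder explicitly keeps a typed Dataset[Row] through a map
val upperCased = df.map { r =>
  Row(r.getLong(0), r.getString(1).toUpperCase, r.get(2), r.get(3))
}(rowEncoder)
upperCased.show()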
Custom encoder patterns:
// Working with encoders in custom functions
def processEncodedData[T](data: T, encoder: Encoder[T]): Unit = {
val schema = encoder.schema
println(s"Processing data with schema: ${schema.treeString}")
// Access schema fields
schema.fields.foreach { field =>
println(s"Field: ${field.name}, Type: ${field.dataType}, Nullable: ${field.nullable}")
}
}
// Type-safe encoder operations
def validateEncoder[T](encoder: AgnosticEncoder[T]): Boolean = {
  if (encoder.isStruct) {
    // Struct encoders expose their fields directly as the schema
    encoder.dataType.isInstanceOf[StructType] && encoder.schema.fields.nonEmpty
  } else {
    // Non-struct encoders (primitives, arrays, maps, ...) are wrapped
    // in a single-column schema
    encoder.schema.fields.length == 1
  }
}
Encoder composition patterns:
// Building complex encoders from simpler ones
case class NestedData(
simple: String,
optional: Option[Int],
collection: List[Double],
mapping: Map[String, Boolean],
nested: InnerData
)
case class InnerData(value: String, count: Int)
// Encoders compose naturally:
// - NestedData encoder contains:
// - String encoder for simple
// - OptionEncoder[Int] for optional
// - IterableEncoder[List[Double], Double] for collection
// - MapEncoder[Map[String, Boolean], String, Boolean] for mapping
// - Struct encoder for nested InnerData
def analyzeNestedEncoder(encoder: AgnosticEncoder[NestedData]): Unit = {
println(s"Nested data schema:")
println(encoder.schema.treeString)
// The schema will show the full nested structure
// with appropriate nullability and types
}
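To see this composition in practice, the public Encoders.product factory derives the whole nested encoder from the case class definitions (nullability flags in the output may vary slightly between Spark versions):
import org.apache.spark.sql.Encoders

// Deriving an encoder for the outer case class composes the inner encoders automatically
val nestedEnc = Encoders.product[NestedData]

// The printed tree shows the Option as a nullable integer column, the List as an
// array of doubles, the Map as a map type, and InnerData as a nested struct
println(nestedEnc.schema.treeString)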