Type System and Encoders

Data type definitions and encoders for converting between JVM objects and Spark SQL's internal representation. Provides a rich type system with support for primitive types, complex nested structures, and custom object serialization.

Capabilities

Data Types

Core data type system for representing structured data.

/**
 * Base class for all data types in Spark SQL
 */
abstract class DataType {
  /** JSON representation of the type */
  def json: String
  
  /** Pretty-printed JSON representation of the type */
  def prettyJson: String
  
  /** SQL representation of the type */
  def sql: String
  
  /** Simplified type name */
  def simpleString: String
  
  /** Catalog representation */
  def catalogString: String
}

/**
 * Factory methods and constants for data types
 */
object DataTypes {
  // Primitive types
  val NullType: DataType
  val BooleanType: DataType
  val ByteType: DataType
  val ShortType: DataType
  val IntegerType: DataType
  val LongType: DataType
  val FloatType: DataType
  val DoubleType: DataType
  val StringType: DataType
  val BinaryType: DataType
  val DateType: DataType
  val TimestampType: DataType
  val CalendarIntervalType: DataType
  
  /** Create decimal type with precision and scale */
  def createDecimalType(): DecimalType
  def createDecimalType(precision: Int, scale: Int): DecimalType
  
  /** Create array type */
  def createArrayType(elementType: DataType): ArrayType
  def createArrayType(elementType: DataType, containsNull: Boolean): ArrayType
  
  /** Create map type */
  def createMapType(keyType: DataType, valueType: DataType): MapType
  def createMapType(keyType: DataType, valueType: DataType, valueContainsNull: Boolean): MapType
  
  /** Create struct type */
  def createStructType(fields: Array[StructField]): StructType
  def createStructType(fields: java.util.List[StructField]): StructType
  
  /** Create struct field */
  def createStructField(name: String, dataType: DataType, nullable: Boolean): StructField
  def createStructField(name: String, dataType: DataType, nullable: Boolean, 
                       metadata: Metadata): StructField
}
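
The same schema pieces can be built through the DataTypes factory, which mirrors the StructType/StructField constructors used later in this document; a minimal sketch (field names here are illustrative only):

import org.apache.spark.sql.types.DataTypes

// Factory-style construction of a schema
val factoryFields = Array(
  DataTypes.createStructField("id", DataTypes.LongType, false),
  DataTypes.createStructField("tags", DataTypes.createArrayType(DataTypes.StringType), true),
  DataTypes.createStructField("price", DataTypes.createDecimalType(10, 2), true)
)
val factorySchema = DataTypes.createStructType(factoryFields)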

Primitive Data Types

Basic data types for simple values.

// Numeric types
case object ByteType extends IntegralType      // 8-bit signed integer
case object ShortType extends IntegralType     // 16-bit signed integer  
case object IntegerType extends IntegralType   // 32-bit signed integer
case object LongType extends IntegralType      // 64-bit signed integer
case object FloatType extends FractionalType   // 32-bit floating point
case object DoubleType extends FractionalType  // 64-bit floating point

// Decimal type with precision and scale
case class DecimalType(precision: Int, scale: Int) extends FractionalType

// Other primitive types
case object BooleanType extends AtomicType     // Boolean true/false
case object StringType extends AtomicType      // UTF-8 encoded strings
case object BinaryType extends AtomicType      // Byte arrays
case object DateType extends AtomicType        // Date values (no time)
case object TimestampType extends AtomicType   // Timestamp with timezone
case object NullType extends DataType          // Null values only
case object CalendarIntervalType extends DataType // Calendar intervals
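
The textual forms defined on DataType are handy when logging or comparing schemas; a small sketch (the outputs shown as comments are the typical values):

import org.apache.spark.sql.types._

val dec = DecimalType(10, 2)
println(dec.simpleString)        // decimal(10,2)
println(dec.sql)                 // DECIMAL(10,2)
println(LongType.catalogString)  // bigint
println(IntegerType.json)        // "integer"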

Complex Data Types

Nested and collection data types.

/**
 * Array type for collections of elements
 */
case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType {
  def this(elementType: DataType) = this(elementType, containsNull = true)
}

/**
 * Map type for key-value pairs
 */
case class MapType(keyType: DataType, valueType: DataType, valueContainsNull: Boolean) extends DataType {
  def this(keyType: DataType, valueType: DataType) = this(keyType, valueType, valueContainsNull = true)
}

/**
 * Struct type for records with named fields
 */
case class StructType(fields: Array[StructField]) extends DataType {
  /** Get field by name */
  def apply(name: String): StructField
  
  /** Get field by index */
  def apply(index: Int): StructField
  
  /** Get field index by name */  
  def fieldIndex(name: String): Int
  
  /** Get all field names */
  def fieldNames: Array[String]
  
  /** Add field */
  def add(field: StructField): StructType
  def add(name: String, dataType: DataType): StructType
  def add(name: String, dataType: DataType, nullable: Boolean): StructType
  def add(name: String, dataType: DataType, nullable: Boolean, metadata: Metadata): StructType
  
  /** Check if field exists */
  def exists(f: StructField => Boolean): Boolean
}

/**
 * Field in a struct type
 */
case class StructField(name: String, dataType: DataType, nullable: Boolean, metadata: Metadata) {
  def this(name: String, dataType: DataType, nullable: Boolean) = 
    this(name, dataType, nullable, Metadata.empty)
  def this(name: String, dataType: DataType) = 
    this(name, dataType, nullable = true, Metadata.empty)
}

Usage Examples:

import org.apache.spark.sql.types._

// Simple struct
val personSchema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = true),
  StructField("salary", DoubleType, nullable = true)
))

// Complex nested structure
val orderSchema = StructType(Seq(
  StructField("orderId", StringType, nullable = false),
  StructField("customerId", LongType, nullable = false),
  StructField("items", ArrayType(StructType(Seq(
    StructField("productId", StringType, nullable = false),
    StructField("quantity", IntegerType, nullable = false),
    StructField("price", DecimalType(10, 2), nullable = false)
  ))), nullable = false),
  StructField("metadata", MapType(StringType, StringType), nullable = true)
))

// Using the schema
val df = spark.read
  .schema(orderSchema)
  .json("orders.json")

Row Interface

Interface for accessing row data with type safety.

/**
 * Represents one row of output from a relational operator
 */
trait Row extends Serializable {
  /** Number of elements in the row */
  def size: Int
  def length: Int
  
  /** Get value by index (returns Any) */
  def apply(i: Int): Any
  def get(i: Int): Any
  
  /** Check if value at index is null */
  def isNullAt(i: Int): Boolean
  
  /** Typed accessors by index */
  def getBoolean(i: Int): Boolean
  def getByte(i: Int): Byte
  def getShort(i: Int): Short
  def getInt(i: Int): Int
  def getLong(i: Int): Long
  def getFloat(i: Int): Float
  def getDouble(i: Int): Double
  def getString(i: Int): String
  def getDecimal(i: Int): java.math.BigDecimal
  def getDate(i: Int): java.sql.Date
  def getTimestamp(i: Int): java.sql.Timestamp
  
  /** Complex type accessors */
  def getSeq[T](i: Int): Seq[T]
  def getList[T](i: Int): java.util.List[T]
  def getMap[K, V](i: Int): scala.collection.Map[K, V]
  def getJavaMap[K, V](i: Int): java.util.Map[K, V]
  def getStruct(i: Int): Row
  
  /** Get value by field name (if available) */
  def getAs[T](fieldName: String): T
  def getAs[T](i: Int): T
  
  /** Convert to Seq */
  def toSeq: Seq[Any]
  
  /** Copy row */
  def copy(): Row
  
  /** Schema information (if available) */
  def schema: StructType
  
  /** Pretty-printed JSON representation of the row */
  def prettyJson: String
}

/**
 * Factory methods for creating Row instances
 */
object Row {
  /** Create row from values */
  def apply(values: Any*): Row
  
  /** Create row from sequence */
  def fromSeq(values: Seq[Any]): Row
  
  /** Create row from tuple */
  def fromTuple(tuple: Product): Row
  
  /** Create empty row */
  def empty: Row
  
  /** Merge rows */
  def merge(rows: Row*): Row
}

Usage Examples:

import org.apache.spark.sql.Row

// Create rows
val row1 = Row("Alice", 25, 50000.0)
val row2 = Row.fromSeq(Seq("Bob", 30, 60000.0))

// Access values
val name = row1.getString(0)
val age = row1.getInt(1)
val salary = row1.getDouble(2)

// Type-safe access by position
val nameTyped = row1.getAs[String](0)
val ageTyped = row1.getAs[Int](1)

// Access by field name requires a row that carries a schema,
// e.g. rows returned from a DataFrame (see the sketch below)

// Null checking
if (!row1.isNullAt(1)) {
  val age = row1.getInt(1)
}

// Complex types
val complexRow = Row(
  "user1",
  Seq("tag1", "tag2", "tag3"),
  Map("key1" -> "value1", "key2" -> "value2")
)

val tags = complexRow.getSeq[String](1)
val metadata = complexRow.getMap[String, String](2)
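
Rows returned by DataFrame operations carry the DataFrame's schema, which is what makes the name-based accessors work; a minimal sketch (peopleDf is assumed to be a DataFrame read with the personSchema defined earlier):

// peopleDf: assumed DataFrame with schema personSchema
val firstRow = peopleDf.head()

println(firstRow.schema.fieldNames.mkString(", "))  // name, age, salary
val personName = firstRow.getAs[String]("name")
val personAge  = firstRow.getAs[Int]("age")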

Encoders

Conversion between JVM objects and Spark SQL internal representations.

/**
 * Used to convert a JVM object of type T to and from the internal
 * Spark SQL representation
 */
trait Encoder[T] extends Serializable {
  /** Schema of the encoded data */
  def schema: StructType
  
  /** Class tag for type T */
  def clsTag: ClassTag[T]
}

/**
 * Factory methods for creating encoders
 */
object Encoders {
  // Encoders for Java boxed primitives and common Java/SQL types
  def BOOLEAN: Encoder[java.lang.Boolean]
  def BYTE: Encoder[java.lang.Byte]
  def SHORT: Encoder[java.lang.Short]
  def INT: Encoder[java.lang.Integer]
  def LONG: Encoder[java.lang.Long]
  def FLOAT: Encoder[java.lang.Float]
  def DOUBLE: Encoder[java.lang.Double]
  def STRING: Encoder[String]
  def DECIMAL: Encoder[java.math.BigDecimal]
  def DATE: Encoder[java.sql.Date]
  def TIMESTAMP: Encoder[java.sql.Timestamp]
  def BINARY: Encoder[Array[Byte]]
  
  // Scala primitive encoders  
  def scalaBoolean: Encoder[Boolean]
  def scalaByte: Encoder[Byte]
  def scalaShort: Encoder[Short]
  def scalaInt: Encoder[Int]
  def scalaLong: Encoder[Long]
  def scalaFloat: Encoder[Float]
  def scalaDouble: Encoder[Double]
  
  // Complex type encoders
  def bean[T](beanClass: Class[T]): Encoder[T]
  def javaSerialization[T](clazz: Class[T]): Encoder[T]
  def kryo[T](clazz: Class[T]): Encoder[T]
  
  // Product encoders (case classes, tuples)
  def product[T <: Product : TypeTag]: Encoder[T]
  def tuple[T1, T2](e1: Encoder[T1], e2: Encoder[T2]): Encoder[(T1, T2)]
  def tuple[T1, T2, T3](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3]): Encoder[(T1, T2, T3)]
  // ... up to Tuple22
}

Usage Examples:

import org.apache.spark.sql.Encoders
import scala.beans.BeanProperty

// Case class encoder (automatic)
case class Person(name: String, age: Int, salary: Double)
implicit val personEncoder = Encoders.product[Person]

val peopleDs = spark.createDataset(Seq(
  Person("Alice", 25, 50000.0),
  Person("Bob", 30, 60000.0)
))

// Java bean encoder
class Employee {
  @BeanProperty var name: String = _
  @BeanProperty var age: Int = _
  @BeanProperty var department: String = _
}

val employeeEncoder = Encoders.bean(classOf[Employee])
val employeeDs = spark.emptyDataset(employeeEncoder)

// Tuple encoders (use the Scala-value encoders for Scala types such as Int)
val tupleDs = spark.createDataset(Seq(
  ("Alice", 25),
  ("Bob", 30)
))(Encoders.tuple(Encoders.STRING, Encoders.scalaInt))

// Primitive encoders
val stringDs = spark.createDataset(Seq("a", "b", "c"))(Encoders.STRING)
val intDs = spark.createDataset(Seq(1, 2, 3))(Encoders.scalaInt)
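
For classes that are neither beans nor case classes, the generic serialization encoders listed above can be used; each stores the whole object in a single binary column. LegacyEvent below is a hypothetical example class:

// Hypothetical class: not a bean, not a Product
class LegacyEvent(val id: Long, val payload: Array[Byte]) extends Serializable

val kryoEncoder = Encoders.kryo(classOf[LegacyEvent])              // compact, usually faster
val javaEncoder = Encoders.javaSerialization(classOf[LegacyEvent]) // most compatible fallback

val eventsDs = spark.createDataset(Seq(
  new LegacyEvent(1L, Array[Byte](1, 2, 3))
))(kryoEncoder)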

Type Conversions and Utilities

Utilities for working with types and conversions.

/**
 * Utilities for data type operations
 */
object DataTypeUtils {
  /** Check if two types are compatible */
  def sameType(from: DataType, to: DataType): Boolean
  
  /** Get common type for multiple types */
  def findCommonType(types: Seq[DataType]): Option[DataType]
  
  /** Convert string to DataType */
  def fromString(raw: String): DataType
  
  /** Parse DDL string to StructType */
  def fromDDL(ddl: String): StructType
}
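
Depending on the Spark version, equivalent parsing entry points are also exposed directly on the type companions; a minimal sketch using StructType.fromDDL and DataType.fromJson:

import org.apache.spark.sql.types.{DataType, StructType}

// Parse a DDL-formatted schema string
val ddlSchema = StructType.fromDDL("id BIGINT, name STRING, score DOUBLE")

// Round-trip a type through its JSON form
val restored = DataType.fromJson(ddlSchema.json)
assert(restored == ddlSchema)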

/**
 * Metadata for fields and types
 */
class Metadata private (private val map: Map[String, Any]) extends Serializable {
  /** Check if key exists */
  def contains(key: String): Boolean
  
  /** Get value by key */
  def getLong(key: String): Long
  def getDouble(key: String): Double
  def getBoolean(key: String): Boolean  
  def getString(key: String): String
  def getMetadata(key: String): Metadata
  def getLongArray(key: String): Array[Long]
  def getDoubleArray(key: String): Array[Double]
  def getBooleanArray(key: String): Array[Boolean]
  def getStringArray(key: String): Array[String]
  def getMetadataArray(key: String): Array[Metadata]
  
  /** JSON representation */
  def json: String
}

object Metadata {
  /** Empty metadata */
  def empty: Metadata
  
  /** Create from JSON */
  def fromJson(json: String): Metadata
}

/**
 * Builder for Metadata instances
 */
class MetadataBuilder {
  def putLong(key: String, value: Long): MetadataBuilder
  def putDouble(key: String, value: Double): MetadataBuilder
  def putBoolean(key: String, value: Boolean): MetadataBuilder
  def putString(key: String, value: String): MetadataBuilder
  def putMetadata(key: String, value: Metadata): MetadataBuilder
  def putLongArray(key: String, value: Array[Long]): MetadataBuilder
  def putDoubleArray(key: String, value: Array[Double]): MetadataBuilder
  def putBooleanArray(key: String, value: Array[Boolean]): MetadataBuilder
  def putStringArray(key: String, value: Array[String]): MetadataBuilder
  def putMetadataArray(key: String, value: Array[Metadata]): MetadataBuilder
  def build(): Metadata
}
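
Metadata is usually attached to individual fields, for example to carry column-level documentation; a short sketch:

import org.apache.spark.sql.types._

val ageMetadata = new MetadataBuilder()
  .putString("comment", "age in whole years")
  .putLong("maxValue", 150L)
  .build()

val ageField = StructField("age", IntegerType, nullable = true, ageMetadata)
println(ageField.metadata.getString("comment"))  // age in whole years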

Advanced Type Patterns

Schema evolution:

// Two versions of the same schema from different sources
val schema1 = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType)
))

val schema2 = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType),
  StructField("email", StringType)
))

// Evolve the older schema by adding the new column as nullable
// (the result has the same fields as schema2, so old data remains readable)
val evolvedSchema = schema1.add("email", StringType, nullable = true)
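
When two schema versions genuinely need to be combined, a small helper can union the fields by name; mergeSchemas below is a hypothetical sketch, not a Spark API:

// Hypothetical helper: keep all fields from a, append fields only present in b
def mergeSchemas(a: StructType, b: StructType): StructType = {
  val existing = a.fieldNames.toSet
  val extra = b.fields.filterNot(f => existing.contains(f.name))
  // Fields missing from one source are made nullable in the merged schema
  StructType(a.fields ++ extra.map(_.copy(nullable = true)))
}

val merged = mergeSchemas(schema1, schema2)  // id, name, email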

Custom encoders:

// Custom encoder for complex types
case class Address(street: String, city: String, zipCode: String)
case class PersonWithAddress(name: String, age: Int, address: Address)

// Automatic derivation works for nested case classes
implicit val addressEncoder = Encoders.product[Address]
implicit val personEncoder = Encoders.product[PersonWithAddress]

val complexDs = spark.createDataset(Seq(
  PersonWithAddress("Alice", 25, Address("123 Main St", "Seattle", "98101"))
))

Type validation:

import org.apache.spark.sql.DataFrame

def validateSchema(df: DataFrame, expectedSchema: StructType): Boolean = {
  val actualSchema = df.schema
  actualSchema.fields.length == expectedSchema.fields.length &&
  actualSchema.fields.zip(expectedSchema.fields).forall { case (actual, expected) =>
    actual.name == expected.name && 
    actual.dataType == expected.dataType &&
    (!expected.nullable || actual.nullable)
  }
}
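
A quick usage sketch (ordersDf stands for the DataFrame read with orderSchema earlier, named df in that example):

// ordersDf: assumed DataFrame read with orderSchema
if (!validateSchema(ordersDf, orderSchema)) {
  throw new IllegalArgumentException(
    s"Unexpected schema:\n${ordersDf.schema.treeString}")
}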