Data type definitions and encoders for converting between JVM objects and Spark SQL internal representations. Provides a rich type system with support for primitive types, complex nested structures, and custom object serialization.
Core data type system for representing structured data.
/**
* Base class for all data types in Spark SQL
*/
abstract class DataType {
/** JSON representation of the type */
def json: String
/** Pretty-printed (indented) JSON representation of the type */
def prettyJson: String
/** SQL representation of the type */
def sql: String
/** Simplified type name */
def simpleString: String
/** Catalog representation */
def catalogString: String
}
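For illustration, the different string renderings of a single type instance (the values in the comments are indicative):
import org.apache.spark.sql.types._
val dt: DataType = ArrayType(DecimalType(10, 2), containsNull = false)
dt.simpleString   // "array<decimal(10,2)>"
dt.sql            // "ARRAY<DECIMAL(10,2)>"
dt.catalogString  // compact form used in catalog/DESCRIBE output
dt.json           // JSON form; DataType.fromJson reverses it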
/**
* Factory methods and constants for data types
*/
object DataTypes {
// Primitive types
val NullType: DataType
val BooleanType: DataType
val ByteType: DataType
val ShortType: DataType
val IntegerType: DataType
val LongType: DataType
val FloatType: DataType
val DoubleType: DataType
val StringType: DataType
val BinaryType: DataType
val DateType: DataType
val TimestampType: DataType
val CalendarIntervalType: DataType
/** Create decimal type with precision and scale */
def createDecimalType(): DecimalType
def createDecimalType(precision: Int, scale: Int): DecimalType
/** Create array type */
def createArrayType(elementType: DataType): ArrayType
def createArrayType(elementType: DataType, containsNull: Boolean): ArrayType
/** Create map type */
def createMapType(keyType: DataType, valueType: DataType): MapType
def createMapType(keyType: DataType, valueType: DataType, valueContainsNull: Boolean): MapType
/** Create struct type */
def createStructType(fields: Array[StructField]): StructType
def createStructType(fields: java.util.List[StructField]): StructType
/** Create struct field */
def createStructField(name: String, dataType: DataType, nullable: Boolean): StructField
def createStructField(name: String, dataType: DataType, nullable: Boolean,
metadata: Metadata): StructField
}
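The factory above mirrors the Scala-side constructors; a brief sketch of building a small schema through it (the field names are arbitrary):
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
val fields = Array(
DataTypes.createStructField("id", DataTypes.LongType, false),
DataTypes.createStructField("name", DataTypes.StringType, true),
DataTypes.createStructField("tags", DataTypes.createArrayType(DataTypes.StringType), true)
)
val schema: StructType = DataTypes.createStructType(fields)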
Basic data types for simple values.
// Numeric types
case object ByteType extends IntegralType // 8-bit signed integer
case object ShortType extends IntegralType // 16-bit signed integer
case object IntegerType extends IntegralType // 32-bit signed integer
case object LongType extends IntegralType // 64-bit signed integer
case object FloatType extends FractionalType // 32-bit floating point
case object DoubleType extends FractionalType // 64-bit floating point
// Decimal type with precision and scale
case class DecimalType(precision: Int, scale: Int) extends FractionalType
// Other primitive types
case object BooleanType extends AtomicType // Boolean true/false
case object StringType extends AtomicType // UTF-8 encoded strings
case object BinaryType extends AtomicType // Byte arrays
case object DateType extends AtomicType // Date values (no time component)
case object TimestampType extends AtomicType // Timestamp values (microsecond precision, session time zone)
case object NullType extends DataType // Null values only
case object CalendarIntervalType extends DataType // Calendar intervals
Nested and collection data types.
/**
* Array type for collections of elements
*/
case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType {
def this(elementType: DataType) = this(elementType, containsNull = true)
}
/**
* Map type for key-value pairs
*/
case class MapType(keyType: DataType, valueType: DataType, valueContainsNull: Boolean) extends DataType {
def this(keyType: DataType, valueType: DataType) = this(keyType, valueType, valueContainsNull = true)
}
/**
* Struct type for records with named fields
*/
case class StructType(fields: Array[StructField]) extends DataType {
/** Get field by name */
def apply(name: String): StructField
/** Get field by index */
def apply(index: Int): StructField
/** Get field index by name */
def fieldIndex(name: String): Int
/** Get all field names */
def fieldNames: Array[String]
/** Add field */
def add(field: StructField): StructType
def add(name: String, dataType: DataType): StructType
def add(name: String, dataType: DataType, nullable: Boolean): StructType
def add(name: String, dataType: DataType, nullable: Boolean, metadata: Metadata): StructType
/** Check whether any field satisfies a predicate */
def exists(f: StructField => Boolean): Boolean
}
/**
* Field in a struct type
*/
case class StructField(name: String, dataType: DataType, nullable: Boolean, metadata: Metadata) {
def this(name: String, dataType: DataType, nullable: Boolean) =
this(name, dataType, nullable, Metadata.empty)
def this(name: String, dataType: DataType) =
this(name, dataType, nullable = true, Metadata.empty)
}
Usage Examples:
import org.apache.spark.sql.types._
// Simple struct
val personSchema = StructType(Seq(
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = true),
StructField("salary", DoubleType, nullable = true)
))
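Field lookup and incremental construction on the struct just defined:
personSchema.fieldNames                  // Array("name", "age", "salary")
personSchema("age").dataType             // IntegerType
personSchema.fieldIndex("salary")        // 2
val withDept = personSchema.add("department", StringType, nullable = true)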
// Complex nested structure
val orderSchema = StructType(Seq(
StructField("orderId", StringType, nullable = false),
StructField("customerId", LongType, nullable = false),
StructField("items", ArrayType(StructType(Seq(
StructField("productId", StringType, nullable = false),
StructField("quantity", IntegerType, nullable = false),
StructField("price", DecimalType(10, 2), nullable = false)
))), nullable = false),
StructField("metadata", MapType(StringType, StringType), nullable = true)
))
// Using the schema
val df = spark.read
.schema(orderSchema)
.json("orders.json")Interface for accessing row data with type safety.
/**
* Represents one row of output from a relational operator
*/
trait Row extends Serializable {
/** Number of elements in the row */
def size: Int
def length: Int
/** Get value by index (returns Any) */
def apply(i: Int): Any
def get(i: Int): Any
/** Check if value at index is null */
def isNullAt(i: Int): Boolean
/** Typed accessors by index */
def getBoolean(i: Int): Boolean
def getByte(i: Int): Byte
def getShort(i: Int): Short
def getInt(i: Int): Int
def getLong(i: Int): Long
def getFloat(i: Int): Float
def getDouble(i: Int): Double
def getString(i: Int): String
def getDecimal(i: Int): java.math.BigDecimal
def getDate(i: Int): java.sql.Date
def getTimestamp(i: Int): java.sql.Timestamp
/** Complex type accessors */
def getSeq[T](i: Int): Seq[T]
def getList[T](i: Int): java.util.List[T]
def getMap[K, V](i: Int): scala.collection.Map[K, V]
def getJavaMap[K, V](i: Int): java.util.Map[K, V]
def getStruct(i: Int): Row
/** Get value by field name (if available) */
def getAs[T](fieldName: String): T
def getAs[T](i: Int): T
/** Convert to Seq */
def toSeq: Seq[Any]
/** Copy row */
def copy(): Row
/** Schema information (if available) */
def schema: StructType
/** Pretty print */
def prettyJson: String
}
/**
* Factory methods for creating Row instances
*/
object Row {
/** Create row from values */
def apply(values: Any*): Row
/** Create row from sequence */
def fromSeq(values: Seq[Any]): Row
/** Create row from tuple */
def fromTuple(tuple: Product): Row
/** Create empty row */
def empty: Row
/** Merge rows */
def merge(rows: Row*): Row
}
Usage Examples:
import org.apache.spark.sql.Row
// Create rows
val row1 = Row("Alice", 25, 50000.0)
val row2 = Row.fromSeq(Seq("Bob", 30, 60000.0))
// Access values
val name = row1.getString(0)
val age = row1.getInt(1)
val salary = row1.getDouble(2)
// Type-safe access by index; field-name lookup (getAs[T]("name")) requires a
// row that carries a schema, e.g. rows returned by DataFrame.collect()
val nameTyped = row1.getAs[String](0)
val ageTyped = row1.getAs[Int](1)
// Null checking
if (!row1.isNullAt(1)) {
val age = row1.getInt(1)
}
// Complex types
val complexRow = Row(
"user1",
Seq("tag1", "tag2", "tag3"),
Map("key1" -> "value1", "key2" -> "value2")
)
val tags = complexRow.getSeq[String](1)
val metadata = complexRow.getMap[String, String](2)
Conversion between JVM objects and Spark SQL internal representations.
/**
* Used to convert a JVM object of type T to and from the internal
* Spark SQL representation
*/
trait Encoder[T] extends Serializable {
/** Schema of the encoded data */
def schema: StructType
/** Class tag for type T */
def clsTag: ClassTag[T]
}
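An encoder pairs a JVM class with its relational schema; a small sketch using a throwaway case class (Point is purely illustrative):
import org.apache.spark.sql.{Encoder, Encoders}
case class Point(x: Double, y: Double)
val pointEncoder: Encoder[Point] = Encoders.product[Point]
pointEncoder.schema.printTreeString()   // two non-nullable double fields: x, y
pointEncoder.clsTag.runtimeClass        // class Point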
/**
* Factory methods for creating encoders
*/
object Encoders {
// Primitive encoders (Java boxed types)
def BOOLEAN: Encoder[java.lang.Boolean]
def BYTE: Encoder[java.lang.Byte]
def SHORT: Encoder[java.lang.Short]
def INT: Encoder[java.lang.Integer]
def LONG: Encoder[java.lang.Long]
def FLOAT: Encoder[java.lang.Float]
def DOUBLE: Encoder[java.lang.Double]
def STRING: Encoder[String]
def DECIMAL: Encoder[java.math.BigDecimal]
def DATE: Encoder[java.sql.Date]
def TIMESTAMP: Encoder[java.sql.Timestamp]
def BINARY: Encoder[Array[Byte]]
// Scala primitive encoders
def scalaBoolean: Encoder[Boolean]
def scalaByte: Encoder[Byte]
def scalaShort: Encoder[Short]
def scalaInt: Encoder[Int]
def scalaLong: Encoder[Long]
def scalaFloat: Encoder[Float]
def scalaDouble: Encoder[Double]
// Complex type encoders
def bean[T](beanClass: Class[T]): Encoder[T]
def javaSerialization[T](clazz: Class[T]): Encoder[T]
def kryo[T](clazz: Class[T]): Encoder[T]
// Product encoders (case classes, tuples)
def product[T <: Product : TypeTag]: Encoder[T]
def tuple[T1, T2](e1: Encoder[T1], e2: Encoder[T2]): Encoder[(T1, T2)]
def tuple[T1, T2, T3](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3]): Encoder[(T1, T2, T3)]
// ... overloads provided up to Tuple5
}
Usage Examples:
import org.apache.spark.sql.Encoders
// Case class encoder (derived automatically with import spark.implicits._; built explicitly here)
case class Person(name: String, age: Int, salary: Double)
implicit val personEncoder = Encoders.product[Person]
val peopleDs = spark.createDataset(Seq(
Person("Alice", 25, 50000.0),
Person("Bob", 30, 60000.0)
))
// Java bean encoder
import scala.beans.BeanProperty
class Employee {
@BeanProperty var name: String = _
@BeanProperty var age: Int = _
@BeanProperty var department: String = _
}
val employeeEncoder = Encoders.bean(classOf[Employee])
val employeeDs = spark.emptyDataset(employeeEncoder)
// Tuple encoders
val tupleDs = spark.createDataset(Seq(
("Alice", 25),
("Bob", 30)
))(Encoders.tuple(Encoders.STRING, Encoders.scalaInt))
// Primitive encoders
val stringDs = spark.createDataset(Seq("a", "b", "c"))(Encoders.STRING)
val intDs = spark.createDataset(Seq(1, 2, 3))(Encoders.scalaInt)
Utilities for working with types and conversions.
/**
* Utilities for data type operations
*/
object DataTypeUtils {
/** Check if two types are the same, ignoring nullability */
def sameType(from: DataType, to: DataType): Boolean
/** Get common type for multiple types */
def findCommonType(types: Seq[DataType]): Option[DataType]
/** Convert string to DataType */
def fromString(raw: String): DataType
/** Parse DDL string to StructType */
def fromDDL(ddl: String): StructType
}
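In the public Scala API the DDL helpers listed above are also exposed on StructType itself (StructType.fromDDL / toDDL); a minimal sketch of parsing and comparing schemas:
import org.apache.spark.sql.types._
val parsed = StructType.fromDDL("id BIGINT, name STRING, score DOUBLE")
val expected = new StructType()
.add("id", LongType)
.add("name", StringType)
.add("score", DoubleType)
parsed == expected   // structural equality: names, types, nullability, metadata
parsed.toDDL         // renders the schema back as a DDL string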
/**
* Metadata for fields and types
*/
class Metadata private (private val map: Map[String, Any]) extends Serializable {
/** Check if key exists */
def contains(key: String): Boolean
/** Get value by key */
def getLong(key: String): Long
def getDouble(key: String): Double
def getBoolean(key: String): Boolean
def getString(key: String): String
def getMetadata(key: String): Metadata
def getLongArray(key: String): Array[Long]
def getDoubleArray(key: String): Array[Double]
def getBooleanArray(key: String): Array[Boolean]
def getStringArray(key: String): Array[String]
def getMetadataArray(key: String): Array[Metadata]
/** JSON representation */
def json: String
}
object Metadata {
/** Empty metadata */
def empty: Metadata
/** Create from JSON */
def fromJson(json: String): Metadata
}
/**
 * Builder for constructing Metadata instances
 */
class MetadataBuilder {
def putLong(key: String, value: Long): MetadataBuilder
def putDouble(key: String, value: Double): MetadataBuilder
def putBoolean(key: String, value: Boolean): MetadataBuilder
def putString(key: String, value: String): MetadataBuilder
def putMetadata(key: String, value: Metadata): MetadataBuilder
def putLongArray(key: String, value: Array[Long]): MetadataBuilder
def putDoubleArray(key: String, value: Array[Double]): MetadataBuilder
def putBooleanArray(key: String, value: Array[Boolean]): MetadataBuilder
def putStringArray(key: String, value: Array[String]): MetadataBuilder
def putMetadataArray(key: String, value: Array[Metadata]): MetadataBuilder
def build(): Metadata
}
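Building metadata with the builder and attaching it to a field (the keys used here are arbitrary):
import org.apache.spark.sql.types._
val idMetadata = new MetadataBuilder()
.putString("comment", "customer-facing identifier")
.putBoolean("pii", false)
.build()
val idField = StructField("customerId", LongType, nullable = false, idMetadata)
idField.metadata.getString("comment")   // "customer-facing identifier"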
Schema evolution:
// Schemas from an older and a newer version of the data
val schema1 = StructType(Seq(
StructField("id", LongType),
StructField("name", StringType)
))
val schema2 = StructType(Seq(
StructField("id", LongType),
StructField("name", StringType),
StructField("email", StringType)
))
// Add nullable field for backward compatibility
val evolvedSchema = schema1.add("email", StringType, nullable = true)
Custom encoders:
// Custom encoder for complex types
case class Address(street: String, city: String, zipCode: String)
case class PersonWithAddress(name: String, age: Int, address: Address)
// Automatic derivation works for nested case classes
implicit val addressEncoder = Encoders.product[Address]
implicit val personEncoder = Encoders.product[PersonWithAddress]
val complexDs = spark.createDataset(Seq(
PersonWithAddress("Alice", 25, Address("123 Main St", "Seattle", "98101"))
))
Type validation:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType
def validateSchema(df: DataFrame, expectedSchema: StructType): Boolean = {
val actualSchema = df.schema
actualSchema.fields.length == expectedSchema.fields.length &&
actualSchema.fields.zip(expectedSchema.fields).forall { case (actual, expected) =>
actual.name == expected.name &&
actual.dataType == expected.dataType &&
// nulls in the actual column are acceptable only if the expected field is nullable
(expected.nullable || !actual.nullable)
}
}
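A possible call site, reusing df and orderSchema from the earlier examples:
val conforms = validateSchema(df, orderSchema)
require(conforms, s"Schema mismatch:\n${df.schema.treeString}")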