CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-spark--spark-catalyst-2-13

Catalyst is Spark's library for manipulating relational query plans and expressions

Pending
Overview
Eval results
Files

docs/data-types.md

Data Types

Catalyst's data type system provides comprehensive support for all SQL data types with full type safety, JSON serialization, and metadata support. The type system is the foundation for schema definition, expression evaluation, and query optimization.

Core Imports

import org.apache.spark.sql.types._

Type Hierarchy

DataType Base Class

abstract class DataType extends AbstractDataType {
  def defaultSize: Int
  def typeName: String
  def json: String
  def prettyJson: String
  def simpleString: String
  def catalogString: String
  def sql: String
  private[spark] def sameType(other: DataType): Boolean
  private[spark] def asNullable: DataType
}

The base DataType class provides common functionality for all Spark SQL types including serialization, string representation, and type comparison.

Primitive Types

Boolean and Numeric Types

object BooleanType extends DataType
object ByteType extends DataType
object ShortType extends DataType
object IntegerType extends DataType
object LongType extends DataType
object FloatType extends DataType
object DoubleType extends DataType

case class DecimalType(precision: Int, scale: Int) extends FractionalType {
  def this() = this(10, 0)
}

object DecimalType {
  val SYSTEM_DEFAULT: DecimalType
  val USER_DEFAULT: DecimalType
  def apply(): DecimalType
  def bounded(precision: Int, scale: Int): DecimalType
  def unbounded: DecimalType
}

Usage Example:

val intCol = StructField("count", IntegerType, nullable = false)
val priceCol = StructField("price", DecimalType(10, 2), nullable = true)
val flagCol = StructField("active", BooleanType, nullable = false)

String Types

class StringType(val collationId: Int) extends AtomicType {
  def this() = this(0)
  def getCollationSql: String
}

object StringType extends StringType(0)

case class CharType(length: Int) extends AtomicType
case class VarcharType(length: Int) extends AtomicType

Usage Example:

val nameCol = StructField("name", StringType, nullable = false)
val codeCol = StructField("code", CharType(10), nullable = false)
val descCol = StructField("description", VarcharType(255), nullable = true)

Binary and Date/Time Types

object BinaryType extends DataType
object DateType extends DataType
object TimestampType extends DataType
object TimestampNTZType extends DataType
object CalendarIntervalType extends DataType

case class DayTimeIntervalType(startField: Byte, endField: Byte) extends DataType
case class YearMonthIntervalType(startField: Byte, endField: Byte) extends DataType

Usage Example:

val createdCol = StructField("created_at", TimestampType, nullable = false)
val birthdateCol = StructField("birthdate", DateType, nullable = true)
val dataCol = StructField("data", BinaryType, nullable = true)

Special Types

object NullType extends DataType
object VariantType extends DataType
case class ObjectType(cls: Class[_]) extends DataType

Complex Types

StructType and StructField

case class StructType(fields: Array[StructField]) extends DataType {
  def this(fields: Seq[StructField]) = this(fields.toArray)
  def this(fields: java.util.List[StructField]) = this(fields.asScala.toArray)
  
  // Field access
  def apply(name: String): StructField
  def apply(names: Set[String]): StructType
  def fieldNames: Array[String]
  def names: Seq[String]
  def length: Int
  def iterator: Iterator[StructField]
  
  // Field manipulation
  def add(field: StructField): StructType
  def add(name: String, dataType: DataType): StructType
  def add(name: String, dataType: DataType, nullable: Boolean): StructType
  def add(name: String, dataType: DataType, nullable: Boolean, metadata: Metadata): StructType
  def add(name: String, dataType: String): StructType
  def add(name: String, dataType: String, nullable: Boolean): StructType
  def add(name: String, dataType: String, nullable: Boolean, metadata: Metadata): StructType
  
  // Schema operations
  def merge(that: StructType): StructType
  def remove(fieldNames: Set[String]): StructType
  def dropFields(fieldNames: String*): StructType
  def getFieldIndex(name: String): Option[Int]
}

case class StructField(
  name: String, 
  dataType: DataType, 
  nullable: Boolean = true, 
  metadata: Metadata = Metadata.empty
) {
  def getComment(): Option[String]
}

Usage Example:

val schema = StructType(Array(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = false),
  StructField("email", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("created_at", TimestampType, nullable = false)
))

// Add fields
val extendedSchema = schema
  .add("last_login", TimestampType, nullable = true)
  .add("is_active", BooleanType, nullable = false)

// Access fields
val nameField = schema("name")
val fieldNames = schema.fieldNames

ArrayType

case class ArrayType(elementType: DataType, containsNull: Boolean = true) extends DataType {
  def simpleString: String
}

object ArrayType {
  def apply(elementType: DataType): ArrayType = ArrayType(elementType, containsNull = true)
}

Usage Example:

val tagsCol = StructField("tags", ArrayType(StringType), nullable = true)
val scoresCol = StructField("scores", ArrayType(IntegerType, containsNull = false), nullable = false)
val nestedCol = StructField("matrix", ArrayType(ArrayType(DoubleType)), nullable = true)

MapType

case class MapType(
  keyType: DataType, 
  valueType: DataType, 
  valueContainsNull: Boolean = true
) extends DataType

object MapType {
  def apply(keyType: DataType, valueType: DataType): MapType = 
    MapType(keyType, valueType, valueContainsNull = true)
}

Usage Example:

val propsCol = StructField("properties", MapType(StringType, StringType), nullable = true)
val countsCol = StructField("counts", MapType(StringType, LongType, valueContainsNull = false), nullable = false)
val nestedCol = StructField("nested", MapType(StringType, ArrayType(IntegerType)), nullable = true)

Metadata

class Metadata private (private val map: Map[String, Any]) {
  def contains(key: String): Boolean
  def getLong(key: String): Long
  def getDouble(key: String): Double
  def getBoolean(key: String): Boolean
  def getString(key: String): String
  def getMetadata(key: String): Metadata
  def getLongArray(key: String): Array[Long]
  def getDoubleArray(key: String): Array[Double]
  def getBooleanArray(key: String): Array[Boolean]
  def getStringArray(key: String): Array[String]
  def getMetadataArray(key: String): Array[Metadata]
  def json: String
}

object Metadata {
  val empty: Metadata
  def fromJson(json: String): Metadata
}

class MetadataBuilder {
  def putLong(key: String, value: Long): MetadataBuilder
  def putDouble(key: String, value: Double): MetadataBuilder
  def putBoolean(key: String, value: Boolean): MetadataBuilder
  def putString(key: String, value: String): MetadataBuilder
  def putMetadata(key: String, value: Metadata): MetadataBuilder
  def putLongArray(key: String, value: Array[Long]): MetadataBuilder
  def putDoubleArray(key: String, value: Array[Double]): MetadataBuilder
  def putBooleanArray(key: String, value: Array[Boolean]): MetadataBuilder
  def putStringArray(key: String, value: Array[String]): MetadataBuilder
  def putMetadataArray(key: String, value: Array[Metadata]): MetadataBuilder
  def remove(key: String): MetadataBuilder
  def build(): Metadata
}

Usage Example:

val metadata = new MetadataBuilder()
  .putString("comment", "User identifier")
  .putLong("maxLength", 100)
  .putBoolean("required", true)
  .build()

val fieldWithMetadata = StructField("user_id", StringType, nullable = false, metadata)

User-Defined Types

abstract class UserDefinedType[UserType >: Null] extends DataType {
  def sqlType: DataType
  def serialize(obj: UserType): Any
  def deserialize(datum: Any): UserType
  def userClass: Class[UserType]
  def equals(o: Any): Boolean
  def hashCode(): Int
  def typeName: String
}

object UDTRegistration {
  def register(udtClass: String, udt: String): Unit
  def register(udtClass: Class[_], udt: Class[_ <: UserDefinedType[_]]): Unit
  def exists(udtClass: String): Boolean
  def getUDTFor(udtClass: String): Option[UserDefinedType[_]]
}

Usage Example:

// Define a custom UDT
class PointUDT extends UserDefinedType[Point] {
  override def sqlType: DataType = StructType(Seq(
    StructField("x", DoubleType, false),
    StructField("y", DoubleType, false)
  ))
  
  override def serialize(point: Point): Any = {
    InternalRow(point.x, point.y)
  }
  
  override def deserialize(datum: Any): Point = {
    val row = datum.asInstanceOf[InternalRow]
    Point(row.getDouble(0), row.getDouble(1))
  }
  
  override def userClass: Class[Point] = classOf[Point]
}

// Register the UDT
UDTRegistration.register(classOf[Point].getName, classOf[PointUDT].getName)

Type Conversion and Utilities

object DataType {
  def fromJson(json: String): DataType
  def fromDDL(ddl: String): DataType
  def equalsIgnoreNullability(left: DataType, right: DataType): Boolean
  def equalsIgnoreCaseAndNullability(left: DataType, right: DataType): Boolean
}

abstract class AbstractDataType {
  def defaultConcreteType: DataType
  def acceptsType(other: DataType): Boolean
  def simpleString: String
}

Usage Example:

// Parse DDL
val schema = DataType.fromDDL("struct<name:string,age:int,scores:array<double>>")

// Type checking
val accepts = IntegerType.acceptsType(IntegerType) // true — acceptsType on a concrete type checks for the same type, so IntegerType.acceptsType(ByteType) would be false
val equal = DataType.equalsIgnoreNullability(
  StructType(Seq(StructField("x", IntegerType, true))),
  StructType(Seq(StructField("x", IntegerType, false)))
) // true

Common Patterns

Schema Evolution

// Start with base schema
val v1Schema = StructType(Array(
  StructField("id", LongType, false),
  StructField("name", StringType, false)
))

// Add new fields (backwards compatible)
val v2Schema = v1Schema
  .add("email", StringType, nullable = true)
  .add("created_at", TimestampType, nullable = true)

// Merge schemas
val mergedSchema = v1Schema.merge(v2Schema)

Dynamic Schema Creation

def createSchema(fields: Seq[(String, DataType, Boolean)]): StructType = {
  StructType(fields.map { case (name, dataType, nullable) =>
    StructField(name, dataType, nullable)
  }.toArray)
}

val dynamicSchema = createSchema(Seq(
  ("user_id", LongType, false),
  ("preferences", MapType(StringType, StringType), true),
  ("tags", ArrayType(StringType), true)
))

The data type system in Catalyst provides comprehensive support for all SQL data types with rich metadata capabilities, making it suitable for complex schema evolution and type-safe query processing.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-catalyst-2-13

docs

connectors.md

data-types.md

expressions.md

index.md

query-plans.md

tile.json