Catalyst is Spark's library for manipulating relational query plans and expressions.
—
Catalyst's data type system provides comprehensive support for all SQL data types with full type safety, JSON serialization, and metadata support. The type system is the foundation for schema definition, expression evaluation, and query optimization.
import org.apache.spark.sql.types._

abstract class DataType extends AbstractDataType {
def defaultSize: Int
def typeName: String
def json: String
def prettyJson: String
def simpleString: String
def catalogString: String
def sql: String
private[spark] def sameType(other: DataType): Boolean
private[spark] def asNullable: DataType
}

The base DataType class provides common functionality for all Spark SQL types including serialization, string representation, and type comparison.
object BooleanType extends DataType
object ByteType extends DataType
object ShortType extends DataType
object IntegerType extends DataType
object LongType extends DataType
object FloatType extends DataType
object DoubleType extends DataType
case class DecimalType(precision: Int, scale: Int) extends FractionalType {
def this() = this(10, 0)
}
object DecimalType {
val SYSTEM_DEFAULT: DecimalType
val USER_DEFAULT: DecimalType
def apply(): DecimalType
def bounded(precision: Int, scale: Int): DecimalType
def unbounded: DecimalType
}

Usage Example:
val intCol = StructField("count", IntegerType, nullable = false)
val priceCol = StructField("price", DecimalType(10, 2), nullable = true)
val flagCol = StructField("active", BooleanType, nullable = false)

class StringType(val collationId: Int) extends AtomicType {
def this() = this(0)
def getCollationSql: String
}
object StringType extends StringType(0)
case class CharType(length: Int) extends AtomicType
case class VarcharType(length: Int) extends AtomicType

Usage Example:
val nameCol = StructField("name", StringType, nullable = false)
val codeCol = StructField("code", CharType(10), nullable = false)
val descCol = StructField("description", VarcharType(255), nullable = true)

object BinaryType extends DataType
object DateType extends DataType
object TimestampType extends DataType
object TimestampNTZType extends DataType
object CalendarIntervalType extends DataType
case class DayTimeIntervalType(startField: Byte, endField: Byte) extends DataType
case class YearMonthIntervalType(startField: Byte, endField: Byte) extends DataType

Usage Example:
val createdCol = StructField("created_at", TimestampType, nullable = false)
val birthdateCol = StructField("birthdate", DateType, nullable = true)
val dataCol = StructField("data", BinaryType, nullable = true)

object NullType extends DataType
object VariantType extends DataType
case class ObjectType(cls: Class[_]) extends DataType

case class StructType(fields: Array[StructField]) extends DataType {
def this(fields: Seq[StructField]) = this(fields.toArray)
def this(fields: java.util.List[StructField]) = this(fields.asScala.toArray)
// Field access
def apply(name: String): StructField
def apply(names: Seq[String]): StructField
def fieldNames: Array[String]
def names: Seq[String]
def length: Int
def iterator: Iterator[StructField]
// Field manipulation
def add(field: StructField): StructType
def add(name: String, dataType: DataType): StructType
def add(name: String, dataType: DataType, nullable: Boolean): StructType
def add(name: String, dataType: DataType, nullable: Boolean, metadata: Metadata): StructType
def add(name: String, dataType: String): StructType
def add(name: String, dataType: String, nullable: Boolean): StructType
def add(name: String, dataType: String, nullable: Boolean, metadata: Metadata): StructType
// Schema operations
def merge(that: StructType): StructType
def remove(fieldNames: Set[String]): StructType
def dropFields(fieldNames: String*): StructType
def getFieldIndex(name: String): Option[Int]
}
case class StructField(
name: String,
dataType: DataType,
nullable: Boolean = true,
metadata: Metadata = Metadata.empty
) {
def getComment(): Option[String]
}

Usage Example:
val schema = StructType(Array(
StructField("id", LongType, nullable = false),
StructField("name", StringType, nullable = false),
StructField("email", StringType, nullable = true),
StructField("age", IntegerType, nullable = true),
StructField("created_at", TimestampType, nullable = false)
))
// Add fields
val extendedSchema = schema
.add("last_login", TimestampType, nullable = true)
.add("is_active", BooleanType, nullable = false)
// Access fields
val nameField = schema("name")
val fieldNames = schema.fieldNames

case class ArrayType(elementType: DataType, containsNull: Boolean = true) extends DataType {
def simpleString: String
}
object ArrayType {
def apply(elementType: DataType): ArrayType = ArrayType(elementType, containsNull = true)
}

Usage Example:
val tagsCol = StructField("tags", ArrayType(StringType), nullable = true)
val scoresCol = StructField("scores", ArrayType(IntegerType, containsNull = false), nullable = false)
val nestedCol = StructField("matrix", ArrayType(ArrayType(DoubleType)), nullable = true)

case class MapType(
keyType: DataType,
valueType: DataType,
valueContainsNull: Boolean = true
) extends DataType
object MapType {
def apply(keyType: DataType, valueType: DataType): MapType =
MapType(keyType, valueType, valueContainsNull = true)
}

Usage Example:
val propsCol = StructField("properties", MapType(StringType, StringType), nullable = true)
val countsCol = StructField("counts", MapType(StringType, LongType, valueContainsNull = false), nullable = false)
val nestedCol = StructField("nested", MapType(StringType, ArrayType(IntegerType)), nullable = true)

class Metadata private (private val map: Map[String, Any]) {
def contains(key: String): Boolean
def getLong(key: String): Long
def getDouble(key: String): Double
def getBoolean(key: String): Boolean
def getString(key: String): String
def getMetadata(key: String): Metadata
def getLongArray(key: String): Array[Long]
def getDoubleArray(key: String): Array[Double]
def getBooleanArray(key: String): Array[Boolean]
def getStringArray(key: String): Array[String]
def getMetadataArray(key: String): Array[Metadata]
def json: String
}
object Metadata {
val empty: Metadata
def fromJson(json: String): Metadata
}
class MetadataBuilder {
def putLong(key: String, value: Long): MetadataBuilder
def putDouble(key: String, value: Double): MetadataBuilder
def putBoolean(key: String, value: Boolean): MetadataBuilder
def putString(key: String, value: String): MetadataBuilder
def putMetadata(key: String, value: Metadata): MetadataBuilder
def putLongArray(key: String, value: Array[Long]): MetadataBuilder
def putDoubleArray(key: String, value: Array[Double]): MetadataBuilder
def putBooleanArray(key: String, value: Array[Boolean]): MetadataBuilder
def putStringArray(key: String, value: Array[String]): MetadataBuilder
def putMetadataArray(key: String, value: Array[Metadata]): MetadataBuilder
def remove(key: String): MetadataBuilder
def build(): Metadata
}

Usage Example:
val metadata = new MetadataBuilder()
.putString("comment", "User identifier")
.putLong("maxLength", 100)
.putBoolean("required", true)
.build()
val fieldWithMetadata = StructField("user_id", StringType, nullable = false, metadata)

abstract class UserDefinedType[UserType >: Null] extends DataType {
def sqlType: DataType
def serialize(obj: UserType): Any
def deserialize(datum: Any): UserType
def userClass: Class[UserType]
def equals(o: Any): Boolean
def hashCode(): Int
def typeName: String
}
object UDTRegistration {
def register(udtClass: String, udt: String): Unit
def register(udtClass: Class[_], udt: Class[_ <: UserDefinedType[_]]): Unit
def exists(udtClass: String): Boolean
def getUDTFor(udtClass: String): Option[UserDefinedType[_]]
}

Usage Example:
// Define a custom UDT
class PointUDT extends UserDefinedType[Point] {
override def sqlType: DataType = StructType(Seq(
StructField("x", DoubleType, false),
StructField("y", DoubleType, false)
))
override def serialize(point: Point): Any = {
InternalRow(point.x, point.y)
}
override def deserialize(datum: Any): Point = {
val row = datum.asInstanceOf[InternalRow]
Point(row.getDouble(0), row.getDouble(1))
}
override def userClass: Class[Point] = classOf[Point]
}
// Register the UDT
UDTRegistration.register(classOf[Point].getName, classOf[PointUDT].getName)

object DataType {
def fromJson(json: String): DataType
def fromDDL(ddl: String): DataType
def equalsIgnoreNullability(left: DataType, right: DataType): Boolean
def equalsIgnoreCaseAndNullability(left: DataType, right: DataType): Boolean
}
abstract class AbstractDataType {
def defaultConcreteType: DataType
def acceptsType(other: DataType): Boolean
def simpleString: String
}

Usage Example:
// Parse DDL
val schema = DataType.fromDDL("struct<name:string,age:int,scores:array<double>>")
// Type checking
val accepts = IntegerType.acceptsType(ByteType) // true
val equal = DataType.equalsIgnoreNullability(
StructType(Seq(StructField("x", IntegerType, true))),
StructType(Seq(StructField("x", IntegerType, false)))
) // true

// Start with base schema
val v1Schema = StructType(Array(
StructField("id", LongType, false),
StructField("name", StringType, false)
))
// Add new fields (backwards compatible)
val v2Schema = v1Schema
.add("email", StringType, nullable = true)
.add("created_at", TimestampType, nullable = true)
// Merge schemas
val mergedSchema = v1Schema.merge(v2Schema)

def createSchema(fields: Seq[(String, DataType, Boolean)]): StructType = {
StructType(fields.map { case (name, dataType, nullable) =>
StructField(name, dataType, nullable)
}.toArray)
}
val dynamicSchema = createSchema(Seq(
("user_id", LongType, false),
("preferences", MapType(StringType, StringType), true),
("tags", ArrayType(StringType), true)
))

The data type system in Catalyst provides comprehensive support for all SQL data types with rich metadata capabilities, making it suitable for complex schema evolution and type-safe query processing.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-catalyst-2-13