Catalyst query optimization framework and expression evaluation engine for Apache Spark SQL
—
This section covers the core data type system of Spark Catalyst, including primitive types, complex types (arrays, maps, structs), Row interface for data access, and encoders for type conversion.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoders

The Row trait provides the primary interface for accessing structured data in Spark SQL.
/**
 * Represents one row of structured data; values are accessed by ordinal
 * (zero-based field position).
 *
 * NOTE(review): the primitive getters presumably fail on null values —
 * check isNullAt(i) first on nullable columns, as the example below does.
 */
trait Row {
// Generic access: apply and get both return the raw value at ordinal i.
def apply(i: Int): Any
def get(i: Int): Any
// True when the value at ordinal i is null.
def isNullAt(i: Int): Boolean
// Typed primitive accessors.
def getInt(i: Int): Int
def getLong(i: Int): Long
def getFloat(i: Int): Float
def getDouble(i: Int): Double
def getString(i: Int): String
def getBoolean(i: Int): Boolean
def getByte(i: Int): Byte
def getShort(i: Int): Short
// SQL value types surfaced as java.math / java.sql classes.
def getDecimal(i: Int): java.math.BigDecimal
def getDate(i: Int): java.sql.Date
def getTimestamp(i: Int): java.sql.Timestamp
// Complex types, in both Scala and Java collection flavors.
def getSeq[T](i: Int): Seq[T]
def getList[T](i: Int): java.util.List[T]
def getMap[K, V](i: Int): scala.collection.Map[K, V]
def getJavaMap[K, V](i: Int): java.util.Map[K, V]
// Nested struct values are themselves Rows.
def getStruct(i: Int): Row
// Generic typed access by ordinal or by field name.
def getAs[T](i: Int): T
def getAs[T](fieldName: String): T
// Number of fields; length and size are aliases.
def length: Int
def size: Int
// Schema describing this row's fields.
def schema: StructType
// Copy of this row.
def copy(): Row
// All values, in ordinal order.
def toSeq: Seq[Any]
}

Factory methods for creating Row instances.
/** Companion object: factory methods for creating Row instances. */
object Row {
// Extractor enabling pattern matches such as `case Row(a, b) =>`.
def unapplySeq(row: Row): Some[Seq[Any]]
// Build a row from varargs, a sequence, or a Product (tuple / case class).
def apply(values: Any*): Row
def fromSeq(values: Seq[Any]): Row
def fromTuple(tuple: Product): Row
// Combine several rows into a single row.
def merge(rows: Row*): Row
// A row with zero fields.
def empty: Row
}

import org.apache.spark.sql.Row
// Construct a three-field row: name, age, active flag.
val record = Row("Alice", 25, true)

// Read fields positionally with the matching typed accessor.
val name: String = record.getString(0)
val age: Int = record.getInt(1)
val isActive: Boolean = record.getBoolean(2)

// Or let getAs[T] supply the expected type.
val nameGeneric: String = record.getAs[String](0)

// Primitive getters are not null-safe: test isNullAt first.
if (!record.isNullAt(1)) {
  val age = record.getInt(1)
}

Base class for all Spark SQL data types.
/**
 * Base class for all Spark SQL data types.
 */
abstract class DataType {
// Default size, in bytes, assumed for one value of this type (used for estimates).
def defaultSize: Int
// Name of this type.
def typeName: String
// JSON representations (compact and pretty-printed).
def json: String
def prettyJson: String
// Human-readable renderings: compact, catalog, and SQL forms.
def simpleString: String
def catalogString: String
def sql: String
// Structural comparison with another type — presumably ignores nullability; TODO confirm.
def sameType(other: DataType): Boolean
// This type with all contained types marked nullable.
def asNullable: DataType
// True if f holds for this type or any type nested inside it.
def existsRecursively(f: (DataType) => Boolean): Boolean
}

Factory methods and utilities for DataType instances.
/** Companion object: parsing and comparison utilities for DataType. */
object DataType {
// Reconstruct a DataType from its JSON form (inverse of DataType.json).
def fromJson(json: String): DataType
// Parse a DDL-formatted type string, e.g. "array<string>".
def fromDDL(ddl: String): DataType
// Equality checks that relax nullability differences between the two types.
def equalsIgnoreCompatibleNullability(from: DataType, to: DataType): Boolean
def equalsIgnoreNullability(from: DataType, to: DataType): Boolean
}

// 8-bit signed integer type.
case object ByteType extends IntegralType
// 16-, 32-, and 64-bit signed integer types.
case object ShortType extends IntegralType
case object IntegerType extends IntegralType
case object LongType extends IntegralType
// Single- and double-precision floating-point types.
case object FloatType extends FractionalType
case object DoubleType extends FractionalType
/** Fixed-precision decimal: `precision` total digits, `scale` digits after the point. */
case class DecimalType(precision: Int, scale: Int) extends FractionalType {
// No-arg constructor: precision 10, scale 0.
def this() = this(10, 0)
}
/** Companion object: common DecimalType instances and helpers. */
object DecimalType {
// Predefined defaults — exact precision/scale values not shown here; TODO confirm.
val USER_DEFAULT: DecimalType
val SYSTEM_DEFAULT: DecimalType
def apply(): DecimalType
// Presumably clamps precision/scale to supported bounds — TODO confirm.
def bounded(precision: Int, scale: Int): DecimalType
def unbounded: DecimalType
// Extractor yielding (precision, scale) when `t` is a DecimalType.
def unapply(t: DataType): Option[(Int, Int)]
}

// Atomic string type.
case object StringType extends AtomicType
// Remaining atomic types: byte arrays, booleans, dates, and timestamps.
case object BinaryType extends AtomicType
case object BooleanType extends AtomicType
case object DateType extends AtomicType
case object TimestampType extends AtomicType
// Values representing calendar intervals.
case object CalendarIntervalType extends DataType
// The type of the SQL NULL literal.
case object NullType extends DataType
/** Array type: `containsNull` records whether elements may be null. */
case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType {
// Convenience constructor: elements assumed nullable.
def this(elementType: DataType) = this(elementType, containsNull = true)
// Appends a formatted description of this type to `buffer`.
def buildFormattedString(prefix: String, buffer: StringBuffer): Unit
}
/** Companion object: build an ArrayType with nullable elements. */
object ArrayType {
def apply(elementType: DataType): ArrayType
}

/** Map type: `valueContainsNull` records whether map values may be null. */
case class MapType(
keyType: DataType,
valueType: DataType,
valueContainsNull: Boolean
) extends DataType {
// Convenience constructor: values assumed nullable.
def this(keyType: DataType, valueType: DataType) =
this(keyType, valueType, valueContainsNull = true)
// Appends a formatted description of this type to `buffer`.
def buildFormattedString(prefix: String, buffer: StringBuffer): Unit
}
/** Companion object: build a MapType with nullable values. */
object MapType {
def apply(keyType: DataType, valueType: DataType): MapType
}

/**
 * Struct type: an ordered collection of named, typed fields — the schema of a Row.
 */
case class StructType(fields: Array[StructField]) extends DataType {
// Auxiliary constructors from Scala and Java collections.
def this(fields: Seq[StructField]) = this(fields.toArray)
// NOTE(review): asScala requires a collection-converters import not shown in this listing.
def this(fields: java.util.List[StructField]) = this(fields.asScala.toArray)
// Field lookup by name; the Set overload projects a sub-struct of the named fields.
def apply(name: String): StructField
def apply(names: Set[String]): StructType
// Field names, as an array or a seq.
def fieldNames: Array[String]
def names: Seq[String]
// Number of fields; iterate fields in declaration order.
def length: Int
def iterator: Iterator[StructField]
// Ordinal of a named field — Option-returning and Int-returning variants;
// behavior of indexOf for a missing name is not shown here (TODO confirm).
def getFieldIndex(name: String): Option[Int]
def indexOf(name: String): Int
// Immutable builders: return a new StructType with one field appended.
def add(field: StructField): StructType
def add(name: String, dataType: DataType): StructType
def add(name: String, dataType: DataType, nullable: Boolean): StructType
def add(name: String, dataType: DataType, nullable: Boolean, metadata: Metadata): StructType
// Same builders, with the data type given as a string.
def add(name: String, dataType: String): StructType
def add(name: String, dataType: String, nullable: Boolean): StructType
def add(name: String, dataType: String, nullable: Boolean, metadata: Metadata): StructType
}
/** Companion object: construct a StructType from field collections or DDL. */
object StructType {
def apply(fields: Seq[StructField]): StructType
def apply(fields: java.util.List[StructField]): StructType
// Parse a DDL schema string such as "a INT, b STRING".
def fromDDL(ddl: String): StructType
}

/**
 * One field of a StructType: name, data type, nullability, and metadata.
 */
case class StructField(
name: String,
dataType: DataType,
nullable: Boolean,
metadata: Metadata
) {
// Auxiliary constructors: metadata defaults to empty, nullability to true.
def this(name: String, dataType: DataType, nullable: Boolean) =
this(name, dataType, nullable, Metadata.empty)
def this(name: String, dataType: DataType) =
this(name, dataType, nullable = true, Metadata.empty)
// Read / set this field's comment — presumably stored in metadata; TODO confirm.
def getComment(): Option[String]
def withComment(comment: String): StructField
}
/** Companion object: apply overloads with default nullability and metadata. */
object StructField {
def apply(name: String, dataType: DataType): StructField
def apply(name: String, dataType: DataType, nullable: Boolean): StructField
}

import org.apache.spark.sql.types._
// Assemble a schema mixing primitive, array, and map columns.
val personSchema = StructType(Array(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = true),
  StructField("scores", ArrayType(DoubleType), nullable = true),
  StructField("metadata", MapType(StringType, StringType), nullable = true)
))

// Interrogate the schema by field name.
val nameField = personSchema("name")
val allFieldNames = personSchema.fieldNames
val nameOrdinal = personSchema.indexOf("name")

Type-safe conversion between JVM objects and Spark SQL's internal representation.
/**
 * Converts between JVM objects of type `T` and Spark SQL's internal representation.
 */
trait Encoder[T] {
// Schema describing the encoded form of `T`.
def schema: StructType
// Runtime class information for `T`.
def clsTag: ClassTag[T]
}

Factory methods for creating encoder instances.
/** Factory methods for creating encoder instances. */
object Encoders {
// Encoders for boxed Java primitives and common SQL value classes.
def BOOLEAN: Encoder[java.lang.Boolean]
def BYTE: Encoder[java.lang.Byte]
def SHORT: Encoder[java.lang.Short]
def INT: Encoder[java.lang.Integer]
def LONG: Encoder[java.lang.Long]
def FLOAT: Encoder[java.lang.Float]
def DOUBLE: Encoder[java.lang.Double]
def STRING: Encoder[java.lang.String]
def DECIMAL: Encoder[java.math.BigDecimal]
def DATE: Encoder[java.sql.Date]
def TIMESTAMP: Encoder[java.sql.Timestamp]
def BINARY: Encoder[Array[Byte]]
// Encoder derived from a JavaBean class.
def bean[T](beanClass: Class[T]): Encoder[T]
// Serialization-based encoders (Kryo or Java serialization).
def kryo[T: ClassTag]: Encoder[T]
def kryo[T](clazz: Class[T]): Encoder[T]
def javaSerialization[T: ClassTag]: Encoder[T]
def javaSerialization[T](clazz: Class[T]): Encoder[T]
// Compose component encoders into tuple encoders (arity 2 through 5).
def tuple[T1, T2](e1: Encoder[T1], e2: Encoder[T2]): Encoder[(T1, T2)]
def tuple[T1, T2, T3](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3]): Encoder[(T1, T2, T3)]
def tuple[T1, T2, T3, T4](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3], e4: Encoder[T4]): Encoder[(T1, T2, T3, T4)]
def tuple[T1, T2, T3, T4, T5](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3], e4: Encoder[T4], e5: Encoder[T5]): Encoder[(T1, T2, T3, T4, T5)]
// Encoder for any Product (case class or tuple), derived via its TypeTag.
def product[T <: Product : TypeTag]: Encoder[T]
// Encoders for unboxed Scala primitives.
def scalaInt: Encoder[Int]
def scalaLong: Encoder[Long]
def scalaDouble: Encoder[Double]
def scalaFloat: Encoder[Float]
def scalaByte: Encoder[Byte]
def scalaShort: Encoder[Short]
def scalaBoolean: Encoder[Boolean]
}

import org.apache.spark.sql.Encoders
// Encoders for primitive values: a boxed Java String and an unboxed Scala Int.
val strEnc = Encoders.STRING
val intEnc = Encoders.scalaInt

// Product encoder derived for a Scala case class.
case class Person(name: String, age: Int)
val personEnc = Encoders.product[Person]

// Kryo-serialized encoder for an arbitrary class.
val kryoEnc = Encoders.kryo[MyComplexClass]

Associated metadata for struct fields.
/**
 * Key/value metadata attached to a StructField.
 * NOTE(review): behavior of the typed getters for a missing or mistyped key
 * is not shown here — check contains(key) before reading; TODO confirm.
 */
case class Metadata(map: Map[String, Any]) {
// True when an entry exists for `key`.
def contains(key: String): Boolean
// Typed scalar getters.
def getLong(key: String): Long
def getDouble(key: String): Double
def getBoolean(key: String): Boolean
def getString(key: String): String
// Nested metadata value.
def getMetadata(key: String): Metadata
// Typed array getters.
def getLongArray(key: String): Array[Long]
def getDoubleArray(key: String): Array[Double]
def getBooleanArray(key: String): Array[Boolean]
def getStringArray(key: String): Array[String]
def getMetadataArray(key: String): Array[Metadata]
// JSON serialization (inverse of Metadata.fromJson).
def json: String
}
/** Companion object: empty metadata and JSON parsing. */
object Metadata {
// Metadata with no entries.
def empty: Metadata
// Parse metadata from its JSON form (inverse of Metadata.json).
def fromJson(json: String): Metadata
}

Exception thrown when query analysis fails.
/**
 * Thrown when query analysis fails; optionally carries the source position
 * (line and start column) and the logical plan under analysis.
 */
class AnalysisException(
message: String,
line: Option[Int] = None,
startPosition: Option[Int] = None,
plan: Option[LogicalPlan] = None,
cause: Option[Throwable] = None
) extends Exception {
// Copy of this exception with an updated source position.
def withPosition(line: Option[Int], startPosition: Option[Int]): AnalysisException
// Full message vs. a simpler form — exact difference not shown here; TODO confirm.
override def getMessage: String
def getSimpleMessage: String
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-catalyst