Catalyst is a library for manipulating relational query plans; it serves as the foundation for Spark SQL's query optimizer and execution engine.
npx @tessl/cli install tessl/maven-org-apache-spark--spark-catalyst_2-10@1.6.0

Spark Catalyst is an extensible query optimizer and execution planning framework that serves as the foundation of Apache Spark's SQL engine. It provides a comprehensive set of tools for representing, manipulating, and optimizing relational query plans through a tree-based representation system that supports complex transformations including predicate pushdown, constant folding, and projection pruning.
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-catalyst_2.10</artifactId>
  <version>1.6.3</version>
</dependency>

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.trees._
import org.apache.spark.sql.types._

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._
// Define a schema for the example data
val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true)
))
// Work with expressions
val idRef = AttributeReference("id", IntegerType, nullable = false)()
val nameRef = AttributeReference("name", StringType, nullable = true)()
val literal = Literal(42, IntegerType)
// Create a simple filter expression
val filterExpr = EqualTo(idRef, literal)
// Create a Row
val row = Row(1, "Alice")
val name = row.getString(1)
val id = row.getInt(0)

Spark Catalyst is built around several key architectural components:
Core interface for working with structured row data, providing both generic and type-safe access methods.
trait Row extends Serializable {
  def size: Int
  def length: Int
  def schema: StructType
  def apply(i: Int): Any
  def get(i: Int): Any
  def isNullAt(i: Int): Boolean
  def getBoolean(i: Int): Boolean
  def getByte(i: Int): Byte
  def getShort(i: Int): Short
  def getInt(i: Int): Int
  def getLong(i: Int): Long
  def getFloat(i: Int): Float
  def getDouble(i: Int): Double
  def getString(i: Int): String
  def getDecimal(i: Int): java.math.BigDecimal
  def getDate(i: Int): java.sql.Date
  def getTimestamp(i: Int): java.sql.Timestamp
  def getAs[T](i: Int): T
  def getAs[T](fieldName: String): T
  def copy(): Row
}

object Row {
  def apply(values: Any*): Row
  def fromSeq(values: Seq[Any]): Row
  def fromTuple(tuple: Product): Row
  def merge(rows: Row*): Row
  val empty: Row
}
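
A brief usage sketch, combining the factory methods with the generic and typed accessors above:

import org.apache.spark.sql._

// Build a row through the companion object factories
val row = Row.fromSeq(Seq(1, "Alice", null))

val id = row.getInt(0)                    // typed accessor
val name = row.getAs[String](1)           // generic accessor with an expected type
val isMissing = row.isNullAt(2)           // always check nullable fields before typed access
val widened = Row.merge(row, Row(true))   // concatenate rows into a single 4-field row
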
Foundation for all Catalyst data structures, providing tree traversal, transformation, and manipulation capabilities.

abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
  def children: Seq[BaseType]
  def fastEquals(other: TreeNode[_]): Boolean
  // Tree traversal and transformation methods
}

case class Origin(
  line: Option[Int] = None,
  startPosition: Option[Int] = None
)

object CurrentOrigin {
  def get: Origin
  def set(o: Origin): Unit
  def reset(): Unit
  def withOrigin[A](o: Origin)(f: => A): A
}
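
Because expressions (and plans) are TreeNodes, they can be rewritten with a partial function via transform. A minimal sketch; the folding rule below is purely illustrative, not one of the built-in optimizer rules:

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

val expr = Add(Literal(1, IntegerType), Literal(2, IntegerType))

// transform walks the tree and applies the partial function to every matching node,
// returning a rewritten copy while leaving the original tree untouched
val folded = expr transform {
  case Add(Literal(a: Int, IntegerType), Literal(b: Int, IntegerType)) =>
    Literal(a + b, IntegerType)
}
// folded == Literal(3, IntegerType)
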
Comprehensive type system supporting all SQL types including primitives, complex nested types, and user-defined types.

abstract class DataType extends AbstractDataType {
  def defaultSize: Int
  def typeName: String
  def json: String
  def prettyJson: String
  def simpleString: String
  def sameType(other: DataType): Boolean
}

// Primitive types (objects)
object BooleanType extends DataType
object ByteType extends DataType
object ShortType extends DataType
object IntegerType extends DataType
object LongType extends DataType
object FloatType extends DataType
object DoubleType extends DataType
object StringType extends DataType
object BinaryType extends DataType
object DateType extends DataType
object TimestampType extends DataType
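
A small sketch of inspecting the primitive type objects:

import org.apache.spark.sql.types._

IntegerType.typeName              // "integer"
IntegerType.defaultSize           // 4 — default size in bytes, used for size estimation
StringType.json                   // "\"string\"" — JSON form used when serializing schemas
DoubleType.sameType(DoubleType)   // true
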
Extensible expression evaluation framework with support for code generation and complex expression trees.

abstract class Expression extends TreeNode[Expression] {
  def foldable: Boolean
  def deterministic: Boolean
  def nullable: Boolean
  def references: AttributeSet
  def eval(input: InternalRow): Any
  def dataType: DataType
}

// Expression hierarchy
trait LeafExpression extends Expression
trait UnaryExpression extends Expression
trait BinaryExpression extends Expression
trait BinaryOperator extends BinaryExpression

// Key expression types
case class Literal(value: Any, dataType: DataType) extends LeafExpression
case class AttributeReference(name: String, dataType: DataType, nullable: Boolean, metadata: Metadata = Metadata.empty) extends Attribute
case class Cast(child: Expression, dataType: DataType) extends UnaryExpression
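
A sketch of evaluating an expression tree directly; it assumes the BoundReference expression (which reads an input column by ordinal) and the generic InternalRow factory from org.apache.spark.sql.catalyst:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

// Predicate "input column 0 + 1 > 10"
val col0 = BoundReference(0, IntegerType, nullable = false)
val predicate = GreaterThan(Add(col0, Literal(1, IntegerType)), Literal(10, IntegerType))

predicate.dataType                 // BooleanType
predicate.eval(InternalRow(42))    // true: 42 + 1 > 10
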
Logical and physical query plan representations with transformation and optimization capabilities.

abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanType] {
  def output: Seq[Attribute]
  def outputSet: AttributeSet
  def references: AttributeSet
  def inputSet: AttributeSet
  def transformExpressions(rule: PartialFunction[Expression, Expression]): this.type
}

abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
  def analyzed: Boolean
  def resolveOperators(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan
  def resolveExpressions(r: PartialFunction[Expression, Expression]): LogicalPlan
}
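
A minimal sketch of assembling a logical plan by hand from the LocalRelation, Filter, and Project operators:

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._

val id = AttributeReference("id", IntegerType, nullable = false)()
val name = AttributeReference("name", StringType)()
val relation = LocalRelation(id, name)

// Roughly: SELECT name FROM relation WHERE id = 1
val plan = Project(Seq(name), Filter(EqualTo(id, Literal(1, IntegerType)), relation))

plan.output       // Seq(name) — the attributes the plan produces
plan.references   // the attributes referenced by the plan's own expressions
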
Semantic analysis system for resolving unresolved references, type checking, and plan validation.

class Analyzer(catalog: Catalog, registry: FunctionRegistry, conf: CatalystConf) extends RuleExecutor[LogicalPlan] {
  def execute(plan: LogicalPlan): LogicalPlan
}

trait Catalog {
  def lookupRelation(name: Seq[String]): LogicalPlan
  def functionExists(name: String): Boolean
  def lookupFunction(name: String, children: Seq[Expression]): Expression
}

trait FunctionRegistry {
  def registerFunction(name: String, info: ExpressionInfo, builder: Seq[Expression] => Expression): Unit
  def lookupFunction(name: String, children: Seq[Expression]): Expression
  def functionExists(name: String): Boolean
}
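
A sketch of running analysis standalone, assuming the SimpleCatalog, EmptyFunctionRegistry, and SimpleCatalystConf helpers bundled with Catalyst 1.6 are accessible from your code:

import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._

val conf = SimpleCatalystConf(caseSensitiveAnalysis = true)
val analyzer = new Analyzer(new SimpleCatalog(conf), EmptyFunctionRegistry, conf)

val relation = LocalRelation(AttributeReference("id", IntegerType, nullable = false)())
// The projection refers to "id" by name; analysis resolves it against the relation's output
val unresolved = Project(Seq(UnresolvedAttribute("id")), relation)
val resolved = analyzer.execute(unresolved)
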
Rule-based optimization system with built-in optimizations for query plan improvement.

object Optimizer extends RuleExecutor[LogicalPlan] {
  // Key optimization rules available as objects
  object ConstantFolding extends Rule[LogicalPlan]
  object BooleanSimplification extends Rule[LogicalPlan]
  object ColumnPruning extends Rule[LogicalPlan]
  object FilterPushdown extends Rule[LogicalPlan]
  object ProjectCollapsing extends Rule[LogicalPlan]
}

abstract class Rule[TreeType <: TreeNode[TreeType]] {
  def apply(plan: TreeType): TreeType
}

abstract class RuleExecutor[TreeType <: TreeNode[TreeType]] {
  def execute(plan: TreeType): TreeType
  def batches: Seq[Batch]
}
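
A sketch of applying a single built-in rule directly; ConstantFolding (from org.apache.spark.sql.catalyst.optimizer) replaces foldable subtrees with literals while keeping the surrounding aliases intact:

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.ConstantFolding
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._

val relation = LocalRelation(AttributeReference("id", IntegerType, nullable = false)())
val plan = Project(
  Seq(Alias(Add(Literal(1, IntegerType), Literal(2, IntegerType)), "three")()),
  relation)

// The Add(1, 2) subtree is foldable, so it is rewritten to Literal(3)
val optimized = ConstantFolding(plan)
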
// Complex data types
case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType
case class MapType(keyType: DataType, valueType: DataType, valueContainsNull: Boolean) extends DataType
case class StructType(fields: Array[StructField]) extends DataType
case class StructField(name: String, dataType: DataType, nullable: Boolean, metadata: Metadata)

// Decimal types
case class DecimalType(precision: Int, scale: Int) extends DataType
case class Decimal(value: java.math.BigDecimal) extends Ordered[Decimal]

// Analysis types
type Resolver = (String, String) => Boolean
case class TypeCheckResult(isSuccess: Boolean, errorMessage: Option[String])

// Configuration
trait CatalystConf {
  def caseSensitiveAnalysis: Boolean
  def orderByOrdinal: Boolean
  def groupByOrdinal: Boolean
}

// Table identification
case class TableIdentifier(table: String, database: Option[String])

// Exceptions
class AnalysisException(message: String, line: Option[Int], startPosition: Option[Int]) extends Exception
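
A short sketch of composing a nested schema from these types:

import org.apache.spark.sql.types._

val addressType = StructType(Seq(
  StructField("city", StringType, nullable = true),
  StructField("zip", StringType, nullable = true)
))

val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("tags", ArrayType(StringType, containsNull = true), nullable = true),
  StructField("address", addressType, nullable = true)
))

schema.prettyJson   // JSON form of the schema, readable back with DataType.fromJson
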