Catalyst query optimization framework and expression evaluation engine for Apache Spark SQL
—
Catalyst's analysis framework transforms unresolved logical plans into fully resolved, type-checked plans. The analyzer resolves references, validates types, binds to catalog metadata, and ensures query correctness before optimization.
Main analysis engine that resolves logical plans through rule application:
/**
 * Main analysis engine that resolves logical plans
 * Applies resolution rules iteratively until fixed point or max iterations
 */
class Analyzer(
    catalog: SessionCatalog,
    conf: SQLConf,
    maxIterations: Int = 100)
  extends RuleExecutor[LogicalPlan] with CheckAnalysis {

  /** Execute analysis on a logical plan */
  def execute(plan: LogicalPlan): LogicalPlan = {
    // Reset both before and after the run so no per-query analysis state
    // leaks between independent invocations of the analyzer.
    AnalysisContext.reset()
    try {
      executeSameContext(plan)
    } finally {
      AnalysisContext.reset()
    }
  }

  /** Execute analysis and validate the result */
  def executeAndCheck(plan: LogicalPlan): LogicalPlan = {
    val analyzed = execute(plan)
    try {
      checkAnalysis(analyzed)
      analyzed
    } catch {
      case e: AnalysisException =>
        // Re-throw with the analyzed plan attached for richer error output,
        // keeping the original message, position info and stack trace.
        val ae = new AnalysisException(e.message, e.line, e.startPosition, Some(analyzed))
        ae.setStackTrace(e.getStackTrace)
        throw ae
    }
  }

  /** Check if plan is fully analyzed */
  def checkAnalysis(plan: LogicalPlan): Unit = {
    // Fails fast on the first unresolved node found anywhere in the tree.
    def checkExpressions(exprs: Seq[Expression]): Unit = {
      exprs.foreach { expr =>
        expr.foreach {
          // NOTE(review): these messages report `expr.prettyName` (the
          // enclosing expression), not the name of the unresolved node that
          // was matched — confirm this is the intended diagnostic.
          case _: UnresolvedAttribute =>
            throw new AnalysisException(s"Unresolved attribute: ${expr.prettyName}")
          case _: UnresolvedFunction =>
            throw new AnalysisException(s"Unresolved function: ${expr.prettyName}")
          case _ =>
        }
      }
    }
    plan.foreach {
      case p if !p.resolved =>
        throw new AnalysisException(s"Unresolved plan: ${p.nodeName}")
      case p =>
        checkExpressions(p.expressions)
    }
  }

  /** Rule batches for iterative analysis */
  // `Once` batches run a single pass; `fixedPoint` batches repeat until the
  // plan stops changing or maxIterations is reached.
  lazy val batches: Seq[Batch] = Seq(
    Batch("Hints", Once,
      new ResolveHints.ResolveBroadcastHints(conf)),
    Batch("Simple Sanity Check", Once,
      LookupFunctions),
    Batch("Substitution", fixedPoint,
      CTESubstitution,
      WindowsSubstitution,
      EliminateUnions),
    Batch("Resolution", fixedPoint,
      ResolveTableValuedFunctions ::
      ResolveRelations :: ResolveReferences ::
      ResolveCreateNamedStruct ::
      ResolveDeserializer ::
      ResolveNewInstance ::
      ResolveUpCast ::
      ResolveGroupingAnalytics ::
      ResolvePivot ::
      ResolveOrdinalInOrderByAndGroupBy ::
      ResolveMissingReferences ::
      ExtractGenerator ::
      ResolveGenerate ::
      ResolveFunctions ::
      ResolveAliases ::
      ResolveSubquery ::
      ResolveWindowOrder ::
      ResolveWindowFrame ::
      ResolveNaturalAndUsingJoin ::
      ExtractWindowExpressions ::
      GlobalAggregates ::
      ResolveAggregateFunctions ::
      TimeWindowing ::
      ResolveInlineTables ::
      TypeCoercion.typeCoercionRules ++
      extendedResolutionRules: _*),
    Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
    Batch("View", Once,
      AliasViewChild),
    Batch("Nondeterministic", Once,
      PullOutNondeterministic),
    Batch("UDF", Once,
      HandleNullInputsForUDF),
    Batch("Cleanup", fixedPoint,
      CleanupAliases)
  )
}Usage Examples:
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// Create analyzer with catalog and configuration
// NOTE(review): `externalCatalog`, `globalTempViewManager`, `conf`, etc. are
// assumed to already be in scope — this is illustrative example code.
val catalog = new SessionCatalog(externalCatalog, globalTempViewManager,
  functionResourceLoader, functionRegistry, conf, hadoopConf)
val analyzer = new Analyzer(catalog, conf, maxIterations = 100)

// Parse and analyze a query
val sqlText = "SELECT name, age FROM users WHERE age > 25"
val unresolvedPlan = CatalystSqlParser.parsePlan(sqlText)
val analyzedPlan = analyzer.execute(unresolvedPlan)

// Verify analysis succeeded
analyzer.checkAnalysis(analyzedPlan)
require(analyzedPlan.resolved, "Plan must be fully resolved after analysis")Core rules that resolve different aspects of logical plans:
/**
 * Replaces unresolved relation references with catalog-backed relations.
 * Tables that cannot be found are left untouched so that a later pass
 * (checkAnalysis) can report them with full context.
 */
object ResolveRelations extends Rule[LogicalPlan] {
  def ruleName: String = "ResolveRelations"

  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
    case unresolved @ UnresolvedRelation(tableIdentifier, alias) =>
      try {
        UnresolvedCatalogRelation(lookupTableFromCatalog(tableIdentifier), alias)
      } catch {
        case _: NoSuchTableException =>
          // Leave the node unresolved; checkAnalysis surfaces the error later.
          unresolved
      }
  }
}
/**
 * Binds unresolved attribute references to the attributes produced by the
 * operator's children (case-insensitive name matching via `resolver`).
 */
object ResolveReferences extends Rule[LogicalPlan] {
  def ruleName: String = "ResolveReferences"

  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
    // Wait until all children are resolved so their output attributes exist.
    case operator: LogicalPlan if !operator.childrenResolved => operator
    case operator: LogicalPlan =>
      operator.transformExpressionsUp {
        case unresolved @ UnresolvedAttribute(_) =>
          resolveAttribute(unresolved, operator.inputSet).getOrElse(unresolved)
      }
  }

  /**
   * Finds the unique input attribute matching `u`'s name.
   * Returns None when nothing matches; throws on ambiguity.
   */
  private def resolveAttribute(
      u: UnresolvedAttribute,
      input: AttributeSet): Option[NamedExpression] = {
    val matches = input.filter(attr => resolver(attr.name, u.name)).toSeq
    if (matches.isEmpty) {
      None
    } else if (matches.lengthCompare(1) == 0) {
      Some(matches.head)
    } else {
      throw new AnalysisException(s"Ambiguous reference to ${u.name}: $matches")
    }
  }
}
/**
 * Replaces unresolved function calls with concrete expressions built from
 * the catalog's registered function builders. Lookup failures leave the
 * node unresolved for later reporting.
 */
object ResolveFunctions extends Rule[LogicalPlan] {
  def ruleName: String = "ResolveFunctions"

  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
    case operator: LogicalPlan =>
      operator.transformExpressionsUp {
        case unresolved @ UnresolvedFunction(name, children, isDistinct) =>
          // Attach the original node's position to any error raised here.
          withPosition(unresolved) {
            try {
              catalog.lookupFunction(name, children).builder(children)
            } catch {
              case _: AnalysisException => unresolved
            }
          }
      }
  }
}
/**
 * Resolves aliases in SELECT lists and ensures proper naming.
 * Each UnresolvedAlias is replaced by a concrete Alias whose name comes from
 * the caller-supplied naming function when present, otherwise from the
 * pretty-printed SQL of the aliased expression.
 */
object ResolveAliases extends Rule[LogicalPlan] {
  def ruleName: String = "ResolveAliases"

  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
    case Project(projectList, child) if child.resolved =>
      val resolvedProjectList = projectList.map {
        case UnresolvedAlias(inner, aliasFunc) =>
          // BUG FIX: `aliasFunc` is Option[Expression => String] — it yields
          // only the alias *name*. The previous code returned that raw String
          // (instead of a NamedExpression) whenever aliasFunc was defined;
          // the name must be wrapped in an Alias node.
          val aliasName = aliasFunc.map(f => f(inner)).getOrElse(toPrettySQL(inner))
          Alias(inner, aliasName)()
        case other => other
      }
      Project(resolvedProjectList, child)
  }
}
/**
 * Resolves aggregate functions and ensures proper grouping
 */
object ResolveAggregateFunctions extends Rule[LogicalPlan] {
  def ruleName: String = "ResolveAggregateFunctions"

  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
    // Only rewrite once the child is resolved so lookups see concrete types.
    case a @ Aggregate(grouping, aggregates, child) if child.resolved =>
      val resolvedAggregates = aggregates.map { expr =>
        expr.transformUp {
          case UnresolvedFunction(name, children, isDistinct) =>
            // Resolve to appropriate aggregate function
            // NOTE(review): assumes catalog.lookupFunction returns an object
            // exposing `isAggregate` and `builder`; SimpleFunctionRegistry in
            // this file returns an Expression instead — confirm which catalog
            // API applies here.
            catalog.lookupFunction(name, children) match {
              case info if info.isAggregate =>
                info.builder(children)
              case _ =>
                throw new AnalysisException(s"${name.unquotedString} is not an aggregate function")
            }
        }
      }
      a.copy(aggregateExpressions = resolvedAggregates)
  }
}Type validation and automatic type conversions:
/** Outcome of validating an expression's input data types. */
sealed trait TypeCheckResult {
  /** True when the check passed. */
  def isSuccess: Boolean
}

/** Singleton result representing a successful type check. */
case object TypeCheckSuccess extends TypeCheckResult {
  override val isSuccess: Boolean = true
}

/** Failed type check carrying a human-readable explanation. */
case class TypeCheckFailure(message: String) extends TypeCheckResult {
  override val isSuccess: Boolean = false
}
/**
 * Trait for expressions that declare the exact data types expected of their
 * children, providing a default arity + type validation.
 */
trait ExpectsInputTypes extends Expression {

  /** Expected input data types in order, one per child. */
  def inputTypes: Seq[AbstractDataType]

  /**
   * Check if input types match expectations.
   * Reports an arity mismatch first, then the first argument whose actual
   * type is not accepted by the expected abstract type.
   */
  override def checkInputDataTypes(): TypeCheckResult = {
    val expectedTypes = inputTypes
    val actualTypes = children.map(_.dataType)
    if (expectedTypes.length != actualTypes.length) {
      TypeCheckFailure(
        s"${prettyName} requires ${expectedTypes.length} arguments, " +
        s"but ${actualTypes.length} were provided"
      )
    } else {
      // BUG FIX: the original used `return` inside a foreach lambda, which is
      // a non-local return (implemented by throwing NonLocalReturnControl and
      // deprecated/removed in Scala 3). `collectFirst` expresses the same
      // "first mismatch wins" logic without it.
      val firstMismatch = expectedTypes.zip(actualTypes).zipWithIndex.collectFirst {
        case ((expected, actual), index) if !expected.acceptsType(actual) =>
          TypeCheckFailure(
            s"${prettyName} argument ${index + 1} requires ${expected.simpleString} type, " +
            s"not ${actual.catalogString}"
          )
      }
      firstMismatch.getOrElse(TypeCheckSuccess)
    }
  }
}
/**
 * Type coercion rules that automatically convert compatible types
 */
object TypeCoercion {
  /** All type coercion rules */
  // Order matters: rules run in sequence within the Resolution batch, so
  // broader conversions (set ops, string promotion) precede narrower ones.
  val typeCoercionRules: List[Rule[LogicalPlan]] = List(
    InConversion,
    WidenSetOperationTypes,
    PromoteStrings,
    DecimalPrecision,
    BooleanEquality,
    FunctionArgumentConversion,
    ConcatCoercion,
    EltCoercion,
    CaseWhenCoercion,
    IfCoercion,
    StackCoercion,
    Division,
    ImplicitTypeCasts,
    DateTimeOperations,
    WindowFrameCoercion
  )
}
/**
 * Promotes string literals in numeric contexts
 */
object PromoteStrings extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
    case q: LogicalPlan => q.transformExpressionsUp {
      // Cast the string side of a mixed string/numeric Add to the numeric
      // side's type so arithmetic type-checks succeed.
      // NOTE(review): `left @ StringType()` assumes an extractor pattern that
      // matches an expression by its dataType — confirm such an unapply exists.
      case a @ Add(left @ StringType(), right) if right.dataType.isInstanceOf[NumericType] =>
        a.copy(left = Cast(left, right.dataType))
      case a @ Add(left, right @ StringType()) if left.dataType.isInstanceOf[NumericType] =>
        a.copy(right = Cast(right, left.dataType))
      // Similar rules for other arithmetic operations...
    }
  }
}Usage Examples:
// Type checking example
val addExpr = Add(
  AttributeReference("age", IntegerType)(),
  Literal("25", StringType) // String literal
)

// Before type coercion - would fail type check
val typeCheckResult = addExpr.checkInputDataTypes()
// typeCheckResult: TypeCheckFailure

// After applying PromoteStrings rule
// (here simulated with a manual transform for illustration)
val coercedExpr = addExpr.transform {
  case Add(left, right @ Literal(_, StringType)) =>
    Add(left, Cast(right, IntegerType))
}

// Now type check passes
val newResult = coercedExpr.checkInputDataTypes()
// newResult: TypeCheckSuccessRegistry system for SQL functions and user-defined functions:
/**
 * Registry for SQL functions
 */
trait FunctionRegistry {
  /** Register a function */
  def registerFunction(
      name: FunctionIdentifier,
      info: ExpressionInfo,
      builder: FunctionBuilder): Unit
  /** Look up function metadata */
  def lookupFunction(name: FunctionIdentifier): Option[ExpressionInfo]
  /** Look up function and create expression */
  // Implementations are expected to raise AnalysisException for unknown
  // functions or invalid argument lists (see SimpleFunctionRegistry).
  def lookupFunction(name: FunctionIdentifier, children: Seq[Expression]): Expression
  /** List all registered functions */
  def listFunction(): Seq[FunctionIdentifier]
  /** Drop a function */
  def dropFunction(name: FunctionIdentifier): Boolean
  /** Clear all functions */
  def clear(): Unit
  /** Create a copy of this registry */
  def clone(): FunctionRegistry
}
/**
 * Simple in-memory implementation of FunctionRegistry, backed by a mutable
 * map keyed on case-insensitively normalized identifiers. Not thread-safe.
 */
class SimpleFunctionRegistry extends FunctionRegistry {

  // Normalized identifier -> (metadata, expression builder).
  private val functionBuilders =
    new mutable.HashMap[FunctionIdentifier, (ExpressionInfo, FunctionBuilder)]

  /** Registers (or replaces) a function under its normalized name. */
  def registerFunction(
      name: FunctionIdentifier,
      info: ExpressionInfo,
      builder: FunctionBuilder): Unit = {
    functionBuilders(normalizeIdentifier(name)) = (info, builder)
  }

  /** Returns the metadata of a registered function, if any. */
  def lookupFunction(name: FunctionIdentifier): Option[ExpressionInfo] = {
    functionBuilders.get(normalizeIdentifier(name)).map(_._1)
  }

  /**
   * Builds an expression for the given function and argument expressions.
   * Throws AnalysisException when the function is unknown or when the
   * builder rejects the arguments (e.g. wrong arity).
   */
  def lookupFunction(name: FunctionIdentifier, children: Seq[Expression]): Expression = {
    val normalizedName = normalizeIdentifier(name)
    functionBuilders.get(normalizedName) match {
      case Some((info, builder)) =>
        try {
          builder(children)
        } catch {
          // Surface builder failures as analysis errors with the call name.
          case ex: Exception =>
            throw new AnalysisException(s"Invalid function call: ${name.unquotedString}", cause = Some(ex))
        }
      case None =>
        throw new AnalysisException(s"Undefined function: ${name.unquotedString}")
    }
  }

  // BUG FIX: the class previously failed to implement the remaining abstract
  // members of FunctionRegistry (listFunction, dropFunction, clear, clone),
  // which would not compile. Provide the obvious map-backed implementations.

  /** All registered (normalized) function identifiers. */
  override def listFunction(): Seq[FunctionIdentifier] = functionBuilders.keys.toSeq

  /** Removes a function; returns true when something was actually dropped. */
  override def dropFunction(name: FunctionIdentifier): Boolean =
    functionBuilders.remove(normalizeIdentifier(name)).isDefined

  /** Removes every registered function. */
  override def clear(): Unit = functionBuilders.clear()

  /** Copies the registry so the clone can be mutated independently. */
  override def clone(): FunctionRegistry = {
    val copied = new SimpleFunctionRegistry
    functionBuilders.foreach { case (key, value) => copied.functionBuilders.put(key, value) }
    copied
  }

  // Function resolution is case-insensitive: lower-case name and database.
  private def normalizeIdentifier(name: FunctionIdentifier): FunctionIdentifier = {
    FunctionIdentifier(name.funcName.toLowerCase, name.database.map(_.toLowerCase))
  }
}
/**
 * Function builder type alias
 */
// Maps the resolved argument expressions to the concrete function expression
// (e.g. Seq(child) => Upper(child)).
type FunctionBuilder = Seq[Expression] => Expression
/**
 * Catalog metadata describing a SQL function: its implementing class, name,
 * and the documentation strings shown by DESCRIBE FUNCTION.
 */
case class ExpressionInfo(
    className: String,
    name: String,
    usage: String,
    extended: String,
    note: String = "",
    group: String = "",
    since: String = "",
    deprecated: String = "") {

  /** Usage text, or "N/A" when none was provided. */
  def getUsage: String = orNotAvailable(usage)

  /** Extended description, or "N/A" when none was provided. */
  def getExtended: String = orNotAvailable(extended)

  // Shared fallback so both accessors present missing docs the same way.
  private def orNotAvailable(text: String): String =
    if (text.isEmpty) "N/A" else text
}
/**
 * Identifier for functions (name + optional database)
 */
case class FunctionIdentifier(funcName: String, database: Option[String] = None) {
  /** "db.func" when a database is set, otherwise just "func". */
  def unquotedString: String =
    database.map(db => s"$db.$funcName").getOrElse(funcName)
  /** Backtick-quoted form, safe for names with special characters. */
  def quotedString: String =
    database.map(db => s"`$db`.`$funcName`").getOrElse(s"`$funcName`")
  override def toString: String = unquotedString
}Usage Examples:
// Create function registry
val registry = new SimpleFunctionRegistry()

// Register built-in functions
registry.registerFunction(
  FunctionIdentifier("upper"),
  ExpressionInfo("org.apache.spark.sql.catalyst.expressions.Upper", "upper",
    "upper(str) - Returns str with all characters changed to uppercase"),
  (children: Seq[Expression]) => {
    require(children.length == 1, "upper requires exactly 1 argument")
    Upper(children.head)
  }
)
registry.registerFunction(
  FunctionIdentifier("length"),
  ExpressionInfo("org.apache.spark.sql.catalyst.expressions.Length", "length",
    "length(str) - Returns the character length of str"),
  (children: Seq[Expression]) => {
    require(children.length == 1, "length requires exactly 1 argument")
    Length(children.head)
  }
)

// Look up and use functions
val upperFunc = registry.lookupFunction(
  FunctionIdentifier("upper"),
  Seq(Literal("hello"))
)
// upperFunc: Upper(Literal("hello"))
val lengthFunc = registry.lookupFunction(
  FunctionIdentifier("length"),
  Seq(AttributeReference("name", StringType)())
)
// lengthFunc: Length(AttributeReference("name", StringType))

// List registered functions
val allFunctions = registry.listFunction()
// allFunctions: Seq(FunctionIdentifier("upper"), FunctionIdentifier("length"))Placeholder expressions used before analysis resolution:
/**
 * Attribute reference that has not yet been bound to an input attribute.
 * All schema-related members are undefined until the analyzer resolves it.
 */
case class UnresolvedAttribute(nameParts: Seq[String]) extends Attribute with Unevaluable {

  /** Dot-joined, possibly qualified name (e.g. "users.name"). */
  def name: String = nameParts.mkString(".")

  // Accessing any schema property before resolution is a programming error;
  // funnel every such access through a single helper.
  private def unsupported(field: String): Nothing =
    throw new UnresolvedException(this, field)

  def exprId: ExprId = unsupported("exprId")
  def dataType: DataType = unsupported("dataType")
  def nullable: Boolean = unsupported("nullable")
  def qualifiers: Seq[String] = unsupported("qualifiers")
  def metadata: Metadata = unsupported("metadata")
  def toAttribute: Attribute = unsupported("toAttribute")

  // The leading apostrophe marks unresolved nodes in plan dumps.
  override def toString: String = s"'${nameParts.mkString(".")}"

  def newInstance(): NamedExpression = this
  def withName(newName: String): Attribute = UnresolvedAttribute(Seq(newName))
  def withQualifier(newQualifier: Seq[String]): Attribute = this
  def withExprId(newExprId: ExprId): Attribute = this
  def withDataType(newType: DataType): Attribute = this
  def withNullability(newNullability: Boolean): Attribute = this
  def withMetadata(newMetadata: Metadata): Attribute = this
}
/**
 * Function call that has not yet been bound to a registered function.
 * Type information is unavailable until resolution.
 */
case class UnresolvedFunction(
    name: FunctionIdentifier,
    children: Seq[Expression],
    isDistinct: Boolean) extends Expression with Unevaluable {

  def dataType: DataType = throw new UnresolvedException(this, "dataType")
  def nullable: Boolean = throw new UnresolvedException(this, "nullable")

  override def prettyName: String = name.unquotedString

  // Rendered with a leading apostrophe to mark the node as unresolved.
  override def toString: String = {
    val prefix = if (isDistinct) "DISTINCT " else ""
    val args = children.mkString(", ")
    s"'${name.unquotedString}($prefix$args)"
  }
}
/**
 * Unresolved star expression (*), optionally qualified ("table.*").
 */
case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevaluable {

  override def toString: String = target match {
    case None => "*"
    case Some(prefix) => prefix.mkString(".") + ".*"
  }

  /**
   * Expands the star into the concrete output attributes of `input`.
   * A single-part qualifier ("t.*") keeps only attributes whose last
   * qualifier matches; any other form expands to the full output.
   */
  def expand(input: LogicalPlan, resolver: Resolver): Seq[NamedExpression] = {
    val exprs = input.output
    target match {
      case Some(parts) if parts.length == 1 =>
        // Qualified star like "table.*"
        // BUG FIX: the original wrote
        //   exprs.filter(_.qualifiers.nonEmpty && resolver(_.qualifiers.last, qualifier))
        // Using `_` twice expands to a *two*-parameter lambda, which cannot
        // typecheck as filter's one-argument predicate. Name the parameter.
        val qualifier = parts.head
        exprs.filter { attr =>
          attr.qualifiers.nonEmpty && resolver(attr.qualifiers.last, qualifier)
        }
      case _ =>
        // Unqualified star "*"
        exprs
    }
  }
}
/**
 * Placeholder wrapping an expression whose output name has not been chosen
 * yet; replaced by a concrete Alias during resolution.
 */
case class UnresolvedAlias(
    child: Expression,
    aliasFunc: Option[Expression => String] = None) extends UnaryExpression with NamedExpression with Unevaluable {

  // Naming and typing are undefined until resolution assigns a real Alias.
  private def unsupported(field: String): Nothing =
    throw new UnresolvedException(this, field)

  def name: String = unsupported("name")
  def exprId: ExprId = unsupported("exprId")
  def qualifiers: Seq[String] = unsupported("qualifiers")
  def toAttribute: Attribute = unsupported("toAttribute")
  def newInstance(): NamedExpression = unsupported("newInstance")
  override def dataType: DataType = unsupported("dataType")
  override def nullable: Boolean = unsupported("nullable")

  def metadata: Metadata = Metadata.empty

  override def toString: String = s"unresolvedalias(${child})"
}
/**
 * Exception thrown when accessing unresolved properties
 */
// Raised when code touches members (dataType, exprId, ...) that only become
// defined after the analyzer has resolved the node.
class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: String)
extends AnalysisException(
s"Invalid call to $function on unresolved object, tree: $tree")Usage Examples:
// Unresolved expressions before analysis
val unresolvedAttr = UnresolvedAttribute(Seq("users", "name"))
val unresolvedFunc = UnresolvedFunction(
  FunctionIdentifier("upper"),
  Seq(UnresolvedAttribute(Seq("name"))),
  isDistinct = false
)
val unresolvedStar = UnresolvedStar(Some(Seq("users"))) // users.*
val globalStar = UnresolvedStar(None) // *

// These will be resolved during analysis
// NOTE(review): `expand` returns Seq[NamedExpression]; wrapping it in another
// Seq alongside an Alias nests the sequence — likely needs `++` instead.
val project = Project(
  Seq(
    unresolvedStar.expand(inputPlan, resolver),
    Alias(unresolvedFunc, "upper_name")()
  ),
  inputPlan
)

// Accessing unresolved properties throws exceptions
// NOTE(review): matching the generic UnresolvedException without a type
// argument relies on erasure — fine at runtime, but emits a warning.
try {
  val dataType = unresolvedAttr.dataType // Throws UnresolvedException
} catch {
  case _: UnresolvedException =>
    println("Cannot access dataType on unresolved attribute")
}Exception types for analysis errors and validation failures:
/**
 * Exception thrown when a query fails to analyze, optionally carrying the
 * source position and the offending logical plan.
 */
case class AnalysisException(
    message: String,
    line: Option[Int] = None,
    startPosition: Option[Int] = None,
    plan: Option[LogicalPlan] = None,
    cause: Option[Throwable] = None) extends Exception(message, cause.orNull) with Serializable {

  /** Returns a copy of this exception carrying the given source position. */
  def withPosition(line: Option[Int], startPosition: Option[Int]): AnalysisException =
    copy(line = line, startPosition = startPosition)

  // Full message: simple message plus the plan, when one is attached.
  override def getMessage: String =
    plan.fold(getSimpleMessage)(p => s"$getSimpleMessage;\n$p")

  /** Message with position info but without the plan annotation (for testing). */
  def getSimpleMessage: String = {
    val positionInfo = line match {
      case Some(l) => startPosition.fold(s" (line $l)")(p => s" (line $l, pos $p)")
      case None => ""
    }
    message + positionInfo
  }
}
/**
 * Base class for "not found" exceptions
 */
// NOTE(review): subclassing the *case class* AnalysisException (and further
// deriving case classes from it) inherits copy/equals semantics that Scala
// discourages; also `protected val message` here shadows AnalysisException's
// own `message` field — confirm this compiles and behaves as intended.
abstract class NoSuchItemException(protected val message: String) extends AnalysisException(message)
case class NoSuchDatabaseException(db: String) extends NoSuchItemException(s"Database '$db' not found")
case class NoSuchTableException(db: String, table: String)
extends NoSuchItemException(s"Table or view '$table' not found in database '$db'")
case class NoSuchFunctionException(db: String, func: String)
extends NoSuchItemException(s"Function '$func' not found in database '$db'")
case class NoSuchPartitionException(db: String, table: String, spec: TablePartitionSpec)
extends NoSuchItemException(s"Partition not found in table '$db.$table'")
case class TableAlreadyExistsException(db: String, table: String)
extends AnalysisException(s"Table '$table' already exists in database '$db'")
case class DatabaseAlreadyExistsException(db: String)
extends AnalysisException(s"Database '$db' already exists")
case class FunctionAlreadyExistsException(db: String, func: String)
extends AnalysisException(s"Function '$func' already exists in database '$db'")Usage Examples:
// Analysis exception handling
// NOTE(review): NoSuchTableException/NoSuchDatabaseException extend
// AnalysisException, so the first case below already matches them — the
// later cases are unreachable as ordered here.
try {
  val analyzedPlan = analyzer.execute(unresolvedPlan)
} catch {
  case ae: AnalysisException =>
    println(s"Analysis failed: ${ae.getSimpleMessage}")
    ae.plan.foreach(p => println(s"Problematic plan: $p"))
  case nste: NoSuchTableException =>
    println(s"Table not found: ${nste.getMessage}")
  case nsde: NoSuchDatabaseException =>
    println(s"Database not found: ${nsde.getMessage}")
}

// Create analysis exceptions with position info
val exception = AnalysisException(
  "Column 'xyz' not found",
  line = Some(5),
  startPosition = Some(12),
  plan = Some(problematicPlan)
)
throw exception.withPosition(Some(10), Some(5))Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-catalyst