CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-spark--spark-catalyst

Catalyst query optimization framework and expression evaluation engine for Apache Spark SQL

Pending
Overview
Eval results
Files

analysis.mddocs/

Analysis Framework and Rules

Catalyst's analysis framework transforms unresolved logical plans into fully resolved, type-checked plans. The analyzer resolves references, validates types, binds to catalog metadata, and ensures query correctness before optimization.

Capabilities

Analyzer Core

Main analysis engine that resolves logical plans through rule application:

/**
 * Main analysis engine that resolves unresolved logical plans.
 *
 * Rule batches are applied in the order declared in `batches`; `Once` batches
 * run a single pass while `fixedPoint` batches repeat until the plan stops
 * changing or `maxIterations` is exhausted.
 *
 * @param catalog       session catalog used to resolve tables and functions
 * @param conf          SQL configuration consulted by individual rules
 * @param maxIterations cap on rule-batch iterations for fixed-point batches
 */
class Analyzer(
    catalog: SessionCatalog,
    conf: SQLConf, 
    maxIterations: Int = 100)
  extends RuleExecutor[LogicalPlan] with CheckAnalysis {
  
  /**
   * Execute analysis on a logical plan.
   *
   * The shared AnalysisContext is reset both before and after execution so
   * state from a previous (possibly failed) analysis cannot leak into this
   * run, and this run's state cannot leak into the next one.
   */
  def execute(plan: LogicalPlan): LogicalPlan = {
    AnalysisContext.reset()
    try {
      executeSameContext(plan)
    } finally {
      AnalysisContext.reset()
    }
  }
  
  /**
   * Execute analysis and validate the result.
   *
   * On validation failure the AnalysisException is re-thrown enriched with
   * the (partially) analyzed plan for diagnostics; the original stack trace
   * is preserved on the new exception.
   */
  def executeAndCheck(plan: LogicalPlan): LogicalPlan = {
    val analyzed = execute(plan)
    try {
      checkAnalysis(analyzed)
      analyzed
    } catch {
      case e: AnalysisException =>
        val ae = new AnalysisException(e.message, e.line, e.startPosition, Some(analyzed))
        ae.setStackTrace(e.getStackTrace)
        throw ae
    }
  }
  
  /**
   * Check if plan is fully analyzed.
   *
   * Every operator must report `resolved` and no expression tree may still
   * contain unresolved attributes or functions. Throws AnalysisException on
   * the first violation found.
   */
  def checkAnalysis(plan: LogicalPlan): Unit = {
    // Walk each expression tree looking for leftover unresolved leaf nodes.
    def checkExpressions(exprs: Seq[Expression]): Unit = {
      exprs.foreach { expr =>
        expr.foreach {
          case _: UnresolvedAttribute =>
            throw new AnalysisException(s"Unresolved attribute: ${expr.prettyName}")
          case _: UnresolvedFunction =>
            throw new AnalysisException(s"Unresolved function: ${expr.prettyName}")
          case _ =>
        }
      }
    }
    
    // Check operators first: an unresolved operator fails fast before its
    // expressions are inspected.
    plan.foreach {
      case p if !p.resolved =>
        throw new AnalysisException(s"Unresolved plan: ${p.nodeName}")
      case p =>
        checkExpressions(p.expressions)
    }
  }
  
  /**
   * Rule batches for iterative analysis.
   *
   * NOTE(review): both batch order and rule order within a batch are
   * significant — e.g. Substitution must run before Resolution — do not
   * reorder casually.
   */
  lazy val batches: Seq[Batch] = Seq(
    Batch("Hints", Once, 
      new ResolveHints.ResolveBroadcastHints(conf)),
    Batch("Simple Sanity Check", Once,
      LookupFunctions),
    Batch("Substitution", fixedPoint,
      CTESubstitution,
      WindowsSubstitution,
      EliminateUnions),
    // The `++` below binds tighter than `::`, so the coercion rules and any
    // externally supplied resolution rules are appended to the rule list.
    Batch("Resolution", fixedPoint,
      ResolveTableValuedFunctions ::
      ResolveRelations :: ResolveReferences ::
      ResolveCreateNamedStruct ::
      ResolveDeserializer ::
      ResolveNewInstance ::
      ResolveUpCast ::
      ResolveGroupingAnalytics ::
      ResolvePivot ::
      ResolveOrdinalInOrderByAndGroupBy ::
      ResolveMissingReferences ::
      ExtractGenerator ::
      ResolveGenerate ::
      ResolveFunctions ::
      ResolveAliases ::
      ResolveSubquery ::
      ResolveWindowOrder ::
      ResolveWindowFrame ::
      ResolveNaturalAndUsingJoin ::
      ExtractWindowExpressions ::
      GlobalAggregates ::
      ResolveAggregateFunctions ::
      TimeWindowing ::
      ResolveInlineTables ::
      TypeCoercion.typeCoercionRules ++
      extendedResolutionRules: _*),
    Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
    Batch("View", Once,
      AliasViewChild),
    Batch("Nondeterministic", Once,
      PullOutNondeterministic),
    Batch("UDF", Once,
      HandleNullInputsForUDF),
    Batch("Cleanup", fixedPoint,
      CleanupAliases)
  )
}

Usage Examples:

import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// Create analyzer with catalog and configuration.
// NOTE(review): externalCatalog, globalTempViewManager, etc. are assumed to
// be in scope from earlier setup — this snippet is not self-contained.
val catalog = new SessionCatalog(externalCatalog, globalTempViewManager, 
                                functionResourceLoader, functionRegistry, conf, hadoopConf)
val analyzer = new Analyzer(catalog, conf, maxIterations = 100)

// Parse SQL text into an unresolved logical plan, then analyze it.
val sqlText = "SELECT name, age FROM users WHERE age > 25"
val unresolvedPlan = CatalystSqlParser.parsePlan(sqlText)
val analyzedPlan = analyzer.execute(unresolvedPlan)

// Verify analysis succeeded: checkAnalysis throws on unresolved nodes,
// and `resolved` confirms the whole tree is bound.
analyzer.checkAnalysis(analyzedPlan)
require(analyzedPlan.resolved, "Plan must be fully resolved after analysis")

Resolution Rules

Core rules that resolve different aspects of logical plans:

/**
 * Resolves table references against the session catalog.
 *
 * Each UnresolvedRelation is replaced by a catalog-backed relation when the
 * table exists; when it does not, the node is left untouched so the final
 * analysis check can report it with a proper error.
 */
object ResolveRelations extends Rule[LogicalPlan] {
  def ruleName: String = "ResolveRelations"

  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
    case unresolved @ UnresolvedRelation(identifier, alias) =>
      try {
        // Bind directly to the catalog entry, carrying the alias through.
        UnresolvedCatalogRelation(lookupTableFromCatalog(identifier), alias)
      } catch {
        // Table absent: keep the unresolved node for later error reporting.
        case _: NoSuchTableException => unresolved
      }
  }
}

/**
 * Resolves attribute references against the attributes visible to each
 * operator.
 *
 * Operators whose children are not yet resolved are skipped, since attribute
 * lookup requires the child output to be final. Unmatched attributes are
 * left unresolved for later rules or the final analysis check.
 */
object ResolveReferences extends Rule[LogicalPlan] {
  def ruleName: String = "ResolveReferences"

  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
    // Defer until the operator's children have been fully resolved.
    case op: LogicalPlan if !op.childrenResolved => op
    case op: LogicalPlan =>
      op.transformExpressionsUp {
        case unresolved @ UnresolvedAttribute(_) =>
          resolveAttribute(unresolved, op.inputSet).getOrElse(unresolved)
      }
  }

  /**
   * Finds the single attribute in `input` whose name matches (using the
   * configured, typically case-insensitive, resolver). Returns None when no
   * attribute matches; throws when more than one does.
   */
  private def resolveAttribute(
      u: UnresolvedAttribute,
      input: AttributeSet): Option[NamedExpression] = {
    val matches = input.filter(attr => resolver(attr.name, u.name)).toSeq
    matches match {
      case Seq(only) => Some(only)
      case Seq() => None
      case ambiguous =>
        throw new AnalysisException(s"Ambiguous reference to ${u.name}: $ambiguous")
    }
  }
}

/**
 * Resolves function calls to registered functions.
 *
 * Each UnresolvedFunction is looked up in the catalog and replaced by the
 * expression its builder produces. Lookup failures leave the call
 * unresolved so a later pass or the final analysis check reports it.
 */
object ResolveFunctions extends Rule[LogicalPlan] {
  def ruleName: String = "ResolveFunctions"

  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
    case operator: LogicalPlan =>
      operator.transformExpressionsUp {
        case unresolved @ UnresolvedFunction(name, args, _) =>
          // withPosition tags any error with this call's source location.
          withPosition(unresolved) {
            try {
              catalog.lookupFunction(name, args).builder(args)
            } catch {
              // Unknown function: keep the node for later error reporting.
              case _: AnalysisException => unresolved
            }
          }
      }
  }
}

/**
 * Resolves aliases in SELECT lists and ensures proper naming.
 *
 * Each UnresolvedAlias in a resolved Project is replaced by a concrete
 * Alias. The alias name comes from the caller-supplied naming function when
 * present, otherwise from the pretty-printed SQL of the child expression.
 */
object ResolveAliases extends Rule[LogicalPlan] {
  def ruleName: String = "ResolveAliases"
  
  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
    case Project(projectList, child) if child.resolved =>
      val resolvedProjectList = projectList.map {
        case UnresolvedAlias(inner, aliasFunc) =>
          // BUG FIX: aliasFunc is Expression => String, so the original
          // `aliasFunc.map(_(child)).getOrElse(Alias(...))` mixed String and
          // Alias (inferred as Any). Compute the name first, then always
          // wrap the child expression in an Alias.
          val name = aliasFunc.map(f => f(inner)).getOrElse(toPrettySQL(inner))
          Alias(inner, name)()
        case other => other
      }
      Project(resolvedProjectList, child)
  }
}

/**
 * Resolves aggregate functions inside Aggregate operators.
 *
 * Every UnresolvedFunction found in the aggregate expressions must resolve
 * to a registered aggregate function; anything else is an analysis error.
 */
object ResolveAggregateFunctions extends Rule[LogicalPlan] {
  def ruleName: String = "ResolveAggregateFunctions"

  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
    case agg @ Aggregate(_, aggregateExprs, child) if child.resolved =>
      val resolved = aggregateExprs.map(_.transformUp {
        case UnresolvedFunction(name, args, _) =>
          val info = catalog.lookupFunction(name, args)
          // Only functions flagged as aggregates are legal here.
          if (info.isAggregate) {
            info.builder(args)
          } else {
            throw new AnalysisException(s"${name.unquotedString} is not an aggregate function")
          }
      })
      agg.copy(aggregateExpressions = resolved)
  }
}

Type Checking and Coercion

Type validation and automatic type conversions:

/** Outcome of an expression's input-type validation. */
sealed trait TypeCheckResult {
  /** True when the inputs satisfy the expression's type contract. */
  def isSuccess: Boolean
}

/** Validation passed: all inputs are acceptable. */
case object TypeCheckSuccess extends TypeCheckResult {
  override def isSuccess: Boolean = true
}

/** Validation failed; `message` explains which input was rejected and why. */
case class TypeCheckFailure(message: String) extends TypeCheckResult {
  override def isSuccess: Boolean = false
}

/**
 * Trait for expressions that declare the input types they accept.
 *
 * Implementors provide one AbstractDataType per child. checkInputDataTypes
 * first verifies arity, then each child's dataType in order, reporting the
 * first mismatch (argument positions are 1-based in the message).
 */
trait ExpectsInputTypes extends Expression {
  /** Expected input data types, one per child, in order. */
  def inputTypes: Seq[AbstractDataType]
  
  /** Check if input types match expectations. */
  override def checkInputDataTypes(): TypeCheckResult = {
    val expectedTypes = inputTypes
    val actualTypes = children.map(_.dataType)
    
    if (expectedTypes.length != actualTypes.length) {
      TypeCheckFailure(
        s"${prettyName} requires ${expectedTypes.length} arguments, " +
        s"but ${actualTypes.length} were provided"
      )
    } else {
      // FIX: the original used `return` inside a foreach lambda — a
      // non-local return implemented by throwing NonLocalReturnControl and
      // removed in Scala 3. collectFirst finds the first mismatch instead.
      expectedTypes.zip(actualTypes).zipWithIndex.collectFirst {
        case ((expected, actual), index) if !expected.acceptsType(actual) =>
          TypeCheckFailure(
            s"${prettyName} argument ${index + 1} requires ${expected.simpleString} type, " +
            s"not ${actual.catalogString}"
          )
      }.getOrElse(TypeCheckSuccess)
    }
  }
}

/**
 * Type coercion rules that automatically convert compatible types.
 */
object TypeCoercion {
  /**
   * All type coercion rules, in application order.
   *
   * NOTE(review): ordering appears significant — the catch-all
   * ImplicitTypeCasts runs after the more specific conversions such as
   * PromoteStrings and DecimalPrecision — confirm before reordering.
   */
  val typeCoercionRules: List[Rule[LogicalPlan]] = List(
    InConversion,
    WidenSetOperationTypes,
    PromoteStrings,
    DecimalPrecision,
    BooleanEquality,
    FunctionArgumentConversion,
    ConcatCoercion,
    EltCoercion,
    CaseWhenCoercion,
    IfCoercion,
    StackCoercion,
    Division,
    ImplicitTypeCasts,
    DateTimeOperations,
    WindowFrameCoercion
  )
}

/**
 * Promotes string operands in numeric contexts by inserting casts, so that
 * e.g. `'25' + age` becomes `CAST('25' AS INT) + age`.
 *
 * NOTE(review): `left @ StringType()` used as a pattern assumes an extractor
 * that matches an expression by its dataType — confirm against the actual
 * StringType companion before relying on it.
 */
object PromoteStrings extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
    case q: LogicalPlan => q.transformExpressionsUp {
      // String on the left with a numeric right operand: cast the string
      // side up to the numeric side's type.
      case a @ Add(left @ StringType(), right) if right.dataType.isInstanceOf[NumericType] =>
        a.copy(left = Cast(left, right.dataType))
      // Symmetric case: numeric left, string right.
      case a @ Add(left, right @ StringType()) if left.dataType.isInstanceOf[NumericType] =>  
        a.copy(right = Cast(right, left.dataType))
      // Similar rules for other arithmetic operations...
    }
  }
}

Usage Examples:

// Type checking example: Add over an Int attribute and a String literal.
val addExpr = Add(
  AttributeReference("age", IntegerType)(),
  Literal("25", StringType)  // String literal
)

// Before type coercion — the arity matches but the String argument is
// rejected, so the check fails.
val typeCheckResult = addExpr.checkInputDataTypes()
// typeCheckResult: TypeCheckFailure

// After applying a PromoteStrings-style rewrite: cast the string side to
// the numeric side's type.
val coercedExpr = addExpr.transform {
  case Add(left, right @ Literal(_, StringType)) =>
    Add(left, Cast(right, IntegerType))
}

// Now type check passes
val newResult = coercedExpr.checkInputDataTypes()
// newResult: TypeCheckSuccess

Function Registry

Registry system for SQL functions and user-defined functions:

/**
 * Registry for SQL functions: maps function identifiers to metadata and a
 * builder that turns argument expressions into a concrete Expression.
 */
trait FunctionRegistry {
  /** Register a function, replacing any existing entry for the same name. */
  def registerFunction(
      name: FunctionIdentifier,
      info: ExpressionInfo, 
      builder: FunctionBuilder): Unit
  
  /** Look up function metadata; None when the function is not registered. */
  def lookupFunction(name: FunctionIdentifier): Option[ExpressionInfo]
  
  /**
   * Look up a function and build the concrete expression for `children`.
   * Implementations throw AnalysisException for unknown names.
   */
  def lookupFunction(name: FunctionIdentifier, children: Seq[Expression]): Expression
  
  /** List all registered function identifiers. */
  def listFunction(): Seq[FunctionIdentifier]
  
  /** Drop a function; returns whether an entry was actually removed. */
  def dropFunction(name: FunctionIdentifier): Boolean
  
  /** Remove all registered functions. */
  def clear(): Unit
  
  /** Create an independent copy of this registry. */
  def clone(): FunctionRegistry
}

/**
 * In-memory FunctionRegistry backed by a mutable HashMap.
 *
 * Identifiers are normalized to lower case (function and database name), so
 * registration and lookup are case-insensitive.
 *
 * NOTE(review): not synchronized — assumes single-threaded use or external
 * locking; confirm against callers.
 */
class SimpleFunctionRegistry extends FunctionRegistry {
  private val functionBuilders = 
    new mutable.HashMap[FunctionIdentifier, (ExpressionInfo, FunctionBuilder)]
    
  override def registerFunction(
      name: FunctionIdentifier,
      info: ExpressionInfo,
      builder: FunctionBuilder): Unit = {
    functionBuilders(normalizeIdentifier(name)) = (info, builder)
  }
  
  override def lookupFunction(name: FunctionIdentifier): Option[ExpressionInfo] = {
    functionBuilders.get(normalizeIdentifier(name)).map(_._1)
  }
  
  override def lookupFunction(name: FunctionIdentifier, children: Seq[Expression]): Expression = {
    functionBuilders.get(normalizeIdentifier(name)) match {
      case Some((_, builder)) => 
        try {
          builder(children)
        } catch {
          // Surface builder failures (wrong arity, bad arguments) as analysis
          // errors; NonFatal lets truly fatal errors propagate untouched.
          case scala.util.control.NonFatal(ex) =>
            throw new AnalysisException(s"Invalid function call: ${name.unquotedString}", cause = Some(ex))
        }
      case None =>
        throw new AnalysisException(s"Undefined function: ${name.unquotedString}")
    }
  }
  
  // FIX: the following members are abstract in FunctionRegistry but were
  // missing here, leaving the class uncompilable as a concrete type.
  
  override def listFunction(): Seq[FunctionIdentifier] = functionBuilders.keys.toSeq
  
  override def dropFunction(name: FunctionIdentifier): Boolean =
    functionBuilders.remove(normalizeIdentifier(name)).isDefined
  
  override def clear(): Unit = functionBuilders.clear()
  
  override def clone(): SimpleFunctionRegistry = {
    val copied = new SimpleFunctionRegistry
    // Entries are already normalized, but registerFunction re-normalizes
    // harmlessly and keeps a single insertion path.
    functionBuilders.foreach { case (id, (info, builder)) =>
      copied.registerFunction(id, info, builder)
    }
    copied
  }
  
  /** Lower-cases both the function name and the optional database name. */
  private def normalizeIdentifier(name: FunctionIdentifier): FunctionIdentifier = {
    FunctionIdentifier(name.funcName.toLowerCase, name.database.map(_.toLowerCase))
  }
}

/**
 * Function builder: constructs a concrete Expression from its argument
 * expressions (e.g. `children => Upper(children.head)`).
 *
 * NOTE(review): a top-level type alias needs a package object in Scala 2
 * (or top-level definitions in Scala 3) — confirm where this actually lives.
 */
type FunctionBuilder = Seq[Expression] => Expression

/**
 * Catalog metadata describing a registered function.
 *
 * @param className fully qualified class implementing the function
 * @param name      function name as registered
 * @param usage     short usage string shown in help output
 * @param extended  extended description/examples
 */
case class ExpressionInfo(
    className: String,
    name: String,
    usage: String,
    extended: String,
    note: String = "",
    group: String = "",
    since: String = "",
    deprecated: String = "") {

  /** Usage text, or "N/A" when none was provided. */
  def getUsage: String = orNA(usage)

  /** Extended description, or "N/A" when none was provided. */
  def getExtended: String = orNA(extended)

  // Shared empty-string fallback for the display accessors.
  private def orNA(text: String): String = if (text.isEmpty) "N/A" else text
}

/**
 * Identifier for a function: a name plus an optional database qualifier.
 */
case class FunctionIdentifier(funcName: String, database: Option[String] = None) {
  /** Dotted form without quoting, e.g. "db.fn" or just "fn". */
  def unquotedString: String = database match {
    case Some(db) => s"$db.$funcName"
    case None => funcName
  }

  /** Backtick-quoted form, e.g. "`db`.`fn`" or "`fn`". */
  def quotedString: String = database match {
    case Some(db) => s"`$db`.`$funcName`"
    case None => s"`$funcName`"
  }

  override def toString: String = unquotedString
}

Usage Examples:

// Create function registry  
val registry = new SimpleFunctionRegistry()

// Register built-in functions. Each registration supplies metadata plus a
// builder that validates arity and constructs the concrete expression.
registry.registerFunction(
  FunctionIdentifier("upper"),
  ExpressionInfo("org.apache.spark.sql.catalyst.expressions.Upper", "upper", 
                "upper(str) - Returns str with all characters changed to uppercase"),
  (children: Seq[Expression]) => {
    require(children.length == 1, "upper requires exactly 1 argument")
    Upper(children.head)
  }
)

registry.registerFunction(
  FunctionIdentifier("length"), 
  ExpressionInfo("org.apache.spark.sql.catalyst.expressions.Length", "length",
                "length(str) - Returns the character length of str"),
  (children: Seq[Expression]) => {
    require(children.length == 1, "length requires exactly 1 argument")
    Length(children.head)
  }
)

// Look up and use functions — the two-argument lookupFunction builds the
// concrete expression (and throws AnalysisException for unknown names).
val upperFunc = registry.lookupFunction(
  FunctionIdentifier("upper"),
  Seq(Literal("hello"))
)
// upperFunc: Upper(Literal("hello"))

val lengthFunc = registry.lookupFunction(
  FunctionIdentifier("length"), 
  Seq(AttributeReference("name", StringType)())
)
// lengthFunc: Length(AttributeReference("name", StringType))

// List registered functions (order not guaranteed — backed by a HashMap).
val allFunctions = registry.listFunction()
// allFunctions: Seq(FunctionIdentifier("upper"), FunctionIdentifier("length"))

Unresolved Expressions

Placeholder expressions used before analysis resolution:

/**
 * Unresolved attribute reference (placeholder used before analysis).
 *
 * `nameParts` holds the dotted-name components as parsed (e.g.
 * Seq("users", "name") for `users.name`). All schema-dependent accessors
 * throw UnresolvedException because no binding exists yet; the analyzer
 * replaces this node with a concrete attribute during resolution.
 */
case class UnresolvedAttribute(nameParts: Seq[String]) extends Attribute with Unevaluable {
  // Dotted rendering of the parsed name parts.
  def name: String = nameParts.mkString(".")
  def exprId: ExprId = throw new UnresolvedException(this, "exprId")
  def dataType: DataType = throw new UnresolvedException(this, "dataType")  
  def nullable: Boolean = throw new UnresolvedException(this, "nullable")
  def qualifiers: Seq[String] = throw new UnresolvedException(this, "qualifiers")
  def metadata: Metadata = throw new UnresolvedException(this, "metadata")
  
  // Leading quote marks the node as unresolved in plan dumps.
  override def toString: String = s"'${nameParts.mkString(".")}"
  
  def toAttribute: Attribute = throw new UnresolvedException(this, "toAttribute")
  // The with*/newInstance mutators are identity ops (except withName):
  // there is nothing to rewrite on an unbound reference.
  def newInstance(): NamedExpression = this
  def withName(newName: String): Attribute = UnresolvedAttribute(Seq(newName))
  def withQualifier(newQualifier: Seq[String]): Attribute = this
  def withExprId(newExprId: ExprId): Attribute = this
  def withDataType(newType: DataType): Attribute = this
  def withNullability(newNullability: Boolean): Attribute = this
  def withMetadata(newMetadata: Metadata): Attribute = this
}

/**
 * Unresolved function call (placeholder used before analysis).
 *
 * Holds the parsed function identifier, argument expressions, and the
 * DISTINCT flag. Type accessors throw until the analyzer replaces this node
 * with the registered function's concrete expression.
 */
case class UnresolvedFunction(
    name: FunctionIdentifier,
    children: Seq[Expression], 
    isDistinct: Boolean) extends Expression with Unevaluable {
  
  def dataType: DataType = throw new UnresolvedException(this, "dataType")
  def nullable: Boolean = throw new UnresolvedException(this, "nullable")
  
  // Leading quote marks the node as unresolved, e.g. 'count(DISTINCT a).
  override def toString: String = {
    val distinct = if (isDistinct) "DISTINCT " else ""
    s"'${name.unquotedString}($distinct${children.mkString(", ")})"
  }
  
  override def prettyName: String = name.unquotedString
}

/**
 * Unresolved star expression: `*` or a qualified `prefix.*`.
 *
 * `target` is None for a bare `*`, or the dotted qualifier parts for a
 * qualified star such as `users.*`.
 */
case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevaluable {
  override def toString: String = target match {
    case None => "*"
    case Some(prefix) => prefix.mkString(".") + ".*"
  }
  
  /**
   * Expands the star into the concrete output attributes of `input`.
   * A single-part qualified star keeps only attributes whose last qualifier
   * matches; everything else (bare `*`, multi-part targets) expands to the
   * full output.
   */
  def expand(input: LogicalPlan, resolver: Resolver): Seq[NamedExpression] = {
    val exprs = input.output
    target match {
      case Some(parts) if parts.length == 1 =>
        // Qualified star like "table.*"
        val qualifier = parts.head
        // BUG FIX: the original lambda used two `_` placeholders, which
        // Scala reads as two distinct parameters — it does not express
        // "same attribute twice". Bind a single named parameter instead.
        exprs.filter { attr =>
          attr.qualifiers.nonEmpty && resolver(attr.qualifiers.last, qualifier)
        }
      case _ => 
        // Unqualified star "*"
        exprs
    }
  }
}

/**
 * Unresolved alias (placeholder used before name resolution).
 *
 * Wraps an expression whose output name has not been decided yet.
 * `aliasFunc`, when present, computes the final name from the child
 * expression; otherwise the analyzer derives one (see ResolveAliases).
 * All naming accessors throw until resolution replaces this node.
 */  
case class UnresolvedAlias(
    child: Expression,
    aliasFunc: Option[Expression => String] = None) extends UnaryExpression with NamedExpression with Unevaluable {
  
  def name: String = throw new UnresolvedException(this, "name")
  def exprId: ExprId = throw new UnresolvedException(this, "exprId") 
  def qualifiers: Seq[String] = throw new UnresolvedException(this, "qualifiers")
  def toAttribute: Attribute = throw new UnresolvedException(this, "toAttribute")
  def newInstance(): NamedExpression = throw new UnresolvedException(this, "newInstance")
  // Metadata is the one accessor with a safe default.
  def metadata: Metadata = Metadata.empty
  
  override def dataType: DataType = throw new UnresolvedException(this, "dataType")
  override def nullable: Boolean = throw new UnresolvedException(this, "nullable")
  override def toString: String = s"unresolvedalias(${child})"
}

/**
 * Exception thrown when a schema/naming property is accessed on a node that
 * has not been resolved yet (see the Unresolved* placeholders above).
 *
 * NOTE(review): this plain class extends AnalysisException, which is
 * declared as a case class in this doc — extending a case class is
 * deprecated in Scala 2 and disallowed for case subclasses; confirm the
 * intended hierarchy.
 */
class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: String)
  extends AnalysisException(
    s"Invalid call to $function on unresolved object, tree: $tree")

Usage Examples:

// Unresolved expressions before analysis
val unresolvedAttr = UnresolvedAttribute(Seq("users", "name"))
val unresolvedFunc = UnresolvedFunction(
  FunctionIdentifier("upper"),
  Seq(UnresolvedAttribute(Seq("name"))),
  isDistinct = false
)

val unresolvedStar = UnresolvedStar(Some(Seq("users")))  // users.*
val globalStar = UnresolvedStar(None)                    // *

// These will be resolved during analysis.
// FIX: expand(...) already returns a Seq[NamedExpression]; append the alias
// with :+ so the project list stays flat instead of nesting a Seq inside
// Seq(...), which would not type-check as a project list.
val project = Project(
  unresolvedStar.expand(inputPlan, resolver) :+ Alias(unresolvedFunc, "upper_name")(),
  inputPlan
)

// Accessing unresolved properties throws exceptions
try {
  val dataType = unresolvedAttr.dataType  // Throws UnresolvedException
} catch {
  case _: UnresolvedException[_] => 
    println("Cannot access dataType on unresolved attribute")
}

Analysis Exceptions

Exception types for analysis errors and validation failures:

/**
 * Exception thrown when a query fails to analyze.
 *
 * Carries optional source position (line / startPosition) and the offending
 * logical plan; getMessage appends the plan when present, while
 * getSimpleMessage renders only message + position (handy in tests).
 */
case class AnalysisException(
    message: String,
    line: Option[Int] = None,
    startPosition: Option[Int] = None, 
    plan: Option[LogicalPlan] = None,
    cause: Option[Throwable] = None) extends Exception(message, cause.orNull) with Serializable {
  
  /** Returns a copy annotated with the given source position. */
  def withPosition(line: Option[Int], startPosition: Option[Int]): AnalysisException =
    copy(line = line, startPosition = startPosition)
  
  // Full message: simple message, then the plan (when known) on a new line.
  override def getMessage: String =
    plan.fold(getSimpleMessage)(p => s"$getSimpleMessage;\n$p")
  
  /** Message plus position info, without the plan annotation (for testing). */
  def getSimpleMessage: String = {
    val positionInfo = (line, startPosition) match {
      case (Some(l), Some(p)) => s" (line $l, pos $p)"
      case (Some(l), None) => s" (line $l)"
      // A start position without a line number is not rendered.
      case _ => ""
    }
    message + positionInfo
  }
}

/**
 * Base class for catalog "not found" errors.
 *
 * FIX: these were all case classes, but AnalysisException is itself a case
 * class and Scala forbids case-to-case inheritance, so the originals could
 * not compile. Plain classes preserve the catchable type hierarchy.
 * The `protected val message` was also dropped: it clashed with the
 * `message` field inherited from AnalysisException.
 */
abstract class NoSuchItemException(message: String) extends AnalysisException(message)

/** The named database does not exist. */
class NoSuchDatabaseException(db: String) extends NoSuchItemException(s"Database '$db' not found")

/** The named table or view does not exist in the given database. */
class NoSuchTableException(db: String, table: String) 
  extends NoSuchItemException(s"Table or view '$table' not found in database '$db'")

/** The named function does not exist in the given database. */
class NoSuchFunctionException(db: String, func: String)
  extends NoSuchItemException(s"Function '$func' not found in database '$db'") 

/** No partition matches `spec` in the given table. */
class NoSuchPartitionException(db: String, table: String, spec: TablePartitionSpec)
  extends NoSuchItemException(s"Partition not found in table '$db.$table'")

/** Raised when creating a table that already exists. */
class TableAlreadyExistsException(db: String, table: String)
  extends AnalysisException(s"Table '$table' already exists in database '$db'")

/** Raised when creating a database that already exists. */
class DatabaseAlreadyExistsException(db: String)
  extends AnalysisException(s"Database '$db' already exists")
  
/** Raised when registering a function that already exists. */
class FunctionAlreadyExistsException(db: String, func: String) 
  extends AnalysisException(s"Function '$func' already exists in database '$db'")

Usage Examples:

// Analysis exception handling.
// NOTE(review): NoSuchTableException/NoSuchDatabaseException extend
// AnalysisException, so the first case would also match them — order the
// cases from most to least specific if the distinction matters.
try {
  val analyzedPlan = analyzer.execute(unresolvedPlan)
} catch {
  case ae: AnalysisException =>
    println(s"Analysis failed: ${ae.getSimpleMessage}")
    ae.plan.foreach(p => println(s"Problematic plan: $p"))
    
  case nste: NoSuchTableException =>
    println(s"Table not found: ${nste.getMessage}")
    
  case nsde: NoSuchDatabaseException => 
    println(s"Database not found: ${nsde.getMessage}")
}

// Create analysis exceptions with position info (line/column shown in
// getSimpleMessage; the plan is appended by getMessage).
val exception = AnalysisException(
  "Column 'xyz' not found",
  line = Some(5),
  startPosition = Some(12),
  plan = Some(problematicPlan)
)

// withPosition returns a copy with the position replaced.
throw exception.withPosition(Some(10), Some(5))

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-catalyst

docs

analysis.md

code-generation.md

data-types.md

expressions.md

index.md

optimization.md

parsing.md

query-plans.md

utilities.md

tile.json