Catalyst is a library for manipulating relational query plans and serves as the foundation for Spark SQL's query optimizer and execution engine.
Semantic analysis system for Catalyst query plans: it resolves unresolved references, checks types, and validates plans.
The Analyzer is the main entry point for logical plan analysis and resolution.
/**
 * Logical plan analyzer that resolves unresolved relations and expressions
 */
class Analyzer(
    catalog: Catalog,
    registry: FunctionRegistry,
    conf: CatalystConf,
    maxIterations: Int = 100) extends RuleExecutor[LogicalPlan] {

  /**
   * Analyze a logical plan by applying resolution rules
   */
  def execute(plan: LogicalPlan): LogicalPlan

  /** Sequence of rule batches for analysis */
  def batches: Seq[Batch]
}
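Concrete analyzers supply their pipeline through batches. A minimal sketch of that wiring, assuming the standard Rule, Batch, and FixedPoint machinery from org.apache.spark.sql.catalyst.rules (IdentityRule and MinimalAnalyzer are hypothetical illustrations):
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
// Hypothetical rule: returns the plan unchanged. Real analysis rules
// rewrite unresolved nodes (e.g. UnresolvedRelation) into resolved ones.
object IdentityRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan
}
object MinimalAnalyzer extends RuleExecutor[LogicalPlan] {
  // Each batch reapplies its rules until the plan stops changing
  // or the FixedPoint iteration limit is reached.
  def batches: Seq[Batch] = Seq(
    Batch("Resolution", FixedPoint(100), IdentityRule)
  )
}
// MinimalAnalyzer.execute(plan) then runs the batches over a plan.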
Usage Examples:
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._
// Create analyzer components
val catalog = new SimpleCatalog()
// Register a table so the unresolved relation below can be resolved
catalog.registerTable("users", LocalRelation(
  AttributeReference("id", IntegerType, false)(),
  AttributeReference("name", StringType, true)()
))
val functionRegistry = FunctionRegistry.builtin
val conf = new SimpleCatalystConf()
// Create analyzer
val analyzer = new Analyzer(catalog, functionRegistry, conf)
// Analyze an unresolved plan
val unresolvedPlan = Project(
  Seq(UnresolvedAttribute("name")),
  UnresolvedRelation(TableIdentifier("users"))
)
val analyzedPlan = analyzer.execute(unresolvedPlan)
// Result: resolved plan with concrete attribute references and relations
Interface for looking up relations, functions, and metadata.
/**
 * Interface for looking up relations, functions, and types
 */
trait Catalog {

  /**
   * Look up a relation by name
   */
  def lookupRelation(name: Seq[String]): LogicalPlan

  /**
   * Check if a function exists
   */
  def functionExists(name: String): Boolean

  /**
   * Look up a function by name and create the corresponding expression
   */
  def lookupFunction(name: String, children: Seq[Expression]): Expression

  /**
   * List tables in a database as (tableName, isTemporary) pairs
   */
  def getTables(databaseName: Option[String]): Seq[(String, Boolean)]

  /**
   * Check if a relation exists
   */
  def tableExists(name: Seq[String]): Boolean
}
/**
 * Simple in-memory catalog implementation
 */
class SimpleCatalog extends Catalog {
  private val tables = scala.collection.mutable.HashMap[String, LogicalPlan]()

  def registerTable(name: String, plan: LogicalPlan): Unit = {
    tables(name) = plan
  }

  override def lookupRelation(name: Seq[String]): LogicalPlan = {
    val tableName = name.mkString(".")
    tables.getOrElse(tableName,
      throw new AnalysisException(s"Table not found: $tableName"))
  }

  override def tableExists(name: Seq[String]): Boolean =
    tables.contains(name.mkString("."))

  // functionExists, lookupFunction, and getTables omitted for brevity
}
Usage Examples:
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._
// Create and populate catalog
val catalog = new SimpleCatalog()
val userTable = LocalRelation(
  AttributeReference("id", IntegerType, false)(),
  AttributeReference("name", StringType, true)(),
  AttributeReference("age", IntegerType, true)()
)
catalog.registerTable("users", userTable)
// Look up relations
val relation = catalog.lookupRelation(Seq("users"))
val exists = catalog.tableExists(Seq("users")) // true
val missing = catalog.tableExists(Seq("products")) // false
// Check functions
val functionExists = catalog.functionExists("upper") // depends on implementation
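The remaining Catalog members are exercised the same way; a sketch assuming a complete implementation (SimpleCatalog above omits getTables and lookupFunction, so fullCatalog here is a hypothetical stand-in):
val fullCatalog: Catalog = ??? // hypothetical: implements every Catalog member
// List (tableName, isTemporary) pairs in the default database
val tableList = fullCatalog.getTables(None) // e.g. Seq(("users", true))
// Resolve a function name plus argument expressions into an Expression
val upperCall = fullCatalog.lookupFunction("upper", Seq(Literal("hi", StringType)))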
Registry for SQL functions and user-defined functions.
/**
 * Registry for SQL functions
 */
trait FunctionRegistry {

  /**
   * Register a function
   */
  def registerFunction(
      name: String,
      info: ExpressionInfo,
      builder: Seq[Expression] => Expression): Unit

  /**
   * Look up a function and create an expression
   */
  def lookupFunction(name: String, children: Seq[Expression]): Expression

  /**
   * Check if a function exists
   */
  def functionExists(name: String): Boolean

  /**
   * List all registered functions
   */
  def listFunction(): Seq[String]
}
/**
 * Built-in function registry with standard SQL functions
 */
object FunctionRegistry {
  /** Get built-in function registry */
  def builtin: FunctionRegistry

  /** Create empty function registry */
  def empty: FunctionRegistry
}
/**
 * Function information for registry
 */
case class ExpressionInfo(
    className: String,
    name: String,
    usage: String,
    extended: String = "") {
  def getUsage(): String = if (usage != null) usage else ""
  def getExtended(): String = if (extended != null) extended else ""
}
Usage Examples:
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._
// Get built-in registry
val registry = FunctionRegistry.builtin
// Check if functions exist
val hasUpper = registry.functionExists("upper") // true
val hasCustom = registry.functionExists("my_func") // false
// Look up and create function expressions
val upperExpr = registry.lookupFunction("upper",
  Seq(Literal("hello", StringType)))
val concatExpr = registry.lookupFunction("concat",
  Seq(Literal("Hello ", StringType), Literal("World", StringType)))
// Register custom function
val customRegistry = FunctionRegistry.empty
val customInfo = ExpressionInfo(
  "com.example.MyFunction",
  "my_func",
  "my_func(str) - Returns custom transformation of string"
)
customRegistry.registerFunction("my_func", customInfo, { expressions =>
  // Builder: MyCustomExpression is a user-defined Expression subclass
  MyCustomExpression(expressions.head)
})
// List all functions
val allFunctions = registry.listFunction() // Seq of function names
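The getUsage and getExtended accessors return the registered help text, falling back to the empty string when nothing was supplied; reusing customInfo from the example above:
println(customInfo.getUsage())    // my_func(str) - Returns custom transformation of string
println(customInfo.getExtended()) // "" (no extended documentation registered)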
Type checking results and validation utilities.
/**
 * Result of type checking with success/failure information
 */
case class TypeCheckResult(isSuccess: Boolean, errorMessage: Option[String]) {
  def isFailure: Boolean = !isSuccess
}

object TypeCheckResult {
  /** Successful type check */
  val TypeCheckSuccess: TypeCheckResult = TypeCheckResult(true, None)

  /** Create failure result */
  def TypeCheckFailure(message: String): TypeCheckResult =
    TypeCheckResult(false, Some(message))
}
/**
 * Validation rules for analyzed plans
 */
object CheckAnalysis {
  /**
   * Check that a plan is properly analyzed
   */
  def checkAnalysis(plan: LogicalPlan): Unit = {
    plan.foreachUp {
      case p if !p.analyzed =>
        // Check for unresolved references, missing attributes, etc.
      case _ => // OK
    }
  }
}
Usage Examples:
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._
// Type checking in expressions
case class MyExpression(child: Expression) extends UnaryExpression {
  override def checkInputDataTypes(): TypeCheckResult = {
    if (child.dataType == StringType) {
      TypeCheckResult.TypeCheckSuccess
    } else {
      TypeCheckResult.TypeCheckFailure(
        s"Expected StringType, but got ${child.dataType}")
    }
  }

  override def dataType: DataType = StringType
  override def nullable: Boolean = child.nullable

  override def eval(input: InternalRow): Any = {
    child.eval(input) // identity pass-through, for illustration
  }
}
// Use type checking
val validExpr = MyExpression(Literal("hello", StringType))
val invalidExpr = MyExpression(Literal(42, IntegerType))
val validCheck = validExpr.checkInputDataTypes() // Success
val invalidCheck = invalidExpr.checkInputDataTypes() // Failure
// Plan analysis checking
val plan = Project(
  Seq(AttributeReference("name", StringType, true)()),
  LocalRelation(AttributeReference("name", StringType, true)())
)
CheckAnalysis.checkAnalysis(plan) // Validates plan is properly analyzed
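checkAnalysis is also the backstop for plans that resolution could not complete; a sketch of the failure path, assuming checkAnalysis raises AnalysisException for unresolved references:
val badPlan = Project(
  Seq(UnresolvedAttribute("missing")),
  LocalRelation(AttributeReference("name", StringType, true)())
)
try {
  CheckAnalysis.checkAnalysis(badPlan)
} catch {
  case e: AnalysisException =>
    println(s"Rejected: ${e.getMessage}") // names the unresolved attribute
}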
Common resolution patterns and utilities for analysis.
/**
 * Type coercion rules for Hive compatibility
 */
object HiveTypeCoercion {
  /** Implicit type conversions for binary operations */
  object ImplicitTypeCasts extends Rule[LogicalPlan]

  /** Boolean equality type coercion */
  object BooleanEquality extends Rule[LogicalPlan]

  /** String to integral type coercion */
  object StringToIntegralCasts extends Rule[LogicalPlan]

  /** Division type coercion */
  object Division extends Rule[LogicalPlan]
}
/**
 * Resolver function type for name resolution
 */
type Resolver = (String, String) => Boolean

/** Case insensitive name resolution */
val caseInsensitiveResolution: Resolver = (a: String, b: String) => a.equalsIgnoreCase(b)

/** Case sensitive name resolution */
val caseSensitiveResolution: Resolver = (a: String, b: String) => a == b
/**
 * Implicit class for analysis error reporting
 */
implicit class AnalysisErrorAt(t: TreeNode[_]) {
  /** Fails analysis with error message and position */
  def failAnalysis(msg: String): Nothing = {
    throw new AnalysisException(msg, t.origin.line, t.origin.startPosition)
  }
}
/**
 * Execute function with position context for error reporting
 */
def withPosition[A](t: TreeNode[_])(f: => A): A = {
  try f catch {
    case a: AnalysisException =>
      throw a.withPosition(t.origin.line, t.origin.startPosition)
  }
}
Usage Examples:
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._
// Name resolution
val resolver = caseInsensitiveResolution
val matches1 = resolver("Name", "name") // true
val matches2 = resolver("ID", "id") // true
val strictResolver = caseSensitiveResolution
val matches3 = strictResolver("Name", "name") // false
// Error reporting with position
val expr = Add(
  AttributeReference("a", IntegerType, false)(),
  AttributeReference("b", StringType, false)() // Type mismatch
)
try {
  withPosition(expr) {
    expr.checkInputDataTypes() match {
      case TypeCheckResult(false, Some(message)) =>
        expr.failAnalysis(s"Type check failed: $message")
      case _ => // OK
    }
  }
} catch {
  case e: AnalysisException =>
    println(s"Analysis error: ${e.getMessage}")
    // Includes position information from expr.origin
}
// Type coercion example
val plan = Project(
  Seq(Alias(Add(
    AttributeReference("int_col", IntegerType, false)(),
    AttributeReference("double_col", DoubleType, false)()
  ), "sum")()),
  LocalRelation(
    AttributeReference("int_col", IntegerType, false)(),
    AttributeReference("double_col", DoubleType, false)()
  )
)
// Type coercion rules would automatically convert int_col to double for the addition
val coercedPlan = HiveTypeCoercion.ImplicitTypeCasts.apply(plan)
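The other coercion rules are applied the same way. For example, Division is meant to make integer division yield a fractional result; a sketch, with the exact rewrite depending on the rule's implementation:
val divPlan = Project(
  Seq(Alias(Divide(
    AttributeReference("a", IntegerType, false)(),
    AttributeReference("b", IntegerType, false)()
  ), "ratio")()),
  LocalRelation(
    AttributeReference("a", IntegerType, false)(),
    AttributeReference("b", IntegerType, false)()
  )
)
// Expected to cast the integer operands so "ratio" is fractional
val divCoerced = HiveTypeCoercion.Division.apply(divPlan)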
Support for handling relations with multiple instances.
/**
 * Trait for relations that can appear multiple times in a query
 */
trait MultiInstanceRelation {
  /** Create a new instance with fresh expression IDs */
  def newInstance(): LogicalPlan
}

/**
 * Plans that produce the same result but have different expression IDs
 */
object MultiInstanceRelation {
  /** Check if two plans are the same ignoring expression IDs */
  def isSame(left: LogicalPlan, right: LogicalPlan): Boolean
}
Usage Examples:
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._
// Self-join requires multiple instances of the same relation
val baseRelation = LocalRelation(
  AttributeReference("id", IntegerType, false)(),
  AttributeReference("name", StringType, true)()
)
// Create new instance for self-join
val relation1 = baseRelation
val relation2 = baseRelation.newInstance() // Fresh expression IDs
val selfJoin = Join(
  relation1,
  relation2,
  Inner,
  Some(EqualTo(relation1.output(0), relation2.output(0)))
)
// Check if plans are the same (ignoring expression IDs)
val same = MultiInstanceRelation.isSame(relation1, relation2) // true
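A custom relation opts into this by reminting its output attributes in newInstance; a minimal sketch using the imports above (MyRelation is hypothetical, mirroring how LocalRelation behaves):
// Hypothetical leaf relation: newInstance remints every output
// attribute so two instances never share expression IDs.
case class MyRelation(output: Seq[Attribute]) extends LeafNode
    with MultiInstanceRelation {
  override def newInstance(): LogicalPlan =
    MyRelation(output.map(_.newInstance()))
}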
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-catalyst-2-10