Semantic analysis system for resolving unresolved references, type checking, and plan validation in Catalyst query plans.
The main entry point for logical plan analysis and resolution.
/**
 * Logical plan analyzer that resolves unresolved relations and expressions
 */
class Analyzer(
    catalog: Catalog,
    registry: FunctionRegistry,
    conf: CatalystConf,
    maxIterations: Int = 100) extends RuleExecutor[LogicalPlan] {

  /**
   * Analyze a logical plan by applying resolution rules
   */
  def execute(plan: LogicalPlan): LogicalPlan

  /** Sequence of rule batches for analysis */
  def batches: Seq[Batch]
}

Usage Examples:
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.expressions._

// Create analyzer components
val catalog = new SimpleCatalog()
val functionRegistry = FunctionRegistry.builtin
val conf = new SimpleCatalystConf()

// Create analyzer
val analyzer = new Analyzer(catalog, functionRegistry, conf)

// Analyze an unresolved plan
val unresolvedPlan = Project(
  Seq(UnresolvedAttribute("name")),
  UnresolvedRelation(TableIdentifier("users"))
)
val analyzedPlan = analyzer.execute(unresolvedPlan)
// Result: resolved plan with concrete attribute references and relations
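Internally, Analyzer is a RuleExecutor whose batches of rules are applied repeatedly until the plan stops changing (bounded by maxIterations). As a minimal sketch of how one resolution rule operates, loosely modeled on the real ResolveRelations rule (ResolveRelationsSketch is a hypothetical name, and the exact UnresolvedRelation fields vary across versions):

import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical resolution rule: replace each UnresolvedRelation with
// the plan registered under that name in the catalog
class ResolveRelationsSketch(catalog: Catalog) extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case u: UnresolvedRelation =>
      catalog.lookupRelation(Seq(u.tableIdentifier.table))
  }
}
// Within Analyzer, such rules are grouped into batches, e.g.
// Batch("Resolution", FixedPoint(maxIterations), new ResolveRelationsSketch(catalog), ...)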
Interface for looking up relations, functions, and metadata.
/**
 * Interface for looking up relations, functions, and types
 */
trait Catalog {

  /**
   * Look up a relation by name
   */
  def lookupRelation(name: Seq[String]): LogicalPlan

  /**
   * Check if a function exists
   */
  def functionExists(name: String): Boolean

  /**
   * Look up a function by name and create an expression
   */
  def lookupFunction(name: String, children: Seq[Expression]): Expression

  /**
   * List tables in the given database as (tableName, isTemporary) pairs
   */
  def getTables(databaseName: Option[String]): Seq[(String, Boolean)]

  /**
   * Check if a relation exists
   */
  def tableExists(name: Seq[String]): Boolean
}
/**
 * Simple in-memory catalog implementation
 */
class SimpleCatalog extends Catalog {
  private val tables = scala.collection.mutable.HashMap[String, LogicalPlan]()

  def registerTable(name: String, plan: LogicalPlan): Unit = {
    tables(name) = plan
  }

  override def lookupRelation(name: Seq[String]): LogicalPlan = {
    val tableName = name.mkString(".")
    tables.getOrElse(tableName,
      throw new AnalysisException(s"Table not found: $tableName"))
  }

  override def tableExists(name: Seq[String]): Boolean =
    tables.contains(name.mkString("."))

  override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] =
    tables.keys.map(name => (name, true)).toSeq // in-memory tables are temporary

  // functionExists and lookupFunction omitted for brevity
}

Usage Examples:
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._
// Create and populate catalog
val catalog = new SimpleCatalog()
val userTable = LocalRelation(
  AttributeReference("id", IntegerType, false)(),
  AttributeReference("name", StringType, true)(),
  AttributeReference("age", IntegerType, true)()
)
catalog.registerTable("users", userTable)
// Look up relations
val relation = catalog.lookupRelation(Seq("users"))
val exists = catalog.tableExists(Seq("users")) // true
val missing = catalog.tableExists(Seq("products")) // false
// Check functions
val functionExists = catalog.functionExists("upper") // depends on implementation
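The table-listing API is not exercised above; a quick sketch, assuming the getTables implementation shown earlier (which flags every in-memory table as temporary):

// List registered tables as (name, isTemporary) pairs
val tableList = catalog.getTables(None) // Seq(("users", true))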
Registry for SQL functions and user-defined functions.
/**
 * Registry for SQL functions
 */
trait FunctionRegistry {

  /**
   * Register a function
   */
  def registerFunction(
      name: String,
      info: ExpressionInfo,
      builder: Seq[Expression] => Expression): Unit

  /**
   * Look up a function and create an expression
   */
  def lookupFunction(name: String, children: Seq[Expression]): Expression

  /**
   * Check if a function exists
   */
  def functionExists(name: String): Boolean

  /**
   * List all registered functions
   */
  def listFunction(): Seq[String]
}
/**
 * Built-in function registry with standard SQL functions
 */
object FunctionRegistry {
  /** Get the built-in function registry */
  def builtin: FunctionRegistry

  /** Create an empty function registry */
  def empty: FunctionRegistry
}

/**
 * Function information for the registry
 */
case class ExpressionInfo(
    className: String,
    name: String,
    usage: String,
    extended: String = "") {
  def getUsage(): String = if (usage != null) usage else ""
  def getExtended(): String = if (extended != null) extended else ""
}

Usage Examples:
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._
// Get built-in registry
val registry = FunctionRegistry.builtin
// Check if functions exist
val hasUpper = registry.functionExists("upper") // true
val hasCustom = registry.functionExists("my_func") // false
// Look up and create function expressions
val upperExpr = registry.lookupFunction("upper",
  Seq(Literal("hello", StringType)))
val concatExpr = registry.lookupFunction("concat",
  Seq(Literal("Hello ", StringType), Literal("World", StringType)))

// Register a custom function
val customRegistry = FunctionRegistry.empty
val customInfo = ExpressionInfo(
  "com.example.MyFunction",
  "my_func",
  "my_func(str) - Returns custom transformation of string"
)
customRegistry.registerFunction("my_func", customInfo, { expressions =>
  // Custom function builder (MyCustomExpression is a user-defined placeholder)
  MyCustomExpression(expressions.head)
})

// List all functions
val allFunctions = registry.listFunction() // Seq of function names

Type checking results and validation utilities.
/**
 * Result of type checking with success/failure information
 */
case class TypeCheckResult(isSuccess: Boolean, errorMessage: Option[String]) {
  def isFailure: Boolean = !isSuccess
}

object TypeCheckResult {
  /** Successful type check */
  val TypeCheckSuccess: TypeCheckResult = TypeCheckResult(true, None)

  /** Create a failure result */
  def TypeCheckFailure(message: String): TypeCheckResult =
    TypeCheckResult(false, Some(message))
}

/**
 * Validation rules for analyzed plans
 */
object CheckAnalysis {
  /**
   * Check that a plan is properly analyzed, failing on any unresolved node
   */
  def checkAnalysis(plan: LogicalPlan): Unit = {
    plan.foreachUp {
      case p if !p.resolved =>
        // Unresolved references, missing attributes, etc.
        p.failAnalysis(s"Unresolved plan node: ${p.simpleString}")
      case _ => // OK
    }
  }
}

Usage Examples:
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._
// Type checking in expressions
case class MyExpression(child: Expression) extends UnaryExpression {
  override def checkInputDataTypes(): TypeCheckResult = {
    if (child.dataType == StringType) {
      TypeCheckResult.TypeCheckSuccess
    } else {
      TypeCheckResult.TypeCheckFailure(
        s"Expected StringType, but got ${child.dataType}"
      )
    }
  }

  override def dataType: DataType = StringType
  override def nullable: Boolean = child.nullable

  override def eval(input: InternalRow): Any = {
    // Example implementation: pass the child's value through (placeholder)
    child.eval(input)
  }
}
// Use type checking
val validExpr = MyExpression(Literal("hello", StringType))
val invalidExpr = MyExpression(Literal(42, IntegerType))
val validCheck = validExpr.checkInputDataTypes() // Success
val invalidCheck = invalidExpr.checkInputDataTypes() // Failure
// Plan analysis checking
val plan = Project(
  Seq(AttributeReference("name", StringType, true)()),
  LocalRelation(AttributeReference("name", StringType, true)())
)
CheckAnalysis.checkAnalysis(plan) // Validates that the plan is properly analyzed
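Conversely, a plan that still contains unresolved references should be rejected; a short sketch of the failure path, reusing the types from this example:

// A plan with an unresolved attribute fails the analysis check
val badPlan = Project(
  Seq(UnresolvedAttribute("missing")),
  LocalRelation(AttributeReference("name", StringType, true)())
)
try {
  CheckAnalysis.checkAnalysis(badPlan)
} catch {
  case e: AnalysisException => println(s"Rejected: ${e.getMessage}")
}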
Common resolution patterns and utilities for analysis.
/**
 * Type coercion rules for Hive compatibility
 */
object HiveTypeCoercion {
  /** Implicit type conversions for binary operations */
  object ImplicitTypeCasts extends Rule[LogicalPlan]

  /** Boolean equality type coercion */
  object BooleanEquality extends Rule[LogicalPlan]

  /** String to integral type coercion */
  object StringToIntegralCasts extends Rule[LogicalPlan]

  /** Division type coercion */
  object Division extends Rule[LogicalPlan]
}
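The usage example below exercises ImplicitTypeCasts; as a sketch of another rule, Division rewrites integral division to fractional division under Hive semantics (the precise rewritten form is version-dependent; OneRowRelation simply provides a trivial input):

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._

// 7 / 2 over integers: Hive semantics require a fractional result,
// so the Division rule casts both operands to a fractional type
val intDivide = Divide(Literal(7, IntegerType), Literal(2, IntegerType))
val divPlan = Project(Seq(Alias(intDivide, "ratio")()), OneRowRelation)
val coercedDiv = HiveTypeCoercion.Division.apply(divPlan)
// After coercion the expression behaves like
// Divide(Cast(7, DoubleType), Cast(2, DoubleType)), evaluating to 3.5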
/**
 * Resolver function type for name resolution
 */
type Resolver = (String, String) => Boolean

/** Case-insensitive name resolution */
val caseInsensitiveResolution: Resolver = (a: String, b: String) => a.equalsIgnoreCase(b)

/** Case-sensitive name resolution */
val caseSensitiveResolution: Resolver = (a: String, b: String) => a == b

/**
 * Implicit class for analysis error reporting
 */
implicit class AnalysisErrorAt(t: TreeNode[_]) {
  /** Fails analysis with an error message and position information */
  def failAnalysis(msg: String): Nothing = {
    throw new AnalysisException(msg, t.origin.line, t.origin.startPosition)
  }
}

/**
 * Execute a function with position context for error reporting
 */
def withPosition[A](t: TreeNode[_])(f: => A): A = {
  try f catch {
    case a: AnalysisException =>
      throw a.withPosition(t.origin.line, t.origin.startPosition)
  }
}

Usage Examples:
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._
// Name resolution
val resolver = caseInsensitiveResolution
val matches1 = resolver("Name", "name") // true
val matches2 = resolver("ID", "id") // true
val strictResolver = caseSensitiveResolution
val matches3 = strictResolver("Name", "name") // false
// Error reporting with position
val expr = Add(
  AttributeReference("a", IntegerType, false)(),
  AttributeReference("b", StringType, false)() // Type mismatch
)
try {
  withPosition(expr) {
    expr.checkInputDataTypes() match {
      case TypeCheckResult(false, Some(message)) =>
        expr.failAnalysis(s"Type check failed: $message")
      case _ => // OK
    }
  }
} catch {
  case e: AnalysisException =>
    println(s"Analysis error: ${e.getMessage}")
    // Includes position information from expr.origin
}
// Type coercion example
val plan = Project(
  Seq(Alias(Add(
    AttributeReference("int_col", IntegerType, false)(),
    AttributeReference("double_col", DoubleType, false)()
  ), "sum")()),
  LocalRelation(
    AttributeReference("int_col", IntegerType, false)(),
    AttributeReference("double_col", DoubleType, false)()
  )
)
// Type coercion rules automatically cast int_col to double for the addition
val coercedPlan = HiveTypeCoercion.ImplicitTypeCasts.apply(plan)

Support for handling relations with multiple instances.
/**
 * Trait for relations that can appear multiple times in a query
 */
trait MultiInstanceRelation {
  /** Create a new instance with fresh expression IDs */
  def newInstance(): LogicalPlan
}

/**
 * Plans that produce the same result but have different expression IDs
 */
object MultiInstanceRelation {
  /** Check if two plans are the same, ignoring expression IDs */
  def isSame(left: LogicalPlan, right: LogicalPlan): Boolean
}

Usage Examples:
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._
// Self-join requires multiple instances of the same relation
val baseRelation = LocalRelation(
  AttributeReference("id", IntegerType, false)(),
  AttributeReference("name", StringType, true)()
)
// Create new instance for self-join
val relation1 = baseRelation
val relation2 = baseRelation.newInstance() // Fresh expression IDs
val selfJoin = Join(
  relation1,
  relation2,
  Inner,
  Some(EqualTo(relation1.output(0), relation2.output(0)))
)
// Check if plans are the same (ignoring expression IDs)
val same = MultiInstanceRelation.isSame(relation1, relation2) // true
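Implementing the trait for a custom leaf relation mostly amounts to regenerating the output attribute IDs; a minimal sketch (MyRelation is a hypothetical relation, not part of the API):

import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}

// Hypothetical custom relation: newInstance() keeps names and types but
// regenerates expression IDs, so two occurrences of the relation in one
// query (e.g. a self-join) yield distinguishable attributes
case class MyRelation(output: Seq[AttributeReference])
  extends LeafNode with MultiInstanceRelation {
  override def newInstance(): LogicalPlan =
    MyRelation(output.map(_.newInstance()))
}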