Catalyst query optimization framework and expression evaluation engine for Apache Spark SQL
---
This section covers the SQL parsing interfaces and abstract syntax tree representations in Spark Catalyst. The parser converts SQL text into logical plan trees.
```scala
import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._
```

The main interface for parsing SQL statements, expressions, and data types.

```scala
abstract class ParserInterface {
def parsePlan(sqlText: String): LogicalPlan
def parseExpression(sqlText: String): Expression
def parseDataType(sqlText: String): DataType
def parseTableIdentifier(sqlText: String): TableIdentifier
def parseFunctionIdentifier(sqlText: String): FunctionIdentifier
def parseTableSchema(sqlText: String): StructType
}
```

```scala
import org.apache.spark.sql.catalyst.parser._
// Get the parser (CatalystSqlParser is a singleton object, so no `new`)
val parser = CatalystSqlParser
// Parse SQL statement
val logicalPlan = parser.parsePlan("SELECT name, age FROM users WHERE age > 18")
// Parse expression
val expression = parser.parseExpression("age + 1")
// Parse data type
val dataType = parser.parseDataType("STRUCT<name: STRING, age: INT>")
```
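The returned `LogicalPlan` is an unresolved Catalyst tree; printing it is a quick way to see what the parser produced. A minimal sketch, using the `logicalPlan` parsed above:

```scala
// Print the unresolved plan tree; output resembles:
// 'Project ['name, 'age]
// +- 'Filter ('age > 18)
//    +- 'UnresolvedRelation [users]
println(logicalPlan.treeString)
```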
The default SQL parser implementation for Spark SQL.

```scala
object CatalystSqlParser extends AbstractSqlParser {
override def astBuilder: AstBuilder = new AstBuilder()
def parseExpression(sqlText: String): Expression
def parsePlan(sqlText: String): LogicalPlan
def parseDataType(sqlText: String): DataType
def parseTableIdentifier(sqlText: String): TableIdentifier
def parseMultipartIdentifier(sqlText: String): Seq[String]
}
```

Converts ANTLR parse trees into Catalyst logical plans and expressions.

```scala
class AstBuilder extends SqlBaseBaseVisitor[AnyRef] {
// Plan visitors
def visitSingleStatement(ctx: SingleStatementContext): LogicalPlan
def visitQuery(ctx: QueryContext): LogicalPlan
def visitQuerySpecification(ctx: QuerySpecificationContext): LogicalPlan
def visitFromClause(ctx: FromClauseContext): LogicalPlan
def visitJoinRelation(ctx: JoinRelationContext): LogicalPlan
// Expression visitors
def visitSingleExpression(ctx: SingleExpressionContext): Expression
def visitArithmeticBinary(ctx: ArithmeticBinaryContext): Expression
def visitComparison(ctx: ComparisonContext): Expression
def visitLogicalBinary(ctx: LogicalBinaryContext): Expression
def visitPredicated(ctx: PredicatedContext): Expression
// Data type visitors
def visitSingleDataType(ctx: SingleDataTypeContext): DataType
def visitPrimitiveDataType(ctx: PrimitiveDataTypeContext): DataType
def visitComplexDataType(ctx: ComplexDataTypeContext): DataType
}
```

Thrown when SQL text cannot be parsed; it records the position of the error within the statement.

```scala
class ParseException(
message: String,
val line: Int,
val startPosition: Int,
cause: Throwable = null
) extends Exception(message, cause) {
def withCommand(command: String): ParseException
override def getMessage: String
}
case class TemplateSqlParseException(
message: String,
errorClass: String,
messageParameters: Map[String, String],
origin: Option[Origin],
cause: Option[Throwable]
) extends ParseException(message, 0, 0, cause.orNull)
```

Companion-object factories construct a DataType from a DDL or JSON type string.

```scala
object DataType {
def fromDDL(ddl: String): DataType = CatalystSqlParser.parseDataType(ddl)
def fromJson(json: String): DataType
}
// Examples of DDL type strings
val intType = DataType.fromDDL("INT")
val arrayType = DataType.fromDDL("ARRAY<STRING>")
val structType = DataType.fromDDL("STRUCT<name: STRING, age: INT>")
val mapType = DataType.fromDDL("MAP<STRING, DOUBLE>")
```
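Each result is an ordinary `DataType` instance, so the usual inspection methods apply; for example:

```scala
// Render parsed types back to their compact form
println(structType.simpleString) // struct<name:string,age:int>
println(arrayType.simpleString)  // array<string>
```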
StructType offers the same convenience for parsing complete table schemas.

```scala
object StructType {
def fromDDL(ddl: String): StructType = CatalystSqlParser.parseTableSchema(ddl)
}
// Example schema parsing
val schema = StructType.fromDDL("name STRING, age INT, scores ARRAY<DOUBLE>")
```
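A parsed schema is a regular `StructType` whose fields can be walked directly:

```scala
// Print each field's name and type from the parsed schema
schema.fields.foreach { field =>
  println(s"${field.name}: ${field.dataType.simpleString}")
}
```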
```scala
// Parse various expression types
val literalExpr = parser.parseExpression("42")
val columnExpr = parser.parseExpression("users.name")
val arithmeticExpr = parser.parseExpression("age + 1")
val functionExpr = parser.parseExpression("UPPER(name)")
val caseExpr = parser.parseExpression("CASE WHEN age > 18 THEN 'adult' ELSE 'minor' END")
```
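Parsed expressions are Catalyst tree nodes, so they can be rendered back to SQL or traversed. A small sketch (`UnresolvedAttribute` comes from `org.apache.spark.sql.catalyst.analysis`):

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute

// Render the parsed CASE expression back to SQL text
println(caseExpr.sql)
// Collect the attribute names referenced anywhere in the tree
val referenced = caseExpr.collect { case a: UnresolvedAttribute => a.name }
println(referenced) // contains "age"
```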
```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser._

// Array operations
val arrayExpr = parseExpression("array(1, 2, 3)[0]")
// Map operations
val mapExpr = parseExpression("map('key1', 'value1')['key1']")
// Struct operations
val structExpr = parseExpression("named_struct('name', 'Alice', 'age', 25).name")
// Window functions
val windowExpr = parseExpression("ROW_NUMBER() OVER (PARTITION BY dept ORDER BY salary DESC)")
```

Logical plan nodes representing parsed DDL statements.

```scala
// Table creation
case class CreateTable(
tableIdentifier: TableIdentifier,
tableSchema: StructType,
partitionColumnNames: Seq[String],
bucketSpec: Option[BucketSpec],
properties: Map[String, String],
provider: Option[String],
options: Map[String, String],
location: Option[String],
comment: Option[String],
ifNotExists: Boolean
) extends LogicalPlan
// View creation
case class CreateView(
name: TableIdentifier,
userSpecifiedColumns: Seq[(String, Option[String])],
comment: Option[String],
properties: Map[String, String],
originalText: Option[String],
child: LogicalPlan,
allowExisting: Boolean,
replace: Boolean,
isTemporary: Boolean
) extends LogicalPlan
```

```scala
// Insert statements
case class InsertIntoTable(
table: LogicalPlan,
partition: Map[String, Option[String]],
child: LogicalPlan,
overwrite: Boolean,
ifPartitionNotExists: Boolean
) extends LogicalPlan
// Update statements (Catalyst representation)
case class UpdateTable(
table: LogicalPlan,
assignments: Seq[Assignment],
condition: Option[Expression]
) extends LogicalPlan
```
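A quick way to see which of these nodes a statement produces is to parse one and inspect the root. A sketch; the exact node types vary across Spark versions:

```scala
// Parse an INSERT and inspect the root of the resulting plan
val insertPlan = CatalystSqlParser.parsePlan("INSERT INTO users VALUES (1, 'Alice')")
println(insertPlan.getClass.getSimpleName) // e.g. InsertIntoStatement
```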
Identifies a function by name, optionally qualified with a database.

```scala
case class FunctionIdentifier(funcName: String, database: Option[String] = None) {
def identifier: String = database.map(_ + ".").getOrElse("") + funcName
def unquotedString: String = identifier
def quotedString: String = database.map(quote).map(_ + ".").getOrElse("") + quote(funcName) // quote backtick-quotes an identifier
}
// Parse function names
val simpleFunc = parser.parseFunctionIdentifier("upper")
val qualifiedFunc = parser.parseFunctionIdentifier("my_db.custom_func")
```
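The parsed identifier exposes its parts through the fields shown above:

```scala
println(qualifiedFunc.funcName)       // custom_func
println(qualifiedFunc.database)       // Some(my_db)
println(qualifiedFunc.unquotedString) // my_db.custom_func
```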
```scala
import org.apache.spark.sql.internal.SQLConf

// Parser behavior affected by configuration
val conf = SQLConf.get
val caseSensitive = conf.caseSensitiveAnalysis
val ansiMode = conf.ansiEnabled
// SparkSqlParser (in org.apache.spark.sql.execution) extends the Catalyst parser with DDL support
val parser = new SparkSqlParser(conf)
```
```scala
import org.apache.spark.sql.catalyst.parser._

// Custom AST builder with additional rules
class CustomAstBuilder extends AstBuilder {
// Assumes a grammar extended with a customFunction rule (CustomFunctionContext)
override def visitCustomFunction(ctx: CustomFunctionContext): Expression = {
// Custom parsing logic for domain-specific functions
super.visitCustomFunction(ctx)
}
}
// Custom parser with extended functionality
class CustomSqlParser extends AbstractSqlParser {
override def astBuilder: AstBuilder = new CustomAstBuilder()
}
```
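The extended parser is then used like the built-in one. A sketch, assuming the grammar extension behind `visitCustomFunction` exists:

```scala
// Standard SQL goes through the inherited rules; custom syntax
// is handled by CustomAstBuilder
val customParser = new CustomSqlParser()
val customPlan = customParser.parsePlan("SELECT custom_func(x) FROM t")
```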
```scala
import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.plans.logical._
// Parse complex SQL query
val sqlText = """
SELECT
u.name,
COUNT(o.id) as order_count,
AVG(o.amount) as avg_amount
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.age > 18
GROUP BY u.id, u.name
HAVING COUNT(o.id) > 5
ORDER BY avg_amount DESC
LIMIT 10
"""
val parser = CatalystSqlParser
val logicalPlan = parser.parsePlan(sqlText)
// Extract components
// Match on the expected plan shape; fall back to a wildcard since the
// actual parsed tree may differ (e.g. a Project node wraps the aggregate)
logicalPlan match {
case Limit(limitExpr,
Sort(order, global,
Filter(havingCondition,
Aggregate(groupingExprs, aggregateExprs,
Join(left, right, joinType, joinCondition, _))))) => // trailing _ is the join hint in Spark 3.x; omit on 2.x
println("Parsed complex query with joins, aggregation, and ordering")
case other =>
println(s"Plan root: ${other.getClass.getSimpleName}")
}
```

```scala
import org.apache.spark.sql.catalyst.parser.ParseException
try {
val plan = parser.parsePlan("INVALID SQL SYNTAX")
} catch {
case e: ParseException =>
println(s"Parse error at line ${e.line}, position ${e.startPosition}: ${e.getMessage}")
}
```

The parsing framework provides a complete SQL-to-AST transformation pipeline that handles the full spectrum of SQL constructs and converts them into Catalyst's internal representation for further processing.
Install with Tessl CLI:

```sh
npx tessl i tessl/maven-org-apache-spark--spark-catalyst
```