tessl install tessl/maven-org-apache-spark--spark-hive@1.6.0
Apache Spark SQL Hive integration module providing HiveContext, metastore operations, HiveQL parsing, and Hive data format compatibility.
The HiveQL Parser provides comprehensive translation from HiveQL syntax to Spark SQL Catalyst logical plans. It supports the full HiveQL language specification including DDL, DML, complex expressions, window functions, and Hive-specific extensions.
import org.apache.spark.sql.hive.HiveQl
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.hadoop.hive.ql.parse.ASTNode

object HiveQl {
def parseSql(sql: String): LogicalPlan
def createPlan(sql: String): LogicalPlan
def parseDdl(ddl: String): Seq[Attribute]
def getAst(sql: String): ASTNode
}

parseSql - Converts a HiveQL string to its Catalyst LogicalPlan representation
createPlan - Alternative method for creating logical plans from SQL
parseDdl - Parses DDL statements and returns column attributes
getAst - Returns the raw Hive AST (Abstract Syntax Tree) for the SQL
The parser handles:
Usage Examples:
import org.apache.spark.sql.hive.HiveQl
// Parse SELECT query
val selectPlan = HiveQl.parseSql("""
SELECT customer_id, sum(amount) as total
FROM sales
WHERE year = 2023
GROUP BY customer_id
HAVING total > 1000
""")
// Parse CREATE TABLE with SerDe
val createTablePlan = HiveQl.parseSql("""
CREATE TABLE staging.orders (
order_id bigint,
customer_id string,
amount decimal(10,2)
)
PARTITIONED BY (year int, month int)
STORED AS PARQUET
LOCATION '/warehouse/staging/orders'
""")
// Parse complex query with window functions
val windowPlan = HiveQl.parseSql("""
SELECT
product_id,
sale_date,
amount,
ROW_NUMBER() OVER (PARTITION BY product_id ORDER BY sale_date) as rank,
LAG(amount, 1) OVER (PARTITION BY product_id ORDER BY sale_date) as prev_amount
FROM sales_data
""")

case class CreateTableAsSelect(
tableDesc: HiveTable,
child: LogicalPlan,
allowExisting: Boolean
) extends LogicalPlan

CreateTableAsSelect - Represents CREATE TABLE AS SELECT operations
Usage Example:
val ctasPlan = HiveQl.parseSql("""
CREATE TABLE analytics.customer_summary AS
SELECT
customer_id,
COUNT(*) as order_count,
SUM(amount) as total_spent
FROM orders
GROUP BY customer_id
""")

case class CreateViewAsSelect(
tableDesc: HiveTable,
child: LogicalPlan,
allowExisting: Boolean,
replace: Boolean,
sql: String
) extends LogicalPlan

CreateViewAsSelect - Represents CREATE VIEW operations and retains the original SQL text
Usage Example:
val viewPlan = HiveQl.parseSql("""
CREATE OR REPLACE VIEW analytics.monthly_sales AS
SELECT
YEAR(order_date) as year,
MONTH(order_date) as month,
SUM(amount) as total_sales
FROM orders
GROUP BY YEAR(order_date), MONTH(order_date)
""")

The parser supports all HiveQL expression types:

Arithmetic Operations:
- +, -, *, /, %
- ABS, ROUND, CEILING, FLOOR

String Operations:
- CONCAT, SUBSTR, LENGTH, TRIM
- LIKE, RLIKE, REGEXP

Date/Time Operations:
- YEAR, MONTH, DAY, DATE_ADD, DATEDIFF
- DATE_FORMAT, UNIX_TIMESTAMP

Conditional Logic:
- CASE WHEN ... THEN ... ELSE ... END
- IF(condition, true_value, false_value)
- COALESCE, NULLIF

Aggregate Functions:
- COUNT, SUM, AVG, MIN, MAX
- COLLECT_LIST, COLLECT_SET
- STDDEV, VARIANCE, PERCENTILE

Window Functions:
- ROW_NUMBER, RANK, DENSE_RANK
- LAG, LEAD, FIRST_VALUE, LAST_VALUE

Usage Examples:
// Complex CASE expression
val caseExpr = HiveQl.parseSql("""
SELECT
customer_id,
CASE
WHEN total_orders > 50 THEN 'Premium'
WHEN total_orders > 10 THEN 'Regular'
ELSE 'Basic'
END as customer_tier
FROM customer_stats
""")
// Window function with frame specification
val windowFrame = HiveQl.parseSql("""
SELECT
sale_date,
amount,
SUM(amount) OVER (
ORDER BY sale_date
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) as running_total
FROM daily_sales
""")

// LATERAL VIEW over an exploded column
val lateralViewPlan = HiveQl.parseSql("""
SELECT
customer_id,
tag
FROM customers
LATERAL VIEW explode(tags) exploded_table AS tag
WHERE tag LIKE 'premium%'
""")

// DISTRIBUTE BY with SORT BY
val distributePlan = HiveQl.parseSql("""
SELECT customer_id, order_date, amount
FROM orders
DISTRIBUTE BY customer_id
SORT BY order_date
""")

// CREATE TABLE with a custom SerDe and explicit input/output formats
val customSerDePlan = HiveQl.parseSql("""
CREATE TABLE json_data (
id bigint,
data string
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/data/json_files'
""")

// Aggregate functions
val COUNT: String
val SUM: String
val AVG: String
val MIN: String
val MAX: String
// Logical operators
val AND: String
val OR: String
val NOT: String
// Comparison operators
val LIKE: String
val RLIKE: String
val REGEXP: String
val IN: String
// Conditional expressions
val CASE: String
val WHEN: String
val THEN: String
val ELSE: String
// Window function frame boundaries
val PRECEDING: String
val FOLLOWING: String
val CURRENT: String
val UNBOUNDED: String
// Data types
val STRING: String
val BIGINT: String
val DOUBLE: String
val BOOLEAN: String
val TIMESTAMP: String
val DECIMAL: String

The parser supports all UDF types:
// Built-in UDFs
val builtinUDF = HiveQl.parseSql("""
SELECT regexp_replace(name, '[^a-zA-Z]', '') as clean_name
FROM customers
""")
// Custom UDFs (assuming registered)
val customUDF = HiveQl.parseSql("""
SELECT my_custom_function(data, param1, param2) as result
FROM input_table
""")
// UDAFs (User-Defined Aggregate Functions)
val udafPlan = HiveQl.parseSql("""
SELECT
category,
my_custom_agg(value) as custom_aggregate
FROM data_table
GROUP BY category
""")

// Array operations
val arrayPlan = HiveQl.parseSql("""
SELECT
array_col[0] as first_element,
size(array_col) as array_length
FROM table_with_arrays
""")
// Map operations
val mapPlan = HiveQl.parseSql("""
SELECT
map_col['key1'] as value1,
map_keys(map_col) as all_keys
FROM table_with_maps
""")
// Struct operations
val structPlan = HiveQl.parseSql("""
SELECT
struct_col.field1,
struct_col.field2
FROM table_with_structs
""")

// Dynamic partitioning
val dynamicPartition = HiveQl.parseSql("""
INSERT INTO TABLE partitioned_table
PARTITION (year, month)
SELECT data, year_col, month_col
FROM source_table
""")
// Bucketed tables
val bucketedTable = HiveQl.parseSql("""
CREATE TABLE bucketed_data (
id bigint,
name string,
category string
)
CLUSTERED BY (id) INTO 32 BUCKETS
STORED AS ORC
""")

private[hive] class HiveQLDialect(sqlContext: HiveContext) extends ParserDialect {
override def parse(sqlText: String): LogicalPlan = {
sqlContext.executionHive.withHiveState {
HiveQl.parseSql(sqlText)
}
}
}

The HiveQLDialect integrates the parser with HiveContext, ensuring proper Hive state management during parsing.
The parser handles various error conditions:
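Example Error Handling (note that parsing alone does not resolve table names, so a missing table is reported only later, when the plan is analyzed):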
try {
val plan = HiveQl.parseSql("SELECT * FROM non_existent_table")
} catch {
case e: ParseException =>
println(s"Syntax error: ${e.getMessage}")
case e: AnalysisException =>
println(s"Semantic error: ${e.getMessage}")
}

Optimization Tips: