or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

core-hive-integration.mdexecution-engine.mdfile-formats.mdindex.mdmetastore-operations.mdudf-integration.md
tile.json

execution-engine.mddocs/

Execution Engine

The Apache Spark Hive integration execution engine provides specialized physical plans and execution strategies for Hive table operations, including table scanning, data insertion, and table creation with optimized performance for Hive-compatible formats.

Physical Execution Plans

HiveTableScanExec

Physical execution plan for scanning Hive tables with partition pruning and predicate pushdown capabilities.

case class HiveTableScanExec(
  requestedAttributes: Seq[Attribute],
  relation: HiveTableRelation,
  partitionPruningPred: Seq[Expression]
) extends LeafExecNode with CodegenSupport {

  def doExecute(): RDD[InternalRow]
  def inputRDDs(): Seq[RDD[InternalRow]]
  def doProduce(ctx: CodegenContext): String
  def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String
}

Usage Example:

The HiveTableScanExec is automatically generated when querying Hive tables:

// This query will generate HiveTableScanExec plan
val result = spark.sql("""
SELECT id, name, department 
FROM employee 
WHERE department = 'Engineering' AND hire_date > '2020-01-01'
""")

// View execution plan
result.explain(true)

Key Features:

  • Partition Pruning: Automatically eliminates irrelevant partitions
  • Predicate Pushdown: Pushes filters to storage layer when possible
  • Column Pruning: Reads only required columns
  • Code Generation: Supports code generation for better performance

HiveTableRelation

Logical representation of a Hive table used in physical planning.

case class HiveTableRelation(
  tableMeta: CatalogTable,
  dataCols: Seq[Attribute],
  partitionCols: Seq[Attribute]
) extends LeafNode with MultiInstanceRelation {

  def output: Seq[Attribute]
  def refresh(): Unit
  def newInstance(): HiveTableRelation
}

Data Insertion Operations

InsertIntoHiveTable

Command for inserting data into Hive tables with support for static and dynamic partitioning.

case class InsertIntoHiveTable(
  table: CatalogTable,
  partition: Map[String, Option[String]],
  query: LogicalPlan,
  overwrite: Boolean,
  ifPartitionNotExists: Boolean
) extends UnaryCommand {

  def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
  def innerChildren: Seq[QueryPlan[_]]
}

Usage Examples:

Static Partitioning:

// Insert into specific partition
spark.sql("""
INSERT INTO TABLE partitioned_sales PARTITION(year=2023, month=12)
SELECT transaction_id, amount, customer_id FROM daily_sales
WHERE date_col = '2023-12-01'
""")

// Overwrite partition
spark.sql("""
INSERT OVERWRITE TABLE partitioned_sales PARTITION(year=2023, month=12)
SELECT transaction_id, amount, customer_id FROM corrected_sales
WHERE date_col = '2023-12-01'
""")

Dynamic Partitioning:

// Enable dynamic partitioning
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

// Insert with dynamic partitioning
spark.sql("""
INSERT INTO TABLE partitioned_sales PARTITION(year, month)
SELECT transaction_id, amount, customer_id, 
       year(transaction_date), month(transaction_date)
FROM raw_sales
""")

Conditional Insert:

// Insert only if partition doesn't exist
spark.sql("""
ALTER TABLE partitioned_sales ADD IF NOT EXISTS PARTITION(year=2023, month=11)
""")

spark.sql("""
INSERT INTO TABLE partitioned_sales PARTITION(year=2023, month=11)
SELECT * FROM source_data WHERE year=2023 AND month=11
""")

Table Creation Operations

CreateHiveTableAsSelectCommand

Command for creating Hive tables from query results with configurable storage formats and properties.

case class CreateHiveTableAsSelectCommand(
  tableDesc: CatalogTable,
  query: LogicalPlan,
  ignoreIfExists: Boolean
) extends DataWritingCommand {

  def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
  def innerChildren: Seq[QueryPlan[_]]
}

Usage Examples:

Basic CTAS:

spark.sql("""
CREATE TABLE employee_summary AS
SELECT department, 
       COUNT(*) as employee_count,
       AVG(salary) as avg_salary
FROM employee
GROUP BY department
""")

CTAS with Storage Format:

spark.sql("""
CREATE TABLE employee_orc
USING HIVE
STORED AS ORC
AS SELECT * FROM employee
""")

CTAS with Partitioning:

spark.sql("""
CREATE TABLE partitioned_employee_summary
USING HIVE
PARTITIONED BY (department)
STORED AS PARQUET
AS SELECT id, name, salary, department FROM employee
""")

CTAS with Properties:

spark.sql("""
CREATE TABLE compressed_employee
USING HIVE
STORED AS ORC
TBLPROPERTIES (
  'orc.compress'='SNAPPY',
  'orc.stripe.size'='67108864'
)
AS SELECT * FROM employee
""")

Script Transformation

ScriptTransformationExec

Execution plan for TRANSFORM queries using external scripts (MAP-REDUCE style processing).

case class ScriptTransformationExec(
  input: Seq[Expression],
  script: String,
  output: Seq[Attribute],
  child: SparkPlan,
  ioschema: HiveScriptIOSchema
) extends UnaryExecNode {

  def doExecute(): RDD[InternalRow]
  protected def withNewChildInternal(newChild: SparkPlan): ScriptTransformationExec
}

Usage Example:

// Transform using external script
spark.sql("""
SELECT TRANSFORM(id, name, salary)
USING 'python process_employee.py'
AS (processed_id INT, processed_name STRING, salary_grade STRING)
FROM employee
""")

// Transform with custom input/output format
spark.sql("""
SELECT TRANSFORM(name, department)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
USING '/usr/bin/python3 transform_data.py'
ROW FORMAT DELIMITED  
FIELDS TERMINATED BY ','
AS (transformed_name STRING, dept_code STRING)
FROM employee
""")

Hive Query Strategies

HiveStrategies

Planning strategies specific to Hive integration that determine optimal execution plans.

object HiveStrategies extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan]
  
  // Specific strategies
  object Scripts extends Strategy
  object DataSinks extends Strategy  
  object DDLStrategy extends Strategy
}

Key Strategies:

  1. Hive Table Scans: Optimized scanning of Hive tables
  2. Script Transformations: Execution of TRANSFORM queries
  3. Data Sinks: Efficient writing to Hive tables
  4. DDL Operations: Handling Hive DDL commands

Optimization Features

Predicate Pushdown

Automatic pushdown of filters to reduce data scanning:

// Filters pushed down to storage layer
val optimizedQuery = spark.sql("""
SELECT id, name 
FROM large_table 
WHERE year = 2023 AND month = 12 AND status = 'ACTIVE'
""")

// Check pushdown in execution plan
optimizedQuery.queryExecution.executedPlan

Partition Pruning

Elimination of unnecessary partition scans:

// Only scans relevant partitions
val prunedQuery = spark.sql("""
SELECT COUNT(*) 
FROM partitioned_sales 
WHERE year IN (2022, 2023) AND month > 6
""")

// Verify partition pruning
prunedQuery.queryExecution.optimizedPlan

Column Pruning

Reading only required columns from storage:

// Only reads 'name' and 'salary' columns
val columnPrunedQuery = spark.sql("""
SELECT name, salary
FROM employee_with_many_columns
WHERE department = 'Engineering'
""")

Bucketed Table Support

Bucketed Reads

Optimized reading from bucketed Hive tables:

// Create bucketed table
spark.sql("""
CREATE TABLE bucketed_employee (
  id INT, name STRING, department STRING, salary DOUBLE
) USING HIVE
CLUSTERED BY (id) INTO 4 BUCKETS
STORED AS ORC
""")

// Bucketed joins (automatic optimization)
val bucketedJoin = spark.sql("""
SELECT e1.name, e2.name as manager_name
FROM bucketed_employee e1 
JOIN bucketed_employee e2 ON e1.manager_id = e2.id
""")

Sort-Merge Bucket Joins

High-performance joins for bucketed tables:

// Enable sort-merge bucket joins
spark.conf.set("spark.sql.bucketing.coalesceBucketsInJoin.enabled", "true")

// Automatic SMB join for compatible bucketed tables
val smbJoin = spark.sql("""
SELECT o.order_id, c.customer_name
FROM bucketed_orders o
JOIN bucketed_customers c ON o.customer_id = c.customer_id
""")

Configuration and Tuning

Execution Configuration

// Configure Hive execution settings
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
spark.conf.set("hive.exec.max.dynamic.partitions", "5000")
spark.conf.set("hive.exec.max.dynamic.partitions.pernode", "2000")

// Control small files
spark.conf.set("hive.merge.tezfiles", "true")
spark.conf.set("hive.merge.smallfiles.avgsize", "16000000")

Performance Tuning

// Optimize for large tables
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

// Configure join strategies
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.bucketing.coalesceBucketsInJoin.enabled", "true")

// Memory management
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

Error Handling

Common Execution Errors

Partition Not Found:

// Handle missing partitions
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

// Graceful handling
val safeQuery = spark.sql("""
SELECT * FROM partitioned_table 
WHERE year = 2023 AND month BETWEEN 1 AND 12
""").filter($"year".isNotNull && $"month".isNotNull)

Schema Evolution:

// Handle schema changes
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

// Read with schema tolerance
val schema_tolerant = spark.read
  .option("mergeSchema", "true")
  .table("evolving_table")

Resource Constraints:

// Optimize for limited resources
spark.conf.set("spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold", "0")
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")

Monitoring and Debugging

Execution Plan Analysis

// View complete execution plan
val query = spark.sql("SELECT * FROM large_hive_table WHERE id > 1000")

// Physical plan
query.explain(true)

// Formatted plan
query.queryExecution.debug.codegen()

// Execution statistics
query.queryExecution.executedPlan.execute().count()

Performance Metrics

// Enable metrics collection
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.logLevel", "INFO")

// Access metrics after execution
val metrics = query.queryExecution.executedPlan.metrics
metrics.foreach { case (name, metric) =>
  println(s"$name: ${metric.value}")
}

Types

// Base execution node
trait SparkPlan extends QueryPlan[SparkPlan] {
  def execute(): RDD[InternalRow]
  def executeCollect(): Array[InternalRow]
  def metrics: Map[String, SQLMetric]
}

// Leaf execution node
trait LeafExecNode extends SparkPlan {
  final override def children: Seq[SparkPlan] = Nil
  def doExecute(): RDD[InternalRow]
}

// Unary execution node
trait UnaryExecNode extends SparkPlan {
  def child: SparkPlan
  final override def children: Seq[SparkPlan] = child :: Nil
}

// Code generation support
trait CodegenSupport extends SparkPlan {
  def doProduce(ctx: CodegenContext): String
  def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String
}

// Data writing command
trait DataWritingCommand extends Command {
  def outputColumnNames: Seq[String]
  def outputColumns: Seq[Attribute]
}

// Command interface
trait Command extends LogicalPlan {
  override def output: Seq[Attribute] = Seq.empty
  override def children: Seq[LogicalPlan] = Seq.empty
  def run(sparkSession: SparkSession): Seq[Row]
}

// Hive script I/O schema
case class HiveScriptIOSchema(
  inputRowFormat: Seq[(String, String)],
  outputRowFormat: Seq[(String, String)],  
  inputSerdeClass: Option[String],
  outputSerdeClass: Option[String],
  inputSerdeProps: Seq[(String, String)],
  outputSerdeProps: Seq[(String, String)],
  recordReaderClass: Option[String],
  recordWriterClass: Option[String],
  schemaLess: Boolean
)

// Table partition specification
type TablePartitionSpec = Map[String, String]

// SQL metric for execution statistics
class SQLMetric(
  val metricType: String,
  initValue: Long = 0L
) extends AccumulatorV2[Long, Long]

// Expression for column references and computations
trait Expression extends TreeNode[Expression] {
  def dataType: DataType
  def nullable: Boolean  
  def eval(input: InternalRow): Any
}

// Attribute for column metadata
trait Attribute extends Expression with NamedExpression {
  def name: String
  def dataType: DataType
  def nullable: Boolean
  def qualifier: Seq[String]
}