Query Execution

Specialized execution plans and strategies for Hive table operations and query processing.

Core Imports

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.hive.HiveStrategies
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}

Capabilities

Hive Table Scanning

Physical execution plan for scanning Hive tables with partition pruning and predicate pushdown.

case class HiveTableScanExec(
  requestedAttributes: Seq[Attribute],
  relation: HiveTableRelation,
  partitionPruningPred: Seq[Expression]
)(@transient private val sparkSession: SparkSession) extends LeafExecNode {
  
  /** Attributes produced by this execution plan */
  override def output: Seq[Attribute]
  
  /** Execute the table scan, applying partition pruning, and produce an RDD of internal rows */
  override protected def doExecute(): RDD[InternalRow]
  
  /** String representation for explain plans */
  override def simpleString(maxFields: Int): String
}
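
As a quick check, you can pattern-match the executed plan to confirm a query is served by this node. A minimal sketch, given a Hive-enabled SparkSession spark and a hypothetical sales_partitioned table:

import org.apache.spark.sql.hive.execution.HiveTableScanExec

// Walk the physical plan and report any Hive table scans it contains
val scanPlan = spark.sql("SELECT * FROM sales_partitioned WHERE year = 2023")
  .queryExecution.executedPlan

scanPlan.foreach {
  case scan: HiveTableScanExec =>
    println(s"Hive scan output: ${scan.output.map(_.name).mkString(", ")}")
  case _ => // not a Hive scan node
}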

Script Transformation Execution

Execute Hive script transformations using external processes.

case class HiveScriptTransformationExec(
  script: String,
  output: Seq[Attribute],
  child: SparkPlan,
  ioschema: ScriptTransformationIOSchema
) extends BaseScriptTransformationExec {
  
  /** Launch the external process and pipe input rows through the script */
  override protected def doExecute(): RDD[InternalRow]
}
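
The ioschema value is populated from the SQL-level ROW FORMAT clauses. A sketch showing custom field delimiters on both sides of the script (assumes a users table exists):

// The ROW FORMAT before USING applies to the script's stdin,
// the one after AS to its stdout; both feed ScriptTransformationIOSchema
spark.sql("""
  SELECT TRANSFORM(name, age)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    USING 'cat'
    AS (name STRING, age STRING)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  FROM users
""").show()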

Hive Table Insert Operations

Command for inserting data into Hive tables with partition support.

case class InsertIntoHiveTable(
  table: CatalogTable,
  partition: Map[String, Option[String]],
  query: LogicalPlan,
  overwrite: Boolean,
  ifPartitionNotExists: Boolean,
  outputColumnNames: Seq[String]
) extends DataWritingCommand {
  
  /** Execute the insert operation */
  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
  
  /** Metrics for monitoring insert performance */
  override lazy val metrics: Map[String, SQLMetric]
  
  /** Attributes of the columns being written, derived from outputColumnNames */
  override def outputColumns: Seq[Attribute]
}
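
The command covers both static and dynamic partition specs. A minimal sketch of a static partition insert, assuming sales_partitioned is partitioned by year and month:

// Static partition insert: the PARTITION clause pins both partition values,
// so the SELECT list supplies only the data columns
spark.sql("""
  INSERT OVERWRITE TABLE sales_partitioned
  PARTITION (year = 2023, month = 10)
  SELECT customer_id, order_total, order_date
  FROM raw_sales
  WHERE YEAR(order_date) = 2023 AND MONTH(order_date) = 10
""")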

Create Table As Select

Command for creating Hive tables from SELECT query results.

case class CreateHiveTableAsSelectCommand(
  tableDesc: CatalogTable,
  query: LogicalPlan,
  outputColumnNames: Seq[String],
  mode: SaveMode
) extends DataWritingCommand {
  
  /** Execute table creation and data insertion */
  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
  
  /** Check if table already exists */
  def tableExists(sparkSession: SparkSession): Boolean
  
  /** Validate table and query compatibility */
  def validateTable(sparkSession: SparkSession): Unit
}
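
A sketch of a Hive-format CTAS that this command plans (orders is a hypothetical source table):

// Hive-syntax CTAS: the new table's schema is inherited from the query
spark.sql("""
  CREATE TABLE order_summary
  STORED AS PARQUET
  AS SELECT customer_id, COUNT(*) AS order_count
  FROM orders
  GROUP BY customer_id
""")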

Insert into Directory

Command for inserting data into HDFS directories using Hive format.

case class InsertIntoHiveDirCommand(
  isLocal: Boolean,
  storage: CatalogStorageFormat,
  query: LogicalPlan,
  overwrite: Boolean,
  outputColumnNames: Seq[String]
) extends DataWritingCommand {
  
  /** Execute directory insert operation */
  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
  
  /** Resolve output path for directory insert */
  def resolveOutputPath(): Path
}
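
In SQL this corresponds to INSERT OVERWRITE DIRECTORY. A sketch with an illustrative output path:

// Write query results as delimited text into an HDFS directory;
// add LOCAL before DIRECTORY to target the driver's local filesystem
spark.sql("""
  INSERT OVERWRITE DIRECTORY '/tmp/order_export'
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  SELECT customer_id, order_total FROM orders
""")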

Query Planning Strategies

Hive-Specific Query Strategies

Planner strategies that translate Hive logical operators into the physical execution nodes described above.

private[hive] trait HiveStrategies {
  
  /** Strategy for handling script transformations */
  object HiveScripts extends Strategy {
    def apply(plan: LogicalPlan): Seq[SparkPlan]
  }
  
  /** Strategy for Hive table scans with optimization */
  object HiveTableScans extends Strategy {
    def apply(plan: LogicalPlan): Seq[SparkPlan]
  }
}
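
With Hive support enabled, these strategies are registered by the session state builder. One way to confirm this, using the unstable sessionState API:

// List the planner strategies of the current session; with Hive support
// enabled, HiveTableScans and the script strategy should appear among them
spark.sessionState.planner.strategies
  .foreach(s => println(s.getClass.getSimpleName))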

Analysis Rules

Rules for analyzing and converting Hive-specific logical plans.

/** Convert generic operations to Hive-specific variants */
object HiveAnalysis extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan
}

/** Convert relations for better performance */
case class RelationConversions(
  sessionCatalog: HiveSessionCatalog
) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan
}

/** Resolve Hive SerDe table properties */
class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan
}

/** Determine table statistics from HDFS */
class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan
}
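
All four rules share the Rule[LogicalPlan] contract. A minimal, purely illustrative identity rule shows the shape:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// A no-op rule: a real rule would pattern-match plan nodes and rewrite them
object IdentityRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}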

Usage Examples

Inspecting Partition Pruning in Table Scans

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.hive.execution.HiveTableScanExec

val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()

// Query with partition pruning
val partitionedQuery = spark.sql("""
  SELECT customer_id, order_total 
  FROM sales_partitioned 
  WHERE year = 2023 AND month >= 10
""")

// Examine execution plan
partitionedQuery.explain(true)

// Show the partition-pruning predicates pushed into the scan
partitionedQuery.queryExecution.executedPlan.collect {
  case scan: HiveTableScanExec => 
    println(s"Partition filters: ${scan.partitionPruningPred}")
}

Script Transformation Example

// Register custom transformation script
spark.sql("""
  SELECT TRANSFORM(name, age) 
  USING 'python3 /path/to/transform_script.py' 
  AS (processed_name STRING, age_group STRING)
  FROM users
""").show()

// Alternative: an inline awk script (TRANSFORM feeds the script
// tab-separated fields, so awk's default field splitting applies)
spark.sql("""
  FROM users
  SELECT TRANSFORM(name, age)
  USING "awk '{ print $1 \"\\t\" ($2 > 30 ? \"senior\" : \"junior\") }'"
  AS (name STRING, category STRING)
""").show()

Dynamic Partition Insert

// Enable dynamic partitioning
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

// Insert with dynamic partitions
spark.sql("""
  INSERT INTO TABLE sales_partitioned 
  PARTITION (year, month)
  SELECT customer_id, order_total, order_date, 
         YEAR(order_date) as year, 
         MONTH(order_date) as month
  FROM raw_sales
""")

// Check created partitions
spark.sql("SHOW PARTITIONS sales_partitioned").show()

Create External Table As Select

// Create a Hive-format table at an explicit location; Spark treats a table created with a path as external
spark.sql("""
  CREATE TABLE external_summary
  USING HIVE
  OPTIONS (
    path '/user/warehouse/external/summary'
  )
  AS SELECT 
    customer_id,
    COUNT(*) as order_count,
    SUM(order_total) as total_spent
  FROM orders
  GROUP BY customer_id
""")

Performance Optimization

Predicate Pushdown

// Query demonstrating filter pushdown
val optimizedQuery = spark.sql("""
  SELECT product_name, sales_amount
  FROM product_sales 
  WHERE category = 'electronics' 
    AND sale_date >= '2023-01-01'
    AND region IN ('us-west', 'us-east')
""")

// Verify pushdown in the optimized logical plan
import org.apache.spark.sql.catalyst.plans.logical.Filter

optimizedQuery.queryExecution.optimizedPlan.collect {
  case Filter(condition, _) =>
    println(s"Filter condition: $condition")
}

Vectorized ORC Reading

// Enable vectorized reading for better performance
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")
spark.conf.set("spark.sql.orc.columnarReaderBatchSize", "4096")

// Query will use vectorized reader
val vectorizedQuery = spark.sql("""
  SELECT SUM(sales_amount), AVG(quantity)
  FROM large_orc_table
  WHERE year = 2023
""")

Error Handling

Common execution exceptions:

  • SparkException: general execution failures, including failed tasks (the underlying task failure is attached as the cause)
  • AnalysisException: schema, resolution, or table access errors caught during analysis
  • QueryExecutionException: failures while running Hive operations, including metastore access errors

import org.apache.spark.SparkException
import org.apache.spark.sql.AnalysisException

try {
  val result = spark.sql("""
    INSERT INTO non_existent_table 
    SELECT * FROM source_table
  """)
  result.collect()
} catch {
  case e: AnalysisException if e.getMessage.contains("Table or view not found") =>
    println("Target table does not exist")
  case e: SparkException if e.getMessage.contains("Task failed") =>
    println(s"Execution failed: ${e.getCause}")
  case e: Exception =>
    println(s"Unexpected error: ${e.getMessage}")
    throw e
}

Types

Execution Plan Types

trait DataWritingCommand extends Command {
  def query: LogicalPlan
  def outputColumnNames: Seq[String]
  def outputColumns: Seq[Attribute]
  def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
  def metrics: Map[String, SQLMetric]
}

case class ScriptTransformationIOSchema(
  inputRowFormat: Seq[(String, String)],
  outputRowFormat: Seq[(String, String)],
  inputSerdeClass: Option[String],
  outputSerdeClass: Option[String],
  inputSerdeProps: Seq[(String, String)],
  outputSerdeProps: Seq[(String, String)],
  recordReaderClass: Option[String],
  recordWriterClass: Option[String],
  schemaLess: Boolean
)

Table and Partition Types

case class HiveTableRelation(
  tableMeta: CatalogTable,
  dataCols: Seq[AttributeReference],
  partitionCols: Seq[AttributeReference],
  tableStats: Option[Statistics],
  prunedPartitions: Option[Seq[CatalogTablePartition]]
) extends LeafNode {
  
  def isPartitioned: Boolean
  override def computeStats(): Statistics
}
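
A hedged sketch of spotting this relation in an analyzed plan (tables converted to the native Parquet/ORC readers will show a different relation):

import org.apache.spark.sql.catalyst.catalog.HiveTableRelation

// Inspect the analyzed logical plan and report the relation's partitioning
spark.table("sales_partitioned").queryExecution.analyzed.foreach {
  case rel: HiveTableRelation =>
    println(s"partitioned=${rel.isPartitioned}, cols=${rel.partitionCols.map(_.name).mkString(",")}")
  case _ =>
}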

case class CatalogTablePartition(
  spec: TablePartitionSpec,
  storage: CatalogStorageFormat,
  parameters: Map[String, String]
) {
  def location: URI
  def toRow: InternalRow
}