Specialized execution plans and strategies for Hive table operations and query processing.
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.hive.HiveStrategies
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.expressions.Expression
Physical execution plan for scanning Hive tables with partition pruning and predicate pushdown.
case class HiveTableScanExec(
requestedAttributes: Seq[Attribute],
relation: HiveTableRelation,
partitionPruningPred: Seq[Expression]
)(@transient private val sparkSession: SparkSession) extends LeafExecNode {
/** Attributes produced by this execution plan */
override def output: Seq[Attribute]
/** Execute the table scan and produce RDD of internal rows */
override protected def doExecute(): RDD[InternalRow]
/** Statistics for query optimization */
override def computeStats(): Statistics
/** String representation for explain plans */
override def simpleString(maxFields: Int): String
}
Execute Hive script transformations using external processes.
case class HiveScriptTransformationExec(
script: String,
output: Seq[Attribute],
child: SparkPlan,
ioschema: ScriptTransformationIOSchema
) extends UnaryExecNode {
/** Execute script transformation on input data */
override protected def doExecute(): RDD[InternalRow]
/** Schema for input/output serialization */
def ioschema: ScriptTransformationIOSchema
}
Command for inserting data into Hive tables with partition support.
case class InsertIntoHiveTable(
table: CatalogTable,
partition: Map[String, Option[String]],
query: LogicalPlan,
overwrite: Boolean,
ifPartitionNotExists: Boolean,
outputColumnNames: Seq[String]
) extends DataWritingCommand {
/** Execute the insert operation */
override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
/** Metrics for monitoring insert performance */
override lazy val metrics: Map[String, SQLMetric]
/** Output attributes after insert */
override def outputColumns: Seq[Attribute]
}
Command for creating Hive tables from SELECT query results.
case class CreateHiveTableAsSelectCommand(
tableDesc: CatalogTable,
query: LogicalPlan,
outputColumnNames: Seq[String],
mode: SaveMode
) extends DataWritingCommand {
/** Execute table creation and data insertion */
override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
/** Check if table already exists */
def tableExists(sparkSession: SparkSession): Boolean
/** Validate table and query compatibility */
def validateTable(sparkSession: SparkSession): Unit
}
Command for inserting data into HDFS directories using Hive format.
case class InsertIntoHiveDirCommand(
isLocal: Boolean,
storage: CatalogStorageFormat,
query: LogicalPlan,
overwrite: Boolean,
outputColumnNames: Seq[String]
) extends DataWritingCommand {
/** Execute directory insert operation */
override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
/** Resolve output path for directory insert */
def resolveOutputPath(): Path
}
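A statement of the following shape is what typically plans to InsertIntoHiveDirCommand; the directory path and columns are illustrative, and `spark` refers to the Hive-enabled session built in the usage examples below.
// Export query results as delimited text into an HDFS directory;
// add LOCAL after OVERWRITE to write to the driver's local filesystem instead
spark.sql("""
INSERT OVERWRITE DIRECTORY '/tmp/order_export'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
SELECT customer_id, order_total
FROM orders
""")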
private[hive] trait HiveStrategies {
/** Strategy for handling script transformations */
object HiveScripts extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan]
}
/** Strategy for Hive table scans with optimization */
object HiveTableScans extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan]
}
}
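When Hive support is enabled these strategies are registered with the session planner. A quick, hedged way to confirm this (sessionState and planner are unstable developer APIs; `spark` is the Hive-enabled session built in the usage examples below):
// List the registered planner strategies; the Hive scan and script strategies
// should appear among them on a Hive-enabled session
spark.sessionState.planner.strategies.foreach(s => println(s.getClass.getName))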
Rules for analyzing and converting Hive-specific logical plans.
/** Convert generic operations to Hive-specific variants */
object HiveAnalysis extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan
}
/** Convert relations for better performance */
case class RelationConversions(
sessionCatalog: HiveSessionCatalog
) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan
}
/** Resolve Hive SerDe table properties */
class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan
}
/** Determine table statistics from HDFS */
class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan
}
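RelationConversions is the rule most visible in practice: with spark.sql.hive.convertMetastoreParquet and spark.sql.hive.convertMetastoreOrc enabled (the default), Parquet- and ORC-backed Hive tables are read through Spark's native data source scan instead of HiveTableScanExec. A sketch of how to observe the difference, assuming a Parquet-backed Hive table named parquet_backed_hive_table and the `spark` session built below:
// Compare the physical scan node with conversion enabled vs. disabled
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")
spark.sql("SELECT * FROM parquet_backed_hive_table").explain()  // native parquet scan
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
spark.sql("SELECT * FROM parquet_backed_hive_table").explain()  // Hive table scan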
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.hive.execution.HiveTableScanExec
val spark = SparkSession.builder()
.enableHiveSupport()
.getOrCreate()
// Query with partition pruning
val partitionedQuery = spark.sql("""
SELECT customer_id, order_total
FROM sales_partitioned
WHERE year = 2023 AND month >= 10
""")
// Examine execution plan
partitionedQuery.explain(true)
// Show the partition pruning predicates pushed into the scan
partitionedQuery.queryExecution.executedPlan.collect {
case scan: HiveTableScanExec =>
println(s"Partition filters: ${scan.partitionPruningPred}")
}
// Register custom transformation script
spark.sql("""
SELECT TRANSFORM(name, age)
USING 'python3 /path/to/transform_script.py'
AS (processed_name STRING, age_group STRING)
FROM users
""").show()
// Alternative: an inline shell command instead of a script file
// (the command string goes through the SQL parser and then a shell, hence the escaping)
spark.sql("""
FROM users
SELECT TRANSFORM(name, age)
USING "awk '{ print $1 \"\\t\" ($2 > 30 ? \"senior\" : \"junior\") }'"
AS (name STRING, category STRING)
""").show()
// Enable dynamic partitioning
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
// Insert with dynamic partitions
spark.sql("""
INSERT INTO TABLE sales_partitioned
PARTITION (year, month)
SELECT customer_id, order_total, order_date,
YEAR(order_date) as year,
MONTH(order_date) as month
FROM raw_sales
""")
// Check created partitions
spark.sql("SHOW PARTITIONS sales_partitioned").show()// Create external table from query with custom location
spark.sql("""
CREATE TABLE external_summary
USING HIVE
LOCATION '/user/warehouse/external/summary'
AS SELECT
customer_id,
COUNT(*) as order_count,
SUM(order_total) as total_spent
FROM orders
GROUP BY customer_id
""")// Query demonstrating filter pushdown
val optimizedQuery = spark.sql("""
SELECT product_name, sales_amount
FROM product_sales
WHERE category = 'electronics'
AND sale_date >= '2023-01-01'
AND region IN ('us-west', 'us-east')
""")
// Verify pushdown in execution plan
optimizedQuery.queryExecution.optimizedPlan.collect {
case Filter(condition, _) =>
println(s"Filter condition: $condition")
}
// Enable vectorized reading for better performance
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")
spark.conf.set("spark.sql.orc.columnarReaderBatchSize", "4096")
// Query will use vectorized reader
val vectorizedQuery = spark.sql("""
SELECT SUM(sales_amount), AVG(quantity)
FROM large_orc_table
WHERE year = 2023
""")Common execution exceptions:
import org.apache.spark.SparkException
import org.apache.spark.sql.AnalysisException
try {
val result = spark.sql("""
INSERT INTO non_existent_table
SELECT * FROM source_table
""")
result.collect()
} catch {
case e: AnalysisException if e.getMessage.contains("Table or view not found") =>
println("Target table does not exist")
case e: SparkException if e.getMessage.contains("Task failed") =>
println(s"Execution failed: ${e.getCause}")
case e: Exception =>
println(s"Unexpected error: ${e.getMessage}")
throw e
}
trait DataWritingCommand extends Command {
def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
def outputColumns: Seq[Attribute]
def outputOrdering: Seq[SortOrder]
def metrics: Map[String, SQLMetric]
}
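To observe when these write commands complete, a QueryExecutionListener can be registered; this is a minimal sketch using the public listener API (it fires when query executions finish, including Dataset actions and, depending on the Spark version, eagerly executed commands).
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener
// Log the executed plan of each completed execution; write commands appear here
// with their SQLMetrics attached to the plan nodes
spark.listenerManager.register(new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    println(s"$funcName completed:\n${qe.executedPlan}")
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
    println(s"$funcName failed: ${exception.getMessage}")
})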
case class ScriptTransformationIOSchema(
inputRowFormat: Seq[(String, String)],
outputRowFormat: Seq[(String, String)],
inputSerdeClass: Option[String],
outputSerdeClass: Option[String],
inputSerdeProps: Seq[(String, String)],
outputSerdeProps: Seq[(String, String)],
recordReaderClass: Option[String],
recordWriterClass: Option[String],
schemaLess: Boolean
)
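The row-format and SerDe fields above are populated from the optional ROW FORMAT clauses of a TRANSFORM query; an illustrative example with explicit input and output delimiters:
// Custom field delimiters flow into inputRowFormat / outputRowFormat
spark.sql("""
SELECT TRANSFORM(name, age)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
USING 'cat'
AS (name STRING, age STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
FROM users
""").show()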
case class HiveTableRelation(
tableMeta: CatalogTable,
dataCols: Seq[AttributeReference],
partitionCols: Seq[AttributeReference],
tableStats: Option[Statistics],
prunedPartitions: Option[Seq[CatalogTablePartition]]
) extends LeafNode {
def isPartitioned: Boolean
def partitionSpec: Map[String, String]
def computeStats(): Statistics
}
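A hedged sketch of locating the relation in an analyzed plan; this assumes a Hive serde table that RelationConversions does not rewrite to a native data source scan.
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
// Inspect the Hive relation behind a table reference
spark.table("sales_partitioned").queryExecution.analyzed.collect {
  case rel: HiveTableRelation =>
    println(s"table=${rel.tableMeta.identifier} partitioned=${rel.isPartitioned}")
}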
case class CatalogTablePartition(
spec: TablePartitionSpec,
storage: CatalogStorageFormat,
parameters: Map[String, String]
) {
def location: URI
def toRow: InternalRow
}