Execution Engine

The Spark Hive Execution Engine provides physical operators and commands specifically designed for Hive integration. These operators handle table scans, data insertion, script transformations, and metadata operations while maintaining compatibility with Hive's data formats and SerDes.

Physical Operators

HiveTableScan

case class HiveTableScan(
  attributes: Seq[Attribute],
  relation: MetastoreRelation,
  partitionPruningPred: Option[Expression]
) extends LeafNode

HiveTableScan - Physical operator for scanning Hive tables with partition pruning support

  • attributes - Output schema columns
  • relation - Hive table metadata from metastore
  • partitionPruningPred - Optional predicate for partition elimination

Usage Examples:

// The HiveTableScan is typically created by the query planner
// when accessing Hive tables through HiveContext

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)

// This query will generate a HiveTableScan operator
val df = hiveContext.sql("""
  SELECT customer_id, order_date, amount 
  FROM sales.orders 
  WHERE year = 2023 AND month >= 6
""")

// The partition pruning predicate (year = 2023 AND month >= 6) 
// will be pushed down to the HiveTableScan operator
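
To see what the planner produced, inspect the plans with the standard DataFrame API:

// explain(true) prints the parsed, analyzed, optimized, and physical plans;
// the physical plan should show HiveTableScan with the pruning predicate
df.explain(true)

// Or access the physical plan programmatically
println(df.queryExecution.executedPlan)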

InsertIntoHiveTable

case class InsertIntoHiveTable(
  table: MetastoreRelation,
  partition: Map[String, Option[String]],
  child: SparkPlan,
  overwrite: Boolean,
  ifNotExists: Boolean
) extends UnaryNode

InsertIntoHiveTable - Physical operator for inserting data into Hive tables

  • table - Target Hive table metadata
  • partition - Partition specification keyed by column name; a None value marks that column for dynamic partitioning
  • child - Source data operator
  • overwrite - Whether to overwrite existing data
  • ifNotExists - Whether to skip if partition exists

Usage Examples:

// Static partition insert
hiveContext.sql("""
  INSERT OVERWRITE TABLE sales.orders 
  PARTITION (year=2024, month=1)
  SELECT order_id, customer_id, amount, order_date
  FROM staging.new_orders
""")

// Dynamic partition insert
hiveContext.sql("""
  INSERT INTO TABLE sales.orders 
  PARTITION (year, month)
  SELECT order_id, customer_id, amount, order_date, 
         YEAR(order_date), MONTH(order_date)
  FROM staging.new_orders
""")

ScriptTransformation

case class ScriptTransformation(
  input: Seq[Expression],
  script: String,
  output: Seq[Attribute],
  child: SparkPlan,
  ioschema: HiveScriptIOSchema
) extends UnaryNode

ScriptTransformation - Executes external scripts with Hive-compatible I/O

  • input - Input expressions to pass to script
  • script - External script command to execute
  • output - Expected output schema
  • child - Source data operator
  • ioschema - Input/output format specification

Usage Example:

hiveContext.sql("""
  SELECT TRANSFORM (customer_id, amount)
  USING 'python customer_analysis.py'
  AS (customer_id string, risk_score double)
  FROM sales.orders
""")

HiveScriptIOSchema

case class HiveScriptIOSchema(
  inputRowFormat: Seq[(String, String)],
  outputRowFormat: Seq[(String, String)],
  inputSerdeClass: Option[String],
  outputSerdeClass: Option[String],
  inputSerdeProps: Seq[(String, String)],
  outputSerdeProps: Seq[(String, String)],
  recordReaderClass: Option[String],
  recordWriterClass: Option[String],
  schemaLess: Boolean
)

HiveScriptIOSchema - Defines input/output format for script transformation

Usage with Custom SerDe:

hiveContext.sql("""
  SELECT TRANSFORM (data)
  ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
  USING 'python json_processor.py'
  ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
  AS (result string)
  FROM json_data
""")

DDL Command Operators

CreateTableAsSelect

case class CreateTableAsSelect(
  tableDesc: HiveTable,
  child: SparkPlan,
  allowExisting: Boolean
) extends RunnableCommand

CreateTableAsSelect - Creates Hive table from query results

Usage Example:

hiveContext.sql("""
  CREATE TABLE analytics.customer_summary
  STORED AS PARQUET
  AS SELECT 
    customer_id,
    COUNT(*) as order_count,
    SUM(amount) as total_spent,
    MAX(order_date) as last_order
  FROM sales.orders
  GROUP BY customer_id
""")

CreateViewAsSelect

case class CreateViewAsSelect(
  tableDesc: HiveTable,
  child: LogicalPlan,
  allowExisting: Boolean,
  replace: Boolean,
  sql: String
) extends RunnableCommand

CreateViewAsSelect - Creates Hive view from query

Usage Example:

hiveContext.sql("""
  CREATE OR REPLACE VIEW analytics.monthly_revenue AS
  SELECT 
    YEAR(order_date) as year,
    MONTH(order_date) as month,
    SUM(amount) as revenue
  FROM sales.orders
  GROUP BY YEAR(order_date), MONTH(order_date)
""")

Administrative Commands

AnalyzeTable

case class AnalyzeTable(tableName: String) extends RunnableCommand

AnalyzeTable - Generates table statistics for query optimization

Usage Example:

hiveContext.sql("ANALYZE TABLE sales.orders COMPUTE STATISTICS")

DropTable

case class DropTable(
  tableName: String,
  ifExists: Boolean
) extends RunnableCommand

DropTable - Removes table from Hive metastore

Usage Example:

hiveContext.sql("DROP TABLE IF EXISTS staging.temp_table")

AddJar

case class AddJar(path: String) extends RunnableCommand

AddJar - Adds JAR to Hive classpath for UDFs and SerDes

Usage Example:

hiveContext.sql("ADD JAR /path/to/custom-udfs.jar")

AddFile

case class AddFile(path: String) extends RunnableCommand

AddFile - Adds file to distributed cache for script transformations

Usage Example:

hiveContext.sql("ADD FILE /path/to/python_script.py")

Data Source Commands

CreateMetastoreDataSource

case class CreateMetastoreDataSource(
  tableIdent: TableIdentifier,
  userSpecifiedSchema: Option[StructType],
  provider: String,
  options: Map[String, String],
  allowExisting: Boolean,
  managedIfNoPath: Boolean
) extends RunnableCommand

CreateMetastoreDataSource - Creates data source table in Hive metastore

Usage Example:

hiveContext.sql("""
  CREATE TABLE parquet_data
  USING PARQUET
  OPTIONS (
    path '/data/parquet_files'
  )
""")

CreateMetastoreDataSourceAsSelect

case class CreateMetastoreDataSourceAsSelect(
  tableIdent: TableIdentifier,
  provider: String,
  partitionColumns: Array[String],
  bucketSpec: Option[BucketSpec],
  mode: SaveMode,
  options: Map[String, String],
  query: LogicalPlan
) extends RunnableCommand

CreateMetastoreDataSourceAsSelect - Creates data source table from query

Usage Example:

hiveContext.sql("""
  CREATE TABLE analytics.orc_summary
  USING ORC
  PARTITIONED BY (year)
  AS SELECT customer_id, SUM(amount) as total, year
  FROM sales.orders
  GROUP BY customer_id, year
""")

Native Command Execution

HiveNativeCommand

case class HiveNativeCommand(sql: String) extends RunnableCommand

HiveNativeCommand - Executes raw HiveQL through Hive client

Used for commands that require direct Hive execution:

  • Complex DDL operations
  • Hive-specific administrative commands
  • Custom SerDe operations

Usage Example:

// This might fall back to HiveNativeCommand for complex operations
hiveContext.sql("""
  ALTER TABLE complex_table 
  SET SERDEPROPERTIES ('serialization.format'='1')
""")

Table Description Commands

DescribeHiveTableCommand

case class DescribeHiveTableCommand(
  table: TableIdentifier,
  outputCols: Seq[Attribute],
  isExtended: Boolean
) extends RunnableCommand

DescribeHiveTableCommand - Provides detailed table metadata

Usage Examples:

// Basic table description
hiveContext.sql("DESCRIBE sales.orders")

// Extended table description with storage details
hiveContext.sql("DESCRIBE EXTENDED sales.orders")

// Formatted description
hiveContext.sql("DESCRIBE FORMATTED sales.orders")

Integration with Catalyst

Query Planning

The Hive operators integrate with Spark's Catalyst optimizer; each phase below can be inspected through a DataFrame's queryExecution, as the sketch after this list shows:

  1. Logical Plans - HiveQL parser generates Hive-aware logical plans
  2. Analysis - Hive analyzer resolves table references using metastore
  3. Optimization - Standard Catalyst optimizations apply to Hive queries
  4. Physical Planning - Hive-specific operators are selected for Hive tables
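
A minimal inspection sketch using the queryExecution API:

val df = hiveContext.sql("SELECT customer_id FROM sales.orders WHERE year = 2023")
println(df.queryExecution.logical)       // parsed logical plan (step 1)
println(df.queryExecution.analyzed)      // resolved against the metastore (step 2)
println(df.queryExecution.optimizedPlan) // after Catalyst optimizations (step 3)
println(df.queryExecution.executedPlan)  // physical plan with Hive operators (step 4)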

Predicate Pushdown

Hive operators support predicate pushdown for:

  • Partition elimination in HiveTableScan
  • Filter pushdown to Hive SerDes
  • Projection pushdown for columnar formats

Example:

// This query benefits from partition and filter pushdown
val optimizedPlan = hiveContext.sql("""
  SELECT customer_id, amount
  FROM sales.orders
  WHERE year = 2023 AND month = 12 AND amount > 100
""")

// The optimizer will:
// 1. Push year=2023, month=12 to partition pruning
// 2. Push amount > 100 to SerDe level filtering  
// 3. Project only customer_id, amount columns

Performance Considerations

Table Scan Optimization

  • Partition Pruning: Automatically eliminates irrelevant partitions
  • Column Pruning: Reads only required columns from storage
  • SerDe Optimization: Leverages format-specific optimizations

Insert Performance

  • Batch Insertion: Groups multiple records for efficient writes
  • Dynamic Partitioning: Automatically creates partitions during insert
  • Compression: Supports Hive compression codecs through standard Hive settings (see the sketch below)
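
Compression for Hive inserts is controlled through standard Hive/Hadoop settings; the codec below is an illustrative choice:

// Enable compressed output for Hive inserts
hiveContext.setConf("hive.exec.compress.output", "true")
hiveContext.setConf("mapreduce.output.fileoutputformat.compress.codec",
  "org.apache.hadoop.io.compress.SnappyCodec")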

Script Transformation

  • Streaming Execution: Each task pipes its partition through a single script process, with a dedicated writer thread feeding stdin
  • Memory Management: Rows are streamed rather than buffered, keeping memory usage bounded for large transformations
  • Error Handling: A script exiting with a non-zero status fails the task, which Spark retries per its task-retry policy

Error Handling

Common execution errors and solutions:

Table Not Found:

import org.apache.spark.sql.AnalysisException

try {
  hiveContext.sql("SELECT * FROM non_existent_table")
} catch {
  case e: AnalysisException if e.getMessage.contains("Table not found") =>
    println("Please check table name and database")
}

Missing Partitions:

Filtering on a partition value that does not exist is not an error; partition pruning simply matches no partitions and the query returns an empty result:

val df = hiveContext.sql("SELECT * FROM sales.orders WHERE year = 2030")
df.count() // 0 - no matching partitions, no exception is thrown

SerDe Errors:

SerDe failures surface when the query is actually executed (on an action), typically wrapped in a SparkException whose root cause is an org.apache.hadoop.hive.serde2.SerDeException:

import org.apache.spark.SparkException

try {
  hiveContext.sql("SELECT * FROM table_with_custom_serde").collect()
} catch {
  case e: SparkException =>
    println("SerDe processing failed - check data format: " + e.getMessage)
}