Metastore Operations

The Apache Spark Hive integration provides comprehensive metastore operations through the HiveClient interface, enabling full interaction with the Hive metastore for database, table, partition, and function management.
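
These operations require a SparkSession with Hive support enabled. A minimal setup sketch (the application name and warehouse path are placeholders):

import org.apache.spark.sql.SparkSession

// Hive support must be enabled for the session to reach the metastore
val spark = SparkSession.builder()
  .appName("metastore-operations")
  .config("spark.sql.warehouse.dir", "/user/hive/warehouse") // placeholder warehouse location
  .enableHiveSupport()
  .getOrCreate()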

HiveClient Interface

The HiveClient trait provides the primary interface for all metastore operations, abstracting the underlying Hive metastore implementation.

trait HiveClient {
  // Version and configuration
  def version: HiveVersion
  def getConf(key: String, defaultValue: String): String
  def setConf(key: String, value: String): Unit
  
  // SQL execution
  def runSqlHive(sql: String): Seq[String]
  
  // Session management  
  def newSession(): HiveClient
  def reset(): Unit
  def close(): Unit
  
  // Database operations
  def listDatabases(pattern: String): Seq[String]
  def getDatabase(name: String): CatalogDatabase
  def databaseExists(dbName: String): Boolean
  def createDatabase(database: CatalogDatabase, ignoreIfExists: Boolean): Unit  
  def dropDatabase(name: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
  def alterDatabase(database: CatalogDatabase): Unit
  def setCurrentDatabase(databaseName: String): Unit
  def getCurrentDatabase: String
  
  // Table operations
  def listTables(dbName: String): Seq[String]
  def listTables(dbName: String, pattern: String): Seq[String]  
  def getTable(dbName: String, tableName: String): CatalogTable
  def tableExists(dbName: String, tableName: String): Boolean
  def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit
  def alterTable(tableName: String, table: CatalogTable): Unit
  def renameTable(dbName: String, oldName: String, newName: String): Unit
  
  // Partition operations
  def listPartitions(db: String, table: String, partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition]
  def listPartitionNames(db: String, table: String, partialSpec: Option[TablePartitionSpec] = None): Seq[String]
  def getPartition(dbName: String, tableName: String, spec: TablePartitionSpec): CatalogTablePartition
  def getPartitions(db: String, table: String, specs: Seq[TablePartitionSpec]): Seq[CatalogTablePartition]
  def createPartitions(db: String, table: String, parts: Seq[CatalogTablePartition], ignoreIfExists: Boolean): Unit
  def dropPartitions(db: String, table: String, specs: Seq[TablePartitionSpec], ignoreIfNotExists: Boolean, purge: Boolean, retainData: Boolean): Unit
  def renamePartitions(db: String, table: String, specs: Seq[TablePartitionSpec], newSpecs: Seq[TablePartitionSpec]): Unit
  def alterPartitions(db: String, table: String, newParts: Seq[CatalogTablePartition]): Unit
  
  // Function operations
  def listFunctions(db: String, pattern: String): Seq[String]
  def getFunction(db: String, name: String): CatalogFunction
  def functionExists(db: String, name: String): Boolean
  def createFunction(db: String, func: CatalogFunction): Unit
  def dropFunction(db: String, name: String): Unit
  def alterFunction(db: String, func: CatalogFunction): Unit
  
  // Data loading operations
  def loadTable(loadPath: String, tableName: String, replace: Boolean, isSrcLocal: Boolean): Unit
  def loadPartition(loadPath: String, dbName: String, tableName: String, partSpec: java.util.LinkedHashMap[String, String], replace: Boolean, inheritTableSpecs: Boolean, isSrcLocal: Boolean): Unit
  def loadDynamicPartitions(loadPath: String, dbName: String, tableName: String, partSpec: java.util.LinkedHashMap[String, String], replace: Boolean, numDP: Int): Unit
}
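
HiveClient itself is internal to Spark; application code normally performs the same metastore operations through SQL or the public spark.catalog API. A sketch of public counterparts for a few of the calls above:

// Public Catalog API counterparts to common HiveClient operations
spark.catalog.listDatabases().show()                    // listDatabases
spark.catalog.databaseExists("analytics_db")            // databaseExists
spark.catalog.listTables("analytics_db").show()         // listTables
spark.catalog.tableExists("analytics_db", "employee")   // tableExists
spark.catalog.listFunctions().show()                    // listFunctions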

Database Operations

Creating Databases

import org.apache.spark.sql.catalyst.catalog.CatalogDatabase

// Create database through SparkSession
spark.sql("CREATE DATABASE IF NOT EXISTS my_database")

// Create database with properties
spark.sql("""
CREATE DATABASE analytics_db
COMMENT 'Analytics database for reporting'
LOCATION '/user/hive/warehouse/analytics.db'
WITH DBPROPERTIES ('owner'='analytics_team', 'created'='2023-01-01')
""")

// Create database programmatically (internal API)
val database = CatalogDatabase(
  name = "my_db",
  description = "My database",
  locationUri = "/user/hive/warehouse/my_db.db",
  properties = Map("owner" -> "user", "team" -> "data")
)
// Note: Direct HiveClient usage is internal API

Managing Databases

// List databases
spark.sql("SHOW DATABASES").show()

// List databases with pattern
spark.sql("SHOW DATABASES LIKE 'analytics_*'").show()

// Get current database
val currentDb = spark.sql("SELECT current_database()").collect()(0).getString(0)

// Switch database
spark.sql("USE analytics_db")

// Drop database
spark.sql("DROP DATABASE IF EXISTS old_database CASCADE")

Database Properties

// Show database details
spark.sql("DESCRIBE DATABASE EXTENDED analytics_db").show()

// Alter database properties
spark.sql("""
ALTER DATABASE analytics_db SET DBPROPERTIES ('modified'='2023-12-01', 'version'='2.0')
""")

// Set database location
spark.sql("ALTER DATABASE analytics_db SET LOCATION '/new/location/analytics.db'")

Table Operations

Creating Tables

// Create managed table
spark.sql("""
CREATE TABLE employee (
  id INT,
  name STRING,
  department STRING,
  salary DOUBLE,
  hire_date DATE
)
STORED AS ORC
""")

// Create external table
spark.sql("""
CREATE EXTERNAL TABLE external_employee (
  id INT,
  name STRING,
  department STRING
)
STORED AS PARQUET
LOCATION '/data/external/employee'
""")

// Create partitioned table
spark.sql("""
CREATE TABLE partitioned_sales (
  transaction_id STRING,
  amount DOUBLE,
  customer_id STRING
)
PARTITIONED BY (year INT, month INT)
STORED AS ORC
""")

Table Management

// List tables
spark.sql("SHOW TABLES").show()

// List tables in specific database
spark.sql("SHOW TABLES IN analytics_db").show()

// List tables with pattern
spark.sql("SHOW TABLES LIKE 'employee_*'").show()

// Check if table exists
val tableExists = spark.catalog.tableExists("analytics_db", "employee")

// Get table information
spark.sql("DESCRIBE EXTENDED employee").show()

// Show table properties
spark.sql("SHOW TBLPROPERTIES employee").show()

Altering Tables

// Add columns
spark.sql("ALTER TABLE employee ADD COLUMNS (email STRING)")

// Change a column comment (Spark's ALTER TABLE cannot rename or drop
// columns on Hive tables; redefine the schema in Hive for that)
spark.sql("ALTER TABLE employee CHANGE COLUMN email email STRING COMMENT 'Employee email address'")

// Rename table
spark.sql("ALTER TABLE old_employee RENAME TO employee_backup")

// Set table properties
spark.sql("ALTER TABLE employee SET TBLPROPERTIES ('last_modified'='2023-12-01')")

// Change table location
spark.sql("ALTER TABLE external_employee SET LOCATION '/new/data/path'")

Table Statistics

// Analyze table statistics
spark.sql("ANALYZE TABLE employee COMPUTE STATISTICS")

// Analyze column statistics
spark.sql("ANALYZE TABLE employee COMPUTE STATISTICS FOR COLUMNS id, salary")

// Show table stats
spark.sql("DESCRIBE EXTENDED employee").show()

Partition Operations

Creating Partitions

// Add partition
spark.sql("ALTER TABLE partitioned_sales ADD PARTITION (year=2023, month=12)")

// Add partition with location
spark.sql("""
ALTER TABLE partitioned_sales ADD PARTITION (year=2023, month=11)
LOCATION '/data/sales/2023/11'
""")

// Add multiple partitions
spark.sql("""
ALTER TABLE partitioned_sales ADD 
PARTITION (year=2023, month=10)
PARTITION (year=2023, month=9)
""")

Managing Partitions

// Show partitions
spark.sql("SHOW PARTITIONS partitioned_sales").show()

// Show partitions with filter
spark.sql("SHOW PARTITIONS partitioned_sales PARTITION(year=2023)").show()

// Drop partition
spark.sql("ALTER TABLE partitioned_sales DROP PARTITION (year=2022, month=1)")

// Drop partition if exists
spark.sql("ALTER TABLE partitioned_sales DROP IF EXISTS PARTITION (year=2022, month=2)")

Dynamic Partitioning

// Enable dynamic partitioning
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

// Insert with dynamic partitioning
spark.sql("""
INSERT INTO TABLE partitioned_sales PARTITION(year, month)
SELECT transaction_id, amount, customer_id, year(date_col), month(date_col)
FROM source_sales
""")

// Overwrite partitions
spark.sql("""
INSERT OVERWRITE TABLE partitioned_sales PARTITION(year=2023, month)
SELECT transaction_id, amount, customer_id, month(date_col)  
FROM source_sales
WHERE year(date_col) = 2023
""")

Partition Properties

// Set partition properties
spark.sql("""
ALTER TABLE partitioned_sales PARTITION (year=2023, month=12)
SET SERDEPROPERTIES ('field.delim'='\t')
""")

// Change partition location
spark.sql("""
ALTER TABLE partitioned_sales PARTITION (year=2023, month=12)
SET LOCATION '/new/partition/location'
""")

Function Operations

Creating Functions

// Create temporary function
spark.sql("""
CREATE TEMPORARY FUNCTION my_upper AS 'com.example.UpperCaseUDF'
""")

// Create function with JAR
spark.sql("""
CREATE FUNCTION my_database.complex_function AS 'com.example.ComplexUDF'
USING JAR '/path/to/udf.jar'
""")

// Create function with multiple resources
spark.sql("""
CREATE FUNCTION analytics.ml_predict AS 'com.example.MLPredictUDF'
USING JAR '/path/to/ml-udf.jar',
      FILE '/path/to/model.pkl'
""")

Managing Functions

// List functions
spark.sql("SHOW FUNCTIONS").show()

// List functions with pattern
spark.sql("SHOW FUNCTIONS LIKE 'my_*'").show()

// List user-defined functions only
spark.sql("SHOW USER FUNCTIONS").show()

// Describe function
spark.sql("DESCRIBE FUNCTION my_upper").show()

// Show extended function info
spark.sql("DESCRIBE FUNCTION EXTENDED my_upper").show()

Function Usage

// Use UDF in query
val result = spark.sql("""
SELECT my_upper(name) as upper_name
FROM employee  
WHERE my_upper(name) LIKE 'JOHN%'
""")

// Use aggregate UDF
val avgResult = spark.sql("""
SELECT department, my_avg(salary) as avg_salary
FROM employee
GROUP BY department  
""")

Data Loading Operations

Loading Data into Tables

// Load data from local file
spark.sql("""
LOAD DATA LOCAL INPATH '/local/path/data.txt'
INTO TABLE employee
""")

// Load data from HDFS
spark.sql("""
LOAD DATA INPATH '/hdfs/path/data.txt'
INTO TABLE employee
""")

// Load data with overwrite
spark.sql("""
LOAD DATA INPATH '/hdfs/path/new_data.txt'
OVERWRITE INTO TABLE employee
""")

Loading Partitioned Data

// Load into specific partition
spark.sql("""
LOAD DATA INPATH '/data/2023/12/sales.txt'
INTO TABLE partitioned_sales PARTITION (year=2023, month=12)
""")

// Load with partition overwrite
spark.sql("""
LOAD DATA INPATH '/data/2023/11/sales.txt'
OVERWRITE INTO TABLE partitioned_sales PARTITION (year=2023, month=11)
""")

Version Compatibility

Supported Hive Versions

The metastore client supports multiple Hive versions:

abstract class HiveVersion {
  def fullVersion: String
  def extraDeps: Seq[String]  
  def exclusions: Seq[String]
}

object HiveVersion {
  val v12 = hive.v12  // 0.12.0
  val v13 = hive.v13  // 0.13.1
  val v14 = hive.v14  // 0.14.0
  val v1_0 = hive.v1_0 // 1.0.0
  val v1_1 = hive.v1_1 // 1.1.0
  val v1_2 = hive.v1_2 // 1.2.1
  val v2_0 = hive.v2_0 // 2.0.1
  val v2_1 = hive.v2_1 // 2.1.1
}

Configuration for Different Versions

// spark.sql.hive.metastore.* are static configurations: set them when the
// SparkSession is created, not at runtime with spark.conf.set
val spark = SparkSession.builder()
  .config("spark.sql.hive.metastore.version", "2.1.1")
  // Point at a specific Hive installation's JARs...
  .config("spark.sql.hive.metastore.jars", "/path/to/hive/lib/*")
  // ...or let Spark resolve version-matched JARs from Maven
  // .config("spark.sql.hive.metastore.jars", "maven")
  .enableHiveSupport()
  .getOrCreate()

Error Handling

Common Metastore Errors

Connection Issues:

// Metastore URI (typically set in hive-site.xml or at SparkSession startup)
spark.conf.set("hive.metastore.uris", "thrift://metastore-host:9083")

// Set connection timeout
spark.conf.set("hive.metastore.client.connect.retry.delay", "5s")
spark.conf.set("hive.metastore.client.socket.timeout", "1800s")

Permission Errors:

// Configure metastore authentication
spark.conf.set("hive.metastore.sasl.enabled", "true")
spark.conf.set("hive.metastore.kerberos.principal", "hive/_HOST@REALM")

Schema Validation:

// Disable schema validation for development
spark.conf.set("hive.metastore.schema.verification", "false")

// Do not record or verify the schema version stored in the metastore
spark.conf.set("hive.metastore.schema.verification.record.version", "false")

Performance Optimization

Metastore Performance

// Note: these Hive settings apply to the metastore service (or Spark's
// embedded metastore), not to Spark's own execution

// Pin frequently used object types in the metastore-side cache
spark.conf.set("hive.metastore.cache.pinobjtypes", "Table,Database,Type,FieldSchema,Order")

// Size the metastore's backing-database connection pool (DataNucleus setting)
spark.conf.set("datanucleus.connectionPool.maxPoolSize", "10")

// Maximum number of objects fetched from the metastore in one batch
spark.conf.set("hive.metastore.batch.retrieve.max", "300")

Partition Pruning

// Enable partition pruning
spark.conf.set("spark.sql.hive.metastorePartitionPruning", "true")

// Configure partition discovery
spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.threshold", "32")

Types

// Database catalog entry
case class CatalogDatabase(
  name: String,
  description: String,
  locationUri: String,
  properties: Map[String, String]
)

// Table catalog entry  
case class CatalogTable(
  identifier: TableIdentifier,
  tableType: CatalogTableType,
  storage: CatalogStorageFormat,
  schema: StructType,
  partitionColumnNames: Seq[String] = Seq.empty,
  bucketSpec: Option[BucketSpec] = None,
  owner: String = "",
  createTime: Long = System.currentTimeMillis,
  lastAccessTime: Long = -1,
  createVersion: String = "",
  properties: Map[String, String] = Map.empty,
  stats: Option[CatalogStatistics] = None,
  viewText: Option[String] = None,
  comment: Option[String] = None,
  unsupportedFeatures: Seq[String] = Seq.empty,
  tracksPartitionsInCatalog: Boolean = false,
  schemaPreservesCase: Boolean = true,
  ignoredProperties: Map[String, String] = Map.empty
)

// Partition catalog entry
case class CatalogTablePartition(
  spec: TablePartitionSpec,
  storage: CatalogStorageFormat,
  parameters: Map[String, String] = Map.empty,
  stats: Option[CatalogStatistics] = None
)

// Function catalog entry  
case class CatalogFunction(
  identifier: FunctionIdentifier,
  className: String,
  resources: Seq[FunctionResource],
  description: Option[String] = None
)

// Table partition specification
type TablePartitionSpec = Map[String, String]

// Table identifier
case class TableIdentifier(table: String, database: Option[String] = None)

// Function identifier  
case class FunctionIdentifier(funcName: String, database: Option[String] = None)

// Function resource
case class FunctionResource(resourceType: FunctionResourceType, uri: String)

// Storage format specification
case class CatalogStorageFormat(
  locationUri: Option[String] = None,
  inputFormat: Option[String] = None,
  outputFormat: Option[String] = None,
  serde: Option[String] = None,
  compressed: Boolean = false,
  properties: Map[String, String] = Map.empty
)

// Table statistics
case class CatalogStatistics(
  sizeInBytes: BigInt,
  rowCount: Option[BigInt] = None,
  colStats: Map[String, CatalogColumnStat] = Map.empty
)