The Apache Spark Hive integration provides comprehensive metastore operations through the HiveClient interface, enabling full interaction with the Hive metastore for database, table, partition, and function management.
The HiveClient trait provides the primary interface for all metastore operations, abstracting the underlying Hive metastore implementation.
trait HiveClient {
// Version and configuration
def version: HiveVersion
def getConf(key: String, defaultValue: String): String
def setConf(key: String, value: String): Unit
// SQL execution
def runSqlHive(sql: String): Seq[String]
// Session management
def newSession(): HiveClient
def reset(): Unit
def close(): Unit
// Database operations
def listDatabases(pattern: String): Seq[String]
def getDatabase(name: String): CatalogDatabase
def databaseExists(dbName: String): Boolean
def createDatabase(database: CatalogDatabase, ignoreIfExists: Boolean): Unit
def dropDatabase(name: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
def alterDatabase(database: CatalogDatabase): Unit
def setCurrentDatabase(databaseName: String): Unit
def getCurrentDatabase: String
// Table operations
def listTables(dbName: String): Seq[String]
def listTables(dbName: String, pattern: String): Seq[String]
def getTable(dbName: String, tableName: String): CatalogTable
def tableExists(dbName: String, tableName: String): Boolean
def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit
def alterTable(tableName: String, table: CatalogTable): Unit
def renameTable(dbName: String, oldName: String, newName: String): Unit
// Partition operations
def listPartitions(db: String, table: String, partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition]
def listPartitionNames(db: String, table: String, partialSpec: Option[TablePartitionSpec] = None): Seq[String]
def getPartition(dbName: String, tableName: String, spec: TablePartitionSpec): CatalogTablePartition
def getPartitions(db: String, table: String, specs: Seq[TablePartitionSpec]): Seq[CatalogTablePartition]
def createPartitions(db: String, table: String, parts: Seq[CatalogTablePartition], ignoreIfExists: Boolean): Unit
def dropPartitions(db: String, table: String, specs: Seq[TablePartitionSpec], ignoreIfNotExists: Boolean, purge: Boolean, retainData: Boolean): Unit
def renamePartitions(db: String, table: String, specs: Seq[TablePartitionSpec], newSpecs: Seq[TablePartitionSpec]): Unit
def alterPartitions(db: String, table: String, newParts: Seq[CatalogTablePartition]): Unit
// Function operations
def listFunctions(db: String, pattern: String): Seq[String]
def getFunction(db: String, name: String): CatalogFunction
def functionExists(db: String, name: String): Boolean
def createFunction(db: String, func: CatalogFunction): Unit
def dropFunction(db: String, name: String): Unit
def alterFunction(db: String, func: CatalogFunction): Unit
// Data loading operations
def loadTable(loadPath: String, tableName: String, replace: Boolean, isSrcLocal: Boolean): Unit
def loadPartition(loadPath: String, dbName: String, tableName: String, partSpec: java.util.LinkedHashMap[String, String], replace: Boolean, inheritTableSpecs: Boolean, isSrcLocal: Boolean): Unit
def loadDynamicPartitions(loadPath: String, dbName: String, tableName: String, partSpec: java.util.LinkedHashMap[String, String], replace: Boolean, numDP: Int): Unit
}

import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
// Create database through SparkSession
spark.sql("CREATE DATABASE IF NOT EXISTS my_database")
// Create database with properties
spark.sql("""
CREATE DATABASE analytics_db
COMMENT 'Analytics database for reporting'
LOCATION '/user/hive/warehouse/analytics.db'
WITH DBPROPERTIES ('owner'='analytics_team', 'created'='2023-01-01')
""")
// Create database programmatically (internal API)
val database = CatalogDatabase(
name = "my_db",
description = "My database",
locationUri = "/user/hive/warehouse/my_db.db",
properties = Map("owner" -> "user", "team" -> "data")
)
// Note: Direct HiveClient usage is internal API

// List databases
spark.sql("SHOW DATABASES").show()
// List databases with pattern
spark.sql("SHOW DATABASES LIKE 'analytics_*'").show()
// Get current database
val currentDb = spark.sql("SELECT current_database()").collect()(0).getString(0)
// Switch database
spark.sql("USE analytics_db")
// Drop database
spark.sql("DROP DATABASE IF EXISTS old_database CASCADE")// Show database details
spark.sql("DESCRIBE DATABASE EXTENDED analytics_db").show()
// Alter database properties
spark.sql("""
ALTER DATABASE analytics_db SET DBPROPERTIES ('modified'='2023-12-01', 'version'='2.0')
""")
// Set database location
spark.sql("ALTER DATABASE analytics_db SET LOCATION '/new/location/analytics.db'")// Create managed table
spark.sql("""
CREATE TABLE employee (
id INT,
name STRING,
department STRING,
salary DOUBLE,
hire_date DATE
)
STORED AS ORC
""")
// Create external table
spark.sql("""
CREATE TABLE external_employee (
id INT,
name STRING,
department STRING
)
STORED AS PARQUET
LOCATION '/data/external/employee'
""")
// Create partitioned table
spark.sql("""
CREATE TABLE partitioned_sales (
transaction_id STRING,
amount DOUBLE,
customer_id STRING
)
PARTITIONED BY (year INT, month INT)
STORED AS ORC
""")// List tables
spark.sql("SHOW TABLES").show()
// List tables in specific database
spark.sql("SHOW TABLES IN analytics_db").show()
// List tables with pattern
spark.sql("SHOW TABLES LIKE 'employee_*'").show()
// Check if table exists
val tableExists = spark.catalog.tableExists("analytics_db", "employee")
// Get table information
spark.sql("DESCRIBE EXTENDED employee").show()
// Show table properties
spark.sql("SHOW TBLPROPERTIES employee").show()// Add column
spark.sql("ALTER TABLE employee ADD COLUMN email STRING")
// Rename column (note: for Hive-format tables Spark SQL's CHANGE COLUMN only permits comment changes; renaming may require recreating the table)
spark.sql("ALTER TABLE employee CHANGE COLUMN email email_address STRING")
// Drop column (supported for v2 tables only; Hive-format tables use REPLACE COLUMNS instead)
spark.sql("ALTER TABLE employee DROP COLUMN email_address")
// Rename table
spark.sql("ALTER TABLE old_employee RENAME TO employee_backup")
// Set table properties
spark.sql("ALTER TABLE employee SET TBLPROPERTIES ('last_modified'='2023-12-01')")
// Change table location
spark.sql("ALTER TABLE external_employee SET LOCATION '/new/data/path'")// Analyze table statistics
spark.sql("ANALYZE TABLE employee COMPUTE STATISTICS")
// Analyze column statistics
spark.sql("ANALYZE TABLE employee COMPUTE STATISTICS FOR COLUMNS id, salary")
// Show table stats
spark.sql("DESCRIBE EXTENDED employee").show()// Add partition
spark.sql("ALTER TABLE partitioned_sales ADD PARTITION (year=2023, month=12)")
// Add partition with location
spark.sql("""
ALTER TABLE partitioned_sales ADD PARTITION (year=2023, month=11)
LOCATION '/data/sales/2023/11'
""")
// Add multiple partitions
spark.sql("""
ALTER TABLE partitioned_sales ADD
PARTITION (year=2023, month=10)
PARTITION (year=2023, month=9)
""")// Show partitions
spark.sql("SHOW PARTITIONS partitioned_sales").show()
// Show partitions with filter
spark.sql("SHOW PARTITIONS partitioned_sales PARTITION(year=2023)").show()
// Drop partition
spark.sql("ALTER TABLE partitioned_sales DROP PARTITION (year=2022, month=1)")
// Drop partition if exists
spark.sql("ALTER TABLE partitioned_sales DROP IF EXISTS PARTITION (year=2022, month=2)")// Enable dynamic partitioning
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
// Insert with dynamic partitioning
spark.sql("""
INSERT INTO TABLE partitioned_sales PARTITION(year, month)
SELECT transaction_id, amount, customer_id, year(date_col), month(date_col)
FROM source_sales
""")
// Overwrite partitions
spark.sql("""
INSERT OVERWRITE TABLE partitioned_sales PARTITION(year=2023, month)
SELECT transaction_id, amount, customer_id, month(date_col)
FROM source_sales
WHERE year(date_col) = 2023
""")// Set partition properties
spark.sql("""
ALTER TABLE partitioned_sales PARTITION (year=2023, month=12)
SET SERDEPROPERTIES ('field.delim'='\t')
""")
// Change partition location
spark.sql("""
ALTER TABLE partitioned_sales PARTITION (year=2023, month=12)
SET LOCATION '/new/partition/location'
""")// Create temporary function
spark.sql("""
CREATE TEMPORARY FUNCTION my_upper AS 'com.example.UpperCaseUDF'
""")
// Create function with JAR
spark.sql("""
CREATE FUNCTION my_database.complex_function AS 'com.example.ComplexUDF'
USING JAR '/path/to/udf.jar'
""")
// Create function with multiple resources
spark.sql("""
CREATE FUNCTION analytics.ml_predict AS 'com.example.MLPredictUDF'
USING JAR '/path/to/ml-udf.jar',
FILE '/path/to/model.pkl'
""")// List functions
spark.sql("SHOW FUNCTIONS").show()
// List functions with pattern
spark.sql("SHOW FUNCTIONS LIKE 'my_*'").show()
// List user-defined functions only
spark.sql("SHOW USER FUNCTIONS").show()
// Describe function
spark.sql("DESCRIBE FUNCTION my_upper").show()
// Show extended function info
spark.sql("DESCRIBE FUNCTION EXTENDED my_upper").show()// Use UDF in query
val result = spark.sql("""
SELECT my_upper(name) as upper_name
FROM employee
WHERE my_upper(name) LIKE 'JOHN%'
""")
// Use aggregate UDF
val avgResult = spark.sql("""
SELECT department, my_avg(salary) as avg_salary
FROM employee
GROUP BY department
""")// Load data from local file
spark.sql("""
LOAD DATA LOCAL INPATH '/local/path/data.txt'
INTO TABLE employee
""")
// Load data from HDFS
spark.sql("""
LOAD DATA INPATH '/hdfs/path/data.txt'
INTO TABLE employee
""")
// Load data with overwrite
spark.sql("""
LOAD DATA INPATH '/hdfs/path/new_data.txt'
OVERWRITE INTO TABLE employee
""")// Load into specific partition
spark.sql("""
LOAD DATA INPATH '/data/2023/12/sales.txt'
INTO TABLE partitioned_sales PARTITION (year=2023, month=12)
""")
// Load with partition overwrite
spark.sql("""
LOAD DATA INPATH '/data/2023/11/sales.txt'
OVERWRITE INTO TABLE partitioned_sales PARTITION (year=2023, month=11)
""")The metastore client supports multiple Hive versions:
abstract class HiveVersion {
def fullVersion: String
def extraDeps: Seq[String]
def exclusions: Seq[String]
}
object HiveVersion {
val v12 = hive.v12 // 0.12.0
val v13 = hive.v13 // 0.13.1
val v14 = hive.v14 // 0.14.0
val v1_0 = hive.v1_0 // 1.0.0
val v1_1 = hive.v1_1 // 1.1.0
val v1_2 = hive.v1_2 // 1.2.1
val v2_0 = hive.v2_0 // 2.0.1
val v2_1 = hive.v2_1 // 2.1.1
}

// These settings are read when the Hive metastore client is first created, so set them
// when building the session (or in spark-defaults.conf) rather than with spark.conf.set at runtime:
val spark = SparkSession.builder()
  .enableHiveSupport()
  // Set metastore version
  .config("spark.sql.hive.metastore.version", "2.1.1")
  // Use a specific Hive installation's JARs (or "maven" to download version-specific JARs)
  .config("spark.sql.hive.metastore.jars", "/path/to/hive/lib/*")
  .getOrCreate()

Connection Issues:
// Configure the metastore URI (typically provided in hive-site.xml or at session build time)
spark.conf.set("hive.metastore.uris", "thrift://metastore-host:9083")
// Increase retry delay and socket timeout for slow or unreliable metastores
spark.conf.set("hive.metastore.client.connect.retry.delay", "5s")
spark.conf.set("hive.metastore.client.socket.timeout", "1800s")

Permission Errors:
// Configure metastore authentication
spark.conf.set("hive.metastore.sasl.enabled", "true")
spark.conf.set("hive.metastore.kerberos.principal", "hive/_HOST@REALM")Schema Validation:
// Disable schema validation for development
spark.conf.set("hive.metastore.schema.verification", "false")
// Don't record the metastore schema version (often paired with disabled verification in development)
spark.conf.set("hive.metastore.schema.verification.record.version", "false")

// Enable metastore caching
spark.conf.set("hive.metastore.cache.pinobjtypes", "Table,Database,Type,FieldSchema,Order")
// Configure connection pooling
spark.conf.set("hive.metastore.ds.connection.url.max.connections", "10")
// Set batch fetch size
spark.conf.set("hive.metastore.batch.retrieve.max", "300")// Enable partition pruning
spark.conf.set("spark.sql.hive.metastorePartitionPruning", "true")
// Configure partition discovery
spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.threshold", "32")// Database catalog entry
case class CatalogDatabase(
name: String,
description: String,
locationUri: String,
properties: Map[String, String]
)
// Table catalog entry
case class CatalogTable(
identifier: TableIdentifier,
tableType: CatalogTableType,
storage: CatalogStorageFormat,
schema: StructType,
partitionColumnNames: Seq[String] = Seq.empty,
bucketSpec: Option[BucketSpec] = None,
owner: String = "",
createTime: Long = System.currentTimeMillis,
lastAccessTime: Long = -1,
createVersion: String = "",
properties: Map[String, String] = Map.empty,
stats: Option[CatalogStatistics] = None,
viewText: Option[String] = None,
comment: Option[String] = None,
unsupportedFeatures: Seq[String] = Seq.empty,
tracksPartitionsInCatalog: Boolean = false,
schemaPreservesCase: Boolean = true,
ignoredProperties: Map[String, String] = Map.empty
)
// Partition catalog entry
case class CatalogTablePartition(
spec: TablePartitionSpec,
storage: CatalogStorageFormat,
parameters: Map[String, String] = Map.empty,
stats: Option[CatalogStatistics] = None
)
// Function catalog entry
case class CatalogFunction(
identifier: FunctionIdentifier,
className: String,
resources: Seq[FunctionResource],
description: Option[String] = None
)
// Table partition specification
type TablePartitionSpec = Map[String, String]
// Table identifier
case class TableIdentifier(table: String, database: Option[String] = None)
// Function identifier
case class FunctionIdentifier(funcName: String, database: Option[String] = None)
// Function resource
case class FunctionResource(resourceType: FunctionResourceType, uri: String)
// Storage format specification
case class CatalogStorageFormat(
locationUri: Option[String] = None,
inputFormat: Option[String] = None,
outputFormat: Option[String] = None,
serde: Option[String] = None,
compressed: Boolean = false,
properties: Map[String, String] = Map.empty
)
// Table statistics
case class CatalogStatistics(
sizeInBytes: Long,
rowCount: Option[Long] = None,
colStats: Map[String, CatalogColumnStat] = Map.empty
)
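
As an illustration of how these classes fit together, here is a sketch of a catalog entry for the employee table defined earlier; these are internal catalyst classes, and exact packages, fields, and defaults vary across Spark versions:
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.types.StructType

// Hypothetical construction of a managed, ORC-backed table entry
val employeeTable = CatalogTable(
  identifier = TableIdentifier("employee", Some("analytics_db")),
  tableType = CatalogTableType.MANAGED,
  storage = CatalogStorageFormat.empty.copy(
    inputFormat = Some("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
    outputFormat = Some("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
    serde = Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde")
  ),
  schema = new StructType()
    .add("id", "int").add("name", "string").add("department", "string")
    .add("salary", "double").add("hire_date", "date")
)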