Low-level interface to Hive metastore client with version abstraction, supporting direct SQL execution and raw Hive operations. The HiveClient trait provides a unified interface across different Hive versions (2.0.x through 4.0.x) while HiveClientImpl provides the concrete implementation.
Core trait defining the interface to Hive metastore operations.
/**
* External interface to Hive client for all metastore operations
* Provides version abstraction and unified API across Hive versions
*/
trait HiveClient {
def version: HiveVersion
def getConf(key: String, defaultValue: String): String
def getState: Any
def userName: String
}

Execute Hive SQL directly through the client.
/**
* Execute Hive SQL and return results
* @param sql SQL statement to execute
* @return Sequence of result strings
*/
def runSqlHive(sql: String): Seq[String]

Usage Example:
val client: HiveClient = // obtain client instance
val results = client.runSqlHive("SHOW TABLES")
results.foreach(println)
// Execute DDL
client.runSqlHive("CREATE TABLE test (id INT, name STRING)")

Control output streams for Hive operations.
/**
* Set output stream for Hive operations
* @param stream PrintStream for output
*/
def setOut(stream: PrintStream): Unit
/**
* Set info stream for Hive operations
* @param stream PrintStream for info messages
*/
def setInfo(stream: PrintStream): Unit
/**
* Set error stream for Hive operations
* @param stream PrintStream for error messages
*/
def setError(stream: PrintStream): Unit

Direct database management through Hive client.
/**
* List all tables in a database
* @param dbName Database name
* @return Sequence of table names
*/
def listTables(dbName: String): Seq[String]
/**
* List tables matching a pattern
* @param dbName Database name
* @param pattern Pattern to match (glob pattern)
* @return Sequence of matching table names
*/
def listTables(dbName: String, pattern: String): Seq[String]
/**
* Set the current database
* @param databaseName Database name to set as current
*/
def setCurrentDatabase(databaseName: String): Unit
/**
* Get database metadata
* @param name Database name
* @return Database definition
*/
def getDatabase(name: String): CatalogDatabase
/**
* Check if a database exists
* @param dbName Database name
* @return True if database exists
*/
def databaseExists(dbName: String): Boolean
/**
* List databases matching a pattern
* @param pattern Pattern to match (glob pattern)
* @return Sequence of matching database names
*/
def listDatabases(pattern: String): Seq[String]
/**
* Create a new database
* @param database Database definition
* @param ignoreIfExists If true, don't fail if database already exists
*/
def createDatabase(database: CatalogDatabase, ignoreIfExists: Boolean): Unit
/**
* Drop a database
* @param name Database name
* @param ignoreIfNotExists If true, don't fail if database doesn't exist
* @param cascade If true, drop all tables in the database
*/
def dropDatabase(name: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
/**
* Modify database metadata
* @param database Updated database definition
*/
def alterDatabase(database: CatalogDatabase): Unit

Usage Examples:
// List databases
val databases = client.listDatabases("*")
println(s"Available databases: ${databases.mkString(", ")}")
// Set current database
client.setCurrentDatabase("my_database")
// Create database
val dbDef = CatalogDatabase(
name = "test_db",
description = "Test database",
locationUri = new URI("hdfs://namenode:9000/user/hive/warehouse/test_db.db"),
properties = Map.empty
)
client.createDatabase(dbDef, ignoreIfExists = true)

Comprehensive table management including raw Hive table access.
/**
* Check if a table exists
* @param dbName Database name
* @param tableName Table name
* @return True if table exists
*/
def tableExists(dbName: String, tableName: String): Boolean
/**
* Get table metadata
* @param dbName Database name
* @param tableName Table name
* @return Table definition
*/
def getTable(dbName: String, tableName: String): CatalogTable
/**
* Get table metadata optionally
* @param dbName Database name
* @param tableName Table name
* @return Optional table definition
*/
def getTableOption(dbName: String, tableName: String): Option[CatalogTable]
/**
* Get raw Hive table object
* @param dbName Database name
* @param tableName Table name
* @return Raw Hive table wrapper
*/
def getRawHiveTable(dbName: String, tableName: String): RawHiveTable
/**
* Get raw Hive table object optionally
* @param dbName Database name
* @param tableName Table name
* @return Optional raw Hive table wrapper
*/
def getRawHiveTableOption(dbName: String, tableName: String): Option[RawHiveTable]
/**
* Get multiple tables by name
* @param dbName Database name
* @param tableNames Sequence of table names
* @return Sequence of table definitions
*/
def getTablesByName(dbName: String, tableNames: Seq[String]): Seq[CatalogTable]
/**
* Create a new table
* @param table Table definition
* @param ignoreIfExists If true, don't fail if table already exists
*/
def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
/**
* Drop a table
* @param dbName Database name
* @param tableName Table name
* @param ignoreIfNotExists If true, don't fail if table doesn't exist
* @param purge If true, delete table data immediately
*/
def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit
/**
* Modify table metadata
* @param table Updated table definition
*/
def alterTable(table: CatalogTable): Unit
/**
* Alter table properties
* @param dbName Database name
* @param tableName Table name
* @param props Properties to set
*/
def alterTableProps(dbName: String, tableName: String, props: Map[String, String]): Unit
/**
* Alter table data schema
* @param dbName Database name
* @param tableName Table name
* @param newDataSchema New schema structure
*/
def alterTableDataSchema(dbName: String, tableName: String, newDataSchema: StructType): Unit

Usage Examples:
// Check if table exists
if (client.tableExists("my_db", "my_table")) {
println("Table exists")
}
// Get table metadata
val table = client.getTable("my_db", "my_table")
println(s"Table schema: ${table.schema}")
// Get raw Hive table for advanced operations
val rawTable = client.getRawHiveTable("my_db", "my_table")
val hiveProps = rawTable.hiveTableProps()
println(s"Hive table properties: $hiveProps")

Manage table partitions through the Hive client.
/**
* Create table partitions
* @param db Database name
* @param table Table name
* @param parts Sequence of partition definitions
* @param ignoreIfExists If true, don't fail if partitions already exist
*/
def createPartitions(db: String, table: String, parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit
/**
* Drop table partitions
* @param db Database name
* @param table Table name
* @param specs Sequence of partition specifications
* @param ignoreIfNotExists If true, don't fail if partitions don't exist
* @param purge If true, delete partition data immediately
* @param retainData If true, keep partition data files
*/
def dropPartitions(db: String, table: String, specs: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean, purge: Boolean, retainData: Boolean): Unit
/**
* Rename table partitions
* @param db Database name
* @param table Table name
* @param specs Current partition specifications
* @param newSpecs New partition specifications
*/
def renamePartitions(db: String, table: String, specs: Seq[TablePartitionSpec],
newSpecs: Seq[TablePartitionSpec]): Unit
/**
* Modify partition metadata
* @param db Database name
* @param table Table name
* @param newParts Updated partition definitions
*/
def alterPartitions(db: String, table: String, newParts: Seq[CatalogTablePartition]): Unit
/**
* Get partition metadata
* @param db Database name
* @param table Table name
* @param spec Partition specification
* @return Partition definition
*/
def getPartition(db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition
/**
* Get partition metadata optionally
* @param db Database name
* @param table Table name
* @param spec Partition specification
* @return Optional partition definition
*/
def getPartitionOption(db: String, table: String, spec: TablePartitionSpec): Option[CatalogTablePartition]
/**
* Get partition names
* @param db Database name
* @param table Table name
* @param partialSpec Optional partial partition specification for filtering
* @return Sequence of partition names
*/
def getPartitionNames(db: String, table: String,
partialSpec: Option[TablePartitionSpec]): Seq[String]
/**
* Get partitions
* @param db Database name
* @param table Table name
* @param partialSpec Optional partial partition specification for filtering
* @return Sequence of partition definitions
*/
def getPartitions(db: String, table: String,
partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition]
/**
* Get partitions matching filter predicates
* @param db Database name
* @param table Table name
* @param predicates Filter expressions
* @return Sequence of matching partition definitions
*/
def getPartitionsByFilter(db: String, table: String,
predicates: Seq[Expression]): Seq[CatalogTablePartition]

Load data into tables and partitions through Hive client.
/**
* Load data into a partition
* @param db Database name
* @param table Table name
* @param loadPath Path to data files
* @param partition Partition specification
* @param isOverwrite If true, overwrite existing data
* @param inheritTableSpecs If true, inherit table specifications
* @param isSrcLocal If true, source is local filesystem
*/
def loadPartition(db: String, table: String, loadPath: String,
partition: TablePartitionSpec, isOverwrite: Boolean,
inheritTableSpecs: Boolean, isSrcLocal: Boolean): Unit
/**
* Load data into a table
* @param db Database name
* @param table Table name
* @param loadPath Path to data files
* @param isOverwrite If true, overwrite existing data
* @param isSrcLocal If true, source is local filesystem
*/
def loadTable(db: String, table: String, loadPath: String,
isOverwrite: Boolean, isSrcLocal: Boolean): Unit
/**
* Load data into dynamic partitions
* @param db Database name
* @param table Table name
* @param loadPath Path to data files
* @param partition Partition specification
* @param replace If true, replace existing partitions
* @param numDP Number of dynamic partitions
*/
def loadDynamicPartitions(db: String, table: String, loadPath: String,
partition: TablePartitionSpec, replace: Boolean,
numDP: Int): Unit

Manage user-defined functions through the Hive client.
/**
* Create a user-defined function
* @param db Database name
* @param func Function definition
*/
def createFunction(db: String, func: CatalogFunction): Unit
/**
* Drop a user-defined function
* @param db Database name
* @param name Function name
*/
def dropFunction(db: String, name: String): Unit
/**
* Rename a function
* @param db Database name
* @param oldName Current function name
* @param newName New function name
*/
def renameFunction(db: String, oldName: String, newName: String): Unit
/**
* Modify function metadata
* @param db Database name
* @param func Updated function definition
*/
def alterFunction(db: String, func: CatalogFunction): Unit
/**
* Get function metadata
* @param db Database name
* @param funcName Function name
* @return Function definition
*/
def getFunction(db: String, funcName: String): CatalogFunction
/**
* Get function metadata optionally
* @param db Database name
* @param funcName Function name
* @return Optional function definition
*/
def getFunctionOption(db: String, funcName: String): Option[CatalogFunction]
/**
* Check if a function exists
* @param db Database name
* @param funcName Function name
* @return True if function exists
*/
def functionExists(db: String, funcName: String): Boolean
/**
* List functions matching a pattern
* @param db Database name
* @param pattern Pattern to match (glob pattern)
* @return Sequence of matching function names
*/
def listFunctions(db: String, pattern: String): Seq[String]

Manage client sessions and resources.
/**
* Add JAR file to the Hive session
* @param path Path to JAR file
*/
def addJar(path: String): Unit
/**
* Create a new client session
* @return New HiveClient instance
*/
def newSession(): HiveClient
/**
* Execute code with Hive state context
* @param f Function to execute
* @return Result of function execution
*/
def withHiveState[A](f: => A): A
/**
* Reset client state
*/
def reset(): Unit

Interface for accessing raw Hive table objects.
/**
* Raw Hive table abstraction providing access to underlying Hive objects
*/
trait RawHiveTable {
/**
* Get the raw Hive table object
* @return Raw Hive table
*/
def rawTable: Object
/**
* Convert to Spark catalog table
* @return Spark CatalogTable representation
*/
def toCatalogTable: CatalogTable
/**
* Get Hive-specific table properties
* @return Map of Hive table properties
*/
def hiveTableProps(): Map[String, String]
}

Usage Example:
val rawTable = client.getRawHiveTable("my_db", "my_table")
// Access raw Hive object for advanced operations
val hiveTable = rawTable.rawTable
// Cast to appropriate Hive type for specific operations
// Convert to Spark representation
val catalogTable = rawTable.toCatalogTable
println(s"Table type: ${catalogTable.tableType}")
// Get Hive-specific properties
val hiveProps = rawTable.hiveTableProps()
hiveProps.foreach { case (key, value) =>
println(s"$key = $value")
}

Hive version abstraction and supported versions.
/**
 * Represents a Hive version with dependencies and exclusions.
 * Instances are totally ordered (via [[scala.math.Ordered]]), which enables
 * version-gated logic such as `clientVersion >= hive.v3_0`.
 * @param fullVersion Full version string (e.g., "2.3.10")
 * @param extraDeps Extra dependencies required for this version
 * @param exclusions Dependencies to exclude for this version
 */
abstract class HiveVersion(
val fullVersion: String,
val extraDeps: Seq[String] = Nil,
val exclusions: Seq[String] = Nil
) extends Ordered[HiveVersion] {
// Ordered contract: negative if this < that, zero if equal, positive if this > that.
// Supplies <, <=, >, >= for free via the Ordered trait.
def compare(that: HiveVersion): Int
}
object hive {
case object v2_0 extends HiveVersion("2.0.1")
case object v2_1 extends HiveVersion("2.1.1")
case object v2_2 extends HiveVersion("2.2.0")
case object v2_3 extends HiveVersion("2.3.10")
case object v3_0 extends HiveVersion("3.0.0")
case object v3_1 extends HiveVersion("3.1.3")
case object v4_0 extends HiveVersion("4.0.1")
val allSupportedHiveVersions: Set[HiveVersion]
}

Usage Example:
import org.apache.spark.sql.hive.client.hive
// Check client version
val clientVersion = client.version
println(s"Using Hive version: ${clientVersion.fullVersion}")
// Compare versions
if (clientVersion >= hive.v3_0) {
println("Using Hive 3.0 or later")
}
// List all supported versions
hive.allSupportedHiveVersions.foreach { version =>
println(s"Supported: ${version.fullVersion}")
}

The concrete implementation provided by Spark.
/**
* Concrete implementation of HiveClient
* Created through HiveUtils factory methods
*/
class HiveClientImpl extends HiveClient {
// Implementation details are internal
}

Usage Example:
import org.apache.spark.sql.hive.HiveUtils
// Create client for execution context
val executionClient = HiveUtils.newClientForExecution(sparkConf, hadoopConf)
// Create client for metadata operations
val metadataClient = HiveUtils.newClientForMetadata(
sparkConf,
hadoopConf,
Map("hive.metastore.uris" -> "thrift://localhost:9083")
)
// Use client for operations
val tables = metadataClient.listTables("default")
println(s"Tables in default database: ${tables.mkString(", ")}")