Apache Spark Hive integration module that provides support for Hive tables, queries, and SerDes
Direct access to Hive metastore for programmatic database, table, partition, and function management through the HiveClient interface. This provides low-level control over Hive metastore operations beyond what's available through SQL.
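HiveClient is private[hive], so ordinary application code cannot name the type directly. The sketch below shows one hedged way to reach the client owned by HiveExternalCatalog, by compiling a small helper into the org.apache.spark.sql.hive package; the helper name is illustrative, and on Spark 2.4 the external catalog is wrapped in ExternalCatalogWithListener, so unwrap it first there:
package org.apache.spark.sql.hive  // package placement grants access to the package-private types

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.client.HiveClient

object HiveClientAccess {
// Assumes a Hive-enabled session built with enableHiveSupport();
// the cast fails if the session uses the in-memory catalog.
def clientOf(spark: SparkSession): HiveClient =
spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
}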
Core interface for interacting with Hive metastore programmatically.
/**
* Interface to Hive client for metastore operations
* Shared across internal and external classloaders
*/
private[hive] trait HiveClient {
/** Returns the Hive Version of this client */
def version: HiveVersion
/** Returns configuration value for the given key */
def getConf(key: String, defaultValue: String): String
/** Returns the associated Hive SessionState */
def getState: Any
/** Execute HiveQL command and return results as strings */
def runSqlHive(sql: String): Seq[String]
/** Set output streams for Hive operations */
def setOut(stream: PrintStream): Unit
def setInfo(stream: PrintStream): Unit
def setError(stream: PrintStream): Unit
/** Add JAR to class loader */
def addJar(path: String): Unit
/** Create new client session sharing class loader and Hive client */
def newSession(): HiveClient
/** Run function within Hive state (SessionState, HiveConf, Hive client and class loader) */
def withHiveState[A](f: => A): A
/** Remove all metadata - for testing only */
def reset(): Unit
}
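Usage Examples (a hedged sketch, assuming a client obtained as shown above):
// Run raw HiveQL through the client and read back the result lines
val showDbs: Seq[String] = client.runSqlHive("SHOW DATABASES")
showDbs.foreach(println)
// withHiveState pins the Hive SessionState, HiveConf, and classloader for the block
val warehouseDir = client.withHiveState {
client.getConf("hive.metastore.warehouse.dir", "")
}
// Spawn an independent session that shares the classloader and underlying Hive client
val session2 = client.newSession()

Comprehensive database management operations.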
/**
* Database management operations
*/
trait HiveClient {
/** List database names matching pattern */
def listDatabases(pattern: String): Seq[String]
/** Get database metadata - throws exception if not found */
def getDatabase(name: String): CatalogDatabase
/** Check if database exists */
def databaseExists(dbName: String): Boolean
/** Set current database */
def setCurrentDatabase(databaseName: String): Unit
/** Create new database */
def createDatabase(database: CatalogDatabase, ignoreIfExists: Boolean): Unit
/**
* Drop database
* @param name - Database name to drop
* @param ignoreIfNotExists - Don't throw error if database doesn't exist
* @param cascade - Remove all associated objects (tables, functions)
*/
def dropDatabase(name: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
/** Alter existing database */
def alterDatabase(database: CatalogDatabase): Unit
}

Usage Examples:
// Assuming you have access to a HiveClient instance
val client: HiveClient = ??? // e.g. obtained via the HiveExternalCatalog sketch above
// List all databases
val databases = client.listDatabases("*")
println(s"Available databases: ${databases.mkString(", ")}")
// Create new database
val newDb = CatalogDatabase(
name = "analytics_db",
description = "Analytics database",
// locationUri is a java.net.URI in Spark 2.3+; earlier 2.x releases took a plain String
locationUri = new java.net.URI("hdfs://cluster/user/hive/warehouse/analytics_db.db"),
properties = Map("owner" -> "analytics_team")
)
client.createDatabase(newDb, ignoreIfExists = true)
// Switch to database
client.setCurrentDatabase("analytics_db")
// Get database info
val dbInfo = client.getDatabase("analytics_db")
println(s"Database location: ${dbInfo.locationUri}")Complete table management functionality.
/**
* Table management operations
*/
trait HiveClient {
/** List all tables in database */
def listTables(dbName: String): Seq[String]
/** List tables matching pattern in database */
def listTables(dbName: String, pattern: String): Seq[String]
/** Check if table exists */
def tableExists(dbName: String, tableName: String): Boolean
/** Get table metadata - throws NoSuchTableException if not found */
def getTable(dbName: String, tableName: String): CatalogTable
/** Get table metadata - returns None if not found */
def getTableOption(dbName: String, tableName: String): Option[CatalogTable]
/** Create new table */
def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
/** Drop table */
def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit
/** Alter existing table */
def alterTable(table: CatalogTable): Unit
def alterTable(dbName: String, tableName: String, table: CatalogTable): Unit
/** Update table schema and properties */
def alterTableDataSchema(
dbName: String,
tableName: String,
newDataSchema: StructType,
schemaProps: Map[String, String]
): Unit
/** Get partitions filtered by Catalyst predicates */
def getPartitionsByFilter(
catalogTable: CatalogTable,
predicates: Seq[Expression]
): Seq[CatalogTablePartition]
/** Add JAR to class loader for UDF/SerDe usage */
def addJar(path: String): Unit
/** Remove all metadata - used for testing only */
def reset(): Unit
}

Usage Examples:
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.types._
// List tables
val tables = client.listTables("default")
println(s"Tables in default database: ${tables.mkString(", ")}")
// Create table
val tableSchema = StructType(Seq(
StructField("id", IntegerType, nullable = false),
StructField("name", StringType, nullable = true),
StructField("age", IntegerType, nullable = true)
))
val newTable = CatalogTable(
identifier = TableIdentifier("users", Some("default")),
tableType = CatalogTableType.MANAGED,
storage = CatalogStorageFormat.empty,
schema = tableSchema,
partitionColumnNames = Seq.empty,
properties = Map("comment" -> "User information table")
)
client.createTable(newTable, ignoreIfExists = true)
// Get table info
val tableInfo = client.getTable("default", "users")
println(s"Table schema: ${tableInfo.schema}")
println(s"Table type: ${tableInfo.tableType}")
// Check if table exists
if (client.tableExists("default", "users")) {
println("Table exists")
}

Comprehensive partition management for partitioned tables.
/**
* Partition management operations
*/
trait HiveClient {
/** Create one or more partitions */
def createPartitions(
db: String,
table: String,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean
): Unit
/** Drop one or more partitions */
def dropPartitions(
db: String,
table: String,
specs: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean,
purge: Boolean,
retainData: Boolean
): Unit
/** Rename existing partitions */
def renamePartitions(
db: String,
table: String,
specs: Seq[TablePartitionSpec],
newSpecs: Seq[TablePartitionSpec]
): Unit
/** Alter existing partitions */
def alterPartitions(
db: String,
table: String,
newParts: Seq[CatalogTablePartition]
): Unit
/** Get partition - throws NoSuchPartitionException if not found */
def getPartition(
dbName: String,
tableName: String,
spec: TablePartitionSpec
): CatalogTablePartition
/** Get partition - returns None if not found */
def getPartitionOption(
table: CatalogTable,
spec: TablePartitionSpec
): Option[CatalogTablePartition]
/** Get partitions matching partial spec */
def getPartitions(
catalogTable: CatalogTable,
partialSpec: Option[TablePartitionSpec] = None
): Seq[CatalogTablePartition]
/** Get partition names matching partial spec */
def getPartitionNames(
table: CatalogTable,
partialSpec: Option[TablePartitionSpec] = None
): Seq[String]
/** Get partitions filtered by predicates */
def getPartitionsByFilter(
catalogTable: CatalogTable,
predicates: Seq[Expression]
): Seq[CatalogTablePartition]
}

Usage Examples:
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
// Get table for partition operations
val table = client.getTable("sales_db", "monthly_sales")
// List all partitions
val allPartitions = client.getPartitions(table)
println(s"Total partitions: ${allPartitions.length}")
// Get specific partition
val partSpec: TablePartitionSpec = Map("year" -> "2023", "month" -> "12")
val partition = client.getPartitionOption(table, partSpec)
partition match {
case Some(part) => println(s"Partition location: ${part.storage.locationUri}")
case None => println("Partition not found")
}
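// getPartitionsByFilter pushes Catalyst predicates down to the metastore.
// A hedged sketch, assuming the year partition column is string-typed:
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.types.StringType
val yearAttr = AttributeReference("year", StringType)()
val pruned = client.getPartitionsByFilter(table, Seq(EqualTo(yearAttr, Literal("2023"))))
println(s"Partitions for year 2023: ${pruned.length}")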
// Create new partition
val newPartition = CatalogTablePartition(
spec = Map("year" -> "2024", "month" -> "01"),
storage = CatalogStorageFormat(
// locationUri is Option[java.net.URI] in Spark 2.3+; earlier 2.x releases used Option[String]
locationUri = Some(new java.net.URI("hdfs://cluster/user/hive/warehouse/sales_db.db/monthly_sales/year=2024/month=01")),
inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"),
serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"),
// compressed and properties have no defaults, so they must be supplied
compressed = false,
properties = Map.empty
)
)
client.createPartitions("sales_db", "monthly_sales", Seq(newPartition), ignoreIfExists = true)
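// A hedged sketch: drop an aged-out partition. purge = true would bypass the
// trash directory; retainData = true would leave the partition files in place.
client.dropPartitions(
"sales_db",
"monthly_sales",
Seq(Map("year" -> "2022", "month" -> "01")),
ignoreIfNotExists = true,
purge = false,
retainData = false
)

Operations for loading data into tables and partitions.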
/**
* Data loading operations
*/
trait HiveClient {
/** Load data into static partition */
def loadPartition(
loadPath: String,
dbName: String,
tableName: String,
partSpec: java.util.LinkedHashMap[String, String], // Hive requires LinkedHashMap ordering
replace: Boolean,
inheritTableSpecs: Boolean,
isSrcLocal: Boolean
): Unit
/** Load data into existing table */
def loadTable(
loadPath: String,
tableName: String,
replace: Boolean,
isSrcLocal: Boolean
): Unit
/** Load data creating dynamic partitions */
def loadDynamicPartitions(
loadPath: String,
dbName: String,
tableName: String,
partSpec: java.util.LinkedHashMap[String, String], // Hive requires LinkedHashMap ordering
replace: Boolean,
numDP: Int
): Unit
}
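Usage Examples (a hedged sketch; the paths and table names are illustrative):
// Load files into an unpartitioned table; replace = true overwrites existing data.
// Callers typically pass a database-qualified table name here.
client.loadTable(
loadPath = "hdfs://cluster/staging/users",
tableName = "default.users",
replace = true,
isSrcLocal = false
)
// Load one static partition; Hive needs the partition keys in declared order,
// hence the LinkedHashMap
val spec = new java.util.LinkedHashMap[String, String]()
spec.put("year", "2024")
spec.put("month", "02")
client.loadPartition(
loadPath = "hdfs://cluster/staging/sales/2024/02",
dbName = "sales_db",
tableName = "monthly_sales",
partSpec = spec,
replace = true,
inheritTableSpecs = true,
isSrcLocal = false
)

Management of user-defined functions in Hive metastore.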
/**
* Function management operations
*/
trait HiveClient {
/** Create function in database */
def createFunction(db: String, func: CatalogFunction): Unit
/** Drop existing function */
def dropFunction(db: String, name: String): Unit
/** Rename existing function */
def renameFunction(db: String, oldName: String, newName: String): Unit
/** Alter existing function */
def alterFunction(db: String, func: CatalogFunction): Unit
/** Get function - throws NoSuchPermanentFunctionException if not found */
def getFunction(db: String, name: String): CatalogFunction
/** Get function - returns None if not found */
def getFunctionOption(db: String, name: String): Option[CatalogFunction]
/** Check if function exists */
def functionExists(db: String, name: String): Boolean
/** List functions matching pattern */
def listFunctions(db: String, pattern: String): Seq[String]
}

Usage Examples:
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource, JarResource}
// List functions
val functions = client.listFunctions("default", "*")
println(s"Available functions: ${functions.mkString(", ")}")
// Create custom function
val customFunction = CatalogFunction(
identifier = FunctionIdentifier("my_upper", Some("default")),
className = "com.example.MyUpperUDF",
resources = Seq(FunctionResource(
// jar resources are identified by the JarResource type object
resourceType = JarResource,
uri = "hdfs://cluster/jars/my-udfs.jar"
))
)
client.createFunction("default", customFunction)
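// A hedged sketch: inspect the registered function's class and resources
client.getFunctionOption("default", "my_upper").foreach { fn =>
println(s"Class: ${fn.className}, resources: ${fn.resources.map(_.uri).mkString(", ")}")
}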
// Check if function exists
if (client.functionExists("default", "my_upper")) {
println("Custom function created successfully")
}

/**
* Abstract class representing Hive version with dependencies
*/
abstract class HiveVersion(
val fullVersion: String,
val extraDeps: Seq[String] = Nil,
val exclusions: Seq[String] = Nil
)
// Supported Hive versions
val allSupportedHiveVersions: Set[HiveVersion] = Set(
v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1, v2_2, v2_3
)
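A hedged sketch of inspecting the connected client's version at runtime:
// fullVersion is the complete Hive version string, e.g. "1.2.1"
println(s"Metastore client Hive version: ${client.version.fullVersion}")

Install with Tessl CLI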
npx tessl i tessl/maven-org-apache-spark--spark-hive-2-11