Hive Client Interface

Low-level interface to the Hive metastore client with version abstraction, supporting direct SQL execution and raw Hive operations. The HiveClient trait provides a unified interface across Hive versions (2.0.x through 4.0.x), while HiveClientImpl provides the concrete implementation.

Capabilities

HiveClient Interface

Core trait defining the interface to Hive metastore operations.

/**
 * External interface to Hive client for all metastore operations
 * Provides version abstraction and unified API across Hive versions
 */
trait HiveClient {
  def version: HiveVersion
  def getConf(key: String, defaultValue: String): String
  def getState: Any
  def userName: String
}
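
Usage Example (a minimal sketch using only the trait members above; assumes a HiveClient instance `client`, obtained as shown under Client Implementation below; the configuration key is illustrative):

// Report the version and effective user of the connected client
println(s"Hive ${client.version.fullVersion}, user: ${client.userName}")

// Read a Hive configuration value with a fallback default
val warehouseDir = client.getConf("hive.metastore.warehouse.dir", "/user/hive/warehouse")
println(s"Warehouse location: $warehouseDir")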

SQL Execution

Execute Hive SQL directly through the client.

/**
 * Execute Hive SQL and return results
 * @param sql SQL statement to execute
 * @return Sequence of result strings
 */
def runSqlHive(sql: String): Seq[String]

Usage Example:

val client: HiveClient = ??? // obtain a client instance (see Client Implementation below)
val results = client.runSqlHive("SHOW TABLES")
results.foreach(println)

// Execute DDL
client.runSqlHive("CREATE TABLE test (id INT, name STRING)")

I/O Stream Management

Control output streams for Hive operations.

/**
 * Set output stream for Hive operations
 * @param stream PrintStream for output
 */
def setOut(stream: PrintStream): Unit

/**
 * Set info stream for Hive operations  
 * @param stream PrintStream for info messages
 */
def setInfo(stream: PrintStream): Unit

/**
 * Set error stream for Hive operations
 * @param stream PrintStream for error messages
 */
def setError(stream: PrintStream): Unit
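
Usage Example (a sketch; redirects Hive console output into an in-memory buffer):

import java.io.{ByteArrayOutputStream, PrintStream}

// Capture Hive's output, info, and error messages instead of writing to stdout/stderr
val buffer = new ByteArrayOutputStream()
val capture = new PrintStream(buffer)
client.setOut(capture)
client.setInfo(capture)
client.setError(capture)

client.runSqlHive("SHOW DATABASES")
println(s"Captured Hive output:\n${buffer.toString}")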

Database Operations

Direct database management through the Hive client.

/**
 * List all tables in a database
 * @param dbName Database name
 * @return Sequence of table names
 */
def listTables(dbName: String): Seq[String]

/**
 * List tables matching a pattern
 * @param dbName Database name
 * @param pattern Pattern to match (glob pattern)
 * @return Sequence of matching table names
 */
def listTables(dbName: String, pattern: String): Seq[String]

/**
 * Set the current database
 * @param databaseName Database name to set as current
 */
def setCurrentDatabase(databaseName: String): Unit

/**
 * Get database metadata
 * @param name Database name
 * @return Database definition
 */
def getDatabase(name: String): CatalogDatabase

/**
 * Check if a database exists
 * @param dbName Database name
 * @return True if database exists
 */
def databaseExists(dbName: String): Boolean

/**
 * List databases matching a pattern
 * @param pattern Pattern to match (glob pattern)
 * @return Sequence of matching database names
 */
def listDatabases(pattern: String): Seq[String]

/**
 * Create a new database
 * @param database Database definition
 * @param ignoreIfExists If true, don't fail if database already exists
 */
def createDatabase(database: CatalogDatabase, ignoreIfExists: Boolean): Unit

/**
 * Drop a database
 * @param name Database name
 * @param ignoreIfNotExists If true, don't fail if database doesn't exist
 * @param cascade If true, drop all tables in the database
 */
def dropDatabase(name: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit

/**
 * Modify database metadata
 * @param database Updated database definition
 */
def alterDatabase(database: CatalogDatabase): Unit

Usage Examples:

// List databases
val databases = client.listDatabases("*")
println(s"Available databases: ${databases.mkString(", ")}")

// Set current database
client.setCurrentDatabase("my_database")

// Create database
import java.net.URI
val dbDef = CatalogDatabase(
  name = "test_db",
  description = "Test database",
  locationUri = new URI("hdfs://namenode:9000/user/hive/warehouse/test_db.db"),
  properties = Map.empty
)
client.createDatabase(dbDef, ignoreIfExists = true)

Table Operations

Comprehensive table management including raw Hive table access.

/**
 * Check if a table exists
 * @param dbName Database name
 * @param tableName Table name
 * @return True if table exists
 */
def tableExists(dbName: String, tableName: String): Boolean

/**
 * Get table metadata
 * @param dbName Database name
 * @param tableName Table name
 * @return Table definition
 */
def getTable(dbName: String, tableName: String): CatalogTable

/**
 * Get table metadata optionally
 * @param dbName Database name
 * @param tableName Table name
 * @return Optional table definition
 */
def getTableOption(dbName: String, tableName: String): Option[CatalogTable]

/**
 * Get raw Hive table object
 * @param dbName Database name
 * @param tableName Table name
 * @return Raw Hive table wrapper
 */
def getRawHiveTable(dbName: String, tableName: String): RawHiveTable

/**
 * Get raw Hive table object optionally
 * @param dbName Database name
 * @param tableName Table name
 * @return Optional raw Hive table wrapper
 */
def getRawHiveTableOption(dbName: String, tableName: String): Option[RawHiveTable]

/**
 * Get multiple tables by name
 * @param dbName Database name
 * @param tableNames Sequence of table names
 * @return Sequence of table definitions
 */
def getTablesByName(dbName: String, tableNames: Seq[String]): Seq[CatalogTable]

/**
 * Create a new table
 * @param table Table definition
 * @param ignoreIfExists If true, don't fail if table already exists
 */
def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit

/**
 * Drop a table
 * @param dbName Database name
 * @param tableName Table name
 * @param ignoreIfNotExists If true, don't fail if table doesn't exist
 * @param purge If true, delete table data immediately
 */
def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit

/**
 * Modify table metadata
 * @param table Updated table definition
 */
def alterTable(table: CatalogTable): Unit

/**
 * Alter table properties
 * @param dbName Database name
 * @param tableName Table name
 * @param props Properties to set
 */
def alterTableProps(dbName: String, tableName: String, props: Map[String, String]): Unit

/**
 * Alter table data schema
 * @param dbName Database name
 * @param tableName Table name
 * @param newDataSchema New schema structure
 */
def alterTableDataSchema(dbName: String, tableName: String, newDataSchema: StructType): Unit

Usage Examples:

// Check if table exists
if (client.tableExists("my_db", "my_table")) {
  println("Table exists")
}

// Get table metadata
val table = client.getTable("my_db", "my_table")
println(s"Table schema: ${table.schema}")

// Get raw Hive table for advanced operations
val rawTable = client.getRawHiveTable("my_db", "my_table")
val hiveProps = rawTable.hiveTableProps()
println(s"Hive table properties: $hiveProps")

Partition Operations

Manage table partitions through the Hive client.

/**
 * Create table partitions
 * @param db Database name
 * @param table Table name
 * @param parts Sequence of partition definitions
 * @param ignoreIfExists If true, don't fail if partitions already exist
 */
def createPartitions(db: String, table: String, parts: Seq[CatalogTablePartition], 
                    ignoreIfExists: Boolean): Unit

/**
 * Drop table partitions
 * @param db Database name
 * @param table Table name
 * @param specs Sequence of partition specifications
 * @param ignoreIfNotExists If true, don't fail if partitions don't exist
 * @param purge If true, delete partition data immediately
 * @param retainData If true, keep partition data files
 */
def dropPartitions(db: String, table: String, specs: Seq[TablePartitionSpec], 
                  ignoreIfNotExists: Boolean, purge: Boolean, retainData: Boolean): Unit

/**
 * Rename table partitions
 * @param db Database name
 * @param table Table name
 * @param specs Current partition specifications
 * @param newSpecs New partition specifications
 */
def renamePartitions(db: String, table: String, specs: Seq[TablePartitionSpec], 
                    newSpecs: Seq[TablePartitionSpec]): Unit

/**
 * Modify partition metadata
 * @param db Database name
 * @param table Table name
 * @param newParts Updated partition definitions
 */
def alterPartitions(db: String, table: String, newParts: Seq[CatalogTablePartition]): Unit

/**
 * Get partition metadata
 * @param db Database name
 * @param table Table name
 * @param spec Partition specification
 * @return Partition definition
 */
def getPartition(db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition

/**
 * Get partition metadata optionally
 * @param db Database name
 * @param table Table name
 * @param spec Partition specification
 * @return Optional partition definition
 */
def getPartitionOption(db: String, table: String, spec: TablePartitionSpec): Option[CatalogTablePartition]

/**
 * Get partition names
 * @param db Database name
 * @param table Table name
 * @param partialSpec Optional partial partition specification for filtering
 * @return Sequence of partition names
 */
def getPartitionNames(db: String, table: String, 
                     partialSpec: Option[TablePartitionSpec]): Seq[String]

/**
 * Get partitions
 * @param db Database name
 * @param table Table name
 * @param partialSpec Optional partial partition specification for filtering
 * @return Sequence of partition definitions
 */
def getPartitions(db: String, table: String, 
                 partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition]

/**
 * Get partitions matching filter predicates
 * @param db Database name
 * @param table Table name
 * @param predicates Filter expressions
 * @return Sequence of matching partition definitions
 */
def getPartitionsByFilter(db: String, table: String, 
                         predicates: Seq[Expression]): Seq[CatalogTablePartition]
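
Usage Examples (a sketch; assumes an existing table `my_db.events` partitioned by `dt`, and uses Spark's catalyst CatalogTablePartition type):

// Create a partition, reusing the parent table's storage format
val eventsTable = client.getTable("my_db", "events")
val newPart = CatalogTablePartition(
  spec = Map("dt" -> "2024-01-01"),
  storage = eventsTable.storage
)
client.createPartitions("my_db", "events", Seq(newPart), ignoreIfExists = true)

// List partition names, optionally narrowed by a partial spec
val names = client.getPartitionNames("my_db", "events", partialSpec = None)
println(s"Partitions: ${names.mkString(", ")}")

// Drop the partition but retain its data files
client.dropPartitions("my_db", "events", Seq(Map("dt" -> "2024-01-01")),
  ignoreIfNotExists = true, purge = false, retainData = true)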

Data Loading Operations

Load data into tables and partitions through the Hive client.

/**
 * Load data into a partition
 * @param db Database name
 * @param table Table name
 * @param loadPath Path to data files
 * @param partition Partition specification
 * @param isOverwrite If true, overwrite existing data
 * @param inheritTableSpecs If true, inherit table specifications
 * @param isSrcLocal If true, source is local filesystem
 */
def loadPartition(db: String, table: String, loadPath: String, 
                  partition: TablePartitionSpec, isOverwrite: Boolean, 
                  inheritTableSpecs: Boolean, isSrcLocal: Boolean): Unit

/**
 * Load data into a table
 * @param db Database name
 * @param table Table name
 * @param loadPath Path to data files
 * @param isOverwrite If true, overwrite existing data
 * @param isSrcLocal If true, source is local filesystem
 */
def loadTable(db: String, table: String, loadPath: String, 
              isOverwrite: Boolean, isSrcLocal: Boolean): Unit

/**
 * Load data into dynamic partitions
 * @param db Database name
 * @param table Table name
 * @param loadPath Path to data files
 * @param partition Partition specification
 * @param replace If true, replace existing partitions
 * @param numDP Number of dynamic partition columns
 */
def loadDynamicPartitions(db: String, table: String, loadPath: String, 
                         partition: TablePartitionSpec, replace: Boolean, 
                         numDP: Int): Unit

Function Operations

Manage user-defined functions through the Hive client.

/**
 * Create a user-defined function
 * @param db Database name
 * @param func Function definition
 */
def createFunction(db: String, func: CatalogFunction): Unit

/**
 * Drop a user-defined function
 * @param db Database name
 * @param name Function name
 */
def dropFunction(db: String, name: String): Unit

/**
 * Rename a function
 * @param db Database name
 * @param oldName Current function name
 * @param newName New function name
 */
def renameFunction(db: String, oldName: String, newName: String): Unit

/**
 * Modify function metadata
 * @param db Database name
 * @param func Updated function definition
 */
def alterFunction(db: String, func: CatalogFunction): Unit

/**
 * Get function metadata
 * @param db Database name
 * @param funcName Function name
 * @return Function definition
 */
def getFunction(db: String, funcName: String): CatalogFunction

/**
 * Get function metadata optionally
 * @param db Database name
 * @param funcName Function name
 * @return Optional function definition
 */
def getFunctionOption(db: String, funcName: String): Option[CatalogFunction]

/**
 * Check if a function exists
 * @param db Database name
 * @param funcName Function name
 * @return True if function exists
 */
def functionExists(db: String, funcName: String): Boolean

/**
 * List functions matching a pattern
 * @param db Database name
 * @param pattern Pattern to match (glob pattern)
 * @return Sequence of matching function names
 */
def listFunctions(db: String, pattern: String): Seq[String]
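
Usage Example (a sketch; the UDF class name and JAR location are illustrative; CatalogFunction, FunctionResource, and FunctionIdentifier are Spark catalyst catalog types):

import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource, JarResource}

// Register a permanent UDF backed by a JAR
val func = CatalogFunction(
  identifier = FunctionIdentifier("my_upper", Some("my_db")),
  className = "com.example.hive.MyUpperUDF",
  resources = Seq(FunctionResource(JarResource, "hdfs:///udfs/my-udfs.jar"))
)
client.createFunction("my_db", func)

// Look the function up again and list functions by pattern
if (client.functionExists("my_db", "my_upper")) {
  println(client.getFunction("my_db", "my_upper").className)
}
println(client.listFunctions("my_db", "my_*").mkString(", "))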

Session Management

Manage client sessions and resources.

/**
 * Add JAR file to the Hive session
 * @param path Path to JAR file
 */
def addJar(path: String): Unit

/**
 * Create a new client session
 * @return New HiveClient instance
 */
def newSession(): HiveClient

/**
 * Execute code with Hive state context
 * @param f Function to execute
 * @return Result of function execution
 */
def withHiveState[A](f: => A): A

/**
 * Reset client state by removing all metadata (intended primarily for testing)
 */
def reset(): Unit
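
Usage Example (a sketch; the JAR path is illustrative):

// Make a UDF JAR visible to the current Hive session
client.addJar("hdfs:///udfs/my-udfs.jar")

// Run a block of operations with Hive's thread-local state in place
val tableCount = client.withHiveState {
  client.listTables("default").size
}
println(s"Tables in default: $tableCount")

// Work in an isolated session against the same metastore
val session = client.newSession()
session.setCurrentDatabase("staging")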

Raw Hive Table Interface

Interface for accessing raw Hive table objects.

/**
 * Raw Hive table abstraction providing access to underlying Hive objects
 */
trait RawHiveTable {
  /**
   * Get the raw Hive table object
   * @return Raw Hive table
   */
  def rawTable: Object
  
  /**
   * Convert to Spark catalog table
   * @return Spark CatalogTable representation
   */
  def toCatalogTable: CatalogTable
  
  /**
   * Get Hive-specific table properties
   * @return Map of Hive table properties
   */
  def hiveTableProps(): Map[String, String]
}

Usage Example:

val rawTable = client.getRawHiveTable("my_db", "my_table")

// Access the raw Hive object for advanced operations
val hiveTable = rawTable.rawTable
// Cast to the appropriate Hive type for specific operations
// (for HiveClientImpl this is org.apache.hadoop.hive.ql.metadata.Table)

// Convert to Spark representation
val catalogTable = rawTable.toCatalogTable
println(s"Table type: ${catalogTable.tableType}")

// Get Hive-specific properties
val hiveProps = rawTable.hiveTableProps()
hiveProps.foreach { case (key, value) =>
  println(s"$key = $value")
}

Version Management

Hive version abstraction and supported versions.

/**
 * Represents a Hive version with dependencies and exclusions
 * @param fullVersion Full version string (e.g., "2.3.10")
 * @param extraDeps Extra dependencies required for this version
 * @param exclusions Dependencies to exclude for this version
 */
abstract class HiveVersion(
  val fullVersion: String,
  val extraDeps: Seq[String] = Nil,
  val exclusions: Seq[String] = Nil
) extends Ordered[HiveVersion] {
  def compare(that: HiveVersion): Int
}

object hive {
  case object v2_0 extends HiveVersion("2.0.1")
  case object v2_1 extends HiveVersion("2.1.1")
  case object v2_2 extends HiveVersion("2.2.0")
  case object v2_3 extends HiveVersion("2.3.10")
  case object v3_0 extends HiveVersion("3.0.0")
  case object v3_1 extends HiveVersion("3.1.3")
  case object v4_0 extends HiveVersion("4.0.1")
  
  val allSupportedHiveVersions: Set[HiveVersion]
}

Usage Example:

import org.apache.spark.sql.hive.client.hive

// Check client version
val clientVersion = client.version
println(s"Using Hive version: ${clientVersion.fullVersion}")

// Compare versions
if (clientVersion >= hive.v3_0) {
  println("Using Hive 3.0 or later")
}

// List all supported versions
hive.allSupportedHiveVersions.foreach { version =>
  println(s"Supported: ${version.fullVersion}")
}

Client Implementation

The concrete implementation provided by Spark.

/**
 * Concrete implementation of HiveClient
 * Created through HiveUtils factory methods
 */
class HiveClientImpl extends HiveClient {
  // Implementation details are internal
}

Usage Example:

import org.apache.spark.sql.hive.HiveUtils

// Create client for execution context
val executionClient = HiveUtils.newClientForExecution(sparkConf, hadoopConf)

// Create client for metadata operations
val metadataClient = HiveUtils.newClientForMetadata(
  sparkConf, 
  hadoopConf, 
  Map("hive.metastore.uris" -> "thrift://localhost:9083")
)

// Use client for operations
val tables = metadataClient.listTables("default")
println(s"Tables in default database: ${tables.mkString(", ")}")