or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

configuration.md · data-type-conversion.md · file-formats.md · index.md · metastore-operations.md · session-management.md · udf-integration.md
tile.json

metastore-operations.mddocs/

Metastore Operations

Direct access to Hive metastore for programmatic database, table, partition, and function management through the HiveClient interface. This provides low-level control over Hive metastore operations beyond what's available through SQL.

Capabilities

HiveClient Interface

Core interface for interacting with Hive metastore programmatically.

/**
 * Interface to Hive client for metastore operations
 * Shared across internal and external classloaders
 *
 * This fragment covers the session-level members: version and configuration
 * lookup, raw HiveQL execution, output stream redirection, class-loader
 * management and session handling. The trait is `private[hive]`, so it is only
 * accessible from code inside a `hive` package.
 */
private[hive] trait HiveClient {
  
  /** Returns the Hive Version of this client */
  def version: HiveVersion
  
  /**
   * Returns configuration value for the given key
   * @param key - Configuration key to look up
   * @param defaultValue - Fallback value (presumably returned when `key` is not set; TODO confirm)
   */
  def getConf(key: String, defaultValue: String): String
  
  /**
   * Returns the associated Hive SessionState
   * The result is typed as `Any`, so callers must cast it to the concrete type themselves.
   */
  def getState: Any
  
  /**
   * Execute HiveQL command and return results as strings
   * @param sql - HiveQL text to run
   */
  def runSqlHive(sql: String): Seq[String]
  
  /**
   * Set output streams for Hive operations
   * NOTE(review): the three setters presumably redirect Hive's standard output,
   * informational output and error output respectively; confirm against the implementation.
   */
  def setOut(stream: PrintStream): Unit
  def setInfo(stream: PrintStream): Unit  
  def setError(stream: PrintStream): Unit
  
  /**
   * Add JAR to class loader
   * @param path - Location of the JAR to add
   */
  def addJar(path: String): Unit
  
  /** Create new client session sharing class loader and Hive client */
  def newSession(): HiveClient
  
  /**
   * Run function within Hive state (SessionState, HiveConf, Hive client and class loader)
   * @param f - By-name block; it is evaluated by this method rather than at the call site
   * @return the value produced by `f`
   */
  def withHiveState[A](f: => A): A
  
  /** Remove all metadata - for testing only */
  def reset(): Unit
}

Database Operations

Comprehensive database management operations.

/**
 * Database management operations
 *
 * Fragment of the HiveClient interface covering listing, lookup, creation,
 * alteration and removal of databases, plus selection of the current database.
 */
trait HiveClient {
  /**
   * List database names matching pattern
   * @param pattern - Name pattern to match (the usage example passes "*" to list everything)
   */
  def listDatabases(pattern: String): Seq[String]
  
  /** Get database metadata - throws exception if not found */
  def getDatabase(name: String): CatalogDatabase
  
  /** Check if database exists */
  def databaseExists(dbName: String): Boolean
  
  /** Set current database */
  def setCurrentDatabase(databaseName: String): Unit
  
  /**
   * Create new database
   * @param database - Metadata describing the database to create
   * @param ignoreIfExists - Presumably suppresses the error when the database already exists (TODO confirm)
   */
  def createDatabase(database: CatalogDatabase, ignoreIfExists: Boolean): Unit
  
  /** 
   * Drop database
   * @param name - Database name to drop
   * @param ignoreIfNotExists - Don't throw error if database doesn't exist
   * @param cascade - Remove all associated objects (tables, functions)
   */
  def dropDatabase(name: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
  
  /**
   * Alter existing database
   * @param database - New metadata; NOTE(review): the target is presumably identified by `database.name`, confirm
   */
  def alterDatabase(database: CatalogDatabase): Unit
}

Usage Examples:

// Assuming you have access to a HiveClient instance (obtained from
// HiveExternalCatalog or similar). `???` is a placeholder so that the snippet
// parses; replace it with the real lookup before running.
val client: HiveClient = ???

// List all databases
val databases = client.listDatabases("*")
println(s"Available databases: ${databases.mkString(", ")}")

// Create new database.
// NOTE: on Spark 2.2 and later CatalogDatabase.locationUri is a java.net.URI,
// not a String, so the location has to be wrapped explicitly.
val newDb = CatalogDatabase(
  name = "analytics_db",
  description = "Analytics database",
  locationUri = java.net.URI.create("hdfs://cluster/user/hive/warehouse/analytics_db.db"),
  properties = Map("owner" -> "analytics_team")
)
client.createDatabase(newDb, ignoreIfExists = true)

// Switch to database
client.setCurrentDatabase("analytics_db")

// Get database info (getDatabase throws if the database is not found)
val dbInfo = client.getDatabase("analytics_db")
println(s"Database location: ${dbInfo.locationUri}")

Table Operations

Complete table management functionality.

/**
 * Table management operations
 *
 * Fragment of the HiveClient interface covering table listing, lookup,
 * creation, alteration and removal. `addJar`, `reset` and
 * `getPartitionsByFilter` are repeated here with the same signatures as in the
 * core and partition fragments of this document.
 */
trait HiveClient {
  /** List all tables in database */
  def listTables(dbName: String): Seq[String]
  
  /** List tables matching pattern in database */
  def listTables(dbName: String, pattern: String): Seq[String]
  
  /** Check if table exists */
  def tableExists(dbName: String, tableName: String): Boolean
  
  /** Get table metadata - throws NoSuchTableException if not found */
  def getTable(dbName: String, tableName: String): CatalogTable
  
  /** Get table metadata - returns None if not found */
  def getTableOption(dbName: String, tableName: String): Option[CatalogTable]
  
  /**
   * Create new table
   * @param table - Metadata describing the table to create
   * @param ignoreIfExists - Presumably suppresses the error when the table already exists (TODO confirm)
   */
  def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
  
  /**
   * Drop table
   * @param ignoreIfNotExists - Presumably suppresses the error when the table is missing (TODO confirm)
   * @param purge - NOTE(review): presumably skips the trash and deletes data immediately; confirm
   */
  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit
  
  /**
   * Alter existing table
   * Two overloads: the first takes only the new metadata, the second also
   * takes the database and table name explicitly.
   */
  def alterTable(table: CatalogTable): Unit
  def alterTable(dbName: String, tableName: String, table: CatalogTable): Unit
  
  /**
   * Update table schema and properties
   * @param newDataSchema - Replacement data schema
   * @param schemaProps - Schema-related properties supplied alongside the new schema
   */
  def alterTableDataSchema(
    dbName: String,
    tableName: String, 
    newDataSchema: StructType,
    schemaProps: Map[String, String]
  ): Unit
  
  /**
   * Get partitions of the table filtered by the given predicate expressions
   * (this method takes predicates, not a partial partition spec)
   */
  def getPartitionsByFilter(
    catalogTable: CatalogTable,
    predicates: Seq[Expression]
  ): Seq[CatalogTablePartition]
  
  /** Add JAR to class loader for UDF/SerDe usage */
  def addJar(path: String): Unit
  
  /** Remove all metadata - used for testing only */
  def reset(): Unit
}

Usage Examples:

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.types._

// Enumerate the tables registered in the "default" database
val defaultTables = client.listTables("default")
println(s"Tables in default database: ${defaultTables.mkString(", ")}")

// Describe the columns of the table, one builder call per column
val userColumns = new StructType()
  .add("id", IntegerType, nullable = false)
  .add("name", StringType, nullable = true)
  .add("age", IntegerType, nullable = true)

// Assemble the catalog entry for a managed, unpartitioned table
val usersTable = CatalogTable(
  identifier = TableIdentifier("users", Some("default")),
  tableType = CatalogTableType.MANAGED,
  storage = CatalogStorageFormat.empty,
  schema = userColumns,
  partitionColumnNames = Seq.empty,
  properties = Map("comment" -> "User information table")
)

// Register the table in the metastore
client.createTable(usersTable, ignoreIfExists = true)

// Read the metadata back (getTable throws NoSuchTableException if not found)
val usersMeta = client.getTable("default", "users")
println(s"Table schema: ${usersMeta.schema}")
println(s"Table type: ${usersMeta.tableType}")

// Report whether the table is present
val usersPresent = client.tableExists("default", "users")
if (usersPresent) println("Table exists")

Partition Operations

Comprehensive partition management for partitioned tables.

/**
 * Partition management operations
 *
 * Fragment of the HiveClient interface covering creation, removal, renaming,
 * alteration and lookup of partitions. Some members identify the table by
 * database/table name strings, others take a CatalogTable; the two styles are
 * not interchangeable at the call site.
 */
trait HiveClient {
  /**
   * Create one or more partitions
   * @param parts - Partitions to create
   * @param ignoreIfExists - Presumably suppresses the error for partitions that already exist (TODO confirm)
   */
  def createPartitions(
    db: String,
    table: String,
    parts: Seq[CatalogTablePartition],
    ignoreIfExists: Boolean
  ): Unit
  
  /**
   * Drop one or more partitions
   * @param specs - Specs of the partitions to drop
   * @param ignoreIfNotExists - Presumably suppresses the error for missing partitions (TODO confirm)
   * @param purge - NOTE(review): presumably bypasses the trash; confirm
   * @param retainData - NOTE(review): presumably keeps the underlying data files; confirm
   */
  def dropPartitions(
    db: String,
    table: String,
    specs: Seq[TablePartitionSpec],
    ignoreIfNotExists: Boolean,
    purge: Boolean,
    retainData: Boolean
  ): Unit
  
  /**
   * Rename existing partitions
   * @param specs - Current partition specs
   * @param newSpecs - Replacement specs; NOTE(review): presumably paired positionally with `specs`, confirm
   */
  def renamePartitions(
    db: String,
    table: String,
    specs: Seq[TablePartitionSpec],
    newSpecs: Seq[TablePartitionSpec]
  ): Unit
  
  /** Alter existing partitions */
  def alterPartitions(
    db: String,
    table: String,
    newParts: Seq[CatalogTablePartition]
  ): Unit
  
  /** Get partition - throws NoSuchPartitionException if not found */
  def getPartition(
    dbName: String,
    tableName: String,
    spec: TablePartitionSpec
  ): CatalogTablePartition
  
  /**
   * Get partition - returns None if not found
   * Unlike getPartition, this takes the CatalogTable rather than database/table names.
   */
  def getPartitionOption(
    table: CatalogTable,
    spec: TablePartitionSpec
  ): Option[CatalogTablePartition]
  
  /**
   * Get partitions matching partial spec
   * @param partialSpec - Optional partial spec; defaults to None
   */
  def getPartitions(
    catalogTable: CatalogTable,
    partialSpec: Option[TablePartitionSpec] = None
  ): Seq[CatalogTablePartition]
  
  /**
   * Get partition names matching partial spec
   * @param partialSpec - Optional partial spec; defaults to None
   */
  def getPartitionNames(
    table: CatalogTable,
    partialSpec: Option[TablePartitionSpec] = None
  ): Seq[String]
  
  /** Get partitions filtered by predicates */
  def getPartitionsByFilter(
    catalogTable: CatalogTable,
    predicates: Seq[Expression]
  ): Seq[CatalogTablePartition]
}

Usage Examples:

import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec

// Get table for partition operations (throws NoSuchTableException if not found)
val table = client.getTable("sales_db", "monthly_sales")

// List all partitions (partialSpec is left at its default of None)
val allPartitions = client.getPartitions(table)
println(s"Total partitions: ${allPartitions.length}")

// Get specific partition
val partSpec: TablePartitionSpec = Map("year" -> "2023", "month" -> "12")
val partition = client.getPartitionOption(table, partSpec)
partition match {
  case Some(part) => println(s"Partition location: ${part.storage.locationUri}")
  case None => println("Partition not found")
}

// Create new partition.
// The storage format is derived from CatalogStorageFormat.empty so that the
// fields not set here (compressed, properties) still receive values; the case
// class constructor has no defaults for them. locationUri is an Option[URI],
// so the location string is wrapped in a java.net.URI.
val newPartition = CatalogTablePartition(
  spec = Map("year" -> "2024", "month" -> "01"),
  storage = CatalogStorageFormat.empty.copy(
    locationUri = Some(java.net.URI.create(
      "hdfs://cluster/user/hive/warehouse/sales_db.db/monthly_sales/year=2024/month=01")),
    inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
    outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"),
    serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
  )
)

client.createPartitions("sales_db", "monthly_sales", Seq(newPartition), ignoreIfExists = true)

Data Loading Operations

Operations for loading data into tables and partitions.

/**
 * Data loading operations
 *
 * Fragment of the HiveClient interface covering loading of data from a path
 * into a table, a static partition, or dynamically created partitions.
 */
trait HiveClient {
  /**
   * Load data into static partition
   * @param loadPath - Location of the data to load
   * @param partSpec - Partition spec as a java.util.LinkedHashMap (Hive requires the ordering)
   * @param replace - NOTE(review): presumably overwrites existing data when true; confirm
   * @param inheritTableSpecs - NOTE(review): semantics not shown here; confirm against Hive
   * @param isSrcLocal - NOTE(review): presumably true when `loadPath` is on the local file system; confirm
   */
  def loadPartition(
    loadPath: String,
    dbName: String,
    tableName: String,
    partSpec: java.util.LinkedHashMap[String, String], // Hive requires LinkedHashMap ordering
    replace: Boolean,
    inheritTableSpecs: Boolean,
    isSrcLocal: Boolean
  ): Unit
  
  /**
   * Load data into existing table
   * Unlike the partition loaders, this takes no separate `dbName` parameter.
   * NOTE(review): `tableName` presumably has to carry the database qualifier; confirm.
   */
  def loadTable(
    loadPath: String,
    tableName: String,
    replace: Boolean,
    isSrcLocal: Boolean  
  ): Unit
  
  /**
   * Load data creating dynamic partitions
   * @param partSpec - Partition spec as a java.util.LinkedHashMap (Hive requires the ordering)
   * @param numDP - NOTE(review): presumably the number of dynamic partition columns; confirm
   */
  def loadDynamicPartitions(
    loadPath: String,
    dbName: String,
    tableName: String,
    partSpec: java.util.LinkedHashMap[String, String], // Hive requires LinkedHashMap ordering
    replace: Boolean,
    numDP: Int
  ): Unit
}

Function Operations

Management of user-defined functions in Hive metastore.

/**
 * Function management operations
 *
 * Fragment of the HiveClient interface covering creation, removal, renaming,
 * alteration, lookup and listing of functions. Every member takes the
 * database name as its first argument.
 */
trait HiveClient {
  /**
   * Create function in database
   * @param db - Database that will own the function
   * @param func - Metadata describing the function
   */
  def createFunction(db: String, func: CatalogFunction): Unit
  
  /** Drop existing function */
  def dropFunction(db: String, name: String): Unit
  
  /** Rename existing function */
  def renameFunction(db: String, oldName: String, newName: String): Unit
  
  /** Alter existing function */
  def alterFunction(db: String, func: CatalogFunction): Unit
  
  /** Get function - throws NoSuchPermanentFunctionException if not found */
  def getFunction(db: String, name: String): CatalogFunction
  
  /** Get function - returns None if not found */
  def getFunctionOption(db: String, name: String): Option[CatalogFunction]
  
  /** Check if function exists */
  def functionExists(db: String, name: String): Boolean
  
  /**
   * List functions matching pattern
   * @param pattern - Name pattern to match (the usage example passes "*" to list everything)
   */
  def listFunctions(db: String, pattern: String): Seq[String]
}

Usage Examples:

import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource}

// List functions
val functions = client.listFunctions("default", "*")
println(s"Available functions: ${functions.mkString(", ")}")

// Create custom function.
// The jar resource type is the `JarResource` object in the catalog package;
// `FunctionResourceType` has no `jar` member. It is written fully qualified
// because the imports above only bring in CatalogFunction and FunctionResource.
val customFunction = CatalogFunction(
  identifier = FunctionIdentifier("my_upper", Some("default")),
  className = "com.example.MyUpperUDF",
  resources = Seq(FunctionResource(
    resourceType = org.apache.spark.sql.catalyst.catalog.JarResource,
    uri = "hdfs://cluster/jars/my-udfs.jar"
  ))
)

client.createFunction("default", customFunction)

// Check if function exists
if (client.functionExists("default", "my_upper")) {
  println("Custom function created successfully")
}

Types

Hive Version Support

/**
 * Abstract class representing Hive version with dependencies
 *
 * @param fullVersion - Full version string of the Hive release
 * @param extraDeps - Additional dependencies for this version; defaults to Nil
 *                    (NOTE(review): presumably artifact coordinates; confirm)
 * @param exclusions - Dependencies to exclude for this version; defaults to Nil
 */
abstract class HiveVersion(
  val fullVersion: String,
  val extraDeps: Seq[String] = Nil,
  val exclusions: Seq[String] = Nil
)

// Supported Hive versions
// The ten identifiers below are HiveVersion instances that are not defined in
// this snippet. NOTE(review): the names presumably encode the release line
// (e.g. v1_2 for Hive 1.2); confirm against their definitions.
val allSupportedHiveVersions: Set[HiveVersion] = Set(
  v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1, v2_2, v2_3
)