tessl/maven-org-apache-spark--spark-hive-2-13

Apache Spark Hive integration module that provides seamless integration with Apache Hive data warehouse software, enabling Spark SQL to work with Hive tables, metastore, and SerDes

docs/hive-client.md

Hive Client Interface

Low-level interface to the Hive metastore client with version abstraction, supporting direct SQL execution and raw Hive operations. The HiveClient trait provides a unified interface across Hive versions (2.0.x through 4.0.x), while HiveClientImpl provides the concrete implementation.

Capabilities

HiveClient Interface

Core trait defining the interface to Hive metastore operations.

/**
 * External interface to Hive client for all metastore operations
 * Provides version abstraction and unified API across Hive versions
 */
trait HiveClient {
  def version: HiveVersion
  def getConf(key: String, defaultValue: String): String
  def getState: Any
  def userName: String
}
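
For instance, configuration values can be read through the client rather than through the Hadoop configuration directly. A minimal sketch, assuming a client instance has already been obtained (the configuration key and default shown are illustrative):

```scala
// Read a Hive setting through the client, falling back to a default
// when the key is unset. `hive.exec.scratchdir` is a standard Hive key.
val scratchDir = client.getConf("hive.exec.scratchdir", "/tmp/hive")

println(s"Hive version: ${client.version.fullVersion}")
println(s"Connected as: ${client.userName}, scratch dir: $scratchDir")
```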

SQL Execution

Execute Hive SQL directly through the client.

/**
 * Execute Hive SQL and return results
 * @param sql SQL statement to execute
 * @return Sequence of result strings
 */
def runSqlHive(sql: String): Seq[String]

Usage Example:

val client: HiveClient = ??? // obtain via HiveUtils factory methods (see Client Implementation below)
val results = client.runSqlHive("SHOW TABLES")
results.foreach(println)

// Execute DDL
client.runSqlHive("CREATE TABLE test (id INT, name STRING)")

I/O Stream Management

Control output streams for Hive operations.

/**
 * Set output stream for Hive operations
 * @param stream PrintStream for output
 */
def setOut(stream: PrintStream): Unit

/**
 * Set info stream for Hive operations  
 * @param stream PrintStream for info messages
 */
def setInfo(stream: PrintStream): Unit

/**
 * Set error stream for Hive operations
 * @param stream PrintStream for error messages
 */
def setError(stream: PrintStream): Unit
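
These setters redirect Hive's console output, for example to capture it in a buffer instead of printing to stdout. A sketch, assuming a client instance is already available:

```scala
import java.io.{ByteArrayOutputStream, PrintStream}

// Capture Hive's informational output in an in-memory buffer
val infoBuffer = new ByteArrayOutputStream()
client.setInfo(new PrintStream(infoBuffer))

// Route regular output and errors explicitly
client.setOut(System.out)
client.setError(System.err)

client.runSqlHive("SHOW DATABASES")
println(s"Captured info output:\n${infoBuffer.toString}")
```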

Database Operations

Direct database management through Hive client.

/**
 * List all tables in a database
 * @param dbName Database name
 * @return Sequence of table names
 */
def listTables(dbName: String): Seq[String]

/**
 * List tables matching a pattern
 * @param dbName Database name
 * @param pattern Pattern to match (glob pattern)
 * @return Sequence of matching table names
 */
def listTables(dbName: String, pattern: String): Seq[String]

/**
 * Set the current database
 * @param databaseName Database name to set as current
 */
def setCurrentDatabase(databaseName: String): Unit

/**
 * Get database metadata
 * @param name Database name
 * @return Database definition
 */
def getDatabase(name: String): CatalogDatabase

/**
 * Check if a database exists
 * @param dbName Database name
 * @return True if database exists
 */
def databaseExists(dbName: String): Boolean

/**
 * List databases matching a pattern
 * @param pattern Pattern to match (glob pattern)
 * @return Sequence of matching database names
 */
def listDatabases(pattern: String): Seq[String]

/**
 * Create a new database
 * @param database Database definition
 * @param ignoreIfExists If true, don't fail if database already exists
 */
def createDatabase(database: CatalogDatabase, ignoreIfExists: Boolean): Unit

/**
 * Drop a database
 * @param name Database name
 * @param ignoreIfNotExists If true, don't fail if database doesn't exist
 * @param cascade If true, drop all tables in the database
 */
def dropDatabase(name: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit

/**
 * Modify database metadata
 * @param database Updated database definition
 */
def alterDatabase(database: CatalogDatabase): Unit

Usage Examples:

// List databases
val databases = client.listDatabases("*")
println(s"Available databases: ${databases.mkString(", ")}")

// Set current database
client.setCurrentDatabase("my_database")

// Create database
val dbDef = CatalogDatabase(
  name = "test_db",
  description = "Test database",
  locationUri = new URI("hdfs://namenode:9000/user/hive/warehouse/test_db.db"),
  properties = Map.empty
)
client.createDatabase(dbDef, ignoreIfExists = true)

Table Operations

Comprehensive table management including raw Hive table access.

/**
 * Check if a table exists
 * @param dbName Database name
 * @param tableName Table name
 * @return True if table exists
 */
def tableExists(dbName: String, tableName: String): Boolean

/**
 * Get table metadata
 * @param dbName Database name
 * @param tableName Table name
 * @return Table definition
 */
def getTable(dbName: String, tableName: String): CatalogTable

/**
 * Get table metadata optionally
 * @param dbName Database name
 * @param tableName Table name
 * @return Optional table definition
 */
def getTableOption(dbName: String, tableName: String): Option[CatalogTable]

/**
 * Get raw Hive table object
 * @param dbName Database name
 * @param tableName Table name
 * @return Raw Hive table wrapper
 */
def getRawHiveTable(dbName: String, tableName: String): RawHiveTable

/**
 * Get raw Hive table object optionally
 * @param dbName Database name
 * @param tableName Table name
 * @return Optional raw Hive table wrapper
 */
def getRawHiveTableOption(dbName: String, tableName: String): Option[RawHiveTable]

/**
 * Get multiple tables by name
 * @param dbName Database name
 * @param tableNames Sequence of table names
 * @return Sequence of table definitions
 */
def getTablesByName(dbName: String, tableNames: Seq[String]): Seq[CatalogTable]

/**
 * Create a new table
 * @param table Table definition
 * @param ignoreIfExists If true, don't fail if table already exists
 */
def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit

/**
 * Drop a table
 * @param dbName Database name
 * @param tableName Table name
 * @param ignoreIfNotExists If true, don't fail if table doesn't exist
 * @param purge If true, delete table data immediately
 */
def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit

/**
 * Modify table metadata
 * @param table Updated table definition
 */
def alterTable(table: CatalogTable): Unit

/**
 * Alter table properties
 * @param dbName Database name
 * @param tableName Table name
 * @param props Properties to set
 */
def alterTableProps(dbName: String, tableName: String, props: Map[String, String]): Unit

/**
 * Alter table data schema
 * @param dbName Database name
 * @param tableName Table name
 * @param newDataSchema New schema structure
 */
def alterTableDataSchema(dbName: String, tableName: String, newDataSchema: StructType): Unit

Usage Examples:

// Check if table exists
if (client.tableExists("my_db", "my_table")) {
  println("Table exists")
}

// Get table metadata
val table = client.getTable("my_db", "my_table")
println(s"Table schema: ${table.schema}")

// Get raw Hive table for advanced operations
val rawTable = client.getRawHiveTable("my_db", "my_table")
val hiveProps = rawTable.hiveTableProps()
println(s"Hive table properties: $hiveProps")

Partition Operations

Manage table partitions through the Hive client.

/**
 * Create table partitions
 * @param db Database name
 * @param table Table name
 * @param parts Sequence of partition definitions
 * @param ignoreIfExists If true, don't fail if partitions already exist
 */
def createPartitions(db: String, table: String, parts: Seq[CatalogTablePartition], 
                    ignoreIfExists: Boolean): Unit

/**
 * Drop table partitions
 * @param db Database name
 * @param table Table name
 * @param specs Sequence of partition specifications
 * @param ignoreIfNotExists If true, don't fail if partitions don't exist
 * @param purge If true, delete partition data immediately
 * @param retainData If true, keep partition data files
 */
def dropPartitions(db: String, table: String, specs: Seq[TablePartitionSpec], 
                  ignoreIfNotExists: Boolean, purge: Boolean, retainData: Boolean): Unit

/**
 * Rename table partitions
 * @param db Database name
 * @param table Table name
 * @param specs Current partition specifications
 * @param newSpecs New partition specifications
 */
def renamePartitions(db: String, table: String, specs: Seq[TablePartitionSpec], 
                    newSpecs: Seq[TablePartitionSpec]): Unit

/**
 * Modify partition metadata
 * @param db Database name
 * @param table Table name
 * @param newParts Updated partition definitions
 */
def alterPartitions(db: String, table: String, newParts: Seq[CatalogTablePartition]): Unit

/**
 * Get partition metadata
 * @param db Database name
 * @param table Table name
 * @param spec Partition specification
 * @return Partition definition
 */
def getPartition(db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition

/**
 * Get partition metadata optionally
 * @param db Database name
 * @param table Table name
 * @param spec Partition specification
 * @return Optional partition definition
 */
def getPartitionOption(db: String, table: String, spec: TablePartitionSpec): Option[CatalogTablePartition]

/**
 * Get partition names
 * @param db Database name
 * @param table Table name
 * @param partialSpec Optional partial partition specification for filtering
 * @return Sequence of partition names
 */
def getPartitionNames(db: String, table: String, 
                     partialSpec: Option[TablePartitionSpec]): Seq[String]

/**
 * Get partitions
 * @param db Database name
 * @param table Table name
 * @param partialSpec Optional partial partition specification for filtering
 * @return Sequence of partition definitions
 */
def getPartitions(db: String, table: String, 
                 partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition]

/**
 * Get partitions matching filter predicates
 * @param db Database name
 * @param table Table name
 * @param predicates Filter expressions
 * @return Sequence of matching partition definitions
 */
def getPartitionsByFilter(db: String, table: String, 
                         predicates: Seq[Expression]): Seq[CatalogTablePartition]
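
The read and drop operations above can be combined as follows. A sketch, assuming a client instance and a hypothetical table my_db.events partitioned by a dt column:

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec

// Partition specs are maps from partition column name to value
val spec: TablePartitionSpec = Map("dt" -> "2024-01-01")

// Read back an existing partition, tolerating absence
client.getPartitionOption("my_db", "events", spec) match {
  case Some(part) => println(s"Partition location: ${part.location}")
  case None       => println("Partition not found")
}

// List all partition names (no partial spec filtering)
val names = client.getPartitionNames("my_db", "events", partialSpec = None)
names.foreach(println)

// Drop a partition, staying quiet if it is already gone
client.dropPartitions("my_db", "events", Seq(spec),
  ignoreIfNotExists = true, purge = false, retainData = false)
```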

Data Loading Operations

Load data into tables and partitions through Hive client.

/**
 * Load data into a partition
 * @param db Database name
 * @param table Table name
 * @param loadPath Path to data files
 * @param partition Partition specification
 * @param isOverwrite If true, overwrite existing data
 * @param inheritTableSpecs If true, inherit table specifications
 * @param isSrcLocal If true, source is local filesystem
 */
def loadPartition(db: String, table: String, loadPath: String, 
                  partition: TablePartitionSpec, isOverwrite: Boolean, 
                  inheritTableSpecs: Boolean, isSrcLocal: Boolean): Unit

/**
 * Load data into a table
 * @param db Database name
 * @param table Table name
 * @param loadPath Path to data files
 * @param isOverwrite If true, overwrite existing data
 * @param isSrcLocal If true, source is local filesystem
 */
def loadTable(db: String, table: String, loadPath: String, 
              isOverwrite: Boolean, isSrcLocal: Boolean): Unit

/**
 * Load data into dynamic partitions
 * @param db Database name
 * @param table Table name
 * @param loadPath Path to data files
 * @param partition Partition specification
 * @param replace If true, replace existing partitions
 * @param numDP Number of dynamic partitions
 */
def loadDynamicPartitions(db: String, table: String, loadPath: String, 
                         partition: TablePartitionSpec, replace: Boolean, 
                         numDP: Int): Unit
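
A sketch of loading staged files into a static partition and into a non-partitioned table; the table names and HDFS paths are illustrative, and a client instance is assumed:

```scala
// Load a directory of files into one static partition of a
// hypothetical partitioned table my_db.events
client.loadPartition(
  db = "my_db",
  table = "events",
  loadPath = "hdfs://namenode:9000/staging/events/dt=2024-01-01",
  partition = Map("dt" -> "2024-01-01"),
  isOverwrite = true,        // replace any existing data in the partition
  inheritTableSpecs = true,  // reuse the table's storage format
  isSrcLocal = false)        // source is on HDFS, not the local filesystem

// Load files into a non-partitioned table, appending to existing data
client.loadTable("my_db", "raw_logs",
  "hdfs://namenode:9000/staging/raw_logs",
  isOverwrite = false, isSrcLocal = false)
```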

Function Operations

Manage user-defined functions through the Hive client.

/**
 * Create a user-defined function
 * @param db Database name
 * @param func Function definition
 */
def createFunction(db: String, func: CatalogFunction): Unit

/**
 * Drop a user-defined function
 * @param db Database name
 * @param name Function name
 */
def dropFunction(db: String, name: String): Unit

/**
 * Rename a function
 * @param db Database name
 * @param oldName Current function name
 * @param newName New function name
 */
def renameFunction(db: String, oldName: String, newName: String): Unit

/**
 * Modify function metadata
 * @param db Database name
 * @param func Updated function definition
 */
def alterFunction(db: String, func: CatalogFunction): Unit

/**
 * Get function metadata
 * @param db Database name
 * @param funcName Function name
 * @return Function definition
 */
def getFunction(db: String, funcName: String): CatalogFunction

/**
 * Get function metadata optionally
 * @param db Database name
 * @param funcName Function name
 * @return Optional function definition
 */
def getFunctionOption(db: String, funcName: String): Option[CatalogFunction]

/**
 * Check if a function exists
 * @param db Database name
 * @param funcName Function name
 * @return True if function exists
 */
def functionExists(db: String, funcName: String): Boolean

/**
 * List functions matching a pattern
 * @param db Database name
 * @param pattern Pattern to match (glob pattern)
 * @return Sequence of matching function names
 */
def listFunctions(db: String, pattern: String): Seq[String]
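
Usage Example:

A sketch of registering and looking up a UDF, assuming a client instance; the function and implementing class names are hypothetical:

```scala
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogFunction

// Register a UDF backed by a class available on the session classpath
val func = CatalogFunction(
  identifier = FunctionIdentifier("my_upper", Some("my_db")),
  className = "com.example.hive.MyUpperUDF",  // illustrative class name
  resources = Seq.empty)
client.createFunction("my_db", func)

// Look it up again, tolerating absence
client.getFunctionOption("my_db", "my_upper") match {
  case Some(f) => println(s"Implemented by ${f.className}")
  case None    => println("Function not registered")
}

// List functions matching a glob pattern
client.listFunctions("my_db", "my_*").foreach(println)
```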

Session Management

Manage client sessions and resources.

/**
 * Add JAR file to the Hive session
 * @param path Path to JAR file
 */
def addJar(path: String): Unit

/**
 * Create a new client session
 * @return New HiveClient instance
 */
def newSession(): HiveClient

/**
 * Execute code with Hive state context
 * @param f Function to execute
 * @return Result of function execution
 */
def withHiveState[A](f: => A): A

/**
 * Reset client state
 */
def reset(): Unit
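
Usage Example:

A sketch combining these operations, assuming a client instance; the jar path is illustrative:

```scala
// Make a UDF jar visible to the current Hive session
client.addJar("hdfs://namenode:9000/jars/my-udfs.jar")

// withHiveState ensures Hive session state and classloader are set up
// on the current thread before touching Hive internals
val tableCount = client.withHiveState {
  client.listTables("default").size
}
println(s"Tables in default: $tableCount")

// A new session shares the metastore connection but keeps its own
// session state (current database, temporary resources)
val session2 = client.newSession()
session2.setCurrentDatabase("analytics")
// The original client's current database is unaffected
```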

Raw Hive Table Interface

Interface for accessing raw Hive table objects.

/**
 * Raw Hive table abstraction providing access to underlying Hive objects
 */
trait RawHiveTable {
  /**
   * Get the raw Hive table object
   * @return Raw Hive table
   */
  def rawTable: Object
  
  /**
   * Convert to Spark catalog table
   * @return Spark CatalogTable representation
   */
  def toCatalogTable: CatalogTable
  
  /**
   * Get Hive-specific table properties
   * @return Map of Hive table properties
   */
  def hiveTableProps(): Map[String, String]
}

Usage Example:

val rawTable = client.getRawHiveTable("my_db", "my_table")

// Access raw Hive object for advanced operations
val hiveTable = rawTable.rawTable
// Cast to appropriate Hive type for specific operations

// Convert to Spark representation
val catalogTable = rawTable.toCatalogTable
println(s"Table type: ${catalogTable.tableType}")

// Get Hive-specific properties
val hiveProps = rawTable.hiveTableProps()
hiveProps.foreach { case (key, value) =>
  println(s"$key = $value")
}

Version Management

Hive version abstraction and supported versions.

/**
 * Represents a Hive version with dependencies and exclusions
 * @param fullVersion Full version string (e.g., "2.3.10")
 * @param extraDeps Extra dependencies required for this version
 * @param exclusions Dependencies to exclude for this version
 */
abstract class HiveVersion(
  val fullVersion: String,
  val extraDeps: Seq[String] = Nil,
  val exclusions: Seq[String] = Nil
) extends Ordered[HiveVersion] {
  def compare(that: HiveVersion): Int
}

object hive {
  case object v2_0 extends HiveVersion("2.0.1")
  case object v2_1 extends HiveVersion("2.1.1")
  case object v2_2 extends HiveVersion("2.2.0")
  case object v2_3 extends HiveVersion("2.3.10")
  case object v3_0 extends HiveVersion("3.0.0")
  case object v3_1 extends HiveVersion("3.1.3")
  case object v4_0 extends HiveVersion("4.0.1")
  
  val allSupportedHiveVersions: Set[HiveVersion]
}

Usage Example:

import org.apache.spark.sql.hive.client.hive

// Check client version
val clientVersion = client.version
println(s"Using Hive version: ${clientVersion.fullVersion}")

// Compare versions
if (clientVersion >= hive.v3_0) {
  println("Using Hive 3.0 or later")
}

// List all supported versions
hive.allSupportedHiveVersions.foreach { version =>
  println(s"Supported: ${version.fullVersion}")
}

Client Implementation

The concrete implementation provided by Spark.

/**
 * Concrete implementation of HiveClient
 * Created through HiveUtils factory methods
 */
class HiveClientImpl extends HiveClient {
  // Implementation details are internal
}

Usage Example:

import org.apache.spark.sql.hive.HiveUtils

// Create client for execution context
val executionClient = HiveUtils.newClientForExecution(sparkConf, hadoopConf)

// Create client for metadata operations
val metadataClient = HiveUtils.newClientForMetadata(
  sparkConf, 
  hadoopConf, 
  Map("hive.metastore.uris" -> "thrift://localhost:9083")
)

// Use client for operations
val tables = metadataClient.listTables("default")
println(s"Tables in default database: ${tables.mkString(", ")}")

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-hive-2-13
