tessl install tessl/maven-org-apache-spark--spark-hive@1.6.0

Apache Spark SQL Hive integration module providing HiveContext, metastore operations, HiveQL parsing, and Hive data format compatibility.
The Hive Client Interface provides an abstracted layer for interacting with different versions of Hive metastore, enabling compatibility across Hive versions 0.12.0 through 1.2.1. This interface handles all metastore operations including database, table, and partition management.
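All examples in this section assume a client: ClientInterface instance. One way to obtain one is through IsolatedClientLoader, documented at the end of this section; a minimal sketch, where the metastore URI is illustrative:

import org.apache.spark.sql.hive.client.{ClientInterface, IsolatedClientLoader}

// Build a version-specific client against an external metastore (URI is illustrative)
val client: ClientInterface = IsolatedClientLoader.forVersion(
  hiveMetastoreVersion = "1.2.1",
  hadoopVersion = "2.7.3",
  config = Map("hive.metastore.uris" -> "thrift://localhost:9083")
).createClient()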
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.catalyst.expressions.Expression
import java.util.{Map => JMap}
import java.io.PrintStream

trait ClientInterface {
  def version: HiveVersion
  def getConf(key: String, defaultValue: String): String
  def currentDatabase: String
  def newSession(): ClientInterface
  def withHiveState[A](f: => A): A
  def reset(): Unit

  // SQL execution
  def runSqlHive(sql: String): Seq[String]
  def setOut(stream: PrintStream): Unit
  def setInfo(stream: PrintStream): Unit
  def setError(stream: PrintStream): Unit

  // JAR and function management
  def addJar(path: String): Unit
  def loadFunction(functionName: String, className: String): Unit
}

version - Returns the Hive version of this client instance
getConf - Retrieves a Hive configuration value, falling back to the given default
currentDatabase - Returns the name of the currently active database
newSession - Creates a new session with separate state but shared configuration
withHiveState - Executes a function with Hive's ThreadLocal session state properly set
reset - Resets the client to a clean state
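A short sketch of these session-level methods (the configuration key shown is a standard Hive setting):

// Inspect the client's version and current database
println(s"Hive ${client.version.fullVersion}, current database: ${client.currentDatabase}")

// Read a configuration value with a fallback default
val warehouse = client.getConf("hive.metastore.warehouse.dir", "/user/hive/warehouse")

// Run work with Hive's ThreadLocal session state correctly installed
val databases = client.withHiveState { client.runSqlHive("SHOW DATABASES") }

// Fork an independent session that shares configuration
val session2 = client.newSession()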
def getDatabase(name: String): HiveDatabase
def getDatabaseOption(name: String): Option[HiveDatabase]
def createDatabase(database: HiveDatabase): Unit

getDatabase - Retrieves database metadata, throws exception if not found
getDatabaseOption - Safely retrieves database metadata, returns None if not found
createDatabase - Creates new database in metastore
Usage Examples:
// Get database information
val dbInfo = client.getDatabase("sales_db")
println(s"Database location: ${dbInfo.location}")
// Safe database lookup
client.getDatabaseOption("analytics_db") match {
  case Some(db) => println(s"Found database: ${db.name}")
  case None => println("Database not found")
}
// Create new database
val newDb = HiveDatabase("test_db", "/warehouse/test_db")
client.createDatabase(newDb)

def listTables(dbName: String): Seq[String]
def getTable(dbName: String, tableName: String): HiveTable
def getTableOption(dbName: String, tableName: String): Option[HiveTable]
def createTable(table: HiveTable): Unit
def alterTable(table: HiveTable): Unit
def dropTable(dbName: String, tableName: String): Unit

listTables - Returns list of table names in specified database
getTable - Retrieves table metadata, throws exception if not found
getTableOption - Safely retrieves table metadata
createTable - Creates new table in metastore
alterTable - Modifies existing table metadata
dropTable - Removes table from metastore
Usage Examples:
// List all tables in database
val tables = client.listTables("production")
tables.foreach(println)
// Get table schema
val table = client.getTable("production", "customers")
table.schema.foreach { col =>
  println(s"${col.name}: ${col.hiveType}")
}
// Create new table
val newTable = HiveTable(
  specifiedDatabase = Some("production"),
  name = "new_table",
  schema = Seq(
    HiveColumn("id", "bigint", "Primary key"),
    HiveColumn("name", "string", "Customer name")
  ),
  partitionColumns = Seq.empty,
  properties = Map.empty,
  serdeProperties = Map("serialization.format" -> "1"),
  tableType = ManagedTable
)
client.createTable(newTable)

def createView(view: HiveTable): Unit
def alterView(view: HiveTable): Unit

createView - Creates new view in metastore
alterView - Modifies existing view definition
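Neither method has a dedicated builder type: a view is expressed as a HiveTable with tableType = VirtualView and the query text in viewText. A hedged sketch (database, view name, and query are illustrative):

val ordersView = HiveTable(
  specifiedDatabase = Some("analytics"),
  name = "big_orders",
  schema = Seq(HiveColumn("id", "bigint", ""), HiveColumn("total", "double", "")),
  partitionColumns = Seq.empty,
  properties = Map.empty,
  serdeProperties = Map.empty,
  tableType = VirtualView,
  viewText = Some("SELECT id, total FROM sales.orders WHERE total > 1000")
)
client.createView(ordersView)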
def listPartitions(dbName: String, tableName: String): Seq[HivePartition]
def getAllPartitions(hTable: HiveTable): Seq[HivePartition]
def getPartition(dbName: String, tableName: String, spec: Map[String, String]): HivePartition
def getPartitionOption(hTable: HiveTable, partitionSpec: JMap[String, String]): Option[HivePartition]
def getPartitionsByFilter(hTable: HiveTable, predicates: Seq[Expression]): Seq[HivePartition]
def createPartitions(dbName: String, tableName: String, parts: Seq[HivePartition]): Unit
def dropPartition(dbName: String, tableName: String, spec: Map[String, String]): Unit

listPartitions - Returns all partitions for a table
getAllPartitions - Returns all partitions for the given table metadata object
getPartition - Retrieves specific partition metadata, throws exception if not found
getPartitionOption - Safely retrieves partition metadata, returns None if not found
getPartitionsByFilter - Returns only the partitions matching the given predicate expressions
createPartitions - Creates multiple partitions in a single metastore call
dropPartition - Removes partition from table
Usage Examples:
// List all partitions
val partitions = client.listPartitions("sales", "orders")
partitions.foreach { p =>
  println(s"Partition: ${p.values.mkString(",")}")
}
// Get specific partition
val partSpec = Map("year" -> "2023", "month" -> "12")
val partition = client.getPartition("sales", "orders", partSpec)
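getPartitionsByFilter takes Catalyst expressions over the partition columns, letting the metastore prune partitions server-side where supported. A sketch, assuming a string-typed year partition column:

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.types.StringType

// Build a year = '2023' predicate over the partition column
val yearCol = AttributeReference("year", StringType)()
val hTable = client.getTable("sales", "orders")
val pruned = client.getPartitionsByFilter(hTable, Seq(EqualTo(yearCol, Literal("2023"))))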
// Create new partitions
val newPartitions = Seq(
  HivePartition(
    values = Seq("2024", "01"),
    storage = HiveStorageDescriptor(
      location = "/warehouse/sales/orders/year=2024/month=01",
      inputFormat = "org.apache.hadoop.mapred.TextInputFormat",
      outputFormat = "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
      serde = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
      serdeProperties = Map("serialization.format" -> "1")
    )
  )
)
client.createPartitions("sales", "orders", newPartitions)

def loadTable(
  loadPath: String,
  tableName: String,
  replace: Boolean,
  holdDDLTime: Boolean
): Unit
def loadPartition(
  loadPath: String,
  tableName: String,
  partSpec: java.util.LinkedHashMap[String, String],
  replace: Boolean,
  holdDDLTime: Boolean,
  inheritTableSpecs: Boolean,
  isSkewedStoreAsSubdir: Boolean
): Unit
def loadDynamicPartitions(
  loadPath: String,
  tableName: String,
  partSpec: java.util.LinkedHashMap[String, String],
  replace: Boolean,
  numDP: Int,
  holdDDLTime: Boolean,
  listBucketingEnabled: Boolean
): Unit

loadTable - Loads data files into table
loadPartition - Loads data into specific partition
loadDynamicPartitions - Loads data with dynamic partitioning
Usage Examples:
// Load data into table
client.loadTable("/staging/new_data", "sales.orders", replace = true, holdDDLTime = false)
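loadDynamicPartitions resolves partition values from the data itself; a hedged sketch, where the path is illustrative and empty spec values mark columns as dynamic:

val dynSpec = new java.util.LinkedHashMap[String, String]()
dynSpec.put("year", "")  // empty value: resolved dynamically from the data
dynSpec.put("month", "")
client.loadDynamicPartitions(
  "/staging/all_months",
  "sales.orders",
  dynSpec,
  replace = false,
  numDP = 2,  // number of dynamic partition columns
  holdDDLTime = false,
  listBucketingEnabled = false
)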
// Load data into specific partition
val partSpec = new java.util.LinkedHashMap[String, String]()
partSpec.put("year", "2024")
partSpec.put("month", "01")
client.loadPartition(
  "/staging/jan_data",
  "sales.orders",
  partSpec,
  replace = false,
  holdDDLTime = false,
  inheritTableSpecs = true,
  isSkewedStoreAsSubdir = false
)

def runSqlHive(sql: String): Seq[String]
def setOut(stream: PrintStream): Unit
def setInfo(stream: PrintStream): Unit
def setError(stream: PrintStream): Unit

runSqlHive - Executes raw HiveQL and returns results as strings
setOut/setInfo/setError - Redirects Hive output streams
Usage Examples:
// Execute DDL
val result = client.runSqlHive("SHOW TABLES")
result.foreach(println)
// Execute DML
client.runSqlHive("INSERT INTO staging.temp_table SELECT * FROM prod.source_table")
// Redirect output
import java.io.{PrintStream, ByteArrayOutputStream}
val outputStream = new ByteArrayOutputStream()
client.setOut(new PrintStream(outputStream))
client.runSqlHive("DESCRIBE FORMATTED my_table")
val description = outputStream.toString

def addJar(path: String): Unit
def loadFunction(functionName: String, className: String): Unit

addJar - Adds JAR file to Hive classpath for UDFs and SerDes
loadFunction - Registers custom function with Hive
Usage Examples:
// Add custom UDF JAR
client.addJar("/path/to/custom-udfs.jar")
// Register custom function
client.loadFunction("my_custom_function", "com.company.hive.CustomUDF")

case class HiveDatabase(
  name: String,
  location: String
)

Represents Hive database metadata with name and warehouse location.
case class HiveTable(
  specifiedDatabase: Option[String],
  name: String,
  schema: Seq[HiveColumn],
  partitionColumns: Seq[HiveColumn],
  properties: Map[String, String],
  serdeProperties: Map[String, String],
  tableType: TableType,
  location: Option[String] = None,
  inputFormat: Option[String] = None,
  outputFormat: Option[String] = None,
  serde: Option[String] = None,
  viewText: Option[String] = None
)

Complete table definition with schema, partitioning, storage, and SerDe information. The storage-related fields default to None, in which case Hive's defaults apply; viewText carries the query text for views.
case class HiveColumn(
  name: String,
  hiveType: String,
  comment: String
)

Column definition with name, Hive data type, and comment.
case class HiveStorageDescriptor(
  location: String,
  inputFormat: String,
  outputFormat: String,
  serde: String,
  serdeProperties: Map[String, String]
)

Storage format specification including location, InputFormat, OutputFormat, and SerDe configuration.
case class HivePartition(
  values: Seq[String],
  storage: HiveStorageDescriptor
)

Partition specification with partition values and storage descriptor.
abstract class TableType { val name: String }
case object ExternalTable extends TableType { val name = "EXTERNAL_TABLE" }
case object IndexTable extends TableType { val name = "INDEX_TABLE" }
case object ManagedTable extends TableType { val name = "MANAGED_TABLE" }
case object VirtualView extends TableType { val name = "VIRTUAL_VIEW" }

class IsolatedClientLoader(
  version: HiveVersion,
  execJars: Seq[URL],
  config: Map[String, String],
  isolationOn: Boolean,
  baseClassLoader: ClassLoader,
  sharedPrefixes: Seq[String],
  barrierPrefixes: Seq[String]
) {
  def createClient(): ClientInterface
}
object IsolatedClientLoader {
  def forVersion(
    hiveMetastoreVersion: String,
    hadoopVersion: String,
    config: Map[String, String],
    barrierPrefixes: Seq[String] = Seq.empty,
    sharedPrefixes: Seq[String] = Seq.empty
  ): IsolatedClientLoader

  def hiveVersion(version: String): HiveVersion
}

IsolatedClientLoader - Creates isolated Hive client instances with different versions
forVersion - Factory method for creating version-specific loaders
hiveVersion - Parses version string to HiveVersion object
Usage Examples:
// Create client loader for specific version
val loader = IsolatedClientLoader.forVersion(
  hiveMetastoreVersion = "1.2.1",
  hadoopVersion = "2.7.3",
  config = Map("hive.metastore.uris" -> "thrift://localhost:9083")
)
// Create client instance
val client = loader.createClient()
// Use client for metastore operations
val tables = client.listTables("default")

abstract class HiveVersion(
  val fullVersion: String,
  val extraDeps: Seq[String] = Nil,
  val exclusions: Seq[String] = Nil
)

object hive {
  case object v12 extends HiveVersion("0.12.0")   // Hive 0.12.0
  case object v13 extends HiveVersion("0.13.1")   // Hive 0.13.1
  case object v14 extends HiveVersion("0.14.0")   // Hive 0.14.0
  case object v1_0 extends HiveVersion("1.0.0")   // Hive 1.0.0
  case object v1_1 extends HiveVersion("1.1.0")   // Hive 1.1.0
  case object v1_2 extends HiveVersion("1.2.1")   // Hive 1.2.1 (default)
}

The client interface supports multiple Hive metastore versions with automatic dependency resolution and classloader isolation. The metastore client defaults to version 1.2.1 (configurable via spark.sql.hive.metastore.version), while Spark always uses Hive 1.2.1 internally for query execution, regardless of the configured metastore version.
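For example, pointing the client at an older metastore while execution stays on 1.2.1 (host and versions are illustrative):

// Resolve a version string to its HiveVersion constant (hive.v13 here)
val v = IsolatedClientLoader.hiveVersion("0.13.1")

// Build a client against a 0.13.1 metastore
val legacyClient = IsolatedClientLoader.forVersion(
  hiveMetastoreVersion = "0.13.1",
  hadoopVersion = "2.7.3",
  config = Map("hive.metastore.uris" -> "thrift://legacy-host:9083")
).createClient()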