Complete Hive metastore integration providing database, table, partition, and function operations through the Spark catalog interface. The HiveExternalCatalog serves as the primary bridge between Spark's catalog system and Hive's metastore.
The main implementation of Spark's ExternalCatalog interface, backed by the Hive metastore.
/**
* Hive-backed external catalog implementation for metastore operations
* @param conf Spark configuration
* @param hadoopConf Hadoop configuration
*/
class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration) extends ExternalCatalog {
// Lazily created client used to communicate with the Hive metastore
lazy val client: HiveClient
}
Usage Example:
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.SparkConf
import org.apache.hadoop.conf.Configuration
val conf = new SparkConf()
val hadoopConf = new Configuration()
val catalog = new HiveExternalCatalog(conf, hadoopConf)
// Access the underlying Hive client
val hiveClient = catalog.client
Manage Hive databases through the external catalog interface.
/**
* Create a new database in the Hive metastore
* @param dbDefinition Database definition with metadata
* @param ignoreIfExists If true, don't fail if database already exists
*/
def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit
/**
* Drop a database from the Hive metastore
* @param db Database name
* @param ignoreIfNotExists If true, don't fail if database doesn't exist
* @param cascade If true, drop all tables in the database
*/
def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
/**
* Modify an existing database's metadata
* @param dbDefinition Updated database definition
*/
def alterDatabase(dbDefinition: CatalogDatabase): Unit
/**
* Get database metadata
* @param db Database name
* @return Database definition
*/
def getDatabase(db: String): CatalogDatabase
/**
* Check if a database exists
* @param db Database name
* @return True if database exists
*/
def databaseExists(db: String): Boolean
/**
* List all databases
* @return Sequence of database names
*/
def listDatabases(): Seq[String]
/**
* List databases matching a pattern
* @param pattern Pattern to match (SQL LIKE pattern)
* @return Sequence of matching database names
*/
def listDatabases(pattern: String): Seq[String]
/**
* Set the current database
* @param db Database name to set as current
*/
def setCurrentDatabase(db: String): Unit
Usage Examples:
import java.net.URI
import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
// Create a database
val dbDef = CatalogDatabase(
name = "my_database",
description = "Test database",
locationUri = new URI("hdfs://namenode:9000/user/hive/warehouse/my_database.db"),
properties = Map.empty
)
catalog.createDatabase(dbDef, ignoreIfExists = true)
// List databases
val databases = catalog.listDatabases()
println(s"Available databases: ${databases.mkString(", ")}")
// Get database info
val dbInfo = catalog.getDatabase("my_database")
println(s"Database location: ${dbInfo.locationUri}")Comprehensive table management including creation, modification, and metadata access.
Comprehensive table management including creation, modification, and metadata access.
/**
* Create a new table in the Hive metastore
* @param tableDefinition Complete table definition
* @param ignoreIfExists If true, don't fail if table already exists
*/
def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit
/**
* Drop a table from the Hive metastore
* @param db Database name
* @param table Table name
* @param ignoreIfNotExists If true, don't fail if table doesn't exist
* @param purge If true, delete table data immediately
*/
def dropTable(db: String, table: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit
/**
* Rename a table
* @param db Database name
* @param oldName Current table name
* @param newName New table name
*/
def renameTable(db: String, oldName: String, newName: String): Unit
/**
* Modify table metadata
* @param tableDefinition Updated table definition
*/
def alterTable(tableDefinition: CatalogTable): Unit
/**
* Alter table's data schema
* @param db Database name
* @param table Table name
* @param newDataSchema New schema structure
*/
def alterTableDataSchema(db: String, table: String, newDataSchema: StructType): Unit
/**
* Update table statistics
* @param db Database name
* @param table Table name
* @param stats Optional statistics to set
*/
def alterTableStats(db: String, table: String, stats: Option[CatalogStatistics]): Unit
/**
* Get table metadata
* @param db Database name
* @param table Table name
* @return Complete table definition
*/
def getTable(db: String, table: String): CatalogTable
/**
* Get multiple tables by name
* @param db Database name
* @param tables Sequence of table names
* @return Sequence of table definitions
*/
def getTablesByName(db: String, tables: Seq[String]): Seq[CatalogTable]
/**
* Check if a table exists
* @param db Database name
* @param table Table name
* @return True if table exists
*/
def tableExists(db: String, table: String): Boolean
/**
* List all tables in a database
* @param db Database name
* @return Sequence of table names
*/
def listTables(db: String): Seq[String]
/**
* List tables matching a pattern
* @param db Database name
* @param pattern Pattern to match (SQL LIKE pattern)
* @return Sequence of matching table names
*/
def listTables(db: String, pattern: String): Seq[String]
/**
* List all views in a database
* @param db Database name
* @return Sequence of view names
*/
def listViews(db: String): Seq[String]
/**
* List views matching a pattern
* @param db Database name
* @param pattern Pattern to match (SQL LIKE pattern)
* @return Sequence of matching view names
*/
def listViews(db: String, pattern: String): Seq[String]Usage Examples:
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.types._
// Create a table
val schema = StructType(Seq(
StructField("id", IntegerType, nullable = false),
StructField("name", StringType, nullable = true),
StructField("created_at", TimestampType, nullable = true)
))
val tableDef = CatalogTable(
identifier = TableIdentifier("users", Some("my_database")),
tableType = CatalogTableType.MANAGED,
storage = CatalogStorageFormat.empty.copy(
inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"),
serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
),
schema = schema
)
catalog.createTable(tableDef, ignoreIfExists = true)
// List tables
val tables = catalog.listTables("my_database")
println(s"Tables: ${tables.mkString(", ")}")
// Get table info
val tableInfo = catalog.getTable("my_database", "users")
println(s"Table schema: ${tableInfo.schema}")Load data into Hive tables and partitions.
Load data into Hive tables and partitions.
/**
* Load data into a table
* @param db Database name
* @param table Table name
* @param loadPath Path to data files
* @param isOverwrite If true, overwrite existing data
* @param isSrcLocal If true, source is local filesystem
*/
def loadTable(db: String, table: String, loadPath: String,
isOverwrite: Boolean, isSrcLocal: Boolean): Unit
/**
* Load data into a partition
* @param db Database name
* @param table Table name
* @param loadPath Path to data files
* @param partition Partition specification
* @param isOverwrite If true, overwrite existing data
* @param inheritTableSpecs If true, inherit table specifications
* @param isSrcLocal If true, source is local filesystem
*/
def loadPartition(db: String, table: String, loadPath: String,
partition: TablePartitionSpec, isOverwrite: Boolean,
inheritTableSpecs: Boolean, isSrcLocal: Boolean): Unit
/**
* Load data into dynamic partitions
* @param db Database name
* @param table Table name
* @param loadPath Path to data files
* @param partition Partition specification
* @param replace If true, replace existing partitions
* @param numDP Number of dynamic partitions
*/
def loadDynamicPartitions(db: String, table: String, loadPath: String,
partition: TablePartitionSpec, replace: Boolean,
numDP: Int): Unit
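Usage example (a minimal sketch): the staging paths and the partitioned events table below are illustrative assumptions, and the files at loadPath are expected to already match the table's storage format.
// Append files from a staging directory into the table
catalog.loadTable("my_database", "events", "hdfs://namenode:9000/staging/events",
  isOverwrite = false, isSrcLocal = false)
// Overwrite a single partition (dt=2024-01-01) from its own staging directory
catalog.loadPartition("my_database", "events",
  "hdfs://namenode:9000/staging/events/dt=2024-01-01",
  partition = Map("dt" -> "2024-01-01"),
  isOverwrite = true, inheritTableSpecs = true, isSrcLocal = false)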
Manage table partitions including creation, modification, and querying.
/**
* Create table partitions
* @param db Database name
* @param table Table name
* @param parts Sequence of partition definitions
* @param ignoreIfExists If true, don't fail if partitions already exist
*/
def createPartitions(db: String, table: String, parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit
/**
* Drop table partitions
* @param db Database name
* @param table Table name
* @param parts Sequence of partition specifications
* @param ignoreIfNotExists If true, don't fail if partitions don't exist
* @param purge If true, delete partition data immediately
* @param retainData If true, keep partition data files
*/
def dropPartitions(db: String, table: String, parts: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean, purge: Boolean, retainData: Boolean): Unit
/**
* Rename table partitions
* @param db Database name
* @param table Table name
* @param specs Current partition specifications
* @param newSpecs New partition specifications
*/
def renamePartitions(db: String, table: String, specs: Seq[TablePartitionSpec],
newSpecs: Seq[TablePartitionSpec]): Unit
/**
* Modify partition metadata
* @param db Database name
* @param table Table name
* @param parts Updated partition definitions
*/
def alterPartitions(db: String, table: String, parts: Seq[CatalogTablePartition]): Unit
/**
* Get partition metadata
* @param db Database name
* @param table Table name
* @param spec Partition specification
* @return Partition definition
*/
def getPartition(db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition
/**
* Get partition metadata optionally
* @param db Database name
* @param table Table name
* @param spec Partition specification
* @return Optional partition definition
*/
def getPartitionOption(db: String, table: String, spec: TablePartitionSpec): Option[CatalogTablePartition]
/**
* List partition names
* @param db Database name
* @param table Table name
* @param partialSpec Optional partial partition specification for filtering
* @return Sequence of partition names
*/
def listPartitionNames(db: String, table: String,
partialSpec: Option[TablePartitionSpec]): Seq[String]
/**
* List partitions
* @param db Database name
* @param table Table name
* @param partialSpec Optional partial partition specification for filtering
* @return Sequence of partition definitions
*/
def listPartitions(db: String, table: String,
partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition]
/**
* List partitions matching filter predicates
* @param db Database name
* @param table Table name
* @param predicates Filter expressions
* @param defaultTimeZoneId Default timezone for date/time operations
* @return Sequence of matching partition definitions
*/
def listPartitionsByFilter(db: String, table: String, predicates: Seq[Expression],
defaultTimeZoneId: String): Seq[CatalogTablePartition]
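Usage example (a minimal sketch, again assuming an events table partitioned by dt; the partition values are illustrative):
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition}
// Register a new partition, letting the metastore choose the default location
val newPartition = CatalogTablePartition(
  spec = Map("dt" -> "2024-01-01"),
  storage = CatalogStorageFormat.empty)
catalog.createPartitions("my_database", "events", Seq(newPartition), ignoreIfExists = true)
// List partition names, optionally narrowed by a partial specification
val partitionNames = catalog.listPartitionNames("my_database", "events", partialSpec = None)
println(s"Partitions: ${partitionNames.mkString(", ")}")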
Manage Hive user-defined functions in the metastore.
/**
* Create a user-defined function
* @param db Database name
* @param funcDefinition Function definition
*/
def createFunction(db: String, funcDefinition: CatalogFunction): Unit
/**
* Drop a user-defined function
* @param db Database name
* @param funcName Function name
*/
def dropFunction(db: String, funcName: String): Unit
/**
* Modify function metadata
* @param db Database name
* @param funcDefinition Updated function definition
*/
def alterFunction(db: String, funcDefinition: CatalogFunction): Unit
/**
* Rename a function
* @param db Database name
* @param oldName Current function name
* @param newName New function name
*/
def renameFunction(db: String, oldName: String, newName: String): Unit
/**
* Get function metadata
* @param db Database name
* @param funcName Function name
* @return Function definition
*/
def getFunction(db: String, funcName: String): CatalogFunction
/**
* Check if a function exists
* @param db Database name
* @param funcName Function name
* @return True if function exists
*/
def functionExists(db: String, funcName: String): Boolean
/**
* List functions matching a pattern
* @param db Database name
* @param pattern Pattern to match (SQL LIKE pattern)
* @return Sequence of matching function names
*/
def listFunctions(db: String, pattern: String): Seq[String]
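Usage example (a minimal sketch): the function name and implementing class below are hypothetical and must correspond to a UDF class available on the classpath.
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogFunction
// Register a Hive UDF backed by a (hypothetical) class on the classpath
val funcDef = CatalogFunction(
  identifier = FunctionIdentifier("my_upper", Some("my_database")),
  className = "com.example.hive.MyUpperUDF",
  resources = Seq.empty)
catalog.createFunction("my_database", funcDef)
// Verify registration and list functions matching a pattern
println(catalog.functionExists("my_database", "my_upper"))
println(catalog.listFunctions("my_database", "my_*").mkString(", "))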
Utility methods and constants for working with Hive tables.
object HiveExternalCatalog {
// Metadata key constants
val SPARK_SQL_PREFIX: String
val DATASOURCE_PREFIX: String
val DATASOURCE_PROVIDER: String
val DATASOURCE_SCHEMA: String
val STATISTICS_PREFIX: String
val STATISTICS_TOTAL_SIZE: String
val STATISTICS_NUM_ROWS: String
val TABLE_PARTITION_PROVIDER: String
val CREATED_SPARK_VERSION: String
val EMPTY_DATA_SCHEMA: StructType
/**
* Check if a table is a data source table
* @param table Table definition
* @return True if table uses Spark data source format
*/
def isDatasourceTable(table: CatalogTable): Boolean
/**
* Check if a data type is compatible with Hive
* @param dt Data type to check
* @return True if type is Hive-compatible
*/
def isHiveCompatibleDataType(dt: DataType): Boolean
}
Important: the underlying Hive client is not thread-safe, so HiveExternalCatalog wraps each metastore call in a synchronized block; code that goes through Spark's catalog interfaces therefore does not normally need additional synchronization.
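Finally, a usage sketch for the companion object helpers, reusing the users table created earlier: it distinguishes a Spark data source table from a plain Hive-format table.
// Inspect whether the table was written through a Spark data source rather than as a Hive table
val usersTable = catalog.getTable("my_database", "users")
if (HiveExternalCatalog.isDatasourceTable(usersTable)) {
  println(s"Data source table, provider = ${usersTable.provider.getOrElse("unknown")}")
} else {
  println("Hive-format table")
}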