Database, table, and metadata management through the catalog interface. Provides programmatic access to metastore operations, table lifecycle management, and schema inspection.
Main interface for catalog operations and metadata management.
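All of the operations below hang off the SparkSession's catalog handle. A minimal sketch of obtaining it (the app name is illustrative):
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("catalog-demo").getOrCreate()
val catalog = spark.catalog // entry point for every operation in this interface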
/**
 * Interface for catalog operations (databases, tables, functions, columns)
 */
trait Catalog {
  /** Get current database name */
  def currentDatabase(): String
  /** Set current database */
  def setCurrentDatabase(dbName: String): Unit
  /** List all available databases */
  def listDatabases(): Dataset[Database]
  /** List tables in current database */
  def listTables(): Dataset[Table]
  /** List tables in specified database */
  def listTables(dbName: String): Dataset[Table]
  /** List columns for specified table */
  def listColumns(tableName: String): Dataset[Column]
  def listColumns(dbName: String, tableName: String): Dataset[Column]
  /** List all available functions */
  def listFunctions(): Dataset[Function]
  /** List functions in specified database */
  def listFunctions(dbName: String): Dataset[Function]
  /** Check if database exists */
  def databaseExists(dbName: String): Boolean
  /** Check if table exists */
  def tableExists(tableName: String): Boolean
  def tableExists(dbName: String, tableName: String): Boolean
  /** Check if function exists */
  def functionExists(functionName: String): Boolean
  def functionExists(dbName: String, functionName: String): Boolean
  /** Get table metadata */
  def getTable(tableName: String): Table
  def getTable(dbName: String, tableName: String): Table
  /** Get function metadata */
  def getFunction(functionName: String): Function
  def getFunction(dbName: String, functionName: String): Function
  /** Cache table in memory */
  def cacheTable(tableName: String): Unit
  def cacheTable(tableName: String, storageLevel: StorageLevel): Unit
  /** Uncache table from memory */
  def uncacheTable(tableName: String): Unit
  /** Check if table is cached */
  def isCached(tableName: String): Boolean
  /** Clear all cached tables */
  def clearCache(): Unit
  /** Refresh table metadata */
  def refreshTable(tableName: String): Unit
  /** Refresh function metadata */
  def refreshFunction(functionName: String): Unit
  /** Create database */
  def createDatabase(dbName: String, ignoreIfExists: Boolean): Unit
  def createDatabase(dbName: String, ignoreIfExists: Boolean, path: String): Unit
  /** Drop database */
  def dropDatabase(dbName: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
  /** Create table */
  def createTable(tableName: String, path: String): DataFrame
  def createTable(tableName: String, path: String, source: String): DataFrame
  def createTable(tableName: String, source: String, options: Map[String, String]): DataFrame
  def createTable(tableName: String, source: String, schema: StructType, options: Map[String, String]): DataFrame
  /** Drop temporary view */
  def dropTempView(viewName: String): Boolean
  /** Drop global temporary view */
  def dropGlobalTempView(viewName: String): Boolean
  /** Recover partitions for table */
  def recoverPartitions(tableName: String): Unit
}
Represents database information in the catalog.
/**
* Database metadata from the catalog
*/
case class Database(
  name: String,        // Database name
  description: String, // Database description
  locationUri: String  // Database location URI
)
Represents table information in the catalog.
/**
* Table metadata from the catalog
*/
case class Table(
  name: String,        // Table name
  database: String,    // Database name
  description: String, // Table description
  tableType: String,   // Table type (MANAGED, EXTERNAL, VIEW, etc.)
  isTemporary: Boolean // Whether table is temporary
)
Represents column information in the catalog.
/**
* Column metadata from the catalog
*/
case class Column(
  name: String,         // Column name
  description: String,  // Column description
  dataType: String,     // Column data type
  nullable: Boolean,    // Whether column can be null
  isPartition: Boolean, // Whether column is partition key
  isBucket: Boolean     // Whether column is bucket key
)
Represents function information in the catalog.
/**
* Function metadata from the catalog
*/
case class Function(
  name: String,        // Function name
  database: String,    // Database name
  description: String, // Function description
  className: String,   // Implementation class name
  isTemporary: Boolean // Whether function is temporary
)
Usage Examples:
val catalog = spark.catalog
// Database operations
println(s"Current database: ${catalog.currentDatabase()}")
val databases = catalog.listDatabases().collect()
databases.foreach(db => println(s"Database: ${db.name} at ${db.locationUri}"))
catalog.setCurrentDatabase("my_database")
// Table operations
val tables = catalog.listTables().collect()
tables.foreach(table =>
  println(s"Table: ${table.name} (${table.tableType}) in ${table.database}")
)
// Check existence
if (catalog.tableExists("employees")) {
  val tableInfo = catalog.getTable("employees")
  println(s"Table type: ${tableInfo.tableType}")
  // Get column information
  val columns = catalog.listColumns("employees").collect()
  columns.foreach(col =>
    println(s"Column: ${col.name} (${col.dataType}, nullable: ${col.nullable})")
  )
}
// Function operations
val functions = catalog.listFunctions().collect()
functions.foreach(func =>
  println(s"Function: ${func.name} (${func.className})")
)
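functionExists and getFunction fetch metadata for a single function; a quick sketch using a built-in (for built-ins the database field may be null):
if (catalog.functionExists("upper")) {
  val fn = catalog.getFunction("upper")
  println(s"Function: ${fn.name}, class: ${fn.className}, temporary: ${fn.isTemporary}")
}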
// Caching operations
catalog.cacheTable("frequently_used_table")
println(s"Table cached: ${catalog.isCached("frequently_used_table")}")
catalog.uncacheTable("frequently_used_table")
Advanced table management and DDL operations.
Creating and dropping tables:
// Create external table
catalog.createTable(
  tableName = "external_data",
  path = "s3://bucket/path/to/data",
  source = "parquet"
)
// Create table with schema and options
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = true),
  StructField("created_date", DateType, nullable = true)
))
catalog.createTable(
  tableName = "structured_data",
  source = "delta",
  schema = schema,
  options = Map(
    "path" -> "s3://bucket/delta/table",
    "overwriteSchema" -> "true"
  )
)
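The options-only overload leaves schema inference to the data source; a sketch with a hypothetical JSON path:
catalog.createTable(
  tableName = "inferred_events",
  source = "json",
  options = Map("path" -> "s3://bucket/json/events")
)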
// Create database
catalog.createDatabase("analytics", ignoreIfExists = true)
catalog.createDatabase("data_lake", ignoreIfExists = true, path = "s3://bucket/databases/data_lake")
// Drop database (cascade removes all tables)
catalog.dropDatabase("old_database", ignoreIfNotExists = true, cascade = true)
// Create temporary views
spark.sql("""
  CREATE OR REPLACE TEMPORARY VIEW active_users AS
  SELECT * FROM users WHERE status = 'active'
""")
// Drop temporary views
catalog.dropTempView("active_users")
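dropGlobalTempView works the same way for global temporary views, which live in the reserved global_temp database and are shared across sessions; a brief sketch (the view name is illustrative):
// Create, query, and drop a global temporary view
spark.sql("""
  CREATE OR REPLACE GLOBAL TEMPORARY VIEW shared_active_users AS
  SELECT * FROM users WHERE status = 'active'
""")
spark.sql("SELECT COUNT(*) FROM global_temp.shared_active_users").show()
catalog.dropGlobalTempView("shared_active_users") // returns true if the view existed
Table caching for improved query performance.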
import org.apache.spark.storage.StorageLevel
// Cache with default storage level (MEMORY_AND_DISK)
catalog.cacheTable("hot_data")
// Cache with specific storage level
catalog.cacheTable("lookup_table", StorageLevel.MEMORY_ONLY)
// Check cache status
val cachedTables = catalog.listTables()
  .filter(!_.isTemporary)
  .collect()
  .filter(table => catalog.isCached(table.name))
cachedTables.foreach(table =>
  println(s"Cached table: ${table.name}")
)
// Clear specific table from cache
catalog.uncacheTable("hot_data")
// Clear all cached tables
catalog.clearCache()
// Refresh table metadata after external changes
catalog.refreshTable("external_table")
Working with partitioned tables.
// Recover partitions for external tables
catalog.recoverPartitions("partitioned_external_table")
// List partitions (using SQL for detailed partition info)
val partitions = spark.sql("SHOW PARTITIONS partitioned_table").collect()
partitions.foreach(row => println(s"Partition: ${row.getString(0)}"))
// Add specific partitions
spark.sql("""
  ALTER TABLE partitioned_table
  ADD PARTITION (year=2023, month=12)
  LOCATION 's3://bucket/data/year=2023/month=12'
""")
// Drop partition
spark.sql("""
  ALTER TABLE partitioned_table
  DROP PARTITION (year=2022, month=01)
""")
Complex metadata queries and analysis.
// List base tables, excluding views (catalog metadata does not expose table sizes)
val baseTables = catalog.listTables()
  .select("name", "database", "tableType")
  .collect()
  .filter(_.getString(2) != "VIEW") // Exclude views
// Analyze table schemas
def analyzeTableSchema(tableName: String): Unit = {
  val columns = catalog.listColumns(tableName).collect()
  println(s"Schema analysis for $tableName:")
  println(s"Total columns: ${columns.length}")
  val partitionCols = columns.filter(_.isPartition)
  if (partitionCols.nonEmpty) {
    println(s"Partition columns: ${partitionCols.map(_.name).mkString(", ")}")
  }
  val bucketCols = columns.filter(_.isBucket)
  if (bucketCols.nonEmpty) {
    println(s"Bucket columns: ${bucketCols.map(_.name).mkString(", ")}")
  }
  val dataTypes = columns.groupBy(_.dataType).map { case (t, cols) => t -> cols.length }
  println("Data type distribution:")
  dataTypes.foreach { case (dataType, count) =>
    println(s"  $dataType: $count columns")
  }
}
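Calling the helper prints the breakdown for any catalog table (the table name is illustrative):
analyzeTableSchema("employees")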
// Find tables with specific name patterns
import org.apache.spark.sql.functions.col

val userTables = catalog.listTables()
  .filter(col("name").like("%user%"))
  .collect()
// Cross-database analysis
val allDatabases = catalog.listDatabases().collect()
allDatabases.foreach { db =>
  val tableCount = catalog.listTables(db.name).count()
  println(s"Database ${db.name}: $tableCount tables")
}
Working with Hive metastore and external catalog systems.
// Enable Hive support for metastore integration. The warehouse location and
// metastore URI must be supplied before the session is created, so they go on the builder:
val spark = SparkSession.builder()
  .appName("Catalog Operations")
  .config("spark.sql.warehouse.dir", "/path/to/warehouse")
  .config("hive.metastore.uris", "thrift://metastore-host:9083")
  .enableHiveSupport()
  .getOrCreate()
// Create Hive-managed table (Hive DDL: STORED AS selects the Hive file format,
// and partition columns are declared separately from the column list)
spark.sql("""
  CREATE TABLE IF NOT EXISTS hive_table (
    id BIGINT,
    name STRING,
    created_date DATE
  )
  PARTITIONED BY (year INT, month INT)
  STORED AS PARQUET
""")
// Register external table in Hive metastore
spark.sql("""
  CREATE TABLE external_parquet_table (
    id BIGINT,
    name STRING,
    value DOUBLE
  )
  USING PARQUET
  LOCATION 's3://bucket/path/to/parquet'
""")
// Work with Hive databases
spark.sql("CREATE DATABASE IF NOT EXISTS hive_db")
catalog.setCurrentDatabase("hive_db")
// MSCK repair for Hive tables
spark.sql("MSCK REPAIR TABLE partitioned_hive_table")
Helper functions for common catalog operations.
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.types.{DataType, StructField, StructType}

object CatalogUtils {
  /** Existence check via getTable; equivalent to catalog.tableExists(dbName, tableName). */
  def tableExists(spark: SparkSession, dbName: String, tableName: String): Boolean = {
    try {
      spark.catalog.getTable(dbName, tableName)
      true
    } catch {
      case _: AnalysisException => false
    }
  }

  def createDatabaseIfNotExists(spark: SparkSession, dbName: String, location: Option[String] = None): Unit = {
    if (!spark.catalog.databaseExists(dbName)) {
      location match {
        case Some(path) => spark.catalog.createDatabase(dbName, ignoreIfExists = true, path)
        case None       => spark.catalog.createDatabase(dbName, ignoreIfExists = true)
      }
      println(s"Created database: $dbName")
    } else {
      println(s"Database already exists: $dbName")
    }
  }

  def getTableStatistics(spark: SparkSession, tableName: String): Map[String, Any] = {
    val table = spark.catalog.getTable(tableName)
    val columns = spark.catalog.listColumns(tableName).collect()
    Map(
      "tableName" -> table.name,
      "database" -> table.database,
      "tableType" -> table.tableType,
      "isTemporary" -> table.isTemporary,
      "columnCount" -> columns.length,
      "partitionColumns" -> columns.filter(_.isPartition).map(_.name),
      "bucketColumns" -> columns.filter(_.isBucket).map(_.name),
      "dataTypes" -> columns.map(_.dataType).distinct
    )
  }

  def copyTableSchema(spark: SparkSession, sourceTable: String, targetTable: String): Unit = {
    val sourceColumns = spark.catalog.listColumns(sourceTable).collect()
    // Rebuild a StructType from the DDL type strings reported by the catalog
    val schema = StructType(sourceColumns.map { col =>
      StructField(col.name, DataType.fromDDL(col.dataType), col.nullable)
    })
    // Create an empty table with the same schema
    val emptyDf = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
    emptyDf.write.saveAsTable(targetTable)
  }
}
// Usage examples
CatalogUtils.createDatabaseIfNotExists(spark, "analytics", Some("s3://bucket/analytics"))
val stats = CatalogUtils.getTableStatistics(spark, "employees")
println(s"Table statistics: $stats")
if (CatalogUtils.tableExists(spark, "default", "source_table")) {
  CatalogUtils.copyTableSchema(spark, "source_table", "target_table")
}