The Spark SQL Catalog provides a programmatic interface for managing databases, tables, functions, and cached data. Exposed on every session as spark.catalog, it is the entry point to the session's metastore metadata and enables runtime introspection of the Spark SQL environment.
class Catalog {
// Database operations
def currentDatabase: String
def setCurrentDatabase(dbName: String): Unit
def listDatabases(): Dataset[Database]
def databaseExists(dbName: String): Boolean
def getDatabase(dbName: String): Database
// Table operations
def listTables(): Dataset[Table]
def listTables(dbName: String): Dataset[Table]
def tableExists(tableName: String): Boolean
def tableExists(dbName: String, tableName: String): Boolean
def getTable(tableName: String): Table
def getTable(dbName: String, tableName: String): Table
def listColumns(tableName: String): Dataset[Column]
def listColumns(dbName: String, tableName: String): Dataset[Column]
// Function operations
def listFunctions(): Dataset[Function]
def listFunctions(dbName: String): Dataset[Function]
def functionExists(functionName: String): Boolean
def functionExists(dbName: String, functionName: String): Boolean
def getFunction(functionName: String): Function
def getFunction(dbName: String, functionName: String): Function
// Temporary view operations
def dropTempView(viewName: String): Boolean
def dropGlobalTempView(viewName: String): Boolean
// Table management (Experimental)
def createTable(tableName: String, path: String): DataFrame
def createTable(tableName: String, path: String, source: String): DataFrame
def createTable(tableName: String, source: String, options: Map[String, String]): DataFrame
def createTable(tableName: String, source: String, schema: StructType, options: Map[String, String]): DataFrame
// Maintenance operations
def recoverPartitions(tableName: String): Unit
// Caching operations
def isCached(tableName: String): Boolean
def cacheTable(tableName: String): Unit
def cacheTable(tableName: String, storageLevel: StorageLevel): Unit
def uncacheTable(tableName: String): Unit
def clearCache(): Unit
def refreshTable(tableName: String): Unit
def refreshByPath(path: String): Unit
}
Usage Examples:
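The usage examples in this and the following sections assume a running SparkSession named spark, a DataFrame named df where one is referenced, and the col helper from org.apache.spark.sql.functions; a minimal setup sketch (application name and sample data are illustrative):
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
val spark: SparkSession = SparkSession.builder()
.appName("catalog-examples") // illustrative application name
.enableHiveSupport() // optional: back the catalog with a persistent Hive metastore
.getOrCreate()
val df: DataFrame = spark.range(10).toDF("id") // illustrative sample data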
val catalog = spark.catalog
// Get current database
val currentDb = catalog.currentDatabase
println(s"Current database: $currentDb")
// Change database
catalog.setCurrentDatabase("my_database")
// List all databases
val databases = catalog.listDatabases()
databases.show()
// Check if database exists
val exists = catalog.databaseExists("production")
if (!exists) {
spark.sql("CREATE DATABASE production")
}
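// Note: the Catalog API itself has no createDatabase method, so database DDL
// is issued through spark.sql as shown above.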
// Get database information
val dbInfo = catalog.getDatabase("production")
println(s"Database: ${dbInfo.name}, Description: ${dbInfo.description}")case class Database(
name: String,
description: String,
locationUri: String
)
Usage Examples:
// Inspect database details
catalog.listDatabases().collect().foreach { db =>
println(s"Database: ${db.name}")
println(s"Description: ${db.description}")
println(s"Location: ${db.locationUri}")
println("---")
}
// Filter databases
catalog.listDatabases()
.filter(col("name").like("%prod%"))
.select("name", "description")
.show()
case class Table(
name: String,
database: String,
description: String,
tableType: String,
isTemporary: Boolean
)
Usage Examples:
// List all tables in current database
val tables = catalog.listTables()
tables.show()
// List tables in specific database
val prodTables = catalog.listTables("production")
prodTables.filter(col("tableType") === "MANAGED").show()
// Check if table exists
val tableExists = catalog.tableExists("users")
val specificExists = catalog.tableExists("production", "sales")
// Get table information
val tableInfo = catalog.getTable("users")
println(s"Table: ${tableInfo.name}")
println(s"Database: ${tableInfo.database}")
println(s"Type: ${tableInfo.tableType}")
println(s"Is Temporary: ${tableInfo.isTemporary}")
// List all temporary tables
catalog.listTables()
.filter(col("isTemporary") === true)
.select("name", "database")
.show()
case class Column(
name: String,
description: String,
dataType: String,
nullable: Boolean,
isPartition: Boolean,
isBucket: Boolean
)
Usage Examples:
// List columns for a table
val columns = catalog.listColumns("users")
columns.show()
// Get detailed column information
columns.collect().foreach { col =>
println(s"Column: ${col.name}")
println(s"Type: ${col.dataType}")
println(s"Nullable: ${col.nullable}")
println(s"Partition: ${col.isPartition}")
println(s"Bucket: ${col.isBucket}")
println("---")
}
// Find partition columns
catalog.listColumns("partitioned_table")
.filter(col("isPartition") === true)
.select("name", "dataType")
.show()
// Analyze table schema
def analyzeTableSchema(tableName: String): Unit = {
val columns = catalog.listColumns(tableName)
println(s"Schema for table: $tableName")
println("=" * 50)
columns.collect().foreach { col =>
val nullable = if (col.nullable) "NULL" else "NOT NULL"
val partition = if (col.isPartition) " (PARTITION)" else ""
val bucket = if (col.isBucket) " (BUCKET)" else ""
println(f"${col.name}%-20s ${col.dataType}%-15s $nullable$partition$bucket")
}
}
analyzeTableSchema("my_table")
case class Function(
name: String,
database: String,
description: String,
className: String,
isTemporary: Boolean
)
Usage Examples:
// List all functions
val functions = catalog.listFunctions()
functions.show()
// List functions in specific database
val dbFunctions = catalog.listFunctions("my_database")
dbFunctions.show()
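// Register a simple temporary UDF so the existence checks below have
// something to find (the function body is illustrative)
spark.udf.register("my_udf", (s: String) => s.toUpperCase)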
// Check if function exists
val funcExists = catalog.functionExists("my_udf")
val specificFuncExists = catalog.functionExists("my_database", "custom_agg")
// Get function information
val funcInfo = catalog.getFunction("upper")
println(s"Function: ${funcInfo.name}")
println(s"Database: ${funcInfo.database}")
println(s"Class: ${funcInfo.className}")
println(s"Is Temporary: ${funcInfo.isTemporary}")
// Filter user-defined functions
catalog.listFunctions()
.filter(col("isTemporary") === true)
.select("name", "description")
.show()
// Categorize functions
def categorizeFunction(func: Function): String = {
if (func.isTemporary) "UDF"
else if (func.className.startsWith("org.apache.spark.sql.catalyst.expressions")) "Built-in"
else "System"
}
val categorized = catalog.listFunctions().collect().groupBy(categorizeFunction)
categorized.foreach { case (category, funcs) =>
println(s"$category functions: ${funcs.length}")
}
Usage Examples:
// Create temporary views
df.createOrReplaceTempView("temp_users")
df.createGlobalTempView("global_temp_data")
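// Global temporary views live in the reserved global_temp database and must
// be queried with that qualifier:
spark.sql("SELECT * FROM global_temp.global_temp_data").show()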
// List temporary tables
catalog.listTables()
.filter(col("isTemporary") === true)
.show()
// Drop temporary views
val dropped = catalog.dropTempView("temp_users")
println(s"View dropped: $dropped")
val globalDropped = catalog.dropGlobalTempView("global_temp_data")
println(s"Global view dropped: $globalDropped")
// Manage temporary views
def cleanupTempViews(): Unit = {
val tempViews = catalog.listTables()
.filter(col("isTemporary") === true)
.select("name")
.collect()
.map(_.getString(0))
tempViews.foreach { viewName =>
val success = catalog.dropTempView(viewName)
println(s"Dropped temp view $viewName: $success")
}
}
Usage Examples:
import org.apache.spark.storage.StorageLevel
// Cache a table
catalog.cacheTable("frequently_used_table")
// Cache with specific storage level
catalog.cacheTable("large_table", StorageLevel.MEMORY_AND_DISK_SER)
// Check if table is cached
val isCached = catalog.isCached("my_table")
println(s"Table cached: $isCached")
// Uncache a table
catalog.uncacheTable("my_table")
// Clear all cached tables
catalog.clearCache()
// Cache management workflow
def manageCaching(tableName: String): Unit = {
if (!catalog.isCached(tableName)) {
println(s"Caching table: $tableName")
catalog.cacheTable(tableName)
} else {
println(s"Table $tableName is already cached")
}
}
// Cache frequently accessed tables
val frequentTables = Seq("dim_users", "fact_sales", "lookup_regions")
frequentTables.foreach(manageCaching)
// Monitor cached tables
def showCachedTables(): Unit = {
val allTables = catalog.listTables().collect()
println("Cached Tables:")
println("=" * 40)
allTables.foreach { table =>
val cached = catalog.isCached(table.name)
if (cached) {
println(s"${table.database}.${table.name}")
}
}
}
showCachedTables()
// Refresh cached table metadata
catalog.refreshTable("my_cached_table")
// Refresh by file path (for file-based tables)
catalog.refreshByPath("/path/to/parquet/files")
Usage Examples:
// Write a DataFrame out as parquet files
df.write
.mode("overwrite")
.format("parquet")
.save("/external/path")
// Register the existing data directory as an external table
// (Experimental API; createTable returns a DataFrame over the new table)
val externalTable = catalog.createTable("external_table", "/external/path")
// Register existing files with an explicit source format
val jsonTable = catalog.createTable("json_table", "/path/to/table", "json")
// Alternatively, create and populate a managed table directly from a DataFrame
df.write
.mode("overwrite")
.saveAsTable("new_table")
// Create partitioned table using SQL
spark.sql("""
CREATE TABLE partitioned_sales (
id BIGINT,
amount DOUBLE,
customer_id STRING,
sale_date DATE
)
USING PARQUET
PARTITIONED BY (year INT, month INT)
LOCATION '/data/sales'
""")// Recover partitions for external tables
catalog.recoverPartitions("external_partitioned_table")
// Workflow for partition management
def managePartitions(tableName: String): Unit = {
println(s"Recovering partitions for: $tableName")
try {
catalog.recoverPartitions(tableName)
println("Partition recovery completed successfully")
// Refresh the table to update metadata
catalog.refreshTable(tableName)
println("Table metadata refreshed")
} catch {
case e: Exception =>
println(s"Partition recovery failed: ${e.getMessage}")
}
}
managePartitions("my_partitioned_table")
// Comprehensive metadata inspection
def inspectCatalog(): Unit = {
println("=== CATALOG INSPECTION ===")
// Current context
println(s"Current Database: ${catalog.currentDatabase}")
println()
// Database summary
val databases = catalog.listDatabases().collect()
println(s"Total Databases: ${databases.length}")
databases.foreach(db => println(s" - ${db.name}: ${db.description}"))
println()
// Table summary
val tables = catalog.listTables().collect()
val managedTables = tables.count(_.tableType == "MANAGED")
val externalTables = tables.count(_.tableType == "EXTERNAL")
val tempTables = tables.count(_.isTemporary)
println(s"Total Tables: ${tables.length}")
println(s" - Managed: $managedTables")
println(s" - External: $externalTables")
println(s" - Temporary: $tempTables")
println()
// Function summary
val functions = catalog.listFunctions().collect()
val builtinFuncs = functions.count(!_.isTemporary)
val udfs = functions.count(_.isTemporary)
println(s"Total Functions: ${functions.length}")
println(s" - Built-in: $builtinFuncs")
println(s" - UDFs: $udfs")
println()
// Cache summary
val cachedTables = tables.count(table => catalog.isCached(table.name))
println(s"Cached Tables: $cachedTables")
}
inspectCatalog()
// Find table dependencies
def findTableDependencies(tableName: String): Unit = {
try {
val table = catalog.getTable(tableName)
println(s"Table: ${table.name}")
println(s"Database: ${table.database}")
println(s"Type: ${table.tableType}")
// Get columns with their characteristics
val columns = catalog.listColumns(tableName).collect()
val partitionCols = columns.filter(_.isPartition).map(_.name)
val bucketCols = columns.filter(_.isBucket).map(_.name)
if (partitionCols.nonEmpty) {
println(s"Partitioned by: ${partitionCols.mkString(", ")}")
}
if (bucketCols.nonEmpty) {
println(s"Bucketed by: ${bucketCols.mkString(", ")}")
}
// Check if cached
if (catalog.isCached(tableName)) {
println("Status: CACHED")
}
} catch {
case e: Exception =>
println(s"Error inspecting table $tableName: ${e.getMessage}")
}
}
// Usage
findTableDependencies("my_important_table")
// Create database
spark.sql("CREATE DATABASE IF NOT EXISTS analytics")
catalog.setCurrentDatabase("analytics")
// Verify creation
if (catalog.databaseExists("analytics")) {
println("Analytics database created successfully")
}
// Create managed table
spark.sql("""
CREATE TABLE user_analytics (
user_id STRING,
session_count BIGINT,
total_duration DOUBLE,
last_activity TIMESTAMP
)
USING DELTA
TBLPROPERTIES ('delta.autoOptimize.optimizeWrite' = 'true')
""")
// Verify table creation
if (catalog.tableExists("user_analytics")) {
val table = catalog.getTable("user_analytics")
println(s"Created table: ${table.name} (${table.tableType})")
// Show schema
catalog.listColumns("user_analytics").show()
}
// Create view
spark.sql("""
CREATE VIEW active_users AS
SELECT user_id, session_count
FROM user_analytics
WHERE last_activity > current_date() - INTERVAL 30 DAYS
""")
// List all objects in the database
println("Tables and Views:")
catalog.listTables("analytics").show()// Safe catalog operations with error handling
def safeTableOperation(tableName: String)(operation: => Unit): Unit = {
try {
if (catalog.tableExists(tableName)) {
operation
} else {
println(s"Table $tableName does not exist")
}
} catch {
case e: Exception =>
println(s"Error operating on table $tableName: ${e.getMessage}")
}
}
// Safe caching
def safeCacheTable(tableName: String): Unit = {
safeTableOperation(tableName) {
if (!catalog.isCached(tableName)) {
catalog.cacheTable(tableName)
println(s"Cached table: $tableName")
} else {
println(s"Table $tableName is already cached")
}
}
}
// Safe cleanup
def safeCleanup(tempViewName: String): Unit = {
try {
val dropped = catalog.dropTempView(tempViewName)
if (dropped) {
println(s"Dropped temporary view: $tempViewName")
} else {
println(s"Temporary view $tempViewName was not found")
}
} catch {
case e: Exception =>
println(s"Error dropping view $tempViewName: ${e.getMessage}")
}
}
// Batch operations with error handling
def batchTableOperations(tableNames: Seq[String]): Unit = {
tableNames.foreach { tableName =>
safeTableOperation(tableName) {
// Refresh and cache frequently used tables
catalog.refreshTable(tableName)
catalog.cacheTable(tableName)
println(s"Processed: $tableName")
}
}
}
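// Example invocation (table names are illustrative)
batchTableOperations(Seq("dim_users", "fact_sales", "lookup_regions"))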