abstract class Catalog {
  // Current database operations
  def currentDatabase: String
  def setCurrentDatabase(dbName: String): Unit

  // Database management
  def listDatabases(): Dataset[Database]
  def listDatabases(pattern: String): Dataset[Database]
  def databaseExists(dbName: String): Boolean
  def getDatabase(dbName: String): Database
  def createDatabase(dbName: String, description: String, location: String): Unit
  def dropDatabase(dbName: String): Unit
  def dropDatabase(dbName: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit

  // Table management
  def listTables(): Dataset[Table]
  def listTables(dbName: String): Dataset[Table]
  def listTables(dbName: String, pattern: String): Dataset[Table]
  def getTable(tableName: String): Table
  def getTable(dbName: String, tableName: String): Table
  def tableExists(tableName: String): Boolean
  def tableExists(dbName: String, tableName: String): Boolean
  def createTable(tableName: String, path: String): DataFrame
  def createTable(tableName: String, path: String, source: String): DataFrame
  def createTable(tableName: String, source: String, schema: StructType, options: Map[String, String]): DataFrame
  def dropTempView(viewName: String): Boolean
  def dropGlobalTempView(viewName: String): Boolean

  // Column information
  def listColumns(tableName: String): Dataset[Column]
  def listColumns(dbName: String, tableName: String): Dataset[Column]

  // Function management
  def listFunctions(): Dataset[Function]
  def listFunctions(dbName: String): Dataset[Function]
  def listFunctions(dbName: String, pattern: String): Dataset[Function]
  def getFunction(functionName: String): Function
  def getFunction(dbName: String, functionName: String): Function
  def functionExists(functionName: String): Boolean
  def functionExists(dbName: String, functionName: String): Boolean

  // Caching operations
  def cacheTable(tableName: String): Unit
  def cacheTable(tableName: String, storageLevel: StorageLevel): Unit
  def uncacheTable(tableName: String): Unit
  def clearCache(): Unit
  def isCached(tableName: String): Boolean
  def refreshTable(tableName: String): Unit
  def refreshByPath(path: String): Unit

  // Recovery operations
  def recoverPartitions(tableName: String): Unit
}

case class Database(
    name: String,
    description: String,
    locationUri: String) extends DefinedByConstructorParams {

  override def toString: String = {
    s"Database[name='$name', description='$description', path='$locationUri']"
  }
}

case class Table(
    name: String,
    database: String,
    description: String,
    tableType: String,
    isTemporary: Boolean) extends DefinedByConstructorParams {

  override def toString: String = {
    s"Table[name='$name', database='$database', description='$description', " +
      s"tableType='$tableType', isTemporary='$isTemporary']"
  }
}

case class Column(
    name: String,
    description: String,
    dataType: String,
    nullable: Boolean,
    isPartition: Boolean,
    isBucket: Boolean) extends DefinedByConstructorParams {

  override def toString: String = {
    s"Column[name='$name', description='$description', dataType='$dataType', " +
      s"nullable='$nullable', isPartition='$isPartition', isBucket='$isBucket']"
  }
}

case class Function(
    name: String,
    database: String,
    description: String,
    className: String,
    isTemporary: Boolean) extends DefinedByConstructorParams {

  override def toString: String = {
    s"Function[name='$name', database='$database', description='$description', " +
      s"className='$className', isTemporary='$isTemporary']"
  }
}

// Table creation with DataFrameWriter
class DataFrameWriter[T] {
  def saveAsTable(tableName: String): Unit
  def insertInto(tableName: String): Unit
}

// V2 table operations: writeTo is defined on Dataset/DataFrame and returns a DataFrameWriterV2
class Dataset[T] {
  def writeTo(tableName: String): DataFrameWriterV2[T]
}
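// A minimal usage sketch for the writers above (assumptions: an active SparkSession,
// a hypothetical DataFrame `events` with a `region` column, and made-up table names).
// saveAsTable creates the table from the DataFrame's schema (replacing it under
// SaveMode.Overwrite), while insertInto requires an existing table and matches
// columns by position rather than by name.
import org.apache.spark.sql.functions.col

events.write
  .mode("overwrite")
  .saveAsTable("analytics.events")

events.write
  .insertInto("analytics.events")

// The V2 path via writeTo, using the DataFrameWriterV2 API listed next. create()
// behaves like CREATE TABLE AS SELECT; replace()/createOrReplace() generally need
// a catalog with V2 support.
events.writeTo("analytics.events_v2")
  .using("parquet")
  .partitionedBy(col("region"))
  .tableProperty("comment", "illustrative example")
  .create()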
// Advanced table creation
class DataFrameWriterV2[T] {
  def create(): Unit
  def replace(): Unit
  def createOrReplace(): Unit
  def append(): Unit
  def overwrite(condition: Column): Unit // overwrite rows matching the condition
  def overwritePartitions(): Unit
  // Table properties
  def tableProperty(property: String, value: String): DataFrameWriterV2[T]
  def partitionedBy(column: Column, columns: Column*): DataFrameWriterV2[T]
  def using(provider: String): DataFrameWriterV2[T]
}

object StorageLevel {
  val NONE: StorageLevel
  val DISK_ONLY: StorageLevel
  val DISK_ONLY_2: StorageLevel
  val DISK_ONLY_3: StorageLevel
  val MEMORY_ONLY: StorageLevel
  val MEMORY_ONLY_2: StorageLevel
  val MEMORY_ONLY_SER: StorageLevel
  val MEMORY_ONLY_SER_2: StorageLevel
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_2: StorageLevel
  val MEMORY_AND_DISK_SER: StorageLevel
  val MEMORY_AND_DISK_SER_2: StorageLevel
  val OFF_HEAP: StorageLevel
}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalog._ // Database, Table, Column, Function metadata classes
import org.apache.spark.sql.functions._ // year, month, to_date used in later examples

val spark = SparkSession.builder()
  .appName("Catalog Operations Demo")
  .enableHiveSupport() // Enable Hive metastore support
  .getOrCreate()

import spark.implicits._ // needed for .toDF(...) and the $"..." column syntax below
// Current database operations
println(s"Current database: ${spark.catalog.currentDatabase}")
// List all databases
val databases = spark.catalog.listDatabases()
databases.show()
databases.collect().foreach { db =>
println(s"Database: ${db.name}, Description: ${db.description}, Location: ${db.locationUri}")
}
// Filter databases by pattern
val testDatabases = spark.catalog.listDatabases("test*")
testDatabases.show()
// Check if database exists
val dbExists = spark.catalog.databaseExists("analytics")
println(s"Analytics database exists: $dbExists")
// Create a new database
spark.catalog.createDatabase(
dbName = "analytics",
description = "Analytics and reporting database",
location = "s3a://my-bucket/analytics/"
)
// Get database information
val analyticsDB = spark.catalog.getDatabase("analytics")
println(s"Analytics DB: ${analyticsDB.name} at ${analyticsDB.locationUri}")
// Switch to different database
spark.catalog.setCurrentDatabase("analytics")
println(s"Switched to database: ${spark.catalog.currentDatabase}")
// Drop database (with cascade to drop all tables)
spark.catalog.dropDatabase("old_database", ignoreIfNotExists = true, cascade = true)// List all tables in current database
val tables = spark.catalog.listTables()
tables.show()
// List tables in specific database
val analyticsTables = spark.catalog.listTables("analytics")
analyticsTables.show()
// Filter tables by pattern
val salesTables = spark.catalog.listTables("analytics", "sales*")
salesTables.show()
// Get detailed table information
tables.collect().foreach { table =>
println(s"Table: ${table.database}.${table.name}")
println(s" Type: ${table.tableType}, Temporary: ${table.isTemporary}")
println(s" Description: ${table.description}")
println()
}
// Check if table exists
val tableExists = spark.catalog.tableExists("sales_data")
val dbTableExists = spark.catalog.tableExists("analytics", "user_events")
println(s"sales_data exists: $tableExists")
println(s"analytics.user_events exists: $dbTableExists")
// Get specific table metadata
if (spark.catalog.tableExists("analytics", "sales_data")) {
val salesTable = spark.catalog.getTable("analytics", "sales_data")
println(s"Sales table: ${salesTable.name} in ${salesTable.database}")
println(s"Table type: ${salesTable.tableType}")
println(s"Is temporary: ${salesTable.isTemporary}")
}
// Create external table
val externalTable = spark.catalog.createTable(
tableName = "external_sales",
path = "s3a://data-lake/sales/",
source = "parquet"
)
// Create table with schema and options
import org.apache.spark.sql.types._
val salesSchema = StructType(Array(
StructField("transaction_id", StringType, nullable = false),
StructField("customer_id", StringType, nullable = false),
StructField("product_id", StringType, nullable = false),
StructField("amount", DecimalType(10, 2), nullable = false),
StructField("transaction_date", DateType, nullable = false),
StructField("region", StringType, nullable = false)
))
val managedTable = spark.catalog.createTable(
tableName = "managed_sales",
source = "delta",
schema = salesSchema,
options = Map(
"path" -> "/path/to/delta/sales",
"delta.autoOptimize.optimizeWrite" -> "true",
"delta.autoOptimize.autoCompact" -> "true"
)
)

// List columns for a table
val salesColumns = spark.catalog.listColumns("analytics", "sales_data")
salesColumns.show(truncate = false)
// Get detailed column information
salesColumns.collect().foreach { column =>
println(s"Column: ${column.name}")
println(s" Data Type: ${column.dataType}")
println(s" Nullable: ${column.nullable}")
println(s" Is Partition: ${column.isPartition}")
println(s" Is Bucket: ${column.isBucket}")
println(s" Description: ${column.description}")
println()
}
// Analyze table schema programmatically
def analyzeTableSchema(dbName: String, tableName: String): Unit = {
  val columns = spark.catalog.listColumns(dbName, tableName).collect()
  println(s"Schema Analysis for $dbName.$tableName:")
  println(s"Total columns: ${columns.length}")

  val partitionColumns = columns.filter(_.isPartition)
  val bucketColumns = columns.filter(_.isBucket)
  val nullableColumns = columns.filter(_.nullable)
  println(s"Partition columns: ${partitionColumns.map(_.name).mkString(", ")}")
  println(s"Bucket columns: ${bucketColumns.map(_.name).mkString(", ")}")
  println(s"Nullable columns: ${nullableColumns.length}/${columns.length}")

  // Group by data type
  val typeGroups = columns.groupBy(_.dataType)
  typeGroups.foreach { case (dataType, cols) =>
    println(s"$dataType: ${cols.map(_.name).mkString(", ")}")
  }
}
analyzeTableSchema("analytics", "sales_data")
// Compare schemas between tables
def compareSchemas(db1: String, table1: String, db2: String, table2: String): Unit = {
  val schema1 = spark.catalog.listColumns(db1, table1).collect().map(c => c.name -> c.dataType).toMap
  val schema2 = spark.catalog.listColumns(db2, table2).collect().map(c => c.name -> c.dataType).toMap

  val commonColumns = schema1.keySet.intersect(schema2.keySet)
  val onlyInFirst = schema1.keySet -- schema2.keySet
  val onlyInSecond = schema2.keySet -- schema1.keySet
  println(s"Schema Comparison: $db1.$table1 vs $db2.$table2")
  println(s"Common columns: ${commonColumns.size}")
  println(s"Only in first: ${onlyInFirst.mkString(", ")}")
  println(s"Only in second: ${onlyInSecond.mkString(", ")}")

  // Check for type mismatches in common columns
  val typeMismatches = commonColumns.filter(col => schema1(col) != schema2(col))
  if (typeMismatches.nonEmpty) {
    println("Type mismatches:")
    typeMismatches.foreach { col =>
      println(s" $col: ${schema1(col)} vs ${schema2(col)}")
    }
  }
}

// List all functions
val allFunctions = spark.catalog.listFunctions()
allFunctions.show()
// List functions in specific database
val analyticsFunctions = spark.catalog.listFunctions("analytics")
analyticsFunctions.show()
// Filter functions by pattern
val mathFunctions = spark.catalog.listFunctions("default", "*math*")
mathFunctions.show()
// Get function information
allFunctions.collect().foreach { func =>
println(s"Function: ${func.database}.${func.name}")
println(s" Class: ${func.className}")
println(s" Temporary: ${func.isTemporary}")
println(s" Description: ${func.description}")
println()
}
// Check if function exists
val funcExists = spark.catalog.functionExists("my_custom_function")
val dbFuncExists = spark.catalog.functionExists("analytics", "sales_metrics")
println(s"my_custom_function exists: $funcExists")
println(s"analytics.sales_metrics exists: $dbFuncExists")
// Get specific function details
if (spark.catalog.functionExists("default", "substring")) {
val substringFunc = spark.catalog.getFunction("default", "substring")
println(s"Substring function: ${substringFunc.name}")
println(s"Class: ${substringFunc.className}")
}
// Register custom function (example)
spark.udf.register("calculate_tax", (amount: Double, rate: Double) => amount * rate)
// Verify registration
val taxFuncExists = spark.catalog.functionExists("calculate_tax")
println(s"calculate_tax function registered: $taxFuncExists")
// Function discovery and documentation
def documentFunctions(databaseName: String): Unit = {
  val functions = spark.catalog.listFunctions(databaseName).collect()
  println(s"Function Documentation for Database: $databaseName")
  println("=" * 50)

  val grouped = functions.groupBy(_.isTemporary)
  println("PERSISTENT FUNCTIONS:")
  grouped.getOrElse(false, Array()).foreach { func =>
    println(s" ${func.name}: ${func.description}")
  }
  println("\nTEMPORARY FUNCTIONS:")
  grouped.getOrElse(true, Array()).foreach { func =>
    println(s" ${func.name}: ${func.description}")
  }
}

documentFunctions("default")

import org.apache.spark.storage.StorageLevel
// Create sample data for caching examples
val salesData = Seq(
("TXN001", "CUST001", "PROD001", 100.50, "2023-01-15", "US"),
("TXN002", "CUST002", "PROD002", 250.75, "2023-01-16", "UK"),
("TXN003", "CUST003", "PROD001", 180.25, "2023-01-17", "CA")
).toDF("transaction_id", "customer_id", "product_id", "amount", "transaction_date", "region")
// Save as table for caching examples
salesData.write
.mode("overwrite")
.saveAsTable("sales_cache_demo")
// Basic table caching
spark.catalog.cacheTable("sales_cache_demo")
// Check if table is cached
val isCached = spark.catalog.isCached("sales_cache_demo")
println(s"sales_cache_demo is cached: $isCached")
// Cache with specific storage level
spark.catalog.cacheTable("sales_cache_demo", StorageLevel.MEMORY_AND_DISK_SER)
// Cache multiple tables with different strategies
val largeTables = spark.catalog.listTables().filter(_.name.contains("large")).collect()
largeTables.foreach { table =>
spark.catalog.cacheTable(table.name, StorageLevel.DISK_ONLY)
println(s"Cached large table: ${table.name} to disk only")
}
val frequentTables = spark.catalog.listTables().filter(_.name.contains("frequent")).collect()
frequentTables.foreach { table =>
spark.catalog.cacheTable(table.name, StorageLevel.MEMORY_ONLY)
println(s"Cached frequent table: ${table.name} to memory only")
}
// Uncache specific table
spark.catalog.uncacheTable("sales_cache_demo")
println(s"sales_cache_demo is cached after uncache: ${spark.catalog.isCached("sales_cache_demo")}")
// Clear all cached tables
spark.catalog.clearCache()
println("All cached tables cleared")
// Refresh table metadata (useful after external changes)
spark.catalog.refreshTable("sales_cache_demo")
// Refresh by path (for external tables)
spark.catalog.refreshByPath("/path/to/external/data")
// Cache management utility
def manageCacheForDatabase(databaseName: String, cacheStrategy: String): Unit = {
  val tables = spark.catalog.listTables(databaseName).collect()
  cacheStrategy.toLowerCase match {
    case "memory" =>
      tables.foreach { table =>
        spark.catalog.cacheTable(s"${table.database}.${table.name}", StorageLevel.MEMORY_ONLY)
        println(s"Cached ${table.database}.${table.name} in memory")
      }
    case "disk" =>
      tables.foreach { table =>
        spark.catalog.cacheTable(s"${table.database}.${table.name}", StorageLevel.DISK_ONLY)
        println(s"Cached ${table.database}.${table.name} on disk")
      }
    case "mixed" =>
      tables.foreach { table =>
        spark.catalog.cacheTable(s"${table.database}.${table.name}", StorageLevel.MEMORY_AND_DISK)
        println(s"Cached ${table.database}.${table.name} with memory and disk")
      }
    case "clear" =>
      tables.foreach { table =>
        if (spark.catalog.isCached(s"${table.database}.${table.name}")) {
          spark.catalog.uncacheTable(s"${table.database}.${table.name}")
          println(s"Uncached ${table.database}.${table.name}")
        }
      }
    case other =>
      println(s"Unknown cache strategy: $other (expected memory, disk, mixed, or clear)")
  }
}

// Cache all tables in analytics database in memory and disk
manageCacheForDatabase("analytics", "mixed")

// Create temporary views
salesData.createOrReplaceTempView("temp_sales")
salesData.createGlobalTempView("global_sales")
// List all tables including temporary views
val allTables = spark.catalog.listTables().collect()
allTables.filter(_.isTemporary).foreach { table =>
println(s"Temporary view: ${table.name} in database: ${table.database}")
}
// Access global temporary views (in global_temp database)
val globalTempTables = spark.catalog.listTables("global_temp").collect()
globalTempTables.foreach { table =>
println(s"Global temporary view: ${table.name}")
}
// Drop temporary views
val tempDropped = spark.catalog.dropTempView("temp_sales")
val globalTempDropped = spark.catalog.dropGlobalTempView("global_sales")
println(s"Temporary view dropped: $tempDropped")
println(s"Global temporary view dropped: $globalTempDropped")
// Temporary view lifecycle management
def manageTemporaryViews(sessionName: String): Unit = {
  // Create session-specific temporary views
  salesData.filter($"region" === "US")
    .createOrReplaceTempView(s"${sessionName}_us_sales")
  salesData.filter($"region" === "UK")
    .createOrReplaceTempView(s"${sessionName}_uk_sales")

  // List session views
  val sessionViews = spark.catalog.listTables().filter(_.name.startsWith(sessionName)).collect()
  println(s"Session views for $sessionName:")
  sessionViews.foreach(view => println(s" - ${view.name}"))

  // Cleanup function
  def cleanup(): Unit = {
    sessionViews.foreach { view =>
      spark.catalog.dropTempView(view.name)
      println(s"Dropped temporary view: ${view.name}")
    }
  }

  // Register cleanup for shutdown
  sys.addShutdownHook(cleanup())
}

manageTemporaryViews("analytics_session")

// Create partitioned table for recovery demo
val partitionedSales = salesData.withColumn("year", year(to_date($"transaction_date")))
.withColumn("month", month(to_date($"transaction_date")))
partitionedSales.write
.mode("overwrite")
.partitionBy("year", "month")
.saveAsTable("partitioned_sales")
// Simulate adding partitions externally (outside Spark)
// In practice, this would be done by external processes
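// A hedged sketch of such an external change: another process drops new files under
// a fresh partition directory of the table's location. The warehouse path below is a
// guess for illustration only; resolve the table's real storage location first
// (e.g. via DESCRIBE FORMATTED) before writing anything there.
Seq(("TXN004", "CUST004", "PROD002", 320.00, "2024-02-01", "DE"))
  .toDF("transaction_id", "customer_id", "product_id", "amount", "transaction_date", "region")
  .write
  .mode("append")
  .parquet("/user/hive/warehouse/partitioned_sales/year=2024/month=2")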
// Recover partitions to sync metastore with filesystem
spark.catalog.recoverPartitions("partitioned_sales")
println("Recovered partitions for partitioned_sales table")
// Comprehensive catalog maintenance
def performCatalogMaintenance(): Unit = {
  println("Starting catalog maintenance...")

  // Get all databases
  val databases = spark.catalog.listDatabases().collect()
  databases.foreach { db =>
    println(s"Maintaining database: ${db.name}")

    // Get all tables in database
    val tables = spark.catalog.listTables(db.name).collect()
    tables.foreach { table =>
      if (!table.isTemporary) {
        try {
          // Refresh table metadata
          spark.catalog.refreshTable(s"${table.database}.${table.name}")

          // Recover partitions for partitioned tables
          val columns = spark.catalog.listColumns(table.database, table.name).collect()
          val hasPartitions = columns.exists(_.isPartition)
          if (hasPartitions) {
            spark.catalog.recoverPartitions(s"${table.database}.${table.name}")
            println(s" Recovered partitions for ${table.database}.${table.name}")
          }
        } catch {
          case e: Exception =>
            println(s" Error maintaining ${table.database}.${table.name}: ${e.getMessage}")
        }
      }
    }
  }
  println("Catalog maintenance completed")
}
// Run maintenance
performCatalogMaintenance()
// Catalog health check
def catalogHealthCheck(): Unit = {
  println("Catalog Health Check")
  println("=" * 20)

  val databases = spark.catalog.listDatabases().collect()
  println(s"Total databases: ${databases.length}")

  databases.foreach { db =>
    val tables = spark.catalog.listTables(db.name).collect()
    val tempTables = tables.count(_.isTemporary)
    val permanentTables = tables.length - tempTables
    println(s"Database ${db.name}: $permanentTables permanent, $tempTables temporary tables")

    // Check for tables with issues
    tables.filter(!_.isTemporary).foreach { table =>
      try {
        val columns = spark.catalog.listColumns(table.database, table.name).collect()
        val partitionCount = columns.count(_.isPartition)
        val bucketCount = columns.count(_.isBucket)
        if (partitionCount > 0 || bucketCount > 0) {
          println(s" ${table.name}: $partitionCount partition columns, $bucketCount bucket columns")
        }
      } catch {
        case e: Exception =>
          println(s" WARNING: Cannot access ${table.name}: ${e.getMessage}")
      }
    }
  }

  val allFunctions = spark.catalog.listFunctions().collect()
  val tempFunctions = allFunctions.count(_.isTemporary)
  val permanentFunctions = allFunctions.length - tempFunctions
  println(s"Total functions: $permanentFunctions permanent, $tempFunctions temporary")
}
catalogHealthCheck()