Apache Spark SQL - Catalog Operations

Capabilities

Database and Namespace Management

  • Manage multiple databases and namespaces within Spark's metastore for logical data organization
  • Create, drop, and list databases with configurable properties and location settings (creation and removal go through SQL DDL; see the sketch after this list)
  • Switch between databases and manage current database context for query execution
  • Rely on the underlying metastore integration (for example, a Hive metastore) for database-level permissions and access control; the Catalog API itself does not manage permissions
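
A minimal sketch of this split in practice, assuming a hypothetical reporting database and bucket path: creation and removal go through SQL DDL, while the Catalog API handles discovery and session context.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("database-management-sketch")
  .enableHiveSupport()
  .getOrCreate()

// Creation is SQL DDL; the database name and location below are hypothetical
spark.sql(
  "CREATE DATABASE IF NOT EXISTS reporting " +
  "COMMENT 'reporting marts' LOCATION 's3a://example-bucket/reporting/'")

// Discovery and current-database context use the Catalog API
spark.catalog.listDatabases().show()
spark.catalog.setCurrentDatabase("reporting")
println(s"Current database: ${spark.catalog.currentDatabase}")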

Table and View Management Operations

  • Create, drop, and manage both managed and external tables with comprehensive metadata support
  • Handle temporary and global temporary views for session-scoped and cross-session data sharing (see the sketch after this list)
  • Support for table creation with custom storage formats, partitioning, and bucketing strategies
  • Manage table properties, statistics, and optimization hints for query performance tuning
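
A brief sketch of the session-scoped versus application-scoped distinction, assuming an active SparkSession named spark; the view names are hypothetical.

val ids = spark.range(5).toDF("id")

ids.createOrReplaceTempView("ids_tmp")          // visible only to the creating session
ids.createOrReplaceGlobalTempView("ids_global") // visible to every session of this application

// Global temporary views are resolved through the reserved global_temp database
spark.sql("SELECT count(*) FROM ids_tmp").show()
spark.newSession().sql("SELECT count(*) FROM global_temp.ids_global").show()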

Function Registry and Management

  • Register and manage user-defined functions (UDFs) and user-defined aggregate functions (UDAFs); a registration sketch follows this list
  • Handle both temporary and persistent function registrations with namespace scoping
  • Resolve functions by name and check argument types during query analysis for type-safe usage
  • Enable function discovery and introspection for development and debugging workflows
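
Registration itself goes through spark.udf rather than the Catalog, which then reports the result. A minimal sketch, assuming the hypothetical function names calculate_discount and avg_amount:

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.udaf

val spark = SparkSession.builder().appName("function-registry-sketch").getOrCreate()

// Temporary scalar UDF, scoped to this session
spark.udf.register("calculate_discount", (amount: Double) => amount * 0.9)

// Typed Aggregator wrapped as a UDAF and registered under a name
object AvgAmount extends Aggregator[Double, (Double, Long), Double] {
  def zero: (Double, Long) = (0.0, 0L)
  def reduce(b: (Double, Long), a: Double): (Double, Long) = (b._1 + a, b._2 + 1)
  def merge(b1: (Double, Long), b2: (Double, Long)): (Double, Long) =
    (b1._1 + b2._1, b1._2 + b2._2)
  def finish(r: (Double, Long)): Double = if (r._2 == 0) 0.0 else r._1 / r._2
  def bufferEncoder: Encoder[(Double, Long)] =
    Encoders.tuple(Encoders.scalaDouble, Encoders.scalaLong)
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
spark.udf.register("avg_amount", udaf(AvgAmount))

// Both registrations are visible through the Catalog as temporary functions
println(spark.catalog.functionExists("calculate_discount"))
println(spark.catalog.functionExists("avg_amount"))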

Metadata Discovery and Introspection

  • Query comprehensive metadata about databases, tables, columns, and functions through programmatic APIs
  • Support for schema discovery across tables and views through column-level metadata
  • Handle table statistics and partition information for query optimization and monitoring
  • Enable catalog browsing and exploration for data governance and documentation purposes

API Reference

Catalog Class

abstract class Catalog {
  // Current database operations
  def currentDatabase: String
  def setCurrentDatabase(dbName: String): Unit
  
  // Database management
  def listDatabases(): Dataset[Database]
  def listDatabases(pattern: String): Dataset[Database]
  def databaseExists(dbName: String): Boolean
  def getDatabase(dbName: String): Database
  // Note: databases are created and dropped through SQL DDL
  // (CREATE DATABASE / DROP DATABASE); the Catalog API does not
  // expose createDatabase or dropDatabase methods
  
  // Table management
  def listTables(): Dataset[Table]
  def listTables(dbName: String): Dataset[Table]
  def listTables(dbName: String, pattern: String): Dataset[Table]
  def getTable(tableName: String): Table
  def getTable(dbName: String, tableName: String): Table
  def tableExists(tableName: String): Boolean
  def tableExists(dbName: String, tableName: String): Boolean
  def createTable(tableName: String, path: String): DataFrame
  def createTable(tableName: String, path: String, source: String): DataFrame
  def createTable(tableName: String, source: String, schema: StructType, options: Map[String, String]): DataFrame
  def dropTempView(viewName: String): Boolean
  def dropGlobalTempView(viewName: String): Boolean
  
  // Column information
  def listColumns(tableName: String): Dataset[Column]
  def listColumns(dbName: String, tableName: String): Dataset[Column]
  
  // Function management
  def listFunctions(): Dataset[Function]
  def listFunctions(dbName: String): Dataset[Function]
  def listFunctions(dbName: String, pattern: String): Dataset[Function]
  def getFunction(functionName: String): Function
  def getFunction(dbName: String, functionName: String): Function
  def functionExists(functionName: String): Boolean
  def functionExists(dbName: String, functionName: String): Boolean
  
  // Caching operations
  def cacheTable(tableName: String): Unit
  def cacheTable(tableName: String, storageLevel: StorageLevel): Unit
  def uncacheTable(tableName: String): Unit
  def clearCache(): Unit
  def isCached(tableName: String): Boolean
  def refreshTable(tableName: String): Unit
  def refreshByPath(path: String): Unit
  
  // Recovery operations
  def recoverPartitions(tableName: String): Unit
}
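
A Catalog instance is never constructed directly; it is obtained from an active SparkSession:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalog.Catalog

val spark = SparkSession.builder().appName("catalog-handle").getOrCreate()

// The session exposes its catalog through spark.catalog
val catalog: Catalog = spark.catalog
println(s"Current database: ${catalog.currentDatabase}")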

Database Metadata

case class Database(
    name: String,
    description: String,
    locationUri: String) extends DefinedByConstructorParams {
  
  override def toString: String = {
    s"Database[name='$name', description='$description', path='$locationUri']"
  }
}

Table Metadata

case class Table(
    name: String,
    database: String,
    description: String,
    tableType: String,
    isTemporary: Boolean) extends DefinedByConstructorParams {
  
  override def toString: String = {
    s"Table[name='$name', database='$database', description='$description', " +
    s"tableType='$tableType', isTemporary='$isTemporary']"
  }
}

Column Metadata

case class Column(
    name: String,
    description: String,
    dataType: String,
    nullable: Boolean,
    isPartition: Boolean,
    isBucket: Boolean) extends DefinedByConstructorParams {
  
  override def toString: String = {
    s"Column[name='$name', description='$description', dataType='$dataType', " +
    s"nullable='$nullable', isPartition='$isPartition', isBucket='$isBucket']"
  }
}

Function Metadata

case class Function(
    name: String,
    database: String,
    description: String,
    className: String,
    isTemporary: Boolean) extends DefinedByConstructorParams {
  
  override def toString: String = {
    s"Function[name='$name', database='$database', description='$description', " +
    s"className='$className', isTemporary='$isTemporary']"
  }
}

Table Creation Options

// Table creation with DataFrameWriter
class DataFrameWriter[T] {
  def saveAsTable(tableName: String): Unit
  def insertInto(tableName: String): Unit
}

// V2 table operations are reached from the Dataset itself
class Dataset[T] {
  def writeTo(tableName: String): DataFrameWriterV2[T]
}

// Advanced table creation
class DataFrameWriterV2[T] {
  def create(): Unit
  def replace(): Unit  
  def createOrReplace(): Unit
  def append(): Unit
  def overwrite(condition: Column): Unit
  def overwritePartitions(): Unit
  
  // Table properties
  def tableProperty(property: String, value: String): DataFrameWriterV2[T]
  def partitionedBy(column: Column, columns: Column*): DataFrameWriterV2[T]
  def using(provider: String): DataFrameWriterV2[T]
}
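
A brief sketch of the V2 write path, entered from a Dataset rather than a DataFrameWriter. The target table analytics.sales_v2 is hypothetical, and replace/createOrReplace assume a catalog whose provider supports those operations.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("writeto-sketch").getOrCreate()
import spark.implicits._

val df = Seq(("TXN001", "US", 100.50), ("TXN002", "UK", 250.75))
  .toDF("transaction_id", "region", "amount")

// Fluent configuration, executed by the final verb (create/append/...)
df.writeTo("analytics.sales_v2")          // hypothetical target table
  .using("parquet")
  .partitionedBy(col("region"))
  .tableProperty("comment", "sales facts")
  .create()

// Subsequent incremental loads append to the same table
df.writeTo("analytics.sales_v2").append()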

Storage Level for Caching

object StorageLevel {
  val NONE: StorageLevel
  val DISK_ONLY: StorageLevel
  val DISK_ONLY_2: StorageLevel
  val DISK_ONLY_3: StorageLevel
  val MEMORY_ONLY: StorageLevel
  val MEMORY_ONLY_2: StorageLevel
  val MEMORY_ONLY_SER: StorageLevel
  val MEMORY_ONLY_SER_2: StorageLevel
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_2: StorageLevel
  val MEMORY_AND_DISK_SER: StorageLevel
  val MEMORY_AND_DISK_SER_2: StorageLevel
  val OFF_HEAP: StorageLevel
}

Usage Examples

Database Management

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Catalog Operations Demo")
  .enableHiveSupport() // Enable Hive metastore support
  .getOrCreate()

// Current database operations
println(s"Current database: ${spark.catalog.currentDatabase}")

// List all databases
val databases = spark.catalog.listDatabases()
databases.show()

databases.collect().foreach { db =>
  println(s"Database: ${db.name}, Description: ${db.description}, Location: ${db.locationUri}")
}

// Filter databases by pattern
val testDatabases = spark.catalog.listDatabases("test*")
testDatabases.show()

// Check if database exists
val dbExists = spark.catalog.databaseExists("analytics")
println(s"Analytics database exists: $dbExists")

// Create a new database (the Catalog API has no createDatabase method; use SQL DDL)
spark.sql(
  """CREATE DATABASE IF NOT EXISTS analytics
    |COMMENT 'Analytics and reporting database'
    |LOCATION 's3a://my-bucket/analytics/'""".stripMargin)

// Get database information
val analyticsDB = spark.catalog.getDatabase("analytics")
println(s"Analytics DB: ${analyticsDB.name} at ${analyticsDB.locationUri}")

// Switch to different database
spark.catalog.setCurrentDatabase("analytics")
println(s"Switched to database: ${spark.catalog.currentDatabase}")

// Drop database via SQL DDL (CASCADE also drops all tables inside it)
spark.sql("DROP DATABASE IF EXISTS old_database CASCADE")

Table Management and Discovery

// List all tables in current database
val tables = spark.catalog.listTables()
tables.show()

// List tables in specific database
val analyticsTables = spark.catalog.listTables("analytics")
analyticsTables.show()

// Filter tables by pattern
val salesTables = spark.catalog.listTables("analytics", "sales*")
salesTables.show()

// Get detailed table information
tables.collect().foreach { table =>
  println(s"Table: ${table.database}.${table.name}")
  println(s"  Type: ${table.tableType}, Temporary: ${table.isTemporary}")
  println(s"  Description: ${table.description}")
  println()
}

// Check if table exists
val tableExists = spark.catalog.tableExists("sales_data")
val dbTableExists = spark.catalog.tableExists("analytics", "user_events")
println(s"sales_data exists: $tableExists")
println(s"analytics.user_events exists: $dbTableExists")

// Get specific table metadata
if (spark.catalog.tableExists("analytics", "sales_data")) {
  val salesTable = spark.catalog.getTable("analytics", "sales_data")
  println(s"Sales table: ${salesTable.name} in ${salesTable.database}")
  println(s"Table type: ${salesTable.tableType}")
  println(s"Is temporary: ${salesTable.isTemporary}")
}

// Create external table
val externalTable = spark.catalog.createTable(
  tableName = "external_sales",
  path = "s3a://data-lake/sales/",
  source = "parquet"
)

// Create table with schema and options
import org.apache.spark.sql.types._
val salesSchema = StructType(Array(
  StructField("transaction_id", StringType, nullable = false),
  StructField("customer_id", StringType, nullable = false),
  StructField("product_id", StringType, nullable = false),
  StructField("amount", DecimalType(10, 2), nullable = false),
  StructField("transaction_date", DateType, nullable = false),
  StructField("region", StringType, nullable = false)
))

val managedTable = spark.catalog.createTable(
  tableName = "managed_sales",
  source = "delta",
  schema = salesSchema,
  options = Map(
    "path" -> "/path/to/delta/sales",
    "delta.autoOptimize.optimizeWrite" -> "true",
    "delta.autoOptimize.autoCompact" -> "true"
  )
)

Column Information and Schema Discovery

// List columns for a table
val salesColumns = spark.catalog.listColumns("analytics", "sales_data")
salesColumns.show(truncate = false)

// Get detailed column information
salesColumns.collect().foreach { column =>
  println(s"Column: ${column.name}")
  println(s"  Data Type: ${column.dataType}")
  println(s"  Nullable: ${column.nullable}")
  println(s"  Is Partition: ${column.isPartition}")
  println(s"  Is Bucket: ${column.isBucket}")
  println(s"  Description: ${column.description}")
  println()
}

// Analyze table schema programmatically
def analyzeTableSchema(dbName: String, tableName: String): Unit = {
  val columns = spark.catalog.listColumns(dbName, tableName).collect()
  
  println(s"Schema Analysis for $dbName.$tableName:")
  println(s"Total columns: ${columns.length}")
  
  val partitionColumns = columns.filter(_.isPartition)
  val bucketColumns = columns.filter(_.isBucket)
  val nullableColumns = columns.filter(_.nullable)
  
  println(s"Partition columns: ${partitionColumns.map(_.name).mkString(", ")}")
  println(s"Bucket columns: ${bucketColumns.map(_.name).mkString(", ")}")
  println(s"Nullable columns: ${nullableColumns.length}/${columns.length}")
  
  // Group by data type
  val typeGroups = columns.groupBy(_.dataType)
  typeGroups.foreach { case (dataType, cols) =>
    println(s"$dataType: ${cols.map(_.name).mkString(", ")}")
  }
}

analyzeTableSchema("analytics", "sales_data")

// Compare schemas between tables
def compareSchemas(db1: String, table1: String, db2: String, table2: String): Unit = {
  val schema1 = spark.catalog.listColumns(db1, table1).collect().map(c => c.name -> c.dataType).toMap
  val schema2 = spark.catalog.listColumns(db2, table2).collect().map(c => c.name -> c.dataType).toMap
  
  val commonColumns = schema1.keySet.intersect(schema2.keySet)
  val onlyInFirst = schema1.keySet -- schema2.keySet
  val onlyInSecond = schema2.keySet -- schema1.keySet
  
  println(s"Schema Comparison: $db1.$table1 vs $db2.$table2")
  println(s"Common columns: ${commonColumns.size}")
  println(s"Only in first: ${onlyInFirst.mkString(", ")}")
  println(s"Only in second: ${onlyInSecond.mkString(", ")}")
  
  // Check for type mismatches in common columns
  val typeMismatches = commonColumns.filter(col => schema1(col) != schema2(col))
  if (typeMismatches.nonEmpty) {
    println("Type mismatches:")
    typeMismatches.foreach { col =>
      println(s"  $col: ${schema1(col)} vs ${schema2(col)}")
    }
  }
}

Function Management

// List all functions
val allFunctions = spark.catalog.listFunctions()
allFunctions.show()

// List functions in specific database
val analyticsFunctions = spark.catalog.listFunctions("analytics")
analyticsFunctions.show()

// Filter functions by pattern
val mathFunctions = spark.catalog.listFunctions("default", "*math*")
mathFunctions.show()

// Get function information
allFunctions.collect().foreach { func =>
  println(s"Function: ${func.database}.${func.name}")
  println(s"  Class: ${func.className}")
  println(s"  Temporary: ${func.isTemporary}")
  println(s"  Description: ${func.description}")
  println()
}

// Check if function exists
val funcExists = spark.catalog.functionExists("my_custom_function")
val dbFuncExists = spark.catalog.functionExists("analytics", "sales_metrics")
println(s"my_custom_function exists: $funcExists")
println(s"analytics.sales_metrics exists: $dbFuncExists")

// Get specific function details (built-in functions are looked up by bare name)
if (spark.catalog.functionExists("substring")) {
  val substringFunc = spark.catalog.getFunction("substring")
  println(s"Substring function: ${substringFunc.name}")
  println(s"Class: ${substringFunc.className}")
}

// Register custom function (example)
spark.udf.register("calculate_tax", (amount: Double, rate: Double) => amount * rate)

// Verify registration
val taxFuncExists = spark.catalog.functionExists("calculate_tax")
println(s"calculate_tax function registered: $taxFuncExists")

// Function discovery and documentation
def documentFunctions(databaseName: String): Unit = {
  val functions = spark.catalog.listFunctions(databaseName).collect()
  
  println(s"Function Documentation for Database: $databaseName")
  println("=" * 50)
  
  val (temporaryFuncs, persistentFuncs) = functions.partition(_.isTemporary)

  println("PERSISTENT FUNCTIONS:")
  persistentFuncs.foreach { func =>
    println(s"  ${func.name}: ${func.description}")
  }

  println("\nTEMPORARY FUNCTIONS:")
  temporaryFuncs.foreach { func =>
    println(s"  ${func.name}: ${func.description}")
  }
}

documentFunctions("default")

Table Caching Operations

import org.apache.spark.storage.StorageLevel
import spark.implicits._ // needed for toDF and the $"..." column syntax used below

// Create sample data for caching examples
val salesData = Seq(
  ("TXN001", "CUST001", "PROD001", 100.50, "2023-01-15", "US"),
  ("TXN002", "CUST002", "PROD002", 250.75, "2023-01-16", "UK"),
  ("TXN003", "CUST003", "PROD001", 180.25, "2023-01-17", "CA")
).toDF("transaction_id", "customer_id", "product_id", "amount", "transaction_date", "region")

// Save as table for caching examples
salesData.write
  .mode("overwrite")
  .saveAsTable("sales_cache_demo")

// Basic table caching
spark.catalog.cacheTable("sales_cache_demo")

// Check if table is cached
val isCached = spark.catalog.isCached("sales_cache_demo")
println(s"sales_cache_demo is cached: $isCached")

// Cache with specific storage level
spark.catalog.cacheTable("sales_cache_demo", StorageLevel.MEMORY_AND_DISK_SER)

// Cache multiple tables with different strategies
val largeTables = spark.catalog.listTables().filter(_.name.contains("large")).collect()
largeTables.foreach { table =>
  spark.catalog.cacheTable(table.name, StorageLevel.DISK_ONLY)
  println(s"Cached large table: ${table.name} to disk only")
}

val frequentTables = spark.catalog.listTables().filter(_.name.contains("frequent")).collect()
frequentTables.foreach { table =>
  spark.catalog.cacheTable(table.name, StorageLevel.MEMORY_ONLY)
  println(s"Cached frequent table: ${table.name} to memory only")
}

// Uncache specific table
spark.catalog.uncacheTable("sales_cache_demo")
println(s"sales_cache_demo is cached after uncache: ${spark.catalog.isCached("sales_cache_demo")}")

// Clear all cached tables
spark.catalog.clearCache()
println("All cached tables cleared")

// Refresh table metadata (useful after external changes)
spark.catalog.refreshTable("sales_cache_demo")

// Refresh by path (for external tables)
spark.catalog.refreshByPath("/path/to/external/data")

// Cache management utility
def manageCacheForDatabase(databaseName: String, cacheStrategy: String): Unit = {
  // Skip temporary views: they have no database, so a qualified name cannot be built for them
  val tables = spark.catalog.listTables(databaseName).filter(!_.isTemporary).collect()
  
  cacheStrategy.toLowerCase match {
    case "memory" =>
      tables.foreach { table =>
        spark.catalog.cacheTable(s"${table.database}.${table.name}", StorageLevel.MEMORY_ONLY)
        println(s"Cached ${table.database}.${table.name} in memory")
      }
    
    case "disk" =>
      tables.foreach { table =>
        spark.catalog.cacheTable(s"${table.database}.${table.name}", StorageLevel.DISK_ONLY)
        println(s"Cached ${table.database}.${table.name} on disk")
      }
      
    case "mixed" =>
      tables.foreach { table =>
        spark.catalog.cacheTable(s"${table.database}.${table.name}", StorageLevel.MEMORY_AND_DISK)
        println(s"Cached ${table.database}.${table.name} with memory and disk")
      }
      
    case "clear" =>
      tables.foreach { table =>
        if (spark.catalog.isCached(s"${table.database}.${table.name}")) {
          spark.catalog.uncacheTable(s"${table.database}.${table.name}")
          println(s"Uncached ${table.database}.${table.name}")
        }
      }
  }
}

// Cache all tables in analytics database in memory and disk
manageCacheForDatabase("analytics", "mixed")

Temporary Views Management

// Create temporary views
salesData.createOrReplaceTempView("temp_sales")
salesData.createGlobalTempView("global_sales")

// List all tables including temporary views
val allTables = spark.catalog.listTables().collect()
allTables.filter(_.isTemporary).foreach { table =>
  println(s"Temporary view: ${table.name} in database: ${table.database}")
}

// Access global temporary views (in global_temp database)
val globalTempTables = spark.catalog.listTables("global_temp").collect()
globalTempTables.foreach { table =>
  println(s"Global temporary view: ${table.name}")
}

// Drop temporary views
val tempDropped = spark.catalog.dropTempView("temp_sales")
val globalTempDropped = spark.catalog.dropGlobalTempView("global_sales")
println(s"Temporary view dropped: $tempDropped")
println(s"Global temporary view dropped: $globalTempDropped")

// Temporary view lifecycle management
def manageTemporaryViews(sessionName: String): Unit = {
  // Create session-specific temporary views
  salesData.filter($"region" === "US")
    .createOrReplaceTempView(s"${sessionName}_us_sales")
  
  salesData.filter($"region" === "UK")
    .createOrReplaceTempView(s"${sessionName}_uk_sales")
  
  // List session views
  val sessionViews = spark.catalog.listTables().filter(_.name.startsWith(sessionName)).collect()
  println(s"Session views for $sessionName:")
  sessionViews.foreach(view => println(s"  - ${view.name}"))
  
  // Cleanup function
  def cleanup(): Unit = {
    sessionViews.foreach { view =>
      spark.catalog.dropTempView(view.name)
      println(s"Dropped temporary view: ${view.name}")
    }
  }
  
  // Register cleanup for shutdown
  sys.addShutdownHook(cleanup())
}

manageTemporaryViews("analytics_session")

Partition Recovery and Maintenance

// Create partitioned table for recovery demo
import org.apache.spark.sql.functions.{month, to_date, year}

val partitionedSales = salesData.withColumn("year", year(to_date($"transaction_date")))
  .withColumn("month", month(to_date($"transaction_date")))

partitionedSales.write
  .mode("overwrite")
  .partitionBy("year", "month")
  .saveAsTable("partitioned_sales")

// Simulate adding partitions externally (outside Spark)
// In practice, this would be done by external processes
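// A hedged sketch of such an external change: another job writes files straight
// into a new partition directory under the table's storage location. The path
// below is hypothetical; resolve the real location from your metastore.
val tableLocation = "/user/hive/warehouse/partitioned_sales" // hypothetical path
Seq(("TXN004", "CUST004", "PROD002", 99.99, "2024-01-05", "US"))
  .toDF("transaction_id", "customer_id", "product_id", "amount", "transaction_date", "region")
  .write
  .mode("append")
  // partition values come from the directory name, so year/month are not in the data
  .parquet(s"$tableLocation/year=2024/month=1")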

// Recover partitions to sync metastore with filesystem
spark.catalog.recoverPartitions("partitioned_sales")
println("Recovered partitions for partitioned_sales table")

// Comprehensive catalog maintenance
def performCatalogMaintenance(): Unit = {
  println("Starting catalog maintenance...")
  
  // Get all databases
  val databases = spark.catalog.listDatabases().collect()
  
  databases.foreach { db =>
    println(s"Maintaining database: ${db.name}")
    
    // Get all tables in database
    val tables = spark.catalog.listTables(db.name).collect()
    
    tables.foreach { table =>
      if (!table.isTemporary) {
        try {
          // Refresh table metadata
          spark.catalog.refreshTable(s"${table.database}.${table.name}")
          
          // Recover partitions for partitioned tables
          val columns = spark.catalog.listColumns(table.database, table.name).collect()
          val hasPartitions = columns.exists(_.isPartition)
          
          if (hasPartitions) {
            spark.catalog.recoverPartitions(s"${table.database}.${table.name}")
            println(s"  Recovered partitions for ${table.database}.${table.name}")
          }
          
        } catch {
          case e: Exception =>
            println(s"  Error maintaining ${table.database}.${table.name}: ${e.getMessage}")
        }
      }
    }
  }
  
  println("Catalog maintenance completed")
}

// Run maintenance
performCatalogMaintenance()

// Catalog health check
def catalogHealthCheck(): Unit = {
  println("Catalog Health Check")
  println("=" * 20)
  
  val databases = spark.catalog.listDatabases().collect()
  println(s"Total databases: ${databases.length}")
  
  databases.foreach { db =>
    val tables = spark.catalog.listTables(db.name).collect()
    val tempTables = tables.count(_.isTemporary)
    val permanentTables = tables.length - tempTables
    
    println(s"Database ${db.name}: $permanentTables permanent, $tempTables temporary tables")
    
    // Check for tables with issues
    tables.filter(!_.isTemporary).foreach { table =>
      try {
        val columns = spark.catalog.listColumns(table.database, table.name).collect()
        val partitionCount = columns.count(_.isPartition)
        val bucketCount = columns.count(_.isBucket)
        
        if (partitionCount > 0 || bucketCount > 0) {
          println(s"  ${table.name}: $partitionCount partition columns, $bucketCount bucket columns")
        }
        
      } catch {
        case e: Exception =>
          println(s"  WARNING: Cannot access ${table.name}: ${e.getMessage}")
      }
    }
  }
  
  val allFunctions = spark.catalog.listFunctions().collect()
  val tempFunctions = allFunctions.count(_.isTemporary)
  val permanentFunctions = allFunctions.length - tempFunctions
  
  println(s"Total functions: $permanentFunctions permanent, $tempFunctions temporary")
}

catalogHealthCheck()