Catalog and Metadata Management

The Spark SQL Catalog provides a programmatic interface for managing databases, tables, functions, and cached data. It serves as the central metadata repository and enables runtime introspection of the Spark SQL environment.
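All of the examples below assume an active SparkSession; the catalog is exposed as spark.catalog. A minimal setup sketch (the app name and local master are illustrative; enableHiveSupport is optional and requires Hive classes on the classpath):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("catalog-examples")  // illustrative name
  .master("local[*]")           // assumption: local development
  .enableHiveSupport()          // optional: metastore-backed persistent catalog
  .getOrCreate()

val catalog = spark.catalog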

Catalog Interface

class Catalog {
  // Database operations
  def currentDatabase: String
  def setCurrentDatabase(dbName: String): Unit
  def listDatabases(): Dataset[Database]
  def databaseExists(dbName: String): Boolean
  def getDatabase(dbName: String): Database
  
  // Table operations
  def listTables(): Dataset[Table]
  def listTables(dbName: String): Dataset[Table]
  def tableExists(tableName: String): Boolean
  def tableExists(dbName: String, tableName: String): Boolean
  def getTable(tableName: String): Table
  def getTable(dbName: String, tableName: String): Table
  def listColumns(tableName: String): Dataset[Column]
  def listColumns(dbName: String, tableName: String): Dataset[Column]
  
  // Function operations
  def listFunctions(): Dataset[Function]
  def listFunctions(dbName: String): Dataset[Function]
  def functionExists(functionName: String): Boolean
  def functionExists(dbName: String, functionName: String): Boolean
  def getFunction(functionName: String): Function
  def getFunction(dbName: String, functionName: String): Function
  
  // Temporary view operations
  def dropTempView(viewName: String): Boolean
  def dropGlobalTempView(viewName: String): Boolean
  
  // Table creation (Experimental) — returns a DataFrame over the new table
  def createTable(tableName: String, path: String): DataFrame
  def createTable(tableName: String, path: String, source: String): DataFrame
  def createTable(tableName: String, source: String, options: Map[String, String]): DataFrame
  def createTable(tableName: String, source: String, schema: StructType, options: Map[String, String]): DataFrame
  
  // Maintenance operations
  def recoverPartitions(tableName: String): Unit
  
  // Caching operations
  def isCached(tableName: String): Boolean
  def cacheTable(tableName: String): Unit
  def cacheTable(tableName: String, storageLevel: StorageLevel): Unit
  def uncacheTable(tableName: String): Unit
  def clearCache(): Unit
  def refreshTable(tableName: String): Unit
  def refreshByPath(path: String): Unit
}

Database Management

Database Operations

Usage Examples:

val catalog = spark.catalog

// Get current database
val currentDb = catalog.currentDatabase
println(s"Current database: $currentDb")

// Change database
catalog.setCurrentDatabase("my_database")

// List all databases
val databases = catalog.listDatabases()
databases.show()

// Check if database exists
val exists = catalog.databaseExists("production")
if (!exists) {
  spark.sql("CREATE DATABASE production")
}

// Get database information
val dbInfo = catalog.getDatabase("production")
println(s"Database: ${dbInfo.name}, Description: ${dbInfo.description}")

Database Metadata

case class Database(
  name: String,
  description: String,
  locationUri: String
)

Usage Examples:

// Inspect database details
catalog.listDatabases().collect().foreach { db =>
  println(s"Database: ${db.name}")
  println(s"Description: ${db.description}")
  println(s"Location: ${db.locationUri}")
  println("---")
}

// Filter databases by name
import org.apache.spark.sql.functions.col

catalog.listDatabases()
  .filter(col("name").like("%prod%"))
  .select("name", "description")
  .show()

Table Management

Table Operations

case class Table(
  name: String,
  database: String,
  description: String,
  tableType: String,
  isTemporary: Boolean
)

Usage Examples:

// List all tables in current database
val tables = catalog.listTables()
tables.show()

// List tables in specific database
val prodTables = catalog.listTables("production")
prodTables.filter(col("tableType") === "MANAGED").show()

// Check if table exists
val tableExists = catalog.tableExists("users")
val specificExists = catalog.tableExists("production", "sales")

// Get table information
val tableInfo = catalog.getTable("users")
println(s"Table: ${tableInfo.name}")
println(s"Database: ${tableInfo.database}")
println(s"Type: ${tableInfo.tableType}")
println(s"Is Temporary: ${tableInfo.isTemporary}")

// List all temporary tables
catalog.listTables()
  .filter(col("isTemporary") === true)
  .select("name", "database")
  .show()

Column Metadata

case class Column(
  name: String,
  description: String,
  dataType: String,
  nullable: Boolean,
  isPartition: Boolean,
  isBucket: Boolean
)

Usage Examples:

// List columns for a table
val columns = catalog.listColumns("users")
columns.show()

// Get detailed column information
columns.collect().foreach { c =>
  println(s"Column: ${c.name}")
  println(s"Type: ${c.dataType}")
  println(s"Nullable: ${c.nullable}")
  println(s"Partition: ${c.isPartition}")
  println(s"Bucket: ${c.isBucket}")
  println("---")
}

// Find partition columns
catalog.listColumns("partitioned_table")
  .filter(col("isPartition") === true)
  .select("name", "dataType")
  .show()

// Analyze table schema
def analyzeTableSchema(tableName: String): Unit = {
  val columns = catalog.listColumns(tableName)
  
  println(s"Schema for table: $tableName")
  println("=" * 50)
  
  columns.collect().foreach { c =>
    val nullable = if (c.nullable) "NULL" else "NOT NULL"
    val partition = if (c.isPartition) " (PARTITION)" else ""
    val bucket = if (c.isBucket) " (BUCKET)" else ""
    
    println(f"${c.name}%-20s ${c.dataType}%-15s $nullable$partition$bucket")
  }
}

analyzeTableSchema("my_table")

Function Management

Function Operations

case class Function(
  name: String,
  database: String,
  description: String,
  className: String,
  isTemporary: Boolean
)

Usage Examples:

// List all functions
val functions = catalog.listFunctions()
functions.show()

// List functions in specific database
val dbFunctions = catalog.listFunctions("my_database")
dbFunctions.show()

// Check if function exists
val funcExists = catalog.functionExists("my_udf")
val specificFuncExists = catalog.functionExists("my_database", "custom_agg")

// Get function information
val funcInfo = catalog.getFunction("upper")
println(s"Function: ${funcInfo.name}")
println(s"Database: ${funcInfo.database}")
println(s"Class: ${funcInfo.className}")
println(s"Is Temporary: ${funcInfo.isTemporary}")

// Filter user-defined (temporary) functions
catalog.listFunctions()
  .filter(col("isTemporary") === true)
  .select("name", "description")
  .show()

// Categorize functions
import org.apache.spark.sql.catalog.Function

def categorizeFunction(func: Function): String = {
  if (func.isTemporary) "UDF"
  else if (func.className != null &&
           func.className.startsWith("org.apache.spark.sql.catalyst.expressions")) "Built-in"
  else "System"
}

val categorized = catalog.listFunctions().collect().groupBy(categorizeFunction)
categorized.foreach { case (category, funcs) =>
  println(s"$category functions: ${funcs.length}")
}
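To see a temporary function show up in these listings, register a UDF first. A minimal sketch (the function name my_upper is illustrative):

// Register a temporary UDF, then confirm it is visible in the catalog
spark.udf.register("my_upper", (s: String) => s.toUpperCase)

assert(catalog.functionExists("my_upper"))
val myFunc = catalog.getFunction("my_upper")  // myFunc.isTemporary is true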

Temporary View Management

Usage Examples:

// Create temporary views
df.createOrReplaceTempView("temp_users")
df.createGlobalTempView("global_temp_data")

// List temporary views
catalog.listTables()
  .filter(col("isTemporary") === true)
  .show()

// Drop temporary views
val dropped = catalog.dropTempView("temp_users")
println(s"View dropped: $dropped")

val globalDropped = catalog.dropGlobalTempView("global_temp_data")
println(s"Global view dropped: $globalDropped")
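Note that global temporary views live in the reserved global_temp database and must be qualified when queried. A quick sketch (the view name shared_data is illustrative):

// Global temp views are registered in the global_temp database
df.createGlobalTempView("shared_data")
spark.sql("SELECT * FROM global_temp.shared_data").show()

// They are also visible when listing that database
catalog.listTables("global_temp").show()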

// Manage temporary views
def cleanupTempViews(): Unit = {
  val tempViews = catalog.listTables()
    .filter(col("isTemporary") === true)
    .select("name")
    .collect()
    .map(_.getString(0))
  
  tempViews.foreach { viewName =>
    val success = catalog.dropTempView(viewName)
    println(s"Dropped temp view $viewName: $success")
  }
}
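As written, this helper only removes session-local views; global temporary views would need listTables("global_temp") and dropGlobalTempView instead. Example invocation:

cleanupTempViews()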

Caching Operations

Table Caching

Usage Examples:

import org.apache.spark.storage.StorageLevel

// Cache a table
catalog.cacheTable("frequently_used_table")

// Cache with specific storage level
catalog.cacheTable("large_table", StorageLevel.MEMORY_AND_DISK_SER)

// Check if table is cached
val isCached = catalog.isCached("my_table")
println(s"Table cached: $isCached")

// Uncache a table
catalog.uncacheTable("my_table")

// Clear all cached tables
catalog.clearCache()

// Cache management workflow
def manageCaching(tableName: String): Unit = {
  if (!catalog.isCached(tableName)) {
    println(s"Caching table: $tableName")
    catalog.cacheTable(tableName)
  } else {
    println(s"Table $tableName is already cached")
  }
}

// Cache frequently accessed tables
val frequentTables = Seq("dim_users", "fact_sales", "lookup_regions")
frequentTables.foreach(manageCaching)
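cacheTable only marks the table for caching; the data is materialized the first time the table is scanned. A sketch of forcing eager materialization (count is just a cheap full scan):

// Trigger the scan that actually populates the cache
catalog.cacheTable("fact_sales")
spark.table("fact_sales").count()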

Cache Monitoring

// Monitor cached tables
def showCachedTables(): Unit = {
  val allTables = catalog.listTables().collect()
  
  println("Cached Tables:")
  println("=" * 40)
  
  allTables.foreach { table =>
    val cached = catalog.isCached(table.name)
    if (cached) {
      println(s"${table.database}.${table.name}")
    }
  }
}

showCachedTables()

// Refresh cached table metadata
catalog.refreshTable("my_cached_table")

// Refresh by file path (for file-based tables)
catalog.refreshByPath("/path/to/parquet/files")
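refreshTable and refreshByPath invalidate Spark's cached metadata (and cached data) after files change outside of Spark. A typical sequence, assuming a hypothetical external job rewrote the files under /data/events backing a table named events:

// The cached file listing is stale after the external rewrite;
// refresh before querying again
catalog.refreshByPath("/data/events")
spark.table("events").count()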

Table Creation and Management

Creating Tables

Usage Examples:

// Create an external table over existing files (Experimental API).
// createTable returns a DataFrame backed by the new table.
val externalDf = catalog.createTable("external_table", "/external/path")

// Create an external table with an explicit data source format
val jsonDf = catalog.createTable("json_table", "/external/json", "json")

// Create a managed table from a DataFrame via the writer API
df.write
  .mode("overwrite")
  .saveAsTable("new_table")

// Create a partitioned table using SQL. For USING data source tables,
// partition columns are declared in the schema and referenced by name.
spark.sql("""
  CREATE TABLE partitioned_sales (
    id BIGINT,
    amount DOUBLE,
    customer_id STRING,
    sale_date DATE,
    year INT,
    month INT
  )
  USING PARQUET
  PARTITIONED BY (year, month)
  LOCATION '/data/sales'
""")

Partition Recovery

// Recover partitions for external tables
catalog.recoverPartitions("external_partitioned_table")
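The SQL equivalents do the same thing, which can be handy from a pure-SQL session:

// Equivalent DDL forms
spark.sql("MSCK REPAIR TABLE external_partitioned_table")
spark.sql("ALTER TABLE external_partitioned_table RECOVER PARTITIONS")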

// Workflow for partition management
def managePartitions(tableName: String): Unit = {
  println(s"Recovering partitions for: $tableName")
  
  try {
    catalog.recoverPartitions(tableName)
    println("Partition recovery completed successfully")
    
    // Refresh the table to update metadata
    catalog.refreshTable(tableName)
    println("Table metadata refreshed")
    
  } catch {
    case e: Exception =>
      println(s"Partition recovery failed: ${e.getMessage}")
  }
}

managePartitions("my_partitioned_table")

Metadata Introspection

Schema Discovery

// Comprehensive metadata inspection
def inspectCatalog(): Unit = {
  println("=== CATALOG INSPECTION ===")
  
  // Current context
  println(s"Current Database: ${catalog.currentDatabase}")
  println()
  
  // Database summary
  val databases = catalog.listDatabases().collect()
  println(s"Total Databases: ${databases.length}")
  databases.foreach(db => println(s"  - ${db.name}: ${db.description}"))
  println()
  
  // Table summary
  val tables = catalog.listTables().collect()
  val managedTables = tables.count(_.tableType == "MANAGED")
  val externalTables = tables.count(_.tableType == "EXTERNAL")
  val tempTables = tables.count(_.isTemporary)
  
  println(s"Total Tables: ${tables.length}")
  println(s"  - Managed: $managedTables")
  println(s"  - External: $externalTables")
  println(s"  - Temporary: $tempTables")
  println()
  
  // Function summary
  val functions = catalog.listFunctions().collect()
  val builtinFuncs = functions.count(!_.isTemporary)
  val udfs = functions.count(_.isTemporary)
  
  println(s"Total Functions: ${functions.length}")
  println(s"  - Built-in: $builtinFuncs")
  println(s"  - UDFs: $udfs")
  println()
  
  // Cache summary
  val cachedTables = tables.count(table => catalog.isCached(table.name))
  println(s"Cached Tables: $cachedTables")
}

inspectCatalog()

Table Lineage and Dependencies

// Find table dependencies
def findTableDependencies(tableName: String): Unit = {
  try {
    val table = catalog.getTable(tableName)
    println(s"Table: ${table.name}")
    println(s"Database: ${table.database}")
    println(s"Type: ${table.tableType}")
    
    // Get columns with their characteristics
    val columns = catalog.listColumns(tableName).collect()
    val partitionCols = columns.filter(_.isPartition).map(_.name)
    val bucketCols = columns.filter(_.isBucket).map(_.name)
    
    if (partitionCols.nonEmpty) {
      println(s"Partitioned by: ${partitionCols.mkString(", ")}")
    }
    
    if (bucketCols.nonEmpty) {
      println(s"Bucketed by: ${bucketCols.mkString(", ")}")
    }
    
    // Check if cached
    if (catalog.isCached(tableName)) {
      println("Status: CACHED")
    }
    
  } catch {
    case e: Exception =>
      println(s"Error inspecting table $tableName: ${e.getMessage}")
  }
}

// Usage
findTableDependencies("my_important_table")

Integration with SQL DDL

DDL Operations through Catalog

// Create database
spark.sql("CREATE DATABASE IF NOT EXISTS analytics")
catalog.setCurrentDatabase("analytics")

// Verify creation
if (catalog.databaseExists("analytics")) {
  println("Analytics database created successfully")
}

// Create managed table (USING DELTA requires the Delta Lake library)
spark.sql("""
  CREATE TABLE user_analytics (
    user_id STRING,
    session_count BIGINT,
    total_duration DOUBLE,
    last_activity TIMESTAMP
  )
  USING DELTA
  TBLPROPERTIES ('delta.autoOptimize.optimizeWrite' = 'true')
""")

// Verify table creation
if (catalog.tableExists("user_analytics")) {
  val table = catalog.getTable("user_analytics")
  println(s"Created table: ${table.name} (${table.tableType})")
  
  // Show schema
  catalog.listColumns("user_analytics").show()
}

// Create view
spark.sql("""
  CREATE VIEW active_users AS
  SELECT user_id, session_count
  FROM user_analytics
  WHERE last_activity > current_date() - INTERVAL 30 DAYS
""")

// List all objects in the database
println("Tables and Views:")
catalog.listTables("analytics").show()

Error Handling and Best Practices

Robust Catalog Operations

// Safe catalog operations with error handling
def safeTableOperation(tableName: String)(operation: => Unit): Unit = {
  try {
    if (catalog.tableExists(tableName)) {
      operation
    } else {
      println(s"Table $tableName does not exist")
    }
  } catch {
    case e: Exception =>
      println(s"Error operating on table $tableName: ${e.getMessage}")
  }
}

// Safe caching
def safeCacheTable(tableName: String): Unit = {
  safeTableOperation(tableName) {
    if (!catalog.isCached(tableName)) {
      catalog.cacheTable(tableName)
      println(s"Cached table: $tableName")
    } else {
      println(s"Table $tableName is already cached")
    }
  }
}

// Safe cleanup
def safeCleanup(tempViewName: String): Unit = {
  try {
    val dropped = catalog.dropTempView(tempViewName)
    if (dropped) {
      println(s"Dropped temporary view: $tempViewName")
    } else {
      println(s"Temporary view $tempViewName was not found")
    }
  } catch {
    case e: Exception =>
      println(s"Error dropping view $tempViewName: ${e.getMessage}")
  }
}

// Batch operations with error handling
def batchTableOperations(tableNames: Seq[String]): Unit = {
  tableNames.foreach { tableName =>
    safeTableOperation(tableName) {
      // Refresh and cache frequently used tables
      catalog.refreshTable(tableName)
      catalog.cacheTable(tableName)
      println(s"Processed: $tableName")
    }
  }
}
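A hypothetical invocation over the tables used earlier:

batchTableOperations(Seq("dim_users", "fact_sales", "lookup_regions"))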