Metadata and Catalog Operations

Database, table, and metadata management through the catalog interface. Provides programmatic access to metastore operations, table management, and schema inspection capabilities.
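
The catalog is reached through an active session. A minimal sketch (the application name and local master below are illustrative, not part of this API):

import org.apache.spark.sql.SparkSession

// Any active SparkSession exposes the catalog via spark.catalog
val spark = SparkSession.builder()
  .appName("catalog-overview")
  .master("local[*]")
  .getOrCreate()

val catalog = spark.catalog
println(s"Current database: ${catalog.currentDatabase()}")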

Capabilities

Catalog Interface

Main interface for catalog operations and metadata management.

/**
 * Interface for catalog operations (databases, tables, functions, columns)
 */
trait Catalog {
  /** Get current database name */
  def currentDatabase(): String
  
  /** Set current database */
  def setCurrentDatabase(dbName: String): Unit
  
  /** List all available databases */
  def listDatabases(): Dataset[Database]
  
  /** List tables in current database */
  def listTables(): Dataset[Table]
  
  /** List tables in specified database */
  def listTables(dbName: String): Dataset[Table]
  
  /** List columns for specified table */
  def listColumns(tableName: String): Dataset[Column]
  def listColumns(dbName: String, tableName: String): Dataset[Column]
  
  /** List all available functions */
  def listFunctions(): Dataset[Function]
  
  /** List functions in specified database */
  def listFunctions(dbName: String): Dataset[Function]
  
  /** Check if database exists */
  def databaseExists(dbName: String): Boolean
  
  /** Check if table exists */
  def tableExists(tableName: String): Boolean
  def tableExists(dbName: String, tableName: String): Boolean
  
  /** Check if function exists */
  def functionExists(functionName: String): Boolean
  def functionExists(dbName: String, functionName: String): Boolean
  
  /** Get table metadata */
  def getTable(tableName: String): Table
  def getTable(dbName: String, tableName: String): Table
  
  /** Get function metadata */
  def getFunction(functionName: String): Function
  def getFunction(dbName: String, functionName: String): Function
  
  /** Cache table in memory */
  def cacheTable(tableName: String): Unit
  def cacheTable(tableName: String, storageLevel: StorageLevel): Unit
  
  /** Uncache table from memory */
  def uncacheTable(tableName: String): Unit
  
  /** Check if table is cached */
  def isCached(tableName: String): Boolean
  
  /** Clear all cached tables */
  def clearCache(): Unit
  
  /** Refresh table metadata */
  def refreshTable(tableName: String): Unit
  
  /** Refresh function metadata */
  def refreshFunction(functionName: String): Unit
  
  /** Create database */
  def createDatabase(dbName: String, ignoreIfExists: Boolean): Unit
  def createDatabase(dbName: String, ignoreIfExists: Boolean, path: String): Unit
  
  /** Drop database */
  def dropDatabase(dbName: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
  
  /** Create table */
  def createTable(tableName: String, path: String): DataFrame
  def createTable(tableName: String, path: String, source: String): DataFrame
  def createTable(tableName: String, source: String, options: Map[String, String]): DataFrame
  def createTable(tableName: String, source: String, schema: StructType, options: Map[String, String]): DataFrame
  
  /** Drop temporary and global temporary views */
  def dropTempView(viewName: String): Boolean
  def dropGlobalTempView(viewName: String): Boolean
  
  /** Recover partitions for table */
  def recoverPartitions(tableName: String): Unit
}

Database Metadata

Represents database information in the catalog.

/**
 * Database metadata from the catalog
 */
case class Database(
  name: String,                    // Database name
  description: String,             // Database description
  locationUri: String              // Database location URI
)

Table Metadata

Represents table information in the catalog.

/**
 * Table metadata from the catalog
 */
case class Table(
  name: String,                    // Table name
  database: String,                // Database name
  description: String,             // Table description
  tableType: String,               // Table type (MANAGED, EXTERNAL, VIEW, etc.)
  isTemporary: Boolean             // Whether table is temporary
)

Column Metadata

Represents column information in the catalog.

/**
 * Column metadata from the catalog
 */
case class Column(
  name: String,                    // Column name
  description: String,             // Column description
  dataType: String,                // Column data type
  nullable: Boolean,               // Whether column can be null
  isPartition: Boolean,            // Whether column is partition key
  isBucket: Boolean                // Whether column is bucket key
)

Function Metadata

Represents function information in the catalog.

/**
 * Function metadata from the catalog
 */
case class Function(
  name: String,                    // Function name
  database: String,                // Database name
  description: String,             // Function description
  className: String,               // Implementation class name
  isTemporary: Boolean             // Whether function is temporary
)

Usage Examples:

val catalog = spark.catalog

// Database operations
println(s"Current database: ${catalog.currentDatabase()}")

val databases = catalog.listDatabases().collect()
databases.foreach(db => println(s"Database: ${db.name} at ${db.locationUri}"))

catalog.setCurrentDatabase("my_database")

// Table operations
val tables = catalog.listTables().collect()
tables.foreach(table => 
  println(s"Table: ${table.name} (${table.tableType}) in ${table.database}")
)

// Check existence
if (catalog.tableExists("employees")) {
  val tableInfo = catalog.getTable("employees")
  println(s"Table type: ${tableInfo.tableType}")
  
  // Get column information
  val columns = catalog.listColumns("employees").collect()
  columns.foreach(col => 
    println(s"Column: ${col.name} (${col.dataType}, nullable: ${col.nullable})")
  )
}

// Function operations
val functions = catalog.listFunctions().collect()
functions.foreach(func => 
  println(s"Function: ${func.name} (${func.className})")
)
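
// Look up a single function; "upper" is an illustrative name that resolves
// to a built-in function in a default session
if (catalog.functionExists("upper")) {
  val fn = catalog.getFunction("upper")
  println(s"Function: ${fn.name}, implemented by ${fn.className}")
}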

// Caching operations
catalog.cacheTable("frequently_used_table")
println(s"Table cached: ${catalog.isCached("frequently_used_table")}")

catalog.uncacheTable("frequently_used_table")

Table Management Operations

Advanced table management and DDL operations.

Creating and dropping tables:

// Create external table
catalog.createTable(
  tableName = "external_data",
  path = "s3://bucket/path/to/data",
  source = "parquet"
)

// Create table with schema and options
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = true),
  StructField("created_date", DateType, nullable = true)
))

catalog.createTable(
  tableName = "structured_data",
  source = "delta",
  schema = schema,
  options = Map(
    "path" -> "s3://bucket/delta/table",
    "overwriteSchema" -> "true"
  )
)

// Create database
catalog.createDatabase("analytics", ignoreIfExists = true)
catalog.createDatabase("data_lake", ignoreIfExists = true, path = "s3://bucket/databases/data_lake")

// Drop database (cascade removes all tables)
catalog.dropDatabase("old_database", ignoreIfNotExists = true, cascade = true)

// Create temporary views
spark.sql("""
  CREATE OR REPLACE TEMPORARY VIEW active_users AS
  SELECT * FROM users WHERE status = 'active'
""")

// Drop temporary views
catalog.dropTempView("active_users")
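
Global temporary views are shared across sessions of the same application and live in the reserved global_temp database; a small sketch using the same users table as above:

// Create a global temporary view (queried via the global_temp database)
spark.sql("""
  CREATE OR REPLACE GLOBAL TEMPORARY VIEW all_users AS
  SELECT * FROM users
""")

spark.sql("SELECT COUNT(*) FROM global_temp.all_users").show()

// Returns true if the view existed and was dropped
catalog.dropGlobalTempView("all_users")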

Caching and Performance

Table caching for improved query performance.

import org.apache.spark.storage.StorageLevel

// Cache with default storage level (MEMORY_AND_DISK)
catalog.cacheTable("hot_data")

// Cache with specific storage level
catalog.cacheTable("lookup_table", StorageLevel.MEMORY_ONLY)

// Check cache status
val cachedTables = catalog.listTables()
  .filter(_.isTemporary == false)
  .collect()
  .filter(table => catalog.isCached(table.name))

cachedTables.foreach(table => 
  println(s"Cached table: ${table.name}")
)

// Clear specific table from cache
catalog.uncacheTable("hot_data")

// Clear all cached tables
catalog.clearCache()

// Refresh table metadata after external changes
catalog.refreshTable("external_table")

Partition Management

Working with partitioned tables.

// Recover partitions for external tables
catalog.recoverPartitions("partitioned_external_table")

// List partitions (using SQL for detailed partition info)
val partitions = spark.sql("SHOW PARTITIONS partitioned_table").collect()
partitions.foreach(row => println(s"Partition: ${row.getString(0)}"))

// Add specific partitions
spark.sql("""
  ALTER TABLE partitioned_table 
  ADD PARTITION (year=2023, month=12) 
  LOCATION 's3://bucket/data/year=2023/month=12'
""")

// Drop partition
spark.sql("""
  ALTER TABLE partitioned_table 
  DROP PARTITION (year=2022, month=01)
""")

Advanced Catalog Queries

Complex metadata queries and analysis.

// List non-view tables (name, database, and type)
val nonViewTables = catalog.listTables()
  .select("name", "database", "tableType")
  .collect()
  .filter(_.getString(2) != "VIEW")  // index 2 is tableType; exclude views

// Analyze table schemas
def analyzeTableSchema(tableName: String): Unit = {
  val columns = catalog.listColumns(tableName).collect()
  
  println(s"Schema analysis for $tableName:")
  println(s"Total columns: ${columns.length}")
  
  val partitionCols = columns.filter(_.isPartition)
  if (partitionCols.nonEmpty) {
    println(s"Partition columns: ${partitionCols.map(_.name).mkString(", ")}")
  }
  
  val bucketCols = columns.filter(_.isBucket)
  if (bucketCols.nonEmpty) {
    println(s"Bucket columns: ${bucketCols.map(_.name).mkString(", ")}")
  }
  
  val dataTypes = columns.groupBy(_.dataType).mapValues(_.length)
  println("Data type distribution:")
  dataTypes.foreach { case (dataType, count) =>
    println(s"  $dataType: $count columns")
  }
}
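
// Illustrative call to the helper above; "employees" is a placeholder name
if (catalog.tableExists("employees")) {
  analyzeTableSchema("employees")
}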

// Find tables with specific patterns
import org.apache.spark.sql.functions.col

val userTables = catalog.listTables()
  .filter(col("name").like("%user%"))
  .collect()

// Cross-database analysis
val allDatabases = catalog.listDatabases().collect()
allDatabases.foreach { db =>
  val tableCount = catalog.listTables(db.name).count()
  println(s"Database ${db.name}: $tableCount tables")
}

Integration with External Metastores

Working with Hive metastore and external catalog systems.

// Enable Hive support for metastore integration. The warehouse directory and
// metastore URI are static settings, so they must be supplied on the builder
// before the session is created rather than via spark.conf.set afterwards.
val spark = SparkSession.builder()
  .appName("Catalog Operations")
  .config("spark.sql.warehouse.dir", "/path/to/warehouse")
  .config("hive.metastore.uris", "thrift://metastore-host:9083")
  .enableHiveSupport()
  .getOrCreate()

// Create Hive-managed table
spark.sql("""
  CREATE TABLE IF NOT EXISTS hive_table (
    id BIGINT,
    name STRING,
    created_date DATE
  )
  PARTITIONED BY (year INT, month INT)
  STORED AS PARQUET
""")

// Register external table in Hive metastore
spark.sql("""
  CREATE TABLE external_parquet_table (
    id BIGINT,
    name STRING,
    value DOUBLE
  )
  USING PARQUET
  LOCATION 's3://bucket/path/to/parquet'
""")

// Work with Hive databases
spark.sql("CREATE DATABASE IF NOT EXISTS hive_db")
catalog.setCurrentDatabase("hive_db")

// MSCK repair for Hive tables
spark.sql("MSCK REPAIR TABLE partitioned_hive_table")

Catalog Utility Functions

Helper functions for common catalog operations.

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.types.{DataType, StructField, StructType}

object CatalogUtils {
  def tableExists(spark: SparkSession, dbName: String, tableName: String): Boolean = {
    try {
      spark.catalog.getTable(dbName, tableName)
      true
    } catch {
      case _: AnalysisException => false
    }
  }
  
  def createDatabaseIfNotExists(spark: SparkSession, dbName: String, location: Option[String] = None): Unit = {
    if (!spark.catalog.databaseExists(dbName)) {
      location match {
        case Some(path) => spark.catalog.createDatabase(dbName, ignoreIfExists = true, path)
        case None => spark.catalog.createDatabase(dbName, ignoreIfExists = true)
      }
      println(s"Created database: $dbName")
    } else {
      println(s"Database already exists: $dbName")
    }
  }
  
  def getTableStatistics(spark: SparkSession, tableName: String): Map[String, Any] = {
    val table = spark.catalog.getTable(tableName)
    val columns = spark.catalog.listColumns(tableName).collect()
    
    Map(
      "tableName" -> table.name,
      "database" -> table.database,
      "tableType" -> table.tableType,
      "isTemporary" -> table.isTemporary,
      "columnCount" -> columns.length,
      "partitionColumns" -> columns.filter(_.isPartition).map(_.name),
      "bucketColumns" -> columns.filter(_.isBucket).map(_.name),
      "dataTypes" -> columns.map(_.dataType).distinct
    )
  }
  
  def copyTableSchema(spark: SparkSession, sourceTable: String, targetTable: String): Unit = {
    val sourceColumns = spark.catalog.listColumns(sourceTable).collect()
    val schema = StructType(sourceColumns.map { col =>
      val dataType = DataType.fromDDL(col.dataType)
      StructField(col.name, dataType, col.nullable)
    })
    
    // Create empty table with same schema
    val emptyDf = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
    emptyDf.write.saveAsTable(targetTable)
  }
}

// Usage examples
CatalogUtils.createDatabaseIfNotExists(spark, "analytics", Some("s3://bucket/analytics"))

val stats = CatalogUtils.getTableStatistics(spark, "employees")
println(s"Table statistics: $stats")

if (CatalogUtils.tableExists(spark, "default", "source_table")) {
  CatalogUtils.copyTableSchema(spark, "source_table", "target_table")
}