or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

catalog-management.md, complex-event-processing.md, core-table-operations.md, datastream-integration.md, index.md, sql-processing.md, type-system.md, user-defined-functions.md, window-operations.md
tile.json

docs/catalog-management.md

Catalog and Metadata Management

This document covers multi-catalog support, including database and table metadata management, in the Apache Flink Table Uber module for the Blink planner (`flink-table-uber-blink`).

Catalog Operations

Catalog Registration

/** Catalog registration and session-context methods of {@code TableEnvironment}. */
interface TableEnvironment {
    // Registering a catalog under a name, and looking it up again by that name
    void registerCatalog(String catalogName, Catalog catalog);
    Optional<Catalog> getCatalog(String catalogName);

    // The session's current catalog
    void useCatalog(String catalogName);
    String getCurrentCatalog();

    // The session's current database
    void useDatabase(String databaseName);
    String getCurrentDatabase();
}

Usage:

// Register Hive catalog
HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", "/path/to/hive-conf");
tEnv.registerCatalog("myhive", hiveCatalog);

// Register JDBC catalog.
// The base URL must NOT contain a database name: the catalog appends the database
// itself, and a URL such as "jdbc:postgresql://localhost:5432/testdb" is rejected.
JdbcCatalog jdbcCatalog = new PostgresCatalog("mypg", "testdb", "user", "pass", "jdbc:postgresql://localhost:5432/");
tEnv.registerCatalog("mypg", jdbcCatalog);

// Switch catalog context so unqualified table names resolve against myhive.production
tEnv.useCatalog("myhive");
tEnv.useDatabase("production");

Built-in Catalogs

Generic In-Memory Catalog

/**
 * Built-in in-memory catalog. The one-argument constructor takes only the catalog
 * name; the two-argument form also names the default database.
 */
class GenericInMemoryCatalog implements Catalog {
    GenericInMemoryCatalog(String name);

    GenericInMemoryCatalog(String name, String defaultDatabase);
}

Usage:

// Create an in-memory catalog whose default database is "my_db", and register it
// under the same name it was constructed with. Otherwise the catalog's own name
// and the name used in SQL / useCatalog() would diverge.
GenericInMemoryCatalog memoryCatalog = new GenericInMemoryCatalog("memory_catalog", "my_db");
tEnv.registerCatalog("memory_catalog", memoryCatalog);

Hive Catalog

/**
 * Catalog backed by Hive. Configuration is given either as a Hive conf directory
 * or as a {@code HiveConf} object; each form optionally takes an explicit Hive version.
 */
class HiveCatalog implements Catalog {
    // Configuration read from a directory
    HiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir);
    HiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir, String hiveVersion);

    // Configuration passed as an object
    HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf);
    HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf, String hiveVersion);
}

Usage:

// Build a Hive catalog with an explicit Hive version and register it
String hiveCatalogName = "myhive";
HiveCatalog hive = new HiveCatalog(
    hiveCatalogName,    // catalog name
    "default",          // default database
    "/opt/hive/conf",   // Hive conf directory
    "2.3.4"             // Hive version
);
tEnv.registerCatalog(hiveCatalogName, hive);

// Make it the current catalog, then read a table by its database-qualified name
tEnv.useCatalog(hiveCatalogName);
Table hiveTable = tEnv.from("hive_database.hive_table");

JDBC Catalogs

/**
 * Catalog over a relational database reached through JDBC. {@code baseUrl} is the
 * server URL without a database name (e.g. {@code jdbc:postgresql://localhost:5432/}).
 */
class JdbcCatalog implements Catalog {
    JdbcCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl);
}

/** JDBC catalog for MySQL; same constructor shape as {@link JdbcCatalog}. */
class MySqlCatalog extends JdbcCatalog {
    MySqlCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl);
}

/** JDBC catalog for PostgreSQL; same constructor shape as {@link JdbcCatalog}. */
class PostgresCatalog extends JdbcCatalog {
    PostgresCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl);
}

Usage:

// PostgreSQL catalog. Arguments: catalog name, default database, user, password, base URL
String pgUrl = "jdbc:postgresql://localhost:5432/";
PostgresCatalog pgCatalog = new PostgresCatalog("mypg", "postgres", "user", "password", pgUrl);
tEnv.registerCatalog("mypg", pgCatalog);

// MySQL catalog: same argument order
String mysqlUrl = "jdbc:mysql://localhost:3306";
MySqlCatalog mysqlCatalog = new MySqlCatalog("mysql", "test", "root", "root", mysqlUrl);
tEnv.registerCatalog("mysql", mysqlCatalog);

Metadata Listing

Listing Operations

/** Metadata listing methods of {@code TableEnvironment}; each returns names as a String array. */
interface TableEnvironment {
    // Catalogs, databases and the tables/views they contain
    String[] listCatalogs();
    String[] listDatabases();
    String[] listTables();
    String[] listViews();

    // Functions and modules
    String[] listUserDefinedFunctions();
    String[] listFunctions();
    String[] listModules();
}

Usage:

// List metadata visible from the current catalog / database
String[] catalogs = tEnv.listCatalogs();
String[] databases = tEnv.listDatabases();
String[] tables = tEnv.listTables();
String[] views = tEnv.listViews();
String[] functions = tEnv.listUserDefinedFunctions();

// Print the catalog -> database -> table hierarchy.
// useCatalog()/useDatabase() change the session's current context. Remember it and
// restore it afterwards; otherwise later unqualified names would resolve against
// whichever database the loop happened to visit last.
String originalCatalog = tEnv.getCurrentCatalog();
String originalDatabase = tEnv.getCurrentDatabase();
try {
    for (String catalog : catalogs) {
        System.out.println("Catalog: " + catalog);
        tEnv.useCatalog(catalog);

        for (String database : tEnv.listDatabases()) {
            System.out.println("  Database: " + database);
            tEnv.useDatabase(database);

            for (String table : tEnv.listTables()) {
                System.out.println("    Table: " + table);
            }
        }
    }
} finally {
    tEnv.useCatalog(originalCatalog);
    tEnv.useDatabase(originalDatabase);
}

Database Operations

Database Management

/** Database-level operations of {@code Catalog}. */
interface Catalog {
    // Read-only queries
    List<String> listDatabases();
    boolean databaseExists(String databaseName);
    CatalogDatabase getDatabase(String databaseName);

    // Mutations; the boolean flags control behaviour when the database exists / is missing
    void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists);
    void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists);
    void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade);
}

Usage:

// Create database: properties plus a comment
CatalogDatabase newDatabase = new CatalogDatabaseImpl(
    Map.of("location", "/path/to/database"),
    "My custom database"
);

// getCatalog() returns an Optional; fail with a clear message instead of a bare
// NoSuchElementException when the catalog was never registered.
Catalog catalog = tEnv.getCatalog("myhive")
    .orElseThrow(() -> new IllegalStateException("Catalog 'myhive' is not registered"));
catalog.createDatabase("my_db", newDatabase, false);

// Check database existence
boolean exists = catalog.databaseExists("my_db");

// Get database metadata
CatalogDatabase dbMeta = catalog.getDatabase("my_db");
Map<String, String> properties = dbMeta.getProperties();
String comment = dbMeta.getComment();

Database SQL Operations

-- Create database (optionally qualified with the catalog it belongs to)
CREATE DATABASE [IF NOT EXISTS] [catalog_name.]database_name 
[COMMENT 'comment'] 
[WITH (key1=val1, key2=val2, ...)];

-- Drop database; RESTRICT (the default) fails if the database is not empty,
-- CASCADE also drops the tables and functions it contains
DROP DATABASE [IF EXISTS] [catalog_name.]database_name [RESTRICT|CASCADE];

-- Show catalogs / databases
SHOW CATALOGS;
SHOW DATABASES;

-- Describe database
-- NOTE(review): confirm DESCRIBE DATABASE is supported by your Flink version and SQL dialect
DESCRIBE DATABASE database_name;
DESC DATABASE database_name;

-- Switch the current catalog / database
USE CATALOG catalog_name;
USE [catalog_name.]database_name;

Table Operations

Table Management

/** Table-level operations of {@code Catalog}. */
interface Catalog {
    boolean tableExists(ObjectPath tablePath);
    void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists);
    void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists);
    void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists);
    List<String> listTables(String databaseName);
    CatalogBaseTable getTable(ObjectPath tablePath);
    // Exactly one getTableStatistics(ObjectPath) exists; Java cannot overload on return type alone.
    CatalogTableStatistics getTableStatistics(ObjectPath tablePath);
    CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath);
}

Usage:

ObjectPath tablePath = new ObjectPath("my_db", "my_table");

// Check table existence
boolean tableExists = catalog.tableExists(tablePath);

// Get table metadata
CatalogBaseTable table = catalog.getTable(tablePath);
Map<String, String> options = table.getOptions();
TableSchema schema = table.getSchema();   // deprecated; prefer getUnresolvedSchema()
String tableComment = table.getComment();

// Table-level statistics
CatalogTableStatistics stats = catalog.getTableStatistics(tablePath);
long rowCount = stats.getRowCount();

// Per-column statistics come from getTableColumnStatistics(), not from CatalogTableStatistics
CatalogColumnStatistics columnStats = catalog.getTableColumnStatistics(tablePath);
Map<String, CatalogColumnStatisticsDataBase> columnStatsData = columnStats.getColumnStatisticsData();

Object Path Handling

/** Two-part {@code database.object} name identifying an object (e.g. a table) inside a catalog. */
class ObjectPath {
    /** Builds a path from a {@code "database.object"} string. */
    static ObjectPath fromString(String fullName);

    ObjectPath(String databaseName, String objectName);

    String getDatabaseName();
    String getObjectName();
    String getFullName();
}

Usage:

// Create object paths
ObjectPath path1 = new ObjectPath("database", "table");
ObjectPath path2 = ObjectPath.fromString("database.table");

// Multi-part identifiers
String fullName = "catalog.database.table";
// Parse into components for object path
String[] parts = fullName.split("\\.");
ObjectPath path = new ObjectPath(parts[1], parts[2]); // database.table

Custom Catalog Implementation

Catalog Interface Implementation

/**
 * Skeleton of a user-defined {@link Catalog}. Each lookup delegates to a helper
 * (checkDatabaseExists, getDatabaseList, checkTableExists, getTableList,
 * loadTableMetadata) that a real implementation has to supply, together with the
 * remaining Catalog methods.
 */
public class CustomCatalog implements Catalog {
    private final String catalogName;
    private final String defaultDatabase;

    public CustomCatalog(String catalogName, String defaultDatabase) {
        this.catalogName = catalogName;
        this.defaultDatabase = defaultDatabase;
    }

    // ------------------------------------------------------------------ lifecycle

    /** Acquire whatever the catalog needs (connections, clients, ...). */
    @Override
    public void open() throws CatalogException {
        // Initialize catalog connection
    }

    /** Release what {@link #open()} acquired. */
    @Override
    public void close() throws CatalogException {
        // Clean up resources
    }

    // ------------------------------------------------------------------ databases

    @Override
    public String getDefaultDatabase() throws CatalogException {
        return this.defaultDatabase;
    }

    @Override
    public boolean databaseExists(String databaseName) throws CatalogException {
        return checkDatabaseExists(databaseName);
    }

    @Override
    public List<String> listDatabases() throws CatalogException {
        return getDatabaseList();
    }

    // ------------------------------------------------------------------ tables

    @Override
    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
        return checkTableExists(tablePath);
    }

    @Override
    public List<String> listTables(String databaseName) throws CatalogException {
        return getTableList(databaseName);
    }

    @Override
    public CatalogBaseTable getTable(ObjectPath tablePath) throws CatalogException {
        return loadTableMetadata(tablePath);
    }

    // Implement other required methods...
}

Multi-Catalog Queries

Cross-Catalog Operations

-- Join tables from two catalogs in one query; each side is addressed by its
-- fully qualified catalog.database.table name
SELECT h.user_id, h.purchase_amount, p.user_name, p.email
FROM myhive.sales.purchases AS h
INNER JOIN mypg.users.profiles AS p
    ON h.user_id = p.user_id
WHERE h.purchase_date >= CURRENT_DATE - INTERVAL '7' DAY;

-- Write an aggregate of one catalog's table into another catalog's table
INSERT INTO myhive.warehouse.sales_summary
SELECT user_id,
       SUM(amount) AS total_amount,
       COUNT(*)    AS transaction_count
FROM mypg.transactional.orders
WHERE order_date >= CURRENT_DATE - INTERVAL '1' DAY
GROUP BY user_id;

Catalog Resolution

// Fully qualified table names: catalog.database.table
Table hiveTable = tEnv.from("myhive.production.user_events");
Table pgTable = tEnv.from("mypg.analytics.user_profiles");

// Join across catalogs.
// Table API expressions reference columns by their plain field name. There is no
// catalog.database.table.column qualifier, so field names must be unambiguous across
// both inputs (rename clashing columns with as()/renameColumns() before joining).
Table joined = hiveTable
    .join(pgTable, $("user_id").isEqual($("id")))
    .select($("user_id"), $("event_type"), $("name"), $("email"));

Configuration

Catalog Configuration

// Hive: metastore settings are Hive configuration properties. Put them into a HiveConf
// (or into hive-site.xml inside the Hive conf directory) and pass that to one of the
// HiveCatalog constructors. There is no constructor taking a plain Map.
HiveConf hiveConf = new HiveConf();
hiveConf.set("hive.metastore.uris", "thrift://localhost:9083");

HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", hiveConf, "2.3.4");

// JDBC catalogs take only connection coordinates (catalog name, default database,
// user, password, base URL). They have no extra properties argument, so options
// such as connection-pool sizing cannot be passed through the constructor.
PostgresCatalog pgCatalog = new PostgresCatalog("mypg", "postgres", "user", "pass", "jdbc:postgresql://localhost:5432/");

Metadata Cache Configuration

// Table configuration object of the environment; options are set on it as key/value pairs.
Configuration config = tEnv.getConfig().getConfiguration();

// Configure metadata cache
// NOTE(review): these option keys are not confirmed Flink configuration options. Verify
// each one against the configuration reference for your Flink version. An unknown key
// is presumably just stored and may have no effect.
config.setString("table.catalog.cache.expiration-time", "10 min");
config.setBoolean("table.catalog.cache.enabled", true);

// Configure Hive metastore client
// NOTE(review): same caveat; confirm the Hive catalog actually reads this key.
config.setString("table.catalog.hive.metastore.client.factory", "org.apache.hadoop.hive.metastore.HiveMetaStoreClientFactory");

Error Handling

class CatalogException extends Exception {
    CatalogException(String message);
    CatalogException(String message, Throwable cause);
}

class DatabaseNotExistException extends CatalogException;
class DatabaseAlreadyExistException extends CatalogException;
class TableNotExistException extends CatalogException;
class TableAlreadyExistException extends CatalogException;

Types

/** Metadata of a database stored in a catalog. */
interface CatalogDatabase {
    // Contents
    Map<String, String> getProperties();
    String getComment();

    // Human-readable descriptions
    Optional<String> getDescription();
    Optional<String> getDetailedDescription();

    // Copies; the overload accepts a properties map for the copy
    CatalogDatabase copy();
    CatalogDatabase copy(Map<String, String> properties);
}

interface CatalogBaseTable {
    Map<String, String> getOptions();
    String getComment();
    CatalogBaseTable copy(Map<String, String> options);
    Optional<String> getDescription();
    Optional<String> getDetailedDescription();
    TableSchema getSchema();  // Deprecated
    Schema getUnresolvedSchema();
}

class CatalogTable implements CatalogBaseTable {
    CatalogTable(TableSchema tableSchema, Map<String, String> properties, String comment);
    CatalogTable(Schema schema, String comment, List<String> partitionKeys, Map<String, String> options);
    
    boolean isPartitioned();
    List<String> getPartitionKeys();
}

class CatalogView implements CatalogBaseTable {
    CatalogView(String originalQuery, String expandedQuery, TableSchema schema, Map<String, String> properties, String comment);
    
    String getOriginalQuery();
    String getExpandedQuery();
}

/** A function stored in a catalog, identified by its implementation class. */
interface CatalogFunction {
    String getClassName();
    FunctionLanguage getFunctionLanguage();
    // NOTE(review): function resources were added in later Flink releases; confirm this
    // method and its element type exist in the Flink version you target.
    List<String> getFunctionResources();
}

/** Language a catalog function is implemented in. */
enum FunctionLanguage {
    JAVA,
    SCALA,
    PYTHON
}