This document covers multi-catalog support, including database and table metadata management, in Apache Flink Table Uber Blink.
/**
 * Catalog-context portion of the table environment API (signature summary).
 */
interface TableEnvironment {

    // --- catalog registration and lookup ---

    /** Registers {@code catalog} under the name {@code catalogName}. */
    void registerCatalog(String catalogName, Catalog catalog);

    /** Looks up a catalog by name (presumably empty when no such catalog is registered). */
    Optional<Catalog> getCatalog(String catalogName);

    // --- current catalog ---

    void useCatalog(String catalogName);

    String getCurrentCatalog();

    // --- current database ---

    void useDatabase(String databaseName);

    String getCurrentDatabase();
}

Usage:
// Register a Hive catalog (arguments: catalog name, default database, hive conf directory)
HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", "/path/to/hive-conf");
tEnv.registerCatalog("myhive", hiveCatalog);

// Register a JDBC catalog.
// The base URL must not contain the database name; the default database
// ("testdb") is already passed as its own argument.
JdbcCatalog jdbcCatalog = new PostgresCatalog(
    "mypg", "testdb", "user", "pass", "jdbc:postgresql://localhost:5432/");
tEnv.registerCatalog("mypg", jdbcCatalog);

// Switch catalog context
tEnv.useCatalog("myhive");
tEnv.useDatabase("production");

/** In-memory {@link Catalog} implementation (constructor summary). */
class GenericInMemoryCatalog implements Catalog {

    /** Creates a catalog with the given name. */
    GenericInMemoryCatalog(String name);

    /** Creates a catalog with the given name and default database. */
    GenericInMemoryCatalog(String name, String defaultDatabase);
}

Usage:
// Create the in-memory catalog (arguments: name, default database);
// it is registered with the table environment in the statement that follows.
GenericInMemoryCatalog memoryCatalog =
    new GenericInMemoryCatalog("memory_catalog", "my_db");
tEnv.registerCatalog("memory", memoryCatalog);

/** Hive-backed {@link Catalog} implementation (constructor summary). */
class HiveCatalog implements Catalog {

    // Hive configuration supplied as a directory path
    HiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir);

    HiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir, String hiveVersion);

    // Hive configuration supplied as a HiveConf object
    HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf);

    HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf, String hiveVersion);
}

Usage:
// Register Hive catalog.
// Constructor arguments, in order: catalog name, default database,
// hive conf directory, hive version.
HiveCatalog hive =
    new HiveCatalog("myhive", "default", "/opt/hive/conf", "2.3.4");
tEnv.registerCatalog("myhive", hive);

// Use Hive tables: make "myhive" the current catalog first
tEnv.useCatalog("myhive");
Table hiveTable = tEnv.from("hive_database.hive_table");

/** JDBC-backed {@link Catalog} implementation (constructor summary). */
class JdbcCatalog implements Catalog {

    JdbcCatalog(
        String catalogName,
        String defaultDatabase,
        String username,
        String pwd,
        String baseUrl);
}
/** {@link JdbcCatalog} variant for PostgreSQL; takes the same arguments as its parent. */
class PostgresCatalog extends JdbcCatalog {

    PostgresCatalog(
        String catalogName,
        String defaultDatabase,
        String username,
        String pwd,
        String baseUrl);
}
/** {@link JdbcCatalog} variant for MySQL; takes the same arguments as its parent. */
class MySqlCatalog extends JdbcCatalog {

    MySqlCatalog(
        String catalogName,
        String defaultDatabase,
        String username,
        String pwd,
        String baseUrl);
}

Usage:
// PostgreSQL catalog.
// Arguments, in order: catalog name, default database, username, password, base URL.
PostgresCatalog pgCatalog =
    new PostgresCatalog("mypg", "postgres", "user", "password", "jdbc:postgresql://localhost:5432/");
tEnv.registerCatalog("mypg", pgCatalog);

// MySQL catalog (same argument order as above)
MySqlCatalog mysqlCatalog =
    new MySqlCatalog("mysql", "test", "root", "root", "jdbc:mysql://localhost:3306");
tEnv.registerCatalog("mysql", mysqlCatalog);

/**
 * Metadata-listing portion of the table environment API (signature summary).
 * Every method returns the names as a {@code String[]}.
 */
interface TableEnvironment {

    // --- catalog hierarchy ---
    String[] listCatalogs();

    String[] listDatabases();

    String[] listTables();

    String[] listViews();

    // --- functions and modules ---
    String[] listUserDefinedFunctions();

    String[] listFunctions();

    String[] listModules();
}

Usage:
// List all metadata
String[] catalogs = tEnv.listCatalogs();
String[] databases = tEnv.listDatabases();
String[] tables = tEnv.listTables();
String[] views = tEnv.listViews();
String[] functions = tEnv.listUserDefinedFunctions();

// Print metadata hierarchy.
// The walk switches the session's current catalog and database on every
// iteration, so remember the starting context and restore it afterwards;
// otherwise later statements would run against whichever catalog/database
// happened to be visited last.
String originalCatalog = tEnv.getCurrentCatalog();
String originalDatabase = tEnv.getCurrentDatabase();
try {
    for (String catalog : catalogs) {
        System.out.println("Catalog: " + catalog);
        tEnv.useCatalog(catalog);
        for (String database : tEnv.listDatabases()) {
            System.out.println(" Database: " + database);
            tEnv.useDatabase(database);
            for (String table : tEnv.listTables()) {
                System.out.println(" Table: " + table);
            }
        }
    }
} finally {
    // Restore the catalog first: the database name is resolved inside it.
    tEnv.useCatalog(originalCatalog);
    tEnv.useDatabase(originalDatabase);
}

/** Database-level operations of a catalog (signature summary). */
interface Catalog {

    boolean databaseExists(String databaseName);

    void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists);

    void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade);

    void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists);

    List<String> listDatabases();

    CatalogDatabase getDatabase(String databaseName);
}

Usage:
// Create database (arguments: properties map, comment)
CatalogDatabase newDatabase = new CatalogDatabaseImpl(
    Map.of("location", "/path/to/database"),
    "My custom database"
);

// getCatalog returns an Optional: fail with a descriptive message instead of a
// bare NoSuchElementException when "myhive" has not been registered.
Catalog catalog = tEnv.getCatalog("myhive")
    .orElseThrow(() -> new IllegalStateException("Catalog 'myhive' is not registered"));
catalog.createDatabase("my_db", newDatabase, false);

// Check database existence
boolean exists = catalog.databaseExists("my_db");

// Get database metadata
CatalogDatabase dbMeta = catalog.getDatabase("my_db");
Map<String, String> properties = dbMeta.getProperties();
String comment = dbMeta.getComment();

-- Create database
-- Syntax notation: [ ... ] encloses an optional clause; | separates alternatives.
CREATE DATABASE [IF NOT EXISTS] database_name
[COMMENT 'comment']
[WITH (key1=val1, key2=val2, ...)];
-- Drop database (IF EXISTS is optional; RESTRICT or CASCADE may follow the name)
DROP DATABASE [IF EXISTS] database_name [RESTRICT|CASCADE];
-- Show databases
SHOW DATABASES;
-- Describe database (long DESCRIBE form and short DESC form)
DESCRIBE DATABASE database_name;
DESC DATABASE database_name;
-- Use database
USE database_name;

/** Table-level operations of a catalog (signature summary). */
interface Catalog {

    // --- existence and DDL ---
    boolean tableExists(ObjectPath tablePath);

    void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists);

    void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists);

    void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists);

    // --- lookup ---
    List<String> listTables(String databaseName);

    CatalogBaseTable getTable(ObjectPath tablePath);

    // --- statistics ---
    // getTableStatistics is declared exactly once: two methods with the same
    // name and parameter list cannot differ only in their return type.
    CatalogTableStatistics getTableStatistics(ObjectPath tablePath);

    CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath);
}

Usage:
// Address the table by (database name, table name)
ObjectPath tablePath = new ObjectPath("my_db", "my_table");

// Check table existence
boolean tableExists = catalog.tableExists(tablePath);

// Get table metadata
CatalogBaseTable table = catalog.getTable(tablePath);
Map<String, String> options = table.getOptions();
// CatalogBaseTable.getSchema() is marked deprecated; read the schema through
// getUnresolvedSchema() instead.
Schema schema = table.getUnresolvedSchema();
String comment = table.getComment();

// Get table statistics
CatalogTableStatistics stats = catalog.getTableStatistics(tablePath);
long rowCount = stats.getRowCount();
// Column statistics are obtained from the catalog through
// getTableColumnStatistics(...), not from the CatalogTableStatistics object.
CatalogColumnStatistics columnStats = catalog.getTableColumnStatistics(tablePath);

/** Two-part path (database name, object name) addressing an object inside a catalog. */
class ObjectPath {

    ObjectPath(String databaseName, String objectName);

    String getDatabaseName();

    String getObjectName();

    String getFullName();

    /** Builds a path from a single string such as {@code "database.table"}. */
    static ObjectPath fromString(String fullName);
}

Usage:
// Create object paths: once from separate parts, once from a dotted string
ObjectPath path1 =
    new ObjectPath("database", "table");
ObjectPath path2 =
    ObjectPath.fromString("database.table");

// Multi-part identifiers: a three-part name also carries the catalog
String fullName = "catalog.database.table";

// Parse into components for object path (split on literal dots)
String[] parts = fullName.split("\\.");
ObjectPath path = new ObjectPath(parts[1], parts[2]); // database.table

/**
 * Skeleton of a user-defined {@link Catalog}.
 *
 * <p>The lookup methods delegate to helpers ({@code checkDatabaseExists},
 * {@code getDatabaseList}, {@code checkTableExists}, {@code getTableList},
 * {@code loadTableMetadata}) that are placeholders: they are not defined in
 * this snippet and must be supplied by the implementer, together with the
 * remaining {@code Catalog} methods.
 */
public class CustomCatalog implements Catalog {

    private final String catalogName;
    private final String defaultDatabase;

    /**
     * @param catalogName name of this catalog
     * @param defaultDatabase database returned by {@link #getDefaultDatabase()}
     */
    public CustomCatalog(String catalogName, String defaultDatabase) {
        this.catalogName = catalogName;
        this.defaultDatabase = defaultDatabase;
    }

    /** Lifecycle hook: set up whatever the catalog needs before use. */
    @Override
    public void open() throws CatalogException {
        // Initialize catalog connection
    }

    /** Lifecycle hook: release whatever {@link #open()} acquired. */
    @Override
    public void close() throws CatalogException {
        // Clean up resources
    }

    /** Returns the default database name given to the constructor. */
    @Override
    public String getDefaultDatabase() throws CatalogException {
        return defaultDatabase;
    }

    @Override
    public boolean databaseExists(String databaseName) throws CatalogException {
        // Implementation to check database existence
        return checkDatabaseExists(databaseName);
    }

    @Override
    public List<String> listDatabases() throws CatalogException {
        // Implementation to list databases
        return getDatabaseList();
    }

    @Override
    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
        // Implementation to check table existence
        return checkTableExists(tablePath);
    }

    @Override
    public List<String> listTables(String databaseName) throws CatalogException {
        // Implementation to list tables
        return getTableList(databaseName);
    }

    @Override
    public CatalogBaseTable getTable(ObjectPath tablePath) throws CatalogException {
        // Implementation to get table metadata
        return loadTableMetadata(tablePath);
    }

    // Implement other required methods...
}

-- Query across multiple catalogs
-- Tables are addressed as catalog.database.table; h and p are table aliases.
SELECT
    h.user_id,
    h.purchase_amount,
    p.user_name,
    p.email
FROM myhive.sales.purchases h
    JOIN mypg.users.profiles p
        ON h.user_id = p.user_id
WHERE h.purchase_date >= CURRENT_DATE - INTERVAL '7' DAY;

-- Insert from one catalog to another:
-- aggregate the rows read from mypg per user and write them into myhive.
INSERT INTO myhive.warehouse.sales_summary
SELECT
    user_id,
    SUM(amount) AS total_amount,
    COUNT(*) AS transaction_count
FROM mypg.transactional.orders
WHERE order_date >= CURRENT_DATE - INTERVAL '1' DAY
GROUP BY user_id;

// Fully qualified table names
// The catalog.database.table path is resolved here, when the table is loaded
Table hiveTable = tEnv.from("myhive.production.user_events");
Table pgTable = tEnv.from("mypg.analytics.user_profiles");

// Join across catalogs.
// $("...") refers to a column by field name only; it does not accept a
// catalog.database.table prefix. The join key is therefore written with the
// plain field names (user_id on the Hive side, id on the PostgreSQL side).
// NOTE(review): this relies on the two tables having distinct field names —
// rename clashing columns with as(...) before joining if they do not.
Table joined = hiveTable
    .join(pgTable, $("user_id").isEqual($("id")))
    .select($("user_id"), $("event_type"), $("name"), $("email"));

// Configure catalog properties
Map<String, String> hiveProperties = new HashMap<>();
hiveProperties.put("hive.metastore.uris", "thrift://localhost:9083");
hiveProperties.put("hadoop.conf.dir", "/etc/hadoop/conf");
// HiveCatalog declares no constructor that takes a Map. Copy the entries into
// a HiveConf and use the (catalogName, defaultDatabase, HiveConf) constructor.
HiveConf hiveConf = new HiveConf();
hiveProperties.forEach(hiveConf::set);
HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", hiveConf);

// JDBC catalog with connection pool
Map<String, String> jdbcProperties = new HashMap<>();
jdbcProperties.put("connection.pool.max-size", "10");
jdbcProperties.put("connection.timeout", "30000");
// NOTE(review): PostgresCatalog declares only the five-argument constructor
// (catalogName, defaultDatabase, username, pwd, baseUrl), so jdbcProperties
// cannot be handed to it. How these pool settings should reach the connection
// layer is not shown in this document — confirm before relying on them.
PostgresCatalog pgCatalog = new PostgresCatalog("mypg", "postgres", "user", "pass", "jdbc:postgresql://localhost:5432/");

Configuration config = tEnv.getConfig().getConfiguration();
// Configure metadata cache: enable it and set its expiration time.
// NOTE(review): confirm both keys exist in the Flink version being targeted.
config.setBoolean("table.catalog.cache.enabled", true);
config.setString("table.catalog.cache.expiration-time", "10 min");

// Configure Hive metastore client
config.setString("table.catalog.hive.metastore.client.factory", "org.apache.hadoop.hive.metastore.HiveMetaStoreClientFactory");

/**
 * General failure raised by catalog operations (constructor summary).
 *
 * <p>This is an unchecked exception: it extends {@link RuntimeException}, so
 * callers are not forced to declare or catch it.
 */
class CatalogException extends RuntimeException {

    CatalogException(String message);

    CatalogException(String message, Throwable cause);
}
// Checked exceptions signalling a missing or duplicate catalog object.
// They extend Exception directly (not CatalogException), so callers must
// declare or handle them.

/** Thrown when a referenced database does not exist. */
class DatabaseNotExistException extends Exception {}

/** Thrown when a database to be created already exists. */
class DatabaseAlreadyExistException extends Exception {}

/** Thrown when a referenced table does not exist. */
class TableNotExistException extends Exception {}
/** Thrown when a table to be created already exists; a checked exception. */
class TableAlreadyExistException extends Exception {}

/** Metadata of a database stored in a catalog (signature summary). */
interface CatalogDatabase {

    // --- stored metadata ---
    Map<String, String> getProperties();

    String getComment();

    // --- copying ---
    CatalogDatabase copy();

    /** Copy variant that takes a properties map (presumably replacing the original's — confirm). */
    CatalogDatabase copy(Map<String, String> properties);

    // --- optional descriptions ---
    Optional<String> getDescription();

    Optional<String> getDetailedDescription();
}
/** Metadata contract implemented by both CatalogTable and CatalogView (signature summary). */
interface CatalogBaseTable {

    // --- stored metadata ---
    Map<String, String> getOptions();

    String getComment();

    // --- copying ---
    CatalogBaseTable copy(Map<String, String> options);

    // --- optional descriptions ---
    Optional<String> getDescription();

    Optional<String> getDetailedDescription();

    // --- schema ---
    Schema getUnresolvedSchema();

    /** Deprecated accessor; {@link #getUnresolvedSchema()} is the non-deprecated one. */
    TableSchema getSchema();
}
/** Table metadata: a {@link CatalogBaseTable} that also exposes partition keys. */
class CatalogTable implements CatalogBaseTable {

    // Built from a TableSchema plus properties and comment
    CatalogTable(TableSchema tableSchema, Map<String, String> properties, String comment);

    // Built from a Schema plus comment, partition keys and options
    CatalogTable(
        Schema schema,
        String comment,
        List<String> partitionKeys,
        Map<String, String> options);

    boolean isPartitioned();

    List<String> getPartitionKeys();
}
/** View metadata: a {@link CatalogBaseTable} that carries its original and expanded query text. */
class CatalogView implements CatalogBaseTable {

    CatalogView(
        String originalQuery,
        String expandedQuery,
        TableSchema schema,
        Map<String, String> properties,
        String comment);

    String getOriginalQuery();

    String getExpandedQuery();
}
/** Metadata of a function stored in a catalog (signature summary). */
interface CatalogFunction {

    /** Name of the class associated with the function. */
    String getClassName();

    /** Language of the function, as a {@link FunctionLanguage} constant. */
    FunctionLanguage getFunctionLanguage();

    /** Resources associated with the function, as strings. */
    List<String> getFunctionResources();
}
/** Language tag reported by {@code CatalogFunction.getFunctionLanguage()}. */
enum FunctionLanguage { JVM, PYTHON, SCALA }