Apache Flink's Table API and SQL module for unified stream and batch processing
—
Flink's catalog system provides pluggable metadata management for databases, tables, and functions, and lets users plug in their own catalog implementations. It supports persistent storage, schema evolution, and integration with external metadata systems such as the Hive Metastore.
Manage multiple catalogs and switch between different metadata repositories.
/**
 * Registers a catalog under a unique name. Objects in the catalog become accessible
 * under that name, e.g. {@code catalogName.databaseName.tableName}.
 *
 * <p>Deprecated since Flink 1.18 in favour of {@code createCatalog(String, CatalogDescriptor)},
 * which additionally stores the catalog definition in the configured catalog store.
 *
 * @param catalogName name under which the catalog is registered
 * @param catalog catalog implementation to register
 */
void registerCatalog(String catalogName, Catalog catalog);

/**
 * Gets a registered catalog by name.
 *
 * @param catalogName name of the catalog to retrieve
 * @return the catalog registered under {@code catalogName}, or an empty {@link Optional}
 *     if there is none
 */
Optional<Catalog> getCatalog(String catalogName);

/**
 * Sets the current catalog, against which unqualified object names are resolved.
 *
 * <p>Switching to a different catalog also resets the current database to that catalog's
 * default database. Call {@code useDatabase(String)} afterwards to work in another database.
 *
 * @param catalogName name of a registered catalog to use as the current one
 * @throws CatalogException if the catalog could not be set as the current one, e.g. because
 *     no catalog is registered under {@code catalogName}
 */
void useCatalog(String catalogName);

/**
 * Gets the name of the current catalog.
 *
 * @return name of the current catalog
 */
String getCurrentCatalog();

/**
 * Lists the names of all registered catalogs.
 *
 * @return array of registered catalog names
 */
String[] listCatalogs();

Usage Examples:
// Register different catalog types. Each catalog is constructed with the same name it is
// registered under, so catalog.getName() matches the name used in queries.
HiveCatalog hiveCatalog = new HiveCatalog(
        "hive_catalog",        // catalog name
        "default",             // default database
        "path/to/hive-conf",   // directory containing hive-site.xml
        "2.3.4"                // Hive version
);
tableEnv.registerCatalog("hive_catalog", hiveCatalog);

// JdbcCatalog(catalogName, defaultDatabase, username, password, baseUrl).
// The base URL must NOT contain a database name; the database is chosen through
// defaultDatabase (or through fully qualified object paths).
JdbcCatalog jdbcCatalog = new JdbcCatalog(
        "postgres_catalog",
        "metadata",
        "username",
        "password",
        "jdbc:postgresql://localhost:5432"
);
tableEnv.registerCatalog("postgres_catalog", jdbcCatalog);

// Switch between catalogs; useCatalog also switches to that catalog's default database,
// so listTables() lists the tables of that default database.
tableEnv.useCatalog("hive_catalog");
String[] hiveTables = tableEnv.listTables();

tableEnv.useCatalog("postgres_catalog");
String[] postgresTables = tableEnv.listTables();

// Get current context
String currentCatalog = tableEnv.getCurrentCatalog();

Manage databases within catalogs with full CRUD operations.
interface Catalog {

    /**
     * Creates a database in the catalog.
     *
     * @param name name of the database to create
     * @param database database metadata (properties and comment)
     * @param ignoreIfExists if {@code true}, do nothing when a database with this name
     *     already exists; if {@code false}, throw instead
     * @throws DatabaseAlreadyExistException if the database already exists and
     *     {@code ignoreIfExists} is {@code false}
     * @throws CatalogException in case of any runtime exception
     */
    void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
            throws DatabaseAlreadyExistException, CatalogException;

    /**
     * Drops a database from the catalog.
     *
     * @param name name of the database to drop
     * @param ignoreIfNotExists if {@code true}, do nothing when the database does not exist
     * @param cascade if {@code true}, first drop all tables and functions in the database and
     *     then the database itself; if {@code false}, dropping a non-empty database fails
     * @throws DatabaseNotExistException if the database does not exist and
     *     {@code ignoreIfNotExists} is {@code false}
     * @throws DatabaseNotEmptyException if the database is not empty and {@code cascade}
     *     is {@code false}
     * @throws CatalogException in case of any runtime exception
     */
    void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
            throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException;

    /**
     * Lists all databases in the catalog.
     *
     * @return names of all databases in the catalog
     * @throws CatalogException in case of any runtime exception
     */
    List<String> listDatabases() throws CatalogException;

    /**
     * Gets the metadata of a database.
     *
     * @param databaseName name of the database
     * @return the database metadata
     * @throws DatabaseNotExistException if the database does not exist
     * @throws CatalogException in case of any runtime exception
     */
    CatalogDatabase getDatabase(String databaseName)
            throws DatabaseNotExistException, CatalogException;

    /**
     * Checks whether a database exists.
     *
     * @param databaseName name of the database to check
     * @return {@code true} if the database exists
     * @throws CatalogException in case of any runtime exception
     */
    boolean databaseExists(String databaseName) throws CatalogException;
}

Usage Examples:
// Create database with properties
Map<String, String> dbProperties = new HashMap<>();
dbProperties.put("owner", "analytics_team");
dbProperties.put("created_date", "2024-01-01");

CatalogDatabase analyticsDb = new CatalogDatabaseImpl(
        dbProperties,
        "Database for analytics workflows"
);

// getCatalog returns an Optional: fail with a clear message instead of calling get() blindly
Catalog catalog = tableEnv.getCatalog("hive_catalog")
        .orElseThrow(() -> new IllegalStateException("Catalog 'hive_catalog' is not registered"));
catalog.createDatabase("analytics", analyticsDb, false);

// Use the new database. useDatabase resolves the name in the *current* catalog, so make
// the catalog that owns the database current first.
tableEnv.useCatalog("hive_catalog");
tableEnv.useDatabase("analytics");

// List databases
List<String> databases = catalog.listDatabases();
for (String db : databases) {
    System.out.println("Database: " + db);
}

Comprehensive table management with metadata, partitioning, and constraints.
interface Catalog {

    /**
     * Creates a table in the catalog.
     *
     * @param tablePath path of the table ({@code database.table})
     * @param table table metadata and schema; any {@link CatalogBaseTable} is accepted,
     *     {@link CatalogTable} being the usual one
     * @param ignoreIfExists if {@code true}, do nothing when the table already exists
     * @throws TableAlreadyExistException if the table already exists and
     *     {@code ignoreIfExists} is {@code false}
     * @throws DatabaseNotExistException if the database in {@code tablePath} does not exist
     * @throws CatalogException in case of any runtime exception
     */
    void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException;

    /**
     * Drops a table from the catalog.
     *
     * @param tablePath path of the table to drop
     * @param ignoreIfNotExists if {@code true}, do nothing when the table does not exist
     * @throws TableNotExistException if the table does not exist and
     *     {@code ignoreIfNotExists} is {@code false}
     * @throws CatalogException in case of any runtime exception
     */
    void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
            throws TableNotExistException, CatalogException;

    /**
     * Lists all tables in a database.
     *
     * @param databaseName name of the database
     * @return names of the tables in the database
     * @throws DatabaseNotExistException if the database does not exist
     * @throws CatalogException in case of any runtime exception
     */
    List<String> listTables(String databaseName)
            throws DatabaseNotExistException, CatalogException;

    /**
     * Gets the metadata of a table.
     *
     * @param tablePath path of the table
     * @return the table metadata as a {@link CatalogBaseTable}; check
     *     {@code instanceof CatalogTable} before using table-specific information such as
     *     partition keys
     * @throws TableNotExistException if the table does not exist
     * @throws CatalogException in case of any runtime exception
     */
    CatalogBaseTable getTable(ObjectPath tablePath)
            throws TableNotExistException, CatalogException;

    /**
     * Checks whether a table exists.
     *
     * @param tablePath path of the table to check
     * @return {@code true} if the table exists
     * @throws CatalogException in case of any runtime exception
     */
    boolean tableExists(ObjectPath tablePath) throws CatalogException;

    /**
     * Renames a table within its database.
     *
     * @param tablePath current path of the table
     * @param newTableName new table name (without database)
     * @param ignoreIfNotExists if {@code true}, do nothing when the table does not exist
     * @throws TableNotExistException if the table does not exist and
     *     {@code ignoreIfNotExists} is {@code false}
     * @throws TableAlreadyExistException if a table named {@code newTableName} already exists
     * @throws CatalogException in case of any runtime exception
     */
    void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
            throws TableNotExistException, TableAlreadyExistException, CatalogException;

    /**
     * Replaces the metadata of an existing table.
     *
     * @param tablePath path of the table
     * @param newTable new table metadata
     * @param ignoreIfNotExists if {@code true}, do nothing when the table does not exist
     * @throws TableNotExistException if the table does not exist and
     *     {@code ignoreIfNotExists} is {@code false}
     * @throws CatalogException in case of any runtime exception
     */
    void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
            throws TableNotExistException, CatalogException;
}

Usage Examples:
// Create table with comprehensive metadata
Schema schema = Schema.newBuilder()
        .column("order_id", DataTypes.BIGINT())
        .column("customer_id", DataTypes.BIGINT())
        .column("product_id", DataTypes.BIGINT())
        .column("quantity", DataTypes.INT())
        .column("unit_price", DataTypes.DECIMAL(10, 2))
        .column("order_date", DataTypes.DATE())
        .column("region", DataTypes.STRING())
        .primaryKey("order_id")
        .build();

// A partitioned table needs a connector that supports partitioning, such as the
// filesystem connector. Connectors like Kafka do not support partitioned tables.
Map<String, String> properties = new HashMap<>();
properties.put("connector", "filesystem");
properties.put("path", "file:///path/to/analytics/orders");
properties.put("format", "json");

CatalogTable ordersTable = CatalogTable.of(
        schema,
        "Orders table for e-commerce analytics",
        Arrays.asList("region", "order_date"), // Partition keys
        properties
);

ObjectPath tablePath = new ObjectPath("analytics", "orders");
catalog.createTable(tablePath, ordersTable, false);

// List and inspect tables. getTable returns a CatalogBaseTable, which exposes the
// unresolved schema and the options of the stored table.
List<String> tables = catalog.listTables("analytics");
for (String tableName : tables) {
    ObjectPath path = new ObjectPath("analytics", tableName);
    CatalogBaseTable table = catalog.getTable(path);
    System.out.println("Table: " + tableName);
    System.out.println("Schema: " + table.getUnresolvedSchema());
    System.out.println("Properties: " + table.getOptions());
}

Manage user-defined functions stored in the catalog together with their metadata.
interface Catalog {

    /**
     * Creates a function in the catalog.
     *
     * @param functionPath path of the function ({@code database.function})
     * @param function function metadata
     * @param ignoreIfExists if {@code true}, do nothing when the function already exists
     * @throws FunctionAlreadyExistException if the function already exists and
     *     {@code ignoreIfExists} is {@code false}
     * @throws DatabaseNotExistException if the database in {@code functionPath} does not exist
     * @throws CatalogException in case of any runtime exception
     */
    void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
            throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException;

    /**
     * Drops a function from the catalog.
     *
     * @param functionPath path of the function to drop
     * @param ignoreIfNotExists if {@code true}, do nothing when the function does not exist
     * @throws FunctionNotExistException if the function does not exist and
     *     {@code ignoreIfNotExists} is {@code false}
     * @throws CatalogException in case of any runtime exception
     */
    void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
            throws FunctionNotExistException, CatalogException;

    /**
     * Lists all functions in a database.
     *
     * @param databaseName name of the database
     * @return names of the functions in the database
     * @throws DatabaseNotExistException if the database does not exist
     * @throws CatalogException in case of any runtime exception
     */
    List<String> listFunctions(String databaseName)
            throws DatabaseNotExistException, CatalogException;

    /**
     * Gets the metadata of a function.
     *
     * @param functionPath path of the function
     * @return the function metadata
     * @throws FunctionNotExistException if the function does not exist
     * @throws CatalogException in case of any runtime exception
     */
    CatalogFunction getFunction(ObjectPath functionPath)
            throws FunctionNotExistException, CatalogException;

    /**
     * Checks whether a function exists.
     *
     * @param functionPath path of the function to check
     * @return {@code true} if the function exists
     * @throws CatalogException in case of any runtime exception
     */
    boolean functionExists(ObjectPath functionPath) throws CatalogException;
}

Usage Examples:
// Register a UDF in the catalog. CatalogFunctionImpl takes the implementation class, its
// language and (Flink 1.16+) the resources it needs, described as ResourceUri values.
// It has no description argument.
CatalogFunction myFunction = new CatalogFunctionImpl(
        "com.company.functions.MyCustomFunction",
        FunctionLanguage.JAVA,
        Arrays.asList(
                new ResourceUri(ResourceType.JAR, "dependency1.jar"),
                new ResourceUri(ResourceType.JAR, "dependency2.jar")
        )
);
ObjectPath functionPath = new ObjectPath("analytics", "my_custom_function");
catalog.createFunction(functionPath, myFunction, false);

// Use function in SQL; the unqualified name resolves against the current catalog/database
tableEnv.useCatalog("hive_catalog");
tableEnv.useDatabase("analytics");
Table result = tableEnv.sqlQuery(
        "SELECT customer_id, my_custom_function(customer_data) as processed_data " +
        "FROM customers"
);

Inspect partitioned tables: list all partitions, filter them by a partial partition specification, and look up or check individual partitions.
interface Catalog {

    /**
     * Lists all partitions of a partitioned table.
     *
     * @param tablePath path of the partitioned table
     * @return specifications of all partitions of the table
     * @throws TableNotExistException if the table does not exist
     * @throws TableNotPartitionedException if the table is not partitioned
     * @throws CatalogException in case of any runtime exception
     */
    List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
            throws TableNotExistException, TableNotPartitionedException, CatalogException;

    /**
     * Lists the partitions that match a partial partition specification. An example is all
     * {@code year=2024} partitions of a table partitioned by year, month and day.
     *
     * @param tablePath path of the partitioned table
     * @param partitionSpec partial partition specification used as a filter
     * @return specifications of the matching partitions
     * @throws TableNotExistException if the table does not exist
     * @throws TableNotPartitionedException if the table is not partitioned
     * @throws PartitionSpecInvalidException if {@code partitionSpec} is not valid for the table
     * @throws CatalogException in case of any runtime exception
     */
    List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
            throws TableNotExistException, TableNotPartitionedException,
                    PartitionSpecInvalidException, CatalogException;

    /**
     * Gets the metadata of a partition.
     *
     * @param tablePath path of the partitioned table
     * @param partitionSpec full specification of the partition
     * @return the partition metadata
     * @throws PartitionNotExistException if the partition does not exist
     * @throws CatalogException in case of any runtime exception
     */
    CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
            throws PartitionNotExistException, CatalogException;

    /**
     * Checks whether a partition exists.
     *
     * @param tablePath path of the partitioned table
     * @param partitionSpec specification of the partition to check
     * @return {@code true} if the partition exists
     * @throws CatalogException in case of any runtime exception
     */
    boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
            throws CatalogException;
}

Usage Examples:
// Work with a table partitioned by year, month and day
ObjectPath salesTable = new ObjectPath("analytics", "daily_sales");

// Print the specification of every partition
List<CatalogPartitionSpec> everyPartition = catalog.listPartitions(salesTable);
everyPartition.forEach(
        partition -> System.out.println("Partition: " + partition.getPartitionSpec()));

// Only the partitions of one year: a partial specification acts as a filter
Map<String, String> yearOnly = Map.of("year", "2024");
List<CatalogPartitionSpec> yearPartitions =
        catalog.listPartitions(salesTable, new CatalogPartitionSpec(yearOnly));

// Probe for one fully specified partition
Map<String, String> fullSpec = Map.of("year", "2024", "month", "01", "day", "15");
boolean exists = catalog.partitionExists(salesTable, new CatalogPartitionSpec(fullSpec));

Navigate catalog hierarchies with full path resolution and validation.
class ObjectPath {

    /**
     * Builds the path of an object (table, function, ...) inside a database.
     *
     * @param databaseName name of the database that holds the object
     * @param objectName name of the object within that database
     */
    ObjectPath(String databaseName, String objectName);

    /**
     * Returns the database part of this path.
     *
     * @return the database name
     */
    String getDatabaseName();

    /**
     * Returns the object part of this path.
     *
     * @return the object name
     */
    String getObjectName();

    /**
     * Returns the whole path as a single string.
     *
     * @return the path in {@code databaseName.objectName} form
     */
    String getFullName();
}
class ObjectIdentifier {

    /**
     * Creates a fully qualified object identifier.
     *
     * @param catalogName catalog name
     * @param databaseName database name
     * @param objectName object name
     * @return identifier for {@code catalogName.databaseName.objectName}
     */
    static ObjectIdentifier of(String catalogName, String databaseName, String objectName);

    /**
     * Gets the catalog name.
     *
     * @return catalog name
     */
    String getCatalogName();

    /**
     * Gets the database name.
     *
     * @return database name
     */
    String getDatabaseName();

    /**
     * Gets the object name.
     *
     * @return object name
     */
    String getObjectName();

    /**
     * Converts this identifier into the catalog-relative {@link ObjectPath} by dropping the
     * catalog name. This is the form that {@code Catalog} methods expect.
     *
     * @return path of the object within its catalog
     */
    ObjectPath toObjectPath();

    /**
     * Returns a human-readable summary of this identifier for console or log output. The
     * parts are joined with dots without escaping, so the result is not meant to be parsed.
     *
     * @return summary string of the form {@code catalogName.databaseName.objectName}
     */
    String asSummaryString();
}

Usage Examples:
// Object path resolution
ObjectIdentifier fullIdentifier = ObjectIdentifier.of("hive_catalog", "sales_db", "orders");
// Derive the catalog-relative path from the identifier instead of repeating the names
ObjectPath tablePath = fullIdentifier.toObjectPath();

// Use in catalog operations; getTable returns a CatalogBaseTable
CatalogBaseTable table = catalog.getTable(tablePath);

// "hive_catalog.sales_db.orders" (parts joined with dots, unescaped)
String fullPath = fullIdentifier.asSummaryString();

// Condensed overview of the Catalog interface; the checked exceptions declared by these
// methods are omitted here.
interface Catalog {
    // Database operations
    void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists);
    void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade);
    List<String> listDatabases();
    CatalogDatabase getDatabase(String databaseName);
    boolean databaseExists(String databaseName);

    // Table operations (tables are passed and returned as CatalogBaseTable)
    void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists);
    void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists);
    List<String> listTables(String databaseName);
    CatalogBaseTable getTable(ObjectPath tablePath);
    boolean tableExists(ObjectPath tablePath);

    // Function operations
    void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists);
    void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists);
    List<String> listFunctions(String databaseName);
    CatalogFunction getFunction(ObjectPath functionPath);
    boolean functionExists(ObjectPath functionPath);
}

interface CatalogDatabase {
    Map<String, String> getProperties();
    String getComment();
    CatalogDatabase copy();
    CatalogDatabase copy(Map<String, String> properties);
}

interface CatalogTable extends CatalogBaseTable {
    boolean isPartitioned();
    List<String> getPartitionKeys();
    CatalogTable copy();
    CatalogTable copy(Map<String, String> options);
}

interface CatalogFunction {
    String getClassName();
    FunctionLanguage getFunctionLanguage();
    // Resources (e.g. JARs) the function needs, Flink 1.16+
    List<ResourceUri> getFunctionResources();
    Optional<String> getDescription();
    CatalogFunction copy();
}

enum FunctionLanguage {
    JAVA,
    SCALA,
    PYTHON
}

// CatalogException is an unchecked, catalog-related runtime exception. The specific
// exceptions below are checked exceptions that extend Exception directly, NOT
// CatalogException, so catching CatalogException does not catch them.
class CatalogException extends RuntimeException { }
class DatabaseAlreadyExistException extends Exception { }
class DatabaseNotExistException extends Exception { }
class DatabaseNotEmptyException extends Exception { }
class TableAlreadyExistException extends Exception { }
class TableNotExistException extends Exception { }
class FunctionAlreadyExistException extends Exception { }
class FunctionNotExistException extends Exception { }
class PartitionNotExistException extends Exception { }
class TableNotPartitionedException extends Exception { }
class PartitionSpecInvalidException extends Exception { }

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table