Java API for Apache Flink's Table ecosystem, enabling type-safe table operations and SQL query execution.
—
The catalog system manages metadata for tables, functions, and data sources in Flink. It provides a centralized registry for database objects and supports multiple catalog backends with persistent storage capabilities.
Register and manage multiple catalogs within a table environment.
/**
* Register a catalog instance with the specified name
* @param catalogName Name to register catalog under
* @param catalog Catalog implementation instance
*/
public void registerCatalog(String catalogName, Catalog catalog);
/**
* Get a registered catalog by name
* @param catalogName Name of the catalog to retrieve
* @return Optional containing the catalog if found
*/
public Optional<Catalog> getCatalog(String catalogName);
/**
* Set the current catalog for table resolution
* @param catalogName Name of catalog to set as current
*/
public void useCatalog(String catalogName);
/**
* Set the current database within the current catalog
* @param databaseName Name of database to set as current
*/
public void useDatabase(String databaseName);
/**
* Get the name of the current catalog
* @return Current catalog name
*/
public String getCurrentCatalog();
/**
* Get the name of the current database
* @return Current database name
*/
public String getCurrentDatabase();

Usage Examples:
// Register custom catalog
Catalog hiveCatalog = new HiveCatalog("my_hive", "default", hiveConf);
tableEnv.registerCatalog("hive_catalog", hiveCatalog);
// Register in-memory catalog
Catalog memoryCatalog = new GenericInMemoryCatalog("memory_catalog");
tableEnv.registerCatalog("memory", memoryCatalog);
// Switch catalog context
tableEnv.useCatalog("hive_catalog");
tableEnv.useDatabase("production_db");
// Now table references resolve to hive_catalog.production_db
Table prodTable = tableEnv.from("orders"); // resolves to hive_catalog.production_db.orders
// Fully qualified table access
Table specificTable = tableEnv.from("memory.default.temp_table");

List available catalogs, databases, and tables for discovery and exploration.
/**
* List all registered catalog names
* @return Array of catalog names
*/
public String[] listCatalogs();
/**
* List all databases in the current catalog
* @return Array of database names
*/
public String[] listDatabases();
/**
* List all databases in the specified catalog
* @param catalogName Name of catalog to list databases from
* @return Array of database names
*/
public String[] listDatabases(String catalogName);
/**
* List all tables in the current database
* @return Array of table names
*/
public String[] listTables();
/**
* List all tables in the specified database
* @param databaseName Database name to list tables from
* @return Array of table names
*/
public String[] listTables(String databaseName);
/**
* List all functions in the current catalog and database
* @return Array of function names
*/
public String[] listFunctions();

Usage Examples:
// Discover available catalogs
String[] catalogs = tableEnv.listCatalogs();
System.out.println("Available catalogs: " + Arrays.toString(catalogs));
// List databases in current catalog
String[] databases = tableEnv.listDatabases();
for (String db : databases) {
System.out.println("Database: " + db);
// List tables in each database
String[] tables = tableEnv.listTables(db);
for (String table : tables) {
System.out.println(" Table: " + table);
}
}
// List functions
String[] functions = tableEnv.listFunctions();
System.out.println("Available functions: " + Arrays.toString(functions));

Flink provides several catalog implementations for different storage backends.
/**
 * In-memory catalog for testing and temporary metadata storage.
 *
 * NOTE(review): as the name indicates, metadata is held only in process
 * memory — nothing is persisted across restarts, so this is suited to
 * tests and short-lived jobs rather than shared production metadata.
 */
public class GenericInMemoryCatalog implements Catalog {
/**
 * Creates an in-memory catalog with default database
 * @param catalogName Name of the catalog
 * @param defaultDatabase Name of the default database
 */
public GenericInMemoryCatalog(String catalogName, String defaultDatabase);
/**
 * Creates an in-memory catalog with "default" as default database
 * @param catalogName Name of the catalog
 */
public GenericInMemoryCatalog(String catalogName);
}
/**
* Factory for creating in-memory catalogs
*/
public class GenericInMemoryCatalogFactory implements CatalogFactory {
public static final String IDENTIFIER = "generic_in_memory";
}

Usage Examples:
// Create and register in-memory catalog
GenericInMemoryCatalog memoryCatalog = new GenericInMemoryCatalog(
"test_catalog",
"test_db"
);
tableEnv.registerCatalog("test", memoryCatalog);
// Use catalog factory for configuration-based creation
Map<String, String> properties = new HashMap<>();
properties.put("type", "generic_in_memory");
properties.put("default-database", "my_database");
// Register through SQL DDL
tableEnv.executeSql(
"CREATE CATALOG test_catalog WITH (" +
" 'type' = 'generic_in_memory'," +
" 'default-database' = 'my_database'" +
")"
);

Core catalog interface for implementing custom catalog backends.
public interface Catalog {
/**
* Open the catalog and establish connections
* @throws CatalogException if opening fails
*/
void open() throws CatalogException;
/**
* Close the catalog and clean up resources
* @throws CatalogException if closing fails
*/
void close() throws CatalogException;
/**
* Get the default database name
* @return Default database name
*/
String getDefaultDatabase();
/**
* List all database names
* @return List of database names
* @throws CatalogException if listing fails
*/
List<String> listDatabases() throws CatalogException;
/**
* Get database metadata
* @param databaseName Name of database to retrieve
* @return CatalogDatabase with metadata
* @throws CatalogException if database not found or error occurs
*/
CatalogDatabase getDatabase(String databaseName) throws CatalogException;
/**
* Check if database exists
* @param databaseName Name of database to check
* @return true if database exists
* @throws CatalogException if check fails
*/
boolean databaseExists(String databaseName) throws CatalogException;
/**
* Create a new database
* @param databaseName Name of database to create
* @param database Database metadata
* @param ignoreIfExists If true, don't throw error if database already exists
* @throws CatalogException if creation fails
*/
void createDatabase(String databaseName, CatalogDatabase database, boolean ignoreIfExists)
throws CatalogException;
}

Operations for managing table metadata within catalogs.
/**
* List all tables in the specified database
* @param databaseName Database name
* @return List of table names
* @throws CatalogException if listing fails
*/
List<String> listTables(String databaseName) throws CatalogException;
/**
* Get table metadata
* @param tablePath Object path identifying the table
* @return CatalogTable with complete metadata
* @throws CatalogException if table not found or error occurs
*/
CatalogTable getTable(ObjectPath tablePath) throws CatalogException;
/**
* Check if table exists
* @param tablePath Object path identifying the table
* @return true if table exists
* @throws CatalogException if check fails
*/
boolean tableExists(ObjectPath tablePath) throws CatalogException;
/**
* Create a new table
* @param tablePath Object path for the new table
* @param table Table metadata
* @param ignoreIfExists If true, don't throw error if table already exists
* @throws CatalogException if creation fails
*/
void createTable(ObjectPath tablePath, CatalogTable table, boolean ignoreIfExists)
throws CatalogException;
/**
* Drop an existing table
* @param tablePath Object path identifying the table to drop
* @param ignoreIfNotExists If true, don't throw error if table doesn't exist
* @throws CatalogException if drop fails
*/
void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws CatalogException;

Usage Examples:
// Create table through catalog interface
ObjectPath tablePath = new ObjectPath("my_database", "my_table");
// Define table schema
Schema schema = Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.column("name", DataTypes.STRING())
.column("created_at", DataTypes.TIMESTAMP(3))
.primaryKey("id")
.build();
// Create catalog table
Map<String, String> properties = new HashMap<>();
properties.put("connector", "filesystem");
properties.put("path", "/path/to/data");
properties.put("format", "parquet");
CatalogTable catalogTable = CatalogTable.of(
schema,
"Customer data table",
Collections.emptyList(),
properties
);
// Create table in catalog
catalog.createTable(tablePath, catalogTable, false);
// List tables
List<String> tables = catalog.listTables("my_database");
System.out.println("Tables: " + tables);
// Check if table exists
boolean exists = catalog.tableExists(tablePath);
System.out.println("Table exists: " + exists);

Manage user-defined functions within the catalog system.
/**
* List all functions in the specified database
* @param databaseName Database name
* @return List of function names
* @throws CatalogException if listing fails
*/
List<String> listFunctions(String databaseName) throws CatalogException;
/**
* Get function metadata
* @param functionPath Object path identifying the function
* @return CatalogFunction with metadata
* @throws CatalogException if function not found or error occurs
*/
CatalogFunction getFunction(ObjectPath functionPath) throws CatalogException;
/**
* Check if function exists
* @param functionPath Object path identifying the function
* @return true if function exists
* @throws CatalogException if check fails
*/
boolean functionExists(ObjectPath functionPath) throws CatalogException;
/**
* Create a new function
* @param functionPath Object path for the new function
* @param function Function metadata
* @param ignoreIfExists If true, don't throw error if function already exists
* @throws CatalogException if creation fails
*/
void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
throws CatalogException;

Objects resolved within a specific catalog context with full metadata.
/**
 * A table that has been resolved within a specific catalog context,
 * pairing its fully-resolved identifier with its metadata and schema.
 */
public interface ContextResolvedTable {
/**
 * Get the identifier for this table
 * @return Table identifier
 */
Identifier getIdentifier();
/**
 * Get the resolved table
 * @return CatalogTable with full metadata
 */
CatalogTable getTable();
/**
 * Get the resolved schema
 * @return ResolvedSchema for the table
 */
ResolvedSchema getResolvedSchema();
/**
 * Check if this is a temporary table
 * @return true if temporary
 */
boolean isTemporary();
}
public interface ContextResolvedFunction {
/**
* Get the function identifier
* @return Function identifier
*/
Identifier getIdentifier();
/**
* Get the catalog function metadata
* @return CatalogFunction with metadata
*/
CatalogFunction getCatalogFunction();
/**
* Get the function definition
* @return FunctionDefinition for execution
*/
FunctionDefinition getFunctionDefinition();
}

Metadata structures for databases and tables.
/**
 * Metadata describing a database registered in a catalog.
 */
public interface CatalogDatabase {
/**
 * Get database properties
 * @return Map of property key-value pairs
 */
Map<String, String> getProperties();
/**
 * Get database comment/description
 * @return Database description
 */
String getComment();
}
/**
 * Metadata describing a table registered in a catalog, including
 * partitioning information.
 */
public interface CatalogTable extends CatalogBaseTable {
/**
 * Check if this table is partitioned
 * @return true if partitioned
 */
boolean isPartitioned();
/**
 * Get partition keys for partitioned tables
 * @return List of partition key column names (presumably empty for a
 *         non-partitioned table — confirm against the implementation)
 */
List<String> getPartitionKeys();
/**
 * Create a copy of this table with new properties; the original table
 * metadata is left unchanged.
 * @param properties New properties map
 * @return New CatalogTable with updated properties
 */
CatalogTable copy(Map<String, String> properties);
}
public class CatalogTableImpl implements CatalogTable {
/**
* Creates a catalog table implementation
* @param schema Table schema
* @param partitionKeys Partition key columns
* @param properties Table properties
* @param comment Table description
*/
public CatalogTableImpl(
Schema schema,
List<String> partitionKeys,
Map<String, String> properties,
String comment
);
}

Usage Examples:
// Create database metadata
Map<String, String> dbProps = new HashMap<>();
dbProps.put("location", "/warehouse/analytics");
dbProps.put("owner", "analytics_team");
CatalogDatabase database = new CatalogDatabaseImpl(
dbProps,
"Analytics database for business intelligence"
);
// Create table metadata with partitioning
Schema tableSchema = Schema.newBuilder()
.column("transaction_id", DataTypes.BIGINT())
.column("customer_id", DataTypes.BIGINT())
.column("amount", DataTypes.DECIMAL(10, 2))
.column("transaction_date", DataTypes.DATE())
.column("region", DataTypes.STRING())
.build();
List<String> partitionKeys = Arrays.asList("transaction_date", "region");
Map<String, String> tableProps = new HashMap<>();
tableProps.put("connector", "filesystem");
tableProps.put("path", "/data/transactions");
tableProps.put("format", "parquet");
CatalogTable partitionedTable = new CatalogTableImpl(
tableSchema,
partitionKeys,
tableProps,
"Daily transaction data partitioned by date and region"
);
// Create in catalog
ObjectPath tablePath = new ObjectPath("analytics", "transactions");
catalog.createTable(tablePath, partitionedTable, false);

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-api-java