CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-table-api-java

Java API for Apache Flink's Table ecosystem, enabling type-safe table operations and SQL query execution.

Pending
Overview
Eval results
Files

docs/catalog-management.md

Catalog and Metadata Management

The catalog system manages metadata for tables, functions, and data sources in Flink. It provides a centralized registry for database objects and supports multiple catalog backends with persistent storage capabilities.

Capabilities

Catalog Registration and Management

Register and manage multiple catalogs within a table environment.

/**
 * Register a catalog instance with the specified name.
 * The name given here (not the name passed to the catalog's own constructor)
 * is the one used by {@code useCatalog} and as the first part of fully
 * qualified table paths.
 * @param catalogName Name to register catalog under
 * @param catalog Catalog implementation instance
 */
public void registerCatalog(String catalogName, Catalog catalog);

/**
 * Look up a previously registered catalog by its registered name.
 * @param catalogName Name of the catalog to retrieve
 * @return Optional containing the catalog if found
 */
public Optional<Catalog> getCatalog(String catalogName);

/**
 * Set the current catalog for table resolution.
 * Table names that are not fully qualified are resolved against this catalog.
 * @param catalogName Name of catalog to set as current
 */
public void useCatalog(String catalogName);

/**
 * Set the current database within the current catalog.
 * Table names that are not fully qualified are resolved against this database.
 * @param databaseName Name of database to set as current
 */
public void useDatabase(String databaseName);

/**
 * Get the name of the current catalog, i.e. the one unqualified names resolve against.
 * @return Current catalog name
 */
public String getCurrentCatalog();

/**
 * Get the name of the current database, i.e. the one unqualified names resolve against.
 * @return Current database name
 */
public String getCurrentDatabase();

Usage Examples:

// Register custom catalog
// The name passed to registerCatalog ("hive_catalog") is the one used for
// useCatalog and in table paths, not the name given to the constructor.
Catalog hiveCatalog = new HiveCatalog("my_hive", "default", hiveConf);
tableEnv.registerCatalog("hive_catalog", hiveCatalog);

// Register in-memory catalog
Catalog memoryCatalog = new GenericInMemoryCatalog("memory_catalog");
tableEnv.registerCatalog("memory", memoryCatalog);

// Switch catalog context
tableEnv.useCatalog("hive_catalog");
tableEnv.useDatabase("production_db");

// Now table references resolve to hive_catalog.production_db
Table prodTable = tableEnv.from("orders"); // resolves to hive_catalog.production_db.orders

// Fully qualified table access
// `default` is a reserved SQL keyword, so it must be escaped with backticks
// when it appears as an identifier in a table path.
Table specificTable = tableEnv.from("memory.`default`.temp_table");

Catalog Listing Operations

List available catalogs, databases, and tables for discovery and exploration.

/**
 * List the names of all catalogs registered in this environment
 * @return Array of catalog names
 */
public String[] listCatalogs();

/**
 * List all databases in the current catalog
 * @return Array of database names
 */
public String[] listDatabases();

/**
 * List all databases in the specified catalog
 * NOTE(review): this overload could not be confirmed on TableEnvironment; if it is
 * unavailable in the targeted Flink version, obtain the catalog via
 * getCatalog(catalogName) and call Catalog#listDatabases() instead.
 * @param catalogName Name of catalog to list databases from
 * @return Array of database names
 */
public String[] listDatabases(String catalogName);

/**
 * List all tables in the current database of the current catalog
 * @return Array of table names
 */
public String[] listTables();

/**
 * List all tables in the specified database of the specified catalog.
 * Both names are required; there is no overload taking only a database name.
 * @param catalogName Name of the catalog that contains the database
 * @param databaseName Database name to list tables from
 * @return Array of table names
 */
public String[] listTables(String catalogName, String databaseName);

/**
 * List the names of all functions available in this environment,
 * built-in functions as well as user-defined ones
 * @return Array of function names
 */
public String[] listFunctions();

Usage Examples:

// Discover available catalogs
String[] catalogs = tableEnv.listCatalogs();
System.out.println("Available catalogs: " + Arrays.toString(catalogs));

// List databases in current catalog
String currentCatalog = tableEnv.getCurrentCatalog();
String[] databases = tableEnv.listDatabases();
for (String db : databases) {
    System.out.println("Database: " + db);

    // List tables in each database; the lookup needs both the catalog and
    // the database name because listTables has no database-only overload
    String[] tables = tableEnv.listTables(currentCatalog, db);
    for (String table : tables) {
        System.out.println("  Table: " + table);
    }
}

// List functions
String[] functions = tableEnv.listFunctions();
System.out.println("Available functions: " + Arrays.toString(functions));

Built-in Catalog Implementations

Flink provides several catalog implementations for different storage backends.

/**
 * In-memory catalog for testing and temporary metadata storage.
 * Metadata is held in memory only; nothing shown here persists it.
 */
public class GenericInMemoryCatalog implements Catalog {
    /**
     * Creates an in-memory catalog with default database
     * @param catalogName Name of the catalog
     * @param defaultDatabase Name of the default database
     */
    public GenericInMemoryCatalog(String catalogName, String defaultDatabase);
    
    /**
     * Creates an in-memory catalog with "default" as default database
     * @param catalogName Name of the catalog
     */
    public GenericInMemoryCatalog(String catalogName);
}

/**
 * Factory for creating in-memory catalogs from configuration options
 */
public class GenericInMemoryCatalogFactory implements CatalogFactory {
    /** Value of the 'type' option that selects this factory, e.g. in CREATE CATALOG ... WITH. */
    public static final String IDENTIFIER = "generic_in_memory";
}

Usage Examples:

// Create and register in-memory catalog
GenericInMemoryCatalog memoryCatalog = new GenericInMemoryCatalog(
    "test_catalog",
    "test_db"
);
tableEnv.registerCatalog("test", memoryCatalog);

// Use catalog factory for configuration-based creation.
// The options are handed to the environment, which picks the factory from 'type'.
// Requires org.apache.flink.table.catalog.CatalogDescriptor and
// org.apache.flink.configuration.Configuration (Flink 1.18+ - verify for your version).
Map<String, String> properties = new HashMap<>();
properties.put("type", "generic_in_memory");
properties.put("default-database", "my_database");
tableEnv.createCatalog(
    "configured_catalog",
    CatalogDescriptor.of("configured_catalog", Configuration.fromMap(properties))
);

// Register through SQL DDL
tableEnv.executeSql(
    "CREATE CATALOG test_catalog WITH (" +
    "  'type' = 'generic_in_memory'," +
    "  'default-database' = 'my_database'" +
    ")"
);

Catalog Interface Operations

Core catalog interface for implementing custom catalog backends.

/**
 * Contract that every catalog backend implements: lifecycle handling plus
 * database-level metadata operations.
 */
public interface Catalog {
    /**
     * Open the catalog and establish connections
     * @throws CatalogException if opening fails
     */
    void open() throws CatalogException;

    /**
     * Close the catalog and clean up resources
     * @throws CatalogException if closing fails
     */
    void close() throws CatalogException;

    /**
     * Get the default database name
     * @return Default database name
     * @throws CatalogException in case of any runtime exception
     */
    String getDefaultDatabase() throws CatalogException;

    /**
     * List all database names
     * @return List of database names
     * @throws CatalogException if listing fails
     */
    List<String> listDatabases() throws CatalogException;

    /**
     * Get database metadata
     * @param databaseName Name of database to retrieve
     * @return CatalogDatabase with metadata
     * @throws DatabaseNotExistException if the database does not exist
     * @throws CatalogException in case of any runtime exception
     */
    CatalogDatabase getDatabase(String databaseName)
        throws DatabaseNotExistException, CatalogException;

    /**
     * Check if database exists
     * @param databaseName Name of database to check
     * @return true if database exists
     * @throws CatalogException if check fails
     */
    boolean databaseExists(String databaseName) throws CatalogException;

    /**
     * Create a new database
     * @param databaseName Name of database to create
     * @param database Database metadata
     * @param ignoreIfExists If true, don't throw error if database already exists
     * @throws DatabaseAlreadyExistException if the database exists and ignoreIfExists is false
     * @throws CatalogException in case of any runtime exception
     */
    void createDatabase(String databaseName, CatalogDatabase database, boolean ignoreIfExists)
        throws DatabaseAlreadyExistException, CatalogException;
}

Table Management Operations

Operations for managing table metadata within catalogs. The methods below are declared on the Catalog interface.

/**
 * List all tables in the specified database
 * @param databaseName Database name
 * @return List of table names
 * @throws DatabaseNotExistException if the database does not exist
 * @throws CatalogException in case of any runtime exception
 */
List<String> listTables(String databaseName)
    throws DatabaseNotExistException, CatalogException;

/**
 * Get table metadata
 * @param tablePath Object path identifying the table
 * @return The table (or view) metadata as a CatalogBaseTable; a regular table
 *         can be cast to CatalogTable
 * @throws TableNotExistException if the table does not exist
 * @throws CatalogException in case of any runtime exception
 */
CatalogBaseTable getTable(ObjectPath tablePath)
    throws TableNotExistException, CatalogException;

/**
 * Check if table exists
 * @param tablePath Object path identifying the table
 * @return true if table exists
 * @throws CatalogException if check fails
 */
boolean tableExists(ObjectPath tablePath) throws CatalogException;

/**
 * Create a new table
 * @param tablePath Object path for the new table
 * @param table Table metadata (a CatalogTable is accepted as it is a CatalogBaseTable)
 * @param ignoreIfExists If true, don't throw error if table already exists
 * @throws TableAlreadyExistException if the table exists and ignoreIfExists is false
 * @throws DatabaseNotExistException if the database in tablePath does not exist
 * @throws CatalogException in case of any runtime exception
 */
void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
    throws TableAlreadyExistException, DatabaseNotExistException, CatalogException;

/**
 * Drop an existing table
 * @param tablePath Object path identifying the table to drop
 * @param ignoreIfNotExists If true, don't throw error if table doesn't exist
 * @throws TableNotExistException if the table does not exist and ignoreIfNotExists is false
 * @throws CatalogException in case of any runtime exception
 */
void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
    throws TableNotExistException, CatalogException;

Usage Examples:

// Create table through catalog interface
ObjectPath tablePath = new ObjectPath("my_database", "my_table");

// Define table schema
// Primary key columns must be declared NOT NULL; a nullable key column is
// rejected when the schema is resolved.
Schema schema = Schema.newBuilder()
    .column("id", DataTypes.BIGINT().notNull())
    .column("name", DataTypes.STRING())
    .column("created_at", DataTypes.TIMESTAMP(3))
    .primaryKey("id")
    .build();

// Create catalog table
Map<String, String> properties = new HashMap<>();
properties.put("connector", "filesystem");
properties.put("path", "/path/to/data");
properties.put("format", "parquet");

// Argument order: schema, comment, partition keys, options
CatalogTable catalogTable = CatalogTable.of(
    schema,
    "Customer data table",
    Collections.emptyList(),
    properties
);

// Create table in catalog (false: fail if the table already exists)
catalog.createTable(tablePath, catalogTable, false);

// List tables
List<String> tables = catalog.listTables("my_database");
System.out.println("Tables: " + tables);

// Check if table exists
boolean exists = catalog.tableExists(tablePath);
System.out.println("Table exists: " + exists);

Function Management

Manage user-defined functions within the catalog system. The methods below are declared on the Catalog interface.

/**
 * List all functions in the specified database
 * @param databaseName Database name
 * @return List of function names
 * @throws DatabaseNotExistException if the database does not exist
 * @throws CatalogException in case of any runtime exception
 */
List<String> listFunctions(String databaseName)
    throws DatabaseNotExistException, CatalogException;

/**
 * Get function metadata
 * @param functionPath Object path identifying the function
 * @return CatalogFunction with metadata
 * @throws FunctionNotExistException if the function does not exist
 * @throws CatalogException in case of any runtime exception
 */
CatalogFunction getFunction(ObjectPath functionPath)
    throws FunctionNotExistException, CatalogException;

/**
 * Check if function exists
 * @param functionPath Object path identifying the function
 * @return true if function exists
 * @throws CatalogException if check fails
 */
boolean functionExists(ObjectPath functionPath) throws CatalogException;

/**
 * Create a new function
 * @param functionPath Object path for the new function
 * @param function Function metadata
 * @param ignoreIfExists If true, don't throw error if function already exists
 * @throws FunctionAlreadyExistException if the function exists and ignoreIfExists is false
 * @throws DatabaseNotExistException if the database in functionPath does not exist
 * @throws CatalogException in case of any runtime exception
 */
void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
    throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException;

Context-Resolved Objects

Objects resolved within a specific catalog context with full metadata.

/**
 * A table resolved within a specific catalog context, exposing its identifier,
 * catalog metadata, resolved schema and whether it is temporary.
 * NOTE(review): member names and return types here are a simplified listing;
 * confirm against the Javadoc of the targeted Flink version.
 */
public interface ContextResolvedTable {
    /**
     * Get the identifier for this table
     * @return Table identifier
     */
    Identifier getIdentifier();
    
    /**
     * Get the resolved table
     * @return CatalogTable with full metadata
     */
    CatalogTable getTable();
    
    /**
     * Get the resolved schema
     * @return ResolvedSchema for the table
     */
    ResolvedSchema getResolvedSchema();
    
    /**
     * Check if this is a temporary table
     * @return true if temporary
     */
    boolean isTemporary();
}

/**
 * A function resolved within a specific catalog context, exposing its
 * identifier, catalog metadata and executable definition.
 * NOTE(review): member names and return types here are a simplified listing;
 * confirm against the Javadoc of the targeted Flink version.
 */
public interface ContextResolvedFunction {
    /**
     * Get the function identifier
     * @return Function identifier
     */
    Identifier getIdentifier();
    
    /**
     * Get the catalog function metadata
     * @return CatalogFunction with metadata
     */
    CatalogFunction getCatalogFunction();
    
    /**
     * Get the function definition
     * @return FunctionDefinition for execution
     */
    FunctionDefinition getFunctionDefinition();
}

Database and Table Metadata Types

Metadata structures for databases and tables.

/**
 * Metadata describing a database stored in a catalog: free-form string
 * properties plus a human-readable comment.
 */
public interface CatalogDatabase {
    /**
     * Get database properties
     * @return Map of property key-value pairs
     */
    Map<String, String> getProperties();
    
    /**
     * Get database comment/description
     * @return Database description
     */
    String getComment();
}

/**
 * Metadata describing a table stored in a catalog, adding partitioning
 * information on top of CatalogBaseTable.
 */
public interface CatalogTable extends CatalogBaseTable {
    /**
     * Check if this table is partitioned
     * @return true if partitioned
     */
    boolean isPartitioned();
    
    /**
     * Get partition keys for partitioned tables
     * @return List of partition key column names
     */
    List<String> getPartitionKeys();
    
    /**
     * Create a copy of this table with new properties
     * @param properties New properties map
     * @return New CatalogTable with updated properties
     */
    CatalogTable copy(Map<String, String> properties);
}

/**
 * Legacy CatalogTable implementation based on the old TableSchema type.
 *
 * @deprecated Build tables from a {@code Schema} with
 *     {@code CatalogTable.of(schema, comment, partitionKeys, options)} instead;
 *     this class does not accept a {@code Schema}.
 */
@Deprecated
public class CatalogTableImpl implements CatalogTable {
    /**
     * Creates a catalog table implementation
     * @param schema Table schema, expressed as the legacy TableSchema type
     * @param partitionKeys Partition key columns
     * @param properties Table properties
     * @param comment Table description
     */
    public CatalogTableImpl(
        TableSchema schema,
        List<String> partitionKeys,
        Map<String, String> properties,
        String comment
    );
}

Usage Examples:

// Create database metadata
Map<String, String> dbProps = new HashMap<>();
dbProps.put("location", "/warehouse/analytics");
dbProps.put("owner", "analytics_team");

CatalogDatabase database = new CatalogDatabaseImpl(
    dbProps,
    "Analytics database for business intelligence"
);

// Register the database so that tables can be created inside it
// (true: do nothing if it already exists)
catalog.createDatabase("analytics", database, true);

// Create table metadata with partitioning
Schema tableSchema = Schema.newBuilder()
    .column("transaction_id", DataTypes.BIGINT())
    .column("customer_id", DataTypes.BIGINT())
    .column("amount", DataTypes.DECIMAL(10, 2))
    .column("transaction_date", DataTypes.DATE())
    .column("region", DataTypes.STRING())
    .build();

List<String> partitionKeys = Arrays.asList("transaction_date", "region");

Map<String, String> tableProps = new HashMap<>();
tableProps.put("connector", "filesystem");
tableProps.put("path", "/data/transactions");
tableProps.put("format", "parquet");

// CatalogTable.of accepts a Schema directly
// (argument order: schema, comment, partition keys, options)
CatalogTable partitionedTable = CatalogTable.of(
    tableSchema,
    "Daily transaction data partitioned by date and region",
    partitionKeys,
    tableProps
);

// Create in catalog
ObjectPath tablePath = new ObjectPath("analytics", "transactions");
catalog.createTable(tablePath, partitionedTable, false);

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-api-java

docs

aggregation-grouping.md

catalog-management.md

expressions.md

index.md

sql-integration.md

table-environment.md

table-operations.md

user-defined-functions.md

window-operations.md

tile.json