CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-table

Apache Flink's Table API and SQL module for unified stream and batch processing

Pending
Overview
Eval results
Files

docs/catalog-system.md

Catalog System

Flink's catalog system provides pluggable metadata management for tables, functions, databases, and user-defined catalogs. It supports persistent storage, schema evolution, and integration with external metadata systems like Hive Metastore.

Capabilities

Catalog Management

Manage multiple catalogs and switch between different metadata repositories.

/**
 * Registers a catalog under a unique name so that it can later be looked up
 * or selected by that name.
 *
 * @param catalogName name under which the catalog is registered
 * @param catalog catalog implementation to register
 */
void registerCatalog(String catalogName, Catalog catalog);

/**
 * Looks up a registered catalog by name.
 *
 * @param catalogName name of the catalog to retrieve
 * @return an {@code Optional} containing the catalog if one is registered
 *         under {@code catalogName}; callers should handle the empty case
 */
Optional<Catalog> getCatalog(String catalogName);

/**
 * Makes the named catalog the current catalog for subsequent table operations.
 *
 * @param catalogName name of the catalog to use as current
 */
void useCatalog(String catalogName);

/**
 * Returns the name of the catalog that is currently selected.
 *
 * @return current catalog name
 */
String getCurrentCatalog();

/**
 * Lists the names of all registered catalogs.
 *
 * @return array of catalog names
 */
String[] listCatalogs();

Usage Examples:

// Register a Hive-backed catalog. Arguments: catalog name, default database,
// Hive configuration directory, Hive version.
HiveCatalog hiveCatalog = new HiveCatalog(
    "my_hive",
    "default",
    "path/to/hive-conf",
    "2.3.4"
);
tableEnv.registerCatalog("hive_catalog", hiveCatalog);

// Register a JDBC-backed catalog. Arguments: catalog name, default database,
// username, password, base URL. The base URL holds only host and port; the
// database to connect to is given by the default-database argument.
JdbcCatalog jdbcCatalog = new JdbcCatalog(
    "my_jdbc_catalog",
    "metadata",
    "username",
    "password",
    "jdbc:postgresql://localhost:5432"
);
tableEnv.registerCatalog("postgres_catalog", jdbcCatalog);

// Switch between catalogs; listTables() reports the tables visible in
// whichever catalog is current at the time of the call.
tableEnv.useCatalog("hive_catalog");
String[] hiveTables = tableEnv.listTables();

tableEnv.useCatalog("postgres_catalog");
String[] postgresTables = tableEnv.listTables();

// Get current context ("postgres_catalog" after the switch above)
String currentCatalog = tableEnv.getCurrentCatalog();

Database Operations

Manage databases within catalogs: create, drop, list, inspect, and check for existence.

interface Catalog {
    /**
     * Creates a database in the catalog.
     *
     * @param name Database name
     * @param database Database metadata
     * @param ignoreIfExists Skip creation if database already exists
     * @throws DatabaseAlreadyExistException if the database already exists and
     *         {@code ignoreIfExists} is {@code false}
     * @throws CatalogException on a general catalog failure
     */
    void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
        throws DatabaseAlreadyExistException, CatalogException;
    
    /**
     * Drops a database from the catalog.
     *
     * @param name Database name to drop
     * @param ignoreIfNotExists Skip error if database doesn't exist
     * @param cascade Drop all tables in the database
     * @throws DatabaseNotExistException if the database does not exist and
     *         {@code ignoreIfNotExists} is {@code false}
     * @throws DatabaseNotEmptyException if the database is not empty and
     *         {@code cascade} is {@code false}
     * @throws CatalogException on a general catalog failure
     */
    void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
        throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException;
    
    /**
     * Lists all databases in the catalog.
     *
     * @return List of database names
     * @throws CatalogException on a general catalog failure
     */
    List<String> listDatabases() throws CatalogException;
    
    /**
     * Gets database metadata.
     *
     * @param databaseName Name of the database
     * @return CatalogDatabase containing metadata
     * @throws DatabaseNotExistException if the database does not exist
     * @throws CatalogException on a general catalog failure
     */
    CatalogDatabase getDatabase(String databaseName)
        throws DatabaseNotExistException, CatalogException;
    
    /**
     * Checks if a database exists.
     *
     * @param databaseName Name of the database to check
     * @return true if database exists
     * @throws CatalogException on a general catalog failure
     */
    boolean databaseExists(String databaseName) throws CatalogException;
}

Usage Examples:

// Describe the new database: free-form properties plus a comment
Map<String, String> dbProperties = new HashMap<>();
dbProperties.put("owner", "analytics_team");
dbProperties.put("created_date", "2024-01-01");

CatalogDatabase analyticsDb = new CatalogDatabaseImpl(
    dbProperties,
    "Database for analytics workflows"
);

// getCatalog returns an Optional; fail with a clear message instead of an
// anonymous NoSuchElementException when the catalog was never registered.
Catalog catalog = tableEnv.getCatalog("hive_catalog")
    .orElseThrow(() -> new IllegalStateException("Catalog 'hive_catalog' is not registered"));

// ignoreIfExists = false: creation fails if "analytics" already exists
catalog.createDatabase("analytics", analyticsDb, false);

// Use the new database
tableEnv.useDatabase("analytics");

// List databases
List<String> databases = catalog.listDatabases();
for (String db : databases) {
    System.out.println("Database: " + db);
}

Table Operations

Comprehensive table management with metadata, partitioning, and constraints.

interface Catalog {
    /**
     * Creates a table in the catalog.
     *
     * @param tablePath Path to the table (database.table)
     * @param table Table metadata and schema
     * @param ignoreIfExists Skip creation if table already exists
     * @throws TableAlreadyExistException if the table already exists and
     *         {@code ignoreIfExists} is {@code false}
     * @throws DatabaseNotExistException if the database in {@code tablePath} does not exist
     * @throws CatalogException on a general catalog failure
     */
    void createTable(ObjectPath tablePath, CatalogTable table, boolean ignoreIfExists)
        throws TableAlreadyExistException, DatabaseNotExistException, CatalogException;
    
    /**
     * Drops a table from the catalog.
     *
     * @param tablePath Path to the table to drop
     * @param ignoreIfNotExists Skip error if table doesn't exist
     * @throws TableNotExistException if the table does not exist and
     *         {@code ignoreIfNotExists} is {@code false}
     * @throws CatalogException on a general catalog failure
     */
    void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
        throws TableNotExistException, CatalogException;
    
    /**
     * Lists all tables in a database.
     *
     * @param databaseName Database name
     * @return List of table names
     * @throws DatabaseNotExistException if the database does not exist
     * @throws CatalogException on a general catalog failure
     */
    List<String> listTables(String databaseName)
        throws DatabaseNotExistException, CatalogException;
    
    /**
     * Gets table metadata.
     *
     * NOTE(review): upstream Flink appears to declare the more general
     * CatalogBaseTable as the return type here - confirm against the
     * targeted Flink version.
     *
     * @param tablePath Path to the table
     * @return CatalogTable containing metadata
     * @throws TableNotExistException if the table does not exist
     * @throws CatalogException on a general catalog failure
     */
    CatalogTable getTable(ObjectPath tablePath)
        throws TableNotExistException, CatalogException;
    
    /**
     * Checks if a table exists.
     *
     * @param tablePath Path to the table to check
     * @return true if table exists
     * @throws CatalogException on a general catalog failure
     */
    boolean tableExists(ObjectPath tablePath) throws CatalogException;
    
    /**
     * Renames a table.
     *
     * @param tablePath Current table path
     * @param newTableName New table name
     * @param ignoreIfNotExists Skip error if table doesn't exist
     * @throws TableNotExistException if the table does not exist and
     *         {@code ignoreIfNotExists} is {@code false}
     * @throws TableAlreadyExistException if a table named {@code newTableName} already exists
     * @throws CatalogException on a general catalog failure
     */
    void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
        throws TableNotExistException, TableAlreadyExistException, CatalogException;
    
    /**
     * Alters table metadata.
     *
     * @param tablePath Path to the table
     * @param newTable New table metadata
     * @param ignoreIfNotExists Skip error if table doesn't exist
     * @throws TableNotExistException if the table does not exist and
     *         {@code ignoreIfNotExists} is {@code false}
     * @throws CatalogException on a general catalog failure
     */
    void alterTable(ObjectPath tablePath, CatalogTable newTable, boolean ignoreIfNotExists)
        throws TableNotExistException, CatalogException;
}

Usage Examples:

// Create table with comprehensive metadata.
// Primary key columns must be declared NOT NULL; a nullable key column is
// rejected when the schema is resolved.
Schema schema = Schema.newBuilder()
    .column("order_id", DataTypes.BIGINT().notNull())
    .column("customer_id", DataTypes.BIGINT())
    .column("product_id", DataTypes.BIGINT())
    .column("quantity", DataTypes.INT())
    .column("unit_price", DataTypes.DECIMAL(10, 2))
    .column("order_date", DataTypes.DATE())
    .column("region", DataTypes.STRING())
    .primaryKey("order_id")
    .build();

// Connector options stored alongside the table definition
Map<String, String> properties = new HashMap<>();
properties.put("connector", "kafka");
properties.put("topic", "orders");
properties.put("properties.bootstrap.servers", "localhost:9092");
properties.put("format", "json");

CatalogTable ordersTable = CatalogTable.of(
    schema,
    "Orders table for e-commerce analytics",
    Arrays.asList("region", "order_date"), // Partition keys
    properties
);

// ignoreIfExists = false: creation fails if analytics.orders already exists
ObjectPath tablePath = new ObjectPath("analytics", "orders");
catalog.createTable(tablePath, ordersTable, false);

// List and inspect tables. The looked-up object is held as CatalogBaseTable,
// the common supertype of CatalogTable, which is all the inspection needs.
List<String> tables = catalog.listTables("analytics");
for (String tableName : tables) {
    ObjectPath path = new ObjectPath("analytics", tableName);
    CatalogBaseTable table = catalog.getTable(path);
    System.out.println("Table: " + tableName);
    System.out.println("Schema: " + table.getUnresolvedSchema());
    System.out.println("Properties: " + table.getOptions());
}

Function Management

Manage user-defined functions in the catalog: create, drop, list, inspect, and check for existence.

interface Catalog {
    /**
     * Creates a function in the catalog.
     *
     * @param functionPath Path to the function (database.function)
     * @param function Function metadata
     * @param ignoreIfExists Skip creation if function already exists
     * @throws FunctionAlreadyExistException if the function already exists and
     *         {@code ignoreIfExists} is {@code false}
     * @throws DatabaseNotExistException if the database in {@code functionPath} does not exist
     * @throws CatalogException on a general catalog failure
     */
    void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
        throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException;
    
    /**
     * Drops a function from the catalog.
     *
     * @param functionPath Path to the function to drop
     * @param ignoreIfNotExists Skip error if function doesn't exist
     * @throws FunctionNotExistException if the function does not exist and
     *         {@code ignoreIfNotExists} is {@code false}
     * @throws CatalogException on a general catalog failure
     */
    void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
        throws FunctionNotExistException, CatalogException;
    
    /**
     * Lists all functions in a database.
     *
     * @param databaseName Database name
     * @return List of function names
     * @throws DatabaseNotExistException if the database does not exist
     * @throws CatalogException on a general catalog failure
     */
    List<String> listFunctions(String databaseName)
        throws DatabaseNotExistException, CatalogException;
    
    /**
     * Gets function metadata.
     *
     * @param functionPath Path to the function
     * @return CatalogFunction containing metadata
     * @throws FunctionNotExistException if the function does not exist
     * @throws CatalogException on a general catalog failure
     */
    CatalogFunction getFunction(ObjectPath functionPath)
        throws FunctionNotExistException, CatalogException;
    
    /**
     * Checks if a function exists.
     *
     * @param functionPath Path to the function to check
     * @return true if function exists
     * @throws CatalogException on a general catalog failure
     */
    boolean functionExists(ObjectPath functionPath) throws CatalogException;
}

Usage Examples:

// Register UDF in catalog: a custom function for business logic.
// CatalogFunctionImpl takes the implementing class name and its language.
// Dependency JARs are not passed as plain strings; where the Flink version
// supports function resources they are supplied as resource URIs through a
// separate constructor overload.
CatalogFunction myFunction = new CatalogFunctionImpl(
    "com.company.functions.MyCustomFunction",
    FunctionLanguage.JAVA
);

// ignoreIfExists = false: creation fails if the function is already registered
ObjectPath functionPath = new ObjectPath("analytics", "my_custom_function");
catalog.createFunction(functionPath, myFunction, false);

// Use function in SQL. Selecting the catalog and database first lets the
// query refer to the function and the table by unqualified name.
tableEnv.useCatalog("hive_catalog");
tableEnv.useDatabase("analytics");
Table result = tableEnv.sqlQuery(
    "SELECT customer_id, my_custom_function(customer_data) as processed_data " +
    "FROM customers"
);

Partition Management

Inspect partitioned tables: list partitions (optionally filtered by a partial specification), fetch partition metadata, and check for existence.

interface Catalog {
    /**
     * Lists all partitions of a partitioned table.
     *
     * @param tablePath Path to the partitioned table
     * @return List of partition specifications
     * @throws TableNotExistException if the table does not exist
     * @throws TableNotPartitionedException if the table is not partitioned
     * @throws CatalogException on a general catalog failure
     */
    List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
        throws TableNotExistException, TableNotPartitionedException, CatalogException;
    
    /**
     * Lists partitions matching a partial specification.
     *
     * @param tablePath Path to the partitioned table
     * @param partitionSpec Partial partition specification for filtering
     * @return List of matching partition specifications
     * @throws TableNotExistException if the table does not exist
     * @throws TableNotPartitionedException if the table is not partitioned
     * @throws CatalogException on a general catalog failure
     */
    List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
        throws TableNotExistException, TableNotPartitionedException, CatalogException;
    
    /**
     * Gets partition metadata.
     *
     * @param tablePath Path to the partitioned table
     * @param partitionSpec Partition specification
     * @return CatalogPartition containing metadata
     * @throws PartitionNotExistException if the partition does not exist
     * @throws CatalogException on a general catalog failure
     */
    CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
        throws PartitionNotExistException, CatalogException;
    
    /**
     * Checks if a partition exists.
     *
     * @param tablePath Path to the partitioned table
     * @param partitionSpec Partition specification to check
     * @return true if partition exists
     * @throws CatalogException on a general catalog failure
     */
    boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
        throws CatalogException;
}

Usage Examples:

// Address the partitioned table: database "analytics", table "daily_sales"
ObjectPath partitionedTable = new ObjectPath("analytics", "daily_sales");

// Enumerate every partition and print its key/value specification
List<CatalogPartitionSpec> allPartitions = catalog.listPartitions(partitionedTable);
allPartitions.forEach(spec -> System.out.println("Partition: " + spec.getPartitionSpec()));

// Narrow the listing to partitions whose "year" key is "2024"
Map<String, String> yearKey = Collections.singletonMap("year", "2024");
CatalogPartitionSpec yearFilter = new CatalogPartitionSpec(yearKey);
List<CatalogPartitionSpec> yearPartitions = catalog.listPartitions(partitionedTable, yearFilter);

// Probe for a single, fully specified partition
Map<String, String> dayKey = Map.of("year", "2024", "month", "01", "day", "15");
CatalogPartitionSpec specificPartition = new CatalogPartitionSpec(dayKey);
boolean exists = catalog.partitionExists(partitionedTable, specificPartition);

Object Path Resolution

Navigate catalog hierarchies with full path resolution and validation.

class ObjectPath {
    /**
     * Creates a two-part path (database.object) that identifies an object
     * within a single catalog.
     *
     * @param databaseName Database name
     * @param objectName Object name (table, function, etc.)
     */
    ObjectPath(String databaseName, String objectName);
    
    /**
     * Gets the database part of the path.
     *
     * @return Database name
     */
    String getDatabaseName();
    
    /**
     * Gets the object part of the path.
     *
     * @return Object name
     */
    String getObjectName();
    
    /**
     * Gets the full path as a single string.
     *
     * @return String representation of the path
     */
    String getFullName();
}

class ObjectIdentifier {
    /**
     * Creates a three-part identifier (catalog, database, object). Unlike
     * ObjectPath, it also names the catalog.
     *
     * @param catalogName Catalog name
     * @param databaseName Database name
     * @param objectName Object name
     * @return identifier composed of the three given parts
     */
    static ObjectIdentifier of(String catalogName, String databaseName, String objectName);
    
    /**
     * Gets the catalog part of the identifier.
     *
     * @return Catalog name
     */
    String getCatalogName();
    
    /**
     * Gets the database part of the identifier.
     *
     * @return Database name
     */
    String getDatabaseName();
    
    /**
     * Gets the object part of the identifier.
     *
     * @return Object name
     */
    String getObjectName();
}

Usage Examples:

// Object path resolution: ObjectPath is relative to one catalog, while
// ObjectIdentifier also names the catalog.
ObjectPath tablePath = new ObjectPath("sales_db", "orders");
ObjectIdentifier fullIdentifier = ObjectIdentifier.of("hive_catalog", "sales_db", "orders");

// Use in catalog operations. The result is held as CatalogBaseTable, the
// common supertype of CatalogTable, so the assignment is valid for any
// table-like object the catalog returns.
CatalogBaseTable table = catalog.getTable(tablePath);

// Render the identifier as catalog.database.object
String fullPath = String.join(
    ".",
    fullIdentifier.getCatalogName(),
    fullIdentifier.getDatabaseName(),
    fullIdentifier.getObjectName()
);

Types

Catalog Interfaces

/**
 * Summary of the catalog operations described in the sections above, with the
 * same signatures and declared exceptions.
 */
interface Catalog {
    // Database operations
    void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
        throws DatabaseAlreadyExistException, CatalogException;
    void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
        throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException;
    List<String> listDatabases() throws CatalogException;
    CatalogDatabase getDatabase(String databaseName)
        throws DatabaseNotExistException, CatalogException;
    boolean databaseExists(String databaseName) throws CatalogException;
    
    // Table operations
    void createTable(ObjectPath tablePath, CatalogTable table, boolean ignoreIfExists)
        throws TableAlreadyExistException, DatabaseNotExistException, CatalogException;
    void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
        throws TableNotExistException, CatalogException;
    List<String> listTables(String databaseName)
        throws DatabaseNotExistException, CatalogException;
    CatalogTable getTable(ObjectPath tablePath)
        throws TableNotExistException, CatalogException;
    boolean tableExists(ObjectPath tablePath) throws CatalogException;
    void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
        throws TableNotExistException, TableAlreadyExistException, CatalogException;
    void alterTable(ObjectPath tablePath, CatalogTable newTable, boolean ignoreIfNotExists)
        throws TableNotExistException, CatalogException;
    
    // Function operations
    void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
        throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException;
    void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
        throws FunctionNotExistException, CatalogException;
    List<String> listFunctions(String databaseName)
        throws DatabaseNotExistException, CatalogException;
    CatalogFunction getFunction(ObjectPath functionPath)
        throws FunctionNotExistException, CatalogException;
    boolean functionExists(ObjectPath functionPath) throws CatalogException;
    
    // Partition operations
    List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
        throws TableNotExistException, TableNotPartitionedException, CatalogException;
    List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
        throws TableNotExistException, TableNotPartitionedException, CatalogException;
    CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
        throws PartitionNotExistException, CatalogException;
    boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
        throws CatalogException;
}

Catalog Metadata Types

/** Metadata of a database stored in a catalog: its properties and comment. */
interface CatalogDatabase {
    /** @return the database's key/value properties */
    Map<String, String> getProperties();
    /** @return the comment attached to the database */
    String getComment();
    /** @return a copy of this database metadata */
    CatalogDatabase copy();
    /**
     * @param properties properties for the copy
     * @return a copy of this database metadata carrying the given properties
     */
    CatalogDatabase copy(Map<String, String> properties);
}

/** Metadata of a table stored in a catalog, adding partitioning information to CatalogBaseTable. */
interface CatalogTable extends CatalogBaseTable {
    /** @return true if the table is partitioned */
    boolean isPartitioned();
    /** @return names of the partition key columns */
    List<String> getPartitionKeys();
    /** @return a copy of this table metadata */
    CatalogTable copy();
    /**
     * @param options options for the copy
     * @return a copy of this table metadata carrying the given options
     */
    CatalogTable copy(Map<String, String> options);
}

/**
 * Metadata of a user-defined function stored in a catalog.
 *
 * NOTE(review): upstream Flink appears to name the language accessor
 * getFunctionLanguage(), to return resource URI objects rather than strings
 * from getFunctionResources(), and to return an Optional from
 * getDescription() - confirm against the targeted Flink version.
 */
interface CatalogFunction {
    /** @return fully qualified name of the class implementing the function */
    String getClassName();
    /** @return language the function is implemented in */
    FunctionLanguage getLanguage();
    /** @return resources (for example JARs) the function depends on */
    List<String> getFunctionResources();
    /** @return description of the function */
    String getDescription();
    /** @return a copy of this function metadata */
    CatalogFunction copy();
}

/**
 * Implementation language of a catalog function. The constants match the
 * values used when registering a function, for example
 * {@code FunctionLanguage.JAVA}.
 */
enum FunctionLanguage {
    JAVA,
    SCALA,
    PYTHON
}

Exception Types

/** Unchecked exception for general catalog failures; every catalog operation may raise it. */
class CatalogException extends RuntimeException { }
/** Checked exception: the database to create already exists. */
class DatabaseAlreadyExistException extends Exception { }
/** Checked exception: the referenced database does not exist. */
class DatabaseNotExistException extends Exception { }
/** Checked exception: the database to drop is not empty. */
class DatabaseNotEmptyException extends Exception { }
/** Checked exception: the table to create, or the rename target, already exists. */
class TableAlreadyExistException extends Exception { }
/** Checked exception: the referenced table does not exist. */
class TableNotExistException extends Exception { }
/** Checked exception: the function to create already exists. */
class FunctionAlreadyExistException extends Exception { }
/** Checked exception: the referenced function does not exist. */
class FunctionNotExistException extends Exception { }
/** Checked exception: the referenced partition does not exist. */
class PartitionNotExistException extends Exception { }
/** Checked exception: a partition operation was attempted on a non-partitioned table. */
class TableNotPartitionedException extends Exception { }

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table

docs

catalog-system.md

datastream-integration.md

index.md

sql-execution.md

table-environment.md

table-operations.md

type-system.md

user-defined-functions.md

tile.json