or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

catalog.md, configuration.md, datastream-source.md, hive-functions.md, index.md, table-api.md
tile.json

docs/catalog.md

Catalog Integration

Complete Hive metastore integration allowing Flink to use Hive as a persistent catalog for storing table definitions, schemas, and metadata across sessions.

Capabilities

HiveCatalog

Main catalog implementation providing full metastore operations including database, table, partition, and function management.

/**
 * Primary entry point for Hive metastore integration
 * Provides full catalog operations for databases, tables, partitions, and functions
 */
class HiveCatalog extends AbstractCatalog {
    /**
     * Create a new HiveCatalog instance with minimal configuration
     * @param catalogName - Catalog name for registration
     * @param defaultDatabase - Default database name (typically "default"), nullable
     * @param hiveConfDir - Directory containing hive-site.xml configuration, nullable
     */
    HiveCatalog(String catalogName, @Nullable String defaultDatabase, @Nullable String hiveConfDir);
    
    /**
     * Create a new HiveCatalog instance with Hive version specification
     * @param catalogName - Catalog name for registration
     * @param defaultDatabase - Default database name (typically "default"), nullable
     * @param hiveConfDir - Directory containing hive-site.xml configuration, nullable
     * @param hiveVersion - Hive version string (e.g., "2.3.6"), nullable for auto-detection
     */
    HiveCatalog(String catalogName, @Nullable String defaultDatabase, @Nullable String hiveConfDir,
               @Nullable String hiveVersion);
    
    /**
     * Create a new HiveCatalog instance with full configuration
     * @param catalogName - Catalog name for registration
     * @param defaultDatabase - Default database name (typically "default"), nullable
     * @param hiveConfDir - Directory containing hive-site.xml configuration, nullable
     * @param hadoopConfDir - Directory containing Hadoop configuration files, nullable
     * @param hiveVersion - Hive version string (e.g., "2.3.6"), nullable for auto-detection
     */
    HiveCatalog(String catalogName, @Nullable String defaultDatabase, @Nullable String hiveConfDir, 
               @Nullable String hadoopConfDir, @Nullable String hiveVersion);
    
    /**
     * Create a new HiveCatalog instance with custom Hive configuration
     * @param catalogName - Catalog name for registration
     * @param defaultDatabase - Default database name (typically "default"), nullable  
     * @param hiveConf - Pre-configured HiveConf instance, nullable
     * @param hiveVersion - Hive version string (e.g., "2.3.6"), nullable for auto-detection
     */
    HiveCatalog(String catalogName, @Nullable String defaultDatabase, 
               @Nullable HiveConf hiveConf, @Nullable String hiveVersion);
    
    // Configuration access
    HiveConf getHiveConf();
    String getHiveVersion();
    
    // Database operations
    void createDatabase(CatalogDatabase database, boolean ignoreIfExists) 
        throws DatabaseAlreadyExistException, CatalogException;
    void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
        throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException;
    List<String> listDatabases() throws CatalogException;
    CatalogDatabase getDatabase(String databaseName) 
        throws DatabaseNotExistException, CatalogException;
    void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
        throws DatabaseNotExistException, CatalogException;
    
    // Table operations
    void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
        throws TableAlreadyExistException, DatabaseNotExistException, CatalogException;
    void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
        throws TableNotExistException, CatalogException;
    List<String> listTables(String databaseName)
        throws DatabaseNotExistException, CatalogException;
    CatalogBaseTable getTable(ObjectPath tablePath)
        throws TableNotExistException, CatalogException;
    void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
        throws TableNotExistException, CatalogException;
    
    // Partition operations
    void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,
                        CatalogPartition partition, boolean ignoreIfExists)
        throws TableNotExistException, PartitionAlreadyExistsException, CatalogException;
    void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, 
                      boolean ignoreIfNotExists)
        throws PartitionNotExistException, CatalogException;
    List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
        throws TableNotExistException, TableNotPartitionedException, CatalogException;
    CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
        throws PartitionNotExistException, CatalogException;
    void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,
                       CatalogPartition newPartition, boolean ignoreIfNotExists)
        throws PartitionNotExistException, CatalogException;
    
    // Function operations
    void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
        throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException;
    void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
        throws FunctionNotExistException, CatalogException;
    List<String> listFunctions(String dbName)
        throws DatabaseNotExistException, CatalogException;
    CatalogFunction getFunction(ObjectPath functionPath)
        throws FunctionNotExistException, CatalogException;
    void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
        throws FunctionNotExistException, CatalogException;
    
    // Statistics operations
    void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, 
                             boolean ignoreIfNotExists)
        throws TableNotExistException, CatalogException;
    CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
        throws TableNotExistException, CatalogException;
    void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,
                                 CatalogPartitionStatistics partitionStatistics, 
                                 boolean ignoreIfNotExists)
        throws PartitionNotExistException, CatalogException;
}

Usage Examples:

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

// Create and register Hive catalog
HiveCatalog hiveCatalog = new HiveCatalog(
    "myhive",           // catalog name
    "default",          // default database
    "/opt/hive/conf",   // hive-site.xml location
    "/opt/hadoop/etc/hadoop", // hadoop configuration
    "2.3.6"            // hive version
);

// Register with table environment
TableEnvironment tableEnv = TableEnvironment.create(settings);
tableEnv.registerCatalog("myhive", hiveCatalog);
tableEnv.useCatalog("myhive");

// Now you can use Hive tables with SQL
tableEnv.executeSql("SHOW DATABASES").print();
tableEnv.executeSql("SHOW TABLES").print();
tableEnv.executeSql("DESCRIBE my_hive_table").print();
tableEnv.executeSql("SELECT * FROM my_hive_table WHERE partition_col = 'value'").print();

HiveCatalogFactory

Factory for creating HiveCatalog instances through Flink's factory system, used for SQL DDL catalog creation.

/**
 * Factory for creating HiveCatalog instances via Flink's factory system
 * Used for SQL DDL CREATE CATALOG statements
 */
class HiveCatalogFactory implements CatalogFactory {
    /**
     * Returns the unique identifier for this factory
     * @return "hive" - the factory identifier
     */
    String factoryIdentifier();
    
    /**
     * Creates a HiveCatalog instance from the provided context
     * @param context - Factory context containing configuration options
     * @return Configured HiveCatalog instance
     */
    Catalog createCatalog(Context context);
}

Usage Examples:

// Via SQL DDL
tableEnv.executeSql(
    "CREATE CATALOG myhive WITH (\n" +
    "  'type' = 'hive',\n" +
    "  'default-database' = 'default',\n" +
    "  'hive-conf-dir' = '/opt/hive/conf',\n" +
    "  'hadoop-conf-dir' = '/opt/hadoop/etc/hadoop',\n" +
    "  'hive-version' = '2.3.6'\n" +
    ")"
);

// Via configuration
Configuration config = new Configuration();
config.setString("catalogs.myhive.type", "hive");
config.setString("catalogs.myhive.default-database", "default");
config.setString("catalogs.myhive.hive-conf-dir", "/opt/hive/conf");
config.setString("catalogs.myhive.hadoop-conf-dir", "/opt/hadoop/etc/hadoop");
config.setString("catalogs.myhive.hive-version", "2.3.6");

HiveCatalogLock

Catalog-level locking mechanism for concurrent operations on Hive metastore.

/**
 * Provides catalog-level locking for concurrent operations
 * Ensures consistency when multiple processes access the same Hive metastore
 */
class HiveCatalogLock implements CatalogLock {
    /**
     * Acquire a lock for the given database and table
     * @param catalog - The catalog instance
     * @param context - Lock context with database and table information
     */
    void lock(Catalog catalog, LockContext context) throws CatalogException;
    
    /**
     * Release a previously acquired lock
     * @param catalog - The catalog instance  
     * @param context - Lock context with database and table information
     */
    void unlock(Catalog catalog, LockContext context) throws CatalogException;
}

Configuration Options

Key configuration options for Hive catalog integration:

// Catalog factory configuration keys
static final String CATALOG_TYPE = "type";                    // "hive"
static final String CATALOG_DEFAULT_DATABASE = "default-database";
static final String CATALOG_HIVE_CONF_DIR = "hive-conf-dir";
static final String CATALOG_HADOOP_CONF_DIR = "hadoop-conf-dir";
static final String CATALOG_HIVE_VERSION = "hive-version";

Exception Handling

// Common exceptions thrown by catalog operations
class DatabaseAlreadyExistException extends Exception;
class DatabaseNotExistException extends Exception;
class DatabaseNotEmptyException extends Exception;
class TableAlreadyExistException extends Exception;
class TableNotExistException extends Exception;
class TableNotPartitionedException extends Exception;
class PartitionAlreadyExistsException extends Exception;
class PartitionNotExistException extends Exception;
class FunctionAlreadyExistException extends Exception;
class FunctionNotExistException extends Exception;
class CatalogException extends RuntimeException;  // unchecked: may be thrown by any catalog operation without being declared

All catalog operations can throw CatalogException for general catalog-related errors. Specific operations throw more specific exceptions as documented in their method signatures.