Complete Hive metastore integration allowing Flink to use Hive as a persistent catalog for storing table definitions, schemas, and metadata across sessions.
Main catalog implementation providing full metastore operations including database, table, partition, and function management.
/**
* Primary entry point for Hive metastore integration
* Provides full catalog operations for databases, tables, partitions, and functions
*/
class HiveCatalog extends AbstractCatalog {
/**
* Create a new HiveCatalog instance with minimal configuration
* @param catalogName - Catalog name for registration
* @param defaultDatabase - Default database name (typically "default"), nullable
* @param hiveConfDir - Directory containing hive-site.xml configuration, nullable
*/
HiveCatalog(String catalogName, @Nullable String defaultDatabase, @Nullable String hiveConfDir);
/**
* Create a new HiveCatalog instance with Hive version specification
* @param catalogName - Catalog name for registration
* @param defaultDatabase - Default database name (typically "default"), nullable
* @param hiveConfDir - Directory containing hive-site.xml configuration, nullable
* @param hiveVersion - Hive version string (e.g., "2.3.6"), nullable for auto-detection
*/
HiveCatalog(String catalogName, @Nullable String defaultDatabase, @Nullable String hiveConfDir,
@Nullable String hiveVersion);
/**
* Create a new HiveCatalog instance with full configuration
* @param catalogName - Catalog name for registration
* @param defaultDatabase - Default database name (typically "default"), nullable
* @param hiveConfDir - Directory containing hive-site.xml configuration, nullable
* @param hadoopConfDir - Directory containing Hadoop configuration files, nullable
* @param hiveVersion - Hive version string (e.g., "2.3.6"), nullable for auto-detection
*/
HiveCatalog(String catalogName, @Nullable String defaultDatabase, @Nullable String hiveConfDir,
@Nullable String hadoopConfDir, @Nullable String hiveVersion);
/**
* Create a new HiveCatalog instance with custom Hive configuration
* @param catalogName - Catalog name for registration
* @param defaultDatabase - Default database name (typically "default"), nullable
* @param hiveConf - Pre-configured HiveConf instance, nullable
* @param hiveVersion - Hive version string (e.g., "2.3.6"), nullable for auto-detection
*/
HiveCatalog(String catalogName, @Nullable String defaultDatabase,
@Nullable HiveConf hiveConf, @Nullable String hiveVersion);
// Configuration access
HiveConf getHiveConf();
String getHiveVersion();
// Database operations
void createDatabase(CatalogDatabase database, boolean ignoreIfExists)
throws DatabaseAlreadyExistException, CatalogException;
void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException;
List<String> listDatabases() throws CatalogException;
CatalogDatabase getDatabase(String databaseName)
throws DatabaseNotExistException, CatalogException;
void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
throws DatabaseNotExistException, CatalogException;
// Table operations
void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException;
void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException;
List<String> listTables(String databaseName)
throws DatabaseNotExistException, CatalogException;
CatalogBaseTable getTable(ObjectPath tablePath)
throws TableNotExistException, CatalogException;
void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException;
// Partition operations
void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,
CatalogPartition partition, boolean ignoreIfExists)
throws TableNotExistException, PartitionAlreadyExistsException, CatalogException;
void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,
boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException;
List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
throws TableNotExistException, TableNotPartitionedException, CatalogException;
CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
throws PartitionNotExistException, CatalogException;
void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,
CatalogPartition newPartition, boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException;
// Function operations
void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException;
void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
throws FunctionNotExistException, CatalogException;
List<String> listFunctions(String dbName)
throws DatabaseNotExistException, CatalogException;
CatalogFunction getFunction(ObjectPath functionPath)
throws FunctionNotExistException, CatalogException;
void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
throws FunctionNotExistException, CatalogException;
// Statistics operations
void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics,
boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException;
CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
throws TableNotExistException, CatalogException;
void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,
CatalogPartitionStatistics partitionStatistics,
boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException;
}

Usage Examples:
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
// Create and register Hive catalog
HiveCatalog hiveCatalog = new HiveCatalog(
"myhive", // catalog name
"default", // default database
"/opt/hive/conf", // hive-site.xml location
"/opt/hadoop/etc/hadoop", // hadoop configuration
"2.3.6" // hive version
);
// Register with table environment
TableEnvironment tableEnv = TableEnvironment.create(settings);
tableEnv.registerCatalog("myhive", hiveCatalog);
tableEnv.useCatalog("myhive");
// Now you can use Hive tables with SQL
tableEnv.executeSql("SHOW DATABASES").print();
tableEnv.executeSql("SHOW TABLES").print();
tableEnv.executeSql("DESCRIBE my_hive_table").print();
tableEnv.executeSql("SELECT * FROM my_hive_table WHERE partition_col = 'value'").print();

Factory for creating HiveCatalog instances through Flink's factory system, used for SQL DDL catalog creation.
/**
* Factory for creating HiveCatalog instances via Flink's factory system
* Used for SQL DDL CREATE CATALOG statements
*/
class HiveCatalogFactory implements CatalogFactory {
/**
* Returns the unique identifier for this factory
* @return "hive" - the factory identifier
*/
String factoryIdentifier();
/**
* Creates a HiveCatalog instance from the provided context
* @param context - Factory context containing configuration options
* @return Configured HiveCatalog instance
*/
Catalog createCatalog(Context context);
}

Usage Examples:
// Via SQL DDL
tableEnv.executeSql(
"CREATE CATALOG myhive WITH (\n" +
" 'type' = 'hive',\n" +
" 'default-database' = 'default',\n" +
" 'hive-conf-dir' = '/opt/hive/conf',\n" +
" 'hadoop-conf-dir' = '/opt/hadoop/etc/hadoop',\n" +
" 'hive-version' = '2.3.6'\n" +
")"
);
// Via configuration
Configuration config = new Configuration();
config.setString("catalogs.myhive.type", "hive");
config.setString("catalogs.myhive.default-database", "default");
config.setString("catalogs.myhive.hive-conf-dir", "/opt/hive/conf");
config.setString("catalogs.myhive.hadoop-conf-dir", "/opt/hadoop/etc/hadoop");
config.setString("catalogs.myhive.hive-version", "2.3.6");

Catalog-level locking mechanism for concurrent operations on Hive metastore.
/**
* Provides catalog-level locking for concurrent operations
* Ensures consistency when multiple processes access the same Hive metastore
*/
class HiveCatalogLock implements CatalogLock {
/**
* Acquire a lock for the given database and table
* @param catalog - The catalog instance
* @param context - Lock context with database and table information
*/
void lock(Catalog catalog, LockContext context) throws CatalogException;
/**
* Release a previously acquired lock
* @param catalog - The catalog instance
* @param context - Lock context with database and table information
*/
void unlock(Catalog catalog, LockContext context) throws CatalogException;
}

Key configuration options for Hive catalog integration:
// Catalog factory configuration keys
static final String CATALOG_TYPE = "type"; // "hive"
static final String CATALOG_DEFAULT_DATABASE = "default-database";
static final String CATALOG_HIVE_CONF_DIR = "hive-conf-dir";
static final String CATALOG_HADOOP_CONF_DIR = "hadoop-conf-dir";
static final String CATALOG_HIVE_VERSION = "hive-version";

// Common exceptions thrown by catalog operations
class DatabaseAlreadyExistException extends Exception;
class DatabaseNotExistException extends Exception;
class DatabaseNotEmptyException extends Exception;
class TableAlreadyExistException extends Exception;
class TableNotExistException extends Exception;
class TableNotPartitionedException extends Exception;
class PartitionAlreadyExistsException extends Exception;
class PartitionNotExistException extends Exception;
class FunctionAlreadyExistException extends Exception;
class FunctionNotExistException extends Exception;
class CatalogException extends RuntimeException;

All catalog operations can throw CatalogException for general catalog-related errors; it is an unchecked exception. Specific operations throw the more specific checked exceptions documented in their method signatures.