Hive metastore integration for schema discovery, table management, and other metadata operations. The catalog gives Flink direct access to existing Hive data warehouses, covering databases, tables, partitions, functions, and statistics.
`HiveCatalog` is the main catalog implementation: it connects to the Hive metastore and exposes its metadata through Flink's catalog API.
// API summary: only public signatures are listed here; method bodies are omitted.
/**
 * Hive catalog implementation providing metastore integration.
 *
 * <p>Extends {@link AbstractCatalog} to expose Hive metastore metadata through
 * Flink-compatible catalog operations, grouped below as database, table,
 * partition, function, and statistics operations.
 */
public class HiveCatalog extends AbstractCatalog {
    /**
     * Creates a HiveCatalog instance.
     *
     * @param catalogName Name for this catalog instance
     * @param defaultDatabase Default database to use
     * @param hiveConfDir Directory containing hive-site.xml (optional; presumably
     *     {@code null} when unused — confirm)
     * @param hadoopConfDir Directory containing Hadoop configuration (optional;
     *     presumably {@code null} when unused — confirm)
     * @param hiveVersion Hive version for compatibility (e.g., "2.3.9")
     */
    public HiveCatalog(String catalogName, String defaultDatabase,
            String hiveConfDir, String hadoopConfDir, String hiveVersion);

    // Database operations

    /**
     * Lists all databases in the Hive metastore.
     *
     * @return List of database names
     * @throws DatabaseNotExistException Declared by this signature. NOTE(review): this
     *     call names no specific database, and Flink's {@code Catalog#listDatabases}
     *     declares only {@code CatalogException} — confirm when, if ever, this is thrown
     * @throws CatalogException If the operation fails
     */
    public List<String> listDatabases() throws DatabaseNotExistException, CatalogException;

    /**
     * Checks whether a database exists.
     *
     * @param databaseName Database name to check
     * @return {@code true} if the database exists
     * @throws CatalogException If the operation fails
     */
    public boolean databaseExists(String databaseName) throws CatalogException;

    /**
     * Creates a new database.
     *
     * @param databaseName Name of the database to create
     * @param database Database metadata
     * @param ignoreIfExists Whether to silently do nothing if the database already exists
     * @throws DatabaseAlreadyExistException If the database exists and ignoreIfExists is false
     * @throws CatalogException If the operation fails
     */
    public void createDatabase(String databaseName, CatalogDatabase database, boolean ignoreIfExists)
            throws DatabaseAlreadyExistException, CatalogException;

    /**
     * Returns the metadata of a database.
     *
     * @param databaseName Database name
     * @return Database metadata
     * @throws DatabaseNotExistException If the database doesn't exist
     * @throws CatalogException If the operation fails
     */
    public CatalogDatabase getDatabase(String databaseName)
            throws DatabaseNotExistException, CatalogException;

    // Table operations

    /**
     * Lists all tables in a database.
     *
     * @param databaseName Database name
     * @return List of table names
     * @throws DatabaseNotExistException If the database doesn't exist
     * @throws CatalogException If the operation fails
     */
    public List<String> listTables(String databaseName)
            throws DatabaseNotExistException, CatalogException;

    /**
     * Lists the tables whose names match a pattern.
     *
     * @param databaseName Database name
     * @param tableNamePattern Pattern for table names. NOTE(review): described as
     *     "SQL-like"; the exact wildcard syntax is not shown here — confirm
     * @return List of matching table names
     * @throws DatabaseNotExistException If the database doesn't exist
     * @throws CatalogException If the operation fails
     */
    public List<String> listTables(String databaseName, String tableNamePattern)
            throws DatabaseNotExistException, CatalogException;

    /**
     * Checks whether a table exists.
     *
     * @param tablePath Table path (database.table)
     * @return {@code true} if the table exists
     * @throws CatalogException If the operation fails
     */
    public boolean tableExists(ObjectPath tablePath) throws CatalogException;

    /**
     * Returns table metadata, including its schema.
     *
     * @param tablePath Table path (database.table)
     * @return Table metadata including schema
     * @throws TableNotExistException If the table doesn't exist
     * @throws CatalogException If the operation fails
     */
    public CatalogBaseTable getTable(ObjectPath tablePath)
            throws TableNotExistException, CatalogException;

    /**
     * Creates a new table.
     *
     * @param tablePath Table path (database.table)
     * @param table Table metadata and schema
     * @param ignoreIfExists Whether to silently do nothing if the table already exists
     * @throws TableAlreadyExistException If the table exists and ignoreIfExists is false
     * @throws DatabaseNotExistException If the target database doesn't exist
     * @throws CatalogException If the operation fails
     */
    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException;

    /**
     * Drops a table.
     *
     * @param tablePath Table path (database.table)
     * @param ignoreIfNotExists Whether to silently do nothing if the table doesn't exist
     * @throws TableNotExistException If the table doesn't exist and ignoreIfNotExists is false
     * @throws CatalogException If the operation fails
     */
    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
            throws TableNotExistException, CatalogException;

    /**
     * Renames a table.
     *
     * @param tablePath Current table path
     * @param newTableName New table name. Only a name (not an {@code ObjectPath}) is
     *     accepted, so presumably the table stays in its current database — confirm
     * @throws TableNotExistException If the table doesn't exist
     * @throws TableAlreadyExistException If a table with the new name already exists
     * @throws CatalogException If the operation fails
     */
    public void renameTable(ObjectPath tablePath, String newTableName)
            throws TableNotExistException, TableAlreadyExistException, CatalogException;

    // Partition operations

    /**
     * Lists all partitions of a partitioned table.
     *
     * @param tablePath Table path (database.table)
     * @return List of partition specifications
     * @throws TableNotExistException If the table doesn't exist
     * @throws TableNotPartitionedException If the table is not partitioned
     * @throws CatalogException If the operation fails
     */
    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
            throws TableNotExistException, TableNotPartitionedException, CatalogException;

    /**
     * Lists the partitions that match a partial partition specification.
     *
     * @param tablePath Table path (database.table)
     * @param partitionSpec Partial partition specification; presumably partitions matching
     *     every key/value present in it are returned — confirm
     * @return List of matching partition specifications
     * @throws TableNotExistException If the table doesn't exist
     * @throws TableNotPartitionedException If the table is not partitioned
     * @throws PartitionSpecInvalidException If the partition spec is invalid
     * @throws CatalogException If the operation fails
     */
    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath,
            CatalogPartitionSpec partitionSpec)
            throws TableNotExistException, TableNotPartitionedException,
            PartitionSpecInvalidException, CatalogException;

    /**
     * Returns the metadata of a partition.
     *
     * @param tablePath Table path (database.table)
     * @param partitionSpec Partition specification
     * @return Partition metadata
     * @throws PartitionNotExistException If the partition doesn't exist
     * @throws CatalogException If the operation fails
     */
    public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
            throws PartitionNotExistException, CatalogException;

    /**
     * Checks whether a partition exists.
     *
     * @param tablePath Table path (database.table)
     * @param partitionSpec Partition specification
     * @return {@code true} if the partition exists
     * @throws CatalogException If the operation fails
     */
    public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
            throws CatalogException;

    /**
     * Creates a new partition.
     *
     * @param tablePath Table path (database.table)
     * @param partitionSpec Partition specification
     * @param partition Partition metadata
     * @param ignoreIfExists Whether to silently do nothing if the partition already exists
     * @throws TableNotExistException If the table doesn't exist
     * @throws TableNotPartitionedException If the table is not partitioned
     * @throws PartitionSpecInvalidException If the partition spec is invalid
     * @throws PartitionAlreadyExistsException If the partition exists and ignoreIfExists is false
     * @throws CatalogException If the operation fails
     */
    public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,
            CatalogPartition partition, boolean ignoreIfExists)
            throws TableNotExistException, TableNotPartitionedException,
            PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException;

    /**
     * Drops a partition.
     *
     * @param tablePath Table path (database.table)
     * @param partitionSpec Partition specification
     * @param ignoreIfNotExists Whether to silently do nothing if the partition doesn't exist
     * @throws PartitionNotExistException If the partition doesn't exist and ignoreIfNotExists is false
     * @throws CatalogException If the operation fails
     */
    public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,
            boolean ignoreIfNotExists)
            throws PartitionNotExistException, CatalogException;

    // Function operations

    /**
     * Lists the user-defined functions in a database.
     *
     * @param databaseName Database name
     * @return List of function names
     * @throws DatabaseNotExistException If the database doesn't exist
     * @throws CatalogException If the operation fails
     */
    public List<String> listFunctions(String databaseName)
            throws DatabaseNotExistException, CatalogException;

    /**
     * Returns the metadata of a function.
     *
     * @param functionPath Function path (database.function)
     * @return Function metadata
     * @throws FunctionNotExistException If the function doesn't exist
     * @throws CatalogException If the operation fails
     */
    public CatalogFunction getFunction(ObjectPath functionPath)
            throws FunctionNotExistException, CatalogException;

    /**
     * Checks whether a function exists.
     *
     * @param functionPath Function path (database.function)
     * @return {@code true} if the function exists
     * @throws CatalogException If the operation fails
     */
    public boolean functionExists(ObjectPath functionPath) throws CatalogException;

    /**
     * Creates a user-defined function.
     *
     * @param functionPath Function path (database.function)
     * @param function Function metadata
     * @param ignoreIfExists Whether to silently do nothing if the function already exists
     * @throws FunctionAlreadyExistException If the function exists and ignoreIfExists is false
     * @throws DatabaseNotExistException If the target database doesn't exist
     * @throws CatalogException If the operation fails
     */
    public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
            throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException;

    /**
     * Replaces the metadata of an existing function.
     *
     * @param functionPath Function path (database.function)
     * @param newFunction New function metadata
     * @param ignoreIfNotExists Whether to silently do nothing if the function doesn't exist
     * @throws FunctionNotExistException If the function doesn't exist and ignoreIfNotExists is false
     * @throws CatalogException If the operation fails
     */
    public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
            throws FunctionNotExistException, CatalogException;

    /**
     * Drops a function.
     *
     * @param functionPath Function path (database.function)
     * @param ignoreIfNotExists Whether to silently do nothing if the function doesn't exist
     * @throws FunctionNotExistException If the function doesn't exist and ignoreIfNotExists is false
     * @throws CatalogException If the operation fails
     */
    public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
            throws FunctionNotExistException, CatalogException;

    // Statistics operations

    /**
     * Returns table-level statistics for cost-based optimization.
     *
     * @param tablePath Table path (database.table)
     * @return Table statistics
     * @throws TableNotExistException If the table doesn't exist
     * @throws CatalogException If the operation fails
     */
    public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
            throws TableNotExistException, CatalogException;

    /**
     * Returns column-level statistics for cost-based optimization.
     *
     * @param tablePath Table path (database.table)
     * @return Column statistics for the table
     * @throws TableNotExistException If the table doesn't exist
     * @throws CatalogException If the operation fails
     */
    public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
            throws TableNotExistException, CatalogException;

    /**
     * Returns the statistics of a single partition.
     *
     * <p>The result uses the same {@code CatalogTableStatistics} type that
     * {@code getTableStatistics} returns for a whole table.
     *
     * @param tablePath Table path (database.table)
     * @param partitionSpec Partition specification
     * @return Partition statistics
     * @throws PartitionNotExistException If the partition doesn't exist
     * @throws CatalogException If the operation fails
     */
    public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath,
            CatalogPartitionSpec partitionSpec)
            throws PartitionNotExistException, CatalogException;
}
Usage Examples:
import java.util.List;
import java.util.Map;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;

// Run the statements below inside a method that declares `throws Exception`:
// several catalog calls throw checked exceptions (e.g. TableNotExistException,
// TableNotPartitionedException).

// Create and configure Hive catalog
String catalogName = "production_hive";
String defaultDatabase = "analytics";
String hiveConfDir = "/etc/hive/conf";     // Contains hive-site.xml
String hadoopConfDir = "/etc/hadoop/conf"; // Contains core-site.xml, hdfs-site.xml
String hiveVersion = "2.3.9";
HiveCatalog catalog = new HiveCatalog(
    catalogName,
    defaultDatabase,
    hiveConfDir,
    hadoopConfDir,
    hiveVersion
);

// Register with a Table Environment. Batch mode fits the bounded SELECT at the end.
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// SQL resolves catalogs by this registration name ("hive"), not by the
// catalogName passed to the constructor. Registering also opens the catalog,
// so the direct catalog calls below come after this point.
tableEnv.registerCatalog("hive", catalog);
tableEnv.useCatalog("hive");

// Database operations
List<String> databases = catalog.listDatabases();
System.out.println("Available databases: " + databases);

// ignoreIfExists = true makes creation idempotent; a separate databaseExists()
// check followed by createDatabase(..., false) races with concurrent creators.
CatalogDatabase newDb = new CatalogDatabaseImpl(
    Map.of("description", "User analytics data"),
    "Database for user analytics"
);
catalog.createDatabase("user_data", newDb, true);

// Table operations
ObjectPath tablePath = new ObjectPath("user_data", "events");
if (catalog.tableExists(tablePath)) {
    CatalogBaseTable table = catalog.getTable(tablePath);
    // NOTE: getSchema() is deprecated in newer Flink releases in favor of
    // getUnresolvedSchema(); it is kept here for older versions.
    TableSchema schema = table.getSchema();
    System.out.println("Table schema: " + schema);
    // Only list partitions of partitioned tables: listPartitions() throws
    // TableNotPartitionedException otherwise.
    if (table instanceof CatalogTable) {
        CatalogTable catalogTable = (CatalogTable) table;
        if (catalogTable.isPartitioned()) {
            List<CatalogPartitionSpec> partitions = catalog.listPartitions(tablePath);
            System.out.println("Partitions: " + partitions.size());
        }
    }
}

// Query through catalog
Table result = tableEnv.sqlQuery("SELECT * FROM hive.user_data.events WHERE event_date = '2024-01-01'");
// sqlQuery() only builds a plan; execute() submits the job and print() shows the rows.
result.execute().print();