Apache Flink SQL connector for Apache Hive 3.1.2, enabling unified batch and stream processing with Hive tables.
—
Hive catalog implementation that provides unified metadata management between Flink and Hive systems. The HiveCatalog enables Flink to seamlessly access Hive databases, tables, partitions, views, and functions through the Hive metastore, maintaining consistency across both ecosystems.
Primary catalog implementation that integrates with Hive metastore for comprehensive metadata management.
/**
* Catalog implementation for Hive metastore integration
* Provides unified metadata management for databases, tables, partitions, and functions
*/
public class HiveCatalog extends AbstractCatalog {
/**
* Creates a new HiveCatalog instance
* @param catalogName Name of the catalog in Flink
* @param defaultDatabase Default database name (typically "default")
* @param hiveConfDir Path to directory containing hive-site.xml
* @param hiveVersion Hive version for compatibility (e.g., "3.1.2")
*/
public HiveCatalog(String catalogName, String defaultDatabase,
String hiveConfDir, String hiveVersion);
/**
* Creates HiveCatalog with additional Hadoop configuration
* @param catalogName Name of the catalog in Flink
* @param defaultDatabase Default database name
* @param hiveConfDir Path to Hive configuration directory
* @param hadoopConfDir Path to Hadoop configuration directory
* @param hiveVersion Hive version for compatibility
*/
public HiveCatalog(String catalogName, String defaultDatabase,
String hiveConfDir, String hadoopConfDir, String hiveVersion);
/**
* Opens the catalog and establishes metastore connection
* @throws CatalogException if connection fails
*/
public void open() throws CatalogException;
/**
* Closes the catalog and releases resources
* @throws CatalogException if close operation fails
*/
public void close() throws CatalogException;
// Database Operations
/**
* Lists all databases in the catalog
* @return List of database names
* @throws CatalogException if operation fails
*/
public List<String> listDatabases() throws CatalogException;
/**
* Checks if a database exists
* @param databaseName Name of the database
* @return True if database exists, false otherwise
* @throws CatalogException if operation fails
*/
public boolean databaseExists(String databaseName) throws CatalogException;
/**
* Gets database metadata
* @param databaseName Name of the database
* @return CatalogDatabase containing metadata
* @throws DatabaseNotExistException if database doesn't exist
* @throws CatalogException if operation fails
*/
public CatalogDatabase getDatabase(String databaseName)
throws DatabaseNotExistException, CatalogException;
/**
* Creates a new database
* @param databaseName Name of the database to create
* @param database Database metadata
* @param ignoreIfExists Whether to ignore if database already exists
* @throws DatabaseAlreadyExistException if database exists and ignoreIfExists is false
* @throws CatalogException if operation fails
*/
public void createDatabase(String databaseName, CatalogDatabase database, boolean ignoreIfExists)
throws DatabaseAlreadyExistException, CatalogException;
/**
* Drops an existing database
* @param databaseName Name of the database to drop
* @param ignoreIfNotExists Whether to ignore if database doesn't exist
* @param cascade Whether to drop tables in the database
* @throws DatabaseNotExistException if database doesn't exist and ignoreIfNotExists is false
* @throws DatabaseNotEmptyException if database is not empty and cascade is false
* @throws CatalogException if operation fails
*/
public void dropDatabase(String databaseName, boolean ignoreIfNotExists, boolean cascade)
throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException;
// Table Operations
/**
* Lists all tables in a database
* @param databaseName Name of the database
* @return List of table names
* @throws DatabaseNotExistException if database doesn't exist
* @throws CatalogException if operation fails
*/
public List<String> listTables(String databaseName)
throws DatabaseNotExistException, CatalogException;
/**
* Checks if a table exists
* @param tablePath Path identifying the table (database.table)
* @return True if table exists, false otherwise
* @throws CatalogException if operation fails
*/
public boolean tableExists(ObjectPath tablePath) throws CatalogException;
/**
* Gets table metadata
* @param tablePath Path identifying the table
* @return CatalogBaseTable containing metadata
* @throws TableNotExistException if table doesn't exist
* @throws CatalogException if operation fails
*/
public CatalogBaseTable getTable(ObjectPath tablePath)
throws TableNotExistException, CatalogException;
/**
* Creates a new table
* @param tablePath Path identifying the table
* @param table Table metadata
* @param ignoreIfExists Whether to ignore if table already exists
* @throws TableAlreadyExistException if table exists and ignoreIfExists is false
* @throws DatabaseNotExistException if database doesn't exist
* @throws CatalogException if operation fails
*/
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException;
/**
* Drops an existing table
* @param tablePath Path identifying the table
* @param ignoreIfNotExists Whether to ignore if table doesn't exist
* @throws TableNotExistException if table doesn't exist and ignoreIfNotExists is false
* @throws CatalogException if operation fails
*/
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException;
// Partition Operations
/**
* Lists all partitions of a table
* @param tablePath Path identifying the table
* @return List of partition specifications
* @throws TableNotExistException if table doesn't exist
* @throws TableNotPartitionedException if table is not partitioned
* @throws CatalogException if operation fails
*/
public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
throws TableNotExistException, TableNotPartitionedException, CatalogException;
/**
* Gets partition metadata
* @param tablePath Path identifying the table
* @param partitionSpec Partition specification
* @return CatalogPartition containing metadata
* @throws PartitionNotExistException if partition doesn't exist
* @throws CatalogException if operation fails
*/
public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
throws PartitionNotExistException, CatalogException;
/**
* Checks if a partition exists
* @param tablePath Path identifying the table
* @param partitionSpec Partition specification
* @return True if partition exists, false otherwise
* @throws CatalogException if operation fails
*/
public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
throws CatalogException;
// Function Operations
/**
* Lists all functions in a database
* @param databaseName Name of the database
* @return List of function names
* @throws DatabaseNotExistException if database doesn't exist
* @throws CatalogException if operation fails
*/
public List<String> listFunctions(String databaseName)
throws DatabaseNotExistException, CatalogException;
/**
* Gets function metadata
* @param functionPath Path identifying the function
* @return CatalogFunction containing metadata
* @throws FunctionNotExistException if function doesn't exist
* @throws CatalogException if operation fails
*/
public CatalogFunction getFunction(ObjectPath functionPath)
throws FunctionNotExistException, CatalogException;
/**
* Checks if a function exists
* @param functionPath Path identifying the function
* @return True if function exists, false otherwise
* @throws CatalogException if operation fails
*/
public boolean functionExists(ObjectPath functionPath) throws CatalogException;
/**
* Creates a new function
* @param functionPath Path identifying the function
* @param function Function metadata
* @param ignoreIfExists Whether to ignore if function already exists
* @throws FunctionAlreadyExistException if function exists and ignoreIfExists is false
* @throws DatabaseNotExistException if database doesn't exist
* @throws CatalogException if operation fails
*/
public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException;
/**
* Drops an existing function
* @param functionPath Path identifying the function
* @param ignoreIfNotExists Whether to ignore if function doesn't exist
* @throws FunctionNotExistException if function doesn't exist and ignoreIfNotExists is false
* @throws CatalogException if operation fails
*/
public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
throws FunctionNotExistException, CatalogException;
/**
* Alters an existing function
* @param functionPath Path identifying the function
* @param newFunction New function metadata
* @param ignoreIfNotExists Whether to ignore if function doesn't exist
* @throws FunctionNotExistException if function doesn't exist and ignoreIfNotExists is false
* @throws CatalogException if operation fails
*/
public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
throws FunctionNotExistException, CatalogException;
// Additional Table Operations
/**
* Lists all views in a database
* @param databaseName Name of the database
* @return List of view names
* @throws DatabaseNotExistException if database doesn't exist
* @throws CatalogException if operation fails
*/
public List<String> listViews(String databaseName)
throws DatabaseNotExistException, CatalogException;
/**
* Renames an existing table
* @param tablePath Current path of the table
* @param newTableName New name for the table
* @param ignoreIfNotExists Whether to ignore if table doesn't exist
* @throws TableNotExistException if table doesn't exist and ignoreIfNotExists is false
* @throws TableAlreadyExistException if new table name already exists
* @throws CatalogException if operation fails
*/
public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
throws TableNotExistException, TableAlreadyExistException, CatalogException;
/**
* Alters an existing table
* @param tablePath Path identifying the table
* @param newCatalogTable New table metadata
* @param ignoreIfNotExists Whether to ignore if table doesn't exist
* @throws TableNotExistException if table doesn't exist and ignoreIfNotExists is false
* @throws CatalogException if operation fails
*/
public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException;
/**
* Alters an existing database
* @param databaseName Name of the database
* @param newDatabase New database metadata
* @param ignoreIfNotExists Whether to ignore if database doesn't exist
* @throws DatabaseNotExistException if database doesn't exist and ignoreIfNotExists is false
* @throws CatalogException if operation fails
*/
public void alterDatabase(String databaseName, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
throws DatabaseNotExistException, CatalogException;
// Additional Partition Operations
/**
* Lists partitions matching a partial partition specification
* @param tablePath Path identifying the table
* @param partitionSpec Partial partition specification
* @return List of matching partition specifications
* @throws TableNotExistException if table doesn't exist
* @throws TableNotPartitionedException if table is not partitioned
* @throws PartitionSpecInvalidException if partition spec is invalid
* @throws CatalogException if operation fails
*/
public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException;
/**
* Lists partitions matching filter expressions
* @param tablePath Path identifying the table
* @param expressions Filter expressions
* @return List of matching partition specifications
* @throws TableNotExistException if table doesn't exist
* @throws TableNotPartitionedException if table is not partitioned
* @throws CatalogException if operation fails
*/
public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> expressions)
throws TableNotExistException, TableNotPartitionedException, CatalogException;
/**
* Creates a new partition
* @param tablePath Path identifying the table
* @param partitionSpec Partition specification
* @param partition Partition metadata
* @param ignoreIfExists Whether to ignore if partition already exists
* @throws TableNotExistException if table doesn't exist
* @throws TableNotPartitionedException if table is not partitioned
* @throws PartitionSpecInvalidException if partition spec is invalid
* @throws PartitionAlreadyExistsException if partition exists and ignoreIfExists is false
* @throws CatalogException if operation fails
*/
public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,
CatalogPartition partition, boolean ignoreIfExists)
throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException,
PartitionAlreadyExistsException, CatalogException;
/**
* Drops an existing partition
* @param tablePath Path identifying the table
* @param partitionSpec Partition specification
* @param ignoreIfNotExists Whether to ignore if partition doesn't exist
* @throws PartitionNotExistException if partition doesn't exist and ignoreIfNotExists is false
* @throws CatalogException if operation fails
*/
public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException;
/**
* Alters an existing partition
* @param tablePath Path identifying the table
* @param partitionSpec Partition specification
* @param newPartition New partition metadata
* @param ignoreIfNotExists Whether to ignore if partition doesn't exist
* @throws PartitionNotExistException if partition doesn't exist and ignoreIfNotExists is false
* @throws CatalogException if operation fails
*/
public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,
CatalogPartition newPartition, boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException;
// Statistics Operations
/**
* Gets table statistics
* @param tablePath Path identifying the table
* @return Table statistics
* @throws TableNotExistException if table doesn't exist
* @throws CatalogException if operation fails
*/
public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
throws TableNotExistException, CatalogException;
/**
* Gets table column statistics
* @param tablePath Path identifying the table
* @return Column statistics
* @throws TableNotExistException if table doesn't exist
* @throws CatalogException if operation fails
*/
public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
throws TableNotExistException, CatalogException;
/**
* Gets partition statistics
* @param tablePath Path identifying the table
* @param partitionSpec Partition specification
* @return Partition statistics
* @throws PartitionNotExistException if partition doesn't exist
* @throws CatalogException if operation fails
*/
public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
throws PartitionNotExistException, CatalogException;
/**
* Gets partition column statistics
* @param tablePath Path identifying the table
* @param partitionSpec Partition specification
* @return Partition column statistics
* @throws PartitionNotExistException if partition doesn't exist
* @throws CatalogException if operation fails
*/
public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
throws PartitionNotExistException, CatalogException;
// Utility Methods
// Additional Constructors
/**
* Creates HiveCatalog with explicit HiveConf instance
* @param catalogName Name of the catalog in Flink
* @param defaultDatabase Default database name
* @param hiveConf Hive configuration instance
* @param hiveVersion Hive version for compatibility
*/
public HiveCatalog(String catalogName, String defaultDatabase,
HiveConf hiveConf, String hiveVersion);
/**
* Creates HiveCatalog with HiveConf and embedded metastore control
* @param catalogName Name of the catalog in Flink
* @param defaultDatabase Default database name
* @param hiveConf Hive configuration instance
* @param hiveVersion Hive version for compatibility
* @param allowEmbedded Whether to allow embedded metastore
*/
public HiveCatalog(String catalogName, String defaultDatabase,
HiveConf hiveConf, String hiveVersion, boolean allowEmbedded);
// Configuration and Access Methods
/**
* Gets the Hive configuration instance
* @return HiveConf instance used by this catalog
*/
public HiveConf getHiveConf();
/**
* Gets the Hive version string
* @return Hive version (e.g., "3.1.2")
*/
public String getHiveVersion();
/**
* Checks if catalog supports managed tables
* @return True if managed tables are supported
*/
public boolean supportsManagedTable();
/**
* Gets factory instance for table factory integration
* @return Optional Factory instance
*/
public Optional<Factory> getFactory();
/**
* Gets table factory for legacy integration
* @return Optional TableFactory instance
*/
public Optional<TableFactory> getTableFactory();
/**
* Gets function definition factory
* @return Optional FunctionDefinitionFactory instance
*/
public Optional<FunctionDefinitionFactory> getFunctionDefinitionFactory();
// Static Utility Methods
/**
* Creates HiveConf from configuration directories
* @param hiveConfDir Path to Hive configuration directory
* @param hadoopConfDir Path to Hadoop configuration directory
* @return Configured HiveConf instance
*/
public static HiveConf createHiveConf(String hiveConfDir, String hadoopConfDir);
/**
* Extracts field names from Hive field schema list
* @param fieldSchemas List of Hive field schemas
* @return List of field names
*/
public static List<String> getFieldNames(List<FieldSchema> fieldSchemas);
/**
* Determines whether a table is a Hive table, judging only by its properties
* (NOTE(review): which property keys are inspected is not shown in this listing — confirm against the implementation)
* @param properties Table properties map
* @return True if table is a Hive table, false otherwise
*/
public static boolean isHiveTable(Map<String, String> properties);
/**
* Determines whether a table is a Hive table, judging by the table instance
* (NOTE(review): presumably consults the table's options like the Map overload — confirm against the implementation)
* @param table CatalogBaseTable instance
* @return True if table is a Hive table, false otherwise
*/
public static boolean isHiveTable(CatalogBaseTable table);
/**
* Checks if the metastore is configured for embedded mode
* @param hiveConf Hive configuration to check
* @return True if using embedded metastore, false otherwise
*/
public static boolean isEmbeddedMetastore(HiveConf hiveConf);
// Hive-specific Operations (Internal API)
/**
* Gets the underlying Hive Table object
* @param tablePath Path identifying the table
* @return Hive Table instance
* @throws TableNotExistException if table doesn't exist
*/
public Table getHiveTable(ObjectPath tablePath) throws TableNotExistException;
/**
* Loads data into a table from external location
* @param loadPath Path to data files
* @param tablePath Path identifying the target table
* @param isOverwrite Whether to overwrite existing data
* @param isSrcLocal Whether source is on local filesystem
*/
public void loadTable(Path loadPath, ObjectPath tablePath, boolean isOverwrite, boolean isSrcLocal);
/**
* Loads data into a specific partition from external location
* @param loadPath Path to data files
* @param tablePath Path identifying the target table
* @param partSpec Partition specification
* @param isOverwrite Whether to overwrite existing data
* @param isSrcLocal Whether source is on local filesystem
*/
public void loadPartition(Path loadPath, ObjectPath tablePath, Map<String, String> partSpec,
boolean isOverwrite, boolean isSrcLocal);
}
Usage Examples:
// Programmatic catalog creation and registration
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
// Create table environment
EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// Create and register Hive catalog
String catalogName = "hive_catalog";
String defaultDatabase = "default";
String hiveConfDir = "/opt/hive/conf"; // Contains hive-site.xml
String hiveVersion = "3.1.2";
HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir, hiveVersion);
tableEnv.registerCatalog(catalogName, hive);
// Set as current catalog
tableEnv.useCatalog(catalogName);
// Use catalog operations
tableEnv.executeSql("SHOW DATABASES").print();
tableEnv.executeSql("SHOW TABLES").print();
tableEnv.executeSql("DESCRIBE hive_table").print();
-- SQL DDL catalog usage
CREATE CATALOG hive_catalog WITH (
'type' = 'hive',
'hive-conf-dir' = '/opt/hive/conf',
'hive-version' = '3.1.2',
'hadoop-conf-dir' = '/opt/hadoop/conf'
);
-- Switch to Hive catalog
USE CATALOG hive_catalog;
-- DEFAULT is a reserved keyword in Flink SQL, so the database name must be backtick-quoted
USE `default`;
-- Catalog operations
SHOW DATABASES;
SHOW TABLES;
DESCRIBE EXTENDED hive_table;
SHOW PARTITIONS hive_partitioned_table;
-- Create database and tables
CREATE DATABASE analytics_db;
USE analytics_db;
-- STORED AS is Hive DDL, so switch to the Hive dialect before creating the table.
-- In Hive syntax, partition columns are declared (with their types) only in
-- PARTITIONED BY and must not be repeated in the regular column list.
SET 'table.sql-dialect' = 'hive';
CREATE TABLE user_events (
  user_id BIGINT,
  event_type STRING,
  event_time TIMESTAMP
) PARTITIONED BY (partition_date STRING)
STORED AS PARQUET;
The HiveCatalog maintains metadata consistency between Flink and Hive, enabling:
Bidirectional Table Access:
Metadata Consistency:
Function Integration:
/**
* Configuration constants for Hive catalog
*/
public class HiveCatalogConfig {
/** Default separator for column type lists */
public static final String DEFAULT_LIST_COLUMN_TYPES_SEPARATOR = ":";
/** Property key for table comments */
public static final String COMMENT = "comment";
/** Property key for partition location */
public static final String PARTITION_LOCATION = "partition.location";
/** Property key for table location */
public static final String TABLE_LOCATION = "location";
/** Property key for storage format */
public static final String STORED_AS = "stored-as";
/** Property key for input format */
public static final String INPUT_FORMAT = "input-format";
/** Property key for output format */
public static final String OUTPUT_FORMAT = "output-format";
/** Property key for serialization library */
public static final String SERDE_LIB = "serde-lib";
}
The catalog provides comprehensive error handling for common scenarios:
// Common exceptions thrown by catalog operations
try {
CatalogBaseTable table = catalog.getTable(new ObjectPath("db", "table"));
} catch (TableNotExistException e) {
// Handle missing table
} catch (DatabaseNotExistException e) {
// Handle missing database
} catch (CatalogException e) {
// Handle general catalog errors (connectivity, permissions, etc.)
}Common Error Scenarios:
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-connector-hive-3-1-2-2-12