The Apache Flink SQL connector for Apache Hive 2.3.6 provides comprehensive integration between Flink and Hive, enabling both batch and streaming access to Hive tables, Hive catalog integration, and Hive function support. The connector serves as a bridge between Flink's unified stream and batch processing capabilities and the Hive data warehouse ecosystem.

Add the connector to your project as a Maven dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-hive-2.3.6_2.12</artifactId>
    <version>1.15.4</version>
</dependency>

For runtime classpath (if not using uber JAR):
# Download and place in Flink lib directory
cp flink-sql-connector-hive-2.3.6_2.12-1.15.4.jar $FLINK_HOME/lib/

// Catalog integration
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory;
// DataStream API source
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.connectors.hive.HiveTablePartition;
// Table API integration
import org.apache.flink.connectors.hive.HiveTableSource;
import org.apache.flink.connectors.hive.HiveTableSink;
import org.apache.flink.connectors.hive.HiveLookupTableSource;
import org.apache.flink.connectors.hive.HiveDynamicTableFactory;
// Module for Hive functions
import org.apache.flink.table.module.hive.HiveModule;
import org.apache.flink.table.module.hive.HiveModuleOptions;
// Configuration options
import org.apache.flink.connectors.hive.HiveOptions;
// Exception handling
import org.apache.flink.connectors.hive.FlinkHiveException;
// Annotations for nullability
import javax.annotation.Nullable;
import javax.annotation.Nonnull;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
// Create table environment
EnvironmentSettings settings = EnvironmentSettings
        .newInstance()
        .inBatchMode()
        .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// Create and register Hive catalog
String catalogName = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/opt/hive-conf";
String hadoopConfDir = "/opt/hadoop-conf";
String hiveVersion = "2.3.6";
HiveCatalog hive = new HiveCatalog(
        catalogName, defaultDatabase, hiveConfDir, hadoopConfDir, hiveVersion);
tableEnv.registerCatalog(catalogName, hive);
tableEnv.useCatalog(catalogName);
// Use Hive tables with SQL
tableEnv.executeSql("SELECT * FROM my_hive_table").print();import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.mapred.JobConf;
import java.util.Collections;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create the Hadoop job configuration expected by HiveSourceBuilder
JobConf jobConf = new JobConf();
// Set the Hive metastore URI and any other Hive configs
jobConf.set("hive.metastore.uris", "thrift://localhost:9083");
// Build the Hive source
HiveSource<RowData> source = new HiveSourceBuilder(
        jobConf,
        new Configuration(),   // Flink configuration (ReadableConfig) for the source
        "2.3.6",
        "default",
        "my_table",
        Collections.emptyMap()
).buildWithDefaultBulkFormat();
// Add the source to the stream
env.fromSource(source, WatermarkStrategy.noWatermarks(), "hive-source")
        .print();
env.execute("Hive Stream Job");

import org.apache.flink.table.module.hive.HiveModule;
// Register Hive module to access Hive built-in functions
tableEnv.loadModule("hive", new HiveModule("2.3.6"));
// Use Hive functions in SQL
tableEnv.executeSql("SELECT concat('Hello', ' ', 'World')").print();The Flink Hive Connector is built around several key components:
- HiveCatalog provides full metastore integration for persistent table metadata
- HiveSource, HiveTableSource, and HiveTableSink for data access
- HiveModule enables access to Hive built-in functions within Flink SQL

Complete Hive metastore integration allowing Flink to use Hive as a persistent catalog for storing table definitions, schemas, and metadata across sessions.
HiveCatalog(String catalogName, @Nullable String defaultDatabase, @Nullable String hiveConfDir,
        @Nullable String hadoopConfDir, @Nullable String hiveVersion);

// Database operations
void createDatabase(String databaseName, CatalogDatabase database, boolean ignoreIfExists);
CatalogDatabase getDatabase(String databaseName);
List<String> listDatabases();

// Table operations
void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists);
CatalogBaseTable getTable(ObjectPath tablePath);
List<String> listTables(String databaseName);
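As a minimal sketch of the persistence this enables (assuming the "myhive" catalog registered earlier; the table name and columns are illustrative), DDL executed while the Hive catalog is active is stored in the Hive metastore and remains visible in later sessions and to Hive itself:

import org.apache.flink.table.api.SqlDialect;

tableEnv.useCatalog("myhive");
tableEnv.useDatabase("default");

// Switch to the Hive dialect so the statement is parsed as Hive DDL
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

// The table definition is persisted in the Hive metastore, not only in this session
tableEnv.executeSql(
    "CREATE TABLE IF NOT EXISTS page_views ("
        + "  user_id BIGINT,"
        + "  page STRING"
        + ") PARTITIONED BY (view_date STRING) STORED AS PARQUET");

// Back to the default dialect for regular Flink SQL
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);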
Low-level streaming and batch source for reading Hive tables directly in DataStream API programs with full control over parallelism, partitioning, and data formats.

class HiveSourceBuilder {
    HiveSourceBuilder(@Nonnull JobConf jobConf, @Nonnull ReadableConfig flinkConf, @Nullable String hiveVersion,
            @Nonnull String dbName, @Nonnull String tableName, @Nonnull Map<String, String> tableOptions);
    HiveSourceBuilder setPartitions(List<HiveTablePartition> partitions);
    HiveSourceBuilder setLimit(@Nullable Long limit);
    HiveSourceBuilder setProjectedFields(int[] projectedFields);
    HiveSource<RowData> buildWithDefaultBulkFormat();
    <T> HiveSource<T> buildWithBulkFormat(BulkFormat<T, HiveSourceSplit> bulkFormat);
}

class HiveTablePartition {
    static HiveTablePartition ofTable(HiveConf hiveConf, @Nullable String hiveVersion,
            String dbName, String tableName);
    static HiveTablePartition ofPartition(HiveConf hiveConf, @Nullable String hiveVersion,
            String dbName, String tableName,
            LinkedHashMap<String, String> partitionSpec);
}
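For partition-level reads, the builder can be combined with the HiveTablePartition factory methods. The following is a sketch, not verbatim from the project docs: the table name, partition column, and values are hypothetical, and it reuses the imports from the DataStream example above plus org.apache.hadoop.hive.conf.HiveConf and java.util.LinkedHashMap.

// Restrict the source to one partition of a hypothetical table and cap the row count
JobConf jobConf = new JobConf();
jobConf.set("hive.metastore.uris", "thrift://localhost:9083");
HiveConf hiveConf = new HiveConf(jobConf, HiveConf.class);

LinkedHashMap<String, String> partitionSpec = new LinkedHashMap<>();
partitionSpec.put("view_date", "2023-01-01");

HiveSource<RowData> source = new HiveSourceBuilder(
        jobConf, new org.apache.flink.configuration.Configuration(),
        "2.3.6", "default", "page_views", Collections.emptyMap())
    .setPartitions(Collections.singletonList(
        HiveTablePartition.ofPartition(hiveConf, "2.3.6", "default", "page_views", partitionSpec)))
    .setLimit(1000L)
    .buildWithDefaultBulkFormat();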
High-level Table API integration providing HiveTableSource and HiveTableSink for seamless SQL access to Hive tables with pushdown optimizations.

// Created automatically via catalog registration
// Supports predicate pushdown, projection pushdown, partition pruning
interface SupportsPartitionPushDown {
    void applyPartitions(List<Map<String, String>> remainingPartitions);
}
interface SupportsProjectionPushDown {
    boolean supportsNestedProjection();
    void applyProjection(int[][] projectedFields);
}
interface SupportsLimitPushDown {
    void applyLimit(long limit);
}
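These optimizations are applied by the planner automatically when querying through a registered HiveCatalog; no connector-specific code is required. A brief sketch against the hypothetical page_views table used above:

// Partition pruning: only the view_date='2023-01-01' partition is read.
// Projection pushdown: only user_id and page are materialized from the files.
// Limit pushdown: the scan can stop early once 100 rows are produced.
tableEnv.executeSql(
    "SELECT user_id, page FROM page_views WHERE view_date = '2023-01-01' LIMIT 100").print();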
Module system integration enabling access to Hive built-in functions within Flink SQL, including string functions, date functions, and mathematical operations.

class HiveModule implements Module {
    HiveModule();
    HiveModule(String hiveVersion);
    Set<String> listFunctions();
    Optional<FunctionDefinition> getFunctionDefinition(String name);
    String getHiveVersion();
}
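The module can also be inspected directly through the API listed above, independently of a TableEnvironment. A small sketch; get_json_object is a standard Hive built-in used here only for illustration:

import org.apache.flink.table.module.hive.HiveModule;

HiveModule hiveModule = new HiveModule("2.3.6");

// All Hive built-ins exposed to Flink SQL once the module is loaded
System.out.println("Function count: " + hiveModule.listFunctions().size());

// Resolve a single function definition by name, if present
hiveModule.getFunctionDefinition("get_json_object")
        .ifPresent(def -> System.out.println("Resolved: " + def));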
Comprehensive configuration system for tuning connector behavior, performance optimization, and streaming source configuration.

class HiveOptions {
    // Performance tuning
    ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM;
    ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX;
    // Streaming source options
    ConfigOption<Boolean> STREAMING_SOURCE_ENABLE;
    ConfigOption<Duration> STREAMING_SOURCE_MONITOR_INTERVAL;
    ConfigOption<String> STREAMING_SOURCE_CONSUME_START_OFFSET;
    // Lookup join caching
    ConfigOption<Duration> LOOKUP_JOIN_CACHE_TTL;
}

enum PartitionOrder {
    CREATE_TIME, PARTITION_TIME, PARTITION_NAME
}
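The streaming source options are typically supplied as dynamic table options (SQL hints) on a query over a Hive table, while parallelism inference is a session-level setting. A sketch, assuming SQL hints are enabled and reusing the hypothetical page_views table:

// Session-level option: let the connector infer source parallelism
tableEnv.getConfig().getConfiguration()
        .setBoolean("table.exec.hive.infer-source-parallelism", true);

// Treat the partitioned Hive table as an unbounded streaming source that picks up
// new partitions, starting from the given partition offset
tableEnv.executeSql(
    "SELECT * FROM page_views "
        + "/*+ OPTIONS("
        + "'streaming-source.enable'='true', "
        + "'streaming-source.monitor-interval'='1 min', "
        + "'streaming-source.consume-start-offset'='2023-01-01'"
        + ") */");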
class FlinkHiveException extends RuntimeException {
    FlinkHiveException(String message);
    FlinkHiveException(Throwable cause);
    FlinkHiveException(String message, Throwable cause);
}
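FlinkHiveException is unchecked, so Hive-related failures surface at runtime. One possible, purely illustrative place to handle it is around catalog registration (paths and names as in the earlier example):

try {
    HiveCatalog hive = new HiveCatalog(
            "myhive", "default", "/opt/hive-conf", "/opt/hadoop-conf", "2.3.6");
    tableEnv.registerCatalog("myhive", hive);
    tableEnv.useCatalog("myhive");
} catch (FlinkHiveException e) {
    // Hive configuration or metastore connectivity problems may surface here
    System.err.println("Hive integration failed: " + e.getMessage());
    throw e;
}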
class HiveSourceSplit extends FileSourceSplit {
    // Represents a split of Hive data for parallel processing
}
interface HiveFunction {
    void setArgumentTypesAndConstants(Object[] constantArguments, DataType[] argTypes);
    DataType getHiveResultType(Object[] constantArguments, DataType[] argTypes);
}