Apache Flink SQL connector for Apache Hive 3.1.2 that enables unified BATCH and STREAM processing of Hive tables.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-sql-connector-hive-3-1-2_2-11@1.14.0

Apache Flink SQL Hive Connector 3.1.2 provides seamless integration between Apache Flink and Apache Hive 3.1.2, enabling unified BATCH and STREAM processing of Hive tables through Flink's Table/SQL API. The connector serves as a bridge between Flink's streaming capabilities and Hive's data warehouse ecosystem, supporting both metadata management through HiveCatalog and data processing through specialized table sources and sinks.
org.apache.flink:flink-sql-connector-hive-3.1.2_2.11:1.14.6

import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.connectors.hive.HiveTableSource;
import org.apache.flink.connectors.hive.HiveTableSink;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.table.module.hive.HiveModule;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
// Create table environment
EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// Create and register Hive catalog
String catalogName = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/opt/hive-conf";
HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog(catalogName, hive);
tableEnv.useCatalog(catalogName);
// Query Hive tables using SQL
tableEnv.executeSql("SELECT * FROM my_hive_table LIMIT 10");The connector is built around several key components:
The connector is built around several key components:

Complete Hive metastore integration for metadata operations including databases, tables, partitions, and functions. Supports both batch and streaming table discovery with automatic schema inference.
public class HiveCatalog extends AbstractCatalog {
public HiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir);
public HiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir, String hiveVersion);
public HiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir, String hadoopConfDir, String hiveVersion);
public void open() throws CatalogException;
public void close() throws CatalogException;
public HiveConf getHiveConf();
}
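Beyond registration with a TableEnvironment, the catalog can also be used directly for metadata operations. A minimal sketch, assuming a Hive configuration directory at /opt/hive-conf:

import java.util.List;

import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveCatalogMetadataExample {
    public static void main(String[] args) throws Exception {
        HiveCatalog hive = new HiveCatalog("myhive", "default", "/opt/hive-conf");
        hive.open();

        // Browse metadata stored in the Hive metastore
        List<String> databases = hive.listDatabases();
        List<String> tables = hive.listTables("default");
        System.out.println("Databases: " + databases);
        System.out.println("Tables in default: " + tables);

        hive.close();
    }
}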
High-level Table API integration for reading from and writing to Hive tables with advanced optimizations like partition pruning, projection pushdown, and limit pushdown.

public class HiveTableSource implements ScanTableSource, SupportsPartitionPushDown, SupportsProjectionPushDown, SupportsLimitPushDown {
public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);
public void applyPartitions(List<Map<String, String>> remainingPartitions);
public void applyProjection(int[][] projectedFields);
public void applyLimit(long limit);
}
public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
public SinkRuntimeProvider getSinkRuntimeProvider(Context context);
public void applyStaticPartition(Map<String, String> partition);
public void applyOverwrite(boolean overwrite);
}
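These capabilities are exercised by the planner whenever Hive tables are read or written through SQL. A sketch in batch mode, assuming hypothetical tables sales (partitioned by dt) and staging_sales in the registered Hive catalog:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveReadWriteExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
        tableEnv.registerCatalog("myhive", new HiveCatalog("myhive", "default", "/opt/hive-conf"));
        tableEnv.useCatalog("myhive");

        // Partition pruning (dt = ...), projection pushdown (two columns), and limit pushdown
        tableEnv.executeSql(
                "SELECT user_id, amount FROM sales WHERE dt = '2021-10-01' LIMIT 100").print();

        // Static partition write with overwrite semantics (SupportsPartitioning / SupportsOverwrite)
        tableEnv.executeSql(
                "INSERT OVERWRITE sales PARTITION (dt = '2021-10-02') "
                        + "SELECT user_id, amount FROM staging_sales");
    }
}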
Lower-level DataStream API integration providing fine-grained control over Hive data processing with custom formats and transformations.

@PublicEvolving
public class HiveSource<T> implements Source<T, HiveSourceSplit, HivePendingSplitsCheckpoint> {
public Boundedness getBoundedness();
public SourceReader<T, HiveSourceSplit> createReader(SourceReaderContext readerContext);
public SplitEnumerator<HiveSourceSplit, HivePendingSplitsCheckpoint> createEnumerator(SplitEnumeratorContext<HiveSourceSplit> enumContext);
}
@PublicEvolving
public class HiveSourceBuilder {
public HiveSourceBuilder setProjectedFields(int[] projectedFields);
public HiveSourceBuilder setLimit(Long limit);
public HiveSourceBuilder setPartitions(List<HiveTablePartition> partitions);
public HiveSource<RowData> buildWithDefaultBulkFormat();
}
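For DataStream programs, a HiveSource can be assembled through HiveSourceBuilder and handed to the execution environment like any other FLIP-27 source. A sketch, assuming the builder constructor that takes a Hadoop JobConf, the Flink configuration, an optional Hive version (null for auto-detection), the database and table names, and extra table options; the table default.my_hive_table is hypothetical:

import java.util.Collections;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.mapred.JobConf;

public class HiveSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // JobConf carries the Hive/Hadoop configuration; hive-site.xml must be reachable
        JobConf jobConf = new JobConf();
        Configuration flinkConf = new Configuration();

        HiveSource<RowData> source =
                new HiveSourceBuilder(
                                jobConf,
                                flinkConf,
                                null,                   // Hive version; null auto-detects (assumed)
                                "default",              // database
                                "my_hive_table",        // table (hypothetical)
                                Collections.emptyMap()) // additional table options
                        .setLimit(1000L)
                        .buildWithDefaultBulkFormat();

        DataStream<RowData> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "hive-source");
        stream.print();
        env.execute("Read Hive table with HiveSource");
    }
}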
Integration with Hive built-in functions through Flink's module system, enabling access to hundreds of Hive functions within Flink SQL queries.

public class HiveModule implements Module {
public Set<String> listFunctions();
public Optional<FunctionDefinition> getFunctionDefinition(String name);
public String getHiveVersion();
}
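A sketch of loading the module so that Hive built-in functions (here get_json_object) resolve inside Flink SQL; the module name hive and the explicit version string are illustrative choices:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.module.hive.HiveModule;

public class HiveModuleExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().build());

        // Register Hive's built-in functions under the module name "hive"
        tableEnv.loadModule("hive", new HiveModule("3.1.2"));
        // Resolve functions from the Hive module before Flink's core module
        tableEnv.useModules("hive", "core");

        // get_json_object is a Hive built-in, not a Flink core function
        tableEnv.executeSql(
                "SELECT get_json_object('{\"name\":\"flink\"}', '$.name')").print();
    }
}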
Comprehensive configuration system for tuning connector behavior, performance optimization, and feature toggles.

public class HiveOptions {
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER;
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM;
public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX;
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER;
}
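A sketch of applying these options through the table environment configuration; the values shown are illustrative, not recommendations:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.HiveOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HiveOptionsExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().build());

        Configuration conf = tableEnv.getConfig().getConfiguration();
        // Derive source parallelism from the number of splits, capped at 64
        conf.set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
        conf.set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX, 64);
        // Use Flink's native ORC/Parquet readers and writers instead of the mapred fallback
        conf.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, false);
        conf.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);
    }
}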
Supporting classes cover partition metadata and connector-specific exceptions:

@PublicEvolving
public class HiveTablePartition {
public StorageDescriptor getStorageDescriptor();
public LinkedHashMap<String, String> getPartitionSpec();
public Properties getTableProperties();
}
@PublicEvolving
public class FlinkHiveException extends RuntimeException {
public FlinkHiveException(String message);
public FlinkHiveException(Throwable cause);
public FlinkHiveException(String message, Throwable cause);
}
public class FlinkHiveUDFException extends RuntimeException {
public FlinkHiveUDFException(String message);
public FlinkHiveUDFException(Throwable cause);
public FlinkHiveUDFException(String message, Throwable cause);
}

The connector automatically registers with Flink through the service provider interface (SPI) mechanism via META-INF/services/org.apache.flink.table.factories.Factory:
org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory - Enables the "hive" catalog type
org.apache.flink.table.module.hive.HiveModuleFactory - Enables the "hive" module type
org.apache.flink.table.planner.delegation.hive.HiveParserFactory - Enables the Hive SQL dialect

This allows automatic discovery and registration of Hive functionality when the connector JAR is present in the classpath.
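With the connector JAR on the classpath, for example, the catalog, the module, and the Hive dialect can all be activated from SQL without touching the connector classes directly; a sketch, assuming a Hive configuration directory at /opt/hive-conf:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;

public class HiveSpiExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().build());

        // HiveCatalogFactory resolves 'type' = 'hive'
        tableEnv.executeSql(
                "CREATE CATALOG myhive WITH ("
                        + " 'type' = 'hive',"
                        + " 'default-database' = 'default',"
                        + " 'hive-conf-dir' = '/opt/hive-conf')");
        tableEnv.executeSql("USE CATALOG myhive");

        // HiveModuleFactory resolves the module name 'hive'
        tableEnv.executeSql("LOAD MODULE hive WITH ('hive-version' = '3.1.2')");

        // HiveParserFactory backs the Hive SQL dialect
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
    }
}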