Apache Flink SQL connector for Apache Hive 2.3.9 - enables reading from and writing to Hive tables
npx @tessl/cli install tessl/maven-org-apache-flink--flink-sql-connector-hive-2-3-9_2-12@1.19.0

The Apache Flink SQL connector for Apache Hive 2.3.9 enables integration between Flink's streaming and batch processing capabilities and Apache Hive data warehouses. The connector provides access to Hive tables, metastore operations, and Hive built-in functions, allowing developers to leverage existing Hive infrastructure within Flink applications.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-hive-2.3.9_2.12</artifactId>
<version>1.19.3</version>
</dependency>

// Main factories
import org.apache.flink.connectors.hive.HiveDynamicTableFactory;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory;
import org.apache.flink.table.module.hive.HiveModule;
// Source and sink classes
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.connectors.hive.HiveTableSource;
import org.apache.flink.connectors.hive.HiveTableSink;
// Configuration
import org.apache.flink.connectors.hive.HiveOptions;
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
// Data structures
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.connectors.hive.FlinkHiveException;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
// Create table environment
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inBatchMode()
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// Create and register Hive catalog
String catalogName = "myhive";
String defaultDatabase = "mydatabase";
String hiveConfDir = "/opt/hive-conf";
String version = "2.3.9";
HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir, null, version);
tableEnv.registerCatalog("myhive", hive);
tableEnv.useCatalog("myhive");
// Query Hive tables
Table result = tableEnv.sqlQuery("SELECT * FROM mytable WHERE active = true");
result.execute().print();

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.mapred.JobConf;
import java.util.Collections;
// Create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Build Hive source
JobConf jobConf = new JobConf();
jobConf.set("hive.metastore.uris", "thrift://localhost:9083");
HiveSource<RowData> source = new HiveSourceBuilder(
jobConf,
new Configuration(),
"2.3.9", // hiveVersion
"mydb",
"mytable",
Collections.emptyMap()
).buildWithDefaultBulkFormat();
// Create data stream
DataStream<RowData> stream = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "hive-source");
stream.print();
env.execute("Hive Stream Processing");

The Flink Hive connector is built around several key architectural components:
- Factory classes (HiveDynamicTableFactory, HiveCatalogFactory) for dynamic registration and configuration
- Source and sink implementations: HiveSource and HiveTableSink
- HiveCatalog for schema discovery and table management
- HiveModule for UDF compatibility

Table Source and Sink Operations

Core functionality for reading from and writing to Hive tables using Flink's Table API and DataStream API. Supports both batch and streaming modes with comprehensive partition handling.
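Writing follows the same pattern as reading once the Hive catalog is registered. The following is a minimal sketch, assuming a target Hive table mytable_copy with a compatible schema already exists; the catalog, database, table, and path names are placeholders.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

// Register the Hive catalog as in the batch example above (placeholder names and paths)
TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inBatchMode().build());
tableEnv.registerCatalog("myhive", new HiveCatalog("myhive", "mydatabase", "/opt/hive-conf", null, "2.3.9"));
tableEnv.useCatalog("myhive");

// Insert query results into an existing Hive table; INSERT OVERWRITE replaces existing data in batch mode
tableEnv.executeSql(
        "INSERT OVERWRITE mytable_copy SELECT id, name, active FROM mytable WHERE active = true");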
// Main source builder
public class HiveSourceBuilder {
public HiveSourceBuilder(JobConf jobConf, ReadableConfig flinkConf,
String hiveVersion, String dbName,
String tableName, Map<String, String> tableOptions);
public HiveSource<RowData> buildWithDefaultBulkFormat();
public <T> HiveSource<T> buildWithBulkFormat(BulkFormat<T, HiveSourceSplit> bulkFormat);
public HiveSourceBuilder setPartitions(List<HiveTablePartition> partitions);
public HiveSourceBuilder setLimit(Long limit);
public HiveSourceBuilder setProjectedFields(int[] projectedFields);
}
// Dynamic table source
public class HiveTableSource implements ScanTableSource, SupportsPartitionPushDown,
SupportsProjectionPushDown, SupportsLimitPushDown {
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);
public void applyPartitions(List<Map<String, String>> remainingPartitions);
public void applyProjection(int[][] projectedFields, DataType producedDataType);
public void applyLimit(long limit);
}
// Dynamic table sink
public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
public SinkRuntimeProvider getSinkRuntimeProvider(Context context);
public void applyStaticPartition(Map<String, String> partition);
public void applyOverwrite(boolean overwrite);
}
Complete Hive metastore integration providing schema discovery, table management, and metadata operations. Enables transparent access to existing Hive data warehouses.
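Beyond registration in a TableEnvironment, the catalog can also be used programmatically for metadata access. A minimal sketch using the HiveCatalog methods listed below, assuming a reachable metastore; database and table names are placeholders, and the checked catalog exceptions need to be handled or declared in a real program.

import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;

HiveCatalog catalog = new HiveCatalog("myhive", "mydatabase", "/opt/hive-conf", null, "2.3.9");
catalog.open();
try {
    // Browse databases and tables known to the metastore
    for (String db : catalog.listDatabases()) {
        System.out.println("database: " + db);
    }
    for (String tbl : catalog.listTables("mydatabase")) {
        System.out.println("table: " + tbl);
    }
    // Look up a single table's metadata
    CatalogBaseTable table = catalog.getTable(new ObjectPath("mydatabase", "mytable"));
    System.out.println(table.getUnresolvedSchema());
} finally {
    catalog.close();
}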
public class HiveCatalog extends AbstractCatalog {
public HiveCatalog(String catalogName, String defaultDatabase,
String hiveConfDir, String hadoopConfDir, String hiveVersion);
public List<String> listDatabases() throws CatalogException;
public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException;
public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException;
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists);
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists);
public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath);
}
public class HiveCatalogFactory implements CatalogFactory {
public Catalog createCatalog(Context context);
public String factoryIdentifier();
}

Hive built-in function support enabling use of Hive UDFs within Flink SQL queries. Provides seamless function compatibility and registration.
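A minimal sketch of loading the module so Hive built-in functions (for example get_json_object, which has no Flink built-in equivalent) become callable from Flink SQL; the table and column names are placeholders.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.module.hive.HiveModule;

TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inBatchMode().build());

// Load the Hive module alongside the core module; its functions are then resolvable by name
tableEnv.loadModule("hive", new HiveModule("2.3.9"));

// Call a Hive built-in function from Flink SQL (assumes a table 'mytable' with a STRING column 'json_col')
tableEnv.sqlQuery("SELECT get_json_object(json_col, '$.name') FROM mytable")
        .execute()
        .print();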
public class HiveModule implements Module {
public HiveModule();
public HiveModule(String hiveVersion);
public Set<String> listFunctions();
public Optional<FunctionDefinition> getFunctionDefinition(String name);
}

Comprehensive configuration system for customizing connector behavior, performance tuning, and environment-specific settings.
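A minimal sketch of setting the options listed below programmatically: job-level options can be carried in a Flink Configuration (for example the flinkConf argument of HiveSourceBuilder), while per-table options such as the streaming-source settings are typically passed as table options. Which options are read from which place depends on the connector version, so treat this as illustrative.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.HiveOptions;

import java.util.HashMap;
import java.util.Map;

// Job-level tuning options
Configuration flinkConf = new Configuration();
flinkConf.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, false);
flinkConf.set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);

// Per-table options, e.g. for continuously monitoring a Hive table for new partitions
Map<String, String> tableOptions = new HashMap<>();
tableOptions.put(HiveOptions.STREAMING_SOURCE_ENABLE.key(), "true");
tableOptions.put(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL.key(), "1 min");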
public class HiveOptions {
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER;
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER;
public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE;
public static final ConfigOption<Duration> STREAMING_SOURCE_MONITOR_INTERVAL;
public static final ConfigOption<Duration> LOOKUP_JOIN_CACHE_TTL;
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM;
public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_KIND;
}
public class HiveCatalogFactoryOptions {
public static final ConfigOption<String> DEFAULT_DATABASE;
public static final ConfigOption<String> HIVE_CONF_DIR;
public static final ConfigOption<String> HIVE_VERSION;
public static final ConfigOption<String> HADOOP_CONF_DIR;
}

Partition Management and Utilities

Advanced partition handling utilities for efficient data access, partition pruning, and metadata management in partitioned Hive tables.
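A sketch of restricting a HiveSource to specific partitions, based on the signatures listed below; the package of HivePartitionUtils and the meaning of its second argument may differ between connector versions, so verify against your release. Names and the metastore URI are placeholders.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.connectors.hive.util.HivePartitionUtils; // package assumed; check your connector version
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.mapred.JobConf;

import java.util.Collections;
import java.util.List;

JobConf jobConf = new JobConf();
jobConf.set("hive.metastore.uris", "thrift://localhost:9083");

// Enumerate the table's partitions (signature as listed below)
List<HiveTablePartition> partitions = HivePartitionUtils.getAllPartitions(
        jobConf, "myhive", new ObjectPath("mydb", "mytable"), Collections.singletonList("dt"));

// Read only those partitions, optionally capping the number of rows
HiveSource<RowData> source = new HiveSourceBuilder(
        jobConf, new Configuration(), "2.3.9", "mydb", "mytable", Collections.emptyMap())
        .setPartitions(partitions)
        .setLimit(1000L)
        .buildWithDefaultBulkFormat();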
public class HiveTablePartition implements Serializable {
public static HiveTablePartition ofTable(StorageDescriptor storageDescriptor,
Map<String, String> tableParameters);
public static HiveTablePartition ofPartition(StorageDescriptor storageDescriptor,
Map<String, String> partitionSpec,
Map<String, String> tableParameters);
public StorageDescriptor getStorageDescriptor();
public Map<String, String> getPartitionSpec();
}
public class HivePartitionUtils {
public static List<HiveTablePartition> getAllPartitions(JobConf jobConf, String catalogName,
ObjectPath tablePath, List<String> partitionColNames);
public static byte[] serializeHiveTablePartition(List<HiveTablePartition> partitions);
public static List<HiveTablePartition> deserializeHiveTablePartition(byte[] bytes, ClassLoader classLoader);
}
// Core exception type
public class FlinkHiveException extends RuntimeException {
public FlinkHiveException(String message);
public FlinkHiveException(Throwable cause);
public FlinkHiveException(String message, Throwable cause);
}
// Configuration wrapper
public class JobConfWrapper implements Serializable {
public JobConfWrapper(JobConf jobConf);
public JobConf conf();
}