Apache Flink SQL connector for Apache Hive 3.1.2, enabling unified batch and stream processing with Hive tables.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-sql-connector-hive-3-1-2-2-12@1.16.0

The Apache Flink SQL Connector for Hive 3.1.2 integrates Apache Flink with Apache Hive, providing unified batch and stream processing over Hive tables. The connector lets Flink read from and write to Hive tables, supports both partitioned and non-partitioned tables in streaming and batch modes, and includes catalog integration for metadata management through the Hive metastore.
pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-hive-3.1.2_2.12</artifactId>
    <version>1.16.3</version>
</dependency>

// Factory classes for programmatic usage
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory;
import org.apache.flink.connectors.hive.HiveDynamicTableFactory;
import org.apache.flink.table.module.hive.HiveModuleFactory;
// Core connector classes
import org.apache.flink.connectors.hive.HiveTableSource;
import org.apache.flink.connectors.hive.HiveTableSink;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
// Catalog and module classes
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.module.hive.HiveModule;
// Configuration and options
import org.apache.flink.connectors.hive.HiveOptions;
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;

-- Create Hive catalog
CREATE CATALOG hive_catalog WITH (
    'type' = 'hive',
    'hive-conf-dir' = '/path/to/hive/conf',
    'hive-version' = '3.1.2'
);
-- Use the Hive catalog
USE CATALOG hive_catalog;
USE `default`;
-- Read from existing Hive table
SELECT * FROM my_hive_table WHERE partition_date = '2023-01-01';
-- Create and write to new Hive table (STORED AS / TBLPROPERTIES require the Hive dialect)
SET table.sql-dialect=hive;

CREATE TABLE new_hive_table (
    id BIGINT,
    name STRING,
    event_time TIMESTAMP
) PARTITIONED BY (partition_date STRING)
STORED AS PARQUET
TBLPROPERTIES (
    'sink.partition-commit.policy.kind' = 'metastore,success-file'
);
INSERT INTO new_hive_table SELECT id, name, event_time, '2023-01-01' FROM source_table;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
// Create table environment
EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// Create Hive catalog
String catalogName = "hive_catalog";
String defaultDatabase = "default";
String hiveConfDir = "/path/to/hive/conf";
String hiveVersion = "3.1.2";
HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir, hiveVersion);
tableEnv.registerCatalog(catalogName, hive);
tableEnv.useCatalog(catalogName);
// Execute queries
tableEnv.executeSql("SELECT * FROM my_hive_table LIMIT 10").print();import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
HiveSource<RowData> hiveSource = new HiveSourceBuilder()
.setTableIdentifier(tableIdentifier)
.setHiveConfiguration(hiveConf)
.build();
env.fromSource(hiveSource, WatermarkStrategy.noWatermarks(), "Hive Source")
.print();
env.execute("Hive Streaming Job");The Flink Hive Connector is built around several key components:
The connector supports both legacy and modern Flink table interfaces, enabling seamless migration from older versions while providing access to the latest features.
Factory classes that enable automatic connector discovery through Flink's Service Provider Interface (SPI). These factories handle connector instantiation and configuration validation.
public class HiveCatalogFactory implements CatalogFactory {
    public String factoryIdentifier(); // returns "hive"
}

public class HiveDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    public String factoryIdentifier(); // returns "hive"
}

public class HiveModuleFactory implements ModuleFactory {
    public String factoryIdentifier(); // returns "hive"
}
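Because these factories register the identifier "hive" through Flink's SPI, a catalog declared with 'type' = 'hive' is resolved to HiveCatalogFactory automatically. A minimal sketch (the catalog name and paths are placeholders):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

// 'type' = 'hive' is matched against HiveCatalogFactory.factoryIdentifier() via SPI discovery
tableEnv.executeSql(
        "CREATE CATALOG hive_catalog WITH ("
        + " 'type' = 'hive',"
        + " 'hive-conf-dir' = '/path/to/hive/conf',"
        + " 'hive-version' = '3.1.2')");
tableEnv.useCatalog("hive_catalog");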
Core table connector implementations providing read and write capabilities for Hive tables with support for batch processing, streaming ingestion, partition management, and performance optimizations.

public class HiveTableSource implements ScanTableSource,
SupportsPartitionPushDown, SupportsProjectionPushDown, SupportsLimitPushDown {
// Batch and streaming source for Hive tables
}
public class HiveTableSink implements DynamicTableSink {
// Sink for writing to Hive tables with partition support
}
public final class HiveSource<T> implements Source<T, HiveSplitEnumeratorState, HiveSourceSplit> {
// Low-level source using new Source API
}
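The sink's partitioning and overwrite support surfaces in SQL as static-partition and INSERT OVERWRITE writes. A minimal sketch, assuming the tables from the quick start and batch execution mode (INSERT OVERWRITE is a batch-only operation):

// Rewrite only the '2023-01-01' partition of the Hive table
tableEnv.executeSql(
        "INSERT OVERWRITE new_hive_table PARTITION (partition_date = '2023-01-01') "
        + "SELECT id, name, event_time FROM source_table");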
Hive catalog implementation that provides unified metadata management, enabling Flink to access Hive databases, tables, partitions, and functions through the Hive metastore.

public class HiveCatalog extends AbstractCatalog {
public HiveCatalog(String catalogName, String defaultDatabase,
String hiveConfDir, String hiveVersion);
public static boolean isHiveTable(Map<String, String> properties);
}
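Besides being registered with a TableEnvironment as in the quick start, the catalog can be used directly for metadata access through the standard Catalog interface. A minimal sketch with placeholder paths:

import org.apache.flink.table.catalog.hive.HiveCatalog;

HiveCatalog catalog = new HiveCatalog("hive_catalog", "default", "/path/to/hive/conf", "3.1.2");
catalog.open();

// Database names are read from the Hive metastore
System.out.println(catalog.listDatabases());

catalog.close();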
Module system providing access to Hive built-in functions within Flink SQL, enabling compatibility with existing Hive UDFs and maintaining function behavior consistency.

public class HiveModule implements Module {
public HiveModule(String hiveVersion);
}
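Loading the module makes Hive built-in functions resolvable from Flink SQL. A minimal sketch, assuming a TableEnvironment named tableEnv:

import org.apache.flink.table.module.hive.HiveModule;

// Register Hive's built-in functions under the module name "hive"
tableEnv.loadModule("hive", new HiveModule("3.1.2"));

// The core and hive modules are now both active; function lookup falls through them in order
System.out.println(String.join(", ", tableEnv.listModules()));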
Comprehensive configuration options for controlling connector behavior, performance tuning, and feature enablement across reading, writing, streaming, and lookup join scenarios.

public class HiveOptions {
// Reading options
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER;
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_READ_PARTITION_WITH_SUBDIRECTORY_ENABLED;
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM;
public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX;
public static final ConfigOption<Integer> TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM;
public static final ConfigOption<MemorySize> TABLE_EXEC_HIVE_SPLIT_MAX_BYTES;
public static final ConfigOption<MemorySize> TABLE_EXEC_HIVE_FILE_OPEN_COST;
public static final ConfigOption<Integer> TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM;
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_DYNAMIC_GROUPING_ENABLED;
// Writing options
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER;
public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_KIND;
public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_CLASS;
public static final ConfigOption<String> SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME;
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_ENABLE;
public static final ConfigOption<Integer> TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_THREAD_NUM;
// Streaming options
public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE;
public static final ConfigOption<String> STREAMING_SOURCE_PARTITION_INCLUDE;
public static final ConfigOption<Duration> STREAMING_SOURCE_MONITOR_INTERVAL;
public static final ConfigOption<String> STREAMING_SOURCE_CONSUME_START_OFFSET;
public static final ConfigOption<PartitionOrder> STREAMING_SOURCE_PARTITION_ORDER;
// Lookup join options
public static final ConfigOption<Duration> LOOKUP_JOIN_CACHE_TTL;
}
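Many of these options can also be applied per query through SQL dynamic table hints. A minimal sketch of a continuously monitored streaming read (the table name is a placeholder; the option keys are the documented Hive connector keys corresponding to the constants above):

// Monitor the Hive table for new partitions and consume them in partition-name order
tableEnv.executeSql(
        "SELECT * FROM my_hive_table "
        + "/*+ OPTIONS("
        + "'streaming-source.enable' = 'true', "
        + "'streaming-source.monitor-interval' = '1 min', "
        + "'streaming-source.partition-order' = 'partition-name') */");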
Specialized lookup table source for dimension table joins, providing caching capabilities and optimized access patterns for real-time data enrichment scenarios.

public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {
// Lookup join capabilities for dimension tables
}
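In SQL this surfaces as a processing-time temporal join against a Hive table, with the dimension data cached and periodically reloaded according to 'lookup.join.cache.ttl'. A minimal sketch in which the table and column names are placeholders and the orders stream carries a processing-time attribute proc_time:

// Enrich a stream of orders with dimension data read from the Hive table dim_customers;
// the cached dimension data is reloaded at the interval set by 'lookup.join.cache.ttl'
tableEnv.executeSql(
        "SELECT o.order_id, o.amount, d.customer_name "
        + "FROM orders AS o "
        + "JOIN dim_customers FOR SYSTEM_TIME AS OF o.proc_time AS d "
        + "ON o.customer_id = d.customer_id");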
// Exception handling
public class FlinkHiveException extends RuntimeException {
public FlinkHiveException(String message);
public FlinkHiveException(Throwable cause);
public FlinkHiveException(String message, Throwable cause);
}
// Partition representation
public class HiveTablePartition {
// Constructors
public HiveTablePartition(StorageDescriptor storageDescriptor, Properties tableProps);
public HiveTablePartition(StorageDescriptor storageDescriptor,
Map<String, String> partitionSpec, Properties tableProps);
// Instance methods
public StorageDescriptor getStorageDescriptor();
public Map<String, String> getPartitionSpec();
public Properties getTableProps();
public boolean equals(Object o);
public int hashCode();
public String toString();
// Static factory methods
public static HiveTablePartition ofTable(HiveConf hiveConf, String hiveVersion,
String dbName, String tableName);
public static HiveTablePartition ofPartition(HiveConf hiveConf, String hiveVersion,
String dbName, String tableName,
LinkedHashMap<String, String> partitionSpec);
}
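The static factory methods resolve storage metadata from the metastore and pair with HiveSourceBuilder#setPartitions to limit a HiveSource to a fixed set of partitions. A minimal sketch, assuming hiveConf is a HiveConf pointing at the target metastore:

import org.apache.flink.connectors.hive.HiveTablePartition;

import java.util.LinkedHashMap;

// Resolve metadata for a single partition of default.my_hive_table
LinkedHashMap<String, String> spec = new LinkedHashMap<>();
spec.put("partition_date", "2023-01-01");

HiveTablePartition partition =
        HiveTablePartition.ofPartition(hiveConf, "3.1.2", "default", "my_hive_table", spec);
// partition can then be passed to HiveSourceBuilder#setPartitions(Collections.singletonList(partition))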
// Configuration wrapper
public class JobConfWrapper implements Serializable {
public JobConfWrapper(JobConf jobConf);
public JobConf conf();
}
// Source implementation with generic type parameter
public final class HiveSource<T> implements Source<T, HiveSplitEnumeratorState, HiveSourceSplit> {
// Source interface methods
public SimpleVersionedSerializer<HiveSourceSplit> getSplitSerializer();
public SimpleVersionedSerializer<PendingSplitsCheckpoint<HiveSourceSplit>> getEnumeratorCheckpointSerializer();
public SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>> createEnumerator(
SplitEnumeratorContext<HiveSourceSplit> enumContext);
public SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>> restoreEnumerator(
SplitEnumeratorContext<HiveSourceSplit> enumContext,
PendingSplitsCheckpoint<HiveSourceSplit> checkpoint);
}
// Table source with interface implementations
public class HiveTableSource implements ScanTableSource, SupportsPartitionPushDown,
SupportsProjectionPushDown, SupportsLimitPushDown, SupportsStatisticReport, SupportsDynamicFiltering {
public HiveTableSource(JobConf jobConf, ReadableConfig flinkConf,
ObjectPath tablePath, CatalogTable catalogTable);
// ScanTableSource methods
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext);
public ChangelogMode getChangelogMode();
public DynamicTableSource copy();
public String asSummaryString();
// Optimization interface methods
public void applyLimit(long limit);
public Optional<List<Map<String, String>>> listPartitions();
public void applyPartitions(List<Map<String, String>> remainingPartitions);
public List<String> listAcceptedFilterFields();
public void applyDynamicFiltering(List<String> candidateFilterFields);
public boolean supportsNestedProjection();
public void applyProjection(int[][] projectedFields, DataType producedDataType);
public TableStats reportStatistics();
}
// Table sink with interface implementations
public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
public HiveTableSink(ReadableConfig flinkConf, JobConf jobConf,
ObjectIdentifier identifier, CatalogTable table, Integer configuredParallelism);
// DynamicTableSink methods
public SinkRuntimeProvider getSinkRuntimeProvider(Context context);
public ChangelogMode getChangelogMode(ChangelogMode requestedMode);
public DynamicTableSink copy();
public String asSummaryString();
// Partitioning and overwrite support
public boolean requiresPartitionGrouping(boolean supportsGrouping);
public void applyStaticPartition(Map<String, String> partition);
public void applyOverwrite(boolean overwrite);
}
// Lookup table source extending HiveTableSource
public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {
public HiveLookupTableSource(JobConf jobConf, ReadableConfig flinkConf,
ObjectPath tablePath, CatalogTable catalogTable);
// LookupTableSource methods
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);
public DynamicTableSource copy();
}
// Builder pattern
public class HiveSourceBuilder {
// Constructors
public HiveSourceBuilder(JobConf jobConf, ReadableConfig flinkConf, String hiveVersion,
String dbName, String tableName, Map<String, String> tableOptions);
public HiveSourceBuilder(JobConf jobConf, ReadableConfig flinkConf, ObjectPath tablePath,
String hiveVersion, CatalogTable catalogTable);
// Configuration methods
public HiveSourceBuilder setProjectedFields(int[] projectedFields);
public HiveSourceBuilder setLimit(Long limit);
public HiveSourceBuilder setPartitions(List<HiveTablePartition> partitions);
public HiveSourceBuilder setDynamicFilterPartitionKeys(List<String> dynamicFilterPartitionKeys);
// Build methods
public HiveSource<RowData> buildWithDefaultBulkFormat();
public <T> HiveSource<T> buildWithBulkFormat(BulkFormat<T, HiveSourceSplit> bulkFormat);
}

// Partition ordering strategies
public enum PartitionOrder implements DescribedEnum {
CREATE_TIME("create-time", "Order partitions by creation time"),
PARTITION_TIME("partition-time", "Order partitions by partition time"),
PARTITION_NAME("partition-name", "Order partitions by partition name");
public String toString();
public InlineElement getDescription();
}
// Version constants
public class HiveShimLoader {
public static final String HIVE_VERSION_V3_1_2 = "3.1.2";
public static String getHiveVersion();
}
// Catalog configuration constants
public class HiveCatalogConfig {
public static final String DEFAULT_LIST_COLUMN_TYPES_SEPARATOR = ":";
public static final String COMMENT = "comment";
public static final String PARTITION_LOCATION = "partition.location";
}