Apache Flink SQL connector for Apache Hive 3.1.2 that enables unified BATCH and STREAM processing of Hive tables.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-sql-connector-hive-3-1-2_2-11@1.14.00
# Apache Flink SQL Hive Connector 3.1.2
Apache Flink SQL Hive Connector 3.1.2 provides seamless integration between Apache Flink and Apache Hive 3.1.2, enabling unified BATCH and STREAM processing of Hive tables through Flink's Table/SQL API. The connector serves as a bridge between Flink's streaming capabilities and Hive's data warehouse ecosystem, supporting both metadata management through HiveCatalog and data processing through specialized table sources and sinks.
## Package Information
- **Package Name**: flink-sql-connector-hive-3.1.2_2.11
- **Package Type**: maven
- **Language**: Java
- **Installation**: `org.apache.flink:flink-sql-connector-hive-3.1.2_2.11:1.14.6`
- **License**: Apache-2.0
## Core Imports
```java
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.connectors.hive.HiveTableSource;
import org.apache.flink.connectors.hive.HiveTableSink;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.table.module.hive.HiveModule;
```
## Basic Usage
```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

// Create table environment
EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Create and register Hive catalog
String catalogName = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/opt/hive-conf";
HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);

tableEnv.registerCatalog(catalogName, hive);
tableEnv.useCatalog(catalogName);

// Query Hive tables using SQL
tableEnv.executeSql("SELECT * FROM my_hive_table LIMIT 10");
```
## Architecture
The connector is built around several key components:
- **HiveCatalog**: Manages metadata integration with the Hive metastore, providing database, table, partition, and function operations
- **Table Sources/Sinks**: Handle data reading and writing with support for partition pruning, projection pushdown, and streaming modes
- **Unified Data Source API**: Provides lower-level access through HiveSource for custom data processing workflows
- **Function Module**: Exposes Hive built-in functions to Flink through HiveModule
- **Configuration System**: Comprehensive options for tuning connector behavior and performance
## Capabilities
### Catalog Management
Complete Hive metastore integration for metadata operations including databases, tables, partitions, and functions. Supports both batch and streaming table discovery with automatic schema inference.
```java { .api }
public class HiveCatalog extends AbstractCatalog {
    public HiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir);
    public HiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir, String hiveVersion);
    public HiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir, String hadoopConfDir, String hiveVersion);

    public void open() throws CatalogException;
    public void close() throws CatalogException;
    public HiveConf getHiveConf();
}
```
[Catalog Management](./catalog.md)
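For metadata-only tasks, the catalog can also be used directly, without a TableEnvironment. A minimal sketch, assuming a reachable metastore; the catalog name, database, and conf-dir path are illustrative:

```java
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class CatalogMetadataDemo {
    public static void main(String[] args) throws Exception {
        // Illustrative values; point hiveConfDir at a directory containing hive-site.xml
        HiveCatalog catalog = new HiveCatalog("myhive", "default", "/opt/hive-conf", "3.1.2");
        catalog.open();
        try {
            // Enumerate databases and tables known to the Hive metastore
            for (String db : catalog.listDatabases()) {
                System.out.println("database: " + db);
                for (String table : catalog.listTables(db)) {
                    System.out.println("  table: " + table);
                }
            }
        } finally {
            catalog.close();
        }
    }
}
```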
### Table Sources and Sinks
High-level Table API integration for reading from and writing to Hive tables with advanced optimizations like partition pruning, projection pushdown, and limit pushdown.
```java { .api }
public class HiveTableSource implements ScanTableSource, SupportsPartitionPushDown, SupportsProjectionPushDown, SupportsLimitPushDown {
    public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);
    public void applyPartitions(List<Map<String, String>> remainingPartitions);
    public void applyProjection(int[][] projectedFields);
    public void applyLimit(long limit);
}

public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context);
    public void applyStaticPartition(Map<String, String> partition);
    public void applyOverwrite(boolean overwrite);
}
```
[Table Sources and Sinks](./table-api.md)
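Both directions are normally exercised through SQL rather than by touching these classes directly. A hedged sketch with illustrative table names (`sales`, `staging_sales`) and partition column `dt`: the static `PARTITION` clause and `INSERT OVERWRITE` exercise SupportsPartitioning and SupportsOverwrite on the sink, while the partition filter and `LIMIT` let the source prune partitions and push the limit down:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class TableReadWriteDemo {
    public static void main(String[] args) {
        // Batch mode is required for INSERT OVERWRITE
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());
        tableEnv.registerCatalog("myhive", new HiveCatalog("myhive", "default", "/opt/hive-conf"));
        tableEnv.useCatalog("myhive");

        // Overwrite one static partition of the (illustrative) sales table
        tableEnv.executeSql(
                "INSERT OVERWRITE sales PARTITION (dt='2023-01-01') "
                        + "SELECT product, amount FROM staging_sales");

        // The filter on the partition column enables partition pruning;
        // LIMIT is pushed into the source scan
        tableEnv.executeSql(
                "SELECT product, amount FROM sales WHERE dt='2023-01-01' LIMIT 100");
    }
}
```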
### Unified Data Source
Lower-level DataStream API integration providing fine-grained control over Hive data processing with custom formats and transformations.
```java { .api }
@PublicEvolving
public class HiveSource<T> implements Source<T> {
    public Boundedness getBoundedness();
    public SourceReader<T, HiveSourceSplit> createReader(SourceReaderContext readerContext);
    public SplitEnumerator<HiveSourceSplit, HivePendingSplitsCheckpoint> createEnumerator(SplitEnumeratorContext<HiveSourceSplit> enumContext);
}

@PublicEvolving
public class HiveSourceBuilder {
    public HiveSourceBuilder setProjectedFields(int[] projectedFields);
    public HiveSourceBuilder setLimit(Long limit);
    public HiveSourceBuilder setPartitions(List<HiveTablePartition> partitions);
    public <T> HiveSource<T> buildWithDefaultBulkFormat();
}
```
[Unified Data Source](./data-source.md)
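The builder produces a source that plugs into the DataStream API via `env.fromSource`. A sketch assuming Flink 1.14's builder constructor, which takes `(JobConf, ReadableConfig, hiveVersion, dbName, tableName, tableOptions)`; verify the signature against the Flink version you deploy:

```java
import java.util.Collections;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.mapred.JobConf;

public class HiveSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The JobConf should carry the Hive/Hadoop configuration (hive-site.xml etc.)
        JobConf jobConf = new JobConf();

        HiveSource<RowData> source =
                new HiveSourceBuilder(
                                jobConf, new Configuration(),
                                "3.1.2", "default", "my_hive_table",
                                Collections.emptyMap())
                        .setLimit(1000L) // cap the number of rows read
                        .buildWithDefaultBulkFormat();

        DataStream<RowData> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "hive-source");
        stream.print();
        env.execute("hive-source-demo");
    }
}
```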
### Function Module
Integration with Hive built-in functions through Flink's module system, enabling access to hundreds of Hive functions within Flink SQL queries.
```java { .api }
public class HiveModule implements Module {
    public Set<String> listFunctions();
    public Optional<FunctionDefinition> getFunctionDefinition(String name);
    public String getHiveVersion();
}
```
[Function Module](./functions.md)
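Loading the module makes Hive built-ins resolvable in Flink SQL. A minimal sketch; `get_json_object` is a Hive function that is not part of Flink's core module:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.module.hive.HiveModule;

public class HiveModuleDemo {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().build());

        // Register Hive's function library under the module name "hive"
        tableEnv.loadModule("hive", new HiveModule("3.1.2"));

        // Hive built-ins such as get_json_object are now usable in SQL
        tableEnv.executeSql(
                "SELECT get_json_object('{\"name\":\"flink\"}', '$.name')").print();
    }
}
```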
### Configuration Options
Comprehensive configuration system for tuning connector behavior, performance optimization, and feature toggles.
```java { .api }
public class HiveOptions {
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER;
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM;
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX;
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER;
}
```
[Configuration](./configuration.md)
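These options can be set on the table configuration by key. A sketch using the key spellings these constants correspond to; double-check them against the Flink version in use:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HiveOptionsDemo {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());

        Configuration conf = tableEnv.getConfig().getConfiguration();

        // Let Flink infer source parallelism from split count, but cap it
        conf.setBoolean("table.exec.hive.infer-source-parallelism", true);
        conf.setInteger("table.exec.hive.infer-source-parallelism.max", 64);

        // Fall back to Hive's native mapred writer for maximum format compatibility
        conf.setBoolean("table.exec.hive.fallback-mapred-writer", true);
    }
}
```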
149
150
## Common Types
```java { .api }
@PublicEvolving
public class HiveTablePartition {
    public StorageDescriptor getStorageDescriptor();
    public LinkedHashMap<String, String> getPartitionSpec();
    public Properties getTableProperties();
}

@PublicEvolving
public class FlinkHiveException extends RuntimeException {
    public FlinkHiveException(String message);
    public FlinkHiveException(Throwable cause);
    public FlinkHiveException(String message, Throwable cause);
}

public class FlinkHiveUDFException extends RuntimeException {
    public FlinkHiveUDFException(String message);
    public FlinkHiveUDFException(Throwable cause);
    public FlinkHiveUDFException(String message, Throwable cause);
}
```
## Service Provider Integration
The connector automatically registers with Flink through the service provider interface (SPI) mechanism via `META-INF/services/org.apache.flink.table.factories.Factory`:
- `org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory` - Enables the "hive" catalog type
- `org.apache.flink.table.module.hive.HiveModuleFactory` - Enables the "hive" module type
- `org.apache.flink.table.planner.delegation.hive.HiveParserFactory` - Enables the Hive SQL dialect
This allows automatic discovery and registration of Hive functionality when the connector JAR is present on the classpath.
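Because the factories are discovered automatically, both the catalog and the function module can be created purely in SQL, with no Java construction. A sketch with illustrative names and paths:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SpiDiscoveryDemo {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().build());

        // 'type' = 'hive' resolves to HiveCatalogFactory via SPI
        tableEnv.executeSql(
                "CREATE CATALOG myhive WITH ("
                        + " 'type' = 'hive',"
                        + " 'default-database' = 'default',"
                        + " 'hive-conf-dir' = '/opt/hive-conf'"
                        + ")");
        tableEnv.executeSql("USE CATALOG myhive");

        // The module name 'hive' resolves to HiveModuleFactory via SPI
        tableEnv.executeSql("LOAD MODULE hive WITH ('hive-version' = '3.1.2')");
    }
}
```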