Apache Flink SQL connector for Apache Hive 2.3.9 - enables reading from and writing to Hive tables
npx @tessl/cli install tessl/maven-org-apache-flink--flink-sql-connector-hive-2-3-9_2-12@1.19.00
# Apache Flink SQL Connector for Hive 2.3.9

Apache Flink SQL connector for Apache Hive 2.3.9 enables seamless integration between Flink's streaming and batch processing capabilities and Apache Hive data warehouses. This connector provides comprehensive access to Hive tables, metastore operations, and built-in functions, allowing developers to leverage existing Hive infrastructure within Flink applications.

## Package Information

- **Package Name**: flink-sql-connector-hive-2.3.9_2.12
- **Package Type**: maven
- **Group ID**: org.apache.flink
- **Language**: Java
- **Installation**: Add dependency to pom.xml
- **Hive Version**: 2.3.9
- **Scala Version**: 2.12
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-hive-2.3.9_2.12</artifactId>
    <version>1.19.3</version>
</dependency>
```
## Core Imports

```java
// Main factories
import org.apache.flink.connectors.hive.HiveDynamicTableFactory;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory;
import org.apache.flink.table.module.hive.HiveModule;

// Source and sink classes
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.connectors.hive.HiveTableSource;
import org.apache.flink.connectors.hive.HiveTableSink;

// Configuration
import org.apache.flink.connectors.hive.HiveOptions;
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;

// Data structures
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.connectors.hive.FlinkHiveException;
```
## Basic Usage

### Table API Integration

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

// Create table environment
EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inBatchMode()
    .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Create and register Hive catalog
String catalogName = "myhive";
String defaultDatabase = "mydatabase";
String hiveConfDir = "/opt/hive-conf";
String version = "2.3.9";

HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir, null, version);
tableEnv.registerCatalog("myhive", hive);
tableEnv.useCatalog("myhive");

// Query Hive tables
Table result = tableEnv.sqlQuery("SELECT * FROM mytable WHERE active = true");
result.execute().print();
```
### DataStream API Integration

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Build Hive source
JobConf jobConf = new JobConf();
jobConf.set("hive.metastore.uris", "thrift://localhost:9083");

HiveSource<RowData> source = new HiveSourceBuilder(
        jobConf,
        new Configuration(),
        "2.3.9", // hiveVersion
        "mydb",
        "mytable",
        Collections.emptyMap()
).buildWithDefaultBulkFormat();

// Create data stream
DataStream<RowData> stream = env
    .fromSource(source, WatermarkStrategy.noWatermarks(), "hive-source");

stream.print();
env.execute("Hive Stream Processing");
```
## Architecture

The Flink Hive connector is built around several key architectural components:

- **Factory System**: Plugin-based factories (`HiveDynamicTableFactory`, `HiveCatalogFactory`) for dynamic registration and configuration
- **Source/Sink Framework**: Unified data access layer with streaming and batch support through `HiveSource` and `HiveTableSink`
- **Catalog Integration**: Full Hive metastore integration via `HiveCatalog` for schema discovery and table management
- **Function Module**: Native Hive function support through `HiveModule` for UDF compatibility
- **Partition Management**: Intelligent partition handling with pruning and dynamic discovery capabilities
- **Type System**: Seamless type mapping between Hive and Flink data types with full serialization support
## Capabilities

### Table Source and Sink Operations

Core functionality for reading from and writing to Hive tables using Flink's Table API and DataStream API. Supports both batch and streaming modes with comprehensive partition handling.

```java { .api }
// Main source builder
public class HiveSourceBuilder {
    public HiveSourceBuilder(JobConf jobConf, ReadableConfig flinkConf,
                             String hiveVersion, String dbName,
                             String tableName, Map<String, String> tableOptions);
    public HiveSource<RowData> buildWithDefaultBulkFormat();
    public <T> HiveSource<T> buildWithBulkFormat(BulkFormat<T, HiveSourceSplit> bulkFormat);
    public HiveSourceBuilder setPartitions(List<HiveTablePartition> partitions);
    public HiveSourceBuilder setLimit(Long limit);
    public HiveSourceBuilder setProjectedFields(int[] projectedFields);
}

// Dynamic table source
public class HiveTableSource implements ScanTableSource, SupportsPartitionPushDown,
        SupportsProjectionPushDown, SupportsLimitPushDown {
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);
    public Result applyPartitions(List<Map<String, String>> remainingPartitions);
    public Result applyProjection(int[][] projectedFields, DataType producedDataType);
    public Result applyLimit(long limit);
}

// Dynamic table sink
public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context);
    public Result applyStaticPartition(Map<String, String> partition);
    public Result applyOverwrite(boolean overwrite);
}
```

[Table Source and Sink Operations](./table-source-sink.md)
157
158
### Catalog Integration
159
160
Complete Hive metastore integration providing schema discovery, table management, and metadata operations. Enables transparent access to existing Hive data warehouses.
161
162
```java { .api }
163
public class HiveCatalog extends AbstractCatalog {
164
public HiveCatalog(String catalogName, String defaultDatabase,
165
String hiveConfDir, String hadoopConfDir, String hiveVersion);
166
public List<String> listDatabases() throws DatabaseNotExistException, CatalogException;
167
public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException;
168
public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException;
169
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists);
170
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists);
171
public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath);
172
}
173
174
public class HiveCatalogFactory implements CatalogFactory {
175
public Catalog createCatalog(Context context);
176
public String factoryIdentifier();
177
}
178
```
179
180
[Catalog Integration](./catalog-integration.md)
181
182
### Function Module Integration
183
184
Hive built-in function support enabling use of Hive UDFs within Flink SQL queries. Provides seamless function compatibility and registration.
185
186
```java { .api }
187
public class HiveModule implements Module {
188
public HiveModule();
189
public HiveModule(String hiveVersion);
190
public Set<String> listFunctions();
191
public Optional<FunctionDefinition> getFunctionDefinition(String name);
192
}
193
194
```
195
196
[Function Module Integration](./function-module.md)
197
198
### Configuration and Options
199
200
Comprehensive configuration system for customizing connector behavior, performance tuning, and environment-specific settings.
201
202
```java { .api }
203
public class HiveOptions {
204
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER;
205
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER;
206
public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE;
207
public static final ConfigOption<Duration> STREAMING_SOURCE_MONITOR_INTERVAL;
208
public static final ConfigOption<Duration> LOOKUP_JOIN_CACHE_TTL;
209
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM;
210
public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_KIND;
211
}
212
213
public class HiveCatalogFactoryOptions {
214
public static final ConfigOption<String> DEFAULT_DATABASE;
215
public static final ConfigOption<String> HIVE_CONF_DIR;
216
public static final ConfigOption<String> HIVE_VERSION;
217
public static final ConfigOption<String> HADOOP_CONF_DIR;
218
}
219
```
220
221
[Configuration and Options](./configuration.md)
222
223
### Partition Management and Utilities
224
225
Advanced partition handling utilities for efficient data access, partition pruning, and metadata management in partitioned Hive tables.
226
227
```java { .api }
228
public class HiveTablePartition implements Serializable {
229
public static HiveTablePartition ofTable(StorageDescriptor storageDescriptor,
230
Map<String, String> tableParameters);
231
public static HiveTablePartition ofPartition(StorageDescriptor storageDescriptor,
232
Map<String, String> partitionSpec,
233
Map<String, String> tableParameters);
234
public StorageDescriptor getStorageDescriptor();
235
public Map<String, String> getPartitionSpec();
236
}
237
238
public class HivePartitionUtils {
239
public static List<HiveTablePartition> getAllPartitions(JobConf jobConf, String catalogName,
240
ObjectPath tablePath, List<String> partitionColNames);
241
public static byte[] serializeHiveTablePartition(List<HiveTablePartition> partitions);
242
public static List<HiveTablePartition> deserializeHiveTablePartition(byte[] bytes, ClassLoader classLoader);
243
}
244
```
245
246
[Partition Management and Utilities](./partition-management.md)
247
248
## Types
249
250
```java { .api }
251
// Core exception type
252
public class FlinkHiveException extends RuntimeException {
253
public FlinkHiveException(String message);
254
public FlinkHiveException(Throwable cause);
255
public FlinkHiveException(String message, Throwable cause);
256
}
257
258
// Configuration wrapper
259
public class JobConfWrapper implements Serializable {
260
public JobConfWrapper(JobConf jobConf);
261
public JobConf conf();
262
}
263
264
```