Apache Flink SQL connector for Apache Hive 2.3.6 providing integration between Flink and Hive for reading/writing Hive tables and using Hive Metastore as a catalog.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-sql-connector-hive-2-3-6_2-12@1.15.00
# Apache Flink Hive Connector 2.3.6

Apache Flink SQL connector for Apache Hive 2.3.6 that provides comprehensive integration between Flink and Hive, enabling both batch and streaming access to Hive tables, Hive catalog integration, and Hive function support. This connector bridges Flink's unified stream and batch processing capabilities and the Hive data warehouse ecosystem.

## Package Information

- **Package Name**: flink-sql-connector-hive-2.3.6_2.12
- **Package Type**: maven
- **Language**: Java
- **Installation**: Add to Maven dependencies:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-hive-2.3.6_2.12</artifactId>
    <version>1.15.4</version>
</dependency>
```

For the runtime classpath (if not using the uber JAR as a project dependency):

```bash
# Download the jar and place it in the Flink lib directory
cp flink-sql-connector-hive-2.3.6_2.12-1.15.4.jar $FLINK_HOME/lib/
```

## Core Imports

```java
// Catalog integration
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory;

// DataStream API source
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.connectors.hive.HiveTablePartition;

// Table API integration
import org.apache.flink.connectors.hive.HiveTableSource;
import org.apache.flink.connectors.hive.HiveTableSink;
import org.apache.flink.connectors.hive.HiveLookupTableSource;
import org.apache.flink.connectors.hive.HiveDynamicTableFactory;

// Module for Hive functions
import org.apache.flink.table.module.hive.HiveModule;
import org.apache.flink.table.module.hive.HiveModuleOptions;

// Configuration options
import org.apache.flink.connectors.hive.HiveOptions;

// Exception handling
import org.apache.flink.connectors.hive.FlinkHiveException;

// Nullability annotations
import javax.annotation.Nullable;
import javax.annotation.Nonnull;
```

## Basic Usage

### Catalog Integration

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

// Create a table environment in batch mode
EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inBatchMode()
    .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Create and register the Hive catalog
String catalogName = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/opt/hive-conf";
String hadoopConfDir = "/opt/hadoop-conf";
String hiveVersion = "2.3.6";

HiveCatalog hive = new HiveCatalog(
    catalogName, defaultDatabase, hiveConfDir, hadoopConfDir, hiveVersion);
tableEnv.registerCatalog(catalogName, hive);
tableEnv.useCatalog(catalogName);

// Query Hive tables with SQL
tableEnv.executeSql("SELECT * FROM my_hive_table").print();
```

### DataStream API Usage

```java
import java.util.Collections;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.mapred.JobConf;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create the Hadoop job configuration for Hive
JobConf jobConf = new JobConf();
// Set Hive metastore URI and other configs
jobConf.set("hive.metastore.uris", "thrift://localhost:9083");

// Build the Hive source
HiveSource<RowData> source = new HiveSourceBuilder(
        jobConf,
        new Configuration(),   // Flink configuration
        "2.3.6",               // Hive version
        "default",             // database name
        "my_table",            // table name
        Collections.emptyMap() // table options
).buildWithDefaultBulkFormat();

// Add the source to the stream
env.fromSource(source, WatermarkStrategy.noWatermarks(), "hive-source")
    .print();

env.execute("Hive Stream Job");
```

### Module Registration for Hive Functions

```java
import org.apache.flink.table.module.hive.HiveModule;

// Register the Hive module to access Hive built-in functions
tableEnv.loadModule("hive", new HiveModule("2.3.6"));

// Use Hive functions in SQL
tableEnv.executeSql("SELECT concat('Hello', ' ', 'World')").print();
```

## Architecture

The Flink Hive Connector is built around several key components:

- **Catalog Integration**: `HiveCatalog` provides full metastore integration for persistent table metadata
- **Source/Sink Components**: `HiveSource`, `HiveTableSource`, and `HiveTableSink` for data access
- **Module System**: `HiveModule` enables access to Hive built-in functions within Flink SQL
- **Configuration Management**: comprehensive options for tuning performance and behavior
- **Version Abstraction**: a shim system supporting multiple Hive versions behind a consistent API
- **File Format Support**: works with Parquet, ORC, text files, and other Hadoop-compatible formats

## Capabilities

### Catalog Integration

Complete Hive metastore integration allowing Flink to use Hive as a persistent catalog for storing table definitions, schemas, and metadata across sessions.

```java { .api }
HiveCatalog(String catalogName, @Nullable String defaultDatabase, @Nullable String hiveConfDir,
            @Nullable String hadoopConfDir, @Nullable String hiveVersion);

// Database operations
void createDatabase(String databaseName, CatalogDatabase database, boolean ignoreIfExists);
CatalogDatabase getDatabase(String databaseName);
List<String> listDatabases();

// Table operations
void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists);
CatalogBaseTable getTable(ObjectPath tablePath);
List<String> listTables(String databaseName);
```

[Catalog Integration](./catalog.md)
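
As a sketch of how the database and table operations above compose when driving the catalog directly (a hedged example, not a complete program: it assumes a reachable Hive metastore, and the `sales` database name and its comment are hypothetical):

```java
import java.util.Collections;

import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.hive.HiveCatalog;

// Sketch only: requires a running Hive metastore reachable via /opt/hive-conf
HiveCatalog hive = new HiveCatalog("myhive", "default", "/opt/hive-conf", "2.3.6");
hive.open();

// Create a database, ignoring the call if it already exists
hive.createDatabase(
    "sales",
    new CatalogDatabaseImpl(Collections.emptyMap(), "sales data"),
    true); // ignoreIfExists

// Enumerate metadata persisted in the metastore
System.out.println(hive.listDatabases());
System.out.println(hive.listTables("sales"));

hive.close();
```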
### DataStream Source

Low-level streaming and batch source for reading Hive tables directly in DataStream API programs, with full control over parallelism, partitioning, and data formats.

```java { .api }
class HiveSourceBuilder {
    HiveSourceBuilder(@Nonnull JobConf jobConf, @Nonnull ReadableConfig flinkConf, @Nullable String hiveVersion,
                      @Nonnull String dbName, @Nonnull String tableName, @Nonnull Map<String, String> tableOptions);

    HiveSourceBuilder setPartitions(List<HiveTablePartition> partitions);
    HiveSourceBuilder setLimit(@Nullable Long limit);
    HiveSourceBuilder setProjectedFields(int[] projectedFields);

    HiveSource<RowData> buildWithDefaultBulkFormat();
    <T> HiveSource<T> buildWithBulkFormat(BulkFormat<T, HiveSourceSplit> bulkFormat);
}

class HiveTablePartition {
    static HiveTablePartition ofTable(HiveConf hiveConf, @Nullable String hiveVersion,
                                      String dbName, String tableName);
    static HiveTablePartition ofPartition(HiveConf hiveConf, @Nullable String hiveVersion,
                                          String dbName, String tableName,
                                          LinkedHashMap<String, String> partitionSpec);
}
```

[DataStream Source](./datastream-source.md)
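
The builder and partition factory above can be combined to restrict a read to specific partitions. A hedged sketch (assumes a reachable metastore; the table name and the `dt='2024-01-01'` partition spec are hypothetical):

```java
import java.util.Collections;
import java.util.LinkedHashMap;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;

HiveConf hiveConf = new HiveConf();
hiveConf.set("hive.metastore.uris", "thrift://localhost:9083");

// Describe a single partition of the table (partition spec is hypothetical)
LinkedHashMap<String, String> spec = new LinkedHashMap<>();
spec.put("dt", "2024-01-01");
HiveTablePartition partition =
    HiveTablePartition.ofPartition(hiveConf, "2.3.6", "default", "my_table", spec);

// Build a source that reads only that partition, capped at 1000 rows
HiveSource<RowData> source = new HiveSourceBuilder(
        new JobConf(hiveConf), new Configuration(), "2.3.6",
        "default", "my_table", Collections.emptyMap())
    .setPartitions(Collections.singletonList(partition))
    .setLimit(1000L)
    .buildWithDefaultBulkFormat();
```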
### Table API Integration

High-level Table API integration providing `HiveTableSource` and `HiveTableSink` for seamless SQL access to Hive tables with pushdown optimizations.

```java { .api }
// Created automatically via catalog registration.
// Supports predicate pushdown, projection pushdown, and partition pruning.
interface SupportsPartitionPushDown {
    Optional<List<Map<String, String>>> listPartitions();
    void applyPartitions(List<Map<String, String>> remainingPartitions);
}

interface SupportsProjectionPushDown {
    boolean supportsNestedProjection();
    void applyProjection(int[][] projectedFields);
}

interface SupportsLimitPushDown {
    void applyLimit(long limit);
}
```

[Table API Integration](./table-api.md)
### Hive Functions

Module system integration enabling access to Hive built-in functions within Flink SQL, including string functions, date functions, and mathematical operations.

```java { .api }
class HiveModule implements Module {
    HiveModule();
    HiveModule(String hiveVersion);

    Set<String> listFunctions();
    Optional<FunctionDefinition> getFunctionDefinition(String name);
    String getHiveVersion();
}
```

[Hive Functions](./hive-functions.md)
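
A short sketch of inspecting the module before loading it into a table environment (hedged: `get_json_object` is a standard Hive built-in used here only for illustration):

```java
import org.apache.flink.table.module.hive.HiveModule;

// Instantiate the module for Hive 2.3.6 without loading it yet
HiveModule module = new HiveModule("2.3.6");

// Enumerate the Hive built-ins the module would expose
System.out.println(module.listFunctions().size());

// Check whether a specific Hive function is available
System.out.println(module.getFunctionDefinition("get_json_object").isPresent());
```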
### Configuration Options

Comprehensive configuration system for tuning connector behavior, performance optimization, and streaming source configuration.

```java { .api }
class HiveOptions {
    // Performance tuning
    ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM;
    ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX;

    // Streaming source options
    ConfigOption<Boolean> STREAMING_SOURCE_ENABLE;
    ConfigOption<Duration> STREAMING_SOURCE_MONITOR_INTERVAL;
    ConfigOption<String> STREAMING_SOURCE_CONSUME_START_OFFSET;

    // Lookup join caching
    ConfigOption<Duration> LOOKUP_JOIN_CACHE_TTL;
}

enum PartitionOrder {
    CREATE_TIME, PARTITION_TIME, PARTITION_NAME
}
```

[Configuration Options](./configuration.md)
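
The streaming-source options above are usually supplied as table options rather than set programmatically; one way is a SQL `OPTIONS` hint on the query. A hedged configuration sketch (the table name and start offset are hypothetical, and `tableEnv` is assumed to be registered with a Hive catalog as in Basic Usage):

```java
// Sketch only: read a partitioned Hive table as an unbounded stream by
// enabling the streaming-source options via a per-query SQL hint.
tableEnv.executeSql(
    "SELECT * FROM my_hive_table " +
    "/*+ OPTIONS(" +
    "'streaming-source.enable'='true', " +
    "'streaming-source.monitor-interval'='1 min', " +
    "'streaming-source.consume-start-offset'='2024-01-01'" +
    ") */"
).print();
```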
## Types

```java { .api }
class FlinkHiveException extends RuntimeException {
    FlinkHiveException(String message);
    FlinkHiveException(Throwable cause);
    FlinkHiveException(String message, Throwable cause);
}

class HiveSourceSplit extends FileSourceSplit {
    // Represents a split of Hive data for parallel processing
}

interface HiveFunction {
    void setArgumentTypesAndConstants(Object[] constantArguments, DataType[] argTypes);
    DataType getHiveResultType(Object[] constantArguments, DataType[] argTypes);
}
```