# Apache Flink SQL Connector for Hive 2.3.6

The Apache Flink SQL connector for Apache Hive 2.3.6 integrates Flink's streaming and batch processing with Hive's data warehousing infrastructure. It supports reading from and writing to Hive tables with Flink SQL, along with Hive metastore catalog integration and Hive function support.

## Package Information

- **Package Name**: flink-sql-connector-hive-2.3.6_2.11
- **Package Type**: Maven
- **Language**: Java
- **Installation**: Add to your Maven dependencies:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-hive-2.3.6_2.11</artifactId>
    <version>1.14.6</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.connectors.hive.HiveDynamicTableFactory;
import org.apache.flink.connectors.hive.HiveTableSource;
import org.apache.flink.connectors.hive.HiveTableSink;
import org.apache.flink.table.module.hive.HiveModule;
```

## Basic Usage

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.module.hive.HiveModule;

// Create Hive catalog
HiveCatalog hiveCatalog = new HiveCatalog(
        "hive_catalog",           // catalog name
        "default",                // default database
        "/path/to/hive/conf",     // hive conf dir (directory containing hive-site.xml)
        "/path/to/hadoop/conf",   // hadoop conf dir
        "2.3.6"                   // hive version
);

// Register catalog with table environment
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
tableEnv.registerCatalog("hive_catalog", hiveCatalog);
tableEnv.useCatalog("hive_catalog");

// Load Hive module for UDF support
tableEnv.loadModule("hive", new HiveModule("2.3.6"));

// Use Hive tables with SQL
tableEnv.executeSql("SELECT * FROM hive_table WHERE partition_key = 'value'");
```
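
Flink can also parse HiveQL directly by switching the SQL dialect, which lets existing Hive DDL/DML run largely unchanged. A minimal sketch using the table API's `SqlDialect` enum (the table name `t` is a hypothetical placeholder):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;

// Sketch: toggle between Flink's default dialect and the Hive dialect.
// Assumes a Hive catalog is registered and the connector is on the classpath.
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

// Use HiveQL syntax for subsequent statements
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql("CREATE TABLE IF NOT EXISTS t (x INT) PARTITIONED BY (dt STRING)");

// Switch back to Flink's default dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
```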

## Architecture

The Apache Flink Hive connector is built around several key components:

- **Catalog Integration**: `HiveCatalog` provides full integration with the Hive metastore for database and table metadata management
- **Table Source/Sink**: `HiveTableSource` and `HiveTableSink` handle data reading and writing with support for various file formats
- **Dynamic Table Factory**: `HiveDynamicTableFactory` creates table sources and sinks based on catalog metadata
- **Function Module**: `HiveModule` provides access to Hive built-in functions (UDF/UDAF/UDTF)
- **Version Compatibility**: Multiple `HiveShim` implementations ensure compatibility across Hive versions 1.0.0 to 3.1.2
- **Parser Support**: `HiveParser` enables Hive dialect SQL parsing for enhanced compatibility

## Capabilities

### Catalog Operations

Complete Hive metastore integration for managing databases, tables, partitions, and metadata, supporting all standard catalog operations.

```java { .api }
public class HiveCatalog extends AbstractCatalog {
    public HiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir, String hadoopConfDir, String hiveVersion);
    public void open() throws CatalogException;
    public void close() throws CatalogException;
    public List<String> listDatabases() throws CatalogException;
    public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException;
    public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException;
    public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException;
}
```

[Catalog Operations](./catalog-operations.md)
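
Beyond SQL, the catalog can be used programmatically to browse metastore metadata. A hedged sketch, assuming a reachable Hive metastore and placeholder configuration paths:

```java
import java.util.List;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class CatalogBrowser {
    // Sketch: enumerate databases and tables directly through the catalog API.
    public static void main(String[] args) throws Exception {
        HiveCatalog catalog = new HiveCatalog(
                "hive_catalog", "default",
                "/path/to/hive/conf", "/path/to/hadoop/conf", "2.3.6");
        catalog.open();
        try {
            for (String db : catalog.listDatabases()) {
                List<String> tables = catalog.listTables(db);
                System.out.println(db + ": " + tables);
                // Fetch the full metadata for one table
                if (!tables.isEmpty()) {
                    System.out.println(catalog.getTable(new ObjectPath(db, tables.get(0))));
                }
            }
        } finally {
            catalog.close();
        }
    }
}
```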

### Table Sources

Reading data from Hive tables with support for both batch and streaming modes, partition pruning, projection pushdown, and lookup joins.

```java { .api }
public class HiveTableSource implements ScanTableSource, SupportsPartitionPushDown, SupportsProjectionPushDown, SupportsLimitPushDown {
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);
    public void applyLimit(long limit);
    public boolean supportsNestedProjection();
    public void applyProjection(int[][] projectedFields);
    public void applyPartitions(List<Map<String, String>> remainingPartitions);
}

public class HiveLookupTableSource implements LookupTableSource, ScanTableSource {
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);
}
```

[Table Sources](./table-sources.md)
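
Streaming reads are usually enabled per query with SQL hints rather than by touching these classes directly. A sketch assuming a registered catalog and a partitioned table named `hive_table`; the `streaming-source.*` option names are connector options that should be verified against your Flink version:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Sketch: continuously monitor a partitioned Hive table for new partitions.
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tableEnv.useCatalog("hive_catalog");
tableEnv.executeSql(
        "SELECT * FROM hive_table "
            + "/*+ OPTIONS('streaming-source.enable'='true', "
            + "'streaming-source.monitor-interval'='1 min') */");
```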

### Table Sinks

Writing data to Hive tables with support for partitioning, multiple file formats, and streaming ingestion with compaction.

```java { .api }
public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context);
    public DynamicTableSink applyStaticPartition(Map<String, String> partition);
    public boolean requiresPartitionGrouping(boolean supportsGrouping);
}
```

[Table Sinks](./table-sinks.md)
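
In practice, partitioned and overwriting writes are expressed in SQL; static partitions map to `applyStaticPartition` and `INSERT OVERWRITE` to the `SupportsOverwrite` ability above. A sketch with hypothetical tables `src` and `dst` in an already-registered catalog:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Sketch: partitioned batch writes into a Hive table.
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
tableEnv.useCatalog("hive_catalog");

// Static partition: the partition value is fixed in the statement
tableEnv.executeSql(
        "INSERT INTO dst PARTITION (dt='2024-01-01') SELECT x, y FROM src");

// Overwrite the same partition instead of appending
tableEnv.executeSql(
        "INSERT OVERWRITE dst PARTITION (dt='2024-01-01') SELECT x, y FROM src");
```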

### Hive Functions

Access to Hive built-in functions including UDF, UDAF, and UDTF through the HiveModule system with version-specific compatibility.

```java { .api }
public class HiveModule implements Module {
    public HiveModule(String hiveVersion);
    public Set<String> listFunctions();
    public Optional<FunctionDefinition> getFunctionDefinition(String name);
}

public interface HiveFunction {
    // Marker interface for Hive function wrappers
}

public class HiveGenericUDF extends ScalarFunction implements HiveFunction {
    public HiveGenericUDF(HiveFunctionWrapper<GenericUDF> hiveFunctionWrapper, HiveShim hiveShim);
}
```

[Hive Functions](./hive-functions.md)
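
Once `HiveModule` is loaded, Hive built-ins resolve from Flink SQL like any other function. A sketch using Hive's `get_json_object` UDF (assumed here as a representative built-in):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.module.hive.HiveModule;

// Sketch: make Hive built-in functions resolvable from Flink SQL.
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
tableEnv.loadModule("hive", new HiveModule("2.3.6"));

// Hive's get_json_object now resolves like any other SQL function
tableEnv.executeSql(
        "SELECT get_json_object('{\"name\":\"flink\"}', '$.name')").print();
```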

### Source API

Implementation of Flink's unified Source API for Hive tables, providing enhanced control over split enumeration and reading with support for continuous partition monitoring.

```java { .api }
public class HiveSource<T> implements Source<T, HiveSourceSplit, ContinuousHivePendingSplitsCheckpoint> {
    public SourceReader<T, HiveSourceSplit> createReader(SourceReaderContext readerContext);
    public SplitEnumerator<HiveSourceSplit, ContinuousHivePendingSplitsCheckpoint> createEnumerator(SplitEnumeratorContext<HiveSourceSplit> enumContext);
    public SimpleVersionedSerializer<HiveSourceSplit> getSplitSerializer();
    public SimpleVersionedSerializer<ContinuousHivePendingSplitsCheckpoint> getEnumeratorCheckpointSerializer();
}
```

[Source API](./source-api.md)

### Configuration

Configuration options and factory classes for setting up Hive integration with customizable behavior for performance and compatibility.

```java { .api }
public class HiveOptions {
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM;
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_READ_PARTITION_WITH_SUBDIRECTORY_ENABLED;
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED;
}

public class HiveCatalogFactory implements CatalogFactory {
    public String factoryIdentifier();
    public Catalog createCatalog(Context context);
    public Set<ConfigOption<?>> requiredOptions();
    public Set<ConfigOption<?>> optionalOptions();
}
```

[Configuration](./configuration.md)
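
These options are typically set on the table environment's configuration. A sketch using string keys that mirror the `HiveOptions` constants above; the exact key spellings are assumptions and should be checked against your Flink version:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Sketch: tune Hive connector behavior through the table config.
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
Configuration conf = tableEnv.getConfig().getConfiguration();

// Use more threads when enumerating partition splits of large tables
conf.setString("table.exec.hive.load-partition-splits.thread-num", "8");

// Also read files placed in subdirectories of a partition directory
conf.setString("table.exec.hive.read-partition-with-subdirectory.enabled", "true");
```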

## Types

### Core Types

```java { .api }
public class HiveTablePartition {
    public HiveTablePartition(StorageDescriptor storageDescriptor, Map<String, String> partitionSpec);
    public StorageDescriptor getStorageDescriptor();
    public Map<String, String> getPartitionSpec();
}

public class FlinkHiveException extends RuntimeException {
    public FlinkHiveException(String message);
    public FlinkHiveException(String message, Throwable cause);
}

public interface HiveShim {
    // Version-specific Hive compatibility interface
}
```

### Source/Sink Types

```java { .api }
public class HiveSourceSplit implements SourceSplit {
    public String splitId();
    // Split information for Hive table reading
}

public class ContinuousHivePendingSplitsCheckpoint {
    // Checkpoint information for continuous Hive monitoring
}
```