# Apache Flink SQL Connector for Hive 3.1.2

The Apache Flink SQL Connector for Hive 3.1.2 integrates Apache Flink with Apache Hive 3.1.2, providing unified batch and stream processing over Hive tables. The connector lets Flink read from and write to Hive tables, supports both partitioned and non-partitioned tables in streaming and batch modes, and includes comprehensive catalog integration for metadata management.

## Package Information

- **Package Name**: flink-sql-connector-hive-3.1.2_2.12
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-sql-connector-hive-3.1.2_2.12
- **Version**: 1.16.3
- **Installation**: Add the Maven dependency to `pom.xml`:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-hive-3.1.2_2.12</artifactId>
    <version>1.16.3</version>
</dependency>
```

## Core Imports

```java
// Factory classes for programmatic usage
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory;
import org.apache.flink.connectors.hive.HiveDynamicTableFactory;
import org.apache.flink.table.module.hive.HiveModuleFactory;

// Core connector classes
import org.apache.flink.connectors.hive.HiveTableSource;
import org.apache.flink.connectors.hive.HiveTableSink;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;

// Catalog and module classes
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.module.hive.HiveModule;

// Configuration and options
import org.apache.flink.connectors.hive.HiveOptions;
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
```

## Basic Usage

### SQL DDL Usage (Most Common)

```sql
-- Create Hive catalog
CREATE CATALOG hive_catalog WITH (
    'type' = 'hive',
    'hive-conf-dir' = '/path/to/hive/conf',
    'hive-version' = '3.1.2'
);

-- Use the Hive catalog
USE CATALOG hive_catalog;
USE `default`;

-- Read from an existing Hive table
SELECT * FROM my_hive_table WHERE partition_date = '2023-01-01';

-- Create a new Hive table; Hive-style DDL (STORED AS, TBLPROPERTIES)
-- requires the Hive dialect, where partition columns are declared
-- only in the PARTITIONED BY clause
SET table.sql-dialect=hive;

CREATE TABLE new_hive_table (
    id BIGINT,
    name STRING,
    event_time TIMESTAMP
) PARTITIONED BY (partition_date STRING)
STORED AS PARQUET
TBLPROPERTIES (
    'sink.partition-commit.policy.kind' = 'metastore,success-file'
);

SET table.sql-dialect=default;

-- Write with a dynamic partition value
INSERT INTO new_hive_table SELECT id, name, event_time, '2023-01-01' FROM source_table;
```
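
Streaming reads need no separate DDL: the streaming options listed under Configuration Management can be applied per query as dynamic table options (SQL hints). A minimal sketch against the table above; depending on your Flink configuration, hint support may require `table.dynamic-table-options.enabled`:

```sql
-- Continuously monitor my_hive_table for newly committed partitions
SELECT * FROM my_hive_table
/*+ OPTIONS(
    'streaming-source.enable' = 'true',
    'streaming-source.monitor-interval' = '1 min'
) */;
```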

### Programmatic Table API Usage

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

// Create table environment
EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Create Hive catalog
String catalogName = "hive_catalog";
String defaultDatabase = "default";
String hiveConfDir = "/path/to/hive/conf";
String hiveVersion = "3.1.2";

HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir, hiveVersion);
tableEnv.registerCatalog(catalogName, hive);
tableEnv.useCatalog(catalogName);

// Execute queries
tableEnv.executeSql("SELECT * FROM my_hive_table LIMIT 10").print();
```
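
When issuing Hive-style DDL through the Table API, switch the SQL dialect first. A short sketch on the same `tableEnv`; the table name is hypothetical:

```java
import org.apache.flink.table.api.SqlDialect;

// Use the Hive dialect for Hive-specific DDL such as STORED AS
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql("CREATE TABLE hive_dialect_table (id BIGINT) STORED AS ORC");

// Restore the default dialect for regular Flink SQL
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
```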

### DataStream API Usage

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.mapred.JobConf;

import java.util.Collections;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// jobConf carries the Hadoop/Hive configuration (e.g. derived from a HiveConf)
JobConf jobConf = new JobConf();

HiveSource<RowData> hiveSource =
        new HiveSourceBuilder(
                        jobConf,
                        new Configuration(),    // Flink configuration
                        "3.1.2",                // Hive version
                        "default",              // database name
                        "my_hive_table",        // table name
                        Collections.emptyMap()) // table options
                .buildWithDefaultBulkFormat();

env.fromSource(hiveSource, WatermarkStrategy.noWatermarks(), "Hive Source")
        .print();

env.execute("Hive Streaming Job");
```

## Architecture

The Flink Hive Connector is built around several key components:

- **Factory System**: SPI-based factory classes for automatic connector discovery and configuration
- **Table Connectors**: Source and sink implementations supporting both batch and streaming modes
- **Catalog Integration**: HiveCatalog providing unified metadata management across Flink and Hive
- **Module System**: HiveModule enabling access to Hive built-in functions within Flink SQL
- **Configuration Management**: Comprehensive options for fine-tuning performance and behavior
- **Shaded Dependencies**: Self-contained JAR with Hive 3.1.2 dependencies to avoid conflicts

The connector supports both legacy and modern Flink table interfaces, enabling seamless migration from older versions while providing access to the latest features.

## Capabilities

### Factory Registration

Factory classes enable automatic connector discovery through Flink's Service Provider Interface (SPI). These factories handle connector instantiation and configuration validation.

```java { .api }
public class HiveCatalogFactory implements CatalogFactory {
    public String factoryIdentifier(); // returns "hive"
}

public class HiveDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    public String factoryIdentifier(); // returns "hive"
}

public class HiveModuleFactory implements ModuleFactory {
    public String factoryIdentifier(); // returns "hive"
}
```

[Factory Registration](./factory-registration.md)
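
In normal use these factories are resolved automatically when SQL refers to the `hive` identifier. For illustration, the same SPI lookup can be performed by hand through Flink's `FactoryUtil`; this sketch assumes the connector JAR is on the classpath:

```java
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.FactoryUtil;

// Resolve the catalog factory registered under the "hive" identifier
CatalogFactory factory = FactoryUtil.discoverFactory(
        Thread.currentThread().getContextClassLoader(),
        CatalogFactory.class,
        "hive");
```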

### Table Sources and Sinks

Core table connector implementations providing read and write capabilities for Hive tables with support for batch processing, streaming ingestion, partition management, and performance optimizations.

```java { .api }
public class HiveTableSource implements ScanTableSource,
        SupportsPartitionPushDown, SupportsProjectionPushDown, SupportsLimitPushDown {
    // Batch and streaming source for Hive tables
}

public class HiveTableSink implements DynamicTableSink {
    // Sink for writing to Hive tables with partition support
}

public final class HiveSource<T>
        implements Source<T, HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>> {
    // Low-level source using the new Source API
}
```

[Table Sources and Sinks](./table-sources-sinks.md)
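
The sink's partitioning and overwrite capabilities surface directly in SQL. A sketch against the hypothetical tables from Basic Usage (INSERT OVERWRITE is only supported in batch execution mode):

```sql
-- Replace the contents of one static partition
INSERT OVERWRITE new_hive_table PARTITION (partition_date = '2023-01-01')
SELECT id, name, event_time FROM source_table;
```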

### Catalog Integration

Hive catalog implementation that provides unified metadata management, enabling Flink to access Hive databases, tables, partitions, and functions through the Hive metastore.

```java { .api }
public class HiveCatalog extends AbstractCatalog {
    public HiveCatalog(String catalogName, String defaultDatabase,
            String hiveConfDir, String hiveVersion);

    public static boolean isHiveTable(Map<String, String> properties);
}
```

[Catalog Integration](./catalog-integration.md)
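
The catalog can also be used directly for metadata access. A minimal sketch, reusing the configuration values from Basic Usage:

```java
// Open a catalog backed by the Hive metastore and list its databases
HiveCatalog catalog =
        new HiveCatalog("hive_catalog", "default", "/path/to/hive/conf", "3.1.2");
catalog.open();
catalog.listDatabases().forEach(System.out::println);
catalog.close();
```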

### Function Module

Module system providing access to Hive built-in functions within Flink SQL, enabling compatibility with existing Hive UDFs and maintaining consistent function behavior.

```java { .api }
public class HiveModule implements Module {
    public HiveModule(String hiveVersion);
}
```

[Function Module](./function-module.md)
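
A sketch of registering the module on an existing `TableEnvironment`, after which Hive built-ins such as `get_json_object` resolve in Flink SQL:

```java
// Register Hive built-in functions alongside Flink's core module;
// the SQL equivalent is: LOAD MODULE hive WITH ('hive-version' = '3.1.2');
tableEnv.loadModule("hive", new HiveModule("3.1.2"));
```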

### Configuration Management

Comprehensive configuration options for controlling connector behavior, performance tuning, and feature enablement across reading, writing, streaming, and lookup join scenarios.

```java { .api }
public class HiveOptions {
    // Reading options
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER;
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_READ_PARTITION_WITH_SUBDIRECTORY_ENABLED;
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM;
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX;
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM;
    public static final ConfigOption<MemorySize> TABLE_EXEC_HIVE_SPLIT_MAX_BYTES;
    public static final ConfigOption<MemorySize> TABLE_EXEC_HIVE_FILE_OPEN_COST;
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM;
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_DYNAMIC_GROUPING_ENABLED;

    // Writing options
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER;
    public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_KIND;
    public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_CLASS;
    public static final ConfigOption<String> SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME;
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_ENABLE;
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_THREAD_NUM;

    // Streaming options
    public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE;
    public static final ConfigOption<String> STREAMING_SOURCE_PARTITION_INCLUDE;
    public static final ConfigOption<Duration> STREAMING_SOURCE_MONITOR_INTERVAL;
    public static final ConfigOption<String> STREAMING_SOURCE_CONSUME_START_OFFSET;
    public static final ConfigOption<PartitionOrder> STREAMING_SOURCE_PARTITION_ORDER;

    // Lookup join options
    public static final ConfigOption<Duration> LOOKUP_JOIN_CACHE_TTL;
}
```

[Configuration Management](./configuration-management.md)
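
Each option has a string key that can be set on the table environment's configuration. A sketch of tuning a few read-side options (keys as of Flink 1.16; verify against your version):

```java
import org.apache.flink.configuration.Configuration;

// Tune source parallelism inference and split generation
Configuration conf = tableEnv.getConfig().getConfiguration();
conf.setString("table.exec.hive.infer-source-parallelism", "true");
conf.setString("table.exec.hive.infer-source-parallelism.max", "100");
conf.setString("table.exec.hive.split-max-size", "128mb");
```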

### Lookup Joins

Specialized lookup table source for dimension table joins, providing caching capabilities and optimized access patterns for real-time data enrichment scenarios.

```java { .api }
public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {
    // Lookup join capabilities for dimension tables
}
```

[Lookup Joins](./lookup-joins.md)
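
A typical enrichment query, sketched with hypothetical `orders` (carrying a processing-time attribute `proc_time`) and `dim_customers` tables:

```sql
-- Join each order against the latest Hive dimension data, cached for 10 minutes
SELECT o.order_id, o.amount, d.customer_name
FROM orders AS o
JOIN dim_customers /*+ OPTIONS('lookup.join.cache.ttl' = '10 min') */
    FOR SYSTEM_TIME AS OF o.proc_time AS d
ON o.customer_id = d.customer_id;
```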

## Types

### Core Interfaces and Classes

```java { .api }
// Exception handling
public class FlinkHiveException extends RuntimeException {
    public FlinkHiveException(String message);
    public FlinkHiveException(Throwable cause);
    public FlinkHiveException(String message, Throwable cause);
}

// Partition representation
public class HiveTablePartition {
    // Constructors
    public HiveTablePartition(StorageDescriptor storageDescriptor, Properties tableProps);
    public HiveTablePartition(StorageDescriptor storageDescriptor,
            Map<String, String> partitionSpec, Properties tableProps);

    // Instance methods
    public StorageDescriptor getStorageDescriptor();
    public Map<String, String> getPartitionSpec();
    public Properties getTableProps();
    public boolean equals(Object o);
    public int hashCode();
    public String toString();

    // Static factory methods
    public static HiveTablePartition ofTable(HiveConf hiveConf, String hiveVersion,
            String dbName, String tableName);
    public static HiveTablePartition ofPartition(HiveConf hiveConf, String hiveVersion,
            String dbName, String tableName,
            LinkedHashMap<String, String> partitionSpec);
}

// Configuration wrapper
public class JobConfWrapper implements Serializable {
    public JobConfWrapper(JobConf jobConf);
    public JobConf conf();
}

// Source implementation with generic type parameter
public final class HiveSource<T>
        implements Source<T, HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>> {
    // Source interface methods
    public SimpleVersionedSerializer<HiveSourceSplit> getSplitSerializer();
    public SimpleVersionedSerializer<PendingSplitsCheckpoint<HiveSourceSplit>> getEnumeratorCheckpointSerializer();
    public SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>> createEnumerator(
            SplitEnumeratorContext<HiveSourceSplit> enumContext);
    public SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>> restoreEnumerator(
            SplitEnumeratorContext<HiveSourceSplit> enumContext,
            PendingSplitsCheckpoint<HiveSourceSplit> checkpoint);
}

// Table source with interface implementations
public class HiveTableSource implements ScanTableSource, SupportsPartitionPushDown,
        SupportsProjectionPushDown, SupportsLimitPushDown, SupportsStatisticReport, SupportsDynamicFiltering {

    public HiveTableSource(JobConf jobConf, ReadableConfig flinkConf,
            ObjectPath tablePath, CatalogTable catalogTable);

    // ScanTableSource methods
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext);
    public ChangelogMode getChangelogMode();
    public DynamicTableSource copy();
    public String asSummaryString();

    // Optimization interface methods
    public void applyLimit(long limit);
    public Optional<List<Map<String, String>>> listPartitions();
    public void applyPartitions(List<Map<String, String>> remainingPartitions);
    public List<String> listAcceptedFilterFields();
    public void applyDynamicFiltering(List<String> candidateFilterFields);
    public boolean supportsNestedProjection();
    public void applyProjection(int[][] projectedFields, DataType producedDataType);
    public TableStats reportStatistics();
}

// Table sink with interface implementations
public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
    public HiveTableSink(ReadableConfig flinkConf, JobConf jobConf,
            ObjectIdentifier identifier, CatalogTable table, Integer configuredParallelism);

    // DynamicTableSink methods
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context);
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode);
    public DynamicTableSink copy();
    public String asSummaryString();

    // Partitioning and overwrite support
    public boolean requiresPartitionGrouping(boolean supportsGrouping);
    public void applyStaticPartition(Map<String, String> partition);
    public void applyOverwrite(boolean overwrite);
}

// Lookup table source extending HiveTableSource
public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {
    public HiveLookupTableSource(JobConf jobConf, ReadableConfig flinkConf,
            ObjectPath tablePath, CatalogTable catalogTable);

    // LookupTableSource methods
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);
    public DynamicTableSource copy();
}

// Builder pattern
public class HiveSourceBuilder {
    // Constructors
    public HiveSourceBuilder(JobConf jobConf, ReadableConfig flinkConf, String hiveVersion,
            String dbName, String tableName, Map<String, String> tableOptions);
    public HiveSourceBuilder(JobConf jobConf, ReadableConfig flinkConf, ObjectPath tablePath,
            String hiveVersion, CatalogTable catalogTable);

    // Configuration methods
    public HiveSourceBuilder setProjectedFields(int[] projectedFields);
    public HiveSourceBuilder setLimit(Long limit);
    public HiveSourceBuilder setPartitions(List<HiveTablePartition> partitions);
    public HiveSourceBuilder setDynamicFilterPartitionKeys(List<String> dynamicFilterPartitionKeys);

    // Build methods
    public HiveSource<RowData> buildWithDefaultBulkFormat();
    public <T> HiveSource<T> buildWithBulkFormat(BulkFormat<T, HiveSourceSplit> bulkFormat);
}
```
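
As an illustration of how these types compose, the following sketch restricts a `HiveSource` to one metastore-resolved partition with a row limit; `hiveConf`, `jobConf`, and `flinkConf` are assumed to be in scope, and the database, table, and partition values are hypothetical:

```java
import java.util.Collections;
import java.util.LinkedHashMap;

// Resolve a single partition through the metastore
LinkedHashMap<String, String> spec = new LinkedHashMap<>();
spec.put("partition_date", "2023-01-01");
HiveTablePartition partition =
        HiveTablePartition.ofPartition(hiveConf, "3.1.2", "default", "my_hive_table", spec);

// Read only that partition, capped at 1000 rows
HiveSource<RowData> source =
        new HiveSourceBuilder(jobConf, flinkConf, "3.1.2", "default", "my_hive_table",
                        Collections.emptyMap())
                .setPartitions(Collections.singletonList(partition))
                .setLimit(1000L)
                .buildWithDefaultBulkFormat();
```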

### Enums and Constants

```java { .api }
// Partition ordering strategies
public enum PartitionOrder implements DescribedEnum {
    CREATE_TIME("create-time", "Order partitions by creation time"),
    PARTITION_TIME("partition-time", "Order partitions by partition time"),
    PARTITION_NAME("partition-name", "Order partitions by partition name");

    public String toString();
    public InlineElement getDescription();
}

// Version constants
public class HiveShimLoader {
    public static final String HIVE_VERSION_V3_1_2 = "3.1.2";
    public static String getHiveVersion();
}

// Catalog configuration constants
public class HiveCatalogConfig {
    public static final String DEFAULT_LIST_COLUMN_TYPES_SEPARATOR = ":";
    public static final String COMMENT = "comment";
    public static final String PARTITION_LOCATION = "partition.location";
}
```
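
The `PartitionOrder` values map to the string forms shown above and are consumed through the `streaming-source.partition-order` option, for example as a per-query hint on a streaming read:

```sql
SELECT * FROM my_hive_table
/*+ OPTIONS(
    'streaming-source.enable' = 'true',
    'streaming-source.partition-order' = 'partition-name'
) */;
```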