# Configuration Management

The Hive connector provides a comprehensive configuration system with fine-grained control over connector behavior, performance tuning, and feature enablement. The options cover reading, writing, streaming, lookup joins, and advanced optimization scenarios.

## Capabilities

### Core Configuration Options

`HiveOptions` is the central configuration class containing the Hive connector-specific options that control behavior and performance.

```java { .api }
/**
 * Configuration options for Hive connector behavior and performance tuning.
 * All options can be set in the Flink configuration or as table properties.
 */
public class HiveOptions {

    // Reading Configuration Options

    /**
     * Whether to fall back to the Hadoop MapReduce RecordReader for file reading.
     * Default: false (use native vectorized readers when possible)
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER;

    /**
     * Whether to read files in subdirectories of partitions.
     * Default: false
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_READ_PARTITION_WITH_SUBDIRECTORY_ENABLED;

    /**
     * Whether to infer source parallelism from the number of files and splits.
     * Default: true
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM;

    /**
     * Maximum size of a single split for reading.
     * Default: 128MB
     */
    public static final ConfigOption<MemorySize> TABLE_EXEC_HIVE_SPLIT_MAX_SIZE;

    /**
     * Estimated cost of opening a file (used for split calculation).
     * Default: 4MB
     */
    public static final ConfigOption<MemorySize> TABLE_EXEC_HIVE_FILE_OPEN_COST;

    // Writing Configuration Options

    /**
     * Whether to fall back to the Hadoop MapReduce OutputFormat for writing.
     * Default: false (use native writers when possible)
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER;

    /**
     * Whether to sort records by dynamic partition columns before writing.
     * Default: false
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_SINK_SORT_BY_DYNAMIC_PARTITION_ENABLE;

    /**
     * Whether to automatically gather table statistics after writing.
     * Default: false
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_ENABLE;

    /**
     * Partition commit policy that determines when partitions are considered ready.
     * Options: "metastore", "success-file", "metastore,success-file"
     * Default: (not set; partitions are committed immediately)
     */
    public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_KIND;

    // Streaming Configuration Options

    /**
     * Whether to enable streaming source mode for continuous partition monitoring.
     * Default: false
     */
    public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE;

    /**
     * Which partitions to include in streaming mode.
     * Options: "all" (all partitions), "latest" (only the latest partition)
     * Default: "all"
     */
    public static final ConfigOption<String> STREAMING_SOURCE_PARTITION_INCLUDE;

    /**
     * Interval for monitoring new partitions in streaming mode.
     * Default: 1 minute
     */
    public static final ConfigOption<Duration> STREAMING_SOURCE_MONITOR_INTERVAL;

    /**
     * Starting point for consuming partitions in streaming mode.
     * Format: "yyyy-MM-dd HH:mm:ss" or a partition name pattern
     * Default: (consume from the latest available partition)
     */
    public static final ConfigOption<String> STREAMING_SOURCE_CONSUME_START_OFFSET;

    /**
     * Ordering strategy for processing partitions in streaming mode.
     */
    public static final ConfigOption<PartitionOrder> STREAMING_SOURCE_PARTITION_ORDER;

    // Lookup Join Configuration Options

    /**
     * Time-to-live for cached lookup results.
     * Default: 60 minutes
     */
    public static final ConfigOption<Duration> LOOKUP_JOIN_CACHE_TTL;

    /**
     * Maximum number of entries in the lookup cache.
     * Default: 10000
     */
    public static final ConfigOption<Integer> LOOKUP_JOIN_CACHE_MAX_SIZE;

    // Partition Ordering Enum

    /**
     * Ordering strategies for partitions in streaming mode.
     */
    public enum PartitionOrder {
        /** Order by partition creation time in the Hive metastore */
        CREATE_TIME,
        /** Order by partition time extracted from the partition name */
        PARTITION_TIME,
        /** Order by partition name alphabetically */
        PARTITION_NAME
    }
}
```
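The first-order effect of `table.exec.hive.split-max-size` is simple arithmetic: a file contributes roughly `ceil(fileSize / splitMaxSize)` splits. The sketch below is purely illustrative (it is not the connector's split planner, and it ignores the extra weighting that `table.exec.hive.file-open-cost` applies during planning), but it shows why raising the maximum split size shrinks the split count for large files.

```java
import java.util.List;

// Illustrative only: NOT Flink's actual split planner. Shows the
// first-order effect of split-max-size on the number of splits.
public class SplitEstimate {

    /** Ceiling division: how many splits a single file of the given size yields. */
    static long splitsForFile(long fileSizeBytes, long splitMaxSizeBytes) {
        return (fileSizeBytes + splitMaxSizeBytes - 1) / splitMaxSizeBytes;
    }

    static long totalSplits(List<Long> fileSizes, long splitMaxSizeBytes) {
        long total = 0;
        for (long size : fileSizes) {
            total += Math.max(1, splitsForFile(size, splitMaxSizeBytes));
        }
        return total;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long tenGb = 10L * 1024L * mb;

        // One 10GB file: 128MB splits -> 80 splits, 512MB splits -> 20 splits.
        System.out.println(totalSplits(List.of(tenGb), 128 * mb)); // 80
        System.out.println(totalSplits(List.of(tenGb), 512 * mb)); // 20
    }
}
```

Note that a file smaller than the split size still yields at least one split, which is why many tiny files can produce excessive parallelism regardless of this option.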
### Catalog Configuration Options

Configuration options specific to `HiveCatalog` creation and behavior.

```java { .api }
/**
 * Configuration options for the HiveCatalog factory
 */
public class HiveCatalogFactoryOptions {

    /**
     * Path to the directory containing hive-site.xml and other Hive configuration files.
     * If not set, hive-site.xml is searched for on the classpath.
     */
    public static final ConfigOption<String> HIVE_CONF_DIR;

    /**
     * Hive version for compatibility and feature support.
     * Default: "3.1.2"
     */
    public static final ConfigOption<String> HIVE_VERSION;

    /**
     * Path to the directory containing Hadoop configuration files (core-site.xml, hdfs-site.xml).
     * Optional, but recommended for HDFS and other Hadoop filesystem access.
     */
    public static final ConfigOption<String> HADOOP_CONF_DIR;

    /**
     * Default database name to use when none is specified.
     * Default: "default"
     */
    public static final ConfigOption<String> DEFAULT_DATABASE;
}
```

### HiveServer2 Endpoint Configuration

Configuration options for the HiveServer2-compatible endpoint in the Flink SQL Gateway.

```java { .api }
/**
 * Configuration options for the HiveServer2 endpoint
 */
public class HiveServer2EndpointConfigOptions {

    // Server Configuration

    /**
     * Host address for the Thrift server.
     * Default: "localhost"
     */
    public static final ConfigOption<String> THRIFT_HOST;

    /**
     * Port number for the Thrift server.
     * Default: 10000
     */
    public static final ConfigOption<Integer> THRIFT_PORT;

    /**
     * Minimum number of worker threads for handling client connections.
     * Default: 5
     */
    public static final ConfigOption<Integer> THRIFT_WORKER_THREADS_MIN;

    /**
     * Maximum number of worker threads for handling client connections.
     * Default: 500
     */
    public static final ConfigOption<Integer> THRIFT_WORKER_THREADS_MAX;

    /**
     * Maximum message size for Thrift communication.
     * Default: 104857600 bytes (100MB)
     */
    public static final ConfigOption<MemorySize> THRIFT_MAX_MESSAGE_SIZE;

    // Session Configuration

    /**
     * Default catalog name for new sessions.
     * Default: "default_catalog"
     */
    public static final ConfigOption<String> CATALOG_DEFAULT_NAME;

    /**
     * Default database name for new sessions.
     * Default: "default"
     */
    public static final ConfigOption<String> CATALOG_DEFAULT_DATABASE;

    /**
     * Module configuration for built-in function access.
     */
    public static final ConfigOption<String> MODULE_NAME;
}
```

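The worker-thread options describe an elastic pool: the server keeps a small core of threads alive and grows toward the maximum under connection load. The endpoint's actual server comes from the Thrift library; the pure-JDK sketch below (all names hypothetical) only illustrates those min/max semantics with a `ThreadPoolExecutor`.

```java
import java.util.concurrent.*;

// Hypothetical sketch of min/max worker-thread semantics; the real
// HiveServer2 endpoint uses Thrift's own server implementation.
public class WorkerPoolSketch {

    static boolean handleConnections(int minThreads, int maxThreads, int connections)
            throws InterruptedException {
        // A SynchronousQueue hands each task directly to a worker, so the pool
        // grows past its core size (minThreads) up to maxThreads under load.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                minThreads, maxThreads,
                60, TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                new ThreadPoolExecutor.CallerRunsPolicy()); // degrade, don't reject

        CountDownLatch done = new CountDownLatch(connections);
        for (int i = 0; i < connections; i++) {
            pool.execute(() -> {
                try { Thread.sleep(50); } catch (InterruptedException ignored) { }
                done.countDown();
            });
        }
        boolean finished = done.await(10, TimeUnit.SECONDS);
        pool.shutdown();
        return finished;
    }

    public static void main(String[] args) throws InterruptedException {
        // 8 concurrent "connections" against a pool of 2..4 workers
        System.out.println(handleConnections(2, 4, 8)); // true
    }
}
```

Sizing the maximum too low makes clients queue behind busy workers; sizing it too high trades memory for concurrency, which is the usual reason to tune these two options together.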
## Configuration Usage Patterns

### SQL DDL Configuration

```sql
-- Table-level configuration
CREATE TABLE hive_streaming_source (
    id BIGINT,
    data STRING,
    event_time TIMESTAMP(3),
    partition_hour STRING
) PARTITIONED BY (partition_hour)
WITH (
    'connector' = 'hive',

    -- Streaming configuration
    'streaming-source.enable' = 'true',
    'streaming-source.partition.include' = 'latest',
    'streaming-source.monitor-interval' = '5 min',
    'streaming-source.consume-start-offset' = '2023-01-01 00:00:00',
    'streaming-source.partition-order' = 'create-time',

    -- Reading optimization
    'table.exec.hive.infer-source-parallelism' = 'true',
    'table.exec.hive.split-max-size' = '256MB',

    -- Lookup join caching
    'lookup.join.cache.ttl' = '30 min',
    'lookup.join.cache.max-size' = '5000'
);

-- Sink table configuration
CREATE TABLE hive_sink (
    id BIGINT,
    processed_data STRING,
    partition_date STRING
) PARTITIONED BY (partition_date)
WITH (
    'connector' = 'hive',

    -- Writing configuration
    'sink.partition-commit.policy.kind' = 'metastore,success-file',
    'table.exec.hive.sink.sort-by-dynamic-partition.enable' = 'true',
    'table.exec.hive.sink.statistic-auto-gather.enable' = 'true',

    -- Performance tuning
    'table.exec.hive.fallback-mapred-writer' = 'false'
);

-- Catalog configuration
CREATE CATALOG production_hive WITH (
    'type' = 'hive',
    'hive-conf-dir' = '/opt/hive/conf',
    'hadoop-conf-dir' = '/opt/hadoop/conf',
    'hive-version' = '3.1.2',
    'default-database' = 'analytics'
);
```

### Programmatic Configuration

```java
// Flink configuration for global settings
Configuration flinkConfig = new Configuration();

// Reading optimization
flinkConfig.set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
flinkConfig.set(HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_SIZE, MemorySize.ofMebiBytes(256));
flinkConfig.set(HiveOptions.TABLE_EXEC_HIVE_FILE_OPEN_COST, MemorySize.ofMebiBytes(8));

// Writing optimization
flinkConfig.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);
flinkConfig.set(HiveOptions.TABLE_EXEC_HIVE_SINK_SORT_BY_DYNAMIC_PARTITION_ENABLE, true);

// Streaming configuration
flinkConfig.set(HiveOptions.STREAMING_SOURCE_ENABLE, true);
flinkConfig.set(HiveOptions.STREAMING_SOURCE_PARTITION_INCLUDE, "latest");
flinkConfig.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofMinutes(5));

// Create environment with configuration
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .withConfiguration(flinkConfig)
        .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
```

### Performance Tuning Guidelines

#### Reading Performance

```java
// Optimize for large files with few splits
config.set(HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_SIZE, MemorySize.ofMebiBytes(512));
config.set(HiveOptions.TABLE_EXEC_HIVE_FILE_OPEN_COST, MemorySize.ofMebiBytes(16));

// Optimize for many small files
config.set(HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_SIZE, MemorySize.ofMebiBytes(64));
config.set(HiveOptions.TABLE_EXEC_HIVE_FILE_OPEN_COST, MemorySize.ofMebiBytes(1));

// Enable parallelism inference for optimal resource usage
config.set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
```

#### Streaming Performance

```java
// High-frequency partition updates
config.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofMinutes(1));
config.set(HiveOptions.STREAMING_SOURCE_PARTITION_INCLUDE, "latest");

// Batch processing of historical data
config.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofMinutes(30));
config.set(HiveOptions.STREAMING_SOURCE_PARTITION_INCLUDE, "all");
config.set(HiveOptions.STREAMING_SOURCE_CONSUME_START_OFFSET, "2023-01-01 00:00:00");
```

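The partition-order strategies differ only in the sort key used when deciding which partition to consume next. A minimal pure-JDK sketch (hypothetical partition names, not the connector's implementation) of how name order and creation-time order can disagree:

```java
import java.util.*;

// Hypothetical sketch of two PartitionOrder sort keys; PARTITION_TIME
// would instead parse a timestamp out of the partition value.
public class PartitionOrderSketch {

    /** PARTITION_NAME: plain lexicographic order of the partition name. */
    static List<String> byName(List<String> partitions) {
        List<String> sorted = new ArrayList<>(partitions);
        sorted.sort(Comparator.naturalOrder());
        return sorted;
    }

    /** CREATE_TIME: order by each partition's creation timestamp in the metastore. */
    static List<String> byCreateTime(Map<String, Long> createTimes) {
        List<String> sorted = new ArrayList<>(createTimes.keySet());
        sorted.sort(Comparator.comparingLong(createTimes::get));
        return sorted;
    }

    public static void main(String[] args) {
        List<String> parts = List.of("dt=2023-01-03", "dt=2023-01-01", "dt=2023-01-02");

        // Zero-padded date names make name order coincide with time order.
        System.out.println(byName(parts));
        // [dt=2023-01-01, dt=2023-01-02, dt=2023-01-03]

        // CREATE_TIME can disagree with name order, e.g. after a backfill:
        Map<String, Long> created = Map.of(
                "dt=2023-01-01", 300L,  // backfilled last
                "dt=2023-01-02", 100L,
                "dt=2023-01-03", 200L);
        System.out.println(byCreateTime(created));
        // [dt=2023-01-02, dt=2023-01-03, dt=2023-01-01]
    }
}
```

The backfill case is the practical reason to choose the order deliberately: under CREATE_TIME a late-arriving historical partition is still picked up, while under PARTITION_NAME it may sort behind partitions already consumed.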
#### Writing Performance

```java
// Enable dynamic partition sorting for better compression
config.set(HiveOptions.TABLE_EXEC_HIVE_SINK_SORT_BY_DYNAMIC_PARTITION_ENABLE, true);

// Configure partition commit policies
config.set(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND, "metastore,success-file");

// Enable automatic statistics gathering
config.set(HiveOptions.TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_ENABLE, true);
```

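Why does sorting by the dynamic partition columns help? With unsorted input, the sink must keep a writer open for every partition it has started but not finished; with sorted input it can finish each partition before opening the next. The pure-JDK model below (hypothetical, not the sink's code) counts the maximum number of simultaneously "open writers" for a record stream:

```java
import java.util.*;

// Hypothetical model: a partition's writer stays open from that
// partition's first record until its last record has been seen.
public class DynamicPartitionSortSketch {

    static int maxOpenWriters(List<String> partitionOfRecord) {
        // Index of the last record belonging to each partition.
        Map<String, Integer> last = new HashMap<>();
        for (int i = 0; i < partitionOfRecord.size(); i++) {
            last.put(partitionOfRecord.get(i), i);
        }
        Set<String> open = new HashSet<>();
        int max = 0;
        for (int i = 0; i < partitionOfRecord.size(); i++) {
            String p = partitionOfRecord.get(i);
            open.add(p);                        // first record opens the writer
            max = Math.max(max, open.size());
            if (last.get(p) == i) {
                open.remove(p);                 // last record lets it close
            }
        }
        return max;
    }

    public static void main(String[] args) {
        List<String> unsorted = List.of("a", "b", "c", "a", "b", "c");
        List<String> sorted = new ArrayList<>(unsorted);
        Collections.sort(sorted);

        System.out.println(maxOpenWriters(unsorted)); // 3: all writers open at once
        System.out.println(maxOpenWriters(sorted));   // 1: one writer at a time
    }
}
```

With many dynamic partitions, each open writer holds its own buffers, so bounding the open-writer count is what makes the sort option pay off in memory and output file quality.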
### Environment-Specific Configuration

#### Development Environment

```properties
# Relaxed settings for development
table.exec.hive.infer-source-parallelism=true
table.exec.hive.split-max-size=64MB
streaming-source.monitor-interval=30s
lookup.join.cache.ttl=5min
```

#### Production Environment

```properties
# Optimized settings for production
table.exec.hive.infer-source-parallelism=true
table.exec.hive.split-max-size=256MB
table.exec.hive.file-open-cost=8MB
streaming-source.monitor-interval=5min
streaming-source.partition-order=create-time
sink.partition-commit.policy.kind=metastore,success-file
table.exec.hive.sink.statistic-auto-gather.enable=true
lookup.join.cache.ttl=60min
lookup.join.cache.max-size=50000
```

### Configuration Validation and Troubleshooting

Common configuration issues and their solutions:

**Split Size Issues:**

```java
// Problem: Too many small splits causing overhead
// Solution: Increase split size and file open cost
config.set(HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_SIZE, MemorySize.ofMebiBytes(256));
config.set(HiveOptions.TABLE_EXEC_HIVE_FILE_OPEN_COST, MemorySize.ofMebiBytes(8));
```

**Streaming Lag Issues:**

```java
// Problem: Partitions not being detected quickly enough
// Solution: Reduce the monitor interval and optimize partition ordering
config.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofMinutes(1));
config.set(HiveOptions.STREAMING_SOURCE_PARTITION_ORDER, PartitionOrder.CREATE_TIME);
```

**Memory Issues in Lookup Joins:**

```java
// Problem: Lookup cache consuming too much memory
// Solution: Reduce cache size and TTL
config.set(HiveOptions.LOOKUP_JOIN_CACHE_MAX_SIZE, 5000);
config.set(HiveOptions.LOOKUP_JOIN_CACHE_TTL, Duration.ofMinutes(30));
```
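Both remedies work because the cache's memory footprint is bounded by entry count and entry lifetime. The sketch below is hypothetical (the connector's cache implementation is internal); it only illustrates the two mechanisms these options control: a max size evicting the oldest entry, and a TTL forcing expired entries to be reloaded from Hive.

```java
import java.util.*;

// Hypothetical sketch of a size- and TTL-bounded cache; not the
// connector's lookup-join cache implementation.
public class BoundedTtlCache<K, V> {
    private final int maxSize;
    private final long ttlMillis;
    private final LinkedHashMap<K, Object[]> entries; // {value, expiry}, insertion order

    public BoundedTtlCache(int maxSize, long ttlMillis) {
        this.maxSize = maxSize;
        this.ttlMillis = ttlMillis;
        this.entries = new LinkedHashMap<K, Object[]>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Object[]> eldest) {
                // Cap the entry count (analogous to lookup.join.cache.max-size).
                return size() > BoundedTtlCache.this.maxSize;
            }
        };
    }

    public void put(K key, V value) {
        entries.put(key, new Object[]{value, System.currentTimeMillis() + ttlMillis});
    }

    @SuppressWarnings("unchecked")
    public V get(K key) {
        Object[] e = entries.get(key);
        if (e == null) {
            return null;
        }
        if (System.currentTimeMillis() > (Long) e[1]) {
            entries.remove(key); // TTL expired: force a reload (lookup.join.cache.ttl)
            return null;
        }
        return (V) e[0];
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedTtlCache<String, String> cache = new BoundedTtlCache<>(2, 50);
        cache.put("a", "1");
        cache.put("b", "2");
        cache.put("c", "3");                // evicts "a": max size is 2
        System.out.println(cache.get("a")); // null
        System.out.println(cache.get("c")); // 3
        Thread.sleep(80);
        System.out.println(cache.get("c")); // null: TTL expired
    }
}
```

A shorter TTL trades memory for freshness and extra load on the metastore and filesystem, so reduce it together with the max size rather than in isolation.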