# Configuration and Options

Comprehensive configuration system for customizing Flink Hive connector behavior, performance tuning, and environment-specific settings. The configuration options control source/sink behavior, catalog integration, module loading, and performance optimizations.

## Capabilities

### HiveOptions

Core configuration options for Hive connector behavior and performance tuning.

```java { .api }
/**
 * Configuration options for Hive connector behavior
 * Controls reader/writer fallback, streaming mode, caching, and partition handling
 */
public class HiveOptions {

    /**
     * Whether to fallback to MapRed reader for compatibility
     * Default: false
     * Type: Boolean
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER;

    /**
     * Whether to fallback to MapRed writer for compatibility
     * Default: false
     * Type: Boolean
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER;

    /**
     * Enable streaming source mode for continuous monitoring
     * Default: false
     * Type: Boolean
     */
    public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE;

    /**
     * Monitoring interval for streaming source partition discovery
     * Default: 1 minute
     * Type: Duration
     */
    public static final ConfigOption<Duration> STREAMING_SOURCE_MONITOR_INTERVAL;

    /**
     * Cache TTL for lookup join operations
     * Default: no caching
     * Type: Duration
     */
    public static final ConfigOption<Duration> LOOKUP_JOIN_CACHE_TTL;

    /**
     * Whether to automatically infer source parallelism
     * Default: true
     * Type: Boolean
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM;

    /**
     * Partition commit policy for sinks
     * Values: "metastore", "success-file", "metastore,success-file"
     * Default: "metastore"
     * Type: String
     */
    public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_KIND;

    /**
     * Trigger for partition commit
     * Values: "partition-time", "process-time"
     * Default: "process-time"
     * Type: String
     */
    public static final ConfigOption<String> SINK_PARTITION_COMMIT_TRIGGER;

    /**
     * Delay before committing partitions
     * Default: 0 (immediate)
     * Type: Duration
     */
    public static final ConfigOption<Duration> SINK_PARTITION_COMMIT_DELAY;

    /**
     * Watermark timezone for partition-time commit trigger
     * Default: system timezone
     * Type: String
     */
    public static final ConfigOption<String> SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE;

    /**
     * Custom success file name for partition commit
     * Default: "_SUCCESS"
     * Type: String
     */
    public static final ConfigOption<String> SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME;

    /**
     * Enable reading partitions with subdirectories
     * Default: true
     * Type: Boolean
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_READ_PARTITION_WITH_SUBDIRECTORY_ENABLED;

    /**
     * Maximum inferred source parallelism
     * Default: 1000
     * Type: Integer
     */
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX;

    /**
     * Thread count for loading partition splits
     * Default: 3
     * Type: Integer
     */
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM;

    /**
     * Maximum split size for Hive table reading
     * Default: 128MB
     * Type: MemorySize
     */
    public static final ConfigOption<MemorySize> TABLE_EXEC_HIVE_SPLIT_MAX_BYTES;

    /**
     * Estimated cost to open a file for split calculation
     * Default: 4MB
     * Type: MemorySize
     */
    public static final ConfigOption<MemorySize> TABLE_EXEC_HIVE_FILE_OPEN_COST;

    /**
     * Thread count for calculating partition sizes
     * Default: 3
     * Type: Integer
     */
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM;

    /**
     * Enable dynamic partition grouping
     * Default: false
     * Type: Boolean
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_DYNAMIC_GROUPING_ENABLED;

    /**
     * Thread count for reading table/partition statistics
     * Default: 3
     * Type: Integer
     */
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_READ_STATISTICS_THREAD_NUM;

    /**
     * Average size threshold for small file compaction
     * Default: 16MB
     * Type: MemorySize
     */
    public static final ConfigOption<MemorySize> COMPACT_SMALL_FILES_AVG_SIZE;

    /**
     * Enable automatic statistics gathering for sink
     * Default: true
     * Type: Boolean
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_ENABLE;
}
```

**Configuration Usage Examples:**

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.HiveOptions;

import java.time.Duration;

// Configure Hive connector options
Configuration config = new Configuration();

// Enable streaming mode with 30-second monitoring
config.set(HiveOptions.STREAMING_SOURCE_ENABLE, true);
config.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofSeconds(30));

// Configure lookup join caching
config.set(HiveOptions.LOOKUP_JOIN_CACHE_TTL, Duration.ofMinutes(10));

// Enable automatic parallelism inference
config.set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);

// Configure partition commit policy
config.set(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND, "metastore,success-file");
config.set(HiveOptions.SINK_PARTITION_COMMIT_TRIGGER, "partition-time");
config.set(HiveOptions.SINK_PARTITION_COMMIT_DELAY, Duration.ofHours(1));

// Apply configuration to table environment
TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance()
                .withConfiguration(config)
                .build()
);
```

### HiveCatalogFactoryOptions

Configuration options for HiveCatalog creation and metastore connection.

```java { .api }
/**
 * Configuration options for HiveCatalogFactory
 * Controls catalog metadata, Hive configuration, and connection settings
 */
public class HiveCatalogFactoryOptions {

    /**
     * Default database name for the catalog
     * Default: "default"
     * Type: String
     */
    public static final ConfigOption<String> DEFAULT_DATABASE;

    /**
     * Directory containing Hive configuration files (hive-site.xml)
     * Default: null (use classpath)
     * Type: String
     */
    public static final ConfigOption<String> HIVE_CONF_DIR;

    /**
     * Hive version for compatibility
     * Supported: "2.3.4", "2.3.6", "2.3.9", "3.1.2", "3.1.3"
     * Default: auto-detected
     * Type: String
     */
    public static final ConfigOption<String> HIVE_VERSION;

    /**
     * Directory containing Hadoop configuration files (core-site.xml, hdfs-site.xml)
     * Default: null (use classpath)
     * Type: String
     */
    public static final ConfigOption<String> HADOOP_CONF_DIR;
}
```

**Catalog Configuration Examples:**

```java
// Programmatic catalog configuration
Map<String, String> catalogProperties = new HashMap<>();
catalogProperties.put("type", "hive");
catalogProperties.put("default-database", "analytics");
catalogProperties.put("hive-conf-dir", "/etc/hive/conf");
catalogProperties.put("hadoop-conf-dir", "/etc/hadoop/conf");
catalogProperties.put("hive-version", "2.3.9");

// Create catalog with properties
HiveCatalog catalog = new HiveCatalog(
        "production_hive",
        catalogProperties.get("default-database"),
        catalogProperties.get("hive-conf-dir"),
        catalogProperties.get("hadoop-conf-dir"),
        catalogProperties.get("hive-version")
);

// SQL DDL catalog creation
tableEnv.executeSql("""
        CREATE CATALOG production_hive WITH (
            'type' = 'hive',
            'default-database' = 'analytics',
            'hive-conf-dir' = '/etc/hive/conf',
            'hadoop-conf-dir' = '/etc/hadoop/conf',
            'hive-version' = '2.3.9'
        )
        """);
```

### HiveModuleOptions

Configuration options for HiveModule function loading and compatibility.

```java { .api }
/**
 * Configuration options for HiveModule
 * Controls function loading and Hive version compatibility
 */
public class HiveModuleOptions {

    /**
     * Hive version for function compatibility
     * Default: auto-detected from classpath
     * Type: String
     */
    public static final ConfigOption<String> HIVE_VERSION;
}
```

**Module Configuration Examples:**

```java
// Module configuration via factory
Map<String, String> moduleProperties = new HashMap<>();
moduleProperties.put("type", "hive");
moduleProperties.put("hive-version", "2.3.9");

// Load module with specific version
HiveModule module = new HiveModule("2.3.9");
tableEnv.loadModule("hive", module);

// SQL DDL module loading
tableEnv.executeSql("""
        LOAD MODULE hive WITH (
            'type' = 'hive',
            'hive-version' = '2.3.9'
        )
        """);
```

## Performance Configuration

### Source Performance Options

```java
// Optimize source performance
Configuration sourceConfig = new Configuration();

// Enable parallelism inference for optimal performance
sourceConfig.set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);

// Configure reader fallback (disable for better performance)
sourceConfig.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, false);

// Set optimal batch size for bulk reading
sourceConfig.setString("table.exec.resource.default-parallelism", "8");

// Configure memory for large datasets
sourceConfig.setString("taskmanager.memory.process.size", "4g");
sourceConfig.setString("taskmanager.memory.flink.size", "3g");
```

### Sink Performance Options

```java
// Optimize sink performance and reliability
Configuration sinkConfig = new Configuration();

// Configure partition commit for reliability
sinkConfig.set(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND, "metastore,success-file");
sinkConfig.set(HiveOptions.SINK_PARTITION_COMMIT_TRIGGER, "partition-time");
sinkConfig.set(HiveOptions.SINK_PARTITION_COMMIT_DELAY, Duration.ofMinutes(5));

// Disable writer fallback for better performance
sinkConfig.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);

// Configure checkpointing for exactly-once semantics
sinkConfig.setString("execution.checkpointing.interval", "30s");
sinkConfig.setString("execution.checkpointing.mode", "EXACTLY_ONCE");
```

### Streaming Configuration

```java
// Configure streaming mode for real-time processing
Configuration streamingConfig = new Configuration();

// Enable streaming source
streamingConfig.set(HiveOptions.STREAMING_SOURCE_ENABLE, true);
streamingConfig.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofSeconds(10));

// Configure lookup join caching
streamingConfig.set(HiveOptions.LOOKUP_JOIN_CACHE_TTL, Duration.ofMinutes(15));

// Set watermark configuration
streamingConfig.setString("table.exec.source.idle-timeout", "30s");
streamingConfig.setString("pipeline.time-characteristic", "EventTime");
```

## Environment-Specific Configuration

### Production Environment

```java
// Production-ready configuration
Configuration prodConfig = new Configuration();

// Reliability settings
prodConfig.set(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND, "metastore,success-file");
prodConfig.set(HiveOptions.SINK_PARTITION_COMMIT_DELAY, Duration.ofMinutes(10));

// Performance settings
prodConfig.set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
prodConfig.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, false);
prodConfig.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);

// Memory and resource settings
prodConfig.setString("taskmanager.numberOfTaskSlots", "4");
prodConfig.setString("taskmanager.memory.process.size", "8g");
prodConfig.setString("jobmanager.memory.process.size", "2g");

// Checkpointing for fault tolerance
prodConfig.setString("execution.checkpointing.interval", "60s");
prodConfig.setString("state.backend", "rocksdb");
prodConfig.setString("state.checkpoints.dir", "hdfs://namenode:9000/flink/checkpoints");
prodConfig.setString("state.savepoints.dir", "hdfs://namenode:9000/flink/savepoints");
```

### Development Environment

```java
// Development-friendly configuration
Configuration devConfig = new Configuration();

// Fast feedback settings
devConfig.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofSeconds(5));
devConfig.set(HiveOptions.SINK_PARTITION_COMMIT_DELAY, Duration.ofSeconds(10));

// Simplified commit policy
devConfig.set(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND, "metastore");

// Reduced resource usage
devConfig.setString("taskmanager.numberOfTaskSlots", "2");
devConfig.setString("taskmanager.memory.process.size", "2g");

// Local state backend
devConfig.setString("state.backend", "filesystem");
devConfig.setString("state.checkpoints.dir", "file:///tmp/flink-checkpoints");
```

## SQL Configuration

### Table Properties Configuration

```sql
-- Configure Hive table with connector properties
CREATE TABLE streaming_events (
    user_id STRING,
    event_type STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) PARTITIONED BY (event_date STRING)
WITH (
    'connector' = 'hive',
    'streaming-source.enable' = 'true',
    'streaming-source.monitor-interval' = '10s',
    'sink.partition-commit.trigger' = 'partition-time',
    'sink.partition-commit.delay' = '1 h',
    'sink.partition-commit.policy.kind' = 'metastore,success-file'
);

-- Configure external table with custom properties
CREATE TABLE external_logs (
    log_level STRING,
    message STRING,
    log_time TIMESTAMP
) PARTITIONED BY (date_partition STRING)
WITH (
    'connector' = 'hive',
    'streaming-source.enable' = 'false',
    'lookup.join.cache.ttl' = '1 h'
);
```

### Session Configuration

```sql
-- Set session-level configuration
SET 'table.exec.hive.infer-source-parallelism' = 'true';
SET 'table.exec.hive.fallback-mapred-reader' = 'false';
SET 'execution.checkpointing.interval' = '30s';

-- Configure catalog defaults
SET 'table.sql-dialect' = 'hive';
SET 'table.exec.hive.fallback-mapred-writer' = 'false';
```

## Advanced Configuration Patterns

### Multi-Cluster Configuration

```java
// Configure for multi-cluster Hive setup
Configuration multiClusterConfig = new Configuration();

// Primary cluster configuration
Map<String, String> primaryCatalogProps = Map.of(
        "type", "hive",
        "default-database", "production",
        "hive-conf-dir", "/etc/hive/primary/conf",
        "hadoop-conf-dir", "/etc/hadoop/primary/conf",
        "hive-version", "2.3.9"
);

// Secondary cluster configuration
Map<String, String> secondaryCatalogProps = Map.of(
        "type", "hive",
        "default-database", "analytics",
        "hive-conf-dir", "/etc/hive/secondary/conf",
        "hadoop-conf-dir", "/etc/hadoop/secondary/conf",
        "hive-version", "2.3.9"
);

// Register multiple catalogs
tableEnv.executeSql("CREATE CATALOG primary_hive WITH " + formatProperties(primaryCatalogProps));
tableEnv.executeSql("CREATE CATALOG secondary_hive WITH " + formatProperties(secondaryCatalogProps));

// Cross-cluster queries
Table result = tableEnv.sqlQuery("""
        SELECT p.*, s.analytics_data
        FROM primary_hive.production.users p
        JOIN secondary_hive.analytics.user_metrics s
            ON p.user_id = s.user_id
        """);
```

### Version-Specific Configuration

```java
// Handle different Hive versions
public class HiveConfigurationManager {

    public Configuration getHive239Configuration() {
        Configuration config = new Configuration();
        config.set(HiveCatalogFactoryOptions.HIVE_VERSION, "2.3.9");
        config.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, false);
        config.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);
        return config;
    }

    public Configuration getHive313Configuration() {
        Configuration config = new Configuration();
        config.set(HiveCatalogFactoryOptions.HIVE_VERSION, "3.1.3");
        // Version 3.x specific optimizations
        config.set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
        return config;
    }
}
```

### Resource-Aware Configuration

```java
// Configure based on available resources
public Configuration createResourceAwareConfig(int availableCores, long availableMemoryMB) {
    Configuration config = new Configuration();

    // Scale parallelism based on cores
    int parallelism = Math.max(1, availableCores / 2);
    config.setString("parallelism.default", String.valueOf(parallelism));

    // Configure memory based on available resources
    long taskManagerMemory = Math.min(availableMemoryMB / 2, 8192); // Max 8GB per TM
    config.setString("taskmanager.memory.process.size", taskManagerMemory + "m");

    // Adjust monitoring interval based on load
    Duration monitorInterval = availableCores > 8 ?
            Duration.ofSeconds(5) : Duration.ofSeconds(30);
    config.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, monitorInterval);

    return config;
}
```