# Configuration

Configuration options and factory classes for setting up Hive integration with customizable behavior for performance, compatibility, and operational requirements.

## Capabilities

### HiveOptions

Configuration options for tuning Hive connector behavior and performance.

```java { .api }
11
/**
12
* Configuration options for Hive connector operations
13
*/
14
public class HiveOptions {
15
/**
16
* Whether to use Hadoop MapRed record reader for ORC files
17
* Default: false
18
*/
19
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER =
20
ConfigOptions.key("table.exec.hive.fallback-mapred-reader")
21
.defaultValue(false)
22
.withDescription(
23
"If it is false, using flink native vectorized reader to read orc files; " +
24
"If it is true, using hadoop mapred record reader to read orc files.");
25
26
/**
27
* Whether to infer source parallelism based on splits
28
* Default: true
29
*/
30
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM =
31
ConfigOptions.key("table.exec.hive.infer-source-parallelism")
32
.defaultValue(true)
33
.withDescription(
34
"If is false, parallelism of source are set by config.\n" +
35
"If is true, source parallelism is inferred according to splits number.\n");
36
37
/**
38
* Maximum inferred parallelism for source operator
39
* Default: 1000
40
*/
41
public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX =
42
ConfigOptions.key("table.exec.hive.infer-source-parallelism.max")
43
.defaultValue(1000)
44
.withDescription("Sets max infer parallelism for source operator.");
45
46
/**
47
* Whether to use Hadoop MapRed record writer for Parquet and ORC files
48
* Default: true
49
*/
50
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER =
51
ConfigOptions.key("table.exec.hive.fallback-mapred-writer")
52
.booleanType()
53
.defaultValue(true)
54
.withDescription(
55
"If it is false, using flink native writer to write parquet and orc files; " +
56
"If it is true, using hadoop mapred record writer to write parquet and orc files.");
57
}
58
```

### Catalog Factory Configuration

Factory and configuration options for creating Hive catalog instances.

```java { .api }
65
/**
66
* Factory for creating HiveCatalog instances with configuration validation
67
*/
68
public class HiveCatalogFactory implements CatalogFactory {
69
/**
70
* Get the factory identifier for service discovery
71
* @return "hive" identifier string
72
*/
73
public String factoryIdentifier();
74
75
/**
76
* Create HiveCatalog from configuration context
77
* @param context - Factory context with configuration options
78
* @return Configured HiveCatalog instance
79
*/
80
public Catalog createCatalog(Context context);
81
82
/**
83
* Get required configuration options
84
* @return Set of required ConfigOption objects (empty for Hive)
85
*/
86
public Set<ConfigOption<?>> requiredOptions();
87
88
/**
89
* Get optional configuration options
90
* @return Set of optional ConfigOption objects
91
*/
92
public Set<ConfigOption<?>> optionalOptions();
93
}
94
95
/**
96
* Configuration options for HiveCatalogFactory
97
*/
98
public class HiveCatalogFactoryOptions {
99
/**
100
* Factory identifier for service discovery
101
*/
102
public static final String IDENTIFIER = "hive";
103
104
/**
105
* Default database name for the catalog
106
* Default: "default"
107
*/
108
public static final ConfigOption<String> DEFAULT_DATABASE =
109
ConfigOptions.key("default-database")
110
.stringType()
111
.defaultValue("default")
112
.withDescription("Default database name for the catalog.");
113
114
/**
115
* Directory containing hive-site.xml configuration file
116
* Default: null (uses classpath)
117
*/
118
public static final ConfigOption<String> HIVE_CONF_DIR =
119
ConfigOptions.key("hive-conf-dir")
120
.stringType()
121
.noDefaultValue()
122
.withDescription("Directory containing hive-site.xml configuration file.");
123
124
/**
125
* Directory containing Hadoop configuration files
126
* Default: null (uses classpath)
127
*/
128
public static final ConfigOption<String> HADOOP_CONF_DIR =
129
ConfigOptions.key("hadoop-conf-dir")
130
.stringType()
131
.noDefaultValue()
132
.withDescription("Directory containing Hadoop configuration files.");
133
134
/**
135
* Hive version string for compatibility
136
* Default: null (auto-detected)
137
*/
138
public static final ConfigOption<String> HIVE_VERSION =
139
ConfigOptions.key("hive-version")
140
.stringType()
141
.noDefaultValue()
142
.withDescription("Hive version string for compatibility.");
143
}
144
```

### Module Factory Configuration

Factory and configuration for HiveModule creation.

```java { .api }
151
/**
152
* Factory for creating HiveModule instances
153
*/
154
public class HiveModuleFactory implements ModuleFactory {
155
/**
156
* Get the factory identifier
157
* @return "hive" identifier string
158
*/
159
public String factoryIdentifier();
160
161
/**
162
* Create HiveModule from configuration context
163
* @param context - Factory context with configuration options
164
* @return Configured HiveModule instance
165
*/
166
public Module createModule(Context context);
167
168
/**
169
* Get required configuration options
170
* @return Set of required ConfigOption objects (empty for Hive module)
171
*/
172
public Set<ConfigOption<?>> requiredOptions();
173
174
/**
175
* Get optional configuration options
176
* @return Set of optional ConfigOption objects
177
*/
178
public Set<ConfigOption<?>> optionalOptions();
179
}
180
181
/**
182
* Configuration options for HiveModule
183
*/
184
public class HiveModuleOptions {
185
/**
186
* Hive version for function compatibility
187
* Default: null (uses latest supported)
188
*/
189
public static final ConfigOption<String> HIVE_VERSION =
190
ConfigOptions.key("hive-version")
191
.stringType()
192
.noDefaultValue()
193
.withDescription("Hive version for function compatibility.");
194
}
195
```

### Streaming Source Configuration

Configuration options specific to streaming Hive sources.

```java { .api }
202
/**
203
* Configuration options from FileSystemConnectorOptions used by Hive connector
204
*/
205
public class FileSystemConnectorOptions {
206
/**
207
* Enable streaming source mode for partition monitoring
208
* Default: false
209
*/
210
public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE =
211
ConfigOptions.key("streaming-source.enable")
212
.booleanType()
213
.defaultValue(false)
214
.withDescription("Enable streaming source mode for partition monitoring.");
215
216
/**
217
* Which partitions to include in streaming mode
218
* Options: "all", "latest"
219
* Default: "all"
220
*/
221
public static final ConfigOption<String> STREAMING_SOURCE_PARTITION_INCLUDE =
222
ConfigOptions.key("streaming-source.partition.include")
223
.stringType()
224
.defaultValue("all")
225
.withDescription("Which partitions to include: 'all' or 'latest'.");
226
227
/**
228
* Interval for monitoring new partitions (in milliseconds)
229
* Default: 60000 (1 minute)
230
*/
231
public static final ConfigOption<Long> STREAMING_SOURCE_MONITOR_INTERVAL =
232
ConfigOptions.key("streaming-source.monitor-interval")
233
.longType()
234
.defaultValue(60000L)
235
.withDescription("Interval for monitoring new partitions in milliseconds.");
236
237
/**
238
* Configured parallelism for sink operations
239
* Default: null (uses default parallelism)
240
*/
241
public static final ConfigOption<Integer> SINK_PARALLELISM =
242
ConfigOptions.key("sink.parallelism")
243
.intType()
244
.noDefaultValue()
245
.withDescription("Configured parallelism for sink operations.");
246
}
247
```

### Hadoop and Hive Configuration Utilities

Utility classes for managing Hadoop and Hive configuration.

```java { .api }
254
/**
255
* Utilities for managing Hive configuration
256
*/
257
public class HiveConfUtils {
258
/**
259
* Create HiveConf from configuration directory
260
* @param hiveConfDir - Directory containing hive-site.xml (can be null)
261
* @return Configured HiveConf instance
262
*/
263
public static HiveConf create(String hiveConfDir);
264
265
/**
266
* Get Hive configuration with custom properties
267
* @param hiveConf - Base Hive configuration
268
* @param customProps - Additional properties to set
269
* @return Updated HiveConf instance
270
*/
271
public static HiveConf create(HiveConf hiveConf, Map<String, String> customProps);
272
}
273
274
/**
275
* Utilities for managing Hadoop JobConf
276
*/
277
public class JobConfUtils {
278
/**
279
* Create JobConf with security credentials
280
* @param hiveConf - Hive configuration to base JobConf on
281
* @return JobConf with security credentials configured
282
*/
283
public static JobConf createJobConfWithCredentials(HiveConf hiveConf);
284
285
/**
286
* Create JobConf with custom properties
287
* @param hiveConf - Base Hive configuration
288
* @param extraConf - Additional configuration properties
289
* @return Configured JobConf instance
290
*/
291
public static JobConf createJobConf(HiveConf hiveConf, Map<String, String> extraConf);
292
}
293
294
/**
295
* Factory for creating Hadoop FileSystem instances
296
*/
297
public class HadoopFileSystemFactory {
298
/**
299
* Create file system factory with configuration
300
* @param hadoopConf - Hadoop configuration
301
*/
302
public HadoopFileSystemFactory(org.apache.hadoop.conf.Configuration hadoopConf);
303
304
/**
305
* Create file system for given URI
306
* @param fsUri - File system URI
307
* @return FileSystem instance
308
* @throws IOException if creation fails
309
*/
310
public FileSystem create(URI fsUri) throws IOException;
311
}
312
```

### Dynamic Table Factory Configuration

Configuration for the dynamic table factory system.

```java { .api }
319
/**
320
* Configuration options for dynamic table operations
321
*/
322
public class HiveDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
323
/**
324
* Get factory identifier (throws UnsupportedOperationException)
325
* Hive factory only works through catalog, not standalone
326
* @return Not supported
327
* @throws UnsupportedOperationException always
328
*/
329
public String factoryIdentifier();
330
331
/**
332
* Get required options (throws UnsupportedOperationException)
333
* @return Not supported
334
* @throws UnsupportedOperationException always
335
*/
336
public Set<ConfigOption<?>> requiredOptions();
337
338
/**
339
* Get optional options (throws UnsupportedOperationException)
340
* @return Not supported
341
* @throws UnsupportedOperationException always
342
*/
343
public Set<ConfigOption<?>> optionalOptions();
344
345
/**
346
* Create dynamic table source based on context
347
* @param context - Creation context with catalog table info
348
* @return DynamicTableSource implementation
349
*/
350
public DynamicTableSource createDynamicTableSource(Context context);
351
352
/**
353
* Create dynamic table sink based on context
354
* @param context - Creation context with catalog table info
355
* @return DynamicTableSink implementation
356
*/
357
public DynamicTableSink createDynamicTableSink(Context context);
358
}
359
```

**Usage Examples:**

```java
364
import org.apache.flink.table.api.TableEnvironment;
365
import org.apache.flink.configuration.Configuration;
366
import org.apache.flink.connectors.hive.HiveOptions;
367
368
// Configure Hive connector options
369
Configuration config = new Configuration();
370
371
// Use native Flink readers for better performance
372
config.setBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, false);
373
374
// Use native Flink writers for better performance
375
config.setBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);
376
377
// Enable source parallelism inference
378
config.setBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
379
config.setInteger(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX, 500);
380
381
// Create table environment with configuration
382
TableEnvironment tableEnv = TableEnvironment.create(
383
EnvironmentSettings.newInstance()
384
.inBatchMode()
385
.withConfiguration(config)
386
.build()
387
);
388
```
```java
// Create Hive catalog with full configuration options
Map<String, String> catalogOptions = new HashMap<>();
catalogOptions.put("type", "hive");
catalogOptions.put("default-database", "analytics");
catalogOptions.put("hive-conf-dir", "/opt/hive/conf");
catalogOptions.put("hadoop-conf-dir", "/opt/hadoop/etc/hadoop");
catalogOptions.put("hive-version", "2.3.6");

// Use catalog factory to create catalog
CatalogFactory.Context context = new CatalogFactory.Context() {
    public String getName() { return "hive_catalog"; }
    public Map<String, String> getOptions() { return catalogOptions; }
    public ReadableConfig getConfiguration() { return Configuration.fromMap(catalogOptions); }
    public ClassLoader getClassLoader() { return Thread.currentThread().getContextClassLoader(); }
};

HiveCatalogFactory factory = new HiveCatalogFactory();
Catalog hiveCatalog = factory.createCatalog(context);

tableEnv.registerCatalog("hive_catalog", hiveCatalog);
tableEnv.useCatalog("hive_catalog");
```
```java
415
// Configure streaming Hive source
416
tableEnv.executeSql(
417
"CREATE TABLE streaming_events (" +
418
" event_id BIGINT," +
419
" user_id BIGINT," +
420
" event_time TIMESTAMP(3)," +
421
" event_type STRING," +
422
" partition_date STRING" +
423
") PARTITIONED BY (partition_date) " +
424
"STORED AS PARQUET " +
425
"TBLPROPERTIES (" +
426
" 'streaming-source.enable' = 'true'," +
427
" 'streaming-source.partition.include' = 'all'," +
428
" 'streaming-source.monitor-interval' = '30000'" + // 30 seconds
429
")"
430
);
431
432
// Query streaming table
433
Table result = tableEnv.sqlQuery(
434
"SELECT event_type, COUNT(*) as event_count " +
435
"FROM streaming_events " +
436
"WHERE event_time >= CURRENT_TIMESTAMP - INTERVAL '1' HOUR " +
437
"GROUP BY event_type"
438
);
439
```
```java
// Load Hive module with specific version
Map<String, String> moduleOptions = new HashMap<>();
moduleOptions.put("hive-version", "2.3.6");

ModuleFactory.Context moduleContext = new ModuleFactory.Context() {
    public Map<String, String> getOptions() { return moduleOptions; }
    public ReadableConfig getConfiguration() { return Configuration.fromMap(moduleOptions); }
    public ClassLoader getClassLoader() { return Thread.currentThread().getContextClassLoader(); }
};

HiveModuleFactory moduleFactory = new HiveModuleFactory();
Module hiveModule = moduleFactory.createModule(moduleContext);

tableEnv.loadModule("hive", hiveModule);
```

## Types

```java { .api }
461
public interface ConfigOption<T> {
462
/**
463
* Get the option key
464
* @return Configuration key string
465
*/
466
String key();
467
468
/**
469
* Get the default value
470
* @return Default value for this option
471
*/
472
T defaultValue();
473
474
/**
475
* Get the option description
476
* @return Human-readable description
477
*/
478
String description();
479
}
480
481
public interface CatalogFactory extends Factory {
482
/**
483
* Create catalog from context
484
* @param context - Creation context
485
* @return Catalog instance
486
*/
487
Catalog createCatalog(Context context);
488
489
/**
490
* Context interface for catalog creation
491
*/
492
interface Context {
493
String getName();
494
Map<String, String> getOptions();
495
ReadableConfig getConfiguration();
496
ClassLoader getClassLoader();
497
}
498
}
499
500
public interface ModuleFactory extends Factory {
501
/**
502
* Create module from context
503
* @param context - Creation context
504
* @return Module instance
505
*/
506
Module createModule(Context context);
507
508
/**
509
* Context interface for module creation
510
*/
511
interface Context {
512
Map<String, String> getOptions();
513
ReadableConfig getConfiguration();
514
ClassLoader getClassLoader();
515
}
516
}
517
518
public interface Factory {
519
/**
520
* Get unique factory identifier
521
* @return Factory identifier string
522
*/
523
String factoryIdentifier();
524
525
/**
526
* Get required configuration options
527
* @return Set of required options
528
*/
529
Set<ConfigOption<?>> requiredOptions();
530
531
/**
532
* Get optional configuration options
533
* @return Set of optional options
534
*/
535
Set<ConfigOption<?>> optionalOptions();
536
}
537
```