Comprehensive configuration system for customizing Flink Hive connector behavior, performance tuning, and environment-specific settings. The configuration options control source/sink behavior, catalog integration, module loading, and performance optimizations.
Core configuration options for Hive connector behavior and performance tuning.
/**
 * Configuration options for the Flink Hive connector.
 *
 * <p>Controls reader/writer fallback, streaming-source monitoring, lookup-join caching, source
 * parallelism inference, split planning, and partition commit for sinks.
 *
 * <p>Options whose key starts with {@code table.exec.hive.} are session-level settings, e.g.
 * {@code SET 'table.exec.hive.fallback-mapred-reader' = 'false';}. Options such as
 * {@code streaming-source.*}, {@code lookup.join.cache.ttl} and {@code sink.partition-commit.*}
 * are table options: supply them in the Hive table's {@code WITH (...)} clause or through an
 * {@code OPTIONS} hint.
 */
public class HiveOptions {
    /**
     * {@code table.exec.hive.fallback-mapred-reader}: whether to fall back to the Hadoop MapRed
     * reader instead of Flink's native readers (for compatibility)
     * Default: false
     * Type: Boolean
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER;
    /**
     * {@code table.exec.hive.fallback-mapred-writer}: whether to write through the Hadoop MapRed
     * record writer; set to false to use Flink's native writers
     * Default: true
     * Type: Boolean
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER;
    /**
     * {@code streaming-source.enable}: read the table as an unbounded streaming source that keeps
     * monitoring for new partitions/files (table option)
     * Default: false
     * Type: Boolean
     */
    public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE;
    /**
     * {@code streaming-source.monitor-interval}: interval between partition/file discovery
     * passes of the streaming source (table option; only used when streaming is enabled)
     * Default: 1 minute
     * Type: Duration
     */
    public static final ConfigOption<Duration> STREAMING_SOURCE_MONITOR_INTERVAL;
    /**
     * {@code lookup.join.cache.ttl}: time-to-live of the cached Hive table when it is the lookup
     * side of a lookup join (table option)
     * Default: 60 minutes
     * Type: Duration
     */
    public static final ConfigOption<Duration> LOOKUP_JOIN_CACHE_TTL;
    /**
     * {@code table.exec.hive.infer-source-parallelism}: whether to automatically infer source
     * parallelism
     * Default: true
     * Type: Boolean
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM;
    /**
     * {@code sink.partition-commit.policy.kind}: how a finished partition is committed (table
     * option)
     * Values: "metastore", "success-file", "metastore,success-file"
     * Default: "metastore"
     * Type: String
     */
    public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_KIND;
    /**
     * {@code sink.partition-commit.trigger}: what decides that a partition is ready to commit
     * (table option)
     * Values: "partition-time", "process-time"
     * Default: "process-time"
     * Type: String
     */
    public static final ConfigOption<String> SINK_PARTITION_COMMIT_TRIGGER;
    /**
     * {@code sink.partition-commit.delay}: delay before a partition is committed (table option)
     * Default: 0 (immediate)
     * Type: Duration
     */
    public static final ConfigOption<Duration> SINK_PARTITION_COMMIT_DELAY;
    /**
     * {@code sink.partition-commit.watermark-time-zone}: time zone used to compare watermarks
     * with partition times under the "partition-time" trigger (table option)
     * Default: "UTC"
     * Type: String
     */
    public static final ConfigOption<String> SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE;
    /**
     * {@code sink.partition-commit.success-file.name}: name of the marker file written by the
     * "success-file" commit policy (table option)
     * Default: "_SUCCESS"
     * Type: String
     */
    public static final ConfigOption<String> SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME;
    /**
     * Enable reading partitions that contain subdirectories
     * Default: true
     * Type: Boolean
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_READ_PARTITION_WITH_SUBDIRECTORY_ENABLED;
    /**
     * Upper bound for the inferred source parallelism
     * Default: 1000
     * Type: Integer
     */
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX;
    /**
     * Thread count for loading partition splits
     * Default: 3
     * Type: Integer
     */
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM;
    /**
     * Maximum split size for Hive table reading
     * Default: 128MB
     * Type: MemorySize
     */
    public static final ConfigOption<MemorySize> TABLE_EXEC_HIVE_SPLIT_MAX_BYTES;
    /**
     * Estimated cost of opening a file, used when calculating splits
     * Default: 4MB
     * Type: MemorySize
     */
    public static final ConfigOption<MemorySize> TABLE_EXEC_HIVE_FILE_OPEN_COST;
    /**
     * Thread count for calculating partition sizes
     * Default: 3
     * Type: Integer
     */
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM;
    /**
     * Whether the sink groups (sorts) records by dynamic partition columns before writing them
     * Default: true
     * Type: Boolean
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_DYNAMIC_GROUPING_ENABLED;
    /**
     * Thread count for reading table/partition statistics
     * Default: number of available processors
     * Type: Integer
     */
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_READ_STATISTICS_THREAD_NUM;
    /**
     * Average file size below which small files are compacted
     * Default: 16MB
     * Type: MemorySize
     */
    public static final ConfigOption<MemorySize> COMPACT_SMALL_FILES_AVG_SIZE;
    /**
     * Enable automatic statistics gathering for the sink
     * Default: true
     * Type: Boolean
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_ENABLE;
}
Configuration Usage Examples:
import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.HiveOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Session-level Hive options (keys starting with table.exec.hive.) go into the
// job/session configuration.
Configuration config = new Configuration();
// Enable automatic parallelism inference
config.set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);

// Apply configuration to table environment
TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance()
                .withConfiguration(config)
                .build());

// Streaming-source, lookup-join and partition-commit options are table options: the Hive
// connector reads them from the table's own options, not from the session configuration.
// Put them in the table's WITH (...) clause or pass them as an OPTIONS hint, e.g.
//   SELECT * FROM t /*+ OPTIONS('streaming-source.enable'='true') */
Map<String, String> hiveTableOptions = Map.of(
        // Enable streaming mode with 30-second monitoring
        HiveOptions.STREAMING_SOURCE_ENABLE.key(), "true",
        HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL.key(), "30 s",
        // Configure lookup join caching
        HiveOptions.LOOKUP_JOIN_CACHE_TTL.key(), "10 min",
        // Configure partition commit policy
        HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND.key(), "metastore,success-file",
        HiveOptions.SINK_PARTITION_COMMIT_TRIGGER.key(), "partition-time",
        HiveOptions.SINK_PARTITION_COMMIT_DELAY.key(), "1 h");
Configuration options for HiveCatalog creation and metastore connection.
/**
 * Options understood by {@code HiveCatalogFactory} when it creates a Hive catalog.
 *
 * <p>They select the catalog's default database, where the Hive and Hadoop configuration files
 * live, and which Hive version the catalog should be compatible with.
 */
public class HiveCatalogFactoryOptions {

    /**
     * {@code default-database}: database used by the catalog when none is named explicitly
     * Default: "default"
     * Type: String
     */
    public static final ConfigOption<String> DEFAULT_DATABASE;

    /**
     * {@code hive-conf-dir}: directory holding the Hive configuration files (hive-site.xml)
     * Default: null (use classpath)
     * Type: String
     */
    public static final ConfigOption<String> HIVE_CONF_DIR;

    /**
     * {@code hive-version}: Hive version to stay compatible with
     * Supported: "2.3.4", "2.3.6", "2.3.9", "3.1.2", "3.1.3"
     * Default: auto-detected
     * Type: String
     */
    public static final ConfigOption<String> HIVE_VERSION;

    /**
     * {@code hadoop-conf-dir}: directory holding the Hadoop configuration files (core-site.xml,
     * hdfs-site.xml)
     * Default: null (use classpath)
     * Type: String
     */
    public static final ConfigOption<String> HADOOP_CONF_DIR;
}
Catalog Configuration Examples:
// Programmatic catalog configuration
Map<String, String> catalogProperties = new HashMap<>();
// "type" selects the catalog factory for DDL/factory-based creation; the HiveCatalog
// constructor below does not take it.
catalogProperties.put("type", "hive");
catalogProperties.put("default-database", "analytics");
catalogProperties.put("hive-conf-dir", "/etc/hive/conf");
catalogProperties.put("hadoop-conf-dir", "/etc/hadoop/conf");
catalogProperties.put("hive-version", "2.3.9");
// Create catalog with properties
HiveCatalog catalog = new HiveCatalog(
        "production_hive",
        catalogProperties.get("default-database"),
        catalogProperties.get("hive-conf-dir"),
        catalogProperties.get("hadoop-conf-dir"),
        catalogProperties.get("hive-version"));
// A constructed catalog is invisible to queries until it is registered
tableEnv.registerCatalog("production_hive", catalog);

// SQL DDL alternative that creates the same catalog without the Java code above.
// Use one approach or the other: both register the name "production_hive", and
// registering a catalog name that already exists fails.
tableEnv.executeSql("""
        CREATE CATALOG production_hive WITH (
          'type' = 'hive',
          'default-database' = 'analytics',
          'hive-conf-dir' = '/etc/hive/conf',
          'hadoop-conf-dir' = '/etc/hadoop/conf',
          'hive-version' = '2.3.9'
        )
        """);
Configuration options for HiveModule function loading and compatibility.
/**
 * Options understood when loading the Hive module.
 *
 * <p>The module exposes Hive built-in functions; the single option pins which Hive version
 * those functions must be compatible with.
 */
public class HiveModuleOptions {

    /**
     * {@code hive-version}: Hive version whose functions the module should match
     * Default: auto-detected from classpath
     * Type: String
     */
    public static final ConfigOption<String> HIVE_VERSION;
}
Module Configuration Examples:
// Module configuration via factory. These mirror the WITH options of the LOAD MODULE
// statement below. The module type is given by the module name ("hive"), so there is
// no 'type' key: LOAD MODULE rejects a 'type' property.
Map<String, String> moduleProperties = new HashMap<>();
moduleProperties.put("hive-version", "2.3.9");
// Load module with specific version
HiveModule module = new HiveModule(moduleProperties.get("hive-version"));
tableEnv.loadModule("hive", module);
// SQL DDL alternative to loadModule(...) above. Use one or the other: a module named
// 'hive' can only be loaded once.
tableEnv.executeSql("""
        LOAD MODULE hive WITH (
          'hive-version' = '2.3.9'
        )
        """);
// Optimize source performance
Configuration sourceConfig = new Configuration();
// Let the Hive source infer its parallelism
sourceConfig.set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
// Keep Flink's native readers instead of falling back to the Hadoop MapRed reader
sourceConfig.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, false);
// Default parallelism for operators whose parallelism is not inferred
sourceConfig.setString("table.exec.resource.default-parallelism", "8");
// Size TaskManager memory for large datasets. Configure only the total process size:
// also setting taskmanager.memory.flink.size can produce conflicting memory settings.
// NOTE: this is a deployment option; it does not resize TaskManagers that are already running.
sourceConfig.setString("taskmanager.memory.process.size", "4g");
// Optimize sink performance and reliability
Configuration sinkConfig = new Configuration();
// Use Flink's native writers instead of the Hadoop MapRed record writer
sinkConfig.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);
// Configure checkpointing for exactly-once semantics; streaming file/Hive sinks only
// finalize files when a checkpoint completes
sinkConfig.setString("execution.checkpointing.interval", "30s");
sinkConfig.setString("execution.checkpointing.mode", "EXACTLY_ONCE");
// Partition commit settings are table options of the Hive sink table. Put them in its
// WITH (...) clause, or in an OPTIONS hint on the INSERT target, not in the job configuration.
Map<String, String> sinkTableOptions = Map.of(
        HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND.key(), "metastore,success-file",
        HiveOptions.SINK_PARTITION_COMMIT_TRIGGER.key(), "partition-time",
        HiveOptions.SINK_PARTITION_COMMIT_DELAY.key(), "5 min");
// Configure streaming mode for real-time processing
Configuration streamingConfig = new Configuration();
// Mark a source as idle after 30s without records so it does not hold back watermarks
streamingConfig.setString("table.exec.source.idle-timeout", "30s");
// Event time is Flink's default time characteristic, so no extra option is needed
// (the old 'pipeline.time-characteristic' option is deprecated).
// Streaming-source and lookup-join cache settings are table options of the Hive table.
// Apply them via its WITH (...) clause or an OPTIONS hint.
Map<String, String> streamingTableOptions = Map.of(
        // Enable streaming source
        HiveOptions.STREAMING_SOURCE_ENABLE.key(), "true",
        HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL.key(), "10 s",
        // Configure lookup join caching
        HiveOptions.LOOKUP_JOIN_CACHE_TTL.key(), "15 min");
// Production-ready configuration
Configuration prodConfig = new Configuration();
// Performance settings (session-level Hive options)
prodConfig.set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
prodConfig.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, false);
prodConfig.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);
// Memory and resource settings
prodConfig.setString("taskmanager.numberOfTaskSlots", "4");
prodConfig.setString("taskmanager.memory.process.size", "8g");
prodConfig.setString("jobmanager.memory.process.size", "2g");
// Checkpointing for fault tolerance
prodConfig.setString("execution.checkpointing.interval", "60s");
prodConfig.setString("state.backend", "rocksdb");
prodConfig.setString("state.checkpoints.dir", "hdfs://namenode:9000/flink/checkpoints");
prodConfig.setString("state.savepoints.dir", "hdfs://namenode:9000/flink/savepoints");
// Reliability settings for Hive sink tables. These are table options: apply them in the
// sink table's WITH (...) clause or an OPTIONS hint rather than in prodConfig.
Map<String, String> prodSinkTableOptions = Map.of(
        HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND.key(), "metastore,success-file",
        HiveOptions.SINK_PARTITION_COMMIT_DELAY.key(), "10 min");
// Development-friendly configuration
Configuration devConfig = new Configuration();
// Reduced resource usage
devConfig.setString("taskmanager.numberOfTaskSlots", "2");
devConfig.setString("taskmanager.memory.process.size", "2g");
// Local state: heap-based state backend, with checkpoints written to the local file system
// ('filesystem' is the deprecated name for this combination)
devConfig.setString("state.backend", "hashmap");
devConfig.setString("state.checkpoints.dir", "file:///tmp/flink-checkpoints");
// Fast-feedback settings for Hive tables. These are table options: apply them in the table's
// WITH (...) clause or an OPTIONS hint. The monitor interval only matters when
// 'streaming-source.enable' is 'true'.
Map<String, String> devTableOptions = Map.of(
        HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL.key(), "5 s",
        HiveOptions.SINK_PARTITION_COMMIT_DELAY.key(), "10 s",
        // Simplified commit policy
        HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND.key(), "metastore");
-- Configure Hive table with connector properties
-- Flink default dialect: partition columns are declared in the column list and
-- referenced by name in PARTITIONED BY.
CREATE TABLE streaming_events (
  user_id STRING,
  event_type STRING,
  event_time TIMESTAMP(3),
  event_date STRING,
  -- The 'partition-time' commit trigger below compares partition times with this watermark
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) PARTITIONED BY (event_date)
WITH (
  'connector' = 'hive',
  'streaming-source.enable' = 'true',
  'streaming-source.monitor-interval' = '10s',
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '1 h',
  'sink.partition-commit.policy.kind' = 'metastore,success-file'
);
-- Configure external table with custom properties
CREATE TABLE external_logs (
  log_level STRING,
  message STRING,
  log_time TIMESTAMP,
  date_partition STRING
) PARTITIONED BY (date_partition)
WITH (
  'connector' = 'hive',
  'streaming-source.enable' = 'false',
  -- Cache TTL used when this table is the lookup side of a lookup join
  'lookup.join.cache.ttl' = '1 h'
);
-- Set session-level configuration
-- Let the Hive source infer its parallelism
SET 'table.exec.hive.infer-source-parallelism' = 'true';
-- Read with Flink's native readers rather than the MapRed reader fallback
SET 'table.exec.hive.fallback-mapred-reader' = 'false';
SET 'execution.checkpointing.interval' = '30s';
-- Write with Flink's native writers rather than the MapRed writer fallback
SET 'table.exec.hive.fallback-mapred-writer' = 'false';
-- Switch to the Hive dialect last, so every quoted SET 'key' = 'value' statement above
-- is parsed with the default dialect's syntax
SET 'table.sql-dialect' = 'hive';
// Configure for multi-cluster Hive setup
// Primary cluster configuration
Map<String, String> primaryCatalogProps = Map.of(
        "type", "hive",
        "default-database", "production",
        "hive-conf-dir", "/etc/hive/primary/conf",
        "hadoop-conf-dir", "/etc/hadoop/primary/conf",
        "hive-version", "2.3.9");
// Secondary cluster configuration
Map<String, String> secondaryCatalogProps = Map.of(
        "type", "hive",
        "default-database", "analytics",
        "hive-conf-dir", "/etc/hive/secondary/conf",
        "hadoop-conf-dir", "/etc/hadoop/secondary/conf",
        "hive-version", "2.3.9");
// Render a property map as a DDL WITH clause: ('k1' = 'v1', 'k2' = 'v2').
// Embedded single quotes are doubled, as SQL string literals require.
java.util.function.Function<Map<String, String>, String> toWithClause = props ->
        props.entrySet().stream()
                .map(e -> "'" + e.getKey().replace("'", "''")
                        + "' = '" + e.getValue().replace("'", "''") + "'")
                .collect(java.util.stream.Collectors.joining(", ", "(", ")"));
// Register multiple catalogs
tableEnv.executeSql("CREATE CATALOG primary_hive WITH " + toWithClause.apply(primaryCatalogProps));
tableEnv.executeSql("CREATE CATALOG secondary_hive WITH " + toWithClause.apply(secondaryCatalogProps));
// Cross-cluster queries
Table result = tableEnv.sqlQuery("""
        SELECT p.*, s.analytics_data
        FROM primary_hive.production.users p
        JOIN secondary_hive.analytics.user_metrics s
        ON p.user_id = s.user_id
        """);
// Handle different Hive versions
/**
 * Produces configurations tailored to specific Hive versions.
 *
 * <p>Every call returns a fresh {@link Configuration}. NOTE(review): {@code hive-version} is a
 * catalog option; it presumably only takes effect if this configuration is used when the
 * catalog is created. Confirm against the caller.
 */
public class HiveConfigurationManager {

    /**
     * Returns settings for Hive 2.3.9: records the Hive version and disables both the MapRed
     * reader and the MapRed writer fallbacks.
     */
    public Configuration getHive239Configuration() {
        Configuration hive239 = newConfigurationFor("2.3.9");
        hive239.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, false);
        hive239.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);
        return hive239;
    }

    /**
     * Returns settings for Hive 3.1.3: records the Hive version and explicitly enables source
     * parallelism inference.
     */
    public Configuration getHive313Configuration() {
        Configuration hive313 = newConfigurationFor("3.1.3");
        hive313.set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
        return hive313;
    }

    /** Creates a new configuration whose only entry is the given Hive version. */
    private static Configuration newConfigurationFor(String hiveVersion) {
        Configuration conf = new Configuration();
        conf.set(HiveCatalogFactoryOptions.HIVE_VERSION, hiveVersion);
        return conf;
    }
}
// Configure based on available resources
/**
 * Builds a configuration scaled to the resources of the target machine.
 *
 * @param availableCores CPU cores available; default parallelism is half of this (at least 1),
 *     and more than 8 cores selects the shorter streaming-source monitor interval
 * @param availableMemoryMB memory available in MB; the TaskManager process gets half of it,
 *     capped at 8192 MB
 * @return a new configuration with parallelism.default, taskmanager.memory.process.size and
 *     the streaming-source monitor interval set
 * @throws IllegalArgumentException if availableMemoryMB is below 2, which would yield a zero or
 *     negative TaskManager size
 */
public Configuration createResourceAwareConfig(int availableCores, long availableMemoryMB) {
    // Half of the available memory, capped at 8 GB per TaskManager
    long taskManagerMemory = Math.min(availableMemoryMB / 2, 8192);
    if (taskManagerMemory <= 0) {
        // Fail fast instead of emitting an unusable "0m" / negative TaskManager size
        throw new IllegalArgumentException(
                "availableMemoryMB must be at least 2 to size a TaskManager, got "
                        + availableMemoryMB);
    }

    Configuration config = new Configuration();
    // Scale parallelism based on cores
    int parallelism = Math.max(1, availableCores / 2);
    config.setString("parallelism.default", String.valueOf(parallelism));
    config.setString("taskmanager.memory.process.size", taskManagerMemory + "m");

    // Poll for new partitions more often on machines with more than 8 cores.
    // NOTE: streaming-source.monitor-interval is a Hive table option; it must also reach the
    // table (WITH clause or OPTIONS hint) to affect a streaming Hive source.
    Duration monitorInterval =
            availableCores > 8 ? Duration.ofSeconds(5) : Duration.ofSeconds(30);
    config.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, monitorInterval);
    return config;
}