Central configuration class for the Hive connector: all options for customizing connector behavior, tuning performance, and configuring the streaming source. Every option can be set via the Flink configuration or per-table properties.
/**
* Configuration options for Hive connector behavior and performance tuning
* All options can be set via Flink configuration or table properties
*/
class HiveOptions {
// Performance and Reader Configuration
/**
* Whether to infer parallelism for Hive source based on splits
* When enabled, Flink automatically determines optimal parallelism from file splits
*/
ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM = ConfigOptions
.key("table.exec.hive.infer-source-parallelism")
.booleanType()
.defaultValue(true);
/**
* Maximum parallelism when inferring source parallelism
* Limits the maximum number of parallel subtasks for Hive sources
*/
ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX = ConfigOptions
.key("table.exec.hive.infer-source-parallelism.max")
.intType()
.defaultValue(1000);
/**
* Whether to fallback to MapRed reader when native readers fail
* Provides compatibility fallback for reading complex file formats
*/
ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER = ConfigOptions
.key("table.exec.hive.fallback-mapred-reader")
.booleanType()
.defaultValue(false);
/**
* Whether to fallback to MapRed writer when native writers fail
* Provides compatibility fallback for writing complex file formats
*/
ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER = ConfigOptions
.key("table.exec.hive.fallback-mapred-writer")
.booleanType()
.defaultValue(true);
/**
* Number of threads for loading partition splits in parallel
* Higher values improve partition discovery performance for tables with many partitions
*/
ConfigOption<Integer> TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM = ConfigOptions
.key("table.exec.hive.load-partition-splits.thread-num")
.intType()
.defaultValue(3);
// Streaming Source Configuration
/**
* Enable streaming mode for Hive sources
* When enabled, continuously monitors for new partitions
*/
ConfigOption<Boolean> STREAMING_SOURCE_ENABLE = ConfigOptions
.key("streaming-source.enable")
.booleanType()
.defaultValue(false);
/**
* Strategy for including partitions in streaming source
* Controls which partitions are included when monitoring for new data
*/
ConfigOption<String> STREAMING_SOURCE_PARTITION_INCLUDE = ConfigOptions
.key("streaming-source.partition.include")
.stringType()
.defaultValue("all");
/**
* Interval for monitoring new partitions in streaming mode
* How frequently to check for new partitions when streaming is enabled
*/
ConfigOption<Duration> STREAMING_SOURCE_MONITOR_INTERVAL = ConfigOptions
.key("streaming-source.monitor-interval")
.durationType()
.noDefaultValue();
/**
* Starting offset for consuming partitions in streaming mode
* Controls from which point to start consuming when starting streaming job
*/
ConfigOption<String> STREAMING_SOURCE_CONSUME_START_OFFSET = ConfigOptions
.key("streaming-source.consume-start-offset")
.stringType()
.noDefaultValue();
/**
* Order for consuming partitions in streaming mode
* Determines the order in which partitions are processed
*/
ConfigOption<String> STREAMING_SOURCE_PARTITION_ORDER = ConfigOptions
.key("streaming-source.partition-order")
.stringType()
.defaultValue("partition-name");
// Lookup Join Configuration
/**
* TTL for lookup join cache entries
* How long to cache lookup results before refreshing from Hive table
*/
ConfigOption<Duration> LOOKUP_JOIN_CACHE_TTL = ConfigOptions
.key("lookup.cache.ttl")
.durationType()
.defaultValue(Duration.ofHours(1));
}Usage Examples:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.HiveOptions;
// Configure via Flink Configuration
Configuration config = new Configuration();
// Performance tuning
config.setBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
config.setInteger(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX, 500);
config.setInteger(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM, 5);
// Streaming configuration
config.setBoolean(HiveOptions.STREAMING_SOURCE_ENABLE, true);
config.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofMinutes(5));
config.setString(HiveOptions.STREAMING_SOURCE_PARTITION_INCLUDE, "latest");
config.setString(HiveOptions.STREAMING_SOURCE_PARTITION_ORDER, "partition-time");
config.setString(HiveOptions.STREAMING_SOURCE_CONSUME_START_OFFSET, "2023-01-01");
// Lookup join caching
config.set(HiveOptions.LOOKUP_JOIN_CACHE_TTL, Duration.ofMinutes(30));
// Apply to table environment
TableEnvironment tableEnv = TableEnvironment.create(settings);
tableEnv.getConfig().addConfiguration(config);

Enumeration defining the order in which partitions are processed in streaming mode.
/**
 * Partition ordering strategies for the Hive streaming source.
 *
 * <p>Each constant carries the lowercase-hyphenated string accepted by the
 * {@code streaming-source.partition-order} configuration option.
 */
enum PartitionOrder {
    /**
     * Order partitions by their creation time in the Hive metastore;
     * partitions are processed in the order they were added to the table.
     */
    CREATE_TIME("create-time"),

    /**
     * Order partitions by the partition time extracted from the partition
     * values; requires partition columns that can be interpreted as
     * timestamps.
     */
    PARTITION_TIME("partition-time"),

    /**
     * Order partitions lexicographically by partition name, using string
     * comparison of partition path names. This matches the option's default.
     */
    PARTITION_NAME("partition-name");

    // String form used in configuration, e.g.
    // 'streaming-source.partition-order' = 'partition-name'.
    private final String configValue;

    PartitionOrder(String configValue) {
        this.configValue = configValue;
    }

    /** Returns the configuration string corresponding to this ordering. */
    public String getConfigValue() {
        return configValue;
    }
}

// Usage Examples:
// Configure partition ordering
config.setString(HiveOptions.STREAMING_SOURCE_PARTITION_ORDER, "partition-time");
// Example table with time-based partitions
tableEnv.executeSql(
"CREATE TABLE time_series_data (" +
" sensor_id STRING," +
" measurement DOUBLE," +
" event_time TIMESTAMP(3)" +
") PARTITIONED BY (" +
" year INT," +
" month INT," +
" day INT" +
") WITH (" +
" 'connector' = 'hive'," +
" 'streaming-source.enable' = 'true'," +
" 'streaming-source.partition-order' = 'partition-time'" +
")"
);

// Optimize for large tables with many partitions
Configuration perfConfig = new Configuration();
// Enable parallelism inference for optimal resource usage
perfConfig.setBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
perfConfig.setInteger(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX, 2000);
// Increase partition loading threads for faster metadata discovery
perfConfig.setInteger(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM, 10);
// Enable fallback readers for compatibility
perfConfig.setBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, true);

// Configure for streaming ingestion
Configuration streamConfig = new Configuration();
// Enable continuous partition monitoring
streamConfig.setBoolean(HiveOptions.STREAMING_SOURCE_ENABLE, true);
// Monitor for new partitions every 30 seconds
streamConfig.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofSeconds(30));
// Only include recent partitions (last 7 days)
streamConfig.setString(HiveOptions.STREAMING_SOURCE_PARTITION_INCLUDE, "latest");
// Start consuming from specific date
streamConfig.setString(HiveOptions.STREAMING_SOURCE_CONSUME_START_OFFSET, "2023-12-01");
// Process partitions in chronological order
streamConfig.setString(HiveOptions.STREAMING_SOURCE_PARTITION_ORDER, "partition-time");

// Optimize lookup joins for better performance
Configuration lookupConfig = new Configuration();
// Cache lookup results for 15 minutes
lookupConfig.set(HiveOptions.LOOKUP_JOIN_CACHE_TTL, Duration.ofMinutes(15));
// Apply configuration
tableEnv.getConfig().addConfiguration(lookupConfig);
// Use in lookup join
tableEnv.executeSql(
"SELECT " +
" o.order_id," +
" o.customer_id," +
" c.customer_name " +
"FROM orders o " +
"LEFT JOIN customer_profiles FOR SYSTEM_TIME AS OF o.order_time AS c " +
" ON o.customer_id = c.customer_id"
);

Configure options for specific tables via table properties:
-- Create table with specific performance settings
CREATE TABLE high_volume_data (
id BIGINT,
data STRING,
timestamp_col TIMESTAMP(3)
) PARTITIONED BY (
year INT,
month INT,
day INT
) WITH (
'connector' = 'hive',
'table.exec.hive.infer-source-parallelism' = 'true',
'table.exec.hive.infer-source-parallelism.max' = '1000',
'streaming-source.enable' = 'true',
'streaming-source.monitor-interval' = '30s',
'streaming-source.partition-order' = 'partition-time'
);
-- Create table optimized for lookup joins
CREATE TABLE reference_data (
key_id BIGINT,
reference_value STRING,
last_updated TIMESTAMP(3)
) WITH (
'connector' = 'hive',
'lookup.cache.ttl' = '1h'
);

// Configuration optimized for development and testing
Configuration devConfig = new Configuration();
devConfig.setInteger(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX, 10);
devConfig.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofSeconds(10));
devConfig.set(HiveOptions.LOOKUP_JOIN_CACHE_TTL, Duration.ofMinutes(1));

// Configuration optimized for production workloads
Configuration prodConfig = new Configuration();
prodConfig.setBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
prodConfig.setInteger(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX, 5000);
prodConfig.setInteger(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM, 20);
prodConfig.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofMinutes(5));
prodConfig.set(HiveOptions.LOOKUP_JOIN_CACHE_TTL, Duration.ofHours(6));

The connector validates configuration values and provides helpful error messages:
// Invalid configuration values will throw a ValidationException
try {
Configuration config = new Configuration();
config.setInteger(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX, -1); // Invalid
tableEnv.getConfig().addConfiguration(config);
} catch (ValidationException e) {
System.err.println("Configuration error: " + e.getMessage());
}

Some configuration options can be updated dynamically:
// Update configuration for existing table environment
Configuration dynamicConfig = new Configuration();
dynamicConfig.set(HiveOptions.LOOKUP_JOIN_CACHE_TTL, Duration.ofMinutes(45));
tableEnv.getConfig().addConfiguration(dynamicConfig);
// Configuration takes effect for new queries
// Existing running queries retain their original configuration