Apache Flink SQL connector for Apache Hive 3.1.2, enabling unified batch and stream processing with Hive tables.
—
Comprehensive configuration system providing fine-grained control over connector behavior, performance tuning, and feature enablement. The configuration options cover reading, writing, streaming, lookup joins, and advanced optimization scenarios.
Central configuration class containing all Hive connector-specific options for controlling behavior and performance.
/**
* Configuration options for Hive connector behavior and performance tuning
* All options can be set in Flink configuration or as table properties
*
* NOTE(review): this is an API outline — the static final fields appear without
* their initializers. The option keys documented below are taken from the usage
* examples later in this file, except where explicitly marked as unconfirmed.
*/
public class HiveOptions {
// Reading Configuration Options
/**
* Whether to fallback to Hadoop MapReduce RecordReader for file reading
* Default: false (use native vectorized readers when possible)
* Key: presumably 'table.exec.hive.fallback-mapred-reader' (mirrors the writer key) — TODO confirm
*/
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER;
/**
* Whether to read files in subdirectories of partitions
* Default: false
*/
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_READ_PARTITION_WITH_SUBDIRECTORY_ENABLED;
/**
* Whether to infer source parallelism based on number of files/splits
* Default: true
* Key: 'table.exec.hive.infer-source-parallelism'
*/
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM;
/**
* Maximum size of a single split for reading
* Default: 128MB
* Key: 'table.exec.hive.split-max-size'
*/
public static final ConfigOption<MemorySize> TABLE_EXEC_HIVE_SPLIT_MAX_SIZE;
/**
* Estimated cost of opening a file (used for split calculation)
* Default: 4MB
* Key: 'table.exec.hive.file-open-cost'
*/
public static final ConfigOption<MemorySize> TABLE_EXEC_HIVE_FILE_OPEN_COST;
// Writing Configuration Options
/**
* Whether to fallback to Hadoop MapReduce OutputFormat for writing
* Default: false (use native writers when possible)
* Key: 'table.exec.hive.fallback-mapred-writer'
*/
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER;
/**
* Whether to sort records by dynamic partition columns before writing
* Default: false
* Key: 'table.exec.hive.sink.sort-by-dynamic-partition.enable'
*/
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_SINK_SORT_BY_DYNAMIC_PARTITION_ENABLE;
/**
* Whether to automatically gather table statistics after writing
* Default: false
* Key: 'table.exec.hive.sink.statistic-auto-gather.enable'
*/
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_ENABLE;
/**
* Partition commit policy for determining when partitions are ready
* Options: "metastore", "success-file", "metastore,success-file"
* Default: (not set, partitions committed immediately)
* Key: 'sink.partition-commit.policy.kind'
*/
public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_KIND;
// Streaming Configuration Options
/**
* Whether to enable streaming source mode for continuous partition monitoring
* Default: false
* Key: 'streaming-source.enable'
*/
public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE;
/**
* Which partitions to include in streaming mode
* Options: "all" (all partitions), "latest" (only latest partition)
* Default: "all"
* Key: 'streaming-source.partition.include'
*/
public static final ConfigOption<String> STREAMING_SOURCE_PARTITION_INCLUDE;
/**
* Interval for monitoring new partitions in streaming mode
* Default: 1 minute
* Key: 'streaming-source.monitor-interval'
*/
public static final ConfigOption<Duration> STREAMING_SOURCE_MONITOR_INTERVAL;
/**
* Starting point for consuming partitions in streaming mode
* Format: "yyyy-MM-dd HH:mm:ss" or partition name pattern
* Default: (consume from latest available partition)
* Key: 'streaming-source.consume-start-offset'
*/
public static final ConfigOption<String> STREAMING_SOURCE_CONSUME_START_OFFSET;
/**
* Ordering strategy for processing partitions in streaming mode
* Key: 'streaming-source.partition-order'; see {@link PartitionOrder} for the legal values
*/
public static final ConfigOption<PartitionOrder> STREAMING_SOURCE_PARTITION_ORDER;
// Lookup Join Configuration Options
/**
* Time-to-live for cached lookup results
* Default: 60 minutes
* Key: 'lookup.join.cache.ttl'
*/
public static final ConfigOption<Duration> LOOKUP_JOIN_CACHE_TTL;
/**
* Maximum number of entries in lookup cache
* Default: 10000
* Key: 'lookup.join.cache.max-size'
*/
public static final ConfigOption<Integer> LOOKUP_JOIN_CACHE_MAX_SIZE;
// Partition Ordering Enum
/**
* Enumeration for partition ordering strategies in streaming mode
*/
public enum PartitionOrder {
/** Order by partition creation time in Hive metastore */
CREATE_TIME,
/** Order by partition time extracted from partition name */
PARTITION_TIME,
/** Order by partition name alphabetically */
PARTITION_NAME
}
}Configuration options specific to HiveCatalog creation and behavior.
/**
* Configuration options for HiveCatalog factory
*
* NOTE(review): API outline — fields appear without initializers. The option
* keys documented below are taken from the CREATE CATALOG example later in
* this file.
*/
public class HiveCatalogFactoryOptions {
/**
* Path to directory containing hive-site.xml and other Hive configuration files
* Required for catalog creation
* Key: 'hive-conf-dir'
*/
public static final ConfigOption<String> HIVE_CONF_DIR;
/**
* Hive version for compatibility and feature support
* Default: "3.1.2"
* Key: 'hive-version'
*/
public static final ConfigOption<String> HIVE_VERSION;
/**
* Path to directory containing Hadoop configuration files (core-site.xml, hdfs-site.xml)
* Optional, but recommended for HDFS and other Hadoop filesystem access
* Key: 'hadoop-conf-dir'
*/
public static final ConfigOption<String> HADOOP_CONF_DIR;
/**
* Default database name to use when none is specified
* Default: "default"
* Key: 'default-database'
*/
public static final ConfigOption<String> DEFAULT_DATABASE;
}Configuration options for HiveServer2-compatible endpoint in Flink SQL Gateway.
/**
* Configuration options for HiveServer2 endpoint
*
* NOTE(review): API outline — fields appear without initializers, and the
* concrete option keys are not shown anywhere in this file, so none are
* asserted here; confirm them against the endpoint implementation.
*/
public class HiveServer2EndpointConfigOptions {
// Server Configuration
/**
* Host address for Thrift server
* Default: "localhost"
*/
public static final ConfigOption<String> THRIFT_HOST;
/**
* Port number for Thrift server
* Default: 10000 (the conventional HiveServer2 port)
*/
public static final ConfigOption<Integer> THRIFT_PORT;
/**
* Minimum number of worker threads for handling client connections
* Default: 5
*/
public static final ConfigOption<Integer> THRIFT_WORKER_THREADS_MIN;
/**
* Maximum number of worker threads for handling client connections
* Default: 500
*/
public static final ConfigOption<Integer> THRIFT_WORKER_THREADS_MAX;
/**
* Maximum message size for Thrift communication
* Default: 104857600 (100MB)
*/
public static final ConfigOption<MemorySize> THRIFT_MAX_MESSAGE_SIZE;
// Session Configuration
/**
* Default catalog name for new sessions
* Default: "default_catalog"
*/
public static final ConfigOption<String> CATALOG_DEFAULT_NAME;
/**
* Default database name for new sessions
* Default: "default"
*/
public static final ConfigOption<String> CATALOG_DEFAULT_DATABASE;
/**
* Module configuration for built-in function access
* NOTE(review): exact semantics unclear from this outline — presumably the
* name of the module loaded for built-in function resolution; verify against
* the endpoint implementation.
*/
public static final ConfigOption<String> MODULE_NAME;
}-- Table-level configuration
-- Streaming-read table: continuously discovers new hourly partitions.
CREATE TABLE hive_streaming_source (
    id BIGINT,
    data STRING,
    event_time TIMESTAMP(3),
    partition_hour STRING
) PARTITIONED BY (partition_hour)
WITH (
    'connector' = 'hive',
    -- Read-side tuning
    'table.exec.hive.infer-source-parallelism' = 'true',
    'table.exec.hive.split-max-size' = '256MB',
    -- Continuous partition discovery
    'streaming-source.enable' = 'true',
    'streaming-source.partition.include' = 'latest',
    'streaming-source.monitor-interval' = '5 min',
    'streaming-source.consume-start-offset' = '2023-01-01 00:00:00',
    'streaming-source.partition-order' = 'CREATE_TIME',
    -- Cache settings used when this table serves a lookup join
    'lookup.join.cache.ttl' = '30 min',
    'lookup.join.cache.max-size' = '5000'
);
-- Partitioned sink table with managed partition commit.
CREATE TABLE hive_sink (
    id BIGINT,
    processed_data STRING,
    partition_date STRING
) PARTITIONED BY (partition_date)
WITH (
    'connector' = 'hive',
    -- Prefer Flink's native writers over the MapReduce OutputFormat
    'table.exec.hive.fallback-mapred-writer' = 'false',
    -- Cluster rows by their dynamic partition and gather stats after writing
    'table.exec.hive.sink.sort-by-dynamic-partition.enable' = 'true',
    'table.exec.hive.sink.statistic-auto-gather.enable' = 'true',
    -- Publish finished partitions to the metastore and drop a success file
    'sink.partition-commit.policy.kind' = 'metastore,success-file'
);
-- Register a Hive catalog backed by the production metastore.
CREATE CATALOG production_hive WITH (
    'type' = 'hive',
    'hive-version' = '3.1.2',
    'hive-conf-dir' = '/opt/hive/conf',
    'hadoop-conf-dir' = '/opt/hadoop/conf',
    'default-database' = 'analytics'
);
// Flink configuration for global settings
// Global connector tuning applied through a Flink Configuration object.
Configuration conf = new Configuration();

// Source side: derive parallelism from the splits and use larger splits.
conf.set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
conf.set(HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_SIZE, MemorySize.ofMebiBytes(256));
conf.set(HiveOptions.TABLE_EXEC_HIVE_FILE_OPEN_COST, MemorySize.ofMebiBytes(8));

// Sink side: native writers plus dynamic-partition sorting.
conf.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);
conf.set(HiveOptions.TABLE_EXEC_HIVE_SINK_SORT_BY_DYNAMIC_PARTITION_ENABLE, true);

// Streaming source: watch only the newest partition, polling every five minutes.
conf.set(HiveOptions.STREAMING_SOURCE_ENABLE, true);
conf.set(HiveOptions.STREAMING_SOURCE_PARTITION_INCLUDE, "latest");
conf.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofMinutes(5));

// Build the table environment on top of this configuration.
TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().withConfiguration(conf).build());
// Optimize for large files with few splits
// Scenario A: a few very large files -> bigger splits, higher per-file open cost.
config.set(HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_SIZE, MemorySize.ofMebiBytes(512));
config.set(HiveOptions.TABLE_EXEC_HIVE_FILE_OPEN_COST, MemorySize.ofMebiBytes(16));

// Scenario B: many small files -> smaller splits, cheap file-open estimate.
config.set(HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_SIZE, MemorySize.ofMebiBytes(64));
config.set(HiveOptions.TABLE_EXEC_HIVE_FILE_OPEN_COST, MemorySize.ofMebiBytes(1));

// In both cases let Flink derive source parallelism from the split count.
config.set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
// High-frequency partition updates
// Low-latency mode: poll every minute and read only the newest partition.
config.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofMinutes(1));
config.set(HiveOptions.STREAMING_SOURCE_PARTITION_INCLUDE, "latest");

// Catch-up mode: replay every partition from a fixed start point, polling slowly.
config.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofMinutes(30));
config.set(HiveOptions.STREAMING_SOURCE_PARTITION_INCLUDE, "all");
config.set(HiveOptions.STREAMING_SOURCE_CONSUME_START_OFFSET, "2023-01-01 00:00:00");
// Enable dynamic partition sorting for better compression
// Cluster rows by their dynamic partition columns before writing.
config.set(HiveOptions.TABLE_EXEC_HIVE_SINK_SORT_BY_DYNAMIC_PARTITION_ENABLE, true);
// Mark partitions ready in the metastore and drop a success file.
config.set(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND, "metastore,success-file");
// Refresh table statistics automatically after each write.
config.set(HiveOptions.TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_ENABLE, true);
# Relaxed settings for development
table.exec.hive.infer-source-parallelism=true
table.exec.hive.split-max-size=64MB
streaming-source.monitor-interval=30s
lookup.join.cache.ttl=5min

# Optimized settings for production
table.exec.hive.infer-source-parallelism=true
table.exec.hive.split-max-size=256MB
table.exec.hive.file-open-cost=8MB
streaming-source.monitor-interval=5min
streaming-source.partition-order=CREATE_TIME
sink.partition-commit.policy.kind=metastore,success-file
table.exec.hive.sink.statistic-auto-gather.enable=true
lookup.join.cache.ttl=60min
lookup.join.cache.max-size=50000

Common configuration issues and their solutions:
Split Size Issues:
// Symptom: excessive split count causing scheduling overhead.
// Remedy: raise both the split ceiling and the per-file open-cost estimate.
config.set(HiveOptions.TABLE_EXEC_HIVE_FILE_OPEN_COST, MemorySize.ofMebiBytes(8));
config.set(HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_SIZE, MemorySize.ofMebiBytes(256));
Streaming Lag Issues:
// Symptom: new partitions show up in the stream too late.
// Remedy: poll more often and order partitions by metastore creation time.
config.set(HiveOptions.STREAMING_SOURCE_PARTITION_ORDER, PartitionOrder.CREATE_TIME);
config.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofMinutes(1));
Memory Issues in Lookup Joins:
// Symptom: lookup-join cache grows beyond the available heap.
// Remedy: shorten the time-to-live and cap the entry count.
config.set(HiveOptions.LOOKUP_JOIN_CACHE_TTL, Duration.ofMinutes(30));
config.set(HiveOptions.LOOKUP_JOIN_CACHE_MAX_SIZE, 5000);
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-connector-hive-3-1-2-2-12