tessl/maven-org-apache-flink--flink-sql-connector-hive-3-1-2-2-12

Apache Flink SQL connector for Apache Hive 3.1.2, enabling unified batch and stream processing with Hive tables.

Configuration Management

The Hive connector exposes configuration options that control connector behavior, performance tuning, and feature enablement. The options cover reading, writing, streaming, lookup joins, and advanced optimization scenarios.

Capabilities

Core Configuration Options

HiveOptions is the central configuration class. It holds the Hive connector-specific options that control behavior and performance.

/**
 * Configuration options for Hive connector behavior and performance tuning
 * All options can be set in Flink configuration or as table properties
 */
public class HiveOptions {
    
    // Reading Configuration Options
    
    /**
     * Whether to fallback to Hadoop MapReduce RecordReader for file reading
     * Default: false (use native vectorized readers when possible)
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER;
    
    /**
     * Whether to read files in subdirectories of partitions
     * Default: false
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_READ_PARTITION_WITH_SUBDIRECTORY_ENABLED;
    
    /**
     * Whether to infer source parallelism based on number of files/splits
     * Default: true
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM;
    
    /**
     * Maximum size of a single split for reading
     * Default: 128MB
     */
    public static final ConfigOption<MemorySize> TABLE_EXEC_HIVE_SPLIT_MAX_SIZE;
    
    /**
     * Estimated cost of opening a file (used for split calculation)
     * Default: 4MB
     */
    public static final ConfigOption<MemorySize> TABLE_EXEC_HIVE_FILE_OPEN_COST;
    
    // Writing Configuration Options
    
    /**
     * Whether to fallback to Hadoop MapReduce OutputFormat for writing
     * Default: true (set to false to use Flink's native writers for Parquet and ORC)
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER;
    
    /**
     * Whether to sort records by dynamic partition columns before writing
     * Default: true
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_SINK_SORT_BY_DYNAMIC_PARTITION_ENABLE;
    
    /**
     * Whether to automatically gather table statistics after writing
     * Default: true
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_ENABLE;
    
    /**
     * Partition commit policy for determining when partitions are ready
     * Options: "metastore", "success-file", "metastore,success-file"
     * Default: (not set, partitions committed immediately)
     */
    public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_KIND;
    
    // Streaming Configuration Options
    
    /**
     * Whether to enable streaming source mode for continuous partition monitoring
     * Default: false
     */
    public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE;
    
    /**
     * Which partitions to include in streaming mode
     * Options: "all" (all partitions), "latest" (only latest partition)
     * Default: "all"
     */
    public static final ConfigOption<String> STREAMING_SOURCE_PARTITION_INCLUDE;
    
    /**
     * Interval for monitoring new partitions in streaming mode
     * Default: 1 minute
     */
    public static final ConfigOption<Duration> STREAMING_SOURCE_MONITOR_INTERVAL;
    
    /**
     * Starting point for consuming partitions in streaming mode
     * Format: "yyyy-MM-dd HH:mm:ss" or partition name pattern
     * Default: (consume from latest available partition)
     */
    public static final ConfigOption<String> STREAMING_SOURCE_CONSUME_START_OFFSET;
    
    /**
     * Ordering strategy for processing partitions in streaming mode
     * Options: "create-time", "partition-time", "partition-name"
     * Default: "partition-name"
     */
    public static final ConfigOption<PartitionOrder> STREAMING_SOURCE_PARTITION_ORDER;
    
    // Lookup Join Configuration Options
    
    /**
     * Time-to-live for cached lookup results
     * Default: 60 minutes
     */
    public static final ConfigOption<Duration> LOOKUP_JOIN_CACHE_TTL;
    
    /**
     * Maximum number of entries in lookup cache
     * Default: 10000
     */
    public static final ConfigOption<Integer> LOOKUP_JOIN_CACHE_MAX_SIZE;
    
    // Partition Ordering Enum
    
    /**
     * Enumeration for partition ordering strategies in streaming mode
     */
    public enum PartitionOrder {
        /** Order by partition creation time in Hive metastore */
        CREATE_TIME,
        /** Order by partition time extracted from partition name */  
        PARTITION_TIME,
        /** Order by partition name alphabetically */
        PARTITION_NAME
    }
}
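
As a rough illustration of how a textual `streaming-source.partition-order` value could map onto the `PartitionOrder` constants above, the sketch below normalizes case and separators before resolving the enum. `PartitionOrderParser` and its `parse` method are hypothetical helpers written for this page, not part of the connector, and the connector's own parsing may be stricter about which spellings it accepts.

```java
import java.util.Locale;

public class PartitionOrderParser {

    /** Local copy of the three strategies listed in HiveOptions.PartitionOrder. */
    public enum PartitionOrder { CREATE_TIME, PARTITION_TIME, PARTITION_NAME }

    /**
     * Resolves spellings such as "create-time", "CREATE_TIME" or " Partition-Name ".
     * Hypothetical helper: the lenient normalization is an assumption of this sketch.
     */
    public static PartitionOrder parse(String raw) {
        String normalized = raw.trim().toUpperCase(Locale.ROOT).replace('-', '_');
        // valueOf throws IllegalArgumentException for anything that is not a known constant
        return PartitionOrder.valueOf(normalized);
    }
}
```

Failing fast on an unknown value keeps a typo in a table property from silently selecting a different ordering.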

Catalog Configuration Options

Configuration options specific to HiveCatalog creation and behavior.

/**
 * Configuration options for HiveCatalog factory
 */
public class HiveCatalogFactoryOptions {
    
    /**
     * Path to directory containing hive-site.xml and other Hive configuration files
     * Optional; if not set, hive-site.xml is looked up on the classpath
     */
    public static final ConfigOption<String> HIVE_CONF_DIR;
    
    /**
     * Hive version for compatibility and feature support
     * Default: (not set; the Hive version is detected automatically, so set this only if detection fails)
     */
    public static final ConfigOption<String> HIVE_VERSION;
    
    /**
     * Path to directory containing Hadoop configuration files (core-site.xml, hdfs-site.xml)
     * Optional, but recommended for HDFS and other Hadoop filesystem access
     */
    public static final ConfigOption<String> HADOOP_CONF_DIR;
    
    /**
     * Default database name to use when none is specified
     * Default: "default"
     */
    public static final ConfigOption<String> DEFAULT_DATABASE;
}

HiveServer2 Endpoint Configuration

Configuration options for the HiveServer2-compatible endpoint of the Flink SQL Gateway.

/**
 * Configuration options for HiveServer2 endpoint
 */
public class HiveServer2EndpointConfigOptions {
    
    // Server Configuration
    
    /**
     * Host address for Thrift server
     * Default: "localhost"
     */
    public static final ConfigOption<String> THRIFT_HOST;
    
    /**
     * Port number for Thrift server
     * Default: 10000
     */
    public static final ConfigOption<Integer> THRIFT_PORT;
    
    /**
     * Minimum number of worker threads for handling client connections
     * Default: 5
     */
    public static final ConfigOption<Integer> THRIFT_WORKER_THREADS_MIN;
    
    /**
     * Maximum number of worker threads for handling client connections
     * Default: 500
     */
    public static final ConfigOption<Integer> THRIFT_WORKER_THREADS_MAX;
    
    /**
     * Maximum message size for Thrift communication
     * Default: 104857600 (100MB)
     */
    public static final ConfigOption<MemorySize> THRIFT_MAX_MESSAGE_SIZE;
    
    // Session Configuration
    
    /**
     * Default catalog name for new sessions
     * Default: "default_catalog"
     */
    public static final ConfigOption<String> CATALOG_DEFAULT_NAME;
    
    /**
     * Default database name for new sessions
     * Default: "default"
     */
    public static final ConfigOption<String> CATALOG_DEFAULT_DATABASE;
    
    /**
     * Module configuration for built-in function access
     */
    public static final ConfigOption<String> MODULE_NAME;
}

Configuration Usage Patterns

SQL DDL Configuration

-- Table-level configuration
CREATE TABLE hive_streaming_source (
    id BIGINT,
    data STRING,
    event_time TIMESTAMP(3),
    partition_hour STRING
) PARTITIONED BY (partition_hour)
WITH (
    'connector' = 'hive',
    
    -- Streaming configuration
    'streaming-source.enable' = 'true',
    'streaming-source.partition.include' = 'latest', 
    'streaming-source.monitor-interval' = '5 min',
    'streaming-source.consume-start-offset' = '2023-01-01 00:00:00',
    'streaming-source.partition-order' = 'create-time',
    
    -- Reading optimization
    'table.exec.hive.infer-source-parallelism' = 'true',
    'table.exec.hive.split-max-size' = '256MB',
    
    -- Lookup join caching
    'lookup.join.cache.ttl' = '30 min',
    'lookup.join.cache.max-size' = '5000'
);

-- Sink table configuration
CREATE TABLE hive_sink (
    id BIGINT,
    processed_data STRING,
    partition_date STRING
) PARTITIONED BY (partition_date)
WITH (
    'connector' = 'hive',
    
    -- Writing configuration
    'sink.partition-commit.policy.kind' = 'metastore,success-file',
    'table.exec.hive.sink.sort-by-dynamic-partition.enable' = 'true',
    'table.exec.hive.sink.statistic-auto-gather.enable' = 'true',
    
    -- Performance tuning
    'table.exec.hive.fallback-mapred-writer' = 'false'
);

-- Catalog configuration
CREATE CATALOG production_hive WITH (
    'type' = 'hive',
    'hive-conf-dir' = '/opt/hive/conf',
    'hadoop-conf-dir' = '/opt/hadoop/conf', 
    'hive-version' = '3.1.2',
    'default-database' = 'analytics'
);

Programmatic Configuration

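// Imports assumed by the snippet below
import java.time.Duration;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connectors.hive.HiveOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Flink configuration for global settings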
Configuration flinkConfig = new Configuration();

// Reading optimization
flinkConfig.setBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
flinkConfig.set(HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_SIZE, MemorySize.ofMebiBytes(256));
flinkConfig.set(HiveOptions.TABLE_EXEC_HIVE_FILE_OPEN_COST, MemorySize.ofMebiBytes(8));

// Writing optimization  
flinkConfig.setBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);
flinkConfig.setBoolean(HiveOptions.TABLE_EXEC_HIVE_SINK_SORT_BY_DYNAMIC_PARTITION_ENABLE, true);

// Streaming configuration
flinkConfig.setBoolean(HiveOptions.STREAMING_SOURCE_ENABLE, true);
flinkConfig.setString(HiveOptions.STREAMING_SOURCE_PARTITION_INCLUDE, "latest");
flinkConfig.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofMinutes(5));

// Create environment with configuration
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .withConfiguration(flinkConfig)
    .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

Performance Tuning Guidelines

Reading Performance

// Optimize for large files with few splits
config.set(HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_SIZE, MemorySize.ofMebiBytes(512));
config.set(HiveOptions.TABLE_EXEC_HIVE_FILE_OPEN_COST, MemorySize.ofMebiBytes(16));

// Alternatively, optimize for many small files (use instead of the settings above, not together)
config.set(HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_SIZE, MemorySize.ofMebiBytes(64));
config.set(HiveOptions.TABLE_EXEC_HIVE_FILE_OPEN_COST, MemorySize.ofMebiBytes(1));

// Enable parallelism inference for optimal resource usage
config.setBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
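
To build intuition for how the maximum split size and the file open cost interact, the following sketch estimates a split count with a simplified packing model: each file is cut into chunks of at most the maximum split size, each chunk is weighted by the open cost, and chunks are packed in order. This model is an assumption made for illustration. `SplitEstimator` is a hypothetical class, and the connector's real split planning may differ.

```java
import java.util.List;

public class SplitEstimator {

    /**
     * Estimates how many splits a set of files produces under a simple
     * in-order packing model (an assumption of this sketch, not the connector's algorithm).
     *
     * @param fileSizes     file lengths in bytes
     * @param maxSplitBytes upper bound for one split, in bytes
     * @param openCostBytes extra weight charged for every chunk that is opened
     */
    public static int estimateSplits(List<Long> fileSizes, long maxSplitBytes, long openCostBytes) {
        int closedSplits = 0;
        long current = 0; // weighted bytes in the split currently being filled
        for (long size : fileSizes) {
            long remaining = size;
            while (remaining > 0) {
                long chunk = Math.min(remaining, maxSplitBytes);
                long weight = chunk + openCostBytes;
                // Close the current split when the next chunk would overflow it
                if (current > 0 && current + weight > maxSplitBytes) {
                    closedSplits++;
                    current = 0;
                }
                current += weight;
                remaining -= chunk;
            }
        }
        return current > 0 ? closedSplits + 1 : closedSplits;
    }
}
```

Under this model, ten 1 MiB files with a 64 MiB maximum split size land in a single split when the open cost is 1 MiB, but spread over four splits when the open cost is 16 MiB, which is why a lower open cost suits tables with many small files.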

Streaming Performance

// High-frequency partition updates
config.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofMinutes(1));
config.setString(HiveOptions.STREAMING_SOURCE_PARTITION_INCLUDE, "latest");

// Alternatively, replay historical data: poll less often, include all partitions, start from a fixed offset
config.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofMinutes(30));
config.setString(HiveOptions.STREAMING_SOURCE_PARTITION_INCLUDE, "all");
config.setString(HiveOptions.STREAMING_SOURCE_CONSUME_START_OFFSET, "2023-01-01 00:00:00");
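
The difference between including all partitions from a start offset and following only the latest partition can be sketched as below. The example assumes partitions are ordered by name with plain lexicographic comparison and that the start offset is inclusive. Both are assumptions of this sketch, and `StreamingPartitionSelector` is a hypothetical helper, not a connector class.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class StreamingPartitionSelector {

    /** "all" mode: every partition whose name sorts at or after the start offset, in ascending order. */
    public static List<String> fromOffset(List<String> partitionNames, String startOffset) {
        return partitionNames.stream()
                .filter(name -> name.compareTo(startOffset) >= 0) // inclusive bound is an assumption
                .sorted()
                .collect(Collectors.toList());
    }

    /** "latest" mode: only the partition whose name sorts last, if any partition exists. */
    public static Optional<String> latest(List<String> partitionNames) {
        return partitionNames.stream().max(Comparator.naturalOrder());
    }
}
```

Name-based ordering only behaves as expected when partition values sort chronologically as strings, for example zero-padded dates such as `dt=2023-01-01`.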

Writing Performance

// Enable dynamic partition sorting for better compression
config.setBoolean(HiveOptions.TABLE_EXEC_HIVE_SINK_SORT_BY_DYNAMIC_PARTITION_ENABLE, true);

// Configure partition commit policies
config.setString(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND, "metastore,success-file");

// Enable automatic statistics gathering
config.setBoolean(HiveOptions.TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_ENABLE, true);
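
The commit policy value is a comma-separated list, so "metastore,success-file" names two policies. A minimal sketch of splitting and validating such a value follows. `CommitPolicyKinds` is a hypothetical helper, and it accepts only the two kinds listed on this page; treat that set as an assumption rather than the connector's full list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CommitPolicyKinds {

    // Kinds named in this document; the real connector may accept more
    private static final Set<String> KNOWN = new HashSet<>(Arrays.asList("metastore", "success-file"));

    /** Splits a value such as "metastore,success-file" into its policies, preserving order. */
    public static List<String> parse(String kind) {
        List<String> policies = new ArrayList<>();
        for (String part : kind.split(",")) {
            String policy = part.trim();
            if (policy.isEmpty()) {
                continue; // tolerate stray commas and surrounding whitespace
            }
            if (!KNOWN.contains(policy)) {
                throw new IllegalArgumentException("Unknown commit policy: " + policy);
            }
            policies.add(policy);
        }
        return policies;
    }
}
```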

Environment-Specific Configuration

Development Environment

# Relaxed settings for development
table.exec.hive.infer-source-parallelism=true
table.exec.hive.split-max-size=64MB
streaming-source.monitor-interval=30s
lookup.join.cache.ttl=5min

Production Environment

# Optimized settings for production
table.exec.hive.infer-source-parallelism=true
table.exec.hive.split-max-size=256MB
table.exec.hive.file-open-cost=8MB
streaming-source.monitor-interval=5min
streaming-source.partition-order=create-time
sink.partition-commit.policy.kind=metastore,success-file
table.exec.hive.sink.statistic-auto-gather.enable=true
lookup.join.cache.ttl=60min
lookup.join.cache.max-size=50000
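
The interval and TTL values in these profiles use compact forms such as 30s, 5min, and 60min. The sketch below shows one way such strings could be turned into `java.time.Duration` values when validating a profile before deployment. `DurationValues` is a hypothetical helper that handles only the units appearing on this page plus hours; Flink's own parser accepts more unit spellings.

```java
import java.time.Duration;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DurationValues {

    // A number, optional whitespace, then a unit label, e.g. "30s", "5min", "5 min"
    private static final Pattern FORMAT = Pattern.compile("\\s*(\\d+)\\s*([a-zA-Z]+)\\s*");

    /** Parses the duration spellings used in the profiles above (s, min, h). */
    public static Duration parse(String text) {
        Matcher m = FORMAT.matcher(text);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a duration: " + text);
        }
        long amount = Long.parseLong(m.group(1));
        switch (m.group(2).toLowerCase(Locale.ROOT)) {
            case "s":
                return Duration.ofSeconds(amount);
            case "min":
                return Duration.ofMinutes(amount);
            case "h":
                return Duration.ofHours(amount);
            default:
                throw new IllegalArgumentException("Unsupported unit: " + m.group(2));
        }
    }
}
```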

Configuration Validation and Troubleshooting

Common configuration issues and their solutions:

Split Size Issues:

// Problem: Too many small splits causing overhead
// Solution: Increase split size and file open cost
config.set(HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_SIZE, MemorySize.ofMebiBytes(256));
config.set(HiveOptions.TABLE_EXEC_HIVE_FILE_OPEN_COST, MemorySize.ofMebiBytes(8));

Streaming Lag Issues:

// Problem: Partitions not being detected quickly enough  
// Solution: Reduce monitor interval and optimize partition ordering
config.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofMinutes(1));
config.set(HiveOptions.STREAMING_SOURCE_PARTITION_ORDER, HiveOptions.PartitionOrder.CREATE_TIME);

Memory Issues in Lookup Joins:

// Problem: Lookup cache consuming too much memory
// Solution: Reduce cache size and TTL
config.set(HiveOptions.LOOKUP_JOIN_CACHE_MAX_SIZE, 5000);
config.set(HiveOptions.LOOKUP_JOIN_CACHE_TTL, Duration.ofMinutes(30));
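
The effect of the cache TTL can be pictured with a small model: the lookup side is loaded as a snapshot and reloaded once the TTL has elapsed, so a shorter TTL trades fresher data for more frequent reloads. The class below is a simplified sketch under that assumption. `TtlReloadingCache` is hypothetical, is not thread-safe, ignores any size limit, and is not the connector's implementation.

```java
import java.time.Duration;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

public class TtlReloadingCache<K, V> {

    private final Supplier<Map<K, V>> loader; // produces a fresh snapshot of the lookup data
    private final long ttlMillis;
    private final LongSupplier clockMillis;   // injectable clock so expiry can be tested
    private Map<K, V> snapshot;
    private long loadedAtMillis;

    public TtlReloadingCache(Supplier<Map<K, V>> loader, Duration ttl, LongSupplier clockMillis) {
        this.loader = loader;
        this.ttlMillis = ttl.toMillis();
        this.clockMillis = clockMillis;
    }

    /** Returns the cached value, reloading the whole snapshot first if the TTL has elapsed. */
    public V get(K key) {
        long now = clockMillis.getAsLong();
        if (snapshot == null || now - loadedAtMillis >= ttlMillis) {
            snapshot = loader.get();
            loadedAtMillis = now;
        }
        return snapshot.get(key);
    }
}
```

In production code the clock would typically be `System::currentTimeMillis`; passing it in keeps the expiry logic easy to verify.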
