or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

catalog.md · configuration.md · datastream-source.md · hive-functions.md · index.md · table-api.md
tile.json

docs/configuration.md

Configuration Options

Comprehensive configuration system for tuning connector behavior, performance optimization, and streaming source configuration.

Capabilities

HiveOptions

Central configuration class containing all options for customizing Hive connector behavior, performance tuning, and streaming source settings.

/**
 * Configuration options for Hive connector behavior and performance tuning.
 * All options can be set via Flink configuration or table properties.
 *
 * <p>Options are exposed as {@code public static final} constants so they can be
 * referenced statically (e.g. {@code HiveOptions.STREAMING_SOURCE_ENABLE}), matching
 * how they are used throughout the examples in this document.
 */
public class HiveOptions {

    /** Not instantiable: pure constants holder. */
    private HiveOptions() {}

    // ------------------------------------------------------------------
    // Performance and Reader Configuration
    // ------------------------------------------------------------------

    /**
     * Whether to infer parallelism for the Hive source based on splits.
     * When enabled, Flink automatically determines optimal parallelism from file splits.
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM = ConfigOptions
        .key("table.exec.hive.infer-source-parallelism")
        .booleanType()
        .defaultValue(true);

    /**
     * Maximum parallelism when inferring source parallelism.
     * Caps the number of parallel subtasks for Hive sources.
     */
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX = ConfigOptions
        .key("table.exec.hive.infer-source-parallelism.max")
        .intType()
        .defaultValue(1000);

    /**
     * Whether to fall back to the MapRed reader when native readers fail.
     * Provides a compatibility fallback for reading complex file formats.
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER = ConfigOptions
        .key("table.exec.hive.fallback-mapred-reader")
        .booleanType()
        .defaultValue(false);

    /**
     * Whether to fall back to the MapRed writer when native writers fail.
     * Provides a compatibility fallback for writing complex file formats.
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER = ConfigOptions
        .key("table.exec.hive.fallback-mapred-writer")
        .booleanType()
        .defaultValue(true);

    /**
     * Number of threads for loading partition splits in parallel.
     * Higher values improve partition discovery for tables with many partitions.
     */
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM = ConfigOptions
        .key("table.exec.hive.load-partition-splits.thread-num")
        .intType()
        .defaultValue(3);

    // ------------------------------------------------------------------
    // Streaming Source Configuration
    // ------------------------------------------------------------------

    /**
     * Enable streaming mode for Hive sources.
     * When enabled, the source continuously monitors for new partitions.
     */
    public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE = ConfigOptions
        .key("streaming-source.enable")
        .booleanType()
        .defaultValue(false);

    /**
     * Strategy for including partitions in the streaming source
     * ("all" or "latest"); controls which partitions are consumed.
     */
    public static final ConfigOption<String> STREAMING_SOURCE_PARTITION_INCLUDE = ConfigOptions
        .key("streaming-source.partition.include")
        .stringType()
        .defaultValue("all");

    /**
     * Interval for discovering new partitions in streaming mode.
     * No default: must be set explicitly when periodic monitoring is desired.
     */
    public static final ConfigOption<Duration> STREAMING_SOURCE_MONITOR_INTERVAL = ConfigOptions
        .key("streaming-source.monitor-interval")
        .durationType()
        .noDefaultValue();

    /**
     * Starting offset for consuming partitions in streaming mode.
     * Controls from which point consumption begins when the streaming job starts.
     */
    public static final ConfigOption<String> STREAMING_SOURCE_CONSUME_START_OFFSET = ConfigOptions
        .key("streaming-source.consume-start-offset")
        .stringType()
        .noDefaultValue();

    /**
     * Order in which partitions are consumed in streaming mode.
     * Valid values correspond to {@code PartitionOrder}: "create-time",
     * "partition-time", "partition-name".
     */
    public static final ConfigOption<String> STREAMING_SOURCE_PARTITION_ORDER = ConfigOptions
        .key("streaming-source.partition-order")
        .stringType()
        .defaultValue("partition-name");

    // ------------------------------------------------------------------
    // Lookup Join Configuration
    // ------------------------------------------------------------------

    /**
     * TTL for lookup join cache entries: how long lookup results are cached
     * before being refreshed from the Hive table.
     *
     * <p>NOTE(review): upstream Flink's HiveOptions registers this option under
     * the key "lookup.join.cache.ttl" — verify the key used here against the
     * connector version actually deployed.
     */
    public static final ConfigOption<Duration> LOOKUP_JOIN_CACHE_TTL = ConfigOptions
        .key("lookup.cache.ttl")
        .durationType()
        .defaultValue(Duration.ofHours(1));
}

Usage Examples:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.HiveOptions;

// Configure via Flink Configuration
Configuration config = new Configuration();

// Performance tuning
config.setBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
config.setInteger(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX, 500);
config.setInteger(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM, 5);

// Streaming configuration
config.setBoolean(HiveOptions.STREAMING_SOURCE_ENABLE, true);
config.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofMinutes(5));
config.setString(HiveOptions.STREAMING_SOURCE_PARTITION_INCLUDE, "latest");
config.setString(HiveOptions.STREAMING_SOURCE_PARTITION_ORDER, "partition-time");
config.setString(HiveOptions.STREAMING_SOURCE_CONSUME_START_OFFSET, "2023-01-01");

// Lookup join caching
config.set(HiveOptions.LOOKUP_JOIN_CACHE_TTL, Duration.ofMinutes(30));

// Apply to table environment
TableEnvironment tableEnv = TableEnvironment.create(settings);
tableEnv.getConfig().addConfiguration(config);

PartitionOrder Enum

Enumeration defining the order in which partitions are processed in streaming mode.

/**
 * Partition ordering strategies for the Hive streaming source.
 *
 * <p>Each constant carries the hyphenated string form used in configuration
 * (e.g. {@code 'streaming-source.partition-order' = 'partition-time'}), so the
 * enum and the documented config values stay consistent. {@link #toString()}
 * returns that config string.
 */
enum PartitionOrder {
    /**
     * Order partitions by their creation time in the Hive metastore:
     * partitions are processed in the order they were added to the table.
     */
    CREATE_TIME("create-time"),

    /**
     * Order partitions by partition time extracted from partition values.
     * Requires partition columns that can be interpreted as timestamps.
     */
    PARTITION_TIME("partition-time"),

    /**
     * Order partitions lexicographically by partition name
     * (string comparison of partition path names).
     */
    PARTITION_NAME("partition-name");

    /** The string value used in configuration files and table properties. */
    private final String value;

    PartitionOrder(String value) {
        this.value = value;
    }

    /** Returns the configuration-string form of this ordering strategy. */
    @Override
    public String toString() {
        return value;
    }
}

Usage Examples:

// Configure partition ordering
config.setString(HiveOptions.STREAMING_SOURCE_PARTITION_ORDER, "partition-time");

// Example table with time-based partitions
tableEnv.executeSql(
    "CREATE TABLE time_series_data (" +
    "  sensor_id STRING," +
    "  measurement DOUBLE," +
    "  event_time TIMESTAMP(3)" +
    ") PARTITIONED BY (" +
    "  year INT," +
    "  month INT," +
    "  day INT" +
    ") WITH (" +
    "  'connector' = 'hive'," +
    "  'streaming-source.enable' = 'true'," +
    "  'streaming-source.partition-order' = 'partition-time'" +
    ")"
);

Configuration Categories

Performance Optimization

// Optimize for large tables with many partitions
Configuration perfConfig = new Configuration();

// Enable parallelism inference for optimal resource usage
perfConfig.setBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
perfConfig.setInteger(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX, 2000);

// Increase partition loading threads for faster metadata discovery
perfConfig.setInteger(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM, 10);

// Enable fallback readers for compatibility
perfConfig.setBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, true);

Streaming Source Configuration

// Configure for streaming ingestion
Configuration streamConfig = new Configuration();

// Enable continuous partition monitoring
streamConfig.setBoolean(HiveOptions.STREAMING_SOURCE_ENABLE, true);

// Monitor for new partitions every 30 seconds
streamConfig.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofSeconds(30));

// Only consume the latest partition ('latest' selects the single most recent partition, primarily for temporal joins)
streamConfig.setString(HiveOptions.STREAMING_SOURCE_PARTITION_INCLUDE, "latest");

// Start consuming from specific date
streamConfig.setString(HiveOptions.STREAMING_SOURCE_CONSUME_START_OFFSET, "2023-12-01");

// Process partitions in chronological order
streamConfig.setString(HiveOptions.STREAMING_SOURCE_PARTITION_ORDER, "partition-time");

Lookup Join Optimization

// Optimize lookup joins for better performance
Configuration lookupConfig = new Configuration();

// Cache lookup results for 15 minutes
lookupConfig.set(HiveOptions.LOOKUP_JOIN_CACHE_TTL, Duration.ofMinutes(15));

// Apply configuration
tableEnv.getConfig().addConfiguration(lookupConfig);

// Use in lookup join
tableEnv.executeSql(
    "SELECT " +
    "  o.order_id," +
    "  o.customer_id," +
    "  c.customer_name " +
    "FROM orders o " +
    "LEFT JOIN customer_profiles FOR SYSTEM_TIME AS OF o.order_time AS c " +
    "  ON o.customer_id = c.customer_id"
);

Table-Specific Configuration

Configure options for specific tables via table properties:

-- Create table with specific performance settings
CREATE TABLE high_volume_data (
  id BIGINT,
  data STRING,
  timestamp_col TIMESTAMP(3)
) PARTITIONED BY (
  year INT,
  month INT,
  day INT
) WITH (
  'connector' = 'hive',
  'table.exec.hive.infer-source-parallelism' = 'true',
  'table.exec.hive.infer-source-parallelism.max' = '1000',
  'streaming-source.enable' = 'true',
  'streaming-source.monitor-interval' = '30s',
  'streaming-source.partition-order' = 'partition-time'
);

-- Create table optimized for lookup joins
CREATE TABLE reference_data (
  key_id BIGINT,
  reference_value STRING,
  last_updated TIMESTAMP(3)
) WITH (
  'connector' = 'hive',
  'lookup.cache.ttl' = '1h'
);

Environment-Specific Configuration

Development Environment

// Configuration optimized for development and testing
Configuration devConfig = new Configuration();
devConfig.setInteger(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX, 10);
devConfig.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofSeconds(10));
devConfig.set(HiveOptions.LOOKUP_JOIN_CACHE_TTL, Duration.ofMinutes(1));

Production Environment

// Configuration optimized for production workloads
Configuration prodConfig = new Configuration();
prodConfig.setBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
prodConfig.setInteger(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX, 5000);
prodConfig.setInteger(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM, 20);
prodConfig.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofMinutes(5));
prodConfig.set(HiveOptions.LOOKUP_JOIN_CACHE_TTL, Duration.ofHours(6));

Configuration Validation

The connector validates configuration values and provides helpful error messages:

// Invalid configuration will throw ValidationException
try {
    Configuration config = new Configuration();
    config.setInteger(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX, -1); // Invalid
    tableEnv.getConfig().addConfiguration(config);
} catch (ValidationException e) {
    System.err.println("Configuration error: " + e.getMessage());
}

Dynamic Configuration Updates

Some configuration options can be updated dynamically:

// Update configuration for existing table environment
Configuration dynamicConfig = new Configuration();
dynamicConfig.set(HiveOptions.LOOKUP_JOIN_CACHE_TTL, Duration.ofMinutes(45));
tableEnv.getConfig().addConfiguration(dynamicConfig);

// Configuration takes effect for new queries
// Existing running queries retain their original configuration