Configuration and Options

A comprehensive configuration system for customizing Flink Hive connector behavior, tuning performance, and adapting to specific environments. The options control source/sink behavior, catalog integration, module loading, and runtime optimizations.

Capabilities

HiveOptions

Core configuration options for Hive connector behavior and performance tuning.

/**
 * Configuration options for Hive connector behavior
 * Controls reader/writer fallback, streaming mode, caching, and partition handling
 */
public class HiveOptions {
    
    /**
     * Whether to fall back to the MapRed reader for compatibility
     * Default: false
     * Type: Boolean
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER;
    
    /**
     * Whether to fall back to the MapRed writer for compatibility
     * Default: true
     * Type: Boolean
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER;
    
    /**
     * Enable streaming source mode for continuous monitoring
     * Default: false
     * Type: Boolean
     */
    public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE;
    
    /**
     * Monitoring interval for streaming source partition discovery
     * Default: 1 minute
     * Type: Duration
     */
    public static final ConfigOption<Duration> STREAMING_SOURCE_MONITOR_INTERVAL;
    
    /**
     * Cache TTL for lookup join operations
     * Default: 60 minutes
     * Type: Duration
     */
    public static final ConfigOption<Duration> LOOKUP_JOIN_CACHE_TTL;
    
    /**
     * Whether to automatically infer source parallelism
     * Default: true  
     * Type: Boolean
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM;
    
    /**
     * Partition commit policy for sinks
     * Values: "metastore", "success-file", "custom" (combinations may be comma-separated)
     * Default: none (an explicit policy is required to commit partitions)
     * Type: String
     */
    public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_KIND;
    
    /**
     * Trigger for partition commit
     * Values: "partition-time", "process-time"  
     * Default: "process-time"
     * Type: String
     */
    public static final ConfigOption<String> SINK_PARTITION_COMMIT_TRIGGER;
    
    /**
     * Delay before committing partitions
     * Default: 0 (immediate)
     * Type: Duration
     */
    public static final ConfigOption<Duration> SINK_PARTITION_COMMIT_DELAY;
    
    /**
     * Watermark timezone for partition-time commit trigger
     * Default: "UTC"
     * Type: String
     */
    public static final ConfigOption<String> SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE;
    
    /**
     * Custom success file name for partition commit
     * Default: "_SUCCESS"
     * Type: String
     */
    public static final ConfigOption<String> SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME;
    
    /**
     * Enable reading partitions with subdirectories
     * Default: true
     * Type: Boolean
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_READ_PARTITION_WITH_SUBDIRECTORY_ENABLED;
    
    /**
     * Maximum inferred source parallelism
     * Default: 1000
     * Type: Integer
     */
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX;
    
    /**
     * Thread count for loading partition splits
     * Default: 3
     * Type: Integer
     */
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM;
    
    /**
     * Maximum split size for Hive table reading
     * Default: 128MB
     * Type: MemorySize  
     */
    public static final ConfigOption<MemorySize> TABLE_EXEC_HIVE_SPLIT_MAX_BYTES;
    
    /**
     * Estimated cost to open a file for split calculation
     * Default: 4MB
     * Type: MemorySize
     */
    public static final ConfigOption<MemorySize> TABLE_EXEC_HIVE_FILE_OPEN_COST;
    
    /**
     * Thread count for calculating partition sizes
     * Default: 3
     * Type: Integer
     */
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM;
    
    /**
     * Enable dynamic partition grouping
     * Default: false
     * Type: Boolean
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_DYNAMIC_GROUPING_ENABLED;
    
    /**
     * Thread count for reading table/partition statistics
     * Default: 3
     * Type: Integer
     */
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_READ_STATISTICS_THREAD_NUM;
    
    /**
     * Average size threshold for small file compaction
     * Default: 16MB
     * Type: MemorySize
     */
    public static final ConfigOption<MemorySize> COMPACT_SMALL_FILES_AVG_SIZE;
    
    /**
     * Enable automatic statistics gathering for sink
     * Default: true
     * Type: Boolean
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_ENABLE;
}

Configuration Usage Examples:

import java.time.Duration;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.HiveOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Configure Hive connector options
Configuration config = new Configuration();

// Enable streaming mode with 30-second monitoring
config.set(HiveOptions.STREAMING_SOURCE_ENABLE, true);
config.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofSeconds(30));

// Configure lookup join caching
config.set(HiveOptions.LOOKUP_JOIN_CACHE_TTL, Duration.ofMinutes(10));

// Enable automatic parallelism inference
config.set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);

// Configure partition commit policy
config.set(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND, "metastore,success-file");
config.set(HiveOptions.SINK_PARTITION_COMMIT_TRIGGER, "partition-time");
config.set(HiveOptions.SINK_PARTITION_COMMIT_DELAY, Duration.ofHours(1));

// Apply configuration to table environment
TableEnvironment tableEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance()
        .withConfiguration(config)
        .build()
);
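
The same constants can also be applied after the environment exists, through its TableConfig. A minimal sketch, assuming Flink 1.15+ where TableConfig accepts ConfigOption values directly:

// Adjust Hive connector options on an already-created TableEnvironment
tableEnv.getConfig().set(HiveOptions.STREAMING_SOURCE_ENABLE, true);
tableEnv.getConfig().set(HiveOptions.LOOKUP_JOIN_CACHE_TTL, Duration.ofMinutes(10));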

HiveCatalogFactoryOptions

Configuration options for HiveCatalog creation and metastore connection.

/**
 * Configuration options for HiveCatalogFactory
 * Controls catalog metadata, Hive configuration, and connection settings
 */
public class HiveCatalogFactoryOptions {
    
    /**
     * Default database name for the catalog
     * Default: "default"
     * Type: String
     */
    public static final ConfigOption<String> DEFAULT_DATABASE;
    
    /**
     * Directory containing Hive configuration files (hive-site.xml)
     * Default: null (use classpath)
     * Type: String
     */
    public static final ConfigOption<String> HIVE_CONF_DIR;
    
    /**
     * Hive version for compatibility
     * Supported: "2.3.4", "2.3.6", "2.3.9", "3.1.2", "3.1.3"
     * Default: auto-detected
     * Type: String
     */
    public static final ConfigOption<String> HIVE_VERSION;
    
    /**
     * Directory containing Hadoop configuration files (core-site.xml, hdfs-site.xml)
     * Default: null (use classpath)
     * Type: String
     */
    public static final ConfigOption<String> HADOOP_CONF_DIR;
}

Catalog Configuration Examples:

// Programmatic catalog configuration
Map<String, String> catalogProperties = new HashMap<>();
catalogProperties.put("type", "hive");
catalogProperties.put("default-database", "analytics");
catalogProperties.put("hive-conf-dir", "/etc/hive/conf");
catalogProperties.put("hadoop-conf-dir", "/etc/hadoop/conf");
catalogProperties.put("hive-version", "2.3.9");

// Create catalog with properties
HiveCatalog catalog = new HiveCatalog(
    "production_hive",
    catalogProperties.get("default-database"),
    catalogProperties.get("hive-conf-dir"),
    catalogProperties.get("hadoop-conf-dir"),
    catalogProperties.get("hive-version")
);

// SQL DDL catalog creation
tableEnv.executeSql("""
    CREATE CATALOG production_hive WITH (
        'type' = 'hive',
        'default-database' = 'analytics',
        'hive-conf-dir' = '/etc/hive/conf',
        'hadoop-conf-dir' = '/etc/hadoop/conf',
        'hive-version' = '2.3.9'
    )
""");

HiveModuleOptions

Configuration options for HiveModule function loading and compatibility.

/**
 * Configuration options for HiveModule
 * Controls function loading and Hive version compatibility
 */
public class HiveModuleOptions {
    
    /**
     * Hive version for function compatibility
     * Default: auto-detected from classpath
     * Type: String
     */
    public static final ConfigOption<String> HIVE_VERSION;
}

Module Configuration Examples:

// Module configuration via factory
Map<String, String> moduleProperties = new HashMap<>();
moduleProperties.put("type", "hive");
moduleProperties.put("hive-version", "2.3.9");

// Load module with specific version
HiveModule module = new HiveModule("2.3.9");
tableEnv.loadModule("hive", module);

// SQL DDL module loading
tableEnv.executeSql("""
    LOAD MODULE hive WITH (
        'type' = 'hive',
        'hive-version' = '2.3.9'
    )
""");

Performance Configuration

Source Performance Options

// Optimize source performance
Configuration sourceConfig = new Configuration();

// Enable parallelism inference for optimal performance
sourceConfig.set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);

// Configure reader fallback (disable for better performance)
sourceConfig.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, false);

// Set default operator parallelism for bulk reading
sourceConfig.setString("table.exec.resource.default-parallelism", "8");

// Configure memory for large datasets
sourceConfig.setString("taskmanager.memory.process.size", "4g");
sourceConfig.setString("taskmanager.memory.flink.size", "3g");

Sink Performance Options

// Optimize sink performance and reliability
Configuration sinkConfig = new Configuration();

// Configure partition commit for reliability
sinkConfig.set(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND, "metastore,success-file");
sinkConfig.set(HiveOptions.SINK_PARTITION_COMMIT_TRIGGER, "partition-time");
sinkConfig.set(HiveOptions.SINK_PARTITION_COMMIT_DELAY, Duration.ofMinutes(5));

// Disable writer fallback for better performance
sinkConfig.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);

// Configure checkpointing for exactly-once semantics
sinkConfig.setString("execution.checkpointing.interval", "30s");
sinkConfig.setString("execution.checkpointing.mode", "EXACTLY_ONCE");

Streaming Configuration

// Configure streaming mode for real-time processing
Configuration streamingConfig = new Configuration();

// Enable streaming source
streamingConfig.set(HiveOptions.STREAMING_SOURCE_ENABLE, true);
streamingConfig.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofSeconds(10));

// Configure lookup join caching
streamingConfig.set(HiveOptions.LOOKUP_JOIN_CACHE_TTL, Duration.ofMinutes(15));

// Set watermark configuration
streamingConfig.setString("table.exec.source.idle-timeout", "30s");
streamingConfig.setString("pipeline.time-characteristic", "EventTime");

Environment-Specific Configuration

Production Environment

// Production-ready configuration
Configuration prodConfig = new Configuration();

// Reliability settings
prodConfig.set(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND, "metastore,success-file");
prodConfig.set(HiveOptions.SINK_PARTITION_COMMIT_DELAY, Duration.ofMinutes(10));

// Performance settings
prodConfig.set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
prodConfig.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, false);
prodConfig.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);

// Memory and resource settings
prodConfig.setString("taskmanager.numberOfTaskSlots", "4");
prodConfig.setString("taskmanager.memory.process.size", "8g");
prodConfig.setString("jobmanager.memory.process.size", "2g");

// Checkpointing for fault tolerance
prodConfig.setString("execution.checkpointing.interval", "60s");
prodConfig.setString("state.backend", "rocksdb");
prodConfig.setString("state.checkpoints.dir", "hdfs://namenode:9000/flink/checkpoints");
prodConfig.setString("state.savepoints.dir", "hdfs://namenode:9000/flink/savepoints");

Development Environment

// Development-friendly configuration
Configuration devConfig = new Configuration();

// Fast feedback settings
devConfig.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofSeconds(5));
devConfig.set(HiveOptions.SINK_PARTITION_COMMIT_DELAY, Duration.ofSeconds(10));

// Simplified commit policy
devConfig.set(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND, "metastore");

// Reduced resource usage
devConfig.setString("taskmanager.numberOfTaskSlots", "2");
devConfig.setString("taskmanager.memory.process.size", "2g");

// Local state backend
devConfig.setString("state.backend", "filesystem");
devConfig.setString("state.checkpoints.dir", "file:///tmp/flink-checkpoints");

SQL Configuration

Table Properties Configuration

-- Configure Hive table with connector properties
CREATE TABLE streaming_events (
    user_id STRING,
    event_type STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) PARTITIONED BY (event_date STRING)
WITH (
    'connector' = 'hive',
    'streaming-source.enable' = 'true',
    'streaming-source.monitor-interval' = '10s',
    'sink.partition-commit.trigger' = 'partition-time',
    'sink.partition-commit.delay' = '1 h',
    'sink.partition-commit.policy.kind' = 'metastore,success-file'
);

-- Configure external table with custom properties
CREATE TABLE external_logs (
    log_level STRING,
    message STRING,
    log_time TIMESTAMP
) PARTITIONED BY (date_partition STRING)
WITH (
    'connector' = 'hive',
    'streaming-source.enable' = 'false',
    'lookup.join.cache.ttl' = '1 h'
);
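
Per-table settings like these can also be overridden for a single query through SQL hints, provided dynamic table options are enabled. A sketch (the default for table.dynamic-table-options.enabled varies by Flink version):

-- Allow OPTIONS hints, then override streaming settings for one query
SET 'table.dynamic-table-options.enabled' = 'true';

SELECT * FROM streaming_events
/*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.monitor-interval'='5s') */;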

Session Configuration

-- Set session-level configuration
SET 'table.exec.hive.infer-source-parallelism' = 'true';
SET 'table.exec.hive.fallback-mapred-reader' = 'false';
SET 'execution.checkpointing.interval' = '30s';

-- Configure catalog defaults
SET 'table.sql-dialect' = 'hive';
SET 'table.exec.hive.fallback-mapred-writer' = 'false';

Advanced Configuration Patterns

Multi-Cluster Configuration

// Configure for multi-cluster Hive setup
Configuration multiClusterConfig = new Configuration();

// Primary cluster configuration
Map<String, String> primaryCatalogProps = Map.of(
    "type", "hive",
    "default-database", "production",
    "hive-conf-dir", "/etc/hive/primary/conf",
    "hadoop-conf-dir", "/etc/hadoop/primary/conf",
    "hive-version", "2.3.9"
);

// Secondary cluster configuration  
Map<String, String> secondaryCatalogProps = Map.of(
    "type", "hive", 
    "default-database", "analytics",
    "hive-conf-dir", "/etc/hive/secondary/conf",
    "hadoop-conf-dir", "/etc/hadoop/secondary/conf",
    "hive-version", "2.3.9"
);

// Register multiple catalogs (formatProperties renders the WITH (...) clause; see the sketch below)
tableEnv.executeSql("CREATE CATALOG primary_hive WITH " + formatProperties(primaryCatalogProps));
tableEnv.executeSql("CREATE CATALOG secondary_hive WITH " + formatProperties(secondaryCatalogProps));

// Cross-cluster queries
Table result = tableEnv.sqlQuery("""
    SELECT p.*, s.analytics_data
    FROM primary_hive.production.users p
    JOIN secondary_hive.analytics.user_metrics s
    ON p.user_id = s.user_id
""");

Version-Specific Configuration

// Handle different Hive versions
public class HiveConfigurationManager {
    
    public Configuration getHive239Configuration() {
        Configuration config = new Configuration();
        config.set(HiveCatalogFactoryOptions.HIVE_VERSION, "2.3.9");
        config.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, false);
        config.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);
        return config;
    }
    
    public Configuration getHive313Configuration() {
        Configuration config = new Configuration();
        config.set(HiveCatalogFactoryOptions.HIVE_VERSION, "3.1.3");
        // Version 3.x specific optimizations
        config.set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
        return config;
    }
}
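
A small dispatcher can select the right profile; the version string would typically come from deployment configuration rather than being hard-coded. A sketch, not a Flink API:

// Pick a configuration profile by Hive version string
public Configuration forVersion(String hiveVersion) {
    switch (hiveVersion) {
        case "2.3.9":
            return getHive239Configuration();
        case "3.1.3":
            return getHive313Configuration();
        default:
            throw new IllegalArgumentException("Unsupported Hive version: " + hiveVersion);
    }
}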

Resource-Aware Configuration

// Configure based on available resources
public Configuration createResourceAwareConfig(int availableCores, long availableMemoryMB) {
    Configuration config = new Configuration();
    
    // Scale parallelism based on cores
    int parallelism = Math.max(1, availableCores / 2);
    config.setString("parallelism.default", String.valueOf(parallelism));
    
    // Configure memory based on available resources
    long taskManagerMemory = Math.min(availableMemoryMB / 2, 8192); // Max 8GB per TM
    config.setString("taskmanager.memory.process.size", taskManagerMemory + "m");
    
    // Adjust monitoring interval based on load
    Duration monitorInterval = availableCores > 8 ? 
        Duration.ofSeconds(5) : Duration.ofSeconds(30);
    config.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, monitorInterval);
    
    return config;
}
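
Example invocation, deriving rough inputs from the local JVM (Runtime values only approximate what the cluster actually provides):

// Derive approximate resource numbers from the JVM
int cores = Runtime.getRuntime().availableProcessors();
long memoryMB = Runtime.getRuntime().maxMemory() / (1024 * 1024);

Configuration config = createResourceAwareConfig(cores, memoryMB);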