or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

catalog.md configuration.md data-source.md functions.md index.md table-api.md
tile.json

docs/configuration.md

Configuration

Comprehensive configuration system for tuning Hive connector behavior, performance optimization, and feature toggles. The configuration options control various aspects of data reading, writing, and metadata operations.

Capabilities

HiveOptions

Main configuration class containing all Hive connector-specific options.

/**
 * Configuration options specific to the Hive connector.
 *
 * <p>This is a signature summary: it lists the option constants and their value
 * types only. Keys and default values are shown with the full definitions in the
 * "Source Configuration" and "Sink Configuration" sections.
 */
public class HiveOptions {
    
    /**
     * Switches reads to the Hadoop MapReduce record reader instead of the vectorized reader
     * (key {@code table.exec.hive.fallback-mapred-reader}).
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER;
    
    /**
     * Whether source parallelism is inferred from the number of file splits
     * (key {@code table.exec.hive.infer-source-parallelism}).
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM;
    
    /**
     * Upper bound on the inferred source parallelism
     * (key {@code table.exec.hive.infer-source-parallelism.max}).
     */
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX;
    
    /**
     * Switches writes to the Hadoop MapReduce record writer
     * (key {@code table.exec.hive.fallback-mapred-writer}).
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER;
}

Configuration Categories

Source Configuration

Options for controlling how data is read from Hive tables.

/**
 * Boolean option {@code table.exec.hive.fallback-mapred-reader}, default {@code false}.
 * When {@code false} the vectorized reader is used; {@code true} selects the
 * MapReduce reader for compatibility.
 *
 * NOTE(review): the description string words this as an automatic fallback on failure,
 * while the usage examples treat it as a static switch — confirm against the Flink docs.
 */
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER = 
    ConfigOptions.key("table.exec.hive.fallback-mapred-reader")
        .booleanType()
        .defaultValue(false)
        .withDescription("Whether to fallback to MapReduce reader when vectorized reader fails");

/**
 * Boolean option {@code table.exec.hive.infer-source-parallelism}, default {@code true}.
 * When enabled, the source parallelism is derived from the number of file splits.
 */
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM = 
    ConfigOptions.key("table.exec.hive.infer-source-parallelism")
        .booleanType() 
        .defaultValue(true)
        .withDescription("Whether to infer source parallelism based on number of file splits");

/**
 * Integer option {@code table.exec.hive.infer-source-parallelism.max}, default {@code 1000}.
 * Caps the parallelism that may be inferred for a Hive source.
 */
public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX = 
    ConfigOptions.key("table.exec.hive.infer-source-parallelism.max")
        .intType()
        .defaultValue(1000)
        .withDescription("Maximum parallelism that can be inferred for Hive sources");

Usage Examples:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Collect the Hive connector options before the table environment is created
Configuration config = new Configuration();

// Enable MapReduce reader fallback for compatibility
config.setBoolean("table.exec.hive.fallback-mapred-reader", true);

// Infer source parallelism from the file splits, capped at 500
config.setBoolean("table.exec.hive.infer-source-parallelism", true);
config.setInteger("table.exec.hive.infer-source-parallelism.max", 500);

// Build the table environment from the prepared configuration
TableEnvironment tableEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance().withConfiguration(config).build()
);

Sink Configuration

Options for controlling how data is written to Hive tables.

/**
 * Boolean option {@code table.exec.hive.fallback-mapred-writer}, default {@code true}.
 * {@code true} selects the MapReduce writer for compatibility.
 *
 * NOTE(review): the description string words this as an automatic fallback on failure,
 * while the usage examples treat it as a static switch — confirm against the Flink docs.
 */
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER = 
    ConfigOptions.key("table.exec.hive.fallback-mapred-writer")
        .booleanType()
        .defaultValue(true)  
        .withDescription("Whether to fallback to MapReduce writer when vectorized writer fails");

Usage Examples:

// Enable MapReduce writer fallback on the existing table environment.
// The environment copies the Configuration it is created from, so changing the
// original `config` object afterwards has no effect; set the option through the
// environment's own configuration instead.
tableEnv.getConfig().getConfiguration()
    .setBoolean("table.exec.hive.fallback-mapred-writer", true);

// Write to Hive table with fallback enabled
tableEnv.executeSql("INSERT INTO hive_table SELECT * FROM source_table");

Streaming Configuration

Options specific to streaming mode operations.

// Configure streaming-specific options
// Mark a source as idle after 10 seconds without records
config.setString("table.exec.source.idle-timeout", "10s");
// NOTE(review): the next two keys do not look like documented Hive connector options.
// Flink documents the streaming read interval as the table property
// 'streaming-source.monitor-interval' — confirm before relying on these keys.
config.setString("partition.discovery.interval-millis", "60000");  // 1 minute
config.setString("source.monitor-interval", "30s");

Performance Tuning

Configuration options for optimizing performance based on workload characteristics.

// Optimize for large batch workloads: infer source parallelism and raise its cap to 2000
config.setBoolean("table.exec.hive.infer-source-parallelism", true);
config.setInteger("table.exec.hive.infer-source-parallelism.max", 2000);

// Optimize for small files
config.setBoolean("table.exec.hive.fallback-mapred-reader", false);  // Use vectorized reader

// Default parallelism for table operators (this key controls parallelism, not memory)
config.setString("table.exec.resource.default-parallelism", "4");

Global Flink Configuration

Table API Configuration

Configure table-level behavior that affects Hive integration:

Configuration config = new Configuration();

// Source configuration
config.setString("table.exec.source.idle-timeout", "0");  // No timeout for batch
config.setBoolean("table.exec.source.parallelism-inference.enabled", true);

// Sink configuration  
config.setBoolean("table.exec.sink.not-null-enforcer", false);
config.setString("table.exec.sink.upsert-materialize", "none");

// General table configuration
config.setString("table.sql-dialect", "hive");  // Use Hive SQL dialect

Execution Configuration

Configure Flink execution parameters that impact Hive connector performance:

// Checkpointing for streaming jobs: checkpoint every 300 seconds, RocksDB state backend
config.setString("execution.checkpointing.interval", "300s");
config.setString("state.backend", "rocksdb");

// Memory configuration
// NOTE(review): process memory sizes are normally read when the JobManager and
// TaskManager processes start — confirm they take effect when set on a job-level
// Configuration rather than in the cluster configuration file.
config.setString("taskmanager.memory.process.size", "4gb");
config.setString("jobmanager.memory.process.size", "1gb");

// Parallelism configuration: default parallelism of 4
config.setInteger("parallelism.default", 4);

Configuration Examples

Batch Processing Configuration

Optimal configuration for large batch processing workloads:

Configuration batchConfig = new Configuration();

// Disable streaming features: an idle timeout of 0 turns source idleness detection off
batchConfig.setString("table.exec.source.idle-timeout", "0");

// Optimize parallelism: infer it from the file splits, capped at 1000
batchConfig.setBoolean("table.exec.hive.infer-source-parallelism", true);
batchConfig.setInteger("table.exec.hive.infer-source-parallelism.max", 1000);

// Use vectorized readers for performance
batchConfig.setBoolean("table.exec.hive.fallback-mapred-reader", false);

// Default parallelism for table operators (this key controls parallelism, not memory)
batchConfig.setString("table.exec.resource.default-parallelism", "8");

Streaming Configuration

Configuration for continuous streaming from Hive tables:

Configuration streamConfig = new Configuration();

// Enable streaming features: mark a source as idle after 30 seconds without records
streamConfig.setString("table.exec.source.idle-timeout", "30s");

// Partition monitoring
// NOTE(review): this key does not look like a documented Hive connector option; Flink
// documents 'streaming-source.monitor-interval' as a table property — confirm.
streamConfig.setString("partition.discovery.interval-millis", "60000");

// Moderate parallelism for streaming: inferred from the file splits, capped at 200
streamConfig.setBoolean("table.exec.hive.infer-source-parallelism", true);
streamConfig.setInteger("table.exec.hive.infer-source-parallelism.max", 200);

// Checkpointing: checkpoint every 300 seconds
streamConfig.setString("execution.checkpointing.interval", "300s");

High-Throughput Configuration

Configuration for maximum throughput scenarios:

Configuration highThroughputConfig = new Configuration();

// Maximize parallelism: infer it from the file splits and raise the cap to 2000
highThroughputConfig.setInteger("table.exec.hive.infer-source-parallelism.max", 2000);
highThroughputConfig.setBoolean("table.exec.hive.infer-source-parallelism", true);

// Use vectorized operations: turn off the MapReduce reader and writer
highThroughputConfig.setBoolean("table.exec.hive.fallback-mapred-reader", false);
highThroughputConfig.setBoolean("table.exec.hive.fallback-mapred-writer", false);

// Memory optimization: 8 GB per TaskManager process, 60% of Flink memory as managed memory
// NOTE(review): confirm these cluster-level keys take effect when set on a job Configuration.
highThroughputConfig.setString("taskmanager.memory.process.size", "8gb");
highThroughputConfig.setString("taskmanager.memory.managed.fraction", "0.6");

Compatibility Configuration

Configuration for maximum compatibility with various Hive setups:

Configuration compatConfig = new Configuration();

// Enable fallbacks for compatibility: use the MapReduce reader and writer
compatConfig.setBoolean("table.exec.hive.fallback-mapred-reader", true);
compatConfig.setBoolean("table.exec.hive.fallback-mapred-writer", true);

// Conservative parallelism: cap the inferred source parallelism at 100
compatConfig.setInteger("table.exec.hive.infer-source-parallelism.max", 100);

// Use Hive SQL dialect
compatConfig.setString("table.sql-dialect", "hive");

Dynamic Configuration

Runtime Configuration Changes

Some configuration options can be changed at runtime:

// Change an option on an existing table environment
// NOTE(review): presumably this affects only statements executed afterwards, not jobs
// that were already submitted — confirm.
tableEnv.getConfig().getConfiguration()
    .setBoolean("table.exec.hive.fallback-mapred-reader", true);

// Set an option with a SQL SET statement
// NOTE(review): the value is likely stored on the environment rather than scoped to a
// single query, and SET support in executeSql may depend on the Flink version — confirm.
tableEnv.executeSql("SET 'table.exec.hive.infer-source-parallelism.max' = '500'");

Per-Table Configuration

Configure options for specific Hive tables:

// Create table with specific options
// "(...)" stands in for the column list and must be replaced with a real schema.
// NOTE(review): elsewhere in this document the 'table.exec.hive.*' keys are set on the
// job Configuration — confirm the connector honours them as per-table WITH options.
tableEnv.executeSql(
    "CREATE TABLE hive_table (...) " +
    "WITH (" +
    "'connector' = 'hive', " +
    "'table.exec.hive.fallback-mapred-reader' = 'true'" +
    ")"
);