CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-sql-connector-hive-2-3-6-2-11

Apache Flink SQL connector for Apache Hive 2.3.6 with Scala 2.11 binary compatibility

Pending
Overview
Eval results
Files

configuration.mddocs/

Configuration

Configuration options and factory classes for setting up Hive integration with customizable behavior for performance, compatibility, and operational requirements.

Capabilities

HiveOptions

Configuration options for tuning Hive connector behavior and performance.

/**
 * Configuration options for Hive connector operations.
 *
 * <p>All options are declared with the typed {@code ConfigOptions} builder
 * ({@code booleanType()} / {@code intType()}), matching the style already
 * used by {@link #TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER}; the other three
 * options previously used the deprecated untyped {@code defaultValue(...)}
 * shortcut.
 */
public class HiveOptions {
    /**
     * Whether to use Hadoop MapRed record reader for ORC files.
     * Default: false (use Flink's native vectorized ORC reader).
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER =
        ConfigOptions.key("table.exec.hive.fallback-mapred-reader")
            .booleanType()
            .defaultValue(false)
            .withDescription(
                "If it is false, using flink native vectorized reader to read orc files; " +
                "If it is true, using hadoop mapred record reader to read orc files.");

    /**
     * Whether to infer source parallelism based on the number of splits.
     * Default: true
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM =
        ConfigOptions.key("table.exec.hive.infer-source-parallelism")
            .booleanType()
            .defaultValue(true)
            .withDescription(
                "If is false, parallelism of source are set by config.\n" +
                "If is true, source parallelism is inferred according to splits number.\n");

    /**
     * Upper bound for inferred source parallelism; only relevant when
     * {@link #TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM} is true.
     * Default: 1000
     */
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX =
        ConfigOptions.key("table.exec.hive.infer-source-parallelism.max")
            .intType()
            .defaultValue(1000)
            .withDescription("Sets max infer parallelism for source operator.");

    /**
     * Whether to use Hadoop MapRed record writer for Parquet and ORC files.
     * Default: true (use the Hadoop MapRed writer rather than Flink's native writer).
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER =
        ConfigOptions.key("table.exec.hive.fallback-mapred-writer")
            .booleanType()
            .defaultValue(true)
            .withDescription(
                "If it is false, using flink native writer to write parquet and orc files; " +
                "If it is true, using hadoop mapred record writer to write parquet and orc files.");
}

Catalog Factory Configuration

Factory and configuration options for creating Hive catalog instances.

/**
 * Factory for creating HiveCatalog instances with configuration validation.
 *
 * <p>NOTE(review): this is an API outline — method bodies are omitted, so the
 * snippet is not compilable as-is. Signatures mirror the {@code CatalogFactory}
 * contract defined in the Types section of this document.
 */
public class HiveCatalogFactory implements CatalogFactory {
    /**
     * Get the factory identifier for service discovery.
     * @return "hive" identifier string (same value as HiveCatalogFactoryOptions.IDENTIFIER)
     */
    public String factoryIdentifier();
    
    /**
     * Create HiveCatalog from configuration context.
     * @param context - Factory context with configuration options (name, options map,
     *                  configuration, classloader)
     * @return Configured HiveCatalog instance
     */
    public Catalog createCatalog(Context context);
    
    /**
     * Get required configuration options.
     * @return Set of required ConfigOption objects (documented as empty for Hive)
     */
    public Set<ConfigOption<?>> requiredOptions();
    
    /**
     * Get optional configuration options.
     * @return Set of optional ConfigOption objects (see HiveCatalogFactoryOptions)
     */
    public Set<ConfigOption<?>> optionalOptions();
}

/**
 * Configuration options accepted by {@code HiveCatalogFactory}.
 *
 * <p>Options with no default value fall back to classpath discovery of the
 * corresponding configuration files, per the per-option notes below.
 */
public class HiveCatalogFactoryOptions {
    /**
     * Factory identifier used for service discovery; matches the value
     * returned by HiveCatalogFactory.factoryIdentifier().
     */
    public static final String IDENTIFIER = "hive";
    
    /**
     * Default database name for the catalog.
     * Default: "default"
     */
    public static final ConfigOption<String> DEFAULT_DATABASE =
        ConfigOptions.key("default-database")
            .stringType()
            .defaultValue("default")
            .withDescription("Default database name for the catalog.");
    
    /**
     * Directory containing the hive-site.xml configuration file.
     * Default: none — when unset, configuration is located via the classpath.
     */
    public static final ConfigOption<String> HIVE_CONF_DIR =
        ConfigOptions.key("hive-conf-dir")
            .stringType()
            .noDefaultValue()
            .withDescription("Directory containing hive-site.xml configuration file.");
    
    /**
     * Directory containing Hadoop configuration files.
     * Default: none — when unset, configuration is located via the classpath.
     */
    public static final ConfigOption<String> HADOOP_CONF_DIR =
        ConfigOptions.key("hadoop-conf-dir")
            .stringType()
            .noDefaultValue()
            .withDescription("Directory containing Hadoop configuration files.");
    
    /**
     * Hive version string for compatibility.
     * Default: none — documented here as auto-detected when unset.
     */
    public static final ConfigOption<String> HIVE_VERSION =
        ConfigOptions.key("hive-version")
            .stringType()
            .noDefaultValue()
            .withDescription("Hive version string for compatibility.");
}

Module Factory Configuration

Factory and configuration for HiveModule creation.

/**
 * Factory for creating HiveModule instances.
 *
 * <p>NOTE(review): API outline only — method bodies are omitted, so the
 * snippet is not compilable as-is. Signatures mirror the {@code ModuleFactory}
 * contract defined in the Types section of this document.
 */
public class HiveModuleFactory implements ModuleFactory {
    /**
     * Get the factory identifier.
     * @return "hive" identifier string
     */
    public String factoryIdentifier();
    
    /**
     * Create HiveModule from configuration context.
     * @param context - Factory context with configuration options (options map,
     *                  configuration, classloader)
     * @return Configured HiveModule instance
     */
    public Module createModule(Context context);
    
    /**
     * Get required configuration options.
     * @return Set of required ConfigOption objects (documented as empty for the Hive module)
     */
    public Set<ConfigOption<?>> requiredOptions();
    
    /**
     * Get optional configuration options (see HiveModuleOptions).
     * @return Set of optional ConfigOption objects
     */  
    public Set<ConfigOption<?>> optionalOptions();
}

/**
 * Configuration options accepted by {@code HiveModuleFactory} when creating
 * a HiveModule.
 */
public class HiveModuleOptions {
    /**
     * Hive version used for function compatibility.
     * Default: none — documented here as "uses latest supported" when unset.
     */
    public static final ConfigOption<String> HIVE_VERSION =
        ConfigOptions.key("hive-version")
            .stringType()
            .noDefaultValue()
            .withDescription("Hive version for function compatibility.");
}

Streaming Source Configuration

Configuration options specific to streaming Hive sources.

/**
 * Configuration options from FileSystemConnectorOptions used by the Hive
 * connector, typically supplied via table properties (TBLPROPERTIES).
 *
 * <p>NOTE(review): the monitor interval is typed Long (milliseconds) in this
 * outline; some Flink releases expose this option as a Duration instead —
 * verify against the target Flink version before relying on the type.
 */
public class FileSystemConnectorOptions {
    /**
     * Enable streaming source mode for partition monitoring.
     * Default: false (bounded/batch read)
     */
    public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE =
        ConfigOptions.key("streaming-source.enable")
            .booleanType()
            .defaultValue(false)
            .withDescription("Enable streaming source mode for partition monitoring.");
    
    /**
     * Which partitions to include in streaming mode.
     * Options: "all", "latest"
     * Default: "all"
     */
    public static final ConfigOption<String> STREAMING_SOURCE_PARTITION_INCLUDE =
        ConfigOptions.key("streaming-source.partition.include")
            .stringType()
            .defaultValue("all")
            .withDescription("Which partitions to include: 'all' or 'latest'.");
    
    /**
     * Interval for monitoring new partitions, in milliseconds.
     * Default: 60000 (1 minute)
     */
    public static final ConfigOption<Long> STREAMING_SOURCE_MONITOR_INTERVAL =
        ConfigOptions.key("streaming-source.monitor-interval")
            .longType()
            .defaultValue(60000L)
            .withDescription("Interval for monitoring new partitions in milliseconds.");
    
    /**
     * Configured parallelism for sink operations.
     * Default: none — when unset, the framework's default parallelism applies.
     */
    public static final ConfigOption<Integer> SINK_PARALLELISM =
        ConfigOptions.key("sink.parallelism")
            .intType()
            .noDefaultValue()
            .withDescription("Configured parallelism for sink operations.");
}

Hadoop and Hive Configuration Utilities

Utility classes for managing Hadoop and Hive configuration.

/**
 * Static utilities for building HiveConf instances.
 *
 * <p>NOTE(review): API outline only — method bodies are omitted.
 */
public class HiveConfUtils {
    /**
     * Create HiveConf from a configuration directory.
     * @param hiveConfDir - Directory containing hive-site.xml; may be null
     *                      (classpath-based discovery per the Configuration section)
     * @return Configured HiveConf instance
     */
    public static HiveConf create(String hiveConfDir);
    
    /**
     * Create a HiveConf overlaying custom properties on a base configuration.
     * @param hiveConf - Base Hive configuration
     * @param customProps - Additional properties to set on top of the base
     * @return Updated HiveConf instance
     */
    public static HiveConf create(HiveConf hiveConf, Map<String, String> customProps);
}

/**
 * Static utilities for deriving Hadoop JobConf instances from Hive configuration.
 *
 * <p>NOTE(review): API outline only — method bodies are omitted.
 */
public class JobConfUtils {
    /**
     * Create a JobConf with security credentials configured.
     * @param hiveConf - Hive configuration the JobConf is based on
     * @return JobConf with security credentials configured
     */
    public static JobConf createJobConfWithCredentials(HiveConf hiveConf);
    
    /**
     * Create a JobConf with extra configuration properties applied.
     * @param hiveConf - Base Hive configuration
     * @param extraConf - Additional configuration properties to apply
     * @return Configured JobConf instance
     */
    public static JobConf createJobConf(HiveConf hiveConf, Map<String, String> extraConf);
}

/**
 * Factory for creating Hadoop FileSystem instances from a Hadoop configuration.
 *
 * <p>NOTE(review): API outline only — method bodies are omitted.
 */
public class HadoopFileSystemFactory {
    /**
     * Create a file system factory bound to the given Hadoop configuration.
     * @param hadoopConf - Hadoop configuration used for all subsequent create() calls
     */
    public HadoopFileSystemFactory(org.apache.hadoop.conf.Configuration hadoopConf);
    
    /**
     * Create a file system for the given URI.
     * @param fsUri - File system URI (scheme selects the FileSystem implementation)
     * @return FileSystem instance
     * @throws IOException if creation fails
     */
    public FileSystem create(URI fsUri) throws IOException;
}

Dynamic Table Factory Configuration

Configuration for the dynamic table factory system.

/**
 * Factory creating dynamic table sources and sinks for Hive tables.
 *
 * <p>This factory only works through a Hive catalog, not standalone: the
 * Factory interface methods below throw UnsupportedOperationException.
 *
 * <p>NOTE(review): API outline only — method bodies are omitted.
 */
public class HiveDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    /**
     * Get factory identifier — not supported; the Hive factory is resolved
     * through the catalog rather than service discovery.
     * @return Not supported
     * @throws UnsupportedOperationException always
     */
    public String factoryIdentifier();
    
    /**
     * Get required options — not supported for this catalog-only factory.
     * @return Not supported
     * @throws UnsupportedOperationException always
     */
    public Set<ConfigOption<?>> requiredOptions();
    
    /**
     * Get optional options — not supported for this catalog-only factory.
     * @return Not supported  
     * @throws UnsupportedOperationException always
     */
    public Set<ConfigOption<?>> optionalOptions();
    
    /**
     * Create a dynamic table source for the catalog table in the context.
     * @param context - Creation context with catalog table info
     * @return DynamicTableSource implementation
     */
    public DynamicTableSource createDynamicTableSource(Context context);
    
    /**
     * Create a dynamic table sink for the catalog table in the context.
     * @param context - Creation context with catalog table info
     * @return DynamicTableSink implementation
     */
    public DynamicTableSink createDynamicTableSink(Context context);
}

Usage Examples:

// NOTE(review): illustrative fragments, not a compilable program — statements
// appear at top level and several referenced types (EnvironmentSettings,
// HashMap, Map, Table, Catalog, Module, ReadableConfig) are used without a
// corresponding import shown here.
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.HiveOptions;

// Configure Hive connector options
Configuration config = new Configuration();

// Use native Flink readers for better performance
config.setBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, false);

// Use native Flink writers for better performance
config.setBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);

// Enable source parallelism inference, capped at 500 (default cap is 1000)
config.setBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
config.setInteger(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX, 500);

// Create table environment with configuration
TableEnvironment tableEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance()
        .inBatchMode()
        .withConfiguration(config)
        .build()
);
// Create Hive catalog with full configuration options
// (keys correspond to the options in HiveCatalogFactoryOptions)
Map<String, String> catalogOptions = new HashMap<>();
catalogOptions.put("type", "hive");
catalogOptions.put("default-database", "analytics");
catalogOptions.put("hive-conf-dir", "/opt/hive/conf");
catalogOptions.put("hadoop-conf-dir", "/opt/hadoop/etc/hadoop");
catalogOptions.put("hive-version", "2.3.6");

// Use catalog factory to create catalog; an anonymous Context supplies the
// name, options, configuration, and classloader the factory requires
CatalogFactory.Context context = new CatalogFactory.Context() {
    public String getName() { return "hive_catalog"; }
    public Map<String, String> getOptions() { return catalogOptions; }
    public ReadableConfig getConfiguration() { return Configuration.fromMap(catalogOptions); }
    public ClassLoader getClassLoader() { return Thread.currentThread().getContextClassLoader(); }
};

HiveCatalogFactory factory = new HiveCatalogFactory();
Catalog hiveCatalog = factory.createCatalog(context);

tableEnv.registerCatalog("hive_catalog", hiveCatalog);
tableEnv.useCatalog("hive_catalog");
// Configure streaming Hive source via table properties
// (keys correspond to the options in FileSystemConnectorOptions)
tableEnv.executeSql(
    "CREATE TABLE streaming_events (" +
    "  event_id BIGINT," +
    "  user_id BIGINT," +
    "  event_time TIMESTAMP(3)," +
    "  event_type STRING," +
    "  partition_date STRING" +
    ") PARTITIONED BY (partition_date) " +
    "STORED AS PARQUET " +
    "TBLPROPERTIES (" +
    "  'streaming-source.enable' = 'true'," +
    "  'streaming-source.partition.include' = 'all'," +
    "  'streaming-source.monitor-interval' = '30000'" + // 30 seconds
    ")"
);

// Query streaming table
Table result = tableEnv.sqlQuery(
    "SELECT event_type, COUNT(*) as event_count " +
    "FROM streaming_events " +
    "WHERE event_time >= CURRENT_TIMESTAMP - INTERVAL '1' HOUR " +
    "GROUP BY event_type"
);
// Load Hive module with specific version (key from HiveModuleOptions)
Map<String, String> moduleOptions = new HashMap<>();
moduleOptions.put("hive-version", "2.3.6");

ModuleFactory.Context moduleContext = new ModuleFactory.Context() {
    public Map<String, String> getOptions() { return moduleOptions; }
    public ReadableConfig getConfiguration() { return Configuration.fromMap(moduleOptions); }
    public ClassLoader getClassLoader() { return Thread.currentThread().getContextClassLoader(); }
};

HiveModuleFactory moduleFactory = new HiveModuleFactory();
Module hiveModule = moduleFactory.createModule(moduleContext);

tableEnv.loadModule("hive", hiveModule);

Types

/**
 * Typed configuration option: a key, a typed default value, and a description.
 * Instances are built via the ConfigOptions.key(...) builder shown above.
 */
public interface ConfigOption<T> {
    /**
     * Get the option key.
     * @return Configuration key string (e.g. "table.exec.hive.fallback-mapred-reader")
     */
    String key();
    
    /**
     * Get the default value.
     * @return Default value for this option; options declared with
     *         noDefaultValue() have none
     */
    T defaultValue();
    
    /**
     * Get the option description.
     * @return Human-readable description
     */
    String description();
}

/**
 * Factory contract for creating Catalog instances; extends Factory with the
 * creation method and a Context carrying everything the factory needs.
 */
public interface CatalogFactory extends Factory {
    /**
     * Create a catalog from the given context.
     * @param context - Creation context (name, options, configuration, classloader)
     * @return Catalog instance
     */
    Catalog createCatalog(Context context);
    
    /**
     * Context for catalog creation. Unlike ModuleFactory.Context below, it
     * also carries the catalog name.
     */
    interface Context {
        String getName();
        Map<String, String> getOptions();
        ReadableConfig getConfiguration();
        ClassLoader getClassLoader();
    }
}

/**
 * Factory contract for creating Module instances; extends Factory with the
 * creation method and a Context carrying the module configuration.
 */
public interface ModuleFactory extends Factory {
    /**
     * Create a module from the given context.
     * @param context - Creation context (options, configuration, classloader)
     * @return Module instance
     */
    Module createModule(Context context);
    
    /**
     * Context for module creation; has no name accessor, unlike
     * CatalogFactory.Context.
     */
    interface Context {
        Map<String, String> getOptions();
        ReadableConfig getConfiguration();
        ClassLoader getClassLoader();
    }
}

/**
 * Base factory contract shared by CatalogFactory and ModuleFactory:
 * an identifier for service discovery plus the required/optional option sets
 * used for configuration validation.
 */
public interface Factory {
    /**
     * Get the unique factory identifier used for service discovery.
     * @return Factory identifier string (e.g. "hive")
     */
    String factoryIdentifier();
    
    /**
     * Get required configuration options.
     * @return Set of options that must be provided
     */
    Set<ConfigOption<?>> requiredOptions();
    
    /**
     * Get optional configuration options.
     * @return Set of options that may be provided
     */
    Set<ConfigOption<?>> optionalOptions();
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-sql-connector-hive-2-3-6-2-11

docs

catalog-operations.md

configuration.md

hive-functions.md

index.md

source-api.md

table-sinks.md

table-sources.md

tile.json