tessl/maven-org-apache-flink--flink-sql-connector-hive-3-1-2-2-12

Apache Flink SQL connector for Apache Hive 3.1.2, enabling unified batch and stream processing with Hive tables.


docs/factory-registration.md

Factory Registration

Factory classes that enable automatic connector discovery through Flink's Service Provider Interface (SPI). These factories handle connector instantiation, configuration validation, and provide the entry points for using the Hive connector in Flink applications.

Capabilities

Hive Catalog Factory

Creates HiveCatalog instances for integrating with Hive metastore, enabling unified metadata management across Flink and Hive systems.

/**
 * Factory for creating HiveCatalog instances through Flink's catalog discovery mechanism
 * Factory identifier: "hive"
 */
public class HiveCatalogFactory implements CatalogFactory {
    /** Factory identifier constant */
    public static final String IDENTIFIER = "hive";
    
    /**
     * Returns the unique identifier for this catalog factory
     * @return "hive"
     */
    public String factoryIdentifier();
    
    /**
     * Returns the set of configuration options that must be provided
     * Required options: none (every option either has a default or may be omitted)
     * @return Set of required configuration options
     */
    public Set<ConfigOption<?>> requiredOptions();
    
    /**
     * Returns the set of optional configuration options
     * Optional options: default-database, hive-conf-dir, hive-version, hadoop-conf-dir, property-version
     * @return Set of optional configuration options
     */
    public Set<ConfigOption<?>> optionalOptions();
    
    /**
     * Creates a new HiveCatalog instance based on the provided context
     * @param context Factory context containing configuration and class loader
     * @return Configured HiveCatalog instance
     */
    public Catalog createCatalog(Context context);
}

Usage Example:

-- SQL DDL usage (automatic factory discovery)
CREATE CATALOG hive_catalog WITH (
    'type' = 'hive',
    'hive-conf-dir' = '/opt/hive/conf',
    'hive-version' = '3.1.2',
    'hadoop-conf-dir' = '/opt/hadoop/conf',
    'default-database' = 'default'
);

USE CATALOG hive_catalog;

Configuration Options:

  • default-database (optional): Default database name, defaults to "default"
  • hive-conf-dir (optional): Path to the Hive configuration directory containing hive-site.xml; if unset, the configuration is looked up on the classpath
  • hive-version (optional): Hive version used for compatibility, defaults to "3.1.2" for this connector bundle
  • hadoop-conf-dir (optional): Path to the Hadoop configuration directory
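
Once the catalog is registered, Hive metastore metadata becomes queryable directly from Flink SQL. A brief sketch (the database and table names below are illustrative, not part of the connector):

```sql
-- Browse Hive metastore metadata through the registered catalog
USE CATALOG hive_catalog;
SHOW DATABASES;
SHOW TABLES;

-- Or use fully qualified names without switching catalogs
SELECT * FROM hive_catalog.`default`.my_hive_table;
```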

Hive Dynamic Table Factory

Creates HiveTableSource and HiveTableSink instances using Flink's modern dynamic table interface, supporting both batch and streaming operations.

/**
 * Dynamic table factory for creating Hive table sources and sinks
 * Factory identifier: "hive" 
 */
public class HiveDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    /** Factory identifier constant */
    public static final String IDENTIFIER = "hive";
    
    /**
     * Constructor requiring HiveConf instance
     * @param hiveConf Hive configuration instance
     */
    public HiveDynamicTableFactory(HiveConf hiveConf);
    
    /**
     * Returns the unique identifier for this table factory
     * @return "hive"
     */
    public String factoryIdentifier();
    
    /**
     * Returns required configuration options
     * @return Set of required configuration options
     */
    public Set<ConfigOption<?>> requiredOptions();
    
    /**
     * Returns optional configuration options
     * @return Set of optional configuration options
     */
    public Set<ConfigOption<?>> optionalOptions();
    
    /**
     * Creates a DynamicTableSource for reading from Hive tables
     * @param context Factory context with table schema and options
     * @return Configured HiveTableSource instance
     */
    public DynamicTableSource createDynamicTableSource(Context context);
    
    /**
     * Creates a DynamicTableSink for writing to Hive tables  
     * @param context Factory context with table schema and options
     * @return Configured HiveTableSink instance
     */
    public DynamicTableSink createDynamicTableSink(Context context);
}

Usage Example:

-- Creating a Hive table with streaming source enabled
CREATE TABLE hive_stream_table (
    id BIGINT,
    name STRING,
    event_time TIMESTAMP(3),
    partition_date STRING
) PARTITIONED BY (partition_date)
WITH (
    'connector' = 'hive',
    'streaming-source.enable' = 'true',
    'streaming-source.partition.include' = 'latest',
    'streaming-source.monitor-interval' = '1 min'
);
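
The same factory also creates sinks via createDynamicTableSink. A sketch of a partitioned sink with partition-commit options (option keys follow Flink's Hive/filesystem sink conventions; the source table name is illustrative):

```sql
-- Writing to a partitioned Hive table with partition commit
CREATE TABLE hive_sink_table (
    id BIGINT,
    name STRING,
    partition_date STRING
) PARTITIONED BY (partition_date)
WITH (
    'connector' = 'hive',
    'sink.partition-commit.trigger' = 'process-time',
    'sink.partition-commit.policy.kind' = 'metastore,success-file'
);

INSERT INTO hive_sink_table
SELECT id, name, DATE_FORMAT(event_time, 'yyyy-MM-dd')
FROM hive_stream_table;
```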

Hive Module Factory

Creates HiveModule instances that provide access to Hive built-in functions within Flink SQL environments.

/**
 * Factory for creating HiveModule instances to access Hive built-in functions
 * Factory identifier: "hive"
 */
public class HiveModuleFactory implements ModuleFactory {
    /** Factory identifier constant */
    public static final String IDENTIFIER = "hive";
    
    /**
     * Returns the unique identifier for this module factory
     * @return "hive"
     */
    public String factoryIdentifier();
    
    /**
     * Returns required configuration options
     * @return Set of required configuration options (empty for HiveModule)
     */
    public Set<ConfigOption<?>> requiredOptions();
    
    /**
     * Returns optional configuration options
     * Optional options: hive-version
     * @return Set of optional configuration options
     */
    public Set<ConfigOption<?>> optionalOptions();
    
    /**
     * Creates a new HiveModule instance
     * @param context Factory context containing configuration
     * @return Configured HiveModule instance
     */
    public Module createModule(Context context);
}

Usage Example:

-- Load Hive module to access Hive functions
LOAD MODULE hive WITH ('hive-version' = '3.1.2');

-- Use Hive built-in functions
SELECT id, name, substr(name, 1, 3) as name_prefix FROM my_table;
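
When both the core and Hive modules are loaded, function names can collide; Flink resolves them in module order. A sketch of controlling and inspecting that order:

```sql
-- Resolve ambiguous function names in favor of the Hive module
USE MODULES hive, core;

-- Inspect loaded modules and their resolution order
SHOW FULL MODULES;
```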

Hive Dialect Factory

Creates a Hive SQL dialect parser for executing Hive-compatible SQL statements within Flink.

/**
 * Factory for creating Hive SQL dialect parser
 * Factory identifier: "hive"
 */
public class HiveDialectFactory implements SqlDialectFactory {
    /**
     * Returns the unique identifier for this dialect factory
     * @return "hive"
     */
    public String factoryIdentifier();
    
    /**
     * Creates a new SqlDialect instance for parsing Hive SQL
     * @param context Factory context
     * @return Configured Hive SQL dialect
     */
    public SqlDialect createSqlDialect(Context context);
}

Usage Example:

// Programmatic usage
TableEnvironment tableEnv = TableEnvironment.create(settings);
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

// Execute Hive-compatible SQL
tableEnv.executeSql("CREATE TABLE hive_table AS SELECT * FROM source_table");
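
The dialect can also be switched per session in the SQL client or in SQL scripts via the table.sql-dialect option; a sketch (the table definition is illustrative):

```sql
-- Switch to the Hive dialect for this session
SET 'table.sql-dialect' = 'hive';

-- Hive-dialect DDL now parses with Hive semantics
CREATE TABLE tbl (key INT, value STRING) STORED AS ORC;

-- Switch back to Flink's default dialect
SET 'table.sql-dialect' = 'default';
```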

HiveServer2 Endpoint Factory

Creates HiveServer2-compatible endpoints for JDBC/ODBC access to Flink through the SQL Gateway.

/**
 * Factory for creating HiveServer2-compatible endpoints
 */
public class HiveServer2EndpointFactory implements SqlGatewayEndpointFactory {
    /**
     * Returns the unique identifier for this endpoint factory
     * @return Factory identifier string
     */
    public String factoryIdentifier();
    
    /**
     * Creates a new HiveServer2 endpoint
     * @param context Factory context with configuration
     * @return Configured HiveServer2 endpoint
     */
    public SqlGatewayEndpoint createSqlGatewayEndpoint(Context context);
}

Configuration Options:

  • thrift.host: Thrift server host address
  • thrift.port: Thrift server port number
  • thrift.worker.threads.min: Minimum worker threads
  • thrift.worker.threads.max: Maximum worker threads
  • thrift.max.message.size: Maximum message size
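
With Flink's SQL Gateway (1.16+), the HiveServer2 endpoint is selected at startup and reached with any HiveServer2-compatible client. A sketch of the launch command and a Beeline connection, assuming the options above are prefixed with sql-gateway.endpoint.hiveserver2 (host and port are illustrative):

```
./bin/sql-gateway.sh start \
    -Dsql-gateway.endpoint.type=hiveserver2 \
    -Dsql-gateway.endpoint.hiveserver2.thrift.host=localhost \
    -Dsql-gateway.endpoint.hiveserver2.thrift.port=10000

beeline -u "jdbc:hive2://localhost:10000/default"
```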

Legacy Table Factory

Provides backward compatibility with the older Flink TableSourceFactory/TableSinkFactory interfaces.

/**
 * Legacy table factory for backward compatibility
 * @deprecated Use HiveDynamicTableFactory instead
 */
@Deprecated
public class HiveTableFactory implements TableSourceFactory<Row>, TableSinkFactory<Row> {
    /**
     * Creates a table source using legacy interface
     * @param properties Configuration properties
     * @return Configured table source
     */
    public TableSource<Row> createTableSource(Map<String, String> properties);
    
    /**
     * Creates a table sink using legacy interface  
     * @param properties Configuration properties
     * @return Configured table sink
     */
    public TableSink<Row> createTableSink(Map<String, String> properties);
}

Factory Discovery

All factories are automatically discovered through Java's Service Provider Interface (SPI) mechanism. The connector JAR includes the necessary service registration files in META-INF/services/ that enable Flink to find and instantiate the appropriate factory classes based on the specified type or connector identifier.

Service Registration Files:

  • META-INF/services/org.apache.flink.table.factories.Factory
  • META-INF/services/org.apache.flink.table.factories.CatalogFactory
  • META-INF/services/org.apache.flink.table.factories.ModuleFactory

This automatic discovery means that simply including the connector JAR in the classpath makes all Hive integration capabilities available without additional configuration steps.
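
For reference, each registration file is a plain text list of fully qualified implementation class names. An illustrative excerpt of META-INF/services/org.apache.flink.table.factories.Factory (class names assumed from the Flink source layout):

```
org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory
org.apache.flink.connectors.hive.HiveDynamicTableFactory
org.apache.flink.table.module.hive.HiveModuleFactory
```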

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-sql-connector-hive-3-1-2-2-12
