Files

docs/: catalog.md, configuration.md, datastream-source.md, hive-functions.md, index.md, table-api.md
tile.json

tessl/maven-org-apache-flink--flink-sql-connector-hive-2-3-6_2-12

Apache Flink SQL connector for Apache Hive 2.3.6 providing integration between Flink and Hive for reading/writing Hive tables and using Hive Metastore as a catalog.

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.flink/flink-sql-connector-hive-2.3.6_2.12@1.15.x (maven)

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-sql-connector-hive-2-3-6_2-12@1.15.0


Apache Flink Hive Connector 2.3.6

Apache Flink SQL connector for Apache Hive 2.3.6 that provides comprehensive integration between Flink and Hive, enabling both batch and streaming access to Hive tables, Hive catalog integration, and Hive function support. This connector serves as a bridge between Flink's unified stream and batch processing capabilities and the Hive data warehouse ecosystem.

Package Information

  • Package Name: flink-sql-connector-hive-2.3.6_2.12
  • Package Type: maven
  • Language: Java
  • Installation: Add to Maven dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-hive-2.3.6_2.12</artifactId>
    <version>1.15.4</version>
</dependency>

Alternatively, place the connector JAR on the Flink runtime classpath instead of bundling it into your application JAR:

# Download and place in Flink lib directory
cp flink-sql-connector-hive-2.3.6_2.12-1.15.4.jar $FLINK_HOME/lib/

Core Imports

// Catalog integration
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory;

// DataStream API source
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.connectors.hive.HiveTablePartition;

// Table API integration
import org.apache.flink.connectors.hive.HiveTableSource;
import org.apache.flink.connectors.hive.HiveTableSink;
import org.apache.flink.connectors.hive.HiveLookupTableSource;
import org.apache.flink.connectors.hive.HiveDynamicTableFactory;

// Module for Hive functions
import org.apache.flink.table.module.hive.HiveModule;
import org.apache.flink.table.module.hive.HiveModuleOptions;

// Configuration options
import org.apache.flink.connectors.hive.HiveOptions;

// Exception handling
import org.apache.flink.connectors.hive.FlinkHiveException;

// Annotations for nullability
import javax.annotation.Nullable;
import javax.annotation.Nonnull;

Basic Usage

Catalog Integration

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

// Create table environment
EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inBatchMode()
    .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Create and register Hive catalog
String catalogName = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/opt/hive-conf";
String hadoopConfDir = "/opt/hadoop-conf";
String hiveVersion = "2.3.6";

HiveCatalog hive = new HiveCatalog(
    catalogName, defaultDatabase, hiveConfDir, hadoopConfDir, hiveVersion);
tableEnv.registerCatalog(catalogName, hive);
tableEnv.useCatalog(catalogName);

// Use Hive tables with SQL
tableEnv.executeSql("SELECT * FROM my_hive_table").print();

DataStream API Usage

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.mapred.JobConf;

import java.util.Collections;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create job configuration for Hive (HiveSourceBuilder expects a Hadoop JobConf)
JobConf jobConf = new JobConf();
// Set Hive metastore URI and other configs
jobConf.set("hive.metastore.uris", "thrift://localhost:9083");

// Build Hive source with the default bulk format
HiveSource<RowData> source = new HiveSourceBuilder(
    jobConf,
    new Configuration(),      // Flink configuration
    "2.3.6",                  // Hive version
    "default",                // database name
    "my_table",               // table name
    Collections.emptyMap()    // extra table options
).buildWithDefaultBulkFormat();

// Add source to stream
env.fromSource(source, WatermarkStrategy.noWatermarks(), "hive-source")
   .print();

env.execute("Hive Stream Job");

Module Registration for Hive Functions

import org.apache.flink.table.module.hive.HiveModule;

// Register Hive module to access Hive built-in functions
tableEnv.loadModule("hive", new HiveModule("2.3.6"));

// Use Hive functions in SQL
tableEnv.executeSql("SELECT concat('Hello', ' ', 'World')").print();

Architecture

The Flink Hive Connector is built around several key components:

  • Catalog Integration: HiveCatalog provides full metastore integration for persistent table metadata
  • Source/Sink Components: HiveSource, HiveTableSource, and HiveTableSink for data access
  • Module System: HiveModule enables access to Hive built-in functions within Flink SQL
  • Configuration Management: Comprehensive options for tuning performance and behavior
  • Version Abstraction: Shim system supporting multiple Hive versions with consistent API
  • File Format Support: Works with Parquet, ORC, text files, and other Hadoop-compatible formats
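A minimal sketch of how these components fit together in one program, assuming the catalog and table names used earlier (sink_table and source_table are hypothetical):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.module.hive.HiveModule;

TableEnvironment tEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance().inBatchMode().build());

// Catalog integration: Hive Metastore as the persistent catalog
tEnv.registerCatalog("myhive", new HiveCatalog("myhive", "default", "/opt/hive-conf"));
tEnv.useCatalog("myhive");

// Module system: make Hive built-in functions callable from Flink SQL
tEnv.loadModule("hive", new HiveModule("2.3.6"));

// Source/sink components are instantiated by the planner: this statement
// reads via HiveTableSource and writes via HiveTableSink
tEnv.executeSql("INSERT INTO sink_table SELECT * FROM source_table");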

Capabilities

Catalog Integration

Complete Hive metastore integration allowing Flink to use Hive as a persistent catalog for storing table definitions, schemas, and metadata across sessions.

HiveCatalog(String catalogName, @Nullable String defaultDatabase, @Nullable String hiveConfDir, 
           @Nullable String hadoopConfDir, @Nullable String hiveVersion);

// Database operations
void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists);
CatalogDatabase getDatabase(String databaseName);
List<String> listDatabases();

// Table operations  
void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists);
CatalogBaseTable getTable(ObjectPath tablePath);
List<String> listTables(String databaseName);
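
A sketch of direct catalog access, assuming hive is the HiveCatalog instance from Basic Usage and the database/table names are placeholders; most Catalog methods throw checked catalog exceptions:

import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;

ObjectPath path = new ObjectPath("default", "my_hive_table");
try {
    if (hive.tableExists(path)) {
        // Metadata is fetched from the Hive Metastore
        CatalogBaseTable table = hive.getTable(path);
        System.out.println(table.getUnresolvedSchema());
    }
    System.out.println(hive.listDatabases());        // all databases
    System.out.println(hive.listTables("default"));  // tables in one database
} catch (Exception e) {  // getTable/listTables throw checked exceptions
    throw new RuntimeException(e);
}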

Full reference: docs/catalog.md

DataStream Source

Low-level streaming and batch source for reading Hive tables directly in DataStream API programs with full control over parallelism, partitioning, and data formats.

class HiveSourceBuilder {
    HiveSourceBuilder(@Nonnull JobConf jobConf, @Nonnull ReadableConfig flinkConf, @Nullable String hiveVersion,
                     @Nonnull String dbName, @Nonnull String tableName, @Nonnull Map<String, String> tableOptions);
    
    HiveSourceBuilder setPartitions(List<HiveTablePartition> partitions);
    HiveSourceBuilder setLimit(@Nullable Long limit);
    HiveSourceBuilder setProjectedFields(int[] projectedFields);
    
    HiveSource<RowData> buildWithDefaultBulkFormat();
    <T> HiveSource<T> buildWithBulkFormat(BulkFormat<T, HiveSourceSplit> bulkFormat);
}

class HiveTablePartition {
    static HiveTablePartition ofTable(HiveConf hiveConf, @Nullable String hiveVersion, 
                                     String dbName, String tableName);
    static HiveTablePartition ofPartition(HiveConf hiveConf, @Nullable String hiveVersion,
                                         String dbName, String tableName, 
                                         LinkedHashMap<String, String> partitionSpec);
}
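
A sketch of the builder in use, reading one partition with a column projection and a row limit; the metastore URI, database, table, and partition column dt are assumptions:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;

import java.util.Collections;
import java.util.LinkedHashMap;

JobConf jobConf = new JobConf();
jobConf.set("hive.metastore.uris", "thrift://localhost:9083");

// Select a single partition of a table partitioned by `dt`
LinkedHashMap<String, String> spec = new LinkedHashMap<>();
spec.put("dt", "2024-01-01");
HiveTablePartition partition = HiveTablePartition.ofPartition(
    new HiveConf(jobConf, HiveConf.class), "2.3.6", "default", "my_table", spec);

HiveSource<RowData> source = new HiveSourceBuilder(
        jobConf, new Configuration(), "2.3.6", "default", "my_table", Collections.emptyMap())
    .setPartitions(Collections.singletonList(partition))
    .setProjectedFields(new int[] {0, 2})  // read only the 1st and 3rd columns
    .setLimit(1000L)                       // stop after 1000 rows
    .buildWithDefaultBulkFormat();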

Full reference: docs/datastream-source.md

Table API Integration

High-level Table API integration providing HiveTableSource and HiveTableSink for seamless SQL access to Hive tables with pushdown optimizations.

// Created automatically via catalog registration
// Supports predicate pushdown, projection pushdown, partition pruning
interface SupportsPartitionPushDown {
    Optional<List<Map<String, String>>> listPartitions();
    void applyPartitions(List<Map<String, String>> remainingPartitions);
}

interface SupportsProjectionPushDown {
    boolean supportsNestedProjection();
    void applyProjection(int[][] projectedFields);
}

interface SupportsLimitPushDown {
    void applyLimit(long limit);
}
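
For example, with the catalog from Basic Usage registered, a query like the following lets the planner apply all three optimizations; the table layout (an id column plus a partition column dt) is assumed:

tableEnv.executeSql(
    "SELECT id " +               // projection pushdown: only `id` is read from files
    "FROM my_hive_table " +
    "WHERE dt = '2024-01-01' " + // partition pruning: only one partition is scanned
    "LIMIT 10"                   // limit pushdown: the scan stops early
).print();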

Full reference: docs/table-api.md

Hive Functions

Module system integration enabling access to Hive built-in functions within Flink SQL, including string functions, date functions, and mathematical operations.

class HiveModule implements Module {
    HiveModule();
    HiveModule(String hiveVersion);
    
    Set<String> listFunctions();
    Optional<FunctionDefinition> getFunctionDefinition(String name);
    String getHiveVersion();
}
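
A small sketch inspecting the module before loading it; get_json_object is a Hive built-in that Flink's core module does not provide, which is a typical reason to load HiveModule:

import org.apache.flink.table.module.hive.HiveModule;

HiveModule module = new HiveModule("2.3.6");
System.out.println(module.getHiveVersion());        // "2.3.6"
System.out.println(module.listFunctions().size());  // number of exposed Hive built-ins

// Check a specific Hive built-in before relying on it in SQL
module.getFunctionDefinition("get_json_object")
      .ifPresent(def -> System.out.println("get_json_object is available"));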

Full reference: docs/hive-functions.md

Configuration Options

Comprehensive configuration system for tuning connector behavior, performance optimization, and streaming source configuration.

class HiveOptions {
    // Performance tuning
    ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM;
    ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX;
    
    // Streaming source options
    ConfigOption<Boolean> STREAMING_SOURCE_ENABLE;
    ConfigOption<Duration> STREAMING_SOURCE_MONITOR_INTERVAL;
    ConfigOption<String> STREAMING_SOURCE_CONSUME_START_OFFSET;
    
    // Lookup join caching
    ConfigOption<Duration> LOOKUP_JOIN_CACHE_TTL;
}

enum PartitionOrder {
    CREATE_TIME, PARTITION_TIME, PARTITION_NAME
}
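
These ConfigOptions correspond to string keys that can be set per table or per query. A sketch applying the documented streaming-source keys through a SQL hint (this assumes dynamic table options are enabled via table.dynamic-table-options.enabled):

// Read a partitioned Hive table as an unbounded stream of new partitions
tableEnv.executeSql(
    "SELECT * FROM my_hive_table " +
    "/*+ OPTIONS(" +
    "'streaming-source.enable'='true', " +                   // STREAMING_SOURCE_ENABLE
    "'streaming-source.monitor-interval'='1 min', " +        // STREAMING_SOURCE_MONITOR_INTERVAL
    "'streaming-source.consume-start-offset'='2024-01-01'" + // STREAMING_SOURCE_CONSUME_START_OFFSET
    ") */"
).print();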

Full reference: docs/configuration.md

Types

class FlinkHiveException extends RuntimeException {
    FlinkHiveException(String message);
    FlinkHiveException(Throwable cause);
    FlinkHiveException(String message, Throwable cause);
}

class HiveSourceSplit extends FileSourceSplit {
    // Represents a split of Hive data for parallel processing
}

interface HiveFunction {
    void setArgumentTypesAndConstants(Object[] constantArguments, DataType[] argTypes);
    DataType getHiveResultType(Object[] constantArguments, DataType[] argTypes);
}