
tessl/maven-org-apache-flink--flink-sql-connector-hive-2-3-9_2-12

Apache Flink SQL connector for Apache Hive 2.3.9 - enables reading from and writing to Hive tables

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.flink/flink-sql-connector-hive-2.3.9_2.12@1.19.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-sql-connector-hive-2-3-9_2-12@1.19.0


Apache Flink SQL Connector for Hive 2.3.9

The Apache Flink SQL connector for Apache Hive 2.3.9 integrates Flink's streaming and batch processing with Apache Hive data warehouses. It provides access to Hive tables, metastore operations, and Hive built-in functions, letting developers use existing Hive infrastructure from within Flink applications.

Package Information

  • Package Name: flink-sql-connector-hive-2.3.9_2.12
  • Package Type: maven
  • Group ID: org.apache.flink
  • Language: Java
  • Installation: Add dependency to pom.xml
  • Hive Version: 2.3.9
  • Scala Version: 2.12
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-hive-2.3.9_2.12</artifactId>
    <version>1.19.3</version>
</dependency>

Core Imports

// Main factories
import org.apache.flink.connectors.hive.HiveDynamicTableFactory;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory;
import org.apache.flink.table.module.hive.HiveModule;

// Source and sink classes
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.connectors.hive.HiveTableSource;
import org.apache.flink.connectors.hive.HiveTableSink;

// Configuration
import org.apache.flink.connectors.hive.HiveOptions;
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;

// Data structures
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.connectors.hive.FlinkHiveException;

Basic Usage

Table API Integration

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

// Create table environment
EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inBatchMode()
    .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Create and register Hive catalog
String catalogName = "myhive";
String defaultDatabase = "mydatabase";
String hiveConfDir = "/opt/hive-conf";
String version = "2.3.9";

HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir, null, version);
tableEnv.registerCatalog("myhive", hive);
tableEnv.useCatalog("myhive");

// Query Hive tables
Table result = tableEnv.sqlQuery("SELECT * FROM mytable WHERE active = true");
result.execute().print();

DataStream API Integration

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.mapred.JobConf;

import java.util.Collections;

// Create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Build Hive source
JobConf jobConf = new JobConf();
jobConf.set("hive.metastore.uris", "thrift://localhost:9083");

HiveSource<RowData> source = new HiveSourceBuilder(
    jobConf,
    new Configuration(),
    "2.3.9", // hiveVersion
    "mydb", 
    "mytable",
    Collections.emptyMap()
).buildWithDefaultBulkFormat();

// Create data stream
DataStream<RowData> stream = env
    .fromSource(source, WatermarkStrategy.noWatermarks(), "hive-source");

stream.print();
env.execute("Hive Stream Processing");

Architecture

The Flink Hive connector is built around several key architectural components; a short sketch showing how they are wired together follows the list:

  • Factory System: Plugin-based factories (HiveDynamicTableFactory, HiveCatalogFactory) for dynamic registration and configuration
  • Source/Sink Framework: Unified data access layer with streaming and batch support through HiveSource and HiveTableSink
  • Catalog Integration: Full Hive metastore integration via HiveCatalog for schema discovery and table management
  • Function Module: Native Hive function support through HiveModule for UDF compatibility
  • Partition Management: Intelligent partition handling with pruning and dynamic discovery capabilities
  • Type System: Seamless type mapping between Hive and Flink data types with full serialization support
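
The factory and module components above are usually exercised through SQL DDL rather than direct instantiation. The following is a minimal sketch (catalog name, database, and configuration directory are placeholders) of wiring HiveCatalogFactory and HiveModule into a table environment:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inBatchMode().build());

// HiveCatalogFactory is discovered via the 'hive' identifier and configured
// with the options listed under HiveCatalogFactoryOptions below.
tableEnv.executeSql(
        "CREATE CATALOG myhive WITH ("
        + " 'type' = 'hive',"
        + " 'default-database' = 'mydatabase',"
        + " 'hive-conf-dir' = '/opt/hive-conf',"
        + " 'hive-version' = '2.3.9'"
        + ")");
tableEnv.executeSql("USE CATALOG myhive");

// HiveModule is loaded through the module factory so Hive built-in functions
// become available in Flink SQL.
tableEnv.executeSql("LOAD MODULE hive WITH ('hive-version' = '2.3.9')");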

Capabilities

Table Source and Sink Operations

Core functionality for reading from and writing to Hive tables using Flink's Table API and DataStream API. Supports both batch and streaming modes with comprehensive partition handling.

// Main source builder
public class HiveSourceBuilder {
    public HiveSourceBuilder(JobConf jobConf, ReadableConfig flinkConf, 
                           String hiveVersion, String dbName, 
                           String tableName, Map<String, String> tableOptions);
    public HiveSource<RowData> buildWithDefaultBulkFormat();
    public <T> HiveSource<T> buildWithBulkFormat(BulkFormat<T, HiveSourceSplit> bulkFormat);
    public HiveSourceBuilder setPartitions(List<HiveTablePartition> partitions);
    public HiveSourceBuilder setLimit(Long limit);
    public HiveSourceBuilder setProjectedFields(int[] projectedFields);
}

// Dynamic table source
public class HiveTableSource implements ScanTableSource, SupportsPartitionPushDown, 
                                       SupportsProjectionPushDown, SupportsLimitPushDown {
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);
    public void applyPartitions(List<Map<String, String>> remainingPartitions);
    public void applyProjection(int[][] projectedFields, DataType producedDataType);
    public void applyLimit(long limit);
}

// Dynamic table sink  
public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context);
    public void applyStaticPartition(Map<String, String> partition);
    public void applyOverwrite(boolean overwrite);
}

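Once a Hive catalog is registered, the sink side can be driven entirely through SQL. The following is a minimal sketch (table, column, and partition names are placeholders) of a static-partition overwrite, the behavior HiveTableSink exposes via SupportsPartitioning and SupportsOverwrite:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inBatchMode().build());

// Assumes a Hive catalog named 'myhive' is already registered (see Basic Usage).
tableEnv.useCatalog("myhive");
tableEnv.useDatabase("mydatabase");

// Write into a static partition, replacing its previous contents.
tableEnv.executeSql(
        "INSERT OVERWRITE mytable PARTITION (dt = '2024-01-01') "
        + "SELECT id, name FROM staging_table");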

Catalog Integration

Complete Hive metastore integration providing schema discovery, table management, and metadata operations. Enables transparent access to existing Hive data warehouses.

public class HiveCatalog extends AbstractCatalog {
    public HiveCatalog(String catalogName, String defaultDatabase, 
                      String hiveConfDir, String hadoopConfDir, String hiveVersion);
    public List<String> listDatabases() throws CatalogException;
    public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException;
    public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException;
    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists);
    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists);
    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath);
}

public class HiveCatalogFactory implements CatalogFactory {
    public Catalog createCatalog(Context context);
    public String factoryIdentifier();
}

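A brief sketch of programmatic metadata access through HiveCatalog (paths and names are placeholders; the Hadoop configuration directory may be null):

import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;

HiveCatalog catalog = new HiveCatalog(
        "myhive", "mydatabase", "/opt/hive-conf", null, "2.3.9");
catalog.open();

// Browse databases and tables registered in the Hive metastore.
for (String db : catalog.listDatabases()) {
    System.out.println("database: " + db);
}
for (String tableName : catalog.listTables("mydatabase")) {
    System.out.println("table: " + tableName);
}

// Fetch table metadata by path.
CatalogBaseTable table = catalog.getTable(new ObjectPath("mydatabase", "mytable"));
System.out.println(table.getUnresolvedSchema());

catalog.close();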

Function Module Integration

Hive built-in function support enabling use of Hive UDFs within Flink SQL queries. Provides seamless function compatibility and registration.

public class HiveModule implements Module {
    public HiveModule();
    public HiveModule(String hiveVersion);
    public Set<String> listFunctions();
    public Optional<FunctionDefinition> getFunctionDefinition(String name);
}

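A short sketch of loading the module and calling a Hive built-in function from Flink SQL (get_json_object is used here purely as an illustrative Hive built-in):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.module.hive.HiveModule;

TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inBatchMode().build());

// Register Hive built-in functions alongside Flink's core module.
tableEnv.loadModule("hive", new HiveModule("2.3.9"));

// Hive built-ins are now listed among the available functions.
System.out.println(tableEnv.listFunctions().length + " functions available");

// Use a Hive built-in function directly in Flink SQL.
tableEnv.executeSql(
        "SELECT get_json_object('{\"name\":\"flink\"}', '$.name')").print();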

Configuration and Options

Comprehensive configuration system for customizing connector behavior, performance tuning, and environment-specific settings.

public class HiveOptions {
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER;
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER;
    public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE;
    public static final ConfigOption<Duration> STREAMING_SOURCE_MONITOR_INTERVAL;
    public static final ConfigOption<Duration> LOOKUP_JOIN_CACHE_TTL;
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM;
    public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_KIND;
}

public class HiveCatalogFactoryOptions {
    public static final ConfigOption<String> DEFAULT_DATABASE;
    public static final ConfigOption<String> HIVE_CONF_DIR;
    public static final ConfigOption<String> HIVE_VERSION;
    public static final ConfigOption<String> HADOOP_CONF_DIR;
}

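The option keys behind these classes can be supplied in the catalog DDL, in the Flink configuration, or per query via dynamic table options. A minimal sketch (table name is a placeholder) that enables streaming reads with a one-minute partition monitor interval, corresponding to STREAMING_SOURCE_ENABLE and STREAMING_SOURCE_MONITOR_INTERVAL:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());

// Assumes the 'myhive' catalog from the earlier examples is registered.
tableEnv.useCatalog("myhive");
tableEnv.useDatabase("mydatabase");

// Per-query overrides via dynamic table options (SQL hints).
tableEnv.executeSql(
        "SELECT * FROM mytable "
        + "/*+ OPTIONS("
        + "'streaming-source.enable' = 'true', "
        + "'streaming-source.monitor-interval' = '1 min') */").print();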

Partition Management and Utilities

Advanced partition handling utilities for efficient data access, partition pruning, and metadata management in partitioned Hive tables.

public class HiveTablePartition implements Serializable {
    public static HiveTablePartition ofTable(StorageDescriptor storageDescriptor, 
                                            Map<String, String> tableParameters);
    public static HiveTablePartition ofPartition(StorageDescriptor storageDescriptor,
                                                Map<String, String> partitionSpec,
                                                Map<String, String> tableParameters);
    public StorageDescriptor getStorageDescriptor();
    public Map<String, String> getPartitionSpec();
}

public class HivePartitionUtils {
    public static List<HiveTablePartition> getAllPartitions(JobConf jobConf, String catalogName,
                                                           ObjectPath tablePath, List<String> partitionColNames);
    public static byte[] serializeHiveTablePartition(List<HiveTablePartition> partitions);
    public static List<HiveTablePartition> deserializeHiveTablePartition(byte[] bytes, ClassLoader classLoader);
}

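A sketch of manual partition pruning based on the signatures listed above; the HivePartitionUtils package path and exact parameter lists may differ between Flink versions, and the metastore URI, database, table, and partition values are placeholders:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.connectors.hive.util.HivePartitionUtils;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.mapred.JobConf;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

JobConf jobConf = new JobConf();
jobConf.set("hive.metastore.uris", "thrift://localhost:9083");

// Enumerate all partitions of the table, then keep only the one to read.
List<HiveTablePartition> partitions = HivePartitionUtils.getAllPartitions(
        jobConf, "myhive", new ObjectPath("mydb", "mytable"),
        Collections.singletonList("dt"));
List<HiveTablePartition> pruned = partitions.stream()
        .filter(p -> "2024-01-01".equals(p.getPartitionSpec().get("dt")))
        .collect(Collectors.toList());

// Restrict the source to the pruned partition list.
HiveSource<RowData> source = new HiveSourceBuilder(
        jobConf, new Configuration(), "2.3.9", "mydb", "mytable",
        Collections.emptyMap())
        .setPartitions(pruned)
        .buildWithDefaultBulkFormat();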

Types

// Core exception type
public class FlinkHiveException extends RuntimeException {
    public FlinkHiveException(String message);
    public FlinkHiveException(Throwable cause);
    public FlinkHiveException(String message, Throwable cause);
}

// Configuration wrapper
public class JobConfWrapper implements Serializable {
    public JobConfWrapper(JobConf jobConf);
    public JobConf conf();
}