or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

catalog-integration.md configuration-management.md factory-registration.md function-module.md index.md lookup-joins.md table-sources-sinks.md
tile.json

tessl/maven-org-apache-flink--flink-sql-connector-hive-3-1-2-2-12

Apache Flink SQL connector for Apache Hive 3.1.2, enabling unified batch and stream processing with Hive tables.

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
maven pkg:maven/org.apache.flink/flink-sql-connector-hive-3.1.2_2.12@1.16.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-sql-connector-hive-3-1-2-2-12@1.16.0

docs/index.md

Apache Flink SQL Connector for Hive 3.1.2

The Apache Flink SQL Connector for Hive 3.1.2 enables seamless integration between Apache Flink and Apache Hive 3.1.2, providing unified batch and stream processing capabilities with Hive tables. This connector allows Flink to read from and write to Hive tables, supports both partitioned and non-partitioned tables in streaming and batch modes, and includes comprehensive catalog integration for metadata management.

Package Information

  • Package Name: flink-sql-connector-hive-3.1.2_2.12
  • Package Type: maven
  • Language: Java
  • Group ID: org.apache.flink
  • Artifact ID: flink-sql-connector-hive-3.1.2_2.12
  • Version: 1.16.3
  • Installation: Add Maven dependency in pom.xml:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-hive-3.1.2_2.12</artifactId>
    <version>1.16.3</version>
</dependency>

Core Imports

// Factory classes for programmatic usage
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory;
import org.apache.flink.connectors.hive.HiveDynamicTableFactory;
import org.apache.flink.table.module.hive.HiveModuleFactory;

// Core connector classes
import org.apache.flink.connectors.hive.HiveTableSource;
import org.apache.flink.connectors.hive.HiveTableSink;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;

// Catalog and module classes
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.module.hive.HiveModule;

// Configuration and options
import org.apache.flink.connectors.hive.HiveOptions;
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;

Basic Usage

SQL DDL Usage (Most Common)

-- Create Hive catalog
CREATE CATALOG hive_catalog WITH (
    'type' = 'hive',
    'hive-conf-dir' = '/path/to/hive/conf',
    'hive-version' = '3.1.2'
);

-- Use the Hive catalog
USE CATALOG hive_catalog;
USE `default`;

-- Read from existing Hive table
SELECT * FROM my_hive_table WHERE partition_date = '2023-01-01';

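-- Create and write to new Hive table
-- (STORED AS / TBLPROPERTIES are Hive-dialect DDL, so switch dialects for the CREATE;
--  in Hive dialect the partition column is declared only in PARTITIONED BY, with its type)
SET table.sql-dialect=hive;

CREATE TABLE new_hive_table (
    id BIGINT,
    name STRING,
    event_time TIMESTAMP
) PARTITIONED BY (partition_date STRING)
STORED AS PARQUET
TBLPROPERTIES (
    'sink.partition-commit.policy.kind' = 'metastore,success-file'
);

SET table.sql-dialect=default;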

INSERT INTO new_hive_table SELECT id, name, event_time, '2023-01-01' FROM source_table;

Programmatic Table API Usage

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

// Create table environment
EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Create Hive catalog
String catalogName = "hive_catalog";
String defaultDatabase = "default";
String hiveConfDir = "/path/to/hive/conf";
String hiveVersion = "3.1.2";

HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir, hiveVersion);
tableEnv.registerCatalog(catalogName, hive);
tableEnv.useCatalog(catalogName);

// Execute queries
tableEnv.executeSql("SELECT * FROM my_hive_table LIMIT 10").print();

DataStream API Usage

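import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// jobConf (JobConf), flinkConf (ReadableConfig), hiveVersion, dbName, tableName and
// tableOptions (Map<String, String>) are assumed to be prepared by the caller.
HiveSource<RowData> hiveSource = new HiveSourceBuilder(
        jobConf, flinkConf, hiveVersion, dbName, tableName, tableOptions)
    .buildWithDefaultBulkFormat();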

env.fromSource(hiveSource, WatermarkStrategy.noWatermarks(), "Hive Source")
   .print();

env.execute("Hive Streaming Job");

Architecture

The Flink Hive Connector is built around several key components:

  • Factory System: SPI-based factory classes for automatic connector discovery and configuration
  • Table Connectors: Source and sink implementations supporting both batch and streaming modes
  • Catalog Integration: HiveCatalog providing unified metadata management across Flink and Hive
  • Module System: HiveModule enabling access to Hive built-in functions within Flink SQL
  • Configuration Management: Comprehensive options for fine-tuning performance and behavior
  • Shaded Dependencies: Self-contained JAR with Hive 3.1.2 dependencies to avoid conflicts

The connector supports both legacy and modern Flink table interfaces, enabling seamless migration from older versions while providing access to the latest features.

Capabilities

Factory Registration

Factory classes that enable automatic connector discovery through Flink's Service Provider Interface (SPI). These factories handle connector instantiation and configuration validation.

public class HiveCatalogFactory implements CatalogFactory {
    public String factoryIdentifier(); // returns "hive"
}

public class HiveDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    public String factoryIdentifier(); // returns "hive"
}

public class HiveModuleFactory implements ModuleFactory {
    public String factoryIdentifier(); // returns "hive"
}

Factory Registration

Table Sources and Sinks

Core table connector implementations providing read and write capabilities for Hive tables with support for batch processing, streaming ingestion, partition management, and performance optimizations.

public class HiveTableSource implements ScanTableSource, 
    SupportsPartitionPushDown, SupportsProjectionPushDown, SupportsLimitPushDown {
    // Batch and streaming source for Hive tables
}

public class HiveTableSink implements DynamicTableSink {
    // Sink for writing to Hive tables with partition support
}

public final class HiveSource<T> implements Source<T, HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>> {
    // Low-level source using new Source API
}

Table Sources and Sinks

Catalog Integration

Hive catalog implementation that provides unified metadata management, enabling Flink to access Hive databases, tables, partitions, and functions through the Hive metastore.

public class HiveCatalog extends AbstractCatalog {
    public HiveCatalog(String catalogName, String defaultDatabase, 
                       String hiveConfDir, String hiveVersion);
    
    public static boolean isHiveTable(Map<String, String> properties);
}

Catalog Integration

Function Module

Module system providing access to Hive built-in functions within Flink SQL, enabling compatibility with existing Hive UDFs and maintaining function behavior consistency.

public class HiveModule implements Module {
    public HiveModule(String hiveVersion);
}

Function Module

Configuration Management

Comprehensive configuration options for controlling connector behavior, performance tuning, and feature enablement across reading, writing, streaming, and lookup join scenarios.

public class HiveOptions {
    // Reading options
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER;
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_READ_PARTITION_WITH_SUBDIRECTORY_ENABLED;
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM;
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX;
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM;
    public static final ConfigOption<MemorySize> TABLE_EXEC_HIVE_SPLIT_MAX_BYTES;
    public static final ConfigOption<MemorySize> TABLE_EXEC_HIVE_FILE_OPEN_COST;
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM;
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_DYNAMIC_GROUPING_ENABLED;
    
    // Writing options  
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER;
    public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_KIND;
    public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_CLASS;
    public static final ConfigOption<String> SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME;
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_ENABLE;
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_THREAD_NUM;
    
    // Streaming options
    public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE;
    public static final ConfigOption<String> STREAMING_SOURCE_PARTITION_INCLUDE;
    public static final ConfigOption<Duration> STREAMING_SOURCE_MONITOR_INTERVAL;
    public static final ConfigOption<String> STREAMING_SOURCE_CONSUME_START_OFFSET;
    public static final ConfigOption<PartitionOrder> STREAMING_SOURCE_PARTITION_ORDER;
    
    // Lookup join options
    public static final ConfigOption<Duration> LOOKUP_JOIN_CACHE_TTL;
}

Configuration Management

Lookup Joins

Specialized lookup table source for dimension table joins, providing caching capabilities and optimized access patterns for real-time data enrichment scenarios.

public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {
    // Lookup join capabilities for dimension tables
}

Lookup Joins

Types

Core Interfaces and Classes

// Exception handling
public class FlinkHiveException extends RuntimeException {
    public FlinkHiveException(String message);
    public FlinkHiveException(Throwable cause);
    public FlinkHiveException(String message, Throwable cause);
}

// Partition representation
public class HiveTablePartition {
    // Constructors
    public HiveTablePartition(StorageDescriptor storageDescriptor, Properties tableProps);
    public HiveTablePartition(StorageDescriptor storageDescriptor, 
                             Map<String, String> partitionSpec, Properties tableProps);
    
    // Instance methods
    public StorageDescriptor getStorageDescriptor();
    public Map<String, String> getPartitionSpec();
    public Properties getTableProps();
    public boolean equals(Object o);
    public int hashCode();
    public String toString();
    
    // Static factory methods
    public static HiveTablePartition ofTable(HiveConf hiveConf, String hiveVersion, 
                                           String dbName, String tableName);
    public static HiveTablePartition ofPartition(HiveConf hiveConf, String hiveVersion, 
                                               String dbName, String tableName,
                                               LinkedHashMap<String, String> partitionSpec);
}

// Configuration wrapper
public class JobConfWrapper implements Serializable {
    public JobConfWrapper(JobConf jobConf);
    public JobConf conf();
}

// Source implementation with generic type parameter
public final class HiveSource<T> implements Source<T, HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>> {
    // Source interface methods
    public SimpleVersionedSerializer<HiveSourceSplit> getSplitSerializer();
    public SimpleVersionedSerializer<PendingSplitsCheckpoint<HiveSourceSplit>> getEnumeratorCheckpointSerializer();
    public SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>> createEnumerator(
        SplitEnumeratorContext<HiveSourceSplit> enumContext);
    public SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>> restoreEnumerator(
        SplitEnumeratorContext<HiveSourceSplit> enumContext, 
        PendingSplitsCheckpoint<HiveSourceSplit> checkpoint);
}

// Table source with interface implementations
public class HiveTableSource implements ScanTableSource, SupportsPartitionPushDown, 
    SupportsProjectionPushDown, SupportsLimitPushDown, SupportsStatisticReport, SupportsDynamicFiltering {
    
    public HiveTableSource(JobConf jobConf, ReadableConfig flinkConf, 
                          ObjectPath tablePath, CatalogTable catalogTable);
    
    // ScanTableSource methods
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext);
    public ChangelogMode getChangelogMode();
    public DynamicTableSource copy();
    public String asSummaryString();
    
    // Optimization interface methods
    public void applyLimit(long limit);
    public Optional<List<Map<String, String>>> listPartitions();
    public void applyPartitions(List<Map<String, String>> remainingPartitions);
    public List<String> listAcceptedFilterFields();
    public void applyDynamicFiltering(List<String> candidateFilterFields);
    public boolean supportsNestedProjection();
    public void applyProjection(int[][] projectedFields, DataType producedDataType);
    public TableStats reportStatistics();
}

// Table sink with interface implementations
public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
    public HiveTableSink(ReadableConfig flinkConf, JobConf jobConf, 
                        ObjectIdentifier identifier, CatalogTable table, Integer configuredParallelism);
    
    // DynamicTableSink methods
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context);
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode);
    public DynamicTableSink copy();
    public String asSummaryString();
    
    // Partitioning and overwrite support
    public boolean requiresPartitionGrouping(boolean supportsGrouping);
    public void applyStaticPartition(Map<String, String> partition);
    public void applyOverwrite(boolean overwrite);
}

// Lookup table source extending HiveTableSource
public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {
    public HiveLookupTableSource(JobConf jobConf, ReadableConfig flinkConf, 
                               ObjectPath tablePath, CatalogTable catalogTable);
    
    // LookupTableSource methods
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);
    public DynamicTableSource copy();
}

// Builder pattern
public class HiveSourceBuilder {
    // Constructors
    public HiveSourceBuilder(JobConf jobConf, ReadableConfig flinkConf, String hiveVersion, 
                           String dbName, String tableName, Map<String, String> tableOptions);
    public HiveSourceBuilder(JobConf jobConf, ReadableConfig flinkConf, ObjectPath tablePath, 
                           String hiveVersion, CatalogTable catalogTable);
    
    // Configuration methods
    public HiveSourceBuilder setProjectedFields(int[] projectedFields);
    public HiveSourceBuilder setLimit(Long limit);  
    public HiveSourceBuilder setPartitions(List<HiveTablePartition> partitions);
    public HiveSourceBuilder setDynamicFilterPartitionKeys(List<String> dynamicFilterPartitionKeys);
    
    // Build methods
    public HiveSource<RowData> buildWithDefaultBulkFormat();
    public <T> HiveSource<T> buildWithBulkFormat(BulkFormat<T, HiveSourceSplit> bulkFormat);
}

Enums and Constants

// Partition ordering strategies
public enum PartitionOrder implements DescribedEnum {
    CREATE_TIME("create-time", "Order partitions by creation time"),
    PARTITION_TIME("partition-time", "Order partitions by partition time"),
    PARTITION_NAME("partition-name", "Order partitions by partition name");
    
    public String toString();
    public InlineElement getDescription();
}

// Version constants
public class HiveShimLoader {
    public static final String HIVE_VERSION_V3_1_2 = "3.1.2";
    public static String getHiveVersion();
}

// Catalog configuration constants  
public class HiveCatalogConfig {
    public static final String DEFAULT_LIST_COLUMN_TYPES_SEPARATOR = ":";
    public static final String COMMENT = "comment";
    public static final String PARTITION_LOCATION = "partition.location";
}