CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-sql-connector-hive-2-3-6-2-11

Apache Flink SQL connector for Apache Hive 2.3.6 with Scala 2.11 binary compatibility

Pending
Overview
Eval results
Files

table-sources.mddocs/

Table Sources

Reading data from Hive tables with support for both batch and streaming modes, partition pruning, projection pushdown, and lookup joins. Provides comprehensive integration with Flink's table ecosystem while leveraging Hive's storage capabilities.

Capabilities

HiveTableSource

Primary table source for reading Hive tables in streaming mode with continuous partition monitoring.

/**
 * Table source for reading data from Hive tables in streaming mode
 * Supports continuous partition monitoring and various optimizations
 */
public class HiveTableSource implements ScanTableSource, SupportsPartitionPushDown, SupportsProjectionPushDown, SupportsLimitPushDown {
    /**
     * Creates HiveTableSource for streaming Hive table access
     * @param jobConf - Hadoop job configuration  
     * @param conf - Flink configuration
     * @param tablePath - Path to the Hive table
     * @param catalogTable - Catalog table metadata
     */
    public HiveTableSource(JobConf jobConf, ReadableConfig conf, ObjectPath tablePath, CatalogTable catalogTable);
    
    /**
     * Get the scan runtime provider for reading data
     * @param scanContext - Context for scan operation
     * @return ScanRuntimeProvider for data stream creation
     */
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);
    
    /**
     * Get the changelog mode supported by this source
     * @return ChangelogMode indicating supported change types
     */
    public ChangelogMode getChangelogMode();
    
    /**
     * Copy this source with different configuration
     * @return New HiveTableSource instance
     */
    public DynamicTableSource copy();
    
    /**
     * Get string summary of this table source
     * @return Human-readable description
     */
    public String asSummaryString();
}

HiveLookupTableSource

Table source with both scan and lookup capabilities for dimension table use cases.

/**
 * Table source with both scan and lookup capabilities for Hive tables
 * Ideal for dimension tables used in joins
 */
public class HiveLookupTableSource implements LookupTableSource, ScanTableSource {
    /**
     * Creates HiveLookupTableSource for scan and lookup operations
     * @param jobConf - Hadoop job configuration
     * @param conf - Flink configuration  
     * @param tablePath - Path to the Hive table
     * @param catalogTable - Catalog table metadata
     */
    public HiveLookupTableSource(JobConf jobConf, ReadableConfig conf, ObjectPath tablePath, CatalogTable catalogTable);
    
    /**
     * Get the lookup runtime provider for join operations
     * @param context - Context for lookup operation
     * @return LookupRuntimeProvider for lookup function creation
     */
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);
    
    /**
     * Get the scan runtime provider for reading data
     * @param scanContext - Context for scan operation  
     * @return ScanRuntimeProvider for data stream creation
     */
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);
    
    /**
     * Get the changelog mode supported by this source
     * @return ChangelogMode indicating supported change types
     */
    public ChangelogMode getChangelogMode();
    
    /**
     * Copy this source with different configuration
     * @return New HiveLookupTableSource instance
     */
    public DynamicTableSource copy();
}

Partition Pushdown Support

Interface for optimizing queries by pushing partition filters to the source.

/**
 * Apply partition pushdown optimization
 * Filters data at the source level based on partition specifications
 * @param remainingPartitions - List of partition specs that remain after filtering
 */
public void applyPartitions(List<Map<String, String>> remainingPartitions);

/**
 * Check if nested projection is supported
 * @return true if nested field projection is supported
 */
public boolean supportsNestedProjection();

Projection Pushdown Support

Interface for optimizing queries by pushing column projections to the source.

/**
 * Apply projection pushdown optimization
 * Only reads specified columns from the underlying storage
 * @param projectedFields - Array of field indices to project in nested format
 */
public void applyProjection(int[][] projectedFields);

Limit Pushdown Support

Interface for optimizing queries by pushing LIMIT operations to the source.

/**
 * Apply limit pushdown optimization
 * Limits the number of records read at the source level
 * @param limit - Maximum number of records to read
 */
public void applyLimit(long limit);

Reading Context and Configuration

Context classes for configuring Hive table reading behavior.

/**
 * Context for Hive partition operations
 */
public class HivePartitionContext {
    public HivePartitionContext(List<HiveTablePartition> allPartitions, List<HiveTablePartition> remainingPartitions);
    public List<HiveTablePartition> getAllPartitions();
    public List<HiveTablePartition> getRemainingPartitions();
}

/**
 * Context for continuous Hive partition monitoring
 */
public class HiveContinuousPartitionContext extends HivePartitionFetcherContextBase<HiveTablePartition> {
    public HiveContinuousPartitionContext(ObjectPath tablePath, CatalogTable catalogTable, List<String> partitionKeys);
    public List<HiveTablePartition> getPartitions(List<String> partitionValues);
}

Input Format Classes

Low-level input format classes for reading Hive table data.

/**
 * Input format for reading Hive table data
 * Handles various Hive file formats and SerDes
 */
public class HiveTableInputFormat extends RichInputFormat<RowData, HiveTableInputSplit> {
    /**
     * Open input format for reading
     * @param split - Input split to read
     * @throws IOException if open fails
     */
    public void open(HiveTableInputSplit split) throws IOException;
    
    /**
     * Check if more records are available
     * @return true if more records available
     * @throws IOException if check fails
     */
    public boolean reachedEnd() throws IOException;
    
    /**
     * Read next record
     * @param reuse - Reusable RowData object
     * @return Next RowData record
     * @throws IOException if read fails
     */
    public RowData nextRecord(RowData reuse) throws IOException;
    
    /**
     * Close input format
     * @throws IOException if close fails
     */
    public void close() throws IOException;
}

/**
 * Input split for Hive table reading
 */
public class HiveTableInputSplit implements InputSplit {
    public HiveTableInputSplit(int splitNumber, Path path, long start, long length, String[] hosts);
    public int getSplitNumber();
    public String[] getHostnames();
}

Split Reader Implementations

Specialized readers for different Hive file formats.

/**
 * Split reader interface for Hive formats
 */
public interface SplitReader<T> {
    /**
     * Read next record from split
     * @return Next record or null if end reached
     * @throws IOException if read fails
     */
    T read() throws IOException;
    
    /**
     * Close the reader
     * @throws IOException if close fails
     */
    void close() throws IOException;
}

/**
 * Split reader for ORC files with vectorization
 */
public class HiveVectorizedOrcSplitReader implements SplitReader<RowData> {
    public HiveVectorizedOrcSplitReader(HiveShim hiveShim, JobConf jobConf, String[] fieldNames, DataType[] fieldTypes, int[] selectedFields, HiveTableInputSplit split);
}

/**
 * Split reader for Parquet files with vectorization  
 */
public class HiveVectorizedParquetSplitReader implements SplitReader<RowData> {
    public HiveVectorizedParquetSplitReader(HiveShim hiveShim, JobConf jobConf, String[] fieldNames, DataType[] fieldTypes, int[] selectedFields, HiveTableInputSplit split);
}

/**
 * Split reader for MapReduce-based formats
 */
public class HiveMapredSplitReader implements SplitReader<RowData> {
    public HiveMapredSplitReader(JobConf jobConf, String[] fieldNames, DataType[] fieldTypes, int[] selectedFields, HiveTableInputSplit split, HiveShim hiveShim);
}

Usage Examples:

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.configuration.Configuration;

// Set up table environment with Hive catalog
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
HiveCatalog hiveCatalog = new HiveCatalog("hive", "default", "/opt/hive/conf", null, "2.3.6");
tableEnv.registerCatalog("hive", hiveCatalog);
tableEnv.useCatalog("hive");

// Query Hive table with automatic source optimization
Table result = tableEnv.sqlQuery(
    "SELECT customer_id, order_total " +
    "FROM hive_catalog.sales.orders " + 
    "WHERE partition_date >= '2023-01-01' " +
    "AND order_total > 100.0 " +
    "LIMIT 1000"
);

// The HiveTableSource will automatically apply:
// - Partition pushdown (partition_date >= '2023-01-01')
// - Projection pushdown (only customer_id, order_total columns)
// - Limit pushdown (LIMIT 1000)

result.execute().print();
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Set up streaming environment for continuous monitoring
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Register Hive catalog
HiveCatalog hiveCatalog = new HiveCatalog("hive", "default", "/opt/hive/conf", null, "2.3.6");
tableEnv.registerCatalog("hive", hiveCatalog);
tableEnv.useCatalog("hive");

// Enable continuous partition monitoring
tableEnv.getConfig().getConfiguration().setBoolean("streaming-source.enable", true);
tableEnv.getConfig().getConfiguration().setString("streaming-source.partition.include", "all");

// Stream from Hive table with partition monitoring
Table stream = tableEnv.sqlQuery(
    "SELECT event_time, user_id, action " +
    "FROM hive_catalog.events.user_actions"
);

// Convert to DataStream for further processing
DataStream<Row> dataStream = tableEnv.toAppendStream(stream, Row.class);
dataStream.print();

env.execute("Hive Streaming Source Example");

Types

public class HiveTablePartition {
    public HiveTablePartition(StorageDescriptor storageDescriptor, Map<String, String> partitionSpec);
    public StorageDescriptor getStorageDescriptor();
    public Map<String, String> getPartitionSpec();
    public String getLocation();
}

public class HiveSourceSplit implements SourceSplit {
    public String splitId();
    public HiveTableInputSplit getHiveTableInputSplit();
}

public interface ScanContext {
    DataTypeFactory getDataTypeFactory();
}

public interface LookupContext {
    String[] getKeys();
    DataTypeFactory getDataTypeFactory();
}

public interface ScanRuntimeProvider extends DynamicTableSource.RuntimeProvider {
    // Marker interface for scan providers
}

public interface LookupRuntimeProvider extends DynamicTableSource.RuntimeProvider {
    // Marker interface for lookup providers
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-sql-connector-hive-2-3-6-2-11

docs

catalog-operations.md

configuration.md

hive-functions.md

index.md

source-api.md

table-sinks.md

table-sources.md

tile.json