Apache Flink SQL connector for Apache Hive 3.1.2, enabling unified batch and stream processing with Hive tables.
—
Core table connector implementations providing comprehensive read and write capabilities for Hive tables. These classes support both batch and streaming processing modes, with advanced features including partition management, performance optimizations, and streaming ingestion capabilities.
Primary table source implementation for reading data from Hive tables, supporting both batch and streaming modes with advanced pushdown optimizations.
/**
* Table source for reading Hive tables with comprehensive optimization support
* Supports: partition pushdown, projection pushdown, limit pushdown, statistics reporting
*/
public class HiveTableSource implements ScanTableSource, SupportsPartitionPushDown,
SupportsProjectionPushDown, SupportsLimitPushDown, SupportsStatisticReport {
/**
* Creates table source scan runtime provider for execution
* @param scanContext Context containing runtime information
* @return ScanRuntimeProvider for execution
*/
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);
/**
* Applies partition pruning to reduce data scanning
* @param remainingPartitions Partitions that remain after pruning
*/
public void applyPartitions(List<Map<String, String>> remainingPartitions);
/**
* Applies column projection to minimize data transfer
* @param projectedFields Array of projected field indices
* @param nestedFields Nested field projections
*/
public void applyProjection(int[][] projectedFields, DataType[] nestedFields);
/**
* Applies limit pushdown for query optimization
* @param limit Maximum number of records to read
*/
public void applyLimit(long limit);
/**
* Reports table statistics for cost-based optimization
* @param reportContext Context for statistics reporting
* @return Table statistics including row count and column statistics
*/
public TableStats reportStatistics(StatisticReportContext reportContext);
/**
* Creates a copy of this table source for planning
* @return Deep copy of the table source
*/
public DynamicTableSource copy();
/**
* Returns string summary of the table source
* @return Human-readable description
*/
public String asSummaryString();
}Usage Examples:
-- Batch reading with partition pruning
SELECT id, name, amount
FROM hive_table
WHERE partition_date BETWEEN '2023-01-01' AND '2023-01-31'
AND region = 'us-west';
-- Streaming source configuration
CREATE TABLE hive_stream_source (
id BIGINT,
event_data STRING,
event_time TIMESTAMP(3),
partition_hour STRING
) PARTITIONED BY (partition_hour)
WITH (
'connector' = 'hive',
'streaming-source.enable' = 'true',
'streaming-source.partition.include' = 'latest',
'streaming-source.monitor-interval' = '10 min',
'streaming-source.consume-start-offset' = '2023-01-01 00:00:00'
);Primary table sink implementation for writing data to Hive tables with comprehensive partitioning and commit policy support.
/**
* Table sink for writing data to Hive tables with partition support
* Supports: dynamic partitioning, overwrite modes, custom commit policies
*/
public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
/**
* Creates sink runtime provider for execution
* @param sinkContext Context containing runtime information
* @return SinkRuntimeProvider for execution
*/
public SinkRuntimeProvider getSinkRuntimeProvider(SinkContext sinkContext);
/**
* Applies static partition specifications
* @param partitions Map of partition column to value
*/
public void applyStaticPartition(Map<String, String> partitions);
/**
* Configures overwrite mode for the sink
* @param overwrite Whether to overwrite existing data
*/
public void applyOverwrite(boolean overwrite);
/**
* Creates a copy of this table sink for planning
* @return Deep copy of the table sink
*/
public DynamicTableSink copy();
/**
* Returns string summary of the table sink
* @return Human-readable description
*/
public String asSummaryString();
}Usage Examples:
-- Writing to partitioned table
INSERT INTO hive_partitioned_table
PARTITION (year='2023', month='01')
SELECT id, name, amount FROM source_table;
-- Dynamic partitioning with overwrite
INSERT OVERWRITE hive_table
SELECT id, name, amount, DATE_FORMAT(event_time, 'yyyy-MM-dd') as partition_date
FROM streaming_source;
-- Sink configuration with commit policies
CREATE TABLE hive_sink (
id BIGINT,
data STRING,
partition_date STRING
) PARTITIONED BY (partition_date)
WITH (
'connector' = 'hive',
'sink.partition-commit.policy.kind' = 'metastore,success-file',
'table.exec.hive.sink.statistic-auto-gather.enable' = 'true'
);Low-level source implementation using Flink's new Source API, providing fine-grained control over data ingestion and parallelism.
/**
* Generic source implementation using Flink's new Source API
* Provides low-level control over data ingestion and split management
* @param <T> Output data type
*/
public final class HiveSource<T> implements Source<T, HiveSplitEnumeratorState, HiveSourceSplit>,
ResultTypeQueryable<T> {
/**
* Creates a bounded split enumerator for batch execution
* @param enumContext Enumerator context
* @return Split enumerator instance
*/
public SplitEnumerator<HiveSourceSplit, HiveSplitEnumeratorState> createEnumerator(
SplitEnumeratorContext<HiveSourceSplit> enumContext);
/**
* Restores split enumerator from checkpoint state
* @param enumContext Enumerator context
* @param checkpoint Checkpoint state to restore from
* @return Restored split enumerator
*/
public SplitEnumerator<HiveSourceSplit, HiveSplitEnumeratorState> restoreEnumerator(
SplitEnumeratorContext<HiveSourceSplit> enumContext,
HiveSplitEnumeratorState checkpoint);
/**
* Creates a source reader for processing splits
* @param readerContext Reader context
* @return Source reader instance
*/
public SourceReader<T, HiveSourceSplit> createReader(SourceReaderContext readerContext);
/**
* Returns the boundedness of this source
* @return Boundedness.BOUNDED for batch, CONTINUOUS_UNBOUNDED for streaming
*/
public Boundedness getBoundedness();
/**
* Returns the produced data type
* @return TypeInformation for the output type
*/
public TypeInformation<T> getProducedType();
}Usage Example:
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
// Create streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Build Hive source
HiveSource<RowData> hiveSource = new HiveSourceBuilder()
.setTableIdentifier(ObjectIdentifier.of("catalog", "database", "table"))
.setHiveConfiguration(hiveConf)
.setProjectFields(new int[]{0, 1, 2}) // Project specific columns
.setLimit(10000) // Limit number of records
.build();
// Add to execution graph
env.fromSource(hiveSource, WatermarkStrategy.noWatermarks(), "Hive Source")
.map(row -> processRow(row))
.print();
env.execute("Hive DataStream Job");Builder pattern implementation for constructing HiveSource instances with fluent API for configuration.
/**
* Builder for creating HiveSource instances with fluent configuration API
*/
public class HiveSourceBuilder {
/**
* Sets the table identifier for the source
* @param tableIdentifier Table identifier (catalog.database.table)
* @return This builder instance for chaining
*/
public HiveSourceBuilder setTableIdentifier(ObjectIdentifier tableIdentifier);
/**
* Sets Hive configuration for the source
* @param hiveConf Hive configuration object
* @return This builder instance for chaining
*/
public HiveSourceBuilder setHiveConfiguration(HiveConf hiveConf);
/**
* Sets projected field indices to minimize data transfer
* @param projectFields Array of field indices to project
* @return This builder instance for chaining
*/
public HiveSourceBuilder setProjectFields(int[] projectFields);
/**
* Sets maximum number of records to read
* @param limit Maximum record count
* @return This builder instance for chaining
*/
public HiveSourceBuilder setLimit(long limit);
/**
* Sets specific partitions to read from
* @param partitions List of partition specifications
* @return This builder instance for chaining
*/
public HiveSourceBuilder setPartitions(List<HiveTablePartition> partitions);
/**
* Builds the configured HiveSource instance
* @return Configured HiveSource<RowData> instance
*/
public HiveSource<RowData> build();
}Specialized table source for dimension table lookups in streaming joins, providing caching and optimized access patterns.
/**
* Lookup table source for dimension table joins with caching support
* Extends HiveTableSource with lookup join capabilities
*/
public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {
/**
* Creates lookup runtime provider for join operations
* @param lookupContext Context containing lookup configuration
* @return LookupRuntimeProvider for execution
*/
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext);
/**
* Creates a copy of this lookup table source
* @return Deep copy of the lookup table source
*/
public DynamicTableSource copy();
}Usage Example:
-- Configure lookup join with caching
CREATE TABLE dim_table (
id BIGINT PRIMARY KEY NOT ENFORCED,
name STRING,
category STRING
) WITH (
'connector' = 'hive',
'lookup.join.cache.ttl' = '1 hour'
);
-- Use in temporal join
SELECT
orders.order_id,
orders.amount,
dim.name,
dim.category
FROM orders_stream
JOIN dim_table FOR SYSTEM_TIME AS OF orders.proc_time AS dim
ON orders.product_id = dim.id;Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-connector-hive-3-1-2-2-12