Core functionality for reading from and writing to Hive tables using Flink's Table API and DataStream API. Supports both batch and streaming modes, including partition pruning, projection and limit pushdown, and continuous monitoring of new partitions for streaming reads.
Unified data source for reading Hive tables in both bounded (batch) and unbounded (streaming) modes.
/**
* Unified data source for reading Hive tables (bounded/unbounded)
* Implements Flink's Source interface for integration with DataStream API
*/
@PublicEvolving
public class HiveSource<T> implements Source<T, HiveSourceSplit, HivePendingSplitsCheckpoint> {
public SimpleVersionedSerializer<HiveSourceSplit> getSplitSerializer();
public SplitEnumerator<HiveSourceSplit, HivePendingSplitsCheckpoint> createEnumerator(
SplitEnumeratorContext<HiveSourceSplit> enumContext) throws Exception;
public SplitEnumerator<HiveSourceSplit, HivePendingSplitsCheckpoint> restoreEnumerator(
SplitEnumeratorContext<HiveSourceSplit> enumContext,
HivePendingSplitsCheckpoint checkpoint) throws Exception;
public SourceReader<T, HiveSourceSplit> createReader(SourceReaderContext readerContext) throws Exception;
}

Builder pattern implementation for constructing HiveSource instances with flexible configuration options.
/**
* Builder for constructing HiveSource instances with configuration options
* Provides fluent API for setting partitions, projections, and limits
*/
@PublicEvolving
public class HiveSourceBuilder {
/**
* Create a new HiveSourceBuilder
* @param jobConf Hadoop JobConf with Hive configuration
* @param flinkConf Flink configuration options
* @param hiveVersion Hive version for compatibility (null for auto-detection)
* @param dbName Database name
* @param tableName Table name
* @param tableOptions Additional table options (take precedence over metastore properties)
*/
public HiveSourceBuilder(JobConf jobConf, ReadableConfig flinkConf,
String hiveVersion, String dbName,
String tableName, Map<String, String> tableOptions);
/**
* Build HiveSource with default RowData bulk format
* @return HiveSource configured for RowData processing
*/
public HiveSource<RowData> buildWithDefaultBulkFormat();
/**
* Build HiveSource with custom bulk format
* @param bulkFormat Custom bulk format for reading data
* @return HiveSource configured with custom format
*/
public <T> HiveSource<T> buildWithBulkFormat(BulkFormat<T, HiveSourceSplit> bulkFormat);
/**
* Set specific partitions to read (for batch mode)
* @param partitions List of partitions to read
* @return Builder instance for chaining
*/
public HiveSourceBuilder setPartitions(List<HiveTablePartition> partitions);
/**
* Set maximum number of records to read
* @param limit Maximum record count
* @return Builder instance for chaining
*/
public HiveSourceBuilder setLimit(Long limit);
/**
* Set field projection for reading subset of columns
* @param projectedFields Array of field indices to project
* @return Builder instance for chaining
*/
public HiveSourceBuilder setProjectedFields(int[] projectedFields);
}

Usage Examples:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.mapred.JobConf;
import java.util.*;
// Basic source creation
JobConf jobConf = new JobConf();
jobConf.set("hive.metastore.uris", "thrift://localhost:9083");
HiveSource<RowData> source = new HiveSourceBuilder(
jobConf,
new Configuration(),
"2.3.9", // hiveVersion
"default",
"user_events",
Collections.emptyMap()
).buildWithDefaultBulkFormat();
// Source with specific partitions and projection
// storageDesc is the partition StorageDescriptor and tableProps the table Properties,
// both typically retrieved from the Hive metastore (placeholders here)
List<HiveTablePartition> partitions = Arrays.asList(
    new HiveTablePartition(storageDesc, Map.of("year", "2024", "month", "01"), tableProps),
    new HiveTablePartition(storageDesc, Map.of("year", "2024", "month", "02"), tableProps)
);
HiveSource<RowData> filteredSource = new HiveSourceBuilder(
jobConf,
new Configuration(),
"2.3.9", // hiveVersion
"analytics",
"sales_data",
Collections.emptyMap()
)
.setPartitions(partitions)
.setProjectedFields(new int[]{0, 2, 5}) // Select specific columns
.setLimit(100000L) // Limit records
.buildWithDefaultBulkFormat();
// Use in DataStream
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<RowData> stream = env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
"hive-source"
);

Dynamic table source implementation for Table API integration with advanced pushdown capabilities.
/**
* Dynamic table source for Hive integration with Table API
* Supports partition pushdown, projection pushdown, limit pushdown, and statistics
*/
public class HiveTableSource implements ScanTableSource, SupportsPartitionPushDown,
SupportsProjectionPushDown, SupportsLimitPushDown,
SupportsStatisticReport, SupportsDynamicFiltering {
/**
* Get scan runtime provider for table scanning
* @param scanContext Scan context with runtime information
* @return Provider for scan runtime
*/
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);
/**
* Apply partition pruning to reduce data scanning
* @param remainingPartitions Partitions remaining after pruning
*/
public void applyPartitions(List<Map<String, String>> remainingPartitions);
/**
* Apply field projection to read only required columns
* @param projectedFields Array of projected field index paths
* @param producedDataType Data type after projection
*/
public void applyProjection(int[][] projectedFields, DataType producedDataType);
/**
* Apply limit pushdown for result size optimization
* @param limit Maximum number of records to read
*/
public void applyLimit(long limit);
/**
* Get the changelog mode produced by this source (insert-only for Hive tables)
* @return ChangelogMode of the emitted records
*/
public ChangelogMode getChangelogMode();
/**
* Get table statistics for cost-based optimization
* @return TableStats with row count and column statistics
*/
public TableStats reportStatistics();
}

Specialized table source for lookup join operations with caching support.
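A Hive table used this way typically serves as the build side of a processing-time temporal join. The sketch below is illustrative, not taken from the connector sources: it assumes an existing TableEnvironment tableEnv, a streaming table orders with a processing-time attribute proc_time, and a Hive dimension table dim_customers.
// Sketch only: processing-time temporal (lookup) join against a Hive dimension table
Table joined = tableEnv.sqlQuery("""
    SELECT o.order_id, o.amount, c.region
    FROM orders AS o
    JOIN dim_customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.customer_id
    """);
// Lookup caching can be tuned via the 'lookup.join.cache.ttl' table option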
/**
* Lookup table source extending HiveTableSource for join operations
* Provides temporal table lookup functionality with optional caching
*/
public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {
/**
* Get lookup runtime provider for join operations
* @param context Lookup context with join information
* @return Provider for lookup runtime
*/
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);
}

Dynamic table sink for writing to Hive tables with partitioning and overwrite support.
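In SQL, static partitioning and overwrite are expressed directly on the INSERT statement. A minimal sketch, assuming the monthly_revenue table from the examples below, an illustrative daily_revenue source table, and an existing TableEnvironment tableEnv (INSERT OVERWRITE requires batch mode):
// Sketch only: replace the contents of one static partition
tableEnv.executeSql("""
    INSERT OVERWRITE analytics.monthly_revenue PARTITION (month = '2024-01')
    SELECT user_id, revenue FROM daily_revenue WHERE dt LIKE '2024-01%'
    """);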
/**
* Dynamic table sink for writing to Hive tables
* Supports static partitioning, overwrite mode, and partition commit policies
*/
public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
/**
* Get sink runtime provider for data writing
* @param context Sink context with runtime information
* @return Provider for sink runtime
*/
public SinkRuntimeProvider getSinkRuntimeProvider(Context context);
/**
* Apply static partition specification for writing
* @param partition Static partition key-value pairs
*/
public void applyStaticPartition(Map<String, String> partition);
/**
* Enable or disable overwrite mode for existing data
* @param overwrite Whether to overwrite existing data
*/
public void applyOverwrite(boolean overwrite);
/**
* Decide whether input data must be grouped by partition before writing
* @param supportsGrouping Whether the runtime is able to group data by partition
* @return Whether the sink requires partition grouping
*/
public boolean requiresPartitionGrouping(boolean supportsGrouping);
/**
* Create copy of sink for plan optimization
* @return Copy of the sink
*/
public DynamicTableSink copy();
/**
* Get human-readable summary of sink configuration
* @return Summary string
*/
public String asSummaryString();
}

Usage Examples:
// Table API source usage
TableEnvironment tableEnv = TableEnvironment.create(settings);
// Register Hive catalog
HiveCatalog hiveCatalog = new HiveCatalog("hive", "default", "/etc/hive/conf");
tableEnv.registerCatalog("hive", hiveCatalog);
tableEnv.useCatalog("hive");
// Query with automatic source optimization
Table result = tableEnv.sqlQuery("""
SELECT user_id, event_type, COUNT(*) as event_count
FROM user_events
WHERE event_date >= '2024-01-01'
AND event_type IN ('login', 'purchase')
GROUP BY user_id, event_type
LIMIT 1000
""");
// Write to Hive table with partitioning
Table processedData = tableEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("user_id", DataTypes.STRING()),
DataTypes.FIELD("revenue", DataTypes.DECIMAL(10, 2)),
DataTypes.FIELD("month", DataTypes.STRING())
),
Row.of("user123", new BigDecimal("99.99"), "2024-01"),
Row.of("user456", new BigDecimal("149.99"), "2024-01")
);
// Insert with automatic sink configuration
processedData.executeInsert("analytics.monthly_revenue");

The Hive connector integrates with Flink's unified connector framework:
// Factory registration happens automatically via SPI
// HiveDynamicTableFactory is discovered and used by Flink
// Manual source creation for DataStream API
HiveSource<RowData> source = new HiveSourceBuilder(jobConf, config, "2.3.9", "db", "table", Map.of())
.setPartitions(specificPartitions)
.setProjectedFields(new int[]{0, 1, 3})
.buildWithDefaultBulkFormat();
// Use with Flink's Source API
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<RowData> hiveStream = env.fromSource(
source,
WatermarkStrategy.noWatermarks(), // Hive records carry no event-time timestamps by default
"hive-source"
);

The connector supports continuous monitoring of Hive tables for streaming scenarios:
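For SQL queries, streaming reads are usually enabled per table via dynamic table options (hints). A minimal sketch against the user_events table, assuming an existing TableEnvironment tableEnv:
// Sketch only: unbounded read that picks up new partitions every minute
Table streamed = tableEnv.sqlQuery("""
    SELECT * FROM user_events
    /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.monitor-interval'='1 min') */
    """);
The equivalent DataStream-side setup passes the options through the Flink configuration: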
// Enable streaming mode via configuration
Configuration config = new Configuration();
config.set(HiveOptions.STREAMING_SOURCE_ENABLE, true);
config.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofMinutes(1));
HiveSource<RowData> streamingSource = new HiveSourceBuilder(
jobConf, config, "2.3.9", "db", "streaming_table", Map.of()
).buildWithDefaultBulkFormat();

Configure how partitions are committed after writing:
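Commit behavior is usually declared as table properties when the table is created in Hive dialect. A sketch with an illustrative table; the partition-time trigger additionally assumes a time extractor and watermarks are configured:
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql("""
    CREATE TABLE analytics.hourly_logs (msg STRING) PARTITIONED BY (dt STRING, hr STRING)
    STORED AS parquet TBLPROPERTIES (
      'sink.partition-commit.trigger' = 'partition-time',
      'sink.partition-commit.delay' = '1 h',
      'sink.partition-commit.policy.kind' = 'metastore,success-file'
    )
    """);
The policy can also be set programmatically: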
// Set partition commit policy
Configuration sinkConfig = new Configuration();
sinkConfig.set(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND, "metastore,success-file");
// Commit policy affects when partitions become visible

// Enable parallelism inference for optimal performance
config.set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
// Configure reader fallback for compatibility
config.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, false);
config.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);
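// Sketch only: these options are read from the Flink configuration, for example when it is
// passed to HiveSourceBuilder or set on the TableEnvironment that plans Hive reads/writes
HiveSource<RowData> tunedSource =
    new HiveSourceBuilder(jobConf, config, "2.3.9", "db", "table", Map.of())
        .buildWithDefaultBulkFormat();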