Apache Flink SQL connector for Apache Hive 3.1.2, enabling unified batch and stream processing with Hive tables.
—
Specialized lookup table source for dimension table joins in streaming applications. The HiveLookupTableSource provides caching capabilities and optimized access patterns for real-time data enrichment scenarios, enabling efficient temporal joins with Hive dimension tables.
Extends the standard HiveTableSource with lookup join capabilities, providing cached access to dimension data for streaming joins.
/**
 * Lookup table source for dimension table joins with caching support.
 * Extends HiveTableSource with optimized lookup access patterns so that
 * Hive tables can serve as the build side of temporal
 * (FOR SYSTEM_TIME AS OF) joins in streaming jobs.
 */
public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {

    /**
     * Creates the lookup runtime provider the planner uses to execute join operations.
     *
     * @param lookupContext context containing lookup configuration and join-key information
     * @return LookupRuntimeProvider for executing lookup operations
     */
    @Override
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext);

    /**
     * Creates a copy of this lookup table source for planning.
     *
     * @return deep copy of the lookup table source
     */
    @Override
    public DynamicTableSource copy();

    /**
     * Returns a string summary of the lookup table source.
     *
     * @return human-readable description including cache configuration
     */
    @Override
    public String asSummaryString();
}

Runtime component that executes actual lookup operations with caching and optimization.
/**
 * Runtime provider for executing lookup operations.
 * Handles caching, key serialization, and result retrieval.
 */
public interface LookupRuntimeProvider {

    /**
     * Performs a synchronous lookup for the given key.
     *
     * @param keyRow row containing the lookup key values
     * @return collection of matching rows from the dimension table
     */
    Collection<RowData> lookup(RowData keyRow);

    /**
     * Performs an asynchronous lookup for the given key.
     *
     * @param keyRow row containing the lookup key values
     * @return CompletableFuture that completes with the matching rows
     */
    CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow);
}

-- Create dimension table with lookup configuration
CREATE TABLE customer_dim (
customer_id BIGINT PRIMARY KEY NOT ENFORCED, -- lookup key; NOT ENFORCED: Flink trusts, but does not verify, uniqueness
customer_name STRING,
customer_tier STRING,
registration_date DATE,
last_updated TIMESTAMP(3)
) WITH (
'connector' = 'hive',
-- Lookup join caching configuration
'lookup.join.cache.ttl' = '1 hour', -- Cache entries for 1 hour; the whole table is reloaded when the TTL expires
'lookup.join.cache.max-size' = '10000', -- Maximum cache entries
-- Performance optimization
'table.exec.hive.infer-source-parallelism' = 'false', -- Use single task for lookup
'table.exec.hive.split-max-size' = '64MB' -- Smaller splits for lookup
);
-- Streaming fact table; proc_time is the time attribute referenced by
-- FOR SYSTEM_TIME AS OF in the lookup joins below.
CREATE TABLE orders_stream (
order_id BIGINT,
customer_id BIGINT,
order_amount DECIMAL(10,2),
order_time TIMESTAMP(3),
proc_time AS PROCTIME() -- Processing time for temporal join
) WITH (
'connector' = 'kafka',
-- Kafka configuration...
);
-- Enrich the streaming orders with customer attributes via a processing-time temporal join
SELECT
    o.order_id,
    o.order_amount,
    o.order_time,
    c.customer_name,
    c.customer_tier,
    -- Tier-based discount: 10% for GOLD, 5% for SILVER, none otherwise
    CASE c.customer_tier
        WHEN 'GOLD' THEN o.order_amount * 0.1
        WHEN 'SILVER' THEN o.order_amount * 0.05
        ELSE 0
    END AS discount_amount
FROM orders_stream o
INNER JOIN customer_dim FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.customer_id;

-- Dimension table with composite key
CREATE TABLE product_inventory (
warehouse_id STRING,
product_id STRING,
available_quantity INT,
last_updated TIMESTAMP(3),
PRIMARY KEY (warehouse_id, product_id) NOT ENFORCED -- composite key: both columns must appear in the lookup join condition
) WITH (
'connector' = 'hive',
'lookup.join.cache.ttl' = '30 min',
'lookup.join.cache.max-size' = '50000'
);
-- Enrich order items with inventory availability, matching on both parts of the composite key
SELECT
    o.order_id,
    o.product_id,
    o.requested_quantity,
    i.available_quantity,
    -- Flag whether the requested quantity can be served from current stock
    CASE
        WHEN i.available_quantity >= o.requested_quantity THEN 'AVAILABLE'
        ELSE 'BACKORDER'
    END AS fulfillment_status
FROM order_items_stream o
INNER JOIN product_inventory FOR SYSTEM_TIME AS OF o.proc_time AS i
    ON o.warehouse_id = i.warehouse_id
    AND o.product_id = i.product_id;

-- Handle missing dimension data gracefully
SELECT
    o.order_id,
    o.customer_id,
    o.order_amount,
    -- Fall back to defaults when the customer is missing from the dimension table
    COALESCE(c.customer_name, 'UNKNOWN') AS customer_name,
    COALESCE(c.customer_tier, 'STANDARD') AS customer_tier,
    -- Unmatched customers fall through to the ELSE branch: no discount
    CASE
        WHEN c.customer_id IS NOT NULL AND c.customer_tier = 'GOLD'
            THEN o.order_amount * 0.1
        ELSE 0
    END AS discount_amount
FROM orders_stream o
LEFT JOIN customer_dim FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.customer_id;

// Programmatic cache configuration
// Programmatic cache configuration: each scenario gets its OWN map.
// (The original example put all three scenarios into one map with the same
// keys, so only the last scenario's values survived.)

// High-frequency lookups with stable dimension data
Map<String, String> stableDimOptions = new HashMap<>();
stableDimOptions.put("lookup.join.cache.ttl", "2 hours");
stableDimOptions.put("lookup.join.cache.max-size", "100000");

// Fast-changing dimension data
Map<String, String> volatileDimOptions = new HashMap<>();
volatileDimOptions.put("lookup.join.cache.ttl", "5 minutes");
volatileDimOptions.put("lookup.join.cache.max-size", "5000");

// Memory-constrained environments
Map<String, String> lowMemoryOptions = new HashMap<>();
lowMemoryOptions.put("lookup.join.cache.ttl", "30 minutes");
lowMemoryOptions.put("lookup.join.cache.max-size", "1000");

-- Partition dimension table for better lookup performance
CREATE TABLE regional_customer_dim (
customer_id BIGINT,
customer_name STRING,
customer_tier STRING,
region STRING -- partition column; include it in the join condition to prune partitions
) PARTITIONED BY (region)
WITH (
'connector' = 'hive',
'lookup.join.cache.ttl' = '1 hour',
-- Optimize for partitioned lookups
'table.exec.hive.read-partition-with-subdirectory.enabled' = 'true'
);
-- Lookup join on the key plus the partition column so only the matching region is read
SELECT
    o.order_id,
    o.customer_id,
    o.customer_region,
    c.customer_name,
    c.customer_tier
FROM orders_stream o
INNER JOIN regional_customer_dim FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.customer_id
    AND o.customer_region = c.region; -- Partition pruning

// Access lookup cache metrics (requires custom metric collection)
/**
 * Records cache hit/miss counts and lookup latency for lookup joins.
 * The metric instances must be created and registered with a metric group by
 * the caller; this class only updates them. (The original version never
 * initialized its fields, so every record method threw a NullPointerException.)
 */
public class LookupCacheMetrics {
    private final Counter cacheHits;
    private final Counter cacheMisses;
    // NOTE(review): declared but never read or updated in this class — confirm
    // the gauge is registered and driven elsewhere, or remove it.
    private final Gauge cacheSize;
    private final Timer lookupLatency;

    /**
     * @param cacheHits counter incremented on each cache hit
     * @param cacheMisses counter incremented on each cache miss
     * @param cacheSize gauge reporting the current cache size
     * @param lookupLatency timer recording per-lookup latency
     */
    public LookupCacheMetrics(Counter cacheHits, Counter cacheMisses, Gauge cacheSize, Timer lookupLatency) {
        this.cacheHits = cacheHits;
        this.cacheMisses = cacheMisses;
        this.cacheSize = cacheSize;
        this.lookupLatency = lookupLatency;
    }

    /** Records a lookup served from the cache. */
    public void recordCacheHit() {
        cacheHits.inc();
    }

    /** Records a lookup that missed the cache and had to read the Hive table. */
    public void recordCacheMiss() {
        cacheMisses.inc();
    }

    /** Records how long a single lookup took, in milliseconds. */
    public void recordLookupLatency(long latencyMs) {
        lookupLatency.update(latencyMs, TimeUnit.MILLISECONDS);
    }
}

High Cache Miss Rate:
-- Problem: cache TTL too short for stable dimension data
-- Solution: raise the TTL so cached entries are reused longer
CREATE TABLE stable_dim (...) WITH (
    'lookup.join.cache.ttl' = '4 hours', -- Increased from 1 hour
    'lookup.join.cache.max-size' = '20000'
);

Memory Pressure from Cache:
-- Problem: Lookup cache consuming too much memory
-- Solution: Reduce cache size and optimize TTL
CREATE TABLE large_dim (...) WITH (
    'lookup.join.cache.ttl' = '30 min', -- Reduced TTL
    'lookup.join.cache.max-size' = '5000' -- Reduced size (no trailing comma before the closing paren)
);

Slow Lookup Performance:
-- Problem: Lookups taking too long due to large dimension table
-- Solution: optimize the Hive table layout. NOTE: Hive indexes were REMOVED in
-- Hive 3.0 (HIVE-18448), so CREATE INDEX is not available with Hive 3.1.2.
-- Use ORC storage with bloom filters on the lookup key (or a materialized
-- view) instead:
-- In Hive:
-- CREATE TABLE customer_dim (...) STORED AS ORC
-- TBLPROPERTIES ('orc.bloom.filter.columns' = 'customer_id');
-- In Flink, ensure proper key selection:
CREATE TABLE optimized_dim (
    customer_id BIGINT PRIMARY KEY NOT ENFORCED, -- Explicit primary key
    customer_data STRING
) WITH (
    'connector' = 'hive',
    'lookup.join.cache.ttl' = '1 hour'
);

// Configure async lookup for better throughput
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Async lookup tuning: buffer capacity bounds in-flight requests,
// timeout bounds how long a single lookup may take
Configuration conf = tableEnv.getConfig().getConfiguration();
conf.setString("table.exec.async-lookup.buffer-capacity", "1000");
conf.setString("table.exec.async-lookup.timeout", "3min");

-- Use async lookup hint for high-throughput scenarios
-- The LOOKUP hint must reference the lookup table by its alias when one is
-- declared in the join ('c' here), not by the underlying table name.
SELECT /*+ LOOKUP('table'='c', 'async'='true') */
    o.order_id,
    c.customer_name
FROM orders_stream o
INNER JOIN customer_dim FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.customer_id;

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-connector-hive-3-1-2-2-12