CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-sql-connector-hive-3-1-2-2-12

Apache Flink SQL connector for Apache Hive 3.1.2, enabling unified batch and stream processing with Hive tables.

Pending
Overview
Eval results
Files

docs/lookup-joins.md

Lookup Joins

Specialized lookup table source for dimension table joins in streaming applications. The HiveLookupTableSource provides caching capabilities and optimized access patterns for real-time data enrichment scenarios, enabling efficient temporal joins with Hive dimension tables.

Capabilities

Hive Lookup Table Source

Extends the standard HiveTableSource with lookup join capabilities, providing cached access to dimension data for streaming joins.

/**
 * Lookup table source for dimension table joins with caching support.
 * Extends HiveTableSource with optimized lookup access patterns so a Hive
 * table can serve as the build side of a temporal (FOR SYSTEM_TIME AS OF)
 * join; cache behavior is driven by the 'lookup.join.cache.*' table options
 * shown in the examples below.
 *
 * NOTE(review): method bodies are omitted — this is an API summary, not
 * compilable source. Verify signatures against the Flink release in use.
 */
public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {
    
    /**
     * Creates the lookup runtime provider for join operations.
     * @param lookupContext Context containing lookup configuration and key information
     * @return LookupRuntimeProvider for executing lookup operations
     */
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext);
    
    /**
     * Creates a copy of this lookup table source for planning.
     * The planner may mutate the copy, so it must be independent of this instance.
     * @return Deep copy of the lookup table source
     */
    public DynamicTableSource copy();
    
    /**
     * Returns a string summary of the lookup table source.
     * @return Human-readable description including cache configuration
     */
    public String asSummaryString();
}

Lookup Runtime Provider

Runtime component that executes actual lookup operations with caching and optimization.

/**
 * Runtime provider for executing lookup operations.
 * Handles caching, key serialization, and result retrieval.
 *
 * NOTE(review): this simplified interface illustrates the lookup contract;
 * the actual Flink API exposes sync/async lookups through
 * TableFunctionProvider / AsyncTableFunctionProvider — confirm against the
 * Flink version in use.
 */
public interface LookupRuntimeProvider {
    
    /**
     * Performs a synchronous lookup for the given key.
     * @param keyRow Row containing lookup key values
     * @return Collection of matching rows from dimension table
     */
    public Collection<RowData> lookup(RowData keyRow);
    
    /**
     * Performs an asynchronous lookup for the given key.
     * @param keyRow Row containing lookup key values
     * @return CompletableFuture containing matching rows
     */
    public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow);
}

Usage Patterns

Temporal Join Configuration

-- Create dimension table with lookup configuration
CREATE TABLE customer_dim (
    customer_id BIGINT PRIMARY KEY NOT ENFORCED,
    customer_name STRING,
    customer_tier STRING,
    registration_date DATE,
    last_updated TIMESTAMP(3)
) WITH (
    'connector' = 'hive',
    
    -- Lookup join caching configuration
    'lookup.join.cache.ttl' = '1 hour',          -- Cache entries for 1 hour
    'lookup.join.cache.max-size' = '10000',      -- Maximum cache entries
    
    -- Performance optimization
    'table.exec.hive.infer-source-parallelism' = 'false',  -- Disable automatic parallelism inference for this source
    'table.exec.hive.split-max-size' = '64MB'              -- Smaller splits for lookup
);

-- Create streaming fact table
CREATE TABLE orders_stream (
    order_id BIGINT,
    customer_id BIGINT,
    order_amount DECIMAL(10,2),
    order_time TIMESTAMP(3),
    proc_time AS PROCTIME()  -- Processing time for temporal join
) WITH (
    'connector' = 'kafka',
    -- Kafka configuration...
);

-- Perform temporal join to enrich streaming data
SELECT 
    o.order_id,
    o.order_amount,
    o.order_time,
    c.customer_name,
    c.customer_tier,
    -- Calculate discount based on customer tier
    CASE 
        WHEN c.customer_tier = 'GOLD' THEN o.order_amount * 0.1
        WHEN c.customer_tier = 'SILVER' THEN o.order_amount * 0.05  
        ELSE 0
    END as discount_amount
FROM orders_stream o
JOIN customer_dim FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.customer_id;

Advanced Lookup Join Scenarios

Multi-Key Lookups

-- Dimension table with composite key
CREATE TABLE product_inventory (
    warehouse_id STRING,
    product_id STRING,
    available_quantity INT,
    last_updated TIMESTAMP(3),
    PRIMARY KEY (warehouse_id, product_id) NOT ENFORCED
) WITH (
    'connector' = 'hive',
    'lookup.join.cache.ttl' = '30 min',
    'lookup.join.cache.max-size' = '50000'
);

-- Join with multiple lookup keys
SELECT 
    o.order_id,
    o.product_id,
    o.requested_quantity,
    i.available_quantity,
    CASE 
        WHEN i.available_quantity >= o.requested_quantity 
        THEN 'AVAILABLE' 
        ELSE 'BACKORDER' 
    END as fulfillment_status
FROM order_items_stream o
JOIN product_inventory FOR SYSTEM_TIME AS OF o.proc_time AS i
ON o.warehouse_id = i.warehouse_id 
   AND o.product_id = i.product_id;

Null Handling in Lookups

-- Handle missing dimension data gracefully
SELECT 
    o.order_id,
    o.customer_id,
    o.order_amount,
    COALESCE(c.customer_name, 'UNKNOWN') as customer_name,
    COALESCE(c.customer_tier, 'STANDARD') as customer_tier,
    -- Apply default discount for unknown customers
    CASE 
        WHEN c.customer_id IS NOT NULL AND c.customer_tier = 'GOLD' 
        THEN o.order_amount * 0.1
        ELSE 0
    END as discount_amount
FROM orders_stream o
LEFT JOIN customer_dim FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.customer_id;

Performance Optimization

Cache Configuration Tuning

// Programmatic cache configuration
Map<String, String> tableOptions = new HashMap<>();

// High-frequency lookups with stable dimension data
tableOptions.put("lookup.join.cache.ttl", "2 hours");
tableOptions.put("lookup.join.cache.max-size", "100000");

// Fast-changing dimension data
tableOptions.put("lookup.join.cache.ttl", "5 minutes"); 
tableOptions.put("lookup.join.cache.max-size", "5000");

// Memory-constrained environments
tableOptions.put("lookup.join.cache.ttl", "30 minutes");
tableOptions.put("lookup.join.cache.max-size", "1000");

Partitioning for Lookup Performance

-- Partition dimension table for better lookup performance
CREATE TABLE regional_customer_dim (
    customer_id BIGINT,
    customer_name STRING,
    customer_tier STRING, 
    region STRING
) PARTITIONED BY (region)
WITH (
    'connector' = 'hive',
    'lookup.join.cache.ttl' = '1 hour',
    
    -- Optimize for partitioned lookups
    'table.exec.hive.read-partition-with-subdirectory.enabled' = 'true'
);

-- Use partition pruning in lookup joins
SELECT 
    o.order_id,
    o.customer_id,
    o.customer_region,
    c.customer_name,
    c.customer_tier
FROM orders_stream o
JOIN regional_customer_dim FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.customer_id 
   AND o.customer_region = c.region;  -- Partition pruning

Monitoring and Debugging

Cache Metrics

// Access lookup cache metrics (requires custom metric collection)
/**
 * Collects cache-effectiveness and latency metrics for lookup joins.
 * Requires wiring into a custom metric group; these metrics are not
 * exposed by the connector out of the box.
 */
public class LookupCacheMetrics {
    // How often a key was served from the local cache vs. fetched from Hive.
    private Counter cacheHits;
    private Counter cacheMisses;
    // Current number of cached entries.
    private Gauge cacheSize;
    // Latency distribution of individual lookup calls.
    private Timer lookupLatency;

    /** Records that a key was answered from the cache. */
    public void recordCacheHit() {
        this.cacheHits.inc();
    }

    /** Records that a key required a fetch from the dimension table. */
    public void recordCacheMiss() {
        this.cacheMisses.inc();
    }

    /**
     * Records how long one lookup took.
     * @param latencyMs elapsed time in milliseconds
     */
    public void recordLookupLatency(long latencyMs) {
        this.lookupLatency.update(latencyMs, TimeUnit.MILLISECONDS);
    }
}

Troubleshooting Common Issues

High Cache Miss Rate:

-- Problem: Cache TTL too short for stable dimension data
-- Solution: Increase cache TTL
CREATE TABLE stable_dim (...) WITH (
    'lookup.join.cache.ttl' = '4 hours',  -- Increased from 1 hour
    'lookup.join.cache.max-size' = '20000'
);

Memory Pressure from Cache:

-- Problem: Lookup cache consuming too much memory
-- Solution: Reduce cache size and optimize TTL
CREATE TABLE large_dim (...) WITH (
    'lookup.join.cache.ttl' = '30 min',   -- Reduced TTL
    'lookup.join.cache.max-size' = '5000' -- Reduced size (no trailing comma before ')')
);

Slow Lookup Performance:

-- Problem: Lookups taking too long due to large dimension table
-- Solution: Optimize the table layout in Hive. Note that Hive removed
-- index support in Hive 3.0 (HIVE-18448), so CREATE INDEX is not
-- available on Hive 3.1.2. Instead, store the table as ORC with a bloom
-- filter on the lookup key and keep table statistics current:
-- ALTER TABLE customer_dim SET TBLPROPERTIES (
--   'orc.bloom.filter.columns' = 'customer_id');
-- ANALYZE TABLE customer_dim COMPUTE STATISTICS;

-- In Flink, ensure proper key selection:
CREATE TABLE optimized_dim (
    customer_id BIGINT PRIMARY KEY NOT ENFORCED,  -- Explicit primary key
    customer_data STRING
) WITH (
    'connector' = 'hive',
    'lookup.join.cache.ttl' = '1 hour'
);

Async vs Sync Lookup

// Configure async lookup for better throughput
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Async lookup configuration
Configuration config = tableEnv.getConfig().getConfiguration();
config.setString("table.exec.async-lookup.buffer-capacity", "1000");
config.setString("table.exec.async-lookup.timeout", "3min");
-- Use async lookup hint for high-throughput scenarios
SELECT /*+ LOOKUP('table'='customer_dim', 'async'='true') */
    o.order_id,
    c.customer_name
FROM orders_stream o
JOIN customer_dim FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.customer_id;

Best Practices

Design Guidelines

  1. Key Selection: Use selective keys that minimize lookup result sets
  2. Cache Sizing: Size cache based on working set, not total dimension size
  3. TTL Configuration: Balance freshness requirements with performance
  4. Partitioning: Partition large dimension tables for better pruning
  5. Schema Design: Include timestamp columns for temporal validity checking

Operational Considerations

  1. Monitoring: Track cache hit rates and lookup latencies
  2. Resource Planning: Account for cache memory in task manager sizing
  3. Data Freshness: Coordinate dimension updates with cache TTL settings
  4. Fault Tolerance: Design for graceful degradation when lookups fail
  5. Testing: Validate lookup behavior with realistic data volumes and patterns

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-sql-connector-hive-3-1-2-2-12

docs

catalog-integration.md

configuration-management.md

factory-registration.md

function-module.md

index.md

lookup-joins.md

table-sources-sinks.md

tile.json