Lookup Functions

The HBase connector provides lookup function capabilities for temporal table joins, enabling real-time enrichment of streaming data with dimension data stored in HBase. This is essential for joining fast-changing stream data with slowly-changing dimension tables.

HBaseLookupFunction

A table function that performs lookups in HBase tables for temporal joins in Flink's Table API.

class HBaseLookupFunction extends TableFunction<Row> {
    public HBaseLookupFunction(Configuration configuration, String hTableName, 
        HBaseTableSchema hbaseTableSchema);
    
    // Core lookup method
    public void eval(Object rowKey);
    
    // Function lifecycle
    public void open(FunctionContext context) throws Exception;
    public void close() throws Exception;
    
    // Type information
    public TypeInformation<Row> getResultType();
}

Basic Lookup Usage

import org.apache.flink.addons.hbase.HBaseLookupFunction;
import org.apache.flink.addons.hbase.HBaseTableSchema;
import org.apache.flink.addons.hbase.HBaseTableSource;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.hadoop.conf.Configuration;

// Configure HBase connection
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum", "localhost:2181");

// Define dimension table schema
HBaseTableSchema userProfileSchema = new HBaseTableSchema();
userProfileSchema.setRowKey("user_id", String.class);
userProfileSchema.addColumn("profile", "name", String.class);
userProfileSchema.addColumn("profile", "email", String.class);
userProfileSchema.addColumn("profile", "age", Integer.class);
userProfileSchema.addColumn("profile", "department", String.class);

// Create table source with lookup capability
HBaseTableSource userProfileSource = new HBaseTableSource(conf, "user_profiles");
userProfileSource.setRowKey("user_id", String.class);
userProfileSource.addColumn("profile", "name", String.class);
userProfileSource.addColumn("profile", "email", String.class);
userProfileSource.addColumn("profile", "age", Integer.class);
userProfileSource.addColumn("profile", "department", String.class);

// Register as temporal table
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
tableEnv.registerTableSource("user_profiles", userProfileSource);
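
When the planner executes a temporal join against this source, it obtains the lookup function from the source itself. You can also fetch it directly through the LookupableTableSource interface (a minimal sketch; assumes the Flink 1.9-era flink-hbase API, where HBaseTableSource implements LookupableTableSource<Row>):

import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Obtain the lookup function the planner would use for this source;
// the lookup key must be the row key column configured on the source
TableFunction<Row> userLookup = userProfileSource.getLookupFunction(new String[] {"user_id"});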

Temporal Join with Lookup

-- Create the main event stream table
CREATE TABLE user_events (
    event_id STRING,
    user_id STRING,
    event_type STRING,
    event_time TIMESTAMP(3),
    event_value DOUBLE,
    proc_time AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'user-events',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- Create HBase lookup table
CREATE TABLE user_profiles (
    user_id STRING,
    name STRING,
    email STRING,
    age INT,
    department STRING
) WITH (
    'connector' = 'hbase',
    'table-name' = 'user_profiles',
    'zookeeper.quorum' = 'localhost:2181'
);

-- Perform temporal join (lookup)
SELECT 
    e.event_id,
    e.user_id,
    e.event_type,
    e.event_value,
    u.name,
    u.email,
    u.department
FROM user_events e
JOIN user_profiles FOR SYSTEM_TIME AS OF e.proc_time AS u
ON e.user_id = u.user_id;

Programmatic Lookup Usage

import org.apache.flink.table.api.Table;

// Create lookup function directly
HBaseLookupFunction lookupFunction = new HBaseLookupFunction(
    conf, "user_profiles", userProfileSchema);

// Register as user-defined function
tableEnv.registerFunction("lookup_user", lookupFunction);

// Use in SQL query
Table enrichedEvents = tableEnv.sqlQuery(
    "SELECT " +
    "    e.event_id, " +
    "    e.user_id, " +
    "    e.event_type, " +
    "    u.name, " +
    "    u.email " +
    "FROM events e, " +
    "LATERAL TABLE(lookup_user(e.user_id)) AS u(user_id, name, email, age, department)"
);
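
To consume the enriched table as a DataStream, create a StreamTableEnvironment instead of the unified TableEnvironment used above (a minimal sketch; the import paths follow the Flink 1.9/1.10 API that matches org.apache.flink.addons.hbase):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env);

// Register the sources and lookup function on streamTableEnv as shown above,
// then convert the enriched result to an append-only stream
DataStream<Row> enrichedStream = streamTableEnv.toAppendStream(enrichedEvents, Row.class);
enrichedStream.print();
env.execute("enriched-user-events");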

Lookup Performance Optimization

Connection Caching

The lookup function manages its HBase connection automatically; pooling and timeout behavior are controlled through standard HBase client settings:

// Configure HBase client for lookup performance
Configuration lookupConf = new Configuration();
lookupConf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");

// Connection pool settings
lookupConf.setInt("hbase.client.ipc.pool.size", 10);        // Connection pool size
lookupConf.setInt("hbase.client.ipc.pool.type", 1);         // RoundRobin pool type

// Timeout settings for lookups
lookupConf.setLong("hbase.rpc.timeout", 5000);              // 5 second RPC timeout
lookupConf.setLong("hbase.client.operation.timeout", 10000); // 10 second operation timeout

// Scanner settings for better lookup performance
lookupConf.setInt("hbase.client.scanner.caching", 100);     // Scanner row caching
lookupConf.setBoolean("hbase.client.scanner.async.prefetch", false); // Disable prefetch for lookups
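
The tuned configuration is simply passed to the lookup function (or the table source) at construction time:

// Apply the tuned client configuration when constructing the lookup function
HBaseLookupFunction tunedLookup = new HBaseLookupFunction(
    lookupConf, "user_profiles", userProfileSchema);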

Lookup Caching

// Note: the HBase client caches region locations automatically, and repeated
// reads are served by the region servers' block cache. Block cache sizing is
// a server-side setting (hbase-site.xml), not a Flink client option:
//
//   hfile.block.cache.size = 0.25   (fraction of region server heap)
Configuration cachedConf = new Configuration();
cachedConf.set("hbase.zookeeper.quorum", "localhost:2181");
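
For truly hot keys, lookup results can additionally be cached inside the function itself. Below is a minimal sketch of an LRU-caching wrapper; the class name, the sizing constant, and the absence of a TTL are illustrative choices, not part of the connector, and it assumes TableFunction.setCollector for capturing the delegate's output:

import org.apache.flink.addons.hbase.HBaseLookupFunction;
import org.apache.flink.addons.hbase.HBaseTableSchema;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CachingLookupFunction extends TableFunction<Row> {
    // No TTL: entries live until evicted, so use this only for slowly
    // changing dimension data (or add expiry)
    private static final int MAX_ENTRIES = 10_000;
    
    private final HBaseLookupFunction delegate;
    private transient Map<Object, List<Row>> cache;
    
    public CachingLookupFunction(Configuration conf, String tableName, HBaseTableSchema schema) {
        this.delegate = new HBaseLookupFunction(conf, tableName, schema);
    }
    
    @Override
    public void open(FunctionContext context) throws Exception {
        delegate.open(context);
        // Access-ordered LinkedHashMap that evicts the least recently used key
        cache = new LinkedHashMap<Object, List<Row>>(1024, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Object, List<Row>> eldest) {
                return size() > MAX_ENTRIES;
            }
        };
    }
    
    public void eval(Object rowKey) {
        List<Row> rows = cache.get(rowKey);
        if (rows == null) {
            // Cache miss: capture the delegate's output so it can be cached
            final List<Row> fetched = new ArrayList<>();
            delegate.setCollector(new Collector<Row>() {
                @Override
                public void collect(Row row) {
                    fetched.add(row);
                }
                
                @Override
                public void close() {
                }
            });
            delegate.eval(rowKey);
            cache.put(rowKey, fetched);
            rows = fetched;
        }
        // Emit cached (or freshly fetched) rows
        for (Row row : rows) {
            collect(row);
        }
    }
    
    @Override
    public void close() throws Exception {
        delegate.close();
    }
}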

Advanced Lookup Patterns

Multi-Key Lookups

// Schema for composite key lookups
HBaseTableSchema compositeKeySchema = new HBaseTableSchema();
compositeKeySchema.setRowKey("composite_key", String.class); // "userId:timestamp" format
compositeKeySchema.addColumn("data", "value", String.class);
compositeKeySchema.addColumn("data", "status", String.class);

// Custom lookup function for composite keys
public class CompositeKeyLookupFunction extends TableFunction<Row> {
    private final HBaseLookupFunction baseLookupFunction;
    
    public CompositeKeyLookupFunction(Configuration conf, String tableName, 
                                    HBaseTableSchema schema) {
        this.baseLookupFunction = new HBaseLookupFunction(conf, tableName, schema);
    }
    
    @Override
    public void open(FunctionContext context) throws Exception {
        baseLookupFunction.open(context);
    }
    
    public void eval(String userId, Long timestamp) {
        // Share this function's collector so rows found by the delegate
        // are forwarded to the enclosing join
        baseLookupFunction.setCollector(collector);
        
        // Create composite key in the "userId:timestamp" format
        String compositeKey = userId + ":" + timestamp;
        
        // Delegate to the base lookup function; matches are emitted directly
        baseLookupFunction.eval(compositeKey);
    }
    
    @Override
    public void close() throws Exception {
        baseLookupFunction.close();
    }
}
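
Once registered, the composite-key function is queried with a LATERAL TABLE join like any other lookup function (a sketch; the "events_by_user" table name, the BIGINT event_ts column, and the alias column names are illustrative):

// Register the composite-key lookup and join against it
tableEnv.registerFunction("lookup_by_user_time",
    new CompositeKeyLookupFunction(conf, "events_by_user", compositeKeySchema));

Table byUserAndTime = tableEnv.sqlQuery(
    "SELECT e.event_id, d.v AS value, d.s AS status " +
    "FROM user_events e, " +
    "LATERAL TABLE(lookup_by_user_time(e.user_id, e.event_ts)) AS d(k, v, s)");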

Conditional Lookups

-- Lookup with conditions
SELECT 
    e.event_id,
    e.user_id,
    e.event_type,
    CASE 
        WHEN u.user_id IS NOT NULL THEN u.name
        ELSE 'Unknown User'
    END as user_name,
    CASE
        WHEN u.age >= 18 THEN 'Adult'
        ELSE 'Minor'
    END as age_category
FROM user_events e
LEFT JOIN user_profiles FOR SYSTEM_TIME AS OF e.proc_time AS u
ON e.user_id = u.user_id;

Lookup with Data Transformation

// Custom lookup function with data transformation
public class TransformingLookupFunction extends TableFunction<Row> {
    private HBaseLookupFunction baseLookup; // constructed and opened as in the examples above
    
    public void eval(String userId) {
        // Wrap this function's collector (org.apache.flink.util.Collector) so
        // each row matched by the base lookup is transformed before it is emitted
        baseLookup.setCollector(new Collector<Row>() {
            @Override
            public void collect(Row row) {
                TransformingLookupFunction.this.collect(transformUserData(row));
            }
            
            @Override
            public void close() {
            }
        });
        baseLookup.eval(userId);
    }
    
    // Transform user data
    private Row transformUserData(Row originalRow) {
        String name = (String) originalRow.getField(1);
        Integer age = (Integer) originalRow.getField(3);
        String department = (String) originalRow.getField(4);
        
        // Add computed fields (formatDisplayName, categorizeAge, and
        // getDepartmentCode are application-specific helpers, omitted here)
        String displayName = formatDisplayName(name);
        String ageGroup = categorizeAge(age);
        String departmentCode = getDepartmentCode(department);
        
        return Row.of(
            originalRow.getField(0), // user_id
            displayName,
            originalRow.getField(2), // email
            ageGroup,
            departmentCode
        );
    }
}

Error Handling and Resilience

Lookup Failure Handling

// Configure retry and timeout behavior for lookups
Configuration resilientConf = new Configuration();
resilientConf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");

// Retry configuration
resilientConf.setInt("hbase.client.retries.number", 5);     // Max 5 retries
resilientConf.setLong("hbase.client.pause", 1000);          // 1 second base pause between retries

// Overall cap on a single client operation, including all retries
resilientConf.setLong("hbase.client.operation.timeout", 30000); // 30 second timeout

// Create resilient lookup function
HBaseLookupFunction resilientLookup = new HBaseLookupFunction(
    resilientConf, "user_profiles", userProfileSchema);

Graceful Degradation

-- Handle lookup failures gracefully
SELECT 
    e.event_id,
    e.user_id,
    e.event_type,
    COALESCE(u.name, 'UNKNOWN') as user_name,
    COALESCE(u.email, 'no-email@domain.com') as user_email,
    COALESCE(u.department, 'UNASSIGNED') as department
FROM user_events e
LEFT JOIN user_profiles FOR SYSTEM_TIME AS OF e.proc_time AS u
ON e.user_id = u.user_id;

Monitoring and Metrics

Lookup Performance Monitoring

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
import org.apache.flink.table.functions.FunctionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Custom lookup function with metrics
public class MonitoredLookupFunction extends TableFunction<Row> {
    private static final Logger LOG = LoggerFactory.getLogger(MonitoredLookupFunction.class);
    
    private final HBaseLookupFunction delegate;
    private transient Counter lookupCount;
    private transient Counter lookupFailures;
    private transient Histogram lookupLatency;
    
    public MonitoredLookupFunction(Configuration conf, String tableName, HBaseTableSchema schema) {
        this.delegate = new HBaseLookupFunction(conf, tableName, schema);
    }
    
    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
        
        // Initialize metrics; MetricGroup.histogram(...) requires a Histogram
        // implementation, e.g. DescriptiveStatisticsHistogram from flink-runtime
        lookupCount = context.getMetricGroup().counter("lookup_count");
        lookupFailures = context.getMetricGroup().counter("lookup_failures");
        lookupLatency = context.getMetricGroup().histogram(
            "lookup_latency", new DescriptiveStatisticsHistogram(500));
        
        // Initialize delegate
        delegate.open(context);
    }
    
    public void eval(Object rowKey) {
        // Forward the delegate's matches through this function's collector
        delegate.setCollector(collector);
        
        long startTime = System.currentTimeMillis();
        lookupCount.inc();
        
        try {
            delegate.eval(rowKey);
            lookupLatency.update(System.currentTimeMillis() - startTime);
        } catch (Exception e) {
            lookupFailures.inc();
            // Log error but don't fail the job
            LOG.warn("Lookup failed for key: {}", rowKey, e);
            // Emit empty result or default values
            collect(Row.of(rowKey, null, null, null, null));
        }
    }
    
    @Override
    public void close() throws Exception {
        delegate.close();
    }
}
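
The monitored variant is registered like any other function; its metrics are then reported per parallel subtask under the task's metric group (the constructor is the one defined in the sketch above):

// Metrics appear as <task metric group>.lookup_count / lookup_failures / lookup_latency
tableEnv.registerFunction("lookup_user_monitored",
    new MonitoredLookupFunction(conf, "user_profiles", userProfileSchema));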

Lookup Join Patterns

Dimension Table Enrichment

-- Enrich transaction events with customer information
CREATE TABLE transactions (
    transaction_id STRING,
    customer_id STRING,
    amount DECIMAL(10,2),
    transaction_time TIMESTAMP(3),
    proc_time AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'transactions'
);

CREATE TABLE customers (
    customer_id STRING,
    name STRING,
    tier STRING,
    region STRING,
    credit_limit DECIMAL(10,2)
) WITH (
    'connector' = 'hbase',
    'table-name' = 'customer_profiles',
    'zookeeper.quorum' = 'localhost:2181'
);

-- Enriched transaction stream
SELECT 
    t.transaction_id,
    t.customer_id,
    t.amount,
    c.name as customer_name,
    c.tier as customer_tier,
    c.region,
    CASE 
        WHEN t.amount > c.credit_limit THEN 'OVERLIMIT'
        ELSE 'NORMAL'
    END as transaction_status
FROM transactions t
JOIN customers FOR SYSTEM_TIME AS OF t.proc_time AS c
ON t.customer_id = c.customer_id;

Multi-Level Lookups

-- Multiple lookup joins for complex enrichment
CREATE TABLE events (
    event_id STRING,
    user_id STRING,
    product_id STRING,
    action STRING,
    event_time TIMESTAMP(3),
    proc_time AS PROCTIME()
) WITH ('connector' = 'kafka', 'topic' = 'user-events');

CREATE TABLE users (
    user_id STRING,
    name STRING,
    segment STRING
) WITH ('connector' = 'hbase', 'table-name' = 'users', 'zookeeper.quorum' = 'localhost:2181');

CREATE TABLE products (
    product_id STRING,
    name STRING,
    category STRING,
    price DECIMAL(10,2)
) WITH ('connector' = 'hbase', 'table-name' = 'products', 'zookeeper.quorum' = 'localhost:2181');

-- Multi-level enriched stream
SELECT 
    e.event_id,
    e.action,
    u.name as user_name,
    u.segment as user_segment,
    p.name as product_name,
    p.category as product_category,
    p.price as product_price
FROM events e
JOIN users FOR SYSTEM_TIME AS OF e.proc_time AS u ON e.user_id = u.user_id
JOIN products FOR SYSTEM_TIME AS OF e.proc_time AS p ON e.product_id = p.product_id;

Best Practices

Lookup Performance

  1. Use appropriate row key design: Ensure row keys are well-distributed for even load
  2. Configure proper timeouts: Set reasonable RPC and operation timeouts
  3. Enable connection pooling: Use connection pools for better resource utilization
  4. Monitor lookup latency: Track lookup performance with metrics
  5. Consider caching: Enable HBase client-side caching for hot data

Schema Design for Lookups

// Design schema for efficient lookups
HBaseTableSchema efficientLookupSchema = new HBaseTableSchema();

// Use meaningful row key
efficientLookupSchema.setRowKey("user_id", String.class);

// Group related data in same column family for locality
efficientLookupSchema.addColumn("profile", "name", String.class);
efficientLookupSchema.addColumn("profile", "email", String.class);
efficientLookupSchema.addColumn("profile", "department", String.class);

// Separate frequently changing data
efficientLookupSchema.addColumn("activity", "last_login", java.sql.Timestamp.class);
efficientLookupSchema.addColumn("activity", "login_count", Long.class);

// Use appropriate data types
efficientLookupSchema.addColumn("settings", "preferences", String.class); // JSON as string
efficientLookupSchema.addColumn("binary", "avatar", byte[].class);        // Binary data

Error Recovery

// Implement robust error handling
public class RobustLookupFunction extends TableFunction<Row> {
    private static final Logger LOG = LoggerFactory.getLogger(RobustLookupFunction.class);
    private static final int MAX_RETRIES = 3;
    private static final long RETRY_DELAY_MS = 1000;
    
    public void eval(Object rowKey) {
        int attempts = 0;
        Exception lastException = null;
        
        while (attempts < MAX_RETRIES) {
            try {
                // Perform the lookup (performLookup wraps the underlying
                // HBase call; implementation omitted here)
                performLookup(rowKey);
                return; // Success
            } catch (Exception e) {
                lastException = e;
                attempts++;
                
                if (attempts < MAX_RETRIES) {
                    try {
                        Thread.sleep(RETRY_DELAY_MS * attempts); // Linearly increasing backoff
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
        
        // All retries failed - emit default/empty result
        LOG.error("Lookup failed after {} attempts for key: {}", MAX_RETRIES, rowKey, lastException);
        emitDefaultResult(rowKey);
    }
    
    private void emitDefaultResult(Object rowKey) {
        // Emit row with null values for missing data
        collect(Row.of(rowKey, null, null, null, null));
    }
}