
tessl/maven-org-apache-flink--flink-sql-connector-hbase-2-2-2-11

Apache Flink SQL connector that enables seamless integration with HBase 2.2.x databases through Flink's Table API and SQL interface



Lookup Operations and Caching

Temporal table join functionality with both synchronous and asynchronous lookup modes, including comprehensive caching strategies for performance optimization.

Capabilities

Temporal Table Joins

Performs lookups against an HBase table to enrich streaming data using temporal join semantics.

SELECT 
    s.field,
    h.column_family.qualifier
FROM stream_table AS s
JOIN hbase_table FOR SYSTEM_TIME AS OF s.proc_time AS h
ON s.lookup_key = h.rowkey
WHERE additional_conditions;

Requirements:

  • The lookup join must use the FOR SYSTEM_TIME AS OF syntax
  • The join condition must be an equality on the HBase row key
  • The stream table must expose a processing-time attribute (proc_time in these examples), typically a computed column defined as PROCTIME()
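
The requirements above can be met with table definitions along the following lines. This is a minimal sketch: the datagen source, the HBase table name my_hbase_table, and all column names are assumptions chosen to match the query template, not something the connector prescribes.

```sql
-- Hypothetical stream table; the datagen connector is used only so the
-- sketch is self-contained.
CREATE TABLE stream_table (
    lookup_key STRING,
    field STRING,
    proc_time AS PROCTIME()   -- processing-time attribute used by FOR SYSTEM_TIME AS OF
) WITH (
    'connector' = 'datagen'
);

-- HBase lookup table: one atomic column for the row key,
-- plus one ROW column per column family.
CREATE TABLE hbase_table (
    rowkey STRING,
    column_family ROW<qualifier STRING>,
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'my_hbase_table',        -- hypothetical HBase table name
    'zookeeper.quorum' = 'localhost:2181'
);
```

With these definitions in place, s.proc_time and h.rowkey in the query template resolve to the computed column and the row key column respectively.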

Usage Examples:

-- User profile enrichment
SELECT 
    event.user_id,
    event.action,
    event.`timestamp`,
    profile.info.name AS user_name,
    profile.info.email AS user_email,
    profile.preferences.`language`
FROM user_events event
JOIN user_profiles FOR SYSTEM_TIME AS OF event.proc_time AS profile
ON event.user_id = profile.user_id;

-- Product catalog lookup for sales
SELECT 
    sale.transaction_id,
    sale.quantity,
    catalog.basic_info.name AS product_name,
    catalog.basic_info.price AS unit_price,
    catalog.basic_info.price * sale.quantity AS total_amount
FROM sales_stream sale
JOIN product_catalog FOR SYSTEM_TIME AS OF sale.proc_time AS catalog
ON sale.product_id = catalog.product_id
WHERE catalog.inventory.stock_count > 0;
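
Both joins above are inner joins, so events whose key has no matching HBase row are dropped from the output. Where unmatched events should be kept, a LEFT JOIN can be used instead. A minimal sketch, reusing the user_events and user_profiles tables from the first example; the 'unknown' default is an assumption for illustration:

```sql
-- Keep every event, filling in a default when no profile row exists
SELECT
    e.user_id,
    e.action,
    COALESCE(p.info.name, 'unknown') AS user_name
FROM user_events AS e
LEFT JOIN user_profiles FOR SYSTEM_TIME AS OF e.proc_time AS p
    ON e.user_id = p.user_id;
```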

Synchronous Lookup Mode

Default lookup mode. Each lookup blocks the operator until HBase returns the result.

WITH (
    'lookup.async' = 'false'                       -- Synchronous mode (default)
)

Characteristics:

  • Blocking operation that waits for HBase response
  • Lower throughput but predictable latency
  • Simpler error handling and debugging
  • Suitable for low-to-medium volume streams

Usage Examples:

-- Synchronous lookup table for reference data
CREATE TABLE reference_lookup (
    code STRING,
    reference ROW<description STRING, category STRING, active BOOLEAN>,
    PRIMARY KEY (code) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'reference_codes',
    'zookeeper.quorum' = 'localhost:2181',
    'lookup.async' = 'false',
    'lookup.max-retries' = '3'
);

-- Using synchronous lookup in join
SELECT 
    t.id,
    t.amount,
    r.reference.description AS transaction_type
FROM transaction_stream AS t
JOIN reference_lookup FOR SYSTEM_TIME AS OF t.proc_time AS r
ON t.type_code = r.code;

Asynchronous Lookup Mode

High-performance lookup mode using non-blocking operations for improved throughput.

WITH (
    'lookup.async' = 'true'                        -- Asynchronous mode
)

Characteristics:

  • Non-blocking operations with callback-based results
  • Higher throughput for high-volume streams
  • More complex error handling
  • Requires proper backpressure management
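
One place to manage backpressure for async lookups is Flink's planner-level async lookup settings, which cap the number of in-flight requests and their timeout. The sketch below assumes a SQL client that accepts quoted SET statements (Flink 1.13 or later); the values are illustrative, not recommendations.

```sql
-- Maximum number of in-flight async lookups per operator (Flink default: 100).
-- When the buffer is full, the operator backpressures upstream.
SET 'table.exec.async-lookup.buffer-capacity' = '200';

-- Time allowed for a single async lookup to complete (Flink default: 3 min)
SET 'table.exec.async-lookup.timeout' = '1 min';
```

A larger buffer allows more concurrent HBase requests at the cost of more memory and more load on the region servers.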

Usage Examples:

-- High-throughput async lookup table
CREATE TABLE async_user_lookup (
    user_id STRING,
    profile ROW<name STRING, email STRING, tier STRING>,
    activity ROW<last_login TIMESTAMP(3), total_orders INT>,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'user_profiles',
    'zookeeper.quorum' = 'localhost:2181',
    'lookup.async' = 'true',
    'lookup.max-retries' = '5'
);

-- Using async lookup for high-volume stream
SELECT 
    click.event_id,
    click.page_url,
    click.`timestamp`,
    u.profile.name AS user_name,
    u.profile.tier AS user_tier
FROM clickstream click
JOIN async_user_lookup FOR SYSTEM_TIME AS OF click.proc_time AS u
ON click.user_id = u.user_id;

Caching Configuration

Caching strategies to reduce HBase lookup load and improve performance.

WITH (
    'lookup.cache.max-rows' = '10000',             -- Maximum cached entries
    'lookup.cache.ttl' = '300s'                    -- Cache time-to-live
)

Parameters:

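  • lookup.cache.max-rows: Maximum number of lookup results to cache (the default of -1 leaves caching disabled)
  • lookup.cache.ttl: Maximum time a cached entry lives before it expires (default 0 s). In upstream Flink the cache is only active when both options are set to positive values, so set the TTL together with max-rows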

Cache Behavior:

  • LRU (Least Recently Used) eviction when cache is full
  • TTL-based expiration for data freshness
  • Cache is per lookup function instance (per task)
  • Memory usage proportional to cache size and record size

Usage Examples:

-- Long-lived reference data with large cache
CREATE TABLE reference_cache (
    country_code STRING,
    info ROW<name STRING, currency STRING, timezone STRING>,
    PRIMARY KEY (country_code) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'country_reference',
    'zookeeper.quorum' = 'localhost:2181',
    'lookup.cache.max-rows' = '50000',
    'lookup.cache.ttl' = '3600s'  -- 1 hour cache
);

-- Frequently changing data with short cache
CREATE TABLE price_cache (
    symbol STRING,
    pricing ROW<current_price DECIMAL(10,4), last_update TIMESTAMP(3)>,
    PRIMARY KEY (symbol) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'stock_prices',
    'zookeeper.quorum' = 'localhost:2181',
    'lookup.cache.max-rows' = '1000',
    'lookup.cache.ttl' = '30s'  -- 30 second cache
);

Retry Configuration

Error handling and retry logic for failed lookup operations.

WITH (
    'lookup.max-retries' = '3'                     -- Maximum retry attempts (default: 3)
)

Retry Behavior:

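  • The wait between retry attempts grows with each attempt (upstream Flink sleeps about one second multiplied by the retry number, which is linear rather than exponential)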
  • Retries are attempted for transient failures (connection issues, timeouts)
  • Non-retryable errors (table not found, invalid row key) fail immediately
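  • When all retries are exhausted, upstream Flink raises an exception and the task fails; it does not emit null values for the failed lookup. Null values appear only for keys that have no matching row, and only in a LEFT JOIN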

Usage Examples:

-- Robust lookup with extensive retry
CREATE TABLE robust_lookup (
    id STRING,
    data ROW<`value` STRING, `timestamp` BIGINT>,  -- reserved words quoted with backticks
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'critical_data',
    'zookeeper.quorum' = 'localhost:2181',
    'lookup.max-retries' = '10'  -- High retry count for critical data
);

-- Fast-fail lookup for non-critical data
CREATE TABLE optional_lookup (
    ref_id STRING,
    optional_data ROW<description STRING>,
    PRIMARY KEY (ref_id) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'optional_reference',
    'zookeeper.quorum' = 'localhost:2181',
    'lookup.max-retries' = '1'   -- Minimal retry for optional data
);

Lookup Performance Optimization

Strategies for optimizing lookup performance in different scenarios.

High Cache Hit Rate Scenario:

-- Optimize for frequently accessed reference data
CREATE TABLE frequent_lookup (
    `key` STRING,
    data ROW<`value` STRING>,
    PRIMARY KEY (`key`) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'frequent_data',
    'zookeeper.quorum' = 'localhost:2181',
    'lookup.async' = 'true',           -- Async for high throughput
    'lookup.cache.max-rows' = '100000', -- Large cache
    'lookup.cache.ttl' = '1800s',      -- 30 min TTL
    'lookup.max-retries' = '3'
);

Low Cache Hit Rate Scenario:

-- Optimize for unique lookups with minimal caching
CREATE TABLE unique_lookup (
    uuid STRING,
    session_data ROW<user_id STRING, created_at TIMESTAMP(3)>,
    PRIMARY KEY (uuid) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'session_store',
    'zookeeper.quorum' = 'localhost:2181',
    'lookup.async' = 'true',           -- Async for throughput
    'lookup.cache.max-rows' = '1000',  -- Small cache
    'lookup.cache.ttl' = '60s',        -- Short TTL
    'lookup.max-retries' = '5'         -- More retries for network issues
);

Lookup Monitoring and Troubleshooting

Common Issues and Solutions:

  1. High Lookup Latency:

    • Enable async mode: 'lookup.async' = 'true'
    • Increase cache size: 'lookup.cache.max-rows' = '50000'
    • Optimize HBase region distribution

  2. High Cache Miss Rate:

    • Analyze lookup key distribution
    • Adjust cache size based on working set
    • Lengthen the TTL where data changes rarely; a short TTL keeps data fresh but raises the miss rate

  3. Connection Failures:

    • Increase retry count: 'lookup.max-retries' = '10'
    • Verify ZooKeeper connectivity
    • Check HBase region server health

  4. Memory Issues:

    • Reduce cache size: 'lookup.cache.max-rows' = '1000'
    • Monitor task heap usage
    • Consider cache TTL reduction

Monitoring SQL Example:

-- Monitor lookup join performance
SELECT 
    window_start,
    window_end,
    COUNT(*) AS total_lookups,
    COUNT(lookup_table.rowkey) AS successful_lookups,
    (COUNT(lookup_table.rowkey) * 100.0 / COUNT(*)) AS success_rate
FROM TABLE(
    TUMBLE(TABLE stream_with_lookups, DESCRIPTOR(proc_time), INTERVAL '1' MINUTE)
) s
LEFT JOIN lookup_table FOR SYSTEM_TIME AS OF s.proc_time AS lookup_table
    ON s.lookup_key = lookup_table.rowkey
GROUP BY window_start, window_end;
