Apache Flink SQL connector that enables seamless integration with HBase 2.2.x databases through Flink's Table API and SQL interface
—
Temporal table join functionality with both synchronous and asynchronous lookup modes, including comprehensive caching strategies for performance optimization.
Performs lookups against HBase table for enriching streaming data using temporal join semantics.
-- Temporal (lookup) join template: for each stream row, fetch the HBase row
-- whose row key equals the stream's lookup key, evaluated at the stream row's
-- processing time.
-- Both tables are aliased (s, h), so every column reference must go through
-- the alias; the original table names are not visible once an alias is given.
SELECT
    s.field,
    h.column_family.qualifier
FROM stream_table AS s
JOIN hbase_table FOR SYSTEM_TIME AS OF s.proc_time AS h
    ON s.lookup_key = h.rowkey
WHERE additional_conditions;
Requirements:
- The HBase table must be joined using the FOR SYSTEM_TIME AS OF syntax
- A processing-time attribute (proc_time) must be available in the stream
Usage Examples:
-- User profile enrichment: attach name, email and language preference from
-- the user_profiles lookup table to every user event.
-- `timestamp` and `language` are reserved words in Flink SQL and must be
-- escaped with backticks when used as identifiers.
SELECT
    event.user_id,
    event.action,
    event.`timestamp`,
    profile.info.name AS user_name,
    profile.info.email AS user_email,
    profile.preferences.`language`
FROM user_events AS event
JOIN user_profiles FOR SYSTEM_TIME AS OF event.proc_time AS profile
    ON event.user_id = profile.user_id;
-- Sales enrichment: look up each sold product in the catalog table, derive the
-- line total, and keep only rows whose product has a positive stock count.
SELECT
    s.transaction_id,
    s.quantity,
    c.basic_info.name AS product_name,
    c.basic_info.price AS unit_price,
    c.basic_info.price * s.quantity AS total_amount
FROM sales_stream AS s
JOIN product_catalog FOR SYSTEM_TIME AS OF s.proc_time AS c
    ON s.product_id = c.product_id
WHERE c.inventory.stock_count > 0;
Default lookup mode that performs blocking lookups with immediate results.
-- Option fragment for the table's WITH clause: synchronous mode (the default).
WITH (
    'lookup.async' = 'false'
)
Characteristics:
Usage Examples:
-- Reference-data lookup table backed by the HBase table 'reference_codes',
-- read with synchronous lookups and up to three retries.
CREATE TABLE reference_lookup (
    code STRING,
    reference ROW<
        description STRING,
        category STRING,
        active BOOLEAN
    >,
    PRIMARY KEY (code) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'reference_codes',
    'zookeeper.quorum' = 'localhost:2181',
    'lookup.max-retries' = '3',
    'lookup.async' = 'false'
);
-- Synchronous lookup join: resolve each transaction's type code to its
-- description through the reference_lookup table.
-- Aliases txn / rl replace the original `transaction` / `ref`: REF is a
-- reserved word and TRANSACTION is on Flink's keyword list, so neither is
-- safe as an unquoted alias. Output column names are unchanged.
SELECT
    txn.id,
    txn.amount,
    rl.reference.description AS transaction_type
FROM transaction_stream AS txn
JOIN reference_lookup FOR SYSTEM_TIME AS OF txn.proc_time AS rl
    ON txn.type_code = rl.code;
High-performance lookup mode using non-blocking operations for improved throughput.
-- Option fragment for the table's WITH clause: asynchronous mode.
WITH (
    'lookup.async' = 'true'
)
Characteristics:
Usage Examples:
-- User lookup table backed by the HBase table 'user_profiles', configured for
-- asynchronous lookups with up to five retries.
CREATE TABLE async_user_lookup (
    user_id STRING,
    profile ROW<
        name STRING,
        email STRING,
        tier STRING
    >,
    activity ROW<
        last_login TIMESTAMP(3),
        total_orders INT
    >,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'user_profiles',
    'zookeeper.quorum' = 'localhost:2181',
    'lookup.max-retries' = '5',
    'lookup.async' = 'true'
);
-- Async lookup join for a high-volume stream: attach the user's name and tier
-- from async_user_lookup to every click event.
-- The lookup side is aliased `u` instead of `user`, because USER is a reserved
-- word (a built-in function) and cannot be an unquoted alias. `timestamp` is
-- also reserved and is escaped with backticks. Output column names are unchanged.
SELECT
    click.event_id,
    click.page_url,
    click.`timestamp`,
    u.profile.name AS user_name,
    u.profile.tier AS user_tier
FROM clickstream AS click
JOIN async_user_lookup FOR SYSTEM_TIME AS OF click.proc_time AS u
    ON click.user_id = u.user_id;
Caching strategies to reduce HBase lookup load and improve performance.
-- Option fragment for the table's WITH clause: lookup cache sizing.
WITH (
    -- Maximum cached entries
    'lookup.cache.max-rows' = '10000',
    -- Cache time-to-live
    'lookup.cache.ttl' = '300s'
)
Parameters:
- lookup.cache.max-rows: Maximum number of lookup results to cache (-1 disables caching)
- lookup.cache.ttl: Cache entry expiration time (0 means no expiration)
Cache Behavior:
Usage Examples:
-- Country reference table: long-lived data, so a large cache (50000 rows)
-- with a one-hour TTL is used.
CREATE TABLE reference_cache (
    country_code STRING,
    info ROW<
        name STRING,
        currency STRING,
        timezone STRING
    >,
    PRIMARY KEY (country_code) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'country_reference',
    'zookeeper.quorum' = 'localhost:2181',
    'lookup.cache.ttl' = '3600s',
    'lookup.cache.max-rows' = '50000'
);
-- Stock price table: frequently changing data, so a small cache (1000 rows)
-- with a 30-second TTL is used.
CREATE TABLE price_cache (
    symbol STRING,
    pricing ROW<
        current_price DECIMAL(10,4),
        last_update TIMESTAMP(3)
    >,
    PRIMARY KEY (symbol) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'stock_prices',
    'zookeeper.quorum' = 'localhost:2181',
    'lookup.cache.ttl' = '30s',
    'lookup.cache.max-rows' = '1000'
);
Error handling and retry logic for failed lookup operations.
-- Option fragment for the table's WITH clause: maximum retry attempts (default: 3).
WITH (
    'lookup.max-retries' = '3'
)
Retry Behavior:
Usage Examples:
-- Robust lookup with extensive retry: high retry count for critical data.
-- The ROW fields `value` and `timestamp` are reserved words in Flink SQL and
-- must be escaped with backticks, otherwise the DDL does not parse.
CREATE TABLE robust_lookup (
    id STRING,
    data ROW<`value` STRING, `timestamp` BIGINT>,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'critical_data',
    'zookeeper.quorum' = 'localhost:2181',
    'lookup.max-retries' = '10' -- High retry count for critical data
);
-- Optional reference table: non-critical data, so lookups are limited to a
-- minimal retry count of 1.
CREATE TABLE optional_lookup (
    ref_id STRING,
    optional_data ROW<
        description STRING
    >,
    PRIMARY KEY (ref_id) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'zookeeper.quorum' = 'localhost:2181',
    'table-name' = 'optional_reference',
    'lookup.max-retries' = '1'
);
Strategies for optimizing lookup performance in different scenarios.
High Cache Hit Rate Scenario:
-- Optimize for frequently accessed reference data: async lookups plus a large,
-- long-lived cache.
-- `value` is a reserved word in Flink SQL and must be escaped with backticks.
-- `key` appears on Flink's keyword list as well, so it is escaped too as a
-- precaution; the column names themselves are unchanged.
CREATE TABLE frequent_lookup (
    `key` STRING,
    data ROW<`value` STRING>,
    PRIMARY KEY (`key`) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'frequent_data',
    'zookeeper.quorum' = 'localhost:2181',
    'lookup.async' = 'true', -- Async for high throughput
    'lookup.cache.max-rows' = '100000', -- Large cache
    'lookup.cache.ttl' = '1800s', -- 30 min TTL
    'lookup.max-retries' = '3'
);
Low Cache Hit Rate Scenario:
-- Session store table for mostly-unique lookups: async mode, a small
-- short-lived cache, and a higher retry count for network issues.
CREATE TABLE unique_lookup (
    uuid STRING,
    session_data ROW<
        user_id STRING,
        created_at TIMESTAMP(3)
    >,
    PRIMARY KEY (uuid) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'session_store',
    'zookeeper.quorum' = 'localhost:2181',
    -- Async for throughput
    'lookup.async' = 'true',
    -- More retries for network issues
    'lookup.max-retries' = '5',
    -- Small cache with a short TTL
    'lookup.cache.max-rows' = '1000',
    'lookup.cache.ttl' = '60s'
);
Common Issues and Solutions:
High Lookup Latency:
- 'lookup.async' = 'true'
- 'lookup.cache.max-rows' = '50000'
Cache Miss Rate:
Connection Failures:
- 'lookup.max-retries' = '10'
Memory Issues:
- 'lookup.cache.max-rows' = '1000'
Monitoring SQL Example:
-- Lookup join health per one-minute tumbling window.
-- COUNT(*) counts every probed stream row; COUNT(l.rowkey) counts only rows
-- for which the LEFT JOIN found a match (a non-NULL rowkey).
SELECT
    window_start,
    window_end,
    COUNT(*) AS total_lookups,
    COUNT(l.rowkey) AS successful_lookups,
    (COUNT(l.rowkey) * 100.0 / COUNT(*)) AS success_rate
FROM TABLE(
    TUMBLE(TABLE stream_with_lookups, DESCRIPTOR(proc_time), INTERVAL '1' MINUTE)
) s
LEFT JOIN lookup_table FOR SYSTEM_TIME AS OF s.proc_time AS l
    ON s.lookup_key = l.rowkey
GROUP BY window_start, window_end;
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-connector-hbase-2-2-2-11