Apache Flink HBase 1.4 Connector provides bidirectional data integration between Flink streaming applications and HBase NoSQL database.
—
Configuration for lookup join operations with caching, retry mechanisms, and async processing options in the Apache Flink HBase 1.4 Connector.
Configuration class that encapsulates all lookup-related settings for optimizing dimension table joins and caching strategies.
/**
* Options for HBase lookup operations
* Provides configuration for caching, retries, and async processing
*/
@Internal
public class HBaseLookupOptions implements Serializable {
/**
* Returns the maximum number of entries in the lookup cache
* @return Maximum cache size (default: -1; note: in Flink's lookup options a non-positive value disables the cache — verify against connector defaults)
*/
public long getCacheMaxSize();
/**
* Returns the cache entry expiration time in milliseconds
* @return Time-to-live in milliseconds (0 for no expiration)
*/
public long getCacheExpireMs();
/**
* Returns the maximum number of retry attempts for failed lookups
* @return Maximum retry count (default: 3)
*/
public int getMaxRetryTimes();
/**
* Returns whether async lookup processing is enabled
* @return true if async lookups are enabled (default: false)
*/
public boolean getLookupAsync();
/**
* Creates a new builder for configuring lookup options
* @return Builder instance for fluent configuration
*/
public static Builder builder();
}

Builder class providing a fluent API for configuring lookup options with method chaining.
/**
* Builder for HBaseLookupOptions using fluent interface pattern
* Allows step-by-step configuration of all lookup parameters
*/
public static class Builder {
/**
* Sets the maximum cache size for lookup entries
* @param cacheMaxSize Maximum number of entries to cache (default: -1; note: a non-positive value disables caching in Flink's lookup options — verify against connector defaults)
* @return Builder instance for method chaining
*/
public Builder setCacheMaxSize(long cacheMaxSize);
/**
* Sets the cache entry expiration time in milliseconds
* @param cacheExpireMs Time-to-live in milliseconds (0 for no expiration, default: 0)
* @return Builder instance for method chaining
*/
public Builder setCacheExpireMs(long cacheExpireMs);
/**
* Sets the maximum number of retry attempts for failed lookups
* @param maxRetryTimes Maximum retry count (default: 3)
* @return Builder instance for method chaining
*/
public Builder setMaxRetryTimes(int maxRetryTimes);
/**
* Sets whether to enable async lookup processing
* @param lookupAsync true to enable async lookups (default: false)
* @return Builder instance for method chaining
*/
public Builder setLookupAsync(boolean lookupAsync);
/**
* Creates a new HBaseLookupOptions instance with configured settings
* @return Configured HBaseLookupOptions instance
*/
public HBaseLookupOptions build();
}

Usage Example:
// Example: High-performance lookup configuration with caching
HBaseLookupOptions cachedLookup = HBaseLookupOptions.builder()
.setCacheMaxSize(100000) // Cache up to 100K entries
.setCacheExpireMs(300000) // 5 minute TTL
.setMaxRetryTimes(5) // 5 retry attempts
.setLookupAsync(true) // Enable async processing
.build();
// Example: Memory-efficient lookup configuration
HBaseLookupOptions memoryEfficient = HBaseLookupOptions.builder()
.setCacheMaxSize(10000) // Smaller cache size
.setCacheExpireMs(60000) // 1 minute TTL
.setMaxRetryTimes(3) // Standard retry count
.setLookupAsync(false) // Synchronous processing
.build();

HBase tables are commonly used as dimension tables in stream processing applications for data enrichment through lookup joins.
-- Example: User activity stream enriched with user profile data
CREATE TABLE user_activity_stream (
user_id STRING,
activity_type STRING,
timestamp_val TIMESTAMP(3),
proc_time AS PROCTIME() -- Processing time for temporal join
) WITH (
'connector' = 'kafka',
'topic' = 'user-activities'
);
CREATE TABLE user_profiles (
user_id STRING,
profile ROW<name STRING, segment STRING, country STRING, created_date DATE>,
preferences ROW<language STRING, timezone STRING, notifications BOOLEAN>,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'user_dim',
'zookeeper.quorum' = 'localhost:2181',
'lookup.cache.max-rows' = '50000',
'lookup.cache.ttl' = '10min',
'lookup.max-retries' = '3'
);
-- Enrichment query using temporal join
SELECT
a.user_id,
a.activity_type,
a.timestamp_val,
u.profile.name,
u.profile.segment,
u.preferences.language
FROM user_activity_stream a
JOIN user_profiles FOR SYSTEM_TIME AS OF a.proc_time AS u
ON a.user_id = u.user_id;

Enable async lookup processing for improved performance with high-volume streams.
-- High-volume stream with async lookups
CREATE TABLE product_events (
product_id STRING,
event_data ROW<event_type STRING, value DOUBLE, user_id STRING>,
proc_time AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'product-events'
);
CREATE TABLE product_catalog (
product_id STRING,
product_info ROW<name STRING, category STRING, price DECIMAL(10,2)>,
inventory ROW<stock_level INT, warehouse_id STRING>,
PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'products',
'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
'lookup.async' = 'true', -- Enable async lookups
'lookup.cache.max-rows' = '200000', -- Large cache for products
'lookup.cache.ttl' = '30min', -- 30 minute cache TTL
'lookup.max-retries' = '5' -- Higher retry count
);

Configure cache size based on data volume and memory constraints.
// Small dimension table (< 10K records)
HBaseLookupOptions smallDimension = HBaseLookupOptions.builder()
.setCacheMaxSize(15000) // Cache all records plus buffer
.setCacheExpireMs(3600000) // 1 hour TTL
.build();
// Medium dimension table (10K-100K records)
HBaseLookupOptions mediumDimension = HBaseLookupOptions.builder()
.setCacheMaxSize(50000) // Cache hot subset
.setCacheExpireMs(1800000) // 30 minute TTL
.build();
// Large dimension table (> 100K records)
HBaseLookupOptions largeDimension = HBaseLookupOptions.builder()
.setCacheMaxSize(100000) // Cache only hottest data
.setCacheExpireMs(600000) // 10 minute TTL
.build();

Configure the cache time-to-live based on data freshness requirements.
// Real-time data (frequent updates)
HBaseLookupOptions realTime = HBaseLookupOptions.builder()
.setCacheMaxSize(20000)
.setCacheExpireMs(60000) // 1 minute TTL
.build();
// Reference data (infrequent updates)
HBaseLookupOptions reference = HBaseLookupOptions.builder()
.setCacheMaxSize(100000)
.setCacheExpireMs(7200000) // 2 hour TTL
.build();
// Static data (rare updates)
HBaseLookupOptions staticData = HBaseLookupOptions.builder()
.setCacheMaxSize(500000)
.setCacheExpireMs(86400000) // 24 hour TTL
.build();

The lookup cache uses LRU (Least Recently Used) eviction when the maximum size is reached.
Cache Behavior:
Configure retry behavior for handling transient HBase failures.
// Aggressive retry for critical lookups
HBaseLookupOptions criticalLookups = HBaseLookupOptions.builder()
.setCacheMaxSize(25000)
.setCacheExpireMs(300000)
.setMaxRetryTimes(10) // High retry count
.setLookupAsync(true) // Async for better resilience
.build();
// Conservative retry for best-effort lookups
HBaseLookupOptions bestEffort = HBaseLookupOptions.builder()
.setCacheMaxSize(10000)
.setCacheExpireMs(120000)
.setMaxRetryTimes(1) // Minimal retries
.setLookupAsync(false) // Synchronous processing
.build();

Different approaches for handling lookup failures:
-- Example: Graceful handling of lookup failures
SELECT
e.event_id,
e.user_id,
COALESCE(u.profile.name, 'Unknown User') as user_name,
COALESCE(u.profile.segment, 'default') as user_segment
FROM events e
LEFT JOIN user_dim FOR SYSTEM_TIME AS OF e.proc_time AS u
ON e.user_id = u.user_id;

Monitor and optimize cache hit rates for maximum performance.
Key Metrics:
Optimization Strategies:
// Hot data optimization - smaller cache, shorter TTL
HBaseLookupOptions hotData = HBaseLookupOptions.builder()
.setCacheMaxSize(30000) // Size for working set
.setCacheExpireMs(300000) // 5 minute TTL
.setMaxRetryTimes(3)
.build();
// Cold data optimization - larger cache, longer TTL
HBaseLookupOptions coldData = HBaseLookupOptions.builder()
.setCacheMaxSize(100000) // Large cache
.setCacheExpireMs(3600000) // 1 hour TTL
.setMaxRetryTimes(2)
.build();

Balance cache size with available memory to avoid OOM errors.
Memory Estimation:
// Memory-constrained environment
HBaseLookupOptions memoryConstrained = HBaseLookupOptions.builder()
.setCacheMaxSize(5000) // Small cache size
.setCacheExpireMs(900000) // 15 minute TTL
.setMaxRetryTimes(2)
.setLookupAsync(false) // Reduce memory overhead
.build();

CREATE TABLE customer_dimension (
customer_id STRING,
customer_data ROW<
name STRING,
segment STRING,
country STRING,
lifetime_value DECIMAL(12,2),
created_date DATE
>,
preferences ROW<
communication_channel STRING,
language STRING,
currency STRING
>,
PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'customers',
'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
'lookup.async' = 'true', -- Async processing
'lookup.cache.max-rows' = '100000', -- 100K cache entries
'lookup.cache.ttl' = '15min', -- 15 minute TTL
'lookup.max-retries' = '5' -- 5 retry attempts
);

CREATE TABLE product_categories (
category_id STRING,
category_info ROW<name STRING, parent_id STRING, level INT>,
PRIMARY KEY (category_id) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'categories',
'zookeeper.quorum' = 'localhost:2181',
'lookup.async' = 'false', -- Sync processing
'lookup.cache.max-rows' = '5000', -- Smaller cache
'lookup.cache.ttl' = '1h', -- 1 hour TTL
'lookup.max-retries' = '2' -- Fewer retries
);

CREATE TABLE real_time_prices (
symbol STRING,
price_data ROW<current_price DECIMAL(10,4), last_update TIMESTAMP(3)>,
PRIMARY KEY (symbol) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'live_prices',
'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
'lookup.async' = 'true', -- Async for performance
'lookup.cache.max-rows' = '10000', -- Cache for active symbols
'lookup.cache.ttl' = '30s', -- Short TTL for freshness
'lookup.max-retries' = '3' -- Standard retry count
);

Install with the Tessl CLI:
npx tessl i tessl/maven-org-apache-flink--flink-connector-hbase-1-4-2-11