CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-connector-hbase-1-4-2-11

Apache Flink HBase 1.4 Connector provides bidirectional data integration between Flink streaming applications and HBase NoSQL database.

Pending
Overview
Eval results
Files

docs/lookup-options.md

Lookup Options and Caching

Configuration for lookup join operations with caching, retry mechanisms, and async processing options in the Apache Flink HBase 1.4 Connector.

Capabilities

HBaseLookupOptions

Configuration class that encapsulates all lookup-related settings for optimizing dimension table joins and caching strategies.

/**
 * Options for HBase lookup operations
 * Provides configuration for caching, retries, and async processing
 */
@Internal
public class HBaseLookupOptions implements Serializable {
    
    /**
     * Returns the maximum number of entries in the lookup cache
     * @return Maximum cache size; the cache is disabled when this is not positive (default: -1)
     */
    public long getCacheMaxSize();
    
    /**
     * Returns the cache entry expiration time in milliseconds
     * @return Time-to-live in milliseconds; the cache is disabled when this is not positive (default: 0)
     */
    public long getCacheExpireMs();
    
    /**
     * Returns the maximum number of retry attempts for failed lookups
     * @return Maximum retry count (default: 3)
     */
    public int getMaxRetryTimes();
    
    /**
     * Returns whether async lookup processing is enabled
     * @return true if async lookups are enabled (default: false)
     */
    public boolean getLookupAsync();
    
    /**
     * Creates a new builder for configuring lookup options
     * @return Builder instance for fluent configuration
     */
    public static Builder builder();
}

HBaseLookupOptions.Builder

Builder class providing fluent API for configuring lookup options with method chaining.

/**
 * Builder for HBaseLookupOptions using fluent interface pattern
 * Allows step-by-step configuration of all lookup parameters
 */
public static class Builder {
    
    /**
     * Sets the maximum cache size for lookup entries
     * @param cacheMaxSize Maximum number of entries to cache; must be positive, together with a positive TTL, to enable caching (default: -1, cache disabled)
     * @return Builder instance for method chaining
     */
    public Builder setCacheMaxSize(long cacheMaxSize);
    
    /**
     * Sets the cache entry expiration time in milliseconds
     * @param cacheExpireMs Time-to-live in milliseconds; must be positive, together with a positive max size, to enable caching (default: 0, cache disabled)
     * @return Builder instance for method chaining
     */
    public Builder setCacheExpireMs(long cacheExpireMs);
    
    /**
     * Sets the maximum number of retry attempts for failed lookups
     * @param maxRetryTimes Maximum retry count (default: 3)
     * @return Builder instance for method chaining
     */
    public Builder setMaxRetryTimes(int maxRetryTimes);
    
    /**
     * Sets whether to enable async lookup processing
     * @param lookupAsync true to enable async lookups (default: false)
     * @return Builder instance for method chaining
     */
    public Builder setLookupAsync(boolean lookupAsync);
    
    /**
     * Creates a new HBaseLookupOptions instance with configured settings
     * @return Configured HBaseLookupOptions instance
     */
    public HBaseLookupOptions build();
}

Usage Example:

// Example: High-performance lookup configuration with caching
HBaseLookupOptions cachedLookup = HBaseLookupOptions.builder()
    .setCacheMaxSize(100000)                         // Cache up to 100K entries
    .setCacheExpireMs(300000)                        // 5 minute TTL
    .setMaxRetryTimes(5)                             // 5 retry attempts
    .setLookupAsync(true)                            // Enable async processing
    .build();

// Example: Memory-efficient lookup configuration
HBaseLookupOptions memoryEfficient = HBaseLookupOptions.builder()
    .setCacheMaxSize(10000)                          // Smaller cache size
    .setCacheExpireMs(60000)                         // 1 minute TTL
    .setMaxRetryTimes(3)                             // Standard retry count
    .setLookupAsync(false)                           // Synchronous processing
    .build();

Lookup Join Operations

Dimension Table Pattern

HBase tables are commonly used as dimension tables in stream processing applications for data enrichment through lookup joins.

-- Example: User activity stream enriched with user profile data
CREATE TABLE user_activity_stream (
    user_id STRING,
    activity_type STRING,
    timestamp_val TIMESTAMP(3),
    proc_time AS PROCTIME()  -- Processing time for temporal join
) WITH (
    'connector' = 'kafka',
    'topic' = 'user-activities'
);

CREATE TABLE user_profiles (
    user_id STRING,
    profile ROW<name STRING, segment STRING, country STRING, created_date DATE>,
    preferences ROW<language STRING, timezone STRING, notifications BOOLEAN>,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 'user_dim',
    'zookeeper.quorum' = 'localhost:2181',
    'lookup.cache.max-rows' = '50000',
    'lookup.cache.ttl' = '10min',
    'lookup.max-retries' = '3'
);

-- Enrichment query using temporal join
SELECT 
    a.user_id,
    a.activity_type,
    a.timestamp_val,
    u.profile.name,
    u.profile.segment,
    u.preferences.language
FROM user_activity_stream a
JOIN user_profiles FOR SYSTEM_TIME AS OF a.proc_time AS u
ON a.user_id = u.user_id;

Async Lookup Processing

Enable async lookup processing for improved performance with high-volume streams.

-- High-volume stream with async lookups
CREATE TABLE product_events (
    product_id STRING,
    event_data ROW<event_type STRING, value DOUBLE, user_id STRING>,
    proc_time AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'product-events'
);

CREATE TABLE product_catalog (
    product_id STRING,
    product_info ROW<name STRING, category STRING, price DECIMAL(10,2)>,
    inventory ROW<stock_level INT, warehouse_id STRING>,
    PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 'products',
    'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
    'lookup.async' = 'true',                         -- Enable async lookups
    'lookup.cache.max-rows' = '200000',              -- Large cache for products
    'lookup.cache.ttl' = '30min',                    -- 30 minute cache TTL
    'lookup.max-retries' = '5'                       -- Higher retry count
);

Caching Strategies

Cache Size Configuration

Configure cache size based on data volume and memory constraints.

// Small dimension table (< 10K records)
HBaseLookupOptions smallDimension = HBaseLookupOptions.builder()
    .setCacheMaxSize(15000)                          // Cache all records plus buffer
    .setCacheExpireMs(3600000)                       // 1 hour TTL
    .build();

// Medium dimension table (10K-100K records)  
HBaseLookupOptions mediumDimension = HBaseLookupOptions.builder()
    .setCacheMaxSize(50000)                          // Cache hot subset
    .setCacheExpireMs(1800000)                       // 30 minute TTL
    .build();

// Large dimension table (> 100K records)
HBaseLookupOptions largeDimension = HBaseLookupOptions.builder()
    .setCacheMaxSize(100000)                         // Cache only hottest data
    .setCacheExpireMs(600000)                        // 10 minute TTL
    .build();

Cache TTL Strategies

Configure time-to-live based on data freshness requirements.

// Real-time data (frequent updates)
HBaseLookupOptions realTime = HBaseLookupOptions.builder()
    .setCacheMaxSize(20000)
    .setCacheExpireMs(60000)                         // 1 minute TTL
    .build();

// Reference data (infrequent updates)
HBaseLookupOptions reference = HBaseLookupOptions.builder()
    .setCacheMaxSize(100000)
    .setCacheExpireMs(7200000)                       // 2 hour TTL
    .build();

// Static data (rare updates)
HBaseLookupOptions staticData = HBaseLookupOptions.builder()
    .setCacheMaxSize(500000)
    .setCacheExpireMs(86400000)                      // 24 hour TTL
    .build();

Cache Eviction Policies

The lookup cache uses LRU (Least Recently Used) eviction when the maximum size is reached.

Cache Behavior:

  • Hit: Return cached value, update access time
  • Miss: Query HBase and cache the result, evicting the least recently used entry if the cache is full
  • Eviction: Remove least recently accessed entries when cache is full
  • Expiration: Remove entries older than TTL regardless of access

Error Handling and Resilience

Retry Configuration

Configure retry behavior for handling transient HBase failures.

// Aggressive retry for critical lookups
HBaseLookupOptions criticalLookups = HBaseLookupOptions.builder()
    .setCacheMaxSize(25000)
    .setCacheExpireMs(300000)
    .setMaxRetryTimes(10)                            // High retry count
    .setLookupAsync(true)                            // Async for better resilience
    .build();

// Conservative retry for best-effort lookups
HBaseLookupOptions bestEffort = HBaseLookupOptions.builder()
    .setCacheMaxSize(10000)
    .setCacheExpireMs(120000)
    .setMaxRetryTimes(1)                             // Minimal retries
    .setLookupAsync(false)                           // Synchronous processing
    .build();

Failure Handling Strategies

Different approaches for handling lookup failures:

  1. Fail Fast: Fail the job on lookup errors (default behavior)
  2. Null Result: Return null for failed lookups (requires null-safe processing)
  3. Default Values: Return configured default values for failures
  4. Cache Fallback: Use stale cached values on HBase failures

-- Example: Null-safe handling of unmatched lookups (LEFT JOIN yields NULLs when no row is found)
SELECT 
    e.event_id,
    e.user_id,
    COALESCE(u.profile.name, 'Unknown User') as user_name,
    COALESCE(u.profile.segment, 'default') as user_segment
FROM events e
LEFT JOIN user_dim FOR SYSTEM_TIME AS OF e.proc_time AS u
ON e.user_id = u.user_id;

Performance Optimization

Cache Hit Rate Optimization

Monitor and optimize cache hit rates for maximum performance.

Key Metrics:

  • Cache hit rate (target: > 90% for most workloads)
  • Cache size utilization (target: 70-90% of max size)
  • Average lookup latency (target: < 10ms for cached, < 100ms for uncached)

Optimization Strategies:

// Hot data optimization - smaller cache, shorter TTL
HBaseLookupOptions hotData = HBaseLookupOptions.builder()
    .setCacheMaxSize(30000)                          // Size for working set
    .setCacheExpireMs(300000)                        // 5 minute TTL
    .setMaxRetryTimes(3)
    .build();

// Cold data optimization - larger cache, longer TTL  
HBaseLookupOptions coldData = HBaseLookupOptions.builder()
    .setCacheMaxSize(100000)                         // Large cache
    .setCacheExpireMs(3600000)                       // 1 hour TTL
    .setMaxRetryTimes(2)
    .build();

Memory Management

Balance cache size with available memory to avoid OOM errors.

Memory Estimation:

  • Average record size × cache max size = approximate memory usage
  • Include overhead for cache metadata and JVM objects
  • Reserve memory for other Flink operations

// Memory-constrained environment
HBaseLookupOptions memoryConstrained = HBaseLookupOptions.builder()
    .setCacheMaxSize(5000)                           // Small cache size
    .setCacheExpireMs(900000)                        // 15 minute TTL
    .setMaxRetryTimes(2)
    .setLookupAsync(false)                           // Reduce memory overhead
    .build();

SQL Configuration Examples

High-Performance Lookup Configuration

CREATE TABLE customer_dimension (
    customer_id STRING,
    customer_data ROW<
        name STRING,
        segment STRING, 
        country STRING,
        lifetime_value DECIMAL(12,2),
        created_date DATE
    >,
    preferences ROW<
        communication_channel STRING,
        language STRING,
        currency STRING
    >,
    PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 'customers',
    'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
    'lookup.async' = 'true',                         -- Async processing
    'lookup.cache.max-rows' = '100000',              -- 100K cache entries
    'lookup.cache.ttl' = '15min',                    -- 15 minute TTL
    'lookup.max-retries' = '5'                       -- 5 retry attempts
);

Memory-Efficient Lookup Configuration

CREATE TABLE product_categories (
    category_id STRING,
    category_info ROW<name STRING, parent_id STRING, level INT>,
    PRIMARY KEY (category_id) NOT ENFORCED
) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 'categories',
    'zookeeper.quorum' = 'localhost:2181',
    'lookup.async' = 'false',                        -- Sync processing
    'lookup.cache.max-rows' = '5000',                -- Smaller cache
    'lookup.cache.ttl' = '1h',                       -- 1 hour TTL
    'lookup.max-retries' = '2'                       -- Fewer retries
);

Real-Time Lookup Configuration

CREATE TABLE real_time_prices (
    symbol STRING,
    price_data ROW<current_price DECIMAL(10,4), last_update TIMESTAMP(3)>,
    PRIMARY KEY (symbol) NOT ENFORCED
) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 'live_prices',
    'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
    'lookup.async' = 'true',                         -- Async for performance
    'lookup.cache.max-rows' = '10000',               -- Cache for active symbols
    'lookup.cache.ttl' = '30s',                      -- Short TTL for freshness
    'lookup.max-retries' = '3'                       -- Standard retry count
);

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-hbase-1-4-2-11

docs

index.md

lookup-options.md

sink-operations.md

source-operations.md

table-factory.md

write-options.md

tile.json