The HBase connector provides lookup function capabilities for temporal table joins, enabling real-time enrichment of streaming data with dimension data stored in HBase. This is essential for joining fast-changing stream data with slowly-changing dimension tables.
A table function that performs lookups in HBase tables for temporal joins in Flink's Table API.
public class HBaseLookupFunction extends TableFunction<Row> {
    public HBaseLookupFunction(Configuration configuration, String hTableName,
                               HBaseTableSchema hbaseTableSchema);

    // Core lookup method; matching rows are emitted via collect(Row)
    public void eval(Object rowKey);

    // Function lifecycle
    public void open(FunctionContext context) throws Exception;
    public void close() throws Exception;

    // Type information
    public TypeInformation<Row> getResultType();
}

A basic setup with the legacy flink-hbase addon (package org.apache.flink.addons.hbase, Flink 1.9/1.10):

import org.apache.flink.addons.hbase.HBaseLookupFunction;
import org.apache.flink.addons.hbase.HBaseTableSchema;
import org.apache.flink.addons.hbase.HBaseTableSource;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.hadoop.conf.Configuration;
// Configure HBase connection
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum", "localhost:2181");
// Define dimension table schema
HBaseTableSchema userProfileSchema = new HBaseTableSchema();
userProfileSchema.setRowKey("user_id", String.class);
userProfileSchema.addColumn("profile", "name", String.class);
userProfileSchema.addColumn("profile", "email", String.class);
userProfileSchema.addColumn("profile", "age", Integer.class);
userProfileSchema.addColumn("profile", "department", String.class);
// Create table source with lookup capability
HBaseTableSource userProfileSource = new HBaseTableSource(conf, "user_profiles");
userProfileSource.setRowKey("user_id", String.class);
userProfileSource.addColumn("profile", "name", String.class);
userProfileSource.addColumn("profile", "email", String.class);
userProfileSource.addColumn("profile", "age", Integer.class);
userProfileSource.addColumn("profile", "department", String.class);
// Register as temporal table
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
tableEnv.registerTableSource("user_profiles", userProfileSource);-- Create the main event stream table
CREATE TABLE user_events (
event_id STRING,
user_id STRING,
event_type STRING,
event_time TIMESTAMP(3),
event_value DOUBLE,
proc_time AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'user-events',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
);
-- Create HBase lookup table
CREATE TABLE user_profiles (
user_id STRING,
name STRING,
email STRING,
age INT,
department STRING
) WITH (
'connector' = 'hbase',
'table-name' = 'user_profiles',
'zookeeper.quorum' = 'localhost:2181'
);
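The DDL above is simplified for readability. The open-source Flink HBase connector (factory identifiers 'hbase-1.4' and 'hbase-2.2' in Flink 1.12+) expects each column family to be declared as a ROW type rather than as flat columns, and it exposes a lookup cache that absorbs repeated reads of hot keys. A hedged variant of the same table with those option names (the cache sizes are illustrative choices):

-- user_profiles with family-as-ROW schema and a lookup cache (Flink 1.12+)
CREATE TABLE user_profiles_cached (
user_id STRING,
profile ROW<name STRING, email STRING, age INT, department STRING>,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'hbase-2.2',
'table-name' = 'user_profiles',
'zookeeper.quorum' = 'localhost:2181',
'lookup.cache.max-rows' = '10000',  -- max rows held in the lookup cache
'lookup.cache.ttl' = '10min',       -- how long a cached row stays valid
'lookup.max-retries' = '3'          -- retries for a failed lookup
);

With the ROW schema, nested fields are addressed as u.profile.name and so on in the join projection.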
-- Perform temporal join (lookup)
SELECT
e.event_id,
e.user_id,
e.event_type,
e.event_value,
u.name,
u.email,
u.department
FROM user_events e
JOIN user_profiles FOR SYSTEM_TIME AS OF e.proc_time AS u
ON e.user_id = u.user_id;

The lookup function can also be registered directly as a user-defined table function and invoked with LATERAL TABLE:

import org.apache.flink.table.api.Table;
import org.apache.flink.table.functions.TableFunction;
// Create lookup function directly
HBaseLookupFunction lookupFunction = new HBaseLookupFunction(
conf, "user_profiles", userProfileSchema);
// Register as user-defined function
tableEnv.registerFunction("lookup_user", lookupFunction);
// Use in SQL query
Table enrichedEvents = tableEnv.sqlQuery(
"SELECT " +
" e.event_id, " +
" e.user_id, " +
" e.event_type, " +
" u.name, " +
" u.email " +
"FROM events e, " +
"LATERAL TABLE(lookup_user(e.user_id)) AS u(user_id, name, email, age, department)"
);

The lookup function opens its HBase connection in open() and releases it in close(); pooling and timeout behavior are controlled through the HBase client configuration:
// Configure HBase client for lookup performance
Configuration lookupConf = new Configuration();
lookupConf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
// Connection pool settings
lookupConf.setInt("hbase.client.ipc.pool.size", 10); // Connection pool size
lookupConf.setInt("hbase.client.ipc.pool.type", 1); // RoundRobin pool type
// Timeout settings for lookups
lookupConf.setLong("hbase.rpc.timeout", 5000); // 5 second RPC timeout
lookupConf.setLong("hbase.client.operation.timeout", 10000); // 10 second operation timeout
// Scanner settings for better lookup performance
lookupConf.setInt("hbase.client.scanner.caching", 100); // Scanner row caching
lookupConf.setBoolean("hbase.client.scanner.async.prefetch", false); // Disable prefetch for point lookups

Caching of frequently accessed rows is configured on the HBase server side rather than through the client Configuration: the block cache is sized with hfile.block.cache.size in hbase-site.xml on the region servers, and per-family behavior is controlled with the BLOCKCACHE and IN_MEMORY column family attributes. Dimension tables that are looked up constantly are good candidates for IN_MEMORY families.
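A sketch of flipping those attributes with the HBase 2.x Admin API, reusing the conf object and the table and family names from the earlier examples:

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;

// Mark the 'profile' family in-memory so hot dimension rows tend to stay cached
try (Connection connection = ConnectionFactory.createConnection(conf);
     Admin admin = connection.getAdmin()) {
    ColumnFamilyDescriptor profileFamily = ColumnFamilyDescriptorBuilder
        .newBuilder(Bytes.toBytes("profile"))
        .setInMemory(true)          // prefer the in-memory area of the block cache
        .setBlockCacheEnabled(true) // keep block caching on for this family
        .build();
    admin.modifyColumnFamily(TableName.valueOf("user_profiles"), profileFamily);
}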
// Schema for composite key lookups
HBaseTableSchema compositeKeySchema = new HBaseTableSchema();
compositeKeySchema.setRowKey("composite_key", String.class); // "userId:timestamp" format
compositeKeySchema.addColumn("data", "value", String.class);
compositeKeySchema.addColumn("data", "status", String.class);
// Custom lookup function for composite keys
public class CompositeKeyLookupFunction extends TableFunction<Row> {

    private final HBaseLookupFunction baseLookupFunction;

    public CompositeKeyLookupFunction(Configuration conf, String tableName,
                                      HBaseTableSchema schema) {
        this.baseLookupFunction = new HBaseLookupFunction(conf, tableName, schema);
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        baseLookupFunction.open(context);
    }

    public void eval(String userId, Long timestamp) {
        // Create the composite key in the table's "userId:timestamp" format
        String compositeKey = userId + ":" + timestamp;
        // Route the delegate's output through this function's own collector so
        // the rows it emits become our results (setCollector is public on the
        // legacy TableFunction, and the collector field is protected)
        baseLookupFunction.setCollector(collector);
        baseLookupFunction.eval(compositeKey);
    }

    @Override
    public void close() throws Exception {
        baseLookupFunction.close();
    }
}
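Registered like any other table function, the composite lookup is then called with both key parts. A sketch, assuming a hypothetical HBase table named user_activity that uses the composite key format above (note the backticks around value, a reserved word):

tableEnv.registerFunction("lookup_by_user_time",
    new CompositeKeyLookupFunction(conf, "user_activity", compositeKeySchema));

Table recentActivity = tableEnv.sqlQuery(
    "SELECT e.event_id, a.`value`, a.status " +
    "FROM user_events e, " +
    "LATERAL TABLE(lookup_by_user_time(e.user_id, 1700000000000)) " +
    "AS a(composite_key, `value`, status)");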
-- Lookup with conditions
SELECT
e.event_id,
e.user_id,
e.event_type,
CASE
WHEN u.user_id IS NOT NULL THEN u.name
ELSE 'Unknown User'
END as user_name,
CASE
WHEN u.age >= 18 THEN 'Adult'
ELSE 'Minor'
END as age_category
FROM user_events e
LEFT JOIN user_profiles FOR SYSTEM_TIME AS OF e.proc_time AS u
ON e.user_id = u.user_id;

// Custom lookup function with data transformation
// (requires org.apache.flink.util.Collector)
public class TransformingLookupFunction extends TableFunction<Row> {

    private final HBaseLookupFunction baseLookup;

    public TransformingLookupFunction(Configuration conf, String tableName,
                                      HBaseTableSchema schema) {
        this.baseLookup = new HBaseLookupFunction(conf, tableName, schema);
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        baseLookup.open(context);
    }

    public void eval(String userId) {
        // Intercept the base lookup's output, transform each row, then emit it
        baseLookup.setCollector(new Collector<Row>() {
            @Override
            public void collect(Row row) {
                TransformingLookupFunction.this.collect(transformUserData(row));
            }

            @Override
            public void close() {
            }
        });
        baseLookup.eval(userId);
    }

    // Transform user data into the enriched output shape
    private Row transformUserData(Row originalRow) {
        String name = (String) originalRow.getField(1);
        Integer age = (Integer) originalRow.getField(3);
        String department = (String) originalRow.getField(4);
        // Add computed fields (formatDisplayName, categorizeAge and
        // getDepartmentCode are application-specific helpers)
        String displayName = formatDisplayName(name);
        String ageGroup = categorizeAge(age);
        String departmentCode = getDepartmentCode(department);
        return Row.of(
            originalRow.getField(0), // user_id
            displayName,
            originalRow.getField(2), // email
            ageGroup,
            departmentCode
        );
    }
}

// Configure retry and timeout behavior for lookups
Configuration resilientConf = new Configuration();
resilientConf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
// Retry configuration
resilientConf.setInt("hbase.client.retries.number", 5); // Max 5 retries
resilientConf.setLong("hbase.client.pause", 1000); // 1 second retry pause
resilientConf.setInt("hbase.client.rpc.retry.sleep", 100); // Base retry sleep
// Overall cap per operation, spanning all retry attempts
resilientConf.setLong("hbase.client.operation.timeout", 30000); // 30 second total timeout
// Create resilient lookup function
HBaseLookupFunction resilientLookup = new HBaseLookupFunction(
resilientConf, "user_profiles", userProfileSchema);-- Handle lookup failures gracefully
SELECT
e.event_id,
e.user_id,
e.event_type,
COALESCE(u.name, 'UNKNOWN') as user_name,
COALESCE(u.email, 'no-email@domain.com') as user_email,
COALESCE(u.department, 'UNASSIGNED') as department
FROM user_events e
LEFT JOIN user_profiles FOR SYSTEM_TIME AS OF e.proc_time AS u
ON e.user_id = u.user_id;

// Custom lookup function with metrics
public class MonitoredLookupFunction extends TableFunction<Row> {

    private static final Logger LOG = LoggerFactory.getLogger(MonitoredLookupFunction.class);

    private final HBaseLookupFunction delegate;
    private transient Counter lookupCount;
    private transient Counter lookupFailures;
    private transient Histogram lookupLatency;

    public MonitoredLookupFunction(Configuration conf, String tableName,
                                   HBaseTableSchema schema) {
        this.delegate = new HBaseLookupFunction(conf, tableName, schema);
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
        // Initialize metrics; histogram() requires a concrete implementation,
        // e.g. org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram
        lookupCount = context.getMetricGroup().counter("lookup_count");
        lookupFailures = context.getMetricGroup().counter("lookup_failures");
        lookupLatency = context.getMetricGroup().histogram(
            "lookup_latency", new DescriptiveStatisticsHistogram(1000));
        // Initialize the delegate
        delegate.open(context);
    }

    public void eval(Object rowKey) {
        long startTime = System.currentTimeMillis();
        lookupCount.inc();
        // Route the delegate's output through this function's collector
        delegate.setCollector(collector);
        try {
            delegate.eval(rowKey);
            lookupLatency.update(System.currentTimeMillis() - startTime);
        } catch (Exception e) {
            lookupFailures.inc();
            // Log the error but don't fail the job
            LOG.warn("Lookup failed for key: {}", rowKey, e);
            // Emit a row of default/null values so the join can continue
            collect(Row.of(rowKey, null, null, null, null));
        }
    }

    @Override
    public void close() throws Exception {
        delegate.close();
    }
}

-- Enrich transaction events with customer information
CREATE TABLE transactions (
transaction_id STRING,
customer_id STRING,
amount DECIMAL(10,2),
transaction_time TIMESTAMP(3),
proc_time AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'transactions',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
);
CREATE TABLE customers (
customer_id STRING,
name STRING,
tier STRING,
region STRING,
credit_limit DECIMAL(10,2)
) WITH (
'connector' = 'hbase',
'table-name' = 'customer_profiles',
'zookeeper.quorum' = 'localhost:2181'
);
-- Enriched transaction stream
SELECT
t.transaction_id,
t.customer_id,
t.amount,
c.name as customer_name,
c.tier as customer_tier,
c.region,
CASE
WHEN t.amount > c.credit_limit THEN 'OVERLIMIT'
ELSE 'NORMAL'
END as transaction_status
FROM transactions t
JOIN customers FOR SYSTEM_TIME AS OF t.proc_time AS c
ON t.customer_id = c.customer_id;

-- Multiple lookup joins for complex enrichment
CREATE TABLE events (
event_id STRING,
user_id STRING,
product_id STRING,
action STRING,
event_time TIMESTAMP(3),
proc_time AS PROCTIME()
) WITH ('connector' = 'kafka', 'topic' = 'user-events', 'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'json');
CREATE TABLE users (
user_id STRING,
name STRING,
segment STRING
) WITH ('connector' = 'hbase', 'table-name' = 'users', 'zookeeper.quorum' = 'localhost:2181');
CREATE TABLE products (
product_id STRING,
name STRING,
category STRING,
price DECIMAL(10,2)
) WITH ('connector' = 'hbase', 'table-name' = 'products', 'zookeeper.quorum' = 'localhost:2181');
-- Multi-level enriched stream
SELECT
e.event_id,
e.action,
u.name as user_name,
u.segment as user_segment,
p.name as product_name,
p.category as product_category,
p.price as product_price
FROM events e
JOIN users FOR SYSTEM_TIME AS OF e.proc_time AS u ON e.user_id = u.user_id
JOIN products FOR SYSTEM_TIME AS OF e.proc_time AS p ON e.product_id = p.product_id;

// Design schema for efficient lookups
HBaseTableSchema efficientLookupSchema = new HBaseTableSchema();
// Use meaningful row key
efficientLookupSchema.setRowKey("user_id", String.class);
// Group related data in same column family for locality
efficientLookupSchema.addColumn("profile", "name", String.class);
efficientLookupSchema.addColumn("profile", "email", String.class);
efficientLookupSchema.addColumn("profile", "department", String.class);
// Separate frequently changing data
efficientLookupSchema.addColumn("activity", "last_login", java.sql.Timestamp.class);
efficientLookupSchema.addColumn("activity", "login_count", Long.class);
// Use appropriate data types
efficientLookupSchema.addColumn("settings", "preferences", String.class); // JSON as string
efficientLookupSchema.addColumn("binary", "avatar", byte[].class); // Binary data// Implement robust error handling
// Implement robust error handling
public class RobustLookupFunction extends TableFunction<Row> {

    private static final Logger LOG = LoggerFactory.getLogger(RobustLookupFunction.class);
    private static final int MAX_RETRIES = 3;
    private static final long RETRY_DELAY_MS = 1000;

    public void eval(Object rowKey) {
        int attempts = 0;
        Exception lastException = null;
        while (attempts < MAX_RETRIES) {
            try {
                // Perform the lookup
                performLookup(rowKey);
                return; // Success
            } catch (Exception e) {
                lastException = e;
                attempts++;
                if (attempts < MAX_RETRIES) {
                    try {
                        Thread.sleep(RETRY_DELAY_MS * attempts); // Linearly increasing backoff
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
        // All retries failed - emit a default/empty result
        LOG.error("Lookup failed after {} attempts for key: {}", MAX_RETRIES, rowKey, lastException);
        emitDefaultResult(rowKey);
    }

    private void performLookup(Object rowKey) throws Exception {
        // Application-specific: typically delegates to an HBaseLookupFunction
        // whose collector has been routed into this function's collector
    }

    private void emitDefaultResult(Object rowKey) {
        // Emit a row with null values for the missing data
        collect(Row.of(rowKey, null, null, null, null));
    }
}
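Any of these wrappers is registered and used exactly like the plain lookup function. A sketch, reusing the conf and userProfileSchema objects from the earlier setup:

// Swap the monitored wrapper in for the plain lookup function
MonitoredLookupFunction monitoredLookup =
    new MonitoredLookupFunction(conf, "user_profiles", userProfileSchema);
tableEnv.registerFunction("lookup_user_monitored", monitoredLookup);

Table monitoredEnrichment = tableEnv.sqlQuery(
    "SELECT e.event_id, u.name, u.email " +
    "FROM user_events e, " +
    "LATERAL TABLE(lookup_user_monitored(e.user_id)) " +
    "AS u(user_id, name, email, age, department)");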