Apache Flink HBase connector library that enables seamless integration between Flink streaming and batch processing applications and Apache HBase.
The HBase connector provides a lookup function for temporal table joins, enabling real-time enrichment of streaming data with dimension data stored in HBase. This is essential for joining fast-changing stream data with slowly changing dimension tables.
HBaseLookupFunction is a table function that performs point lookups in HBase tables for temporal joins in Flink's Table API.
public class HBaseLookupFunction extends TableFunction<Row> {
public HBaseLookupFunction(Configuration configuration, String hTableName,
HBaseTableSchema hbaseTableSchema);
// Core lookup method
public void eval(Object rowKey);
// Function lifecycle
public void open(FunctionContext context) throws Exception;
public void close() throws Exception;
// Type information
public TypeInformation<Row> getResultType();
}

import org.apache.flink.addons.hbase.HBaseLookupFunction;
import org.apache.flink.addons.hbase.HBaseTableSchema;
import org.apache.flink.addons.hbase.HBaseTableSource;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.hadoop.conf.Configuration;
// Configure HBase connection
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum", "localhost:2181");
// Define dimension table schema
HBaseTableSchema userProfileSchema = new HBaseTableSchema();
userProfileSchema.setRowKey("user_id", String.class);
userProfileSchema.addColumn("profile", "name", String.class);
userProfileSchema.addColumn("profile", "email", String.class);
userProfileSchema.addColumn("profile", "age", Integer.class);
userProfileSchema.addColumn("profile", "department", String.class);
// Create table source with lookup capability
HBaseTableSource userProfileSource = new HBaseTableSource(conf, "user_profiles");
userProfileSource.setRowKey("user_id", String.class);
userProfileSource.addColumn("profile", "name", String.class);
userProfileSource.addColumn("profile", "email", String.class);
userProfileSource.addColumn("profile", "age", Integer.class);
userProfileSource.addColumn("profile", "department", String.class);
// Register as temporal table
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
tableEnv.registerTableSource("user_profiles", userProfileSource);

-- Create the main event stream table
CREATE TABLE user_events (
event_id STRING,
user_id STRING,
event_type STRING,
event_time TIMESTAMP(3),
event_value DOUBLE,
proc_time AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'user-events',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
);
-- Create HBase lookup table
CREATE TABLE user_profiles (
user_id STRING,
name STRING,
email STRING,
age INT,
department STRING
) WITH (
'connector' = 'hbase',
'table-name' = 'user_profiles',
'zookeeper.quorum' = 'localhost:2181'
);
-- Perform temporal join (lookup)
SELECT
e.event_id,
e.user_id,
e.event_type,
e.event_value,
u.name,
u.email,
u.department
FROM user_events e
JOIN user_profiles FOR SYSTEM_TIME AS OF e.proc_time AS u
ON e.user_id = u.user_id;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.functions.TableFunction;
// Create lookup function directly
HBaseLookupFunction lookupFunction = new HBaseLookupFunction(
conf, "user_profiles", userProfileSchema);
// Register as user-defined function
tableEnv.registerFunction("lookup_user", lookupFunction);
// Use in SQL query
Table enrichedEvents = tableEnv.sqlQuery(
"SELECT " +
" e.event_id, " +
" e.user_id, " +
" e.event_type, " +
" u.name, " +
" u.email " +
"FROM events e, " +
"LATERAL TABLE(lookup_user(e.user_id)) AS u(user_id, name, email, age, department)"
);

The lookup function automatically manages HBase connections and implements connection pooling for better performance:
// Configure HBase client for lookup performance
Configuration lookupConf = new Configuration();
lookupConf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
// Connection pool settings
lookupConf.setInt("hbase.client.ipc.pool.size", 10); // Connection pool size
lookupConf.setInt("hbase.client.ipc.pool.type", 1); // RoundRobin pool type
// Timeout settings for lookups
lookupConf.setLong("hbase.rpc.timeout", 5000); // 5 second RPC timeout
lookupConf.setLong("hbase.client.operation.timeout", 10000); // 10 second operation timeout
// Scanner settings for better lookup performance
lookupConf.setInt("hbase.client.scanner.caching", 100); // Scanner row caching
lookupConf.setBoolean("hbase.client.scanner.async.prefetch", false); // Disable prefetch for lookups
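
Conceptually, each lookup boils down to a single point Get over the pooled connection. The following is a minimal sketch using the plain HBase client API, assuming the user_profiles table from earlier (the row key "user-42" is hypothetical); it is illustrative, not the connector's internal code:

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// One point lookup against the connection configured above
try (Connection connection = ConnectionFactory.createConnection(lookupConf);
     Table table = connection.getTable(TableName.valueOf("user_profiles"))) {
    Get get = new Get(Bytes.toBytes("user-42")); // hypothetical row key
    get.addFamily(Bytes.toBytes("profile")); // restrict to the needed column family
    Result result = table.get(get);
    String name = Bytes.toString(
        result.getValue(Bytes.toBytes("profile"), Bytes.toBytes("name")));
    // The cell values are then mapped into a Flink Row and emitted
}
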
// Enable HBase client-side caching for frequently accessed data
Configuration cachedConf = new Configuration();
cachedConf.set("hbase.zookeeper.quorum", "localhost:2181");
// Note: HBase's block cache is a region-server setting (hfile.block.cache.size,
// a fraction of server heap); it cannot be enabled from the client side.
// On the client, reuse a single Connection so cached META/region locations
// are shared across lookups instead of being re-fetched per request.
cachedConf.setInt("hbase.client.scanner.caching", 100); // Rows per scanner RPC for multi-row reads

// Schema for composite key lookups
HBaseTableSchema compositeKeySchema = new HBaseTableSchema();
compositeKeySchema.setRowKey("composite_key", String.class); // "userId:timestamp" format
compositeKeySchema.addColumn("data", "value", String.class);
compositeKeySchema.addColumn("data", "status", String.class);
// Custom lookup function for composite keys
public class CompositeKeyLookupFunction extends TableFunction<Row> {
    private final HBaseLookupFunction baseLookupFunction;

    public CompositeKeyLookupFunction(Configuration conf, String tableName,
            HBaseTableSchema schema) {
        this.baseLookupFunction = new HBaseLookupFunction(conf, tableName, schema);
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        baseLookupFunction.open(context); // Open the delegate's HBase connection
    }

    public void eval(String userId, Long timestamp) {
        // Build the composite row key in the "userId:timestamp" format
        String compositeKey = userId + ":" + timestamp;
        // Delegate to the base lookup function.
        // Note: this is conceptual - HBaseLookupFunction emits matches through
        // its own collector, so a real implementation would either share that
        // collector or query HBase directly (see the direct-client sketch below)
        baseLookupFunction.eval(compositeKey);
    }

    @Override
    public void close() throws Exception {
        baseLookupFunction.close();
    }
}
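
A hypothetical registration and call site for the composite-key function; the function name, the time_series_data table, and the events table's ts column are illustrative assumptions:

// Register the function, then join on both key parts (illustrative names)
tableEnv.registerFunction("lookup_by_composite",
    new CompositeKeyLookupFunction(conf, "time_series_data", compositeKeySchema));
Table joined = tableEnv.sqlQuery(
    "SELECT e.user_id, d.`value`, d.status " +
    "FROM events e, LATERAL TABLE(lookup_by_composite(e.user_id, e.ts)) " +
    "AS d(composite_key, `value`, status)");
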
-- Lookup with conditions
SELECT
e.event_id,
e.user_id,
e.event_type,
CASE
WHEN u.user_id IS NOT NULL THEN u.name
ELSE 'Unknown User'
END as user_name,
CASE
WHEN u.age >= 18 THEN 'Adult'
ELSE 'Minor'
END as age_category
FROM user_events e
LEFT JOIN user_profiles FOR SYSTEM_TIME AS OF e.proc_time AS u
ON e.user_id = u.user_id;

// Custom lookup function with data transformation
public class TransformingLookupFunction extends TableFunction<Row> {
    private HBaseLookupFunction baseLookup; // Assumed to be created and opened in open()

    public void eval(String userId) {
        // Perform the base lookup
        baseLookup.eval(userId);
        // Transform and emit results (conceptual)
        // In practice this would involve collecting the rows from baseLookup
        // and passing them through transformUserData() before emitting
    }

    // Transform user data: derive display fields from the raw HBase row
    private Row transformUserData(Row originalRow) {
        String name = (String) originalRow.getField(1);
        Integer age = (Integer) originalRow.getField(3);
        String department = (String) originalRow.getField(4);
        // Add computed fields (formatDisplayName, categorizeAge and
        // getDepartmentCode are assumed helpers, omitted for brevity)
        String displayName = formatDisplayName(name);
        String ageGroup = categorizeAge(age);
        String departmentCode = getDepartmentCode(department);
        return Row.of(
            originalRow.getField(0), // user_id
            displayName,
            originalRow.getField(2), // email
            ageGroup,
            departmentCode
        );
    }
}
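
Because HBaseLookupFunction emits rows through its own collector, a wrapper cannot simply forward its results. A self-contained variant that queries HBase directly avoids that problem. Below is a minimal runnable sketch under that assumption, using the plain HBase client API; the class name, constructor parameter, and emitted columns are illustrative:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical self-contained lookup that transforms while emitting
public class DirectTransformingLookup extends TableFunction<Row> {
    private final String quorum; // e.g. "localhost:2181"
    private transient Connection connection;
    private transient org.apache.hadoop.hbase.client.Table table;

    public DirectTransformingLookup(String quorum) {
        this.quorum = quorum;
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", quorum);
        connection = ConnectionFactory.createConnection(conf);
        table = connection.getTable(TableName.valueOf("user_profiles"));
    }

    public void eval(String userId) {
        try {
            Result result = table.get(new Get(Bytes.toBytes(userId)));
            if (result.isEmpty()) {
                return; // No match: emit nothing
            }
            String name = Bytes.toString(
                result.getValue(Bytes.toBytes("profile"), Bytes.toBytes("name")));
            // Transform during emission, e.g. normalize the display name
            collect(Row.of(userId, name == null ? "UNKNOWN" : name.trim()));
        } catch (java.io.IOException e) {
            throw new RuntimeException("HBase lookup failed for key " + userId, e);
        }
    }

    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW(Types.STRING, Types.STRING); // user_id, display name
    }

    @Override
    public void close() throws Exception {
        if (table != null) { table.close(); }
        if (connection != null) { connection.close(); }
    }
}
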
// Configure retry and timeout behavior for lookups
Configuration resilientConf = new Configuration();
resilientConf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
// Retry configuration
resilientConf.setInt("hbase.client.retries.number", 5); // Max 5 retries
resilientConf.setLong("hbase.client.pause", 1000); // Base pause between retries; HBase scales this up on successive attempts
// Circuit breaker style configuration
resilientConf.setLong("hbase.client.operation.timeout", 30000); // 30 second timeout
// Create resilient lookup function
HBaseLookupFunction resilientLookup = new HBaseLookupFunction(
resilientConf, "user_profiles", userProfileSchema);-- Handle lookup failures gracefully
SELECT
e.event_id,
e.user_id,
e.event_type,
COALESCE(u.name, 'UNKNOWN') as user_name,
COALESCE(u.email, 'no-email@domain.com') as user_email,
COALESCE(u.department, 'UNASSIGNED') as department
FROM user_events e
LEFT JOIN user_profiles FOR SYSTEM_TIME AS OF e.proc_time AS u
ON e.user_id = u.user_id;

// Custom lookup function with metrics
public class MonitoredLookupFunction extends TableFunction<Row> {
    private static final Logger LOG = LoggerFactory.getLogger(MonitoredLookupFunction.class);

    // Note: hadoop Configuration is not Serializable; production code should
    // ship serialized config bytes, as HBaseLookupFunction itself does
    private final Configuration conf;
    private final String tableName;
    private final HBaseTableSchema schema;

    private transient Counter lookupCount;
    private transient Counter lookupFailures;
    private transient Histogram lookupLatency;
    private transient HBaseLookupFunction delegate;

    public MonitoredLookupFunction(Configuration conf, String tableName,
            HBaseTableSchema schema) {
        this.conf = conf;
        this.tableName = tableName;
        this.schema = schema;
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
        // Initialize metrics; histogram() requires a Histogram implementation,
        // e.g. DescriptiveStatisticsHistogram from flink-runtime
        lookupCount = context.getMetricGroup().counter("lookup_count");
        lookupFailures = context.getMetricGroup().counter("lookup_failures");
        lookupLatency = context.getMetricGroup().histogram(
            "lookup_latency", new DescriptiveStatisticsHistogram(500));
        // Initialize the delegate
        delegate = new HBaseLookupFunction(conf, tableName, schema);
        delegate.open(context);
    }

    public void eval(Object rowKey) {
        long startTime = System.currentTimeMillis();
        lookupCount.inc();
        try {
            // The delegate emits matches through its own collector;
            // this wrapper only instruments the call
            delegate.eval(rowKey);
            lookupLatency.update(System.currentTimeMillis() - startTime);
        } catch (Exception e) {
            lookupFailures.inc();
            // Log the error but don't fail the job
            LOG.warn("Lookup failed for key: {}", rowKey, e);
            // Emit an empty result with null values
            collect(Row.of(rowKey, null, null, null, null));
        }
    }

    @Override
    public void close() throws Exception {
        if (delegate != null) {
            delegate.close();
        }
    }
}
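
Registration follows the same pattern as the base function; a hypothetical example matching the constructor above (the function name is illustrative):

tableEnv.registerFunction("monitored_lookup",
    new MonitoredLookupFunction(conf, "user_profiles", userProfileSchema));
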
-- Enrich transaction events with customer information
CREATE TABLE transactions (
  transaction_id STRING,
  customer_id STRING,
  amount DECIMAL(10,2),
  transaction_time TIMESTAMP(3),
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'transactions',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);
CREATE TABLE customers (
customer_id STRING,
name STRING,
tier STRING,
region STRING,
credit_limit DECIMAL(10,2)
) WITH (
'connector' = 'hbase',
'table-name' = 'customer_profiles',
'zookeeper.quorum' = 'localhost:2181'
);
-- Enriched transaction stream
SELECT
t.transaction_id,
t.customer_id,
t.amount,
c.name as customer_name,
c.tier as customer_tier,
c.region,
CASE
WHEN t.amount > c.credit_limit THEN 'OVERLIMIT'
ELSE 'NORMAL'
END as transaction_status
FROM transactions t
JOIN customers FOR SYSTEM_TIME AS OF t.proc_time AS c
ON t.customer_id = c.customer_id;

-- Multiple lookup joins for complex enrichment
CREATE TABLE events (
event_id STRING,
user_id STRING,
product_id STRING,
action STRING,
event_time TIMESTAMP(3),
proc_time AS PROCTIME()
) WITH ('connector' = 'kafka', 'topic' = 'user-events', 'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'json');
CREATE TABLE users (
user_id STRING,
name STRING,
segment STRING
) WITH ('connector' = 'hbase', 'table-name' = 'users', 'zookeeper.quorum' = 'localhost:2181');
CREATE TABLE products (
product_id STRING,
name STRING,
category STRING,
price DECIMAL(10,2)
) WITH ('connector' = 'hbase', 'table-name' = 'products', 'zookeeper.quorum' = 'localhost:2181');
-- Multi-level enriched stream
SELECT
e.event_id,
e.action,
u.name as user_name,
u.segment as user_segment,
p.name as product_name,
p.category as product_category,
p.price as product_price
FROM events e
JOIN users FOR SYSTEM_TIME AS OF e.proc_time AS u ON e.user_id = u.user_id
JOIN products FOR SYSTEM_TIME AS OF e.proc_time AS p ON e.product_id = p.product_id;

// Design schema for efficient lookups
HBaseTableSchema efficientLookupSchema = new HBaseTableSchema();
// Use meaningful row key
efficientLookupSchema.setRowKey("user_id", String.class);
// Group related data in same column family for locality
efficientLookupSchema.addColumn("profile", "name", String.class);
efficientLookupSchema.addColumn("profile", "email", String.class);
efficientLookupSchema.addColumn("profile", "department", String.class);
// Separate frequently changing data
efficientLookupSchema.addColumn("activity", "last_login", java.sql.Timestamp.class);
efficientLookupSchema.addColumn("activity", "login_count", Long.class);
// Use appropriate data types
efficientLookupSchema.addColumn("settings", "preferences", String.class); // JSON as string
efficientLookupSchema.addColumn("binary", "avatar", byte[].class); // Binary data// Implement robust error handling
// Implement robust error handling
public class RobustLookupFunction extends TableFunction<Row> {
    private static final Logger LOG = LoggerFactory.getLogger(RobustLookupFunction.class);

    private static final int MAX_RETRIES = 3;
    private static final long RETRY_DELAY_MS = 1000;

    public void eval(Object rowKey) {
        int attempts = 0;
        Exception lastException = null;
        while (attempts < MAX_RETRIES) {
            try {
                // Perform the lookup (performLookup is an assumed helper
                // that queries HBase and collects matching rows)
                performLookup(rowKey);
                return; // Success
            } catch (Exception e) {
                lastException = e;
                attempts++;
                if (attempts < MAX_RETRIES) {
                    try {
                        Thread.sleep(RETRY_DELAY_MS * attempts); // Linearly increasing backoff
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
        // All retries failed - emit a default/empty result
        LOG.error("Lookup failed after {} attempts for key: {}", MAX_RETRIES, rowKey, lastException);
        emitDefaultResult(rowKey);
    }

    private void emitDefaultResult(Object rowKey) {
        // Emit a row with null values for the missing data
        collect(Row.of(rowKey, null, null, null, null));
    }
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-hbase-2-11