0
# Lookup Functions
1
2
The HBase connector provides lookup function capabilities for temporal table joins, enabling real-time enrichment of streaming data with dimension data stored in HBase. This is essential for joining fast-changing stream data with slowly-changing dimension tables.
3
4
## HBaseLookupFunction
5
6
A table function that performs lookups in HBase tables for temporal joins in Flink's Table API.
7
8
```java { .api }
9
class HBaseLookupFunction extends TableFunction<Row> {
10
public HBaseLookupFunction(Configuration configuration, String hTableName,
11
HBaseTableSchema hbaseTableSchema);
12
13
// Core lookup method
14
public void eval(Object rowKey);
15
16
// Function lifecycle
17
public void open(FunctionContext context) throws Exception;
18
public void close() throws Exception;
19
20
// Type information
21
public TypeInformation<Row> getResultType();
22
}
23
```
24
25
### Basic Lookup Usage
26
27
```java
28
import org.apache.flink.addons.hbase.HBaseLookupFunction;
29
import org.apache.flink.addons.hbase.HBaseTableSchema;
30
import org.apache.flink.addons.hbase.HBaseTableSource;
31
import org.apache.flink.table.api.EnvironmentSettings;
32
import org.apache.flink.table.api.TableEnvironment;
33
import org.apache.hadoop.conf.Configuration;
34
35
// Configure HBase connection
36
Configuration conf = new Configuration();
37
conf.set("hbase.zookeeper.quorum", "localhost:2181");
38
39
// Define dimension table schema
40
HBaseTableSchema userProfileSchema = new HBaseTableSchema();
41
userProfileSchema.setRowKey("user_id", String.class);
42
userProfileSchema.addColumn("profile", "name", String.class);
43
userProfileSchema.addColumn("profile", "email", String.class);
44
userProfileSchema.addColumn("profile", "age", Integer.class);
45
userProfileSchema.addColumn("profile", "department", String.class);
46
47
// Create table source with lookup capability
48
HBaseTableSource userProfileSource = new HBaseTableSource(conf, "user_profiles");
49
userProfileSource.setRowKey("user_id", String.class);
50
userProfileSource.addColumn("profile", "name", String.class);
51
userProfileSource.addColumn("profile", "email", String.class);
52
userProfileSource.addColumn("profile", "age", Integer.class);
53
userProfileSource.addColumn("profile", "department", String.class);
54
55
// Register as temporal table
56
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
57
tableEnv.registerTableSource("user_profiles", userProfileSource);
58
```
59
60
### Temporal Join with Lookup
61
62
```sql
63
-- Create the main event stream table
64
CREATE TABLE user_events (
65
event_id STRING,
66
user_id STRING,
67
event_type STRING,
68
event_time TIMESTAMP(3),
69
event_value DOUBLE,
70
proc_time AS PROCTIME()
71
) WITH (
72
'connector' = 'kafka',
73
'topic' = 'user-events',
74
'properties.bootstrap.servers' = 'localhost:9092',
75
'format' = 'json'
76
);
77
78
-- Create HBase lookup table
79
CREATE TABLE user_profiles (
80
user_id STRING,
81
name STRING,
82
email STRING,
83
age INT,
84
department STRING
85
) WITH (
86
'connector' = 'hbase',
87
'table-name' = 'user_profiles',
88
'zookeeper.quorum' = 'localhost:2181'
89
);
90
91
-- Perform temporal join (lookup)
92
SELECT
93
e.event_id,
94
e.user_id,
95
e.event_type,
96
e.event_value,
97
u.name,
98
u.email,
99
u.department
100
FROM user_events e
101
JOIN user_profiles FOR SYSTEM_TIME AS OF e.proc_time AS u
102
ON e.user_id = u.user_id;
103
```
104
105
### Programmatic Lookup Usage
106
107
```java
108
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
109
import org.apache.flink.table.functions.TableFunction;
110
111
// Create lookup function directly
112
HBaseLookupFunction lookupFunction = new HBaseLookupFunction(
113
conf, "user_profiles", userProfileSchema);
114
115
// Register as user-defined function
116
tableEnv.registerFunction("lookup_user", lookupFunction);
117
118
// Use in SQL query
119
Table enrichedEvents = tableEnv.sqlQuery(
120
"SELECT " +
121
" e.event_id, " +
122
" e.user_id, " +
123
" e.event_type, " +
124
" u.name, " +
125
" u.email " +
126
"FROM events e, " +
127
"LATERAL TABLE(lookup_user(e.user_id)) AS u(user_id, name, email, age, department)"
128
);
129
```
130
131
## Lookup Performance Optimization
132
133
### Connection Caching
134
135
The lookup function opens an HBase client connection and reuses it across lookups; connection pooling itself is provided by the HBase client and is tuned through configuration settings such as the following:
136
137
```java
138
// Configure HBase client for lookup performance
139
Configuration lookupConf = new Configuration();
140
lookupConf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
141
142
// Connection pool settings
143
lookupConf.setInt("hbase.client.ipc.pool.size", 10); // Connection pool size
144
lookupConf.setInt("hbase.client.ipc.pool.type", 1); // RoundRobin pool type
145
146
// Timeout settings for lookups
147
lookupConf.setLong("hbase.rpc.timeout", 5000); // 5 second RPC timeout
148
lookupConf.setLong("hbase.client.operation.timeout", 10000); // 10 second operation timeout
149
150
// Scanner settings for better lookup performance
151
lookupConf.setInt("hbase.client.scanner.caching", 100); // Scanner row caching
152
lookupConf.setBoolean("hbase.client.scanner.async.prefetch", false); // Disable prefetch for lookups
153
```
154
155
### Lookup Caching
156
157
```java
158
// Enable HBase client-side caching for frequently accessed data
159
Configuration cachedConf = new Configuration();
160
cachedConf.set("hbase.zookeeper.quorum", "localhost:2181");
161
162
// Enable block cache for better read performance
163
cachedConf.setBoolean("hbase.client.cache.enable", true);
164
cachedConf.setFloat("hbase.client.cache.size", 0.25f); // 25% of heap for cache
165
166
// Configure region cache
167
cachedConf.setInt("hbase.client.meta.cache.size", 1000); // Meta cache size
168
cachedConf.setLong("hbase.client.meta.cache.ttl", 60000); // 1 minute TTL
169
```
170
171
## Advanced Lookup Patterns
172
173
### Multi-Key Lookups
174
175
```java
176
// Schema for composite key lookups
177
HBaseTableSchema compositeKeySchema = new HBaseTableSchema();
178
compositeKeySchema.setRowKey("composite_key", String.class); // "userId:timestamp" format
179
compositeKeySchema.addColumn("data", "value", String.class);
180
compositeKeySchema.addColumn("data", "status", String.class);
181
182
// Custom lookup function for composite keys
183
public class CompositeKeyLookupFunction extends TableFunction<Row> {
184
private HBaseLookupFunction baseLookupFunction;
185
186
public CompositeKeyLookupFunction(Configuration conf, String tableName,
187
HBaseTableSchema schema) {
188
this.baseLookupFunction = new HBaseLookupFunction(conf, tableName, schema);
189
}
190
191
public void eval(String userId, Long timestamp) {
192
// Create composite key
193
String compositeKey = userId + ":" + timestamp;
194
195
// Delegate to base lookup function
196
baseLookupFunction.eval(compositeKey);
197
198
// Forward results
199
// Note: This is conceptual - actual implementation would need to handle result collection
200
}
201
}
202
```
203
204
### Conditional Lookups
205
206
```sql
207
-- Lookup with conditions
208
SELECT
209
e.event_id,
210
e.user_id,
211
e.event_type,
212
CASE
213
WHEN u.user_id IS NOT NULL THEN u.name
214
ELSE 'Unknown User'
215
END as user_name,
216
CASE
217
WHEN u.age >= 18 THEN 'Adult'
218
ELSE 'Minor'
219
END as age_category
220
FROM user_events e
221
LEFT JOIN user_profiles FOR SYSTEM_TIME AS OF e.proc_time AS u
222
ON e.user_id = u.user_id;
223
```
224
225
### Lookup with Data Transformation
226
227
```java
228
// Custom lookup function with data transformation
229
public class TransformingLookupFunction extends TableFunction<Row> {
230
private HBaseLookupFunction baseLookup;
231
232
public void eval(String userId) {
233
// Perform base lookup
234
baseLookup.eval(userId);
235
236
// Transform and emit results (conceptual)
237
// In practice, this would involve collecting results from baseLookup
238
// and transforming them before emitting
239
}
240
241
// Transform user data
242
private Row transformUserData(Row originalRow) {
243
String name = (String) originalRow.getField(1);
244
Integer age = (Integer) originalRow.getField(3);
245
String department = (String) originalRow.getField(4);
246
247
// Add computed fields
248
String displayName = formatDisplayName(name);
249
String ageGroup = categorizeAge(age);
250
String departmentCode = getDepartmentCode(department);
251
252
return Row.of(
253
originalRow.getField(0), // user_id
254
displayName,
255
originalRow.getField(2), // email
256
ageGroup,
257
departmentCode
258
);
259
}
260
}
261
```
262
263
## Error Handling and Resilience
264
265
### Lookup Failure Handling
266
267
```java
268
// Configure retry and timeout behavior for lookups
269
Configuration resilientConf = new Configuration();
270
resilientConf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
271
272
// Retry configuration
273
resilientConf.setInt("hbase.client.retries.number", 5); // Max 5 retries
274
resilientConf.setLong("hbase.client.pause", 1000); // 1 second retry pause
275
resilientConf.setInt("hbase.client.rpc.retry.sleep", 100); // Base retry sleep
276
277
// Circuit breaker style configuration
278
resilientConf.setLong("hbase.client.operation.timeout", 30000); // 30 second timeout
279
280
// Create resilient lookup function
281
HBaseLookupFunction resilientLookup = new HBaseLookupFunction(
282
resilientConf, "user_profiles", userProfileSchema);
283
```
284
285
### Graceful Degradation
286
287
```sql
288
-- Handle lookup failures gracefully
289
SELECT
290
e.event_id,
291
e.user_id,
292
e.event_type,
293
COALESCE(u.name, 'UNKNOWN') as user_name,
294
COALESCE(u.email, 'no-email@domain.com') as user_email,
295
COALESCE(u.department, 'UNASSIGNED') as department
296
FROM user_events e
297
LEFT JOIN user_profiles FOR SYSTEM_TIME AS OF e.proc_time AS u
298
ON e.user_id = u.user_id;
299
```
300
301
## Monitoring and Metrics
302
303
### Lookup Performance Monitoring
304
305
```java
306
// Custom lookup function with metrics
307
public class MonitoredLookupFunction extends TableFunction<Row> {
308
private transient Counter lookupCount;
309
private transient Counter lookupFailures;
310
private transient Histogram lookupLatency;
311
private HBaseLookupFunction delegate;
312
313
@Override
314
public void open(FunctionContext context) throws Exception {
315
super.open(context);
316
317
// Initialize metrics
318
lookupCount = context.getMetricGroup().counter("lookup_count");
319
lookupFailures = context.getMetricGroup().counter("lookup_failures");
320
lookupLatency = context.getMetricGroup().histogram("lookup_latency");
321
322
// Initialize delegate
323
delegate = new HBaseLookupFunction(conf, tableName, schema);
324
delegate.open(context);
325
}
326
327
public void eval(Object rowKey) {
328
long startTime = System.currentTimeMillis();
329
lookupCount.inc();
330
331
try {
332
delegate.eval(rowKey);
333
lookupLatency.update(System.currentTimeMillis() - startTime);
334
} catch (Exception e) {
335
lookupFailures.inc();
336
// Log error but don't fail the job
337
LOG.warn("Lookup failed for key: {}", rowKey, e);
338
// Emit empty result or default values
339
collect(Row.of(rowKey, null, null, null, null));
340
}
341
}
342
}
343
```
344
345
## Lookup Join Patterns
346
347
### Dimension Table Enrichment
348
349
```sql
350
-- Enrich transaction events with customer information
351
CREATE TABLE transactions (
352
transaction_id STRING,
353
customer_id STRING,
354
amount DECIMAL(10,2),
355
transaction_time TIMESTAMP(3),
356
proc_time AS PROCTIME()
357
) WITH (
358
'connector' = 'kafka',
359
'topic' = 'transactions'
360
);
361
362
CREATE TABLE customers (
363
customer_id STRING,
364
name STRING,
365
tier STRING,
366
region STRING,
367
credit_limit DECIMAL(10,2)
368
) WITH (
369
'connector' = 'hbase',
370
'table-name' = 'customer_profiles',
371
'zookeeper.quorum' = 'localhost:2181'
372
);
373
374
-- Enriched transaction stream
375
SELECT
376
t.transaction_id,
377
t.customer_id,
378
t.amount,
379
c.name as customer_name,
380
c.tier as customer_tier,
381
c.region,
382
CASE
383
WHEN t.amount > c.credit_limit THEN 'OVERLIMIT'
384
ELSE 'NORMAL'
385
END as transaction_status
386
FROM transactions t
387
JOIN customers FOR SYSTEM_TIME AS OF t.proc_time AS c
388
ON t.customer_id = c.customer_id;
389
```
390
391
### Multi-Level Lookups
392
393
```sql
394
-- Multiple lookup joins for complex enrichment
395
CREATE TABLE events (
396
event_id STRING,
397
user_id STRING,
398
product_id STRING,
399
action STRING,
400
event_time TIMESTAMP(3),
401
proc_time AS PROCTIME()
402
) WITH ('connector' = 'kafka', 'topic' = 'user-events');
403
404
CREATE TABLE users (
405
user_id STRING,
406
name STRING,
407
segment STRING
408
) WITH ('connector' = 'hbase', 'table-name' = 'users', 'zookeeper.quorum' = 'localhost:2181');
409
410
CREATE TABLE products (
411
product_id STRING,
412
name STRING,
413
category STRING,
414
price DECIMAL(10,2)
415
) WITH ('connector' = 'hbase', 'table-name' = 'products', 'zookeeper.quorum' = 'localhost:2181');
416
417
-- Multi-level enriched stream
418
SELECT
419
e.event_id,
420
e.action,
421
u.name as user_name,
422
u.segment as user_segment,
423
p.name as product_name,
424
p.category as product_category,
425
p.price as product_price
426
FROM events e
427
JOIN users FOR SYSTEM_TIME AS OF e.proc_time AS u ON e.user_id = u.user_id
428
JOIN products FOR SYSTEM_TIME AS OF e.proc_time AS p ON e.product_id = p.product_id;
429
```
430
431
## Best Practices
432
433
### Lookup Performance
434
435
1. **Use appropriate row key design**: Ensure row keys are well-distributed for even load
436
2. **Configure proper timeouts**: Set reasonable RPC and operation timeouts
437
3. **Enable connection pooling**: Use connection pools for better resource utilization
438
4. **Monitor lookup latency**: Track lookup performance with metrics
439
5. **Consider caching**: Enable HBase client-side caching for hot data
440
441
### Schema Design for Lookups
442
443
```java
444
// Design schema for efficient lookups
445
HBaseTableSchema efficientLookupSchema = new HBaseTableSchema();
446
447
// Use meaningful row key
448
efficientLookupSchema.setRowKey("user_id", String.class);
449
450
// Group related data in same column family for locality
451
efficientLookupSchema.addColumn("profile", "name", String.class);
452
efficientLookupSchema.addColumn("profile", "email", String.class);
453
efficientLookupSchema.addColumn("profile", "department", String.class);
454
455
// Separate frequently changing data
456
efficientLookupSchema.addColumn("activity", "last_login", java.sql.Timestamp.class);
457
efficientLookupSchema.addColumn("activity", "login_count", Long.class);
458
459
// Use appropriate data types
460
efficientLookupSchema.addColumn("settings", "preferences", String.class); // JSON as string
461
efficientLookupSchema.addColumn("binary", "avatar", byte[].class); // Binary data
462
```
463
464
### Error Recovery
465
466
```java
467
// Implement robust error handling
468
public class RobustLookupFunction extends TableFunction<Row> {
469
private static final int MAX_RETRIES = 3;
470
private static final long RETRY_DELAY_MS = 1000;
471
472
public void eval(Object rowKey) {
473
int attempts = 0;
474
Exception lastException = null;
475
476
while (attempts < MAX_RETRIES) {
477
try {
478
// Perform lookup
479
performLookup(rowKey);
480
return; // Success
481
} catch (Exception e) {
482
lastException = e;
483
attempts++;
484
485
if (attempts < MAX_RETRIES) {
486
try {
487
Thread.sleep(RETRY_DELAY_MS * attempts); // Exponential backoff
488
} catch (InterruptedException ie) {
489
Thread.currentThread().interrupt();
490
break;
491
}
492
}
493
}
494
}
495
496
// All retries failed - emit default/empty result
497
LOG.error("Lookup failed after {} attempts for key: {}", MAX_RETRIES, rowKey, lastException);
498
emitDefaultResult(rowKey);
499
}
500
501
private void emitDefaultResult(Object rowKey) {
502
// Emit row with null values for missing data
503
collect(Row.of(rowKey, null, null, null, null));
504
}
505
}
506
```