0
# Lookup Options and Caching
1
2
Configuration for lookup join operations with caching, retry mechanisms, and async processing options in the Apache Flink HBase 1.4 Connector.
3
4
## Capabilities
5
6
### HBaseLookupOptions
7
8
Configuration class that encapsulates all lookup-related settings for optimizing dimension table joins and caching strategies.
9
10
```java { .api }
11
/**
12
* Options for HBase lookup operations
13
* Provides configuration for caching, retries, and async processing
14
*/
15
@Internal
16
public class HBaseLookupOptions implements Serializable {
17
18
/**
19
* Returns the maximum number of entries in the lookup cache
20
* @return Maximum cache size in rows (-1, the default, means the lookup cache is disabled)
21
*/
22
public long getCacheMaxSize();
23
24
/**
25
* Returns the cache entry expiration time in milliseconds
26
* @return Time-to-live in milliseconds (0, the default, means the lookup cache is disabled)
27
*/
28
public long getCacheExpireMs();
29
30
/**
31
* Returns the maximum number of retry attempts for failed lookups
32
* @return Maximum retry count (default: 3)
33
*/
34
public int getMaxRetryTimes();
35
36
/**
37
* Returns whether async lookup processing is enabled
38
* @return true if async lookups are enabled (default: false)
39
*/
40
public boolean getLookupAsync();
41
42
/**
43
* Creates a new builder for configuring lookup options
44
* @return Builder instance for fluent configuration
45
*/
46
public static Builder builder();
47
}
48
```
49
50
### HBaseLookupOptions.Builder
51
52
Builder class providing fluent API for configuring lookup options with method chaining.
53
54
```java { .api }
55
/**
56
* Builder for HBaseLookupOptions using fluent interface pattern
57
* Allows step-by-step configuration of all lookup parameters
58
*/
59
public static class Builder {
60
61
/**
62
* Sets the maximum cache size for lookup entries
63
* @param cacheMaxSize Maximum number of entries to cache (-1 disables the cache; default: -1)
64
* @return Builder instance for method chaining
65
*/
66
public Builder setCacheMaxSize(long cacheMaxSize);
67
68
/**
69
* Sets the cache entry expiration time in milliseconds
70
* @param cacheExpireMs Time-to-live in milliseconds (0 disables the cache; default: 0)
71
* @return Builder instance for method chaining
72
*/
73
public Builder setCacheExpireMs(long cacheExpireMs);
74
75
/**
76
* Sets the maximum number of retry attempts for failed lookups
77
* @param maxRetryTimes Maximum retry count (default: 3)
78
* @return Builder instance for method chaining
79
*/
80
public Builder setMaxRetryTimes(int maxRetryTimes);
81
82
/**
83
* Sets whether to enable async lookup processing
84
* @param lookupAsync true to enable async lookups (default: false)
85
* @return Builder instance for method chaining
86
*/
87
public Builder setLookupAsync(boolean lookupAsync);
88
89
/**
90
* Creates a new HBaseLookupOptions instance with configured settings
91
* @return Configured HBaseLookupOptions instance
92
*/
93
public HBaseLookupOptions build();
94
}
95
```
96
97
**Usage Example:**
98
99
```java
100
// Example: High-performance lookup configuration with caching
101
HBaseLookupOptions cachedLookup = HBaseLookupOptions.builder()
102
.setCacheMaxSize(100000) // Cache up to 100K entries
103
.setCacheExpireMs(300000) // 5 minute TTL
104
.setMaxRetryTimes(5) // 5 retry attempts
105
.setLookupAsync(true) // Enable async processing
106
.build();
107
108
// Example: Memory-efficient lookup configuration
109
HBaseLookupOptions memoryEfficient = HBaseLookupOptions.builder()
110
.setCacheMaxSize(10000) // Smaller cache size
111
.setCacheExpireMs(60000) // 1 minute TTL
112
.setMaxRetryTimes(3) // Standard retry count
113
.setLookupAsync(false) // Synchronous processing
114
.build();
115
```
116
117
## Lookup Join Operations
118
119
### Dimension Table Pattern
120
121
HBase tables are commonly used as dimension tables in stream processing applications for data enrichment through lookup joins.
122
123
```sql
124
-- Example: User activity stream enriched with user profile data
125
CREATE TABLE user_activity_stream (
126
user_id STRING,
127
activity_type STRING,
128
timestamp_val TIMESTAMP(3),
129
proc_time AS PROCTIME() -- Processing time for temporal join
130
) WITH (
131
'connector' = 'kafka',
132
'topic' = 'user-activities'
133
);
134
135
CREATE TABLE user_profiles (
136
user_id STRING,
137
profile ROW<name STRING, segment STRING, country STRING, created_date DATE>,
138
preferences ROW<language STRING, timezone STRING, notifications BOOLEAN>,
139
PRIMARY KEY (user_id) NOT ENFORCED
140
) WITH (
141
'connector' = 'hbase-1.4',
142
'table-name' = 'user_dim',
143
'zookeeper.quorum' = 'localhost:2181',
144
'lookup.cache.max-rows' = '50000',
145
'lookup.cache.ttl' = '10min',
146
'lookup.max-retries' = '3'
147
);
148
149
-- Enrichment query using temporal join
150
SELECT
151
a.user_id,
152
a.activity_type,
153
a.timestamp_val,
154
u.profile.name,
155
u.profile.segment,
156
u.preferences.language
157
FROM user_activity_stream a
158
JOIN user_profiles FOR SYSTEM_TIME AS OF a.proc_time AS u
159
ON a.user_id = u.user_id;
160
```
161
162
### Async Lookup Processing
163
164
Enable async lookup processing for improved performance with high-volume streams.
165
166
```sql
167
-- High-volume stream with async lookups
168
CREATE TABLE product_events (
169
product_id STRING,
170
event_data ROW<event_type STRING, value DOUBLE, user_id STRING>,
171
proc_time AS PROCTIME()
172
) WITH (
173
'connector' = 'kafka',
174
'topic' = 'product-events'
175
);
176
177
CREATE TABLE product_catalog (
178
product_id STRING,
179
product_info ROW<name STRING, category STRING, price DECIMAL(10,2)>,
180
inventory ROW<stock_level INT, warehouse_id STRING>,
181
PRIMARY KEY (product_id) NOT ENFORCED
182
) WITH (
183
'connector' = 'hbase-1.4',
184
'table-name' = 'products',
185
'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
186
'lookup.async' = 'true', -- Enable async lookups
187
'lookup.cache.max-rows' = '200000', -- Large cache for products
188
'lookup.cache.ttl' = '30min', -- 30 minute cache TTL
189
'lookup.max-retries' = '5' -- Higher retry count
190
);
191
```
192
193
## Caching Strategies
194
195
### Cache Size Configuration
196
197
Configure cache size based on data volume and memory constraints.
198
199
```java
200
// Small dimension table (< 10K records)
201
HBaseLookupOptions smallDimension = HBaseLookupOptions.builder()
202
.setCacheMaxSize(15000) // Cache all records plus buffer
203
.setCacheExpireMs(3600000) // 1 hour TTL
204
.build();
205
206
// Medium dimension table (10K-100K records)
207
HBaseLookupOptions mediumDimension = HBaseLookupOptions.builder()
208
.setCacheMaxSize(50000) // Cache hot subset
209
.setCacheExpireMs(1800000) // 30 minute TTL
210
.build();
211
212
// Large dimension table (> 100K records)
213
HBaseLookupOptions largeDimension = HBaseLookupOptions.builder()
214
.setCacheMaxSize(100000) // Cache only hottest data
215
.setCacheExpireMs(600000) // 10 minute TTL
216
.build();
217
```
218
219
### Cache TTL Strategies
220
221
Configure time-to-live based on data freshness requirements.
222
223
```java
224
// Real-time data (frequent updates)
225
HBaseLookupOptions realTime = HBaseLookupOptions.builder()
226
.setCacheMaxSize(20000)
227
.setCacheExpireMs(60000) // 1 minute TTL
228
.build();
229
230
// Reference data (infrequent updates)
231
HBaseLookupOptions reference = HBaseLookupOptions.builder()
232
.setCacheMaxSize(100000)
233
.setCacheExpireMs(7200000) // 2 hour TTL
234
.build();
235
236
// Static data (rare updates)
237
HBaseLookupOptions staticData = HBaseLookupOptions.builder()
238
.setCacheMaxSize(500000)
239
.setCacheExpireMs(86400000) // 24 hour TTL
240
.build();
241
```
242
243
### Cache Eviction Policies
244
245
The lookup cache uses LRU (Least Recently Used) eviction when the maximum size is reached.
246
247
**Cache Behavior:**
248
- **Hit**: Return cached value, update access time
249
- **Miss**: Query HBase and cache the result, evicting the least recently used entry if the cache is already full
250
- **Eviction**: Remove least recently accessed entries when cache is full
251
- **Expiration**: Remove entries older than TTL regardless of access
252
253
## Error Handling and Resilience
254
255
### Retry Configuration
256
257
Configure retry behavior for handling transient HBase failures.
258
259
```java
260
// Aggressive retry for critical lookups
261
HBaseLookupOptions criticalLookups = HBaseLookupOptions.builder()
262
.setCacheMaxSize(25000)
263
.setCacheExpireMs(300000)
264
.setMaxRetryTimes(10) // High retry count
265
.setLookupAsync(true) // Async for better resilience
266
.build();
267
268
// Conservative retry for best-effort lookups
269
HBaseLookupOptions bestEffort = HBaseLookupOptions.builder()
270
.setCacheMaxSize(10000)
271
.setCacheExpireMs(120000)
272
.setMaxRetryTimes(1) // Minimal retries
273
.setLookupAsync(false) // Synchronous processing
274
.build();
275
```
276
277
### Failure Handling Strategies
278
279
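Different approaches for handling lookup failures (fail-fast is the default behavior; the other strategies have to be implemented in the query or application logic):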
280
281
1. **Fail Fast**: Fail the job on lookup errors (default behavior)
282
2. **Null Result**: Return null for failed lookups (requires null-safe processing)
283
3. **Default Values**: Return configured default values for failures
284
4. **Cache Fallback**: Use stale cached values on HBase failures
285
286
```sql
287
-- Example: Null-safe enrichment when a lookup returns no matching row (LEFT JOIN + COALESCE)
288
SELECT
289
e.event_id,
290
e.user_id,
291
COALESCE(u.profile.name, 'Unknown User') as user_name,
292
COALESCE(u.profile.segment, 'default') as user_segment
293
FROM events e
294
LEFT JOIN user_dim FOR SYSTEM_TIME AS OF e.proc_time AS u
295
ON e.user_id = u.user_id;
296
```
297
298
## Performance Optimization
299
300
### Cache Hit Rate Optimization
301
302
Monitor and optimize cache hit rates for maximum performance.
303
304
**Key Metrics:**
305
- Cache hit rate (target: > 90% for most workloads)
306
- Cache size utilization (target: 70-90% of max size)
307
- Average lookup latency (target: < 10ms for cached, < 100ms for uncached)
308
309
**Optimization Strategies:**
310
311
```java
312
// Hot data optimization - smaller cache, shorter TTL
313
HBaseLookupOptions hotData = HBaseLookupOptions.builder()
314
.setCacheMaxSize(30000) // Size for working set
315
.setCacheExpireMs(300000) // 5 minute TTL
316
.setMaxRetryTimes(3)
317
.build();
318
319
// Cold data optimization - larger cache, longer TTL
320
HBaseLookupOptions coldData = HBaseLookupOptions.builder()
321
.setCacheMaxSize(100000) // Large cache
322
.setCacheExpireMs(3600000) // 1 hour TTL
323
.setMaxRetryTimes(2)
324
.build();
325
```
326
327
### Memory Management
328
329
Balance cache size with available memory to avoid OOM errors.
330
331
**Memory Estimation:**
332
- Average record size × cache max size = approximate memory usage
333
- Include overhead for cache metadata and JVM objects
334
- Reserve memory for other Flink operations
335
336
```java
337
// Memory-constrained environment
338
HBaseLookupOptions memoryConstrained = HBaseLookupOptions.builder()
339
.setCacheMaxSize(5000) // Small cache size
340
.setCacheExpireMs(900000) // 15 minute TTL
341
.setMaxRetryTimes(2)
342
.setLookupAsync(false) // Reduce memory overhead
343
.build();
344
```
345
346
## SQL Configuration Examples
347
348
### High-Performance Lookup Configuration
349
350
```sql
351
CREATE TABLE customer_dimension (
352
customer_id STRING,
353
customer_data ROW<
354
name STRING,
355
segment STRING,
356
country STRING,
357
lifetime_value DECIMAL(12,2),
358
created_date DATE
359
>,
360
preferences ROW<
361
communication_channel STRING,
362
language STRING,
363
currency STRING
364
>,
365
PRIMARY KEY (customer_id) NOT ENFORCED
366
) WITH (
367
'connector' = 'hbase-1.4',
368
'table-name' = 'customers',
369
'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
370
'lookup.async' = 'true', -- Async processing
371
'lookup.cache.max-rows' = '100000', -- 100K cache entries
372
'lookup.cache.ttl' = '15min', -- 15 minute TTL
373
'lookup.max-retries' = '5' -- 5 retry attempts
374
);
375
```
376
377
### Memory-Efficient Lookup Configuration
378
379
```sql
380
CREATE TABLE product_categories (
381
category_id STRING,
382
category_info ROW<name STRING, parent_id STRING, level INT>,
383
PRIMARY KEY (category_id) NOT ENFORCED
384
) WITH (
385
'connector' = 'hbase-1.4',
386
'table-name' = 'categories',
387
'zookeeper.quorum' = 'localhost:2181',
388
'lookup.async' = 'false', -- Sync processing
389
'lookup.cache.max-rows' = '5000', -- Smaller cache
390
'lookup.cache.ttl' = '1h', -- 1 hour TTL
391
'lookup.max-retries' = '2' -- Fewer retries
392
);
393
```
394
395
### Real-Time Lookup Configuration
396
397
```sql
398
CREATE TABLE real_time_prices (
399
symbol STRING,
400
price_data ROW<current_price DECIMAL(10,4), last_update TIMESTAMP(3)>,
401
PRIMARY KEY (symbol) NOT ENFORCED
402
) WITH (
403
'connector' = 'hbase-1.4',
404
'table-name' = 'live_prices',
405
'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
406
'lookup.async' = 'true', -- Async for performance
407
'lookup.cache.max-rows' = '10000', -- Cache for active symbols
408
'lookup.cache.ttl' = '30s', -- Short TTL for freshness
409
'lookup.max-retries' = '3' -- Standard retry count
410
);
411
```