# Lookup Joins

Specialized lookup table source for dimension table joins in streaming applications. `HiveLookupTableSource` provides caching capabilities and optimized access patterns for real-time data enrichment scenarios, enabling efficient temporal joins with Hive dimension tables.

## Capabilities

### Hive Lookup Table Source

Extends the standard `HiveTableSource` with lookup join capabilities, providing cached access to dimension data for streaming joins.

```java { .api }
/**
 * Lookup table source for dimension table joins with caching support.
 * Extends HiveTableSource with optimized lookup access patterns.
 */
public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {

    /**
     * Creates the lookup runtime provider for join operations.
     * @param lookupContext Context containing lookup configuration and key information
     * @return LookupRuntimeProvider for executing lookup operations
     */
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext);

    /**
     * Creates a copy of this lookup table source for planning.
     * @return Deep copy of the lookup table source
     */
    public DynamicTableSource copy();

    /**
     * Returns a string summary of the lookup table source.
     * @return Human-readable description including cache configuration
     */
    public String asSummaryString();
}
```

### Lookup Runtime Provider

Runtime component that executes the actual lookup operations with caching and optimization.

```java { .api }
/**
 * Runtime provider for executing lookup operations.
 * Handles caching, key serialization, and result retrieval.
 */
public interface LookupRuntimeProvider {

    /**
     * Performs a synchronous lookup for the given key.
     * @param keyRow Row containing the lookup key values
     * @return Collection of matching rows from the dimension table
     */
    Collection<RowData> lookup(RowData keyRow);

    /**
     * Performs an asynchronous lookup for the given key.
     * @param keyRow Row containing the lookup key values
     * @return CompletableFuture containing the matching rows
     */
    CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow);
}
```
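
The caching behavior behind `lookup.join.cache.ttl` and `lookup.join.cache.max-size` can be illustrated with a minimal TTL-and-size-bounded cache in plain Java. This is a sketch of the idea, not Flink's implementation; the class and method names are illustrative only:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Minimal TTL + max-size lookup cache sketch (illustrative, not Flink's implementation). */
public class TtlLookupCache<K, V> {
    private final long ttlMillis;
    private final int maxSize;
    // Access-ordered LinkedHashMap gives simple LRU eviction once maxSize is exceeded.
    private final LinkedHashMap<K, Entry<V>> cache;

    private static final class Entry<V> {
        final V value;
        final long loadedAt;
        Entry(V value, long loadedAt) { this.value = value; this.loadedAt = loadedAt; }
    }

    public TtlLookupCache(long ttlMillis, int maxSize) {
        this.ttlMillis = ttlMillis;
        this.maxSize = maxSize;
        this.cache = new LinkedHashMap<K, Entry<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Entry<V>> eldest) {
                return size() > TtlLookupCache.this.maxSize;
            }
        };
    }

    /** Returns the cached value, or null when absent or expired (a miss triggers a reload upstream). */
    public V get(K key, long nowMillis) {
        Entry<V> e = cache.get(key);
        if (e == null) return null;
        if (nowMillis - e.loadedAt > ttlMillis) {  // expired: evict and report a miss
            cache.remove(key);
            return null;
        }
        return e.value;
    }

    public void put(K key, V value, long nowMillis) {
        cache.put(key, new Entry<>(value, nowMillis));
    }

    public int size() { return cache.size(); }
}
```

On a miss (absent or expired entry) the lookup source would read the row from Hive and re-populate the cache, which is why a TTL shorter than the dimension table's update cadence mostly produces avoidable reloads.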

## Usage Patterns

### Temporal Join Configuration

```sql
-- Create dimension table with lookup configuration
CREATE TABLE customer_dim (
    customer_id BIGINT PRIMARY KEY NOT ENFORCED,
    customer_name STRING,
    customer_tier STRING,
    registration_date DATE,
    last_updated TIMESTAMP(3)
) WITH (
    'connector' = 'hive',

    -- Lookup join caching configuration
    'lookup.join.cache.ttl' = '1 hour',      -- Cache entries for 1 hour
    'lookup.join.cache.max-size' = '10000',  -- Maximum cache entries

    -- Performance optimization
    'table.exec.hive.infer-source-parallelism' = 'false',  -- Use a single task for lookup
    'table.exec.hive.split-max-size' = '64MB'              -- Smaller splits for lookup
);

-- Create streaming fact table
CREATE TABLE orders_stream (
    order_id BIGINT,
    customer_id BIGINT,
    order_amount DECIMAL(10, 2),
    order_time TIMESTAMP(3),
    proc_time AS PROCTIME()  -- Processing time for temporal join
) WITH (
    'connector' = 'kafka'
    -- Kafka configuration...
);

-- Perform temporal join to enrich streaming data
SELECT
    o.order_id,
    o.order_amount,
    o.order_time,
    c.customer_name,
    c.customer_tier,
    -- Calculate discount based on customer tier
    CASE
        WHEN c.customer_tier = 'GOLD' THEN o.order_amount * 0.1
        WHEN c.customer_tier = 'SILVER' THEN o.order_amount * 0.05
        ELSE 0
    END AS discount_amount
FROM orders_stream o
JOIN customer_dim FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.customer_id;
```

### Advanced Lookup Join Scenarios

#### Multi-Key Lookups

```sql
-- Dimension table with composite key
CREATE TABLE product_inventory (
    warehouse_id STRING,
    product_id STRING,
    available_quantity INT,
    last_updated TIMESTAMP(3),
    PRIMARY KEY (warehouse_id, product_id) NOT ENFORCED
) WITH (
    'connector' = 'hive',
    'lookup.join.cache.ttl' = '30 min',
    'lookup.join.cache.max-size' = '50000'
);

-- Join with multiple lookup keys
SELECT
    o.order_id,
    o.product_id,
    o.requested_quantity,
    i.available_quantity,
    CASE
        WHEN i.available_quantity >= o.requested_quantity THEN 'AVAILABLE'
        ELSE 'BACKORDER'
    END AS fulfillment_status
FROM order_items_stream o
JOIN product_inventory FOR SYSTEM_TIME AS OF o.proc_time AS i
    ON o.warehouse_id = i.warehouse_id
    AND o.product_id = i.product_id;
```
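
A composite lookup key such as `(warehouse_id, product_id)` must behave as a single value when used as a cache key: equal key parts must yield equal keys and equal hash codes. A minimal plain-Java sketch (the `CompositeKey` class is hypothetical, not part of Flink):

```java
import java.util.Objects;

/** Hypothetical composite cache key for (warehouse_id, product_id) lookups. */
public final class CompositeKey {
    private final String warehouseId;
    private final String productId;

    public CompositeKey(String warehouseId, String productId) {
        this.warehouseId = warehouseId;
        this.productId = productId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof CompositeKey)) return false;
        CompositeKey that = (CompositeKey) o;
        // Value-based equality over both key parts
        return warehouseId.equals(that.warehouseId) && productId.equals(that.productId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(warehouseId, productId);
    }
}
```

Without value-based `equals`/`hashCode`, every lookup for the same (warehouse, product) pair would miss the cache and fall through to a Hive read.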

#### Null Handling in Lookups

```sql
-- Handle missing dimension data gracefully
SELECT
    o.order_id,
    o.customer_id,
    o.order_amount,
    COALESCE(c.customer_name, 'UNKNOWN') AS customer_name,
    COALESCE(c.customer_tier, 'STANDARD') AS customer_tier,
    -- Apply the default (zero) discount for unknown customers
    CASE
        WHEN c.customer_id IS NOT NULL AND c.customer_tier = 'GOLD'
            THEN o.order_amount * 0.1
        ELSE 0
    END AS discount_amount
FROM orders_stream o
LEFT JOIN customer_dim FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.customer_id;
```

### Performance Optimization

#### Cache Configuration Tuning

```java
// Programmatic cache configuration: pick ONE profile per table
// (later puts on the same map would overwrite earlier ones).
Map<String, String> tableOptions = new HashMap<>();

// High-frequency lookups against stable dimension data
tableOptions.put("lookup.join.cache.ttl", "2 hours");
tableOptions.put("lookup.join.cache.max-size", "100000");

// Fast-changing dimension data
// tableOptions.put("lookup.join.cache.ttl", "5 minutes");
// tableOptions.put("lookup.join.cache.max-size", "5000");

// Memory-constrained environments
// tableOptions.put("lookup.join.cache.ttl", "30 minutes");
// tableOptions.put("lookup.join.cache.max-size", "1000");
```

#### Partitioning for Lookup Performance

```sql
-- Partition the dimension table for better lookup performance
CREATE TABLE regional_customer_dim (
    customer_id BIGINT,
    customer_name STRING,
    customer_tier STRING,
    region STRING
) PARTITIONED BY (region)
WITH (
    'connector' = 'hive',
    'lookup.join.cache.ttl' = '1 hour',

    -- Optimize for partitioned lookups
    'table.exec.hive.read-partition-with-subdirectory.enabled' = 'true'
);

-- Use partition pruning in lookup joins
SELECT
    o.order_id,
    o.customer_id,
    o.customer_region,
    c.customer_name,
    c.customer_tier
FROM orders_stream o
JOIN regional_customer_dim FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.customer_id
    AND o.customer_region = c.region;  -- Enables partition pruning
```

### Monitoring and Debugging

#### Cache Metrics

```java
// Access lookup cache metrics (requires custom metric collection).
// Shown here with Dropwizard metrics types (com.codahale.metrics) for illustration.
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Timer;

import java.util.concurrent.TimeUnit;

public class LookupCacheMetrics {
    private Counter cacheHits;
    private Counter cacheMisses;
    private Gauge<Long> cacheSize;
    private Timer lookupLatency;

    public void recordCacheHit() {
        cacheHits.inc();
    }

    public void recordCacheMiss() {
        cacheMisses.inc();
    }

    public void recordLookupLatency(long latencyMs) {
        lookupLatency.update(latencyMs, TimeUnit.MILLISECONDS);
    }
}
```
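
From the hit and miss counters above, the derived hit rate is simply `hits / (hits + misses)`. A plain-Java sketch (helper name is illustrative):

```java
/** Illustrative helper: derive a cache hit rate from hit/miss counts. */
public class CacheHitRate {
    public static double hitRate(long hits, long misses) {
        long total = hits + misses;
        // Guard against division by zero before any lookup has happened.
        return total == 0 ? 0.0 : (double) hits / total;
    }
}
```

A sustained hit rate well below the working-set expectation is the signal to revisit `lookup.join.cache.ttl` and `lookup.join.cache.max-size`, as discussed in the troubleshooting section.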

#### Troubleshooting Common Issues

**High Cache Miss Rate:**

```sql
-- Problem: cache TTL too short for stable dimension data
-- Solution: increase the cache TTL
CREATE TABLE stable_dim (...) WITH (
    'lookup.join.cache.ttl' = '4 hours',  -- Increased from 1 hour
    'lookup.join.cache.max-size' = '20000'
);
```

**Memory Pressure from Cache:**

```sql
-- Problem: lookup cache consuming too much memory
-- Solution: reduce the cache size and shorten the TTL
CREATE TABLE large_dim (...) WITH (
    'lookup.join.cache.ttl' = '30 min',    -- Reduced TTL
    'lookup.join.cache.max-size' = '5000'  -- Reduced size
);
```

**Slow Lookup Performance:**

```sql
-- Problem: lookups take too long against a large dimension table
-- Solution: optimize the table structure; on Hive 2.x an index can help
-- (note: Hive indexes were removed in Hive 3.0):
-- CREATE INDEX customer_idx ON TABLE customer_dim (customer_id)
-- AS 'COMPACT' WITH DEFERRED REBUILD;
-- ALTER INDEX customer_idx ON customer_dim REBUILD;

-- In Flink, ensure proper key selection:
CREATE TABLE optimized_dim (
    customer_id BIGINT PRIMARY KEY NOT ENFORCED,  -- Explicit primary key
    customer_data STRING
) WITH (
    'connector' = 'hive',
    'lookup.join.cache.ttl' = '1 hour'
);
```

### Async vs. Sync Lookup

```java
// Configure async lookup for better throughput
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Async lookup configuration
Configuration config = tableEnv.getConfig().getConfiguration();
config.setString("table.exec.async-lookup.buffer-capacity", "1000");
config.setString("table.exec.async-lookup.timeout", "3 min");
```
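
The essence of the async path is that the lookup returns a future, so the operator can issue further lookups while earlier ones are still resolving. A plain-Java sketch with `CompletableFuture` (an illustration of the pattern, not Flink's internals; the in-memory map stands in for the real Hive read):

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Illustrative async lookup: resolve dimension rows off the caller thread. */
public class AsyncLookupSketch {
    private final Map<Long, String> dimensionTable;

    public AsyncLookupSketch(Map<Long, String> dimensionTable) {
        this.dimensionTable = dimensionTable;
    }

    /** Returns a future so the caller can keep working while the lookup runs. */
    public CompletableFuture<Collection<String>> asyncLookup(long key) {
        return CompletableFuture.supplyAsync(() -> {
            String row = dimensionTable.get(key);  // stand-in for the real Hive read
            return row == null
                    ? Collections.<String>emptyList()
                    : Collections.singletonList(row);
        });
    }
}
```

The `table.exec.async-lookup.buffer-capacity` option above bounds how many such in-flight futures the operator keeps at once.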

```sql
-- Use the async lookup hint for high-throughput scenarios
SELECT /*+ LOOKUP('table'='customer_dim', 'async'='true') */
    o.order_id,
    c.customer_name
FROM orders_stream o
JOIN customer_dim FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.customer_id;
```

### Best Practices

#### Design Guidelines

1. **Key Selection**: Use selective keys that minimize lookup result sets
2. **Cache Sizing**: Size the cache for the working set, not the total dimension size
3. **TTL Configuration**: Balance freshness requirements against performance
4. **Partitioning**: Partition large dimension tables for better pruning
5. **Schema Design**: Include timestamp columns for temporal validity checking
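
Guideline 2 can be made concrete with back-of-the-envelope arithmetic. A sketch; the per-row and per-entry overhead figures are assumptions you must measure for your own data:

```java
/** Back-of-the-envelope cache memory estimate (illustrative numbers only). */
public class CacheSizing {
    /** Estimated cache footprint in bytes: entries * (avg row size + per-entry overhead). */
    public static long estimateBytes(long maxEntries, long avgRowBytes, long perEntryOverheadBytes) {
        return maxEntries * (avgRowBytes + perEntryOverheadBytes);
    }
}
```

For example, with `'lookup.join.cache.max-size' = '10000'`, an assumed ~200-byte cached row, and an assumed ~64 bytes of cache bookkeeping per entry, the cache needs roughly 2.6 MB per task, which must be budgeted into task manager heap sizing.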

#### Operational Considerations

1. **Monitoring**: Track cache hit rates and lookup latencies
2. **Resource Planning**: Account for cache memory in task manager sizing
3. **Data Freshness**: Coordinate dimension updates with cache TTL settings
4. **Fault Tolerance**: Design for graceful degradation when lookups fail
5. **Testing**: Validate lookup behavior with realistic data volumes and patterns