# Lookup Joins

Specialized lookup table source for dimension table joins in streaming applications. The HiveLookupTableSource provides caching capabilities and optimized access patterns for real-time data enrichment scenarios, enabling efficient temporal joins with Hive dimension tables.

## Capabilities

### Hive Lookup Table Source

Extends the standard HiveTableSource with lookup join capabilities, providing cached access to dimension data for streaming joins.

```java { .api }
/**
 * Lookup table source for dimension table joins with caching support
 * Extends HiveTableSource with optimized lookup access patterns
 */
public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {

    /**
     * Creates lookup runtime provider for join operations
     * @param lookupContext Context containing lookup configuration and key information
     * @return LookupRuntimeProvider for executing lookup operations
     */
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext);

    /**
     * Creates a copy of this lookup table source for planning
     * @return Deep copy of the lookup table source
     */
    public DynamicTableSource copy();

    /**
     * Returns string summary of the lookup table source
     * @return Human-readable description including cache configuration
     */
    public String asSummaryString();
}
```


### Lookup Runtime Provider

Runtime component that executes actual lookup operations with caching and optimization.

```java { .api }
/**
 * Runtime provider for executing lookup operations
 * Handles caching, key serialization, and result retrieval
 */
public interface LookupRuntimeProvider {

    /**
     * Performs synchronous lookup for the given key
     * @param keyRow Row containing lookup key values
     * @return Collection of matching rows from dimension table
     */
    public Collection<RowData> lookup(RowData keyRow);

    /**
     * Performs asynchronous lookup for the given key
     * @param keyRow Row containing lookup key values
     * @return CompletableFuture containing matching rows
     */
    public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow);
}
```


## Usage Patterns

### Temporal Join Configuration

```sql
-- Create dimension table with lookup configuration
CREATE TABLE customer_dim (
    customer_id BIGINT PRIMARY KEY NOT ENFORCED,
    customer_name STRING,
    customer_tier STRING,
    registration_date DATE,
    last_updated TIMESTAMP(3)
) WITH (
    'connector' = 'hive',

    -- Lookup join caching configuration
    'lookup.join.cache.ttl' = '1 hour',     -- Cache entries for 1 hour
    'lookup.join.cache.max-size' = '10000', -- Maximum cache entries

    -- Performance optimization
    'table.exec.hive.infer-source-parallelism' = 'false', -- Use single task for lookup
    'table.exec.hive.split-max-size' = '64MB'             -- Smaller splits for lookup
);

-- Create streaming fact table
CREATE TABLE orders_stream (
    order_id BIGINT,
    customer_id BIGINT,
    order_amount DECIMAL(10,2),
    order_time TIMESTAMP(3),
    proc_time AS PROCTIME() -- Processing time for temporal join
) WITH (
    'connector' = 'kafka'
    -- Kafka configuration...
);

-- Perform temporal join to enrich streaming data
SELECT
    o.order_id,
    o.order_amount,
    o.order_time,
    c.customer_name,
    c.customer_tier,
    -- Calculate discount based on customer tier
    CASE
        WHEN c.customer_tier = 'GOLD' THEN o.order_amount * 0.1
        WHEN c.customer_tier = 'SILVER' THEN o.order_amount * 0.05
        ELSE 0
    END as discount_amount
FROM orders_stream o
JOIN customer_dim FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.customer_id;
```


### Advanced Lookup Join Scenarios

#### Multi-Key Lookups

```sql
-- Dimension table with composite key
CREATE TABLE product_inventory (
    warehouse_id STRING,
    product_id STRING,
    available_quantity INT,
    last_updated TIMESTAMP(3),
    PRIMARY KEY (warehouse_id, product_id) NOT ENFORCED
) WITH (
    'connector' = 'hive',
    'lookup.join.cache.ttl' = '30 min',
    'lookup.join.cache.max-size' = '50000'
);

-- Join with multiple lookup keys
SELECT
    o.order_id,
    o.product_id,
    o.requested_quantity,
    i.available_quantity,
    CASE
        WHEN i.available_quantity >= o.requested_quantity
        THEN 'AVAILABLE'
        ELSE 'BACKORDER'
    END as fulfillment_status
FROM order_items_stream o
JOIN product_inventory FOR SYSTEM_TIME AS OF o.proc_time AS i
    ON o.warehouse_id = i.warehouse_id
    AND o.product_id = i.product_id;
```


#### Null Handling in Lookups

```sql
-- Handle missing dimension data gracefully
SELECT
    o.order_id,
    o.customer_id,
    o.order_amount,
    COALESCE(c.customer_name, 'UNKNOWN') as customer_name,
    COALESCE(c.customer_tier, 'STANDARD') as customer_tier,
    -- Apply default discount for unknown customers
    CASE
        WHEN c.customer_id IS NOT NULL AND c.customer_tier = 'GOLD'
        THEN o.order_amount * 0.1
        ELSE 0
    END as discount_amount
FROM orders_stream o
LEFT JOIN customer_dim FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.customer_id;
```


### Performance Optimization

#### Cache Configuration Tuning

```java
// Programmatic cache configuration
Map<String, String> tableOptions = new HashMap<>();

// High-frequency lookups with stable dimension data
tableOptions.put("lookup.join.cache.ttl", "2 hours");
tableOptions.put("lookup.join.cache.max-size", "100000");

// Fast-changing dimension data
tableOptions.put("lookup.join.cache.ttl", "5 minutes");
tableOptions.put("lookup.join.cache.max-size", "5000");

// Memory-constrained environments
tableOptions.put("lookup.join.cache.ttl", "30 minutes");
tableOptions.put("lookup.join.cache.max-size", "1000");
```


#### Partitioning for Lookup Performance

```sql
-- Partition dimension table for better lookup performance
CREATE TABLE regional_customer_dim (
    customer_id BIGINT,
    customer_name STRING,
    customer_tier STRING,
    region STRING
) PARTITIONED BY (region)
WITH (
    'connector' = 'hive',
    'lookup.join.cache.ttl' = '1 hour',

    -- Optimize for partitioned lookups
    'table.exec.hive.read-partition-with-subdirectory.enabled' = 'true'
);

-- Use partition pruning in lookup joins
SELECT
    o.order_id,
    o.customer_id,
    o.customer_region,
    c.customer_name,
    c.customer_tier
FROM orders_stream o
JOIN regional_customer_dim FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.customer_id
    AND o.customer_region = c.region; -- Partition pruning
```


### Monitoring and Debugging

#### Cache Metrics

```java
// Access lookup cache metrics (requires custom metric collection)
public class LookupCacheMetrics {
    private Counter cacheHits;
    private Counter cacheMisses;
    private Gauge cacheSize;
    private Timer lookupLatency;

    public void recordCacheHit() {
        cacheHits.inc();
    }

    public void recordCacheMiss() {
        cacheMisses.inc();
    }

    public void recordLookupLatency(long latencyMs) {
        lookupLatency.update(latencyMs, TimeUnit.MILLISECONDS);
    }
}
```


#### Troubleshooting Common Issues

**High Cache Miss Rate:**

```sql
-- Problem: Cache TTL too short for stable dimension data
-- Solution: Increase cache TTL
CREATE TABLE stable_dim (...) WITH (
    'lookup.join.cache.ttl' = '4 hours', -- Increased from 1 hour
    'lookup.join.cache.max-size' = '20000'
);
```

**Memory Pressure from Cache:**

```sql
-- Problem: Lookup cache consuming too much memory
-- Solution: Reduce cache size and optimize TTL
CREATE TABLE large_dim (...) WITH (
    'lookup.join.cache.ttl' = '30 min',    -- Reduced TTL
    'lookup.join.cache.max-size' = '5000'  -- Reduced size
);
```

**Slow Lookup Performance:**

```sql
-- Problem: Lookups taking too long due to large dimension table
-- Solution: Optimize table structure and add indices in Hive
-- In Hive (NOTE: Hive indexing was removed in Hive 3.0; the statements
-- below apply to Hive 2.x only — verify your Hive version):
-- CREATE INDEX customer_idx ON TABLE customer_dim (customer_id)
-- AS 'COMPACT' WITH DEFERRED REBUILD;
-- ALTER INDEX customer_idx ON customer_dim REBUILD;

-- In Flink, ensure proper key selection:
CREATE TABLE optimized_dim (
    customer_id BIGINT PRIMARY KEY NOT ENFORCED, -- Explicit primary key
    customer_data STRING
) WITH (
    'connector' = 'hive',
    'lookup.join.cache.ttl' = '1 hour'
);
```


### Async vs Sync Lookup

```java
// Configure async lookup for better throughput
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Async lookup configuration
Configuration config = tableEnv.getConfig().getConfiguration();
config.setString("table.exec.async-lookup.buffer-capacity", "1000");
config.setString("table.exec.async-lookup.timeout", "3min");
```

```sql
-- Use async lookup hint for high-throughput scenarios
SELECT /*+ LOOKUP('table'='customer_dim', 'async'='true') */
    o.order_id,
    c.customer_name
FROM orders_stream o
JOIN customer_dim FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.customer_id;
```


### Best Practices

#### Design Guidelines

1. **Key Selection**: Use selective keys that minimize lookup result sets
2. **Cache Sizing**: Size cache based on working set, not total dimension size
3. **TTL Configuration**: Balance freshness requirements with performance
4. **Partitioning**: Partition large dimension tables for better pruning
5. **Schema Design**: Include timestamp columns for temporal validity checking

#### Operational Considerations

1. **Monitoring**: Track cache hit rates and lookup latencies
2. **Resource Planning**: Account for cache memory in task manager sizing
3. **Data Freshness**: Coordinate dimension updates with cache TTL settings
4. **Fault Tolerance**: Design for graceful degradation when lookups fail
5. **Testing**: Validate lookup behavior with realistic data volumes and patterns