or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md, lookup-options.md, sink-operations.md, source-operations.md, table-factory.md, write-options.md

lookup-options.mddocs/

0

# Lookup Options and Caching

1

2

Configuration for lookup join operations with caching, retry mechanisms, and async processing options in the Apache Flink HBase 1.4 Connector.

3

4

## Capabilities

5

6

### HBaseLookupOptions

7

8

Configuration class that encapsulates all lookup-related settings for optimizing dimension table joins and caching strategies.

9

10

```java { .api }

11

/**

12

* Options for HBase lookup operations

13

* Provides configuration for caching, retries, and async processing

14

*/

15

@Internal

16

public class HBaseLookupOptions implements Serializable {

17

18

/**

19

* Returns the maximum number of entries in the lookup cache

20

* @return Maximum cache size in rows (default: -1; the cache is only active when both max size and TTL are positive)

21

*/

22

public long getCacheMaxSize();

23

24

/**

25

* Returns the cache entry expiration time in milliseconds

26

* @return Time-to-live in milliseconds (default: 0; the cache is only active when both max size and TTL are positive)

27

*/

28

public long getCacheExpireMs();

29

30

/**

31

* Returns the maximum number of retry attempts for failed lookups

32

* @return Maximum retry count (default: 3)

33

*/

34

public int getMaxRetryTimes();

35

36

/**

37

* Returns whether async lookup processing is enabled

38

* @return true if async lookups are enabled (default: false)

39

*/

40

public boolean getLookupAsync();

41

42

/**

43

* Creates a new builder for configuring lookup options

44

* @return Builder instance for fluent configuration

45

*/

46

public static Builder builder();

47

}

48

```

49

50

### HBaseLookupOptions.Builder

51

52

Builder class providing fluent API for configuring lookup options with method chaining.

53

54

```java { .api }

55

/**

56

* Builder for HBaseLookupOptions using fluent interface pattern

57

* Allows step-by-step configuration of all lookup parameters

58

*/

59

public static class Builder {

60

61

/**

62

* Sets the maximum cache size for lookup entries

63

* @param cacheMaxSize Maximum number of entries to cache (default: -1, which leaves the cache disabled)

64

* @return Builder instance for method chaining

65

*/

66

public Builder setCacheMaxSize(long cacheMaxSize);

67

68

/**

69

* Sets the cache entry expiration time in milliseconds

70

* @param cacheExpireMs Time-to-live in milliseconds (default: 0, which leaves the cache disabled)

71

* @return Builder instance for method chaining

72

*/

73

public Builder setCacheExpireMs(long cacheExpireMs);

74

75

/**

76

* Sets the maximum number of retry attempts for failed lookups

77

* @param maxRetryTimes Maximum retry count (default: 3)

78

* @return Builder instance for method chaining

79

*/

80

public Builder setMaxRetryTimes(int maxRetryTimes);

81

82

/**

83

* Sets whether to enable async lookup processing

84

* @param lookupAsync true to enable async lookups (default: false)

85

* @return Builder instance for method chaining

86

*/

87

public Builder setLookupAsync(boolean lookupAsync);

88

89

/**

90

* Creates a new HBaseLookupOptions instance with configured settings

91

* @return Configured HBaseLookupOptions instance

92

*/

93

public HBaseLookupOptions build();

94

}

95

```

96

97

**Usage Example:**

98

99

```java

100

// Example: High-performance lookup configuration with caching

101

HBaseLookupOptions cachedLookup = HBaseLookupOptions.builder()

102

.setCacheMaxSize(100000) // Cache up to 100K entries

103

.setCacheExpireMs(300000) // 5 minute TTL

104

.setMaxRetryTimes(5) // 5 retry attempts

105

.setLookupAsync(true) // Enable async processing

106

.build();

107

108

// Example: Memory-efficient lookup configuration

109

HBaseLookupOptions memoryEfficient = HBaseLookupOptions.builder()

110

.setCacheMaxSize(10000) // Smaller cache size

111

.setCacheExpireMs(60000) // 1 minute TTL

112

.setMaxRetryTimes(3) // Standard retry count

113

.setLookupAsync(false) // Synchronous processing

114

.build();

115

```

116

117

## Lookup Join Operations

118

119

### Dimension Table Pattern

120

121

HBase tables are commonly used as dimension tables in stream processing applications for data enrichment through lookup joins.

122

123

```sql

124

-- Example: User activity stream enriched with user profile data

125

CREATE TABLE user_activity_stream (

126

user_id STRING,

127

activity_type STRING,

128

timestamp_val TIMESTAMP(3),

129

proc_time AS PROCTIME() -- Processing time for temporal join

130

) WITH (

131

'connector' = 'kafka',

132

'topic' = 'user-activities'

133

);

134

135

CREATE TABLE user_profiles (

136

user_id STRING,

137

profile ROW<name STRING, segment STRING, country STRING, created_date DATE>,

138

preferences ROW<language STRING, timezone STRING, notifications BOOLEAN>,

139

PRIMARY KEY (user_id) NOT ENFORCED

140

) WITH (

141

'connector' = 'hbase-1.4',

142

'table-name' = 'user_dim',

143

'zookeeper.quorum' = 'localhost:2181',

144

'lookup.cache.max-rows' = '50000',

145

'lookup.cache.ttl' = '10min',

146

'lookup.max-retries' = '3'

147

);

148

149

-- Enrichment query using temporal join

150

SELECT

151

a.user_id,

152

a.activity_type,

153

a.timestamp_val,

154

u.profile.name,

155

u.profile.segment,

156

u.preferences.language

157

FROM user_activity_stream a

158

JOIN user_profiles FOR SYSTEM_TIME AS OF a.proc_time AS u

159

ON a.user_id = u.user_id;

160

```

161

162

### Async Lookup Processing

163

164

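Enable async lookup processing for improved performance with high-volume streams. Note: Flink's connector documentation states that async lookup is only supported by the hbase-2.2 connector, so verify that `lookup.async` is accepted and takes effect with `hbase-1.4` before relying on it.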

165

166

```sql

167

-- High-volume stream with async lookups

168

CREATE TABLE product_events (

169

product_id STRING,

170

event_data ROW<event_type STRING, value DOUBLE, user_id STRING>,

171

proc_time AS PROCTIME()

172

) WITH (

173

'connector' = 'kafka',

174

'topic' = 'product-events'

175

);

176

177

CREATE TABLE product_catalog (

178

product_id STRING,

179

product_info ROW<name STRING, category STRING, price DECIMAL(10,2)>,

180

inventory ROW<stock_level INT, warehouse_id STRING>,

181

PRIMARY KEY (product_id) NOT ENFORCED

182

) WITH (

183

'connector' = 'hbase-1.4',

184

'table-name' = 'products',

185

'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',

186

'lookup.async' = 'true', -- Enable async lookups

187

'lookup.cache.max-rows' = '200000', -- Large cache for products

188

'lookup.cache.ttl' = '30min', -- 30 minute cache TTL

189

'lookup.max-retries' = '5' -- Higher retry count

190

);

191

```

192

193

## Caching Strategies

194

195

### Cache Size Configuration

196

197

Configure cache size based on data volume and memory constraints.

198

199

```java

200

// Small dimension table (< 10K records)

201

HBaseLookupOptions smallDimension = HBaseLookupOptions.builder()

202

.setCacheMaxSize(15000) // Cache all records plus buffer

203

.setCacheExpireMs(3600000) // 1 hour TTL

204

.build();

205

206

// Medium dimension table (10K-100K records)

207

HBaseLookupOptions mediumDimension = HBaseLookupOptions.builder()

208

.setCacheMaxSize(50000) // Cache hot subset

209

.setCacheExpireMs(1800000) // 30 minute TTL

210

.build();

211

212

// Large dimension table (> 100K records)

213

HBaseLookupOptions largeDimension = HBaseLookupOptions.builder()

214

.setCacheMaxSize(100000) // Cache only hottest data

215

.setCacheExpireMs(600000) // 10 minute TTL

216

.build();

217

```

218

219

### Cache TTL Strategies

220

221

Configure time-to-live based on data freshness requirements.

222

223

```java

224

// Real-time data (frequent updates)

225

HBaseLookupOptions realTime = HBaseLookupOptions.builder()

226

.setCacheMaxSize(20000)

227

.setCacheExpireMs(60000) // 1 minute TTL

228

.build();

229

230

// Reference data (infrequent updates)

231

HBaseLookupOptions reference = HBaseLookupOptions.builder()

232

.setCacheMaxSize(100000)

233

.setCacheExpireMs(7200000) // 2 hour TTL

234

.build();

235

236

// Static data (rare updates)

237

HBaseLookupOptions staticData = HBaseLookupOptions.builder()

238

.setCacheMaxSize(500000)

239

.setCacheExpireMs(86400000) // 24 hour TTL

240

.build();

241

```

242

243

### Cache Eviction Policies

244

245

The lookup cache uses LRU (Least Recently Used) eviction when the maximum size is reached.

246

247

**Cache Behavior:**

248

- **Hit**: Return cached value, update access time

249

- **Miss**: Query HBase and cache the result, evicting older entries if the cache is full

250

- **Eviction**: Remove least recently accessed entries when cache is full

251

- **Expiration**: Remove entries older than TTL regardless of access

252

253

## Error Handling and Resilience

254

255

### Retry Configuration

256

257

Configure retry behavior for handling transient HBase failures.

258

259

```java

260

// Aggressive retry for critical lookups

261

HBaseLookupOptions criticalLookups = HBaseLookupOptions.builder()

262

.setCacheMaxSize(25000)

263

.setCacheExpireMs(300000)

264

.setMaxRetryTimes(10) // High retry count

265

.setLookupAsync(true) // Async for better resilience

266

.build();

267

268

// Conservative retry for best-effort lookups

269

HBaseLookupOptions bestEffort = HBaseLookupOptions.builder()

270

.setCacheMaxSize(10000)

271

.setCacheExpireMs(120000)

272

.setMaxRetryTimes(1) // Minimal retries

273

.setLookupAsync(false) // Synchronous processing

274

.build();

275

```

276

277

### Failure Handling Strategies

278

279

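Different approaches for handling lookup failures (only failing after the configured retries is built into the connector; the other strategies must be implemented in application logic):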

280

281

1. **Fail Fast**: Fail the job on lookup errors (default behavior)

282

2. **Null Result**: Return null for failed lookups (requires null-safe processing)

283

3. **Default Values**: Return configured default values for failures

284

4. **Cache Fallback**: Use stale cached values on HBase failures

285

286

```sql

287

-- Example: Null-safe enrichment (LEFT JOIN keeps events with no matching row; COALESCE supplies defaults)

288

SELECT

289

e.event_id,

290

e.user_id,

291

COALESCE(u.profile.name, 'Unknown User') as user_name,

292

COALESCE(u.profile.segment, 'default') as user_segment

293

FROM events e

294

LEFT JOIN user_dim FOR SYSTEM_TIME AS OF e.proc_time AS u

295

ON e.user_id = u.user_id;

296

```

297

298

## Performance Optimization

299

300

### Cache Hit Rate Optimization

301

302

Monitor and optimize cache hit rates for maximum performance.

303

304

**Key Metrics:**

305

- Cache hit rate (target: > 90% for most workloads)

306

- Cache size utilization (target: 70-90% of max size)

307

- Average lookup latency (target: < 10ms for cached, < 100ms for uncached)

308

309

**Optimization Strategies:**

310

311

```java

312

// Hot data optimization - smaller cache, shorter TTL

313

HBaseLookupOptions hotData = HBaseLookupOptions.builder()

314

.setCacheMaxSize(30000) // Size for working set

315

.setCacheExpireMs(300000) // 5 minute TTL

316

.setMaxRetryTimes(3)

317

.build();

318

319

// Cold data optimization - larger cache, longer TTL

320

HBaseLookupOptions coldData = HBaseLookupOptions.builder()

321

.setCacheMaxSize(100000) // Large cache

322

.setCacheExpireMs(3600000) // 1 hour TTL

323

.setMaxRetryTimes(2)

324

.build();

325

```

326

327

### Memory Management

328

329

Balance cache size with available memory to avoid OOM errors.

330

331

**Memory Estimation:**

332

- Average record size × cache max size = approximate memory usage

333

- Include overhead for cache metadata and JVM objects

334

- Reserve memory for other Flink operations

335

336

```java

337

// Memory-constrained environment

338

HBaseLookupOptions memoryConstrained = HBaseLookupOptions.builder()

339

.setCacheMaxSize(5000) // Small cache size

340

.setCacheExpireMs(900000) // 15 minute TTL

341

.setMaxRetryTimes(2)

342

.setLookupAsync(false) // Reduce memory overhead

343

.build();

344

```

345

346

## SQL Configuration Examples

347

348

### High-Performance Lookup Configuration

349

350

```sql

351

CREATE TABLE customer_dimension (

352

customer_id STRING,

353

customer_data ROW<

354

name STRING,

355

segment STRING,

356

country STRING,

357

lifetime_value DECIMAL(12,2),

358

created_date DATE

359

>,

360

preferences ROW<

361

communication_channel STRING,

362

language STRING,

363

currency STRING

364

>,

365

PRIMARY KEY (customer_id) NOT ENFORCED

366

) WITH (

367

'connector' = 'hbase-1.4',

368

'table-name' = 'customers',

369

'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',

370

'lookup.async' = 'true', -- Async processing

371

'lookup.cache.max-rows' = '100000', -- 100K cache entries

372

'lookup.cache.ttl' = '15min', -- 15 minute TTL

373

'lookup.max-retries' = '5' -- 5 retry attempts

374

);

375

```

376

377

### Memory-Efficient Lookup Configuration

378

379

```sql

380

CREATE TABLE product_categories (

381

category_id STRING,

382

category_info ROW<name STRING, parent_id STRING, level INT>,

383

PRIMARY KEY (category_id) NOT ENFORCED

384

) WITH (

385

'connector' = 'hbase-1.4',

386

'table-name' = 'categories',

387

'zookeeper.quorum' = 'localhost:2181',

388

'lookup.async' = 'false', -- Sync processing

389

'lookup.cache.max-rows' = '5000', -- Smaller cache

390

'lookup.cache.ttl' = '1h', -- 1 hour TTL

391

'lookup.max-retries' = '2' -- Fewer retries

392

);

393

```

394

395

### Real-Time Lookup Configuration

396

397

```sql

398

CREATE TABLE real_time_prices (

399

symbol STRING,

400

price_data ROW<current_price DECIMAL(10,4), last_update TIMESTAMP(3)>,

401

PRIMARY KEY (symbol) NOT ENFORCED

402

) WITH (

403

'connector' = 'hbase-1.4',

404

'table-name' = 'live_prices',

405

'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',

406

'lookup.async' = 'true', -- Async for performance

407

'lookup.cache.max-rows' = '10000', -- Cache for active symbols

408

'lookup.cache.ttl' = '30s', -- Short TTL for freshness

409

'lookup.max-retries' = '3' -- Standard retry count

410

);

411

```