# Lookup Functions

The HBase connector provides lookup function capabilities for temporal table joins, enabling real-time enrichment of streaming data with dimension data stored in HBase. This is essential for joining fast-changing stream data with slowly-changing dimension tables.

## HBaseLookupFunction

A table function that performs lookups in HBase tables for temporal joins in Flink's Table API.

```java { .api }
class HBaseLookupFunction extends TableFunction<Row> {
    public HBaseLookupFunction(Configuration configuration, String hTableName,
                               HBaseTableSchema hbaseTableSchema);

    // Core lookup method
    public void eval(Object rowKey);

    // Function lifecycle
    public void open(FunctionContext context) throws Exception;
    public void close() throws Exception;

    // Type information
    public TypeInformation<Row> getResultType();
}
```

### Basic Lookup Usage

```java
import org.apache.flink.addons.hbase.HBaseLookupFunction;
import org.apache.flink.addons.hbase.HBaseTableSchema;
import org.apache.flink.addons.hbase.HBaseTableSource;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.hadoop.conf.Configuration;

// Configure HBase connection
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum", "localhost:2181");

// Define dimension table schema
HBaseTableSchema userProfileSchema = new HBaseTableSchema();
userProfileSchema.setRowKey("user_id", String.class);
userProfileSchema.addColumn("profile", "name", String.class);
userProfileSchema.addColumn("profile", "email", String.class);
userProfileSchema.addColumn("profile", "age", Integer.class);
userProfileSchema.addColumn("profile", "department", String.class);

// Create table source with lookup capability
HBaseTableSource userProfileSource = new HBaseTableSource(conf, "user_profiles");
userProfileSource.setRowKey("user_id", String.class);
userProfileSource.addColumn("profile", "name", String.class);
userProfileSource.addColumn("profile", "email", String.class);
userProfileSource.addColumn("profile", "age", Integer.class);
userProfileSource.addColumn("profile", "department", String.class);

// Register as temporal table
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
tableEnv.registerTableSource("user_profiles", userProfileSource);
```

### Temporal Join with Lookup

```sql
-- Create the main event stream table
CREATE TABLE user_events (
    event_id STRING,
    user_id STRING,
    event_type STRING,
    event_time TIMESTAMP(3),
    event_value DOUBLE,
    proc_time AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'user-events',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- Create HBase lookup table
CREATE TABLE user_profiles (
    user_id STRING,
    name STRING,
    email STRING,
    age INT,
    department STRING
) WITH (
    'connector' = 'hbase',
    'table-name' = 'user_profiles',
    'zookeeper.quorum' = 'localhost:2181'
);

-- Perform temporal join (lookup)
SELECT
    e.event_id,
    e.user_id,
    e.event_type,
    e.event_value,
    u.name,
    u.email,
    u.department
FROM user_events e
JOIN user_profiles FOR SYSTEM_TIME AS OF e.proc_time AS u
    ON e.user_id = u.user_id;
```

### Programmatic Lookup Usage

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.functions.TableFunction;

// Create lookup function directly
HBaseLookupFunction lookupFunction = new HBaseLookupFunction(
    conf, "user_profiles", userProfileSchema);

// Register as user-defined function
tableEnv.registerFunction("lookup_user", lookupFunction);

// Use in SQL query
Table enrichedEvents = tableEnv.sqlQuery(
    "SELECT " +
    "  e.event_id, " +
    "  e.user_id, " +
    "  e.event_type, " +
    "  u.name, " +
    "  u.email " +
    "FROM events e, " +
    "LATERAL TABLE(lookup_user(e.user_id)) AS u(user_id, name, email, age, department)"
);
```

## Lookup Performance Optimization

### Connection Caching

The lookup function automatically manages HBase connections and implements connection pooling for better performance:

```java
// Configure HBase client for lookup performance
Configuration lookupConf = new Configuration();
lookupConf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");

// Connection pool settings
lookupConf.setInt("hbase.client.ipc.pool.size", 10); // Connection pool size
lookupConf.setInt("hbase.client.ipc.pool.type", 1); // RoundRobin pool type

// Timeout settings for lookups
lookupConf.setLong("hbase.rpc.timeout", 5000); // 5 second RPC timeout
lookupConf.setLong("hbase.client.operation.timeout", 10000); // 10 second operation timeout

// Scanner settings for better lookup performance
lookupConf.setInt("hbase.client.scanner.caching", 100); // Scanner row caching
lookupConf.setBoolean("hbase.client.scanner.async.prefetch", false); // Disable prefetch for lookups
```

### Lookup Caching

```java
// Enable HBase client-side caching for frequently accessed data
Configuration cachedConf = new Configuration();
cachedConf.set("hbase.zookeeper.quorum", "localhost:2181");

// Enable block cache for better read performance
cachedConf.setBoolean("hbase.client.cache.enable", true);
cachedConf.setFloat("hbase.client.cache.size", 0.25f); // 25% of heap for cache

// Configure region cache
cachedConf.setInt("hbase.client.meta.cache.size", 1000); // Meta cache size
cachedConf.setLong("hbase.client.meta.cache.ttl", 60000); // 1 minute TTL
```

## Advanced Lookup Patterns

### Multi-Key Lookups

```java
// Schema for composite key lookups
HBaseTableSchema compositeKeySchema = new HBaseTableSchema();
compositeKeySchema.setRowKey("composite_key", String.class); // "userId:timestamp" format
compositeKeySchema.addColumn("data", "value", String.class);
compositeKeySchema.addColumn("data", "status", String.class);

// Custom lookup function for composite keys
public class CompositeKeyLookupFunction extends TableFunction<Row> {
    private HBaseLookupFunction baseLookupFunction;

    public CompositeKeyLookupFunction(Configuration conf, String tableName,
                                      HBaseTableSchema schema) {
        this.baseLookupFunction = new HBaseLookupFunction(conf, tableName, schema);
    }

    public void eval(String userId, Long timestamp) {
        // Create composite key
        String compositeKey = userId + ":" + timestamp;

        // Delegate to base lookup function
        baseLookupFunction.eval(compositeKey);

        // Forward results
        // Note: This is conceptual - actual implementation would need to handle result collection
    }
}
```

### Conditional Lookups

```sql
-- Lookup with conditions
SELECT
    e.event_id,
    e.user_id,
    e.event_type,
    CASE
        WHEN u.user_id IS NOT NULL THEN u.name
        ELSE 'Unknown User'
    END as user_name,
    CASE
        WHEN u.age >= 18 THEN 'Adult'
        ELSE 'Minor'
    END as age_category
FROM user_events e
LEFT JOIN user_profiles FOR SYSTEM_TIME AS OF e.proc_time AS u
    ON e.user_id = u.user_id;
```

### Lookup with Data Transformation

```java
// Custom lookup function with data transformation
public class TransformingLookupFunction extends TableFunction<Row> {
    private HBaseLookupFunction baseLookup;

    public void eval(String userId) {
        // Perform base lookup
        baseLookup.eval(userId);

        // Transform and emit results (conceptual)
        // In practice, this would involve collecting results from baseLookup
        // and transforming them before emitting
    }

    // Transform user data
    private Row transformUserData(Row originalRow) {
        String name = (String) originalRow.getField(1);
        Integer age = (Integer) originalRow.getField(3);
        String department = (String) originalRow.getField(4);

        // Add computed fields
        String displayName = formatDisplayName(name);
        String ageGroup = categorizeAge(age);
        String departmentCode = getDepartmentCode(department);

        return Row.of(
            originalRow.getField(0), // user_id
            displayName,
            originalRow.getField(2), // email
            ageGroup,
            departmentCode
        );
    }
}
```

## Error Handling and Resilience

### Lookup Failure Handling

```java
// Configure retry and timeout behavior for lookups
Configuration resilientConf = new Configuration();
resilientConf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");

// Retry configuration
resilientConf.setInt("hbase.client.retries.number", 5); // Max 5 retries
resilientConf.setLong("hbase.client.pause", 1000); // 1 second retry pause
resilientConf.setInt("hbase.client.rpc.retry.sleep", 100); // Base retry sleep

// Circuit breaker style configuration
resilientConf.setLong("hbase.client.operation.timeout", 30000); // 30 second timeout

// Create resilient lookup function
HBaseLookupFunction resilientLookup = new HBaseLookupFunction(
    resilientConf, "user_profiles", userProfileSchema);
```

### Graceful Degradation

```sql
-- Handle lookup failures gracefully
SELECT
    e.event_id,
    e.user_id,
    e.event_type,
    COALESCE(u.name, 'UNKNOWN') as user_name,
    COALESCE(u.email, 'no-email@domain.com') as user_email,
    COALESCE(u.department, 'UNASSIGNED') as department
FROM user_events e
LEFT JOIN user_profiles FOR SYSTEM_TIME AS OF e.proc_time AS u
    ON e.user_id = u.user_id;
```

## Monitoring and Metrics

### Lookup Performance Monitoring

```java
// Custom lookup function with metrics
public class MonitoredLookupFunction extends TableFunction<Row> {
    private transient Counter lookupCount;
    private transient Counter lookupFailures;
    private transient Histogram lookupLatency;
    private HBaseLookupFunction delegate;

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);

        // Initialize metrics
        lookupCount = context.getMetricGroup().counter("lookup_count");
        lookupFailures = context.getMetricGroup().counter("lookup_failures");
        lookupLatency = context.getMetricGroup().histogram("lookup_latency");

        // Initialize delegate
        delegate = new HBaseLookupFunction(conf, tableName, schema);
        delegate.open(context);
    }

    public void eval(Object rowKey) {
        long startTime = System.currentTimeMillis();
        lookupCount.inc();

        try {
            delegate.eval(rowKey);
            lookupLatency.update(System.currentTimeMillis() - startTime);
        } catch (Exception e) {
            lookupFailures.inc();
            // Log error but don't fail the job
            LOG.warn("Lookup failed for key: {}", rowKey, e);
            // Emit empty result or default values
            collect(Row.of(rowKey, null, null, null, null));
        }
    }
}
```

## Lookup Join Patterns

### Dimension Table Enrichment

```sql
-- Enrich transaction events with customer information
CREATE TABLE transactions (
    transaction_id STRING,
    customer_id STRING,
    amount DECIMAL(10,2),
    transaction_time TIMESTAMP(3),
    proc_time AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'transactions'
);

CREATE TABLE customers (
    customer_id STRING,
    name STRING,
    tier STRING,
    region STRING,
    credit_limit DECIMAL(10,2)
) WITH (
    'connector' = 'hbase',
    'table-name' = 'customer_profiles',
    'zookeeper.quorum' = 'localhost:2181'
);

-- Enriched transaction stream
SELECT
    t.transaction_id,
    t.customer_id,
    t.amount,
    c.name as customer_name,
    c.tier as customer_tier,
    c.region,
    CASE
        WHEN t.amount > c.credit_limit THEN 'OVERLIMIT'
        ELSE 'NORMAL'
    END as transaction_status
FROM transactions t
JOIN customers FOR SYSTEM_TIME AS OF t.proc_time AS c
    ON t.customer_id = c.customer_id;
```

### Multi-Level Lookups

```sql
-- Multiple lookup joins for complex enrichment
CREATE TABLE events (
    event_id STRING,
    user_id STRING,
    product_id STRING,
    action STRING,
    event_time TIMESTAMP(3),
    proc_time AS PROCTIME()
) WITH ('connector' = 'kafka', 'topic' = 'user-events');

CREATE TABLE users (
    user_id STRING,
    name STRING,
    segment STRING
) WITH ('connector' = 'hbase', 'table-name' = 'users', 'zookeeper.quorum' = 'localhost:2181');

CREATE TABLE products (
    product_id STRING,
    name STRING,
    category STRING,
    price DECIMAL(10,2)
) WITH ('connector' = 'hbase', 'table-name' = 'products', 'zookeeper.quorum' = 'localhost:2181');

-- Multi-level enriched stream
SELECT
    e.event_id,
    e.action,
    u.name as user_name,
    u.segment as user_segment,
    p.name as product_name,
    p.category as product_category,
    p.price as product_price
FROM events e
JOIN users FOR SYSTEM_TIME AS OF e.proc_time AS u ON e.user_id = u.user_id
JOIN products FOR SYSTEM_TIME AS OF e.proc_time AS p ON e.product_id = p.product_id;
```

## Best Practices

### Lookup Performance

1. **Use appropriate row key design**: Ensure row keys are well-distributed for even load
2. **Configure proper timeouts**: Set reasonable RPC and operation timeouts
3. **Enable connection pooling**: Use connection pools for better resource utilization
4. **Monitor lookup latency**: Track lookup performance with metrics
5. **Consider caching**: Enable HBase client-side caching for hot data

### Schema Design for Lookups

```java
// Design schema for efficient lookups
HBaseTableSchema efficientLookupSchema = new HBaseTableSchema();

// Use meaningful row key
efficientLookupSchema.setRowKey("user_id", String.class);

// Group related data in same column family for locality
efficientLookupSchema.addColumn("profile", "name", String.class);
efficientLookupSchema.addColumn("profile", "email", String.class);
efficientLookupSchema.addColumn("profile", "department", String.class);

// Separate frequently changing data
efficientLookupSchema.addColumn("activity", "last_login", java.sql.Timestamp.class);
efficientLookupSchema.addColumn("activity", "login_count", Long.class);

// Use appropriate data types
efficientLookupSchema.addColumn("settings", "preferences", String.class); // JSON as string
efficientLookupSchema.addColumn("binary", "avatar", byte[].class); // Binary data
```

### Error Recovery

```java
// Implement robust error handling
public class RobustLookupFunction extends TableFunction<Row> {
    private static final int MAX_RETRIES = 3;
    private static final long RETRY_DELAY_MS = 1000;

    public void eval(Object rowKey) {
        int attempts = 0;
        Exception lastException = null;

        while (attempts < MAX_RETRIES) {
            try {
                // Perform lookup
                performLookup(rowKey);
                return; // Success
            } catch (Exception e) {
                lastException = e;
                attempts++;

                if (attempts < MAX_RETRIES) {
                    try {
                        Thread.sleep(RETRY_DELAY_MS * attempts); // Linearly increasing backoff
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }

        // All retries failed - emit default/empty result
        LOG.error("Lookup failed after {} attempts for key: {}", MAX_RETRIES, rowKey, lastException);
        emitDefaultResult(rowKey);
    }

    private void emitDefaultResult(Object rowKey) {
        // Emit row with null values for missing data
        collect(Row.of(rowKey, null, null, null, null));
    }
}
```