or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md · input-formats.md · lookup-functions.md · schema-config.md · sink-functions.md · table-api.md · utilities.md

docs/table-api.md

0

# Table API Integration

1

2

The HBase connector provides comprehensive integration with Flink's Table API, enabling SQL-based data processing, declarative table definitions, and lookup joins. This includes table sources, sinks, factory classes, and descriptor-based configuration.

3

4

## HBaseTableSource

5

6

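A table source that exposes an HBase table to the Table API for both batch and streaming queries, and serves as the lookup table in temporal joins.

Its schema has two parts:
- the row-key field;
- one `ROW`-typed field per column family, whose nested fields are the qualifiers registered with `addColumn`.

In SQL, a qualifier is therefore referenced as `family.qualifier`, for example `profile.name`.

The scan is bounded (`isBounded()` returns `true`). A streaming query reads the table once and then finishes; it does not pick up later changes. Lookups are synchronous only (`isAsyncEnabled()` returns `false`).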

7

8

```java { .api }

9

class HBaseTableSource implements StreamTableSource<Row>, LookupableTableSource<Row> {

10

public HBaseTableSource(Configuration conf, String tableName);

11

12

// Schema configuration

13

public void addColumn(String family, String qualifier, Class<?> clazz);

14

public void setRowKey(String rowKeyName, Class<?> clazz);

15

public void setCharset(String charset);

16

17

// Table source methods

18

public TypeInformation<Row> getReturnType();

19

public TableSchema getTableSchema();

20

public DataSet<Row> getDataSet(ExecutionEnvironment execEnv);

21

public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv);

22

public HBaseTableSource projectFields(int[] fields);

23

public String explainSource();

24

25

// Lookup capabilities

26

public TableFunction<Row> getLookupFunction(String[] lookupKeys);

27

public AsyncTableFunction<Row> getAsyncLookupFunction(String[] lookupKeys); // Throws UnsupportedOperationException

28

public boolean isAsyncEnabled(); // Returns false

29

public boolean isBounded(); // Returns true

30

}

31

```

32

33

### Basic Table Source Usage

34

35

```java

36

import org.apache.flink.addons.hbase.HBaseTableSource;

37

import org.apache.flink.table.api.EnvironmentSettings;

38

import org.apache.flink.table.api.TableEnvironment;

39

import org.apache.hadoop.conf.Configuration;

40

41

// Configure HBase connection

42

Configuration conf = new Configuration();

43

conf.set("hbase.zookeeper.quorum", "localhost:2181");

44

45

// Create table source

46

HBaseTableSource tableSource = new HBaseTableSource(conf, "user_profiles");

47

48

// Define schema

49

tableSource.setRowKey("user_id", String.class);

50

tableSource.addColumn("profile", "name", String.class);

51

tableSource.addColumn("profile", "age", Integer.class);

52

tableSource.addColumn("profile", "email", String.class);

53

tableSource.addColumn("activity", "last_login", java.sql.Timestamp.class);

54

tableSource.addColumn("activity", "login_count", Long.class);

55

56

// Register with Table Environment

57

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

58

tableEnv.registerTableSource("users", tableSource);

59

60

// Query the table

61

Table result = tableEnv.sqlQuery(

62

"SELECT user_id, name, age FROM users WHERE age > 21 AND login_count > 10"

63

);

64

```

65

66

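### Column Projection

Projection indices refer to the source's top-level fields, in schema order: the row key plus one `ROW` field per column family. A projection therefore keeps or drops whole column families, not individual qualifiers.

The schema defined above declares the row key first, then the `profile` and `activity` families. That gives three fields, indices 0 to 2. So the index `3` in the example below is out of range, and index `1` selects the whole `profile` family rather than just `name`.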

67

68

```java

69

// Project specific fields for better performance

70

int[] projectedFields = {0, 1, 3}; // user_id, name, email

71

HBaseTableSource projectedSource = tableSource.projectFields(projectedFields);

72

73

tableEnv.registerTableSource("users_projected", projectedSource);

74

Table result = tableEnv.sqlQuery("SELECT * FROM users_projected");

75

```

76

77

## HBaseUpsertTableSink

78

79

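A table sink that writes an upsert stream to HBase through the Table API. Upsert messages become puts and delete messages become deletes, both keyed by the HBase row key. Key fields supplied by the planner through `setKeyFields` are ignored, because HBase always upserts on the row key. Mutations are buffered and flushed according to the `HBaseWriteOptions` limits: buffer size, row count, and flush interval.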

80

81

```java { .api }

82

class HBaseUpsertTableSink implements UpsertStreamTableSink<Row> {

83

public HBaseUpsertTableSink(HBaseTableSchema hbaseTableSchema,

84

HBaseOptions hbaseOptions, HBaseWriteOptions writeOptions);

85

86

// Configuration methods

87

public void setKeyFields(String[] keys); // Ignored - HBase always upserts on rowkey

88

public void setIsAppendOnly(Boolean isAppendOnly);

89

90

// Table sink methods

91

public TypeInformation<Row> getRecordType();

92

public TableSchema getTableSchema();

93

public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream);

94

public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream);

95

public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes);

96

}

97

```

98

99

### Basic Table Sink Usage

100

101

```java

102

import org.apache.flink.addons.hbase.HBaseUpsertTableSink;

103

import org.apache.flink.addons.hbase.HBaseTableSchema;

104

import org.apache.flink.addons.hbase.HBaseOptions;

105

import org.apache.flink.addons.hbase.HBaseWriteOptions;

106

107

// Define HBase table schema

108

HBaseTableSchema schema = new HBaseTableSchema();

109

schema.setRowKey("user_id", String.class);

110

schema.addColumn("profile", "name", String.class);

111

schema.addColumn("profile", "age", Integer.class);

112

schema.addColumn("activity", "last_login", java.sql.Timestamp.class);

113

114

// Configure HBase connection

115

HBaseOptions hbaseOptions = HBaseOptions.builder()

116

.setTableName("user_profiles")

117

.setZkQuorum("localhost:2181")

118

.build();

119

120

// Configure write options

121

HBaseWriteOptions writeOptions = HBaseWriteOptions.builder()

122

.setBufferFlushMaxSizeInBytes(2 * 1024 * 1024) // 2MB

123

.setBufferFlushMaxRows(1000)

124

.setBufferFlushIntervalMillis(5000) // 5 seconds

125

.build();

126

127

// Create table sink

128

HBaseUpsertTableSink tableSink = new HBaseUpsertTableSink(schema, hbaseOptions, writeOptions);

129

130

// Register with Table Environment

131

tableEnv.registerTableSink("user_sink", tableSink);

132

133

// Insert data via SQL

134

tableEnv.sqlUpdate(

135

"INSERT INTO user_sink " +

136

"SELECT user_id, name, age, CURRENT_TIMESTAMP " +

137

"FROM source_table WHERE age > 18"

138

);

139

```

140

141

## HBaseTableFactory

142

143

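Factory class for creating HBase table sources and sinks from connector properties. The property map must contain:
- the `connector.*` keys checked by `HBaseValidator`;
- a table schema, given as `schema.#.name` / `schema.#.data-type` entries.

In that schema, every field except the row key must be a `ROW` type that stands for a column family, with one nested field per qualifier. A non-`ROW` field is taken as the row key, and a table has only one row key. A flat schema of atomic fields, like the one in the example below, therefore cannot be mapped to an HBase table.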

144

145

```java { .api }

146

class HBaseTableFactory implements

147

StreamTableSourceFactory<Row>, StreamTableSinkFactory<Tuple2<Boolean, Row>> {

148

149

public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties);

150

public StreamTableSink<Tuple2<Boolean, Row>> createStreamTableSink(Map<String, String> properties);

151

public Map<String, String> requiredContext();

152

public List<String> supportedProperties();

153

}

154

```

155

156

### Using Table Factory with Properties

157

158

```java

159

import org.apache.flink.table.descriptors.ConnectorDescriptor;

160

import java.util.Map;

161

import java.util.HashMap;

162

163

// Define connector properties

164

Map<String, String> properties = new HashMap<>();

165

properties.put("connector.type", "hbase");

166

properties.put("connector.version", "1.4.3");

167

properties.put("connector.table-name", "user_profiles");

168

properties.put("connector.zookeeper.quorum", "localhost:2181");

169

properties.put("connector.zookeeper.znode.parent", "/hbase");

170

171

// Schema properties

172

properties.put("schema.0.name", "user_id");

173

properties.put("schema.0.data-type", "VARCHAR");

174

properties.put("schema.1.name", "name");

175

properties.put("schema.1.data-type", "VARCHAR");

176

properties.put("schema.2.name", "age");

177

properties.put("schema.2.data-type", "INT");

178

179

// Create source via factory

180

HBaseTableFactory factory = new HBaseTableFactory();

181

StreamTableSource<Row> source = factory.createStreamTableSource(properties);

182

```

183

184

## Table Descriptors

185

186

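Declarative table definitions through Flink's descriptor API (`TableEnvironment#connect`), a programmatic alternative to SQL DDL. The `HBase` descriptor produces the `connector.*` properties listed under Configuration Properties.

The schema must contain one atomic row-key field plus one `ROW` field per column family, whose nested fields are the qualifiers. For example: `.field("info", DataTypes.ROW(DataTypes.FIELD("event_type", DataTypes.STRING())))`. A flat list of atomic fields is not a valid HBase schema.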

187

188

### HBase Descriptor

189

190

```java { .api }

191

class HBase extends ConnectorDescriptor {

192

public HBase();

193

194

// Required configuration

195

public HBase version(String version);

196

public HBase tableName(String tableName);

197

public HBase zookeeperQuorum(String zookeeperQuorum);

198

199

// Optional configuration

200

public HBase zookeeperNodeParent(String zookeeperNodeParent);

201

public HBase writeBufferFlushMaxSize(String maxSize);

202

public HBase writeBufferFlushMaxRows(int writeBufferFlushMaxRows);

203

public HBase writeBufferFlushInterval(String interval);

204

}

205

```

206

207

### Descriptor Usage Example

208

209

```java

210

import org.apache.flink.table.descriptors.HBase;

211

import org.apache.flink.table.descriptors.Schema;

212

import org.apache.flink.table.api.DataTypes;

213

214

// Create HBase table using descriptors

215

tableEnv.connect(

216

new HBase()

217

.version("1.4.3")

218

.tableName("user_events")

219

.zookeeperQuorum("zk1:2181,zk2:2181,zk3:2181")

220

.zookeeperNodeParent("/hbase")

221

.writeBufferFlushMaxSize("4mb")

222

.writeBufferFlushMaxRows(2000)

223

.writeBufferFlushInterval("10s")

224

)

225

.withSchema(

226

new Schema()

227

.field("rowkey", DataTypes.STRING())

228

.field("event_type", DataTypes.STRING())

229

.field("user_id", DataTypes.STRING())

230

.field("timestamp", DataTypes.TIMESTAMP(3))

231

.field("properties", DataTypes.STRING())

232

)

233

.createTemporaryTable("events");

234

235

// Use the table in SQL

236

Table events = tableEnv.sqlQuery(

237

"SELECT user_id, event_type, timestamp " +

238

"FROM events " +

239

"WHERE event_type = 'login' AND user_id IS NOT NULL"

240

);

241

```

242

243

## SQL DDL Support

244

245

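Create HBase tables using SQL DDL statements. With this connector version, the `WITH` clause takes the same `connector.*` properties as the factory and descriptor APIs (see Configuration Properties):
- `'connector.type' = 'hbase'`
- `'connector.version' = '1.4.3'`
- `'connector.table-name'`
- `'connector.zookeeper.quorum'`
- optionally, the `'connector.write.buffer-flush.*'` settings

The unprefixed keys in the examples below (`'connector'`, `'table-name'`, `'zookeeper.quorum'`, `'write.buffer-flush.*'`) are not the keys `HBaseValidator` defines.

Column families must also be declared as `ROW` columns next to a single atomic row-key column, for example `user_id STRING, profile ROW<name STRING, age INT, email STRING>`: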

246

247

```sql

248

-- Create HBase source table

249

CREATE TABLE user_profiles (

250

user_id STRING,

251

name STRING,

252

age INT,

253

email STRING,

254

last_login TIMESTAMP(3),

255

login_count BIGINT

256

) WITH (

257

'connector' = 'hbase',

258

'table-name' = 'user_profiles',

259

'zookeeper.quorum' = 'localhost:2181'

260

);

261

262

-- Create HBase sink table with write options

263

CREATE TABLE user_sink (

264

user_id STRING,

265

name STRING,

266

age INT,

267

registration_time TIMESTAMP(3)

268

) WITH (

269

'connector' = 'hbase',

270

'table-name' = 'users',

271

'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',

272

'write.buffer-flush.max-size' = '4mb',

273

'write.buffer-flush.max-rows' = '2000',

274

'write.buffer-flush.interval' = '10s'

275

);

276

```

277

278

## Temporal Table Joins (Lookup)

279

280

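Use HBase as a dimension table to enrich streaming data with lookup joins.

- **Join condition:** it must equate a stream column with the HBase row-key field, because lookups are by row key only.
- **Execution:** lookups run synchronously (`isAsyncEnabled()` returns `false`).
- **Time attribute:** `FOR SYSTEM_TIME AS OF` references a processing-time attribute of the probe-side (left) table, such as `e.proc_time`. The dimension table needs no time attribute of its own, so the right-hand side can be the HBase table directly rather than a view that adds `PROCTIME()`.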

281

282

```java

283

// Register HBase dimension table

284

tableEnv.connect(

285

new HBase()

286

.version("1.4.3")

287

.tableName("user_profiles")

288

.zookeeperQuorum("localhost:2181")

289

)

290

.withSchema(

291

new Schema()

292

.field("user_id", DataTypes.STRING())

293

.field("name", DataTypes.STRING())

294

.field("age", DataTypes.INT())

295

.field("email", DataTypes.STRING())

296

)

297

.createTemporaryTable("user_dim");

298

299

// Create a processing time temporal table

300

tableEnv.sqlUpdate(

301

"CREATE VIEW user_dim_temporal AS " +

302

"SELECT *, PROCTIME() as proc_time FROM user_dim"

303

);

304

305

// Perform lookup join

306

Table enrichedEvents = tableEnv.sqlQuery(

307

"SELECT " +

308

" e.event_id, " +

309

" e.user_id, " +

310

" e.event_type, " +

311

" u.name, " +

312

" u.email " +

313

"FROM events e " +

314

"JOIN user_dim_temporal FOR SYSTEM_TIME AS OF e.proc_time AS u " +

315

"ON e.user_id = u.user_id"

316

);

317

```

318

319

### Lookup Join with SQL DDL

320

321

```sql

322

-- Source stream table

323

CREATE TABLE user_events (

324

event_id STRING,

325

user_id STRING,

326

event_type STRING,

327

event_time TIMESTAMP(3),

328

proc_time AS PROCTIME()

329

) WITH (

330

'connector' = 'kafka',

331

'topic' = 'user-events'

332

);

333

334

-- HBase lookup table

335

CREATE TABLE user_profiles (

336

user_id STRING,

337

name STRING,

338

age INT,

339

email STRING

340

) WITH (

341

'connector' = 'hbase',

342

'table-name' = 'user_profiles',

343

'zookeeper.quorum' = 'localhost:2181'

344

);

345

346

-- Lookup join query

347

SELECT

348

e.event_id,

349

e.user_id,

350

e.event_type,

351

u.name,

352

u.email

353

FROM user_events e

354

JOIN user_profiles FOR SYSTEM_TIME AS OF e.proc_time AS u

355

ON e.user_id = u.user_id;

356

```

357

358

## HBaseValidator Constants

359

360

Property key constants for HBase connector configuration:

361

362

```java { .api }

363

class HBaseValidator {

364

public static final String CONNECTOR_TYPE_VALUE_HBASE = "hbase";

365

public static final String CONNECTOR_VERSION_VALUE_143 = "1.4.3";

366

public static final String CONNECTOR_TABLE_NAME = "connector.table-name";

367

public static final String CONNECTOR_ZK_QUORUM = "connector.zookeeper.quorum";

368

public static final String CONNECTOR_ZK_NODE_PARENT = "connector.zookeeper.znode.parent";

369

public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE = "connector.write.buffer-flush.max-size";

370

public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS = "connector.write.buffer-flush.max-rows";

371

public static final String CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL = "connector.write.buffer-flush.interval";

372

373

public void validate(DescriptorProperties properties);

374

}

375

```

376

377

## Configuration Properties

378

379

Complete list of supported connector properties:

380

381

### Required Properties

382

383

| Property | Description | Example |

384

|----------|-------------|---------|

385

| `connector.type` | Connector type (must be "hbase") | `"hbase"` |

386

| `connector.version` | Connector version (must be `"1.4.3"`, the only value `HBaseValidator` defines) | `"1.4.3"` |

387

| `connector.table-name` | HBase table name | `"my_table"` |

388

| `connector.zookeeper.quorum` | ZooKeeper ensemble | `"zk1:2181,zk2:2181"` |

389

390

### Optional Properties

391

392

| Property | Description | Default | Example |

393

|----------|-------------|---------|---------|

394

| `connector.zookeeper.znode.parent` | ZK parent node | `"/hbase"` | `"/hbase-prod"` |

395

| `connector.write.buffer-flush.max-size` | Max buffer size | `"2mb"` | `"4mb"` |

396

| `connector.write.buffer-flush.max-rows` | Max buffered rows before a flush | not set (no row-count-based flush) | `2000` |

397

| `connector.write.buffer-flush.interval` | Interval between periodic background flushes | `"0s"` (no periodic flush) | `"10s"` |

398

399

## Advanced Patterns

400

401

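### Row-Key Range Filtering

`HBaseTableSource` does not implement filter push-down; it is not a `FilterableTableSource`. Row-key predicates such as `BETWEEN` or `LIKE` are therefore evaluated by Flink on the rows it has already scanned. They do not narrow the HBase scan, which still reads the whole table.

In `LIKE`, `_` is a single-character wildcard. So `'user123_%'` matches `user123` followed by any character, not a literal underscore. To match a literal underscore, escape it, for example `LIKE 'user123!_%' ESCAPE '!'`.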

402

403

```java

404

// Process HBase table data by row key ranges

405

Table rangeQuery = tableEnv.sqlQuery(

406

"SELECT * FROM users " +

407

"WHERE user_id BETWEEN 'user_00001' AND 'user_99999'"

408

);

409

410

// Use row key prefixes for efficient scanning

411

Table prefixQuery = tableEnv.sqlQuery(

412

"SELECT * FROM events " +

413

"WHERE rowkey LIKE 'user123_%'"

414

);

415

```

416

417

### Aggregations and Analytics

418

419

```java

420

// Aggregate data from HBase

421

Table analytics = tableEnv.sqlQuery(

422

"SELECT " +

423

" DATE_FORMAT(last_login, 'yyyy-MM-dd') as login_date, " +

424

" COUNT(*) as user_count, " +

425

" AVG(age) as avg_age " +

426

"FROM user_profiles " +

427

"WHERE last_login >= CURRENT_DATE - INTERVAL '7' DAY " +

428

"GROUP BY DATE_FORMAT(last_login, 'yyyy-MM-dd')"

429

);

430

```

431

432

### Complex Data Types

433

434

```java

435

// Handle complex data types in HBase

436

tableEnv.connect(

437

new HBase()

438

.version("1.4.3")

439

.tableName("complex_data")

440

.zookeeperQuorum("localhost:2181")

441

)

442

.withSchema(

443

new Schema()

444

.field("id", DataTypes.STRING())

445

.field("binary_data", DataTypes.BYTES()) // byte[] data

446

.field("json_data", DataTypes.STRING()) // JSON as string

447

.field("decimal_value", DataTypes.DECIMAL(10, 2))

448

.field("timestamp_value", DataTypes.TIMESTAMP(3))

449

)

450

.createTemporaryTable("complex_table");

451

```

452

453

## Performance Tuning

454

455

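### Source Performance

Scan behavior is tuned through standard HBase client settings on the Hadoop `Configuration` passed to `HBaseTableSource`. For example, `hbase.client.scanner.caching` sets how many rows each scanner RPC fetches.

Apply these settings before constructing the source, so that it is created from the final configuration. Note that the snippet below sets them after construction.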

456

457

```java

458

// Optimize HBase source scanning

459

HBaseTableSource optimizedSource = new HBaseTableSource(conf, "large_table");

460

461

// Configure HBase client for better scan performance

462

conf.setInt("hbase.client.scanner.caching", 1000);

463

conf.setInt("hbase.client.scanner.max.result.size", 2 * 1024 * 1024);

464

conf.setBoolean("hbase.client.scanner.async.prefetch", true);

465

```

466

467

### Sink Performance

468

469

```java

470

// High-throughput sink configuration

471

HBaseWriteOptions highThroughputOptions = HBaseWriteOptions.builder()

472

.setBufferFlushMaxSizeInBytes(16 * 1024 * 1024) // 16MB

473

.setBufferFlushMaxRows(10000)

474

.setBufferFlushIntervalMillis(30000) // 30 seconds

475

.build();

476

```

477

478

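### Parallelism Configuration

Operator parallelism is set through the DataStream API. The snippet below makes two assumptions:
- `tableEnv` is a `StreamTableEnvironment`. The plain `TableEnvironment` created earlier has no `toAppendStream`.
- `hbaseSink` is a DataStream `SinkFunction<Row>`. `HBaseUpsertTableSink` is a table sink, so it is registered with the table environment instead of being passed to `addSink`.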

479

480

```java

481

// Configure parallelism for HBase operations

482

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

483

env.setParallelism(8); // Match number of HBase regions

484

485

// Set specific parallelism for HBase operations

486

DataStream<Row> hbaseStream = tableEnv.toAppendStream(hbaseTable, Row.class);

487

hbaseStream.addSink(hbaseSink).setParallelism(4);

488

```