0
# Table API Integration
1
2
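The HBase connector integrates with Flink's Table API, enabling SQL-based data processing, declarative table definitions, and lookup joins. It provides table sources, sinks, factory classes, and descriptor-based configuration. In every table schema, the HBase row key is a single atomic field, and each HBase column family is a `ROW`-typed field whose nested fields are that family's qualifiers. Qualifiers are therefore referenced in SQL as `family.qualifier` (for example, `profile.name`).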
3
4
## HBaseTableSource
5
6
A table source that provides HBase table data to the Table API with support for both batch and streaming queries, plus lookup functionality for temporal joins.
7
8
```java { .api }
9
class HBaseTableSource implements BatchTableSource<Row>, ProjectableTableSource<Row>, StreamTableSource<Row>, LookupableTableSource<Row> {
10
public HBaseTableSource(Configuration conf, String tableName);
11
12
// Schema configuration
13
public void addColumn(String family, String qualifier, Class<?> clazz);
14
public void setRowKey(String rowKeyName, Class<?> clazz);
15
public void setCharset(String charset);
16
17
// Table source methods
18
public TypeInformation<Row> getReturnType();
19
public TableSchema getTableSchema();
20
public DataSet<Row> getDataSet(ExecutionEnvironment execEnv);
21
public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv);
22
public HBaseTableSource projectFields(int[] fields);
23
public String explainSource();
24
25
// Lookup capabilities
26
public TableFunction<Row> getLookupFunction(String[] lookupKeys);
27
public AsyncTableFunction<Row> getAsyncLookupFunction(String[] lookupKeys); // Throws UnsupportedOperationException
28
public boolean isAsyncEnabled(); // Returns false
29
public boolean isBounded(); // Returns true
30
}
31
```
32
33
### Basic Table Source Usage
34
35
```java
import org.apache.flink.addons.hbase.HBaseTableSource;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.hadoop.conf.Configuration;

// Configure HBase connection
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum", "localhost:2181");

// Create table source
HBaseTableSource tableSource = new HBaseTableSource(conf, "user_profiles");

// Define schema: the row key plus one ROW-typed field per column family
// (resulting top-level fields: user_id, profile, activity)
tableSource.setRowKey("user_id", String.class);
tableSource.addColumn("profile", "name", String.class);
tableSource.addColumn("profile", "age", Integer.class);
tableSource.addColumn("profile", "email", String.class);
tableSource.addColumn("activity", "last_login", java.sql.Timestamp.class);
tableSource.addColumn("activity", "login_count", Long.class);

// Register with Table Environment
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
tableEnv.registerTableSource("users", tableSource);

// Query the table; qualifiers are accessed through their column family
Table result = tableEnv.sqlQuery(
    "SELECT user_id, profile.name, profile.age FROM users " +
    "WHERE profile.age > 21 AND activity.login_count > 10"
);
```
65
66
### Column Projection
67
68
```java
// Project specific top-level fields for better performance. Indices refer to the
// table's fields (row key and column families), not to individual qualifiers:
// 0 = user_id (row key), 1 = profile, 2 = activity
int[] projectedFields = {0, 1}; // user_id and the profile family (name, age, email)
HBaseTableSource projectedSource = tableSource.projectFields(projectedFields);

tableEnv.registerTableSource("users_projected", projectedSource);
Table result = tableEnv.sqlQuery("SELECT * FROM users_projected");
```
76
77
## HBaseUpsertTableSink
78
79
A table sink that handles upsert and delete operations for HBase tables through the Table API.
80
81
```java { .api }
82
class HBaseUpsertTableSink implements UpsertStreamTableSink<Row> {
83
public HBaseUpsertTableSink(HBaseTableSchema hbaseTableSchema,
84
HBaseOptions hbaseOptions, HBaseWriteOptions writeOptions);
85
86
// Configuration methods
87
public void setKeyFields(String[] keys); // Ignored - HBase always upserts on rowkey
88
public void setIsAppendOnly(Boolean isAppendOnly);
89
90
// Table sink methods
91
public TypeInformation<Row> getRecordType();
92
public TableSchema getTableSchema();
93
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream);
94
public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream);
95
public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes);
96
}
97
```
98
99
### Basic Table Sink Usage
100
101
```java
import org.apache.flink.addons.hbase.HBaseUpsertTableSink;
import org.apache.flink.addons.hbase.HBaseTableSchema;
import org.apache.flink.addons.hbase.HBaseOptions;
import org.apache.flink.addons.hbase.HBaseWriteOptions;

// Define HBase table schema; the resulting fields are user_id (row key),
// profile ROW<name, age> and activity ROW<last_login>
HBaseTableSchema schema = new HBaseTableSchema();
schema.setRowKey("user_id", String.class);
schema.addColumn("profile", "name", String.class);
schema.addColumn("profile", "age", Integer.class);
schema.addColumn("activity", "last_login", java.sql.Timestamp.class);

// Configure HBase connection
HBaseOptions hbaseOptions = HBaseOptions.builder()
    .setTableName("user_profiles")
    .setZkQuorum("localhost:2181")
    .build();

// Configure write options
HBaseWriteOptions writeOptions = HBaseWriteOptions.builder()
    .setBufferFlushMaxSizeInBytes(2 * 1024 * 1024) // 2MB
    .setBufferFlushMaxRows(1000)
    .setBufferFlushIntervalMillis(5000) // 5 seconds
    .build();

// Create table sink
HBaseUpsertTableSink tableSink = new HBaseUpsertTableSink(schema, hbaseOptions, writeOptions);

// Register with Table Environment
tableEnv.registerTableSink("user_sink", tableSink);

// Insert data via SQL: each column family is written as a ROW value
tableEnv.sqlUpdate(
    "INSERT INTO user_sink " +
    "SELECT user_id, ROW(name, age), ROW(CURRENT_TIMESTAMP) " +
    "FROM source_table WHERE age > 18"
);

// sqlUpdate only buffers the statement; execute() submits the job
tableEnv.execute("hbase-upsert-sink");
```
140
141
## HBaseTableFactory
142
143
Factory class for creating HBase table sources and sinks using connector properties.
144
145
```java { .api }
146
class HBaseTableFactory implements
147
StreamTableSourceFactory<Row>, StreamTableSinkFactory<Tuple2<Boolean, Row>> {
148
149
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties);
150
public StreamTableSink<Tuple2<Boolean, Row>> createStreamTableSink(Map<String, String> properties);
151
public Map<String, String> requiredContext();
152
public List<String> supportedProperties();
153
}
154
```
155
156
### Using Table Factory with Properties
157
158
```java
import org.apache.flink.addons.hbase.HBaseTableFactory;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
import java.util.HashMap;
import java.util.Map;

// Define connector properties
Map<String, String> properties = new HashMap<>();
properties.put("connector.type", "hbase");
properties.put("connector.version", "1.4.3");
properties.put("connector.table-name", "user_profiles");
properties.put("connector.zookeeper.quorum", "localhost:2181");
properties.put("connector.zookeeper.znode.parent", "/hbase");

// Schema properties: one atomic field for the row key, one ROW field per column family.
// Use STRING rather than a bare VARCHAR, which denotes VARCHAR(1) in Flink's type system.
properties.put("schema.0.name", "user_id");
properties.put("schema.0.data-type", "STRING");
properties.put("schema.1.name", "profile");
properties.put("schema.1.data-type", "ROW<`name` STRING, `age` INT>");

// Create source via factory
HBaseTableFactory factory = new HBaseTableFactory();
StreamTableSource<Row> source = factory.createStreamTableSource(properties);
```
183
184
## Table Descriptors
185
186
Programmatic table definitions using Flink's descriptor API (`TableEnvironment#connect`), an alternative to SQL DDL.
187
188
### HBase Descriptor
189
190
```java { .api }
191
class HBase extends ConnectorDescriptor {
192
public HBase();
193
194
// Required configuration
195
public HBase version(String version);
196
public HBase tableName(String tableName);
197
public HBase zookeeperQuorum(String zookeeperQuorum);
198
199
// Optional configuration
200
public HBase zookeeperNodeParent(String zookeeperNodeParent);
201
public HBase writeBufferFlushMaxSize(String maxSize);
202
public HBase writeBufferFlushMaxRows(int writeBufferFlushMaxRows);
203
public HBase writeBufferFlushInterval(String interval);
204
}
205
```
206
207
### Descriptor Usage Example
208
209
```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.descriptors.HBase;
import org.apache.flink.table.descriptors.Schema;

// Create HBase table using descriptors. The row key is the only atomic field;
// all other fields live inside a column family ROW (family name "info" is illustrative).
tableEnv.connect(
        new HBase()
            .version("1.4.3")
            .tableName("user_events")
            .zookeeperQuorum("zk1:2181,zk2:2181,zk3:2181")
            .zookeeperNodeParent("/hbase")
            .writeBufferFlushMaxSize("4mb")
            .writeBufferFlushMaxRows(2000)
            .writeBufferFlushInterval("10s")
    )
    .withSchema(
        new Schema()
            .field("rowkey", DataTypes.STRING())
            .field("info", DataTypes.ROW(
                DataTypes.FIELD("event_type", DataTypes.STRING()),
                DataTypes.FIELD("user_id", DataTypes.STRING()),
                DataTypes.FIELD("timestamp", DataTypes.TIMESTAMP(3)),
                DataTypes.FIELD("properties", DataTypes.STRING())))
    )
    .createTemporaryTable("events");

// Use the table in SQL. Qualifiers are accessed through their column family,
// and `timestamp` is a reserved word, so it must be quoted with backticks.
Table events = tableEnv.sqlQuery(
    "SELECT info.user_id, info.event_type, info.`timestamp` " +
    "FROM events " +
    "WHERE info.event_type = 'login' AND info.user_id IS NOT NULL"
);
```
242
243
## SQL DDL Support
244
245
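Register Flink tables backed by HBase using SQL DDL. The DDL only maps a Flink table onto an existing HBase table; it does not create the table or its column families in HBase.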
246
247
```sql
-- Register an HBase-backed source table: the row key is an atomic column,
-- and each column family is a ROW whose fields are its qualifiers
CREATE TABLE user_profiles (
  user_id STRING,
  profile ROW<name STRING, age INT, email STRING>,
  activity ROW<last_login TIMESTAMP(3), login_count BIGINT>
) WITH (
  'connector.type' = 'hbase',
  'connector.version' = '1.4.3',
  'connector.table-name' = 'user_profiles',
  'connector.zookeeper.quorum' = 'localhost:2181'
);

-- Register an HBase-backed sink table with write options
CREATE TABLE user_sink (
  user_id STRING,
  profile ROW<name STRING, age INT>,
  activity ROW<registration_time TIMESTAMP(3)>
) WITH (
  'connector.type' = 'hbase',
  'connector.version' = '1.4.3',
  'connector.table-name' = 'users',
  'connector.zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
  'connector.write.buffer-flush.max-size' = '4mb',
  'connector.write.buffer-flush.max-rows' = '2000',
  'connector.write.buffer-flush.interval' = '10s'
);
```
277
278
## Temporal Table Joins (Lookup)
279
280
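Use HBase as a dimension table to enrich streaming data with lookup joins. The processing-time attribute must be declared on the streaming (probe) side, and the HBase table is joined directly with `FOR SYSTEM_TIME AS OF`. Because lookups are performed by row key, the join condition must be an equality on the HBase row key.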
281
282
```java
// Lookup joins require the Blink planner, e.g. a TableEnvironment created with
// EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()

// Register HBase dimension table (row key user_id, column family "profile")
tableEnv.connect(
        new HBase()
            .version("1.4.3")
            .tableName("user_profiles")
            .zookeeperQuorum("localhost:2181")
    )
    .withSchema(
        new Schema()
            .field("user_id", DataTypes.STRING())
            .field("profile", DataTypes.ROW(
                DataTypes.FIELD("name", DataTypes.STRING()),
                DataTypes.FIELD("age", DataTypes.INT()),
                DataTypes.FIELD("email", DataTypes.STRING())))
    )
    .createTemporaryTable("user_dim");

// Perform lookup join. The processing-time attribute comes from the streaming
// (probe) side -- here a table user_events with a `proc_time AS PROCTIME()` column,
// as in the SQL DDL example below. The HBase table is referenced directly (not
// through a view), and the join key must be its row key.
Table enrichedEvents = tableEnv.sqlQuery(
    "SELECT " +
    "  e.event_id, " +
    "  e.user_id, " +
    "  e.event_type, " +
    "  u.profile.name, " +
    "  u.profile.email " +
    "FROM user_events e " +
    "JOIN user_dim FOR SYSTEM_TIME AS OF e.proc_time AS u " +
    "ON e.user_id = u.user_id"
);
```
318
319
### Lookup Join with SQL DDL
320
321
```sql
-- Source stream table with a processing-time attribute for the lookup join
CREATE TABLE user_events (
  event_id STRING,
  user_id STRING,
  event_type STRING,
  event_time TIMESTAMP(3),
  proc_time AS PROCTIME()
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'user-events'
  -- add the Kafka connection, format and update-mode options for your setup
);

-- HBase lookup table: row key user_id, column family profile
CREATE TABLE user_profiles (
  user_id STRING,
  profile ROW<name STRING, age INT, email STRING>
) WITH (
  'connector.type' = 'hbase',
  'connector.version' = '1.4.3',
  'connector.table-name' = 'user_profiles',
  'connector.zookeeper.quorum' = 'localhost:2181'
);

-- Lookup join query (the join key must be the HBase row key)
SELECT
  e.event_id,
  e.user_id,
  e.event_type,
  u.profile.name,
  u.profile.email
FROM user_events e
JOIN user_profiles FOR SYSTEM_TIME AS OF e.proc_time AS u
  ON e.user_id = u.user_id;
```
357
358
## HBaseValidator Constants
359
360
Property key constants for HBase connector configuration:
361
362
```java { .api }
363
class HBaseValidator {
364
public static final String CONNECTOR_TYPE_VALUE_HBASE = "hbase";
365
public static final String CONNECTOR_VERSION_VALUE_143 = "1.4.3";
366
public static final String CONNECTOR_TABLE_NAME = "connector.table-name";
367
public static final String CONNECTOR_ZK_QUORUM = "connector.zookeeper.quorum";
368
public static final String CONNECTOR_ZK_NODE_PARENT = "connector.zookeeper.znode.parent";
369
public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE = "connector.write.buffer-flush.max-size";
370
public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS = "connector.write.buffer-flush.max-rows";
371
public static final String CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL = "connector.write.buffer-flush.interval";
372
373
public void validate(DescriptorProperties properties);
374
}
375
```
376
377
## Configuration Properties
378
379
Complete list of supported connector properties:
380
381
### Required Properties
382
383
| Property | Description | Example |
|----------|-------------|---------|
| `connector.type` | Connector type (must be "hbase") | `"hbase"` |
| `connector.version` | HBase version (must be "1.4.3") | `"1.4.3"` |
| `connector.table-name` | HBase table name | `"my_table"` |
| `connector.zookeeper.quorum` | ZooKeeper ensemble | `"zk1:2181,zk2:2181"` |
389
390
### Optional Properties
391
392
| Property | Description | Default | Example |
|----------|-------------|---------|---------|
| `connector.zookeeper.znode.parent` | ZK parent node | `"/hbase"` | `"/hbase-prod"` |
| `connector.write.buffer-flush.max-size` | Max buffer size | `"2mb"` | `"4mb"` |
| `connector.write.buffer-flush.max-rows` | Max buffered rows | none (flushing does not depend on row count) | `2000` |
| `connector.write.buffer-flush.interval` | Flush interval | `"0s"` (no periodic flush) | `"10s"` |
398
399
## Advanced Patterns
400
401
### Partitioned Table Processing
402
403
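```java
// Filter HBase table data by row key range. The HBase table source does not
// push filters down, so the table is still fully scanned and the predicate is
// evaluated in Flink.
Table rangeQuery = tableEnv.sqlQuery(
    "SELECT * FROM users " +
    "WHERE user_id BETWEEN 'user_00001' AND 'user_99999'"
);

// Filter by row key prefix (also evaluated in Flink after the scan).
// '_' is a LIKE wildcard, so a literal underscore must be escaped.
Table prefixQuery = tableEnv.sqlQuery(
    "SELECT * FROM events " +
    "WHERE rowkey LIKE 'user123\\_%' ESCAPE '\\'"
);
```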
416
417
### Aggregations and Analytics
418
419
```java
// Aggregate data from HBase; qualifiers are accessed through their column family.
// AVG over an INT column returns an INT, so cast to get a fractional average.
Table analytics = tableEnv.sqlQuery(
    "SELECT " +
    "  DATE_FORMAT(activity.last_login, 'yyyy-MM-dd') AS login_date, " +
    "  COUNT(*) AS user_count, " +
    "  AVG(CAST(profile.age AS DOUBLE)) AS avg_age " +
    "FROM user_profiles " +
    "WHERE activity.last_login >= CURRENT_DATE - INTERVAL '7' DAY " +
    "GROUP BY DATE_FORMAT(activity.last_login, 'yyyy-MM-dd')"
);
```
431
432
### Complex Data Types
433
434
```java
// Handle complex data types in HBase. All non-key fields live inside a column
// family ROW (family name "data" is illustrative).
tableEnv.connect(
        new HBase()
            .version("1.4.3")
            .tableName("complex_data")
            .zookeeperQuorum("localhost:2181")
    )
    .withSchema(
        new Schema()
            .field("id", DataTypes.STRING())
            .field("data", DataTypes.ROW(
                DataTypes.FIELD("binary_data", DataTypes.BYTES()),        // byte[] data
                DataTypes.FIELD("json_data", DataTypes.STRING()),         // JSON as string
                DataTypes.FIELD("decimal_value", DataTypes.DECIMAL(10, 2)),
                DataTypes.FIELD("timestamp_value", DataTypes.TIMESTAMP(3))))
    )
    .createTemporaryTable("complex_table");
```
452
453
## Performance Tuning
454
455
### Source Performance
456
457
```java
// Configure the HBase client for better scan performance before creating the
// source, so the settings are in place when the source is built from `conf`
conf.setInt("hbase.client.scanner.caching", 1000);
conf.setLong("hbase.client.scanner.max.result.size", 2L * 1024 * 1024);
conf.setBoolean("hbase.client.scanner.async.prefetch", true);

// Create the HBase source for a large table
HBaseTableSource optimizedSource = new HBaseTableSource(conf, "large_table");
```
466
467
### Sink Performance
468
469
```java
470
// High-throughput sink configuration
471
HBaseWriteOptions highThroughputOptions = HBaseWriteOptions.builder()
472
.setBufferFlushMaxSizeInBytes(16 * 1024 * 1024) // 16MB
473
.setBufferFlushMaxRows(10000)
474
.setBufferFlushIntervalMillis(30000) // 30 seconds
475
.build();
476
```
477
478
### Parallelism Configuration
479
480
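```java
// Configure parallelism for HBase operations
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(8); // e.g. aligned with the number of HBase regions being scanned

// Converting a Table to a DataStream requires a StreamTableEnvironment bound to env
StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env);

// Set specific parallelism for HBase operations
// (hbaseTable is a Table and hbaseSink a SinkFunction<Row>, both defined elsewhere)
DataStream<Row> hbaseStream = streamTableEnv.toAppendStream(hbaseTable, Row.class);
hbaseStream.addSink(hbaseSink).setParallelism(4);
```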
```