# Table Factory and Configuration

Core factory class for creating HBase table sources and sinks, with comprehensive configuration options, for the Apache Flink HBase 1.4 Connector.

## Capabilities

### HBase1DynamicTableFactory

Main factory class that implements both the `DynamicTableSourceFactory` and `DynamicTableSinkFactory` interfaces to provide HBase table integration with Flink's Table API.

```java { .api }
/**
 * HBase connector factory for creating dynamic table sources and sinks.
 * Supports the connector identifier "hbase-1.4" in Flink SQL DDL statements.
 */
@Internal
public class HBase1DynamicTableFactory
        implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    /**
     * Creates a dynamic table source for reading from HBase tables.
     * @param context factory context containing catalog table information and options
     * @return HBaseDynamicTableSource configured for the specified table
     */
    public DynamicTableSource createDynamicTableSource(Context context);

    /**
     * Creates a dynamic table sink for writing to HBase tables.
     * @param context factory context containing catalog table information and options
     * @return HBaseDynamicTableSink configured for the specified table
     */
    public DynamicTableSink createDynamicTableSink(Context context);

    /**
     * Returns the unique identifier for this connector factory.
     * @return the string identifier "hbase-1.4" used in CREATE TABLE statements
     */
    public String factoryIdentifier();

    /**
     * Returns the set of required configuration options.
     * @return set containing the TABLE_NAME and ZOOKEEPER_QUORUM options
     */
    public Set<ConfigOption<?>> requiredOptions();

    /**
     * Returns the set of optional configuration options.
     * @return set containing all optional configuration parameters
     */
    public Set<ConfigOption<?>> optionalOptions();
}
```
**Usage Example:**

```sql
CREATE TABLE my_hbase_table (
  rowkey STRING,
  cf1 ROW<col1 STRING, col2 BIGINT>,
  cf2 ROW<status BOOLEAN, `timestamp` TIMESTAMP(3)>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'my_table',
  'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
  'zookeeper.znode.parent' = '/hbase'
);
```

Note that `TIMESTAMP` is a reserved keyword in Flink SQL, so a column named `timestamp` must be quoted with backticks.
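Once the table is declared, ordinary `INSERT` and `SELECT` statements go through the connector; nested fields of a column family are addressed with dot notation. A sketch, where `some_source` and its columns are hypothetical:

```sql
-- Write rows into HBase; each ROW value becomes one column family.
INSERT INTO my_hbase_table
SELECT id, ROW(name, cnt), ROW(active, ts)
FROM some_source;

-- Read selected cells back.
SELECT rowkey, cf1.col1, cf2.status FROM my_hbase_table;
```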
## Configuration Options

### Required Options

Configuration parameters that must be specified when creating HBase tables.

```java { .api }
// Required: name of the HBase table to connect to
public static final ConfigOption<String> TABLE_NAME =
        ConfigOptions.key("table-name")
                .stringType()
                .noDefaultValue()
                .withDescription("The name of HBase table to connect.");

// Required: HBase ZooKeeper quorum for cluster connection
public static final ConfigOption<String> ZOOKEEPER_QUORUM =
        ConfigOptions.key("zookeeper.quorum")
                .stringType()
                .noDefaultValue()
                .withDescription("The HBase Zookeeper quorum.");
```
### Optional Options

Configuration parameters with default values that can be customized for specific use cases.

```java { .api }
// Optional: ZooKeeper root directory for the HBase cluster
public static final ConfigOption<String> ZOOKEEPER_ZNODE_PARENT =
        ConfigOptions.key("zookeeper.znode.parent")
                .stringType()
                .defaultValue("/hbase")
                .withDescription("The root dir in Zookeeper for HBase cluster.");

// Optional: null value representation for string fields
public static final ConfigOption<String> NULL_STRING_LITERAL =
        ConfigOptions.key("null-string-literal")
                .stringType()
                .defaultValue("null")
                .withDescription("Representation for null values for string fields.");

// Optional: maximum buffer size for write operations
public static final ConfigOption<MemorySize> SINK_BUFFER_FLUSH_MAX_SIZE =
        ConfigOptions.key("sink.buffer-flush.max-size")
                .memoryType()
                .defaultValue(MemorySize.parse("2mb"))
                .withDescription("Maximum size in memory of buffered rows for each writing request.");

// Optional: maximum number of buffered rows
public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS =
        ConfigOptions.key("sink.buffer-flush.max-rows")
                .intType()
                .defaultValue(1000)
                .withDescription("Maximum number of rows to buffer for each writing request.");

// Optional: buffer flush interval
public static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL =
        ConfigOptions.key("sink.buffer-flush.interval")
                .durationType()
                .defaultValue(Duration.ofSeconds(1))
                .withDescription("The interval to flush any buffered rows.");

// Optional: sink operator parallelism
public static final ConfigOption<Integer> SINK_PARALLELISM =
        FactoryUtil.SINK_PARALLELISM;

// Optional: enable async lookup operations
public static final ConfigOption<Boolean> LOOKUP_ASYNC =
        ConfigOptions.key("lookup.async")
                .booleanType()
                .defaultValue(false)
                .withDescription("Whether to use async lookup.");

// Optional: maximum lookup cache size (-1 disables caching)
public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
        ConfigOptions.key("lookup.cache.max-rows")
                .longType()
                .defaultValue(-1L)
                .withDescription("The max number of rows of lookup cache.");

// Optional: lookup cache time-to-live
public static final ConfigOption<Duration> LOOKUP_CACHE_TTL =
        ConfigOptions.key("lookup.cache.ttl")
                .durationType()
                .defaultValue(Duration.ofSeconds(0))
                .withDescription("The cache time to live.");

// Optional: maximum lookup retry attempts
public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES =
        ConfigOptions.key("lookup.max-retries")
                .intType()
                .defaultValue(3)
                .withDescription("The max retry times if lookup database failed.");
```
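The three `sink.buffer-flush.*` options cooperate: a write request is issued as soon as either the buffered-row count or the buffered-byte size reaches its threshold, and a periodic timer flushes whatever remains at the configured interval. A minimal, dependency-free sketch of that policy (the `BufferFlushPolicy` class and its method names are illustrative, not part of the connector):

```java
import java.time.Duration;

// Illustrative sketch of the sink buffer-flush policy: flush when either the
// row-count or byte-size threshold is hit; a periodic timer (not shown here)
// would additionally flush leftovers every 'interval'.
public class BufferFlushPolicy {
    private final int maxRows;        // sink.buffer-flush.max-rows
    private final long maxBytes;      // sink.buffer-flush.max-size
    private final Duration interval;  // sink.buffer-flush.interval

    private int bufferedRows;
    private long bufferedBytes;

    public BufferFlushPolicy(int maxRows, long maxBytes, Duration interval) {
        this.maxRows = maxRows;
        this.maxBytes = maxBytes;
        this.interval = interval;
    }

    /** Records one buffered mutation; returns true if a flush should happen now. */
    public boolean add(long rowSizeBytes) {
        bufferedRows++;
        bufferedBytes += rowSizeBytes;
        return bufferedRows >= maxRows || bufferedBytes >= maxBytes;
    }

    /** Resets the counters after the buffered mutations have been written. */
    public void flushed() {
        bufferedRows = 0;
        bufferedBytes = 0;
    }

    public static void main(String[] args) {
        // Defaults: 1000 rows, 2 MB, 1 second.
        BufferFlushPolicy policy =
                new BufferFlushPolicy(1000, 2 * 1024 * 1024, Duration.ofSeconds(1));
        boolean flush = false;
        // 999 small rows do not trigger a flush; the 1000th one does.
        for (int i = 0; i < 1000; i++) {
            flush = policy.add(100);
        }
        System.out.println(flush); // prints "true"
    }
}
```

Raising the thresholds improves throughput at the cost of latency and memory; the interval bounds how stale buffered writes can get under low traffic.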
**Configuration Example:**

```sql
CREATE TABLE orders (
  order_id STRING,
  customer ROW<id STRING, name STRING>,
  order_details ROW<amount DECIMAL(10,2), status STRING, created_at TIMESTAMP(3)>,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'orders_table',
  'zookeeper.quorum' = 'zk1:2181,zk2:2181',
  'zookeeper.znode.parent' = '/hbase',
  'null-string-literal' = 'NULL',
  'sink.buffer-flush.max-size' = '4mb',
  'sink.buffer-flush.max-rows' = '2000',
  'sink.buffer-flush.interval' = '2s',
  'lookup.cache.max-rows' = '10000',
  'lookup.cache.ttl' = '5min',
  'lookup.max-retries' = '5'
);
```
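The `lookup.*` options take effect when the table is used as the dimension side of a lookup join. A sketch, assuming a hypothetical `transactions` stream with a processing-time attribute `proc_time`:

```sql
-- Enrich a stream by looking up each order in HBase; results are cached
-- per the lookup.cache.max-rows and lookup.cache.ttl settings above.
SELECT t.tx_id, o.order_details.status, o.customer.name
FROM transactions AS t
JOIN orders FOR SYSTEM_TIME AS OF t.proc_time AS o
  ON t.order_id = o.order_id;
```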
## Integration with Flink Table API

### Table Schema Requirements

HBase tables in Flink require a primary key definition that maps to the HBase row key. The schema supports column families through nested ROW types.

**Schema Validation:**

- A primary key must be defined (it maps to the HBase row key)
- Column families are represented as ROW types
- All Flink data types are supported through serialization

**Supported Data Types:**

- Primitive types: STRING, BIGINT, INT, DOUBLE, FLOAT, BOOLEAN, etc.
- Timestamp types: TIMESTAMP(3), TIMESTAMP_LTZ(3)
- Complex types: ROW (for column families), ARRAY, MAP
- Decimal types: DECIMAL(precision, scale)
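During serialization, each value is converted to HBase's byte representation: fixed-width big-endian bytes for numeric types, UTF-8 bytes for strings. A self-contained sketch of that encoding (the class and helper names here are illustrative, not the connector's actual serializers):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative encoders matching HBase's conventional byte layout:
// numeric types as fixed-width big-endian, strings as UTF-8 bytes.
public class HBaseEncodingSketch {
    /** Encodes a BIGINT value as 8 big-endian bytes. */
    static byte[] encodeLong(long v) {
        return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
    }

    /** Encodes a STRING value as its UTF-8 bytes. */
    static byte[] encodeString(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes 8 big-endian bytes back into a long. */
    static long decodeLong(byte[] b) {
        return ByteBuffer.wrap(b).getLong();
    }

    public static void main(String[] args) {
        byte[] key = encodeString("order-42"); // row key bytes
        byte[] qty = encodeLong(7L);           // a cell value, e.g. cf1:col2
        System.out.println(key.length);        // 8 UTF-8 bytes
        System.out.println(decodeLong(qty));   // round-trips to 7
    }
}
```

Big-endian numeric encoding keeps HBase's lexicographic byte ordering consistent with numeric ordering for non-negative values, which matters for row-key range scans.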
### Error Handling

The factory provides comprehensive error handling and validation:

- **Table validation**: ensures the primary key is properly defined
- **Configuration validation**: checks that all required options are present
- **Connection validation**: verifies HBase cluster connectivity
- **Schema validation**: ensures the schema is compatible with the HBase storage model
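Configuration validation amounts to rejecting table definitions that omit a required key before any source or sink is built (in the real factory this is delegated to Flink's `FactoryUtil` helper). A simplified, dependency-free sketch of the idea, with illustrative names:

```java
import java.util.List;
import java.util.Map;

// Simplified sketch of required-option validation: fail fast if the DDL
// omits 'table-name' or 'zookeeper.quorum'.
public class OptionValidationSketch {
    static final List<String> REQUIRED = List.of("table-name", "zookeeper.quorum");

    /** Throws if any required option is missing from the table's WITH clause. */
    static void validate(Map<String, String> options) {
        for (String key : REQUIRED) {
            if (!options.containsKey(key)) {
                throw new IllegalArgumentException("Missing required option: " + key);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> options = Map.of(
                "connector", "hbase-1.4",
                "table-name", "orders_table",
                "zookeeper.quorum", "zk1:2181");
        validate(options); // passes: both required options are present
        System.out.println("options valid");
    }
}
```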