# Apache Flink HBase 1.4 Connector

The Apache Flink HBase 1.4 Connector provides bidirectional data integration between the Apache Flink stream processing framework and the HBase 1.4 NoSQL database. It reads from and writes to HBase tables through Flink's Table API and SQL, with support for exactly-once processing guarantees in high-performance streaming applications.

## Package Information

- **Package Name**: flink-connector-hbase-1.4_2.11
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-connector-hbase-1.4_2.11
- **Version**: 1.14.6
- **Installation**: Add to Maven dependencies:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hbase-1.4_2.11</artifactId>
  <version>1.14.6</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.connector.hbase1.HBase1DynamicTableFactory;
import org.apache.flink.connector.hbase1.source.HBaseDynamicTableSource;
import org.apache.flink.connector.hbase1.sink.HBaseDynamicTableSink;
import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
import org.apache.flink.connector.hbase.table.HBaseConnectorOptions;
```

## Basic Usage

### Creating HBase Table in Flink SQL

```sql
CREATE TABLE hbase_table (
  rowkey STRING,
  family1 ROW<col1 STRING, col2 BIGINT>,
  family2 ROW<col1 STRING, col2 BOOLEAN>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'my_hbase_table',
  'zookeeper.quorum' = 'localhost:2181'
);
```
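
In the DDL above, each `ROW`-typed column stands for an HBase column family, its nested fields are the column qualifiers, and the single atomic column (`rowkey`) is the row key. As a rough illustration of that layout, the sketch below flattens one row into `family:qualifier` cells. It uses only the JDK; `HBaseCellLayoutSketch` and `toCells` are hypothetical names, not part of the connector.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: flattens nested column families into "family:qualifier" cells. */
final class HBaseCellLayoutSketch {

    /**
     * @param columnFamilies column family name -> (qualifier -> value), mirroring the ROW columns
     * @return cells keyed by "family:qualifier", in declaration order
     */
    static Map<String, Object> toCells(Map<String, Map<String, Object>> columnFamilies) {
        Map<String, Object> cells = new LinkedHashMap<>();
        for (Map.Entry<String, Map<String, Object>> family : columnFamilies.entrySet()) {
            for (Map.Entry<String, Object> qualifier : family.getValue().entrySet()) {
                cells.put(family.getKey() + ":" + qualifier.getKey(), qualifier.getValue());
            }
        }
        return cells;
    }

    public static void main(String[] args) {
        // Sample values are made up; the structure matches hbase_table from the DDL.
        Map<String, Object> family1 = new LinkedHashMap<>();
        family1.put("col1", "hello");
        family1.put("col2", 42L);
        Map<String, Object> family2 = new LinkedHashMap<>();
        family2.put("col1", "world");
        family2.put("col2", true);

        Map<String, Map<String, Object>> row = new LinkedHashMap<>();
        row.put("family1", family1);
        row.put("family2", family2);

        System.out.println(toCells(row));
    }
}
```

The row key itself is not part of any column family, which is why it does not appear among the flattened cells.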

### Reading from HBase

```sql
SELECT rowkey, family1.col1, family2.col2
FROM hbase_table
WHERE family1.col2 > 100;
```

### Writing to HBase

```sql
INSERT INTO hbase_table
SELECT rowkey, family1, family2
FROM source_table;
```

## Architecture

The Apache Flink HBase 1.4 Connector is built around several key components:

- **Dynamic Table Factory**: `HBase1DynamicTableFactory` serves as the main entry point for creating HBase table sources and sinks through Flink's Table API
- **Source Integration**: `HBaseDynamicTableSource` and `HBaseRowDataInputFormat` handle reading data from HBase tables with support for region-aware splitting
- **Sink Integration**: `HBaseDynamicTableSink` provides buffered write operations with configurable flushing strategies
- **Configuration System**: Comprehensive configuration options for connection settings, performance tuning, and caching
- **Type System**: Full integration with Flink's type system and automatic serialization/deserialization of HBase data

## Capabilities

### Table Factory and Configuration

Core factory class for creating HBase table sources and sinks with comprehensive configuration options. Handles connector registration and table creation.

```java { .api }
public class HBase1DynamicTableFactory
        implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    public DynamicTableSource createDynamicTableSource(Context context);
    public DynamicTableSink createDynamicTableSink(Context context);
    public String factoryIdentifier(); // Returns "hbase-1.4"
    public Set<ConfigOption<?>> requiredOptions();
    public Set<ConfigOption<?>> optionalOptions();
}
```

[Table Factory and Configuration](./table-factory.md)

### Source Operations

Reading data from HBase tables with support for batch and lookup operations, region-aware splitting, and configurable caching.

```java { .api }
public class HBaseDynamicTableSource extends AbstractHBaseDynamicTableSource {
    public HBaseDynamicTableSource(
        Configuration conf,
        String tableName,
        HBaseTableSchema hbaseSchema,
        String nullStringLiteral,
        HBaseLookupOptions lookupOptions
    );

    public DynamicTableSource copy();
    public InputFormat<RowData, ?> getInputFormat();
    public HBaseLookupOptions getLookupOptions();
}
```

[Source Operations](./source-operations.md)
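
Region-aware splitting means the scan is divided along HBase region boundaries so that each parallel reader works on one region's key range. The following is a minimal sketch of that idea, not the connector's implementation: it assumes row keys are plain strings (HBase uses byte arrays) and that an empty string marks an unbounded start or end key. `RegionSplitSketch`, `Split`, and `createSplits` are illustrative names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative only: one split per region that overlaps the scan range. */
final class RegionSplitSketch {

    /** A key range [startKey, endKey); "" means unbounded on that side. */
    static final class Split {
        final String startKey;
        final String endKey;

        Split(String startKey, String endKey) {
            this.startKey = startKey;
            this.endKey = endKey;
        }

        @Override
        public String toString() {
            return "[" + startKey + ", " + endKey + ")";
        }
    }

    /** Each region is a two-element array {startKey, endKey}. */
    static List<Split> createSplits(List<String[]> regions, String scanStart, String scanStop) {
        List<Split> splits = new ArrayList<>();
        for (String[] region : regions) {
            String regionStart = region[0];
            String regionEnd = region[1];
            boolean endsBeforeScan = !regionEnd.isEmpty()
                    && !scanStart.isEmpty()
                    && regionEnd.compareTo(scanStart) <= 0;
            boolean startsAfterScan = !scanStop.isEmpty()
                    && regionStart.compareTo(scanStop) >= 0;
            if (endsBeforeScan || startsAfterScan) {
                continue; // region lies entirely outside the scan range
            }
            // Clip the region to the scan range.
            String start = regionStart.compareTo(scanStart) >= 0 ? regionStart : scanStart;
            String end = earlierEnd(regionEnd, scanStop);
            splits.add(new Split(start, end));
        }
        return splits;
    }

    /** Smaller of two end keys, treating "" as "no upper bound". */
    private static String earlierEnd(String a, String b) {
        if (a.isEmpty()) {
            return b;
        }
        if (b.isEmpty()) {
            return a;
        }
        return a.compareTo(b) <= 0 ? a : b;
    }

    public static void main(String[] args) {
        // Three made-up regions covering the whole key space.
        List<String[]> regions = Arrays.asList(
                new String[] {"", "g"}, new String[] {"g", "p"}, new String[] {"p", ""});
        for (Split split : createSplits(regions, "c", "r")) {
            System.out.println(split);
        }
    }
}
```

Because each split is clipped to the requested scan range, regions outside the range produce no work at all, and the number of splits is bounded by the number of regions.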

### Sink Operations

Writing data to HBase tables with configurable buffering, batching, and exactly-once processing guarantees.

```java { .api }
public class HBaseDynamicTableSink implements DynamicTableSink {
    public HBaseDynamicTableSink(
        String tableName,
        HBaseTableSchema hbaseTableSchema,
        Configuration hbaseConf,
        HBaseWriteOptions writeOptions,
        String nullStringLiteral
    );

    public SinkRuntimeProvider getSinkRuntimeProvider(Context context);
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode);
    public DynamicTableSink copy();
}
```

[Sink Operations](./sink-operations.md)
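
One way to picture how an upsert-style sink applies changelog rows: inserts and post-update images become writes for the row key, deletes remove the row, and pre-update images are dropped because the later write overwrites them anyway. The sketch below encodes that mapping with stand-in enums. It is an assumption about typical upsert behaviour rather than a copy of the connector's code, and `ChangelogMappingSketch`, `ChangeKind`, and `Mutation` are local illustrative types.

```java
/** Illustrative only: maps changelog row kinds to the kind of HBase mutation they imply. */
final class ChangelogMappingSketch {

    /** Stand-in for the kinds of rows a changelog stream can carry. */
    enum ChangeKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Stand-in for the HBase operations a sink could issue. */
    enum Mutation { PUT, DELETE, NONE }

    static Mutation toMutation(ChangeKind kind) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                return Mutation.PUT;    // write the latest image for the row key
            case DELETE:
                return Mutation.DELETE; // remove the row
            default:
                return Mutation.NONE;   // UPDATE_BEFORE: superseded by the following write
        }
    }

    public static void main(String[] args) {
        for (ChangeKind kind : ChangeKind.values()) {
            System.out.println(kind + " -> " + toMutation(kind));
        }
    }
}
```

Since a write for the same row key replaces the previous cell values, replaying the same changelog rows after a failure leaves the table in the same state.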

### Write Options and Performance Tuning

Configuration options for optimizing write performance through buffering, batching, and parallelism control.

```java { .api }
public class HBaseWriteOptions implements Serializable {
    public static Builder builder();

    public long getBufferFlushMaxSizeInBytes();
    public long getBufferFlushMaxRows();
    public long getBufferFlushIntervalMillis();
    public Integer getParallelism();

    // Nested builder, obtained via HBaseWriteOptions.builder()
    public static class Builder {
        public Builder setBufferFlushMaxSizeInBytes(long bufferFlushMaxSizeInBytes);
        public Builder setBufferFlushMaxRows(long bufferFlushMaxRows);
        public Builder setBufferFlushIntervalMillis(long bufferFlushIntervalMillis);
        public Builder setParallelism(Integer parallelism);
        public HBaseWriteOptions build();
    }
}
```

[Write Options and Performance](./write-options.md)
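
The buffer settings act as independent flush triggers: whichever threshold is hit first causes the buffered mutations to be sent. A minimal, JDK-only sketch of the size and row-count triggers follows. The interval trigger is left as a comment, since it would simply call `flush()` from a timer. `BufferedWriterSketch` and its `flusher` callback are hypothetical, and treating a threshold of 0 as "disabled" is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Illustrative only: buffers mutations and flushes when a size or row threshold is reached. */
final class BufferedWriterSketch {
    private final long maxBytes; // analogous to the max-size setting; 0 disables it here
    private final long maxRows;  // analogous to the max-rows setting; 0 disables it here
    private final Consumer<List<byte[]>> flusher;
    private final List<byte[]> buffer = new ArrayList<>();
    private long bufferedBytes;

    BufferedWriterSketch(long maxBytes, long maxRows, Consumer<List<byte[]>> flusher) {
        this.maxBytes = maxBytes;
        this.maxRows = maxRows;
        this.flusher = flusher;
    }

    /** Adds one serialized mutation and flushes if either threshold is now met. */
    synchronized void write(byte[] mutation) {
        buffer.add(mutation);
        bufferedBytes += mutation.length;
        boolean sizeReached = maxBytes > 0 && bufferedBytes >= maxBytes;
        boolean rowsReached = maxRows > 0 && buffer.size() >= maxRows;
        if (sizeReached || rowsReached) {
            flush();
        }
    }

    /**
     * Sends everything buffered so far. A timer would call this every
     * flush interval, and a checkpoint or shutdown hook would call it as well.
     */
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flusher.accept(new ArrayList<>(buffer));
        buffer.clear();
        bufferedBytes = 0;
    }

    public static void main(String[] args) {
        BufferedWriterSketch writer = new BufferedWriterSketch(1024, 2,
                batch -> System.out.println("flushed " + batch.size() + " mutation(s)"));
        for (int i = 0; i < 5; i++) {
            writer.write(new byte[10]);
        }
        writer.flush(); // drain the remainder, as a checkpoint would
    }
}
```

Larger thresholds mean fewer round trips to HBase at the cost of more memory and a longer delay before rows become visible, which is the trade-off these options expose.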

### Lookup Options and Caching

Configuration for lookup join operations with caching, retry mechanisms, and async processing options.

```java { .api }
public class HBaseLookupOptions implements Serializable {
    public static Builder builder();

    public long getCacheMaxSize();
    public long getCacheExpireMs();
    public int getMaxRetryTimes();
    public boolean getLookupAsync();

    // Nested builder, obtained via HBaseLookupOptions.builder()
    public static class Builder {
        public Builder setCacheMaxSize(long cacheMaxSize);
        public Builder setCacheExpireMs(long cacheExpireMs);
        public Builder setMaxRetryTimes(int maxRetryTimes);
        public Builder setLookupAsync(boolean lookupAsync);
        public HBaseLookupOptions build();
    }
}
```

[Lookup Options and Caching](./lookup-options.md)
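
The cache size and expiry settings bound how many looked-up rows are kept and for how long before HBase is queried again. The sketch below shows one way such a cache can behave, using only the JDK: entries expire a fixed time after they were written, and the least recently used entry is evicted once the size limit is exceeded. `LookupCacheSketch`, the injectable clock, and the LRU eviction policy are assumptions made for illustration, not the connector's actual cache.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Illustrative only: a size-bounded lookup cache with expire-after-write semantics. */
final class LookupCacheSketch<K, V> {

    private static final class Cached<T> {
        final T value;
        final long writtenAtMs;

        Cached(T value, long writtenAtMs) {
            this.value = value;
            this.writtenAtMs = writtenAtMs;
        }
    }

    private final long expireMs;
    private final LongSupplier clockMs;
    private final Function<K, V> loader; // stands in for the actual HBase lookup
    private final LinkedHashMap<K, Cached<V>> entries;
    private int loadCount;

    LookupCacheSketch(long maxEntries, long expireMs, LongSupplier clockMs, Function<K, V> loader) {
        this.expireMs = expireMs;
        this.clockMs = clockMs;
        this.loader = loader;
        // Access-ordered map: the eldest entry is the least recently used one.
        this.entries = new LinkedHashMap<K, Cached<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Cached<V>> eldest) {
                return size() > maxEntries;
            }
        };
    }

    /** Returns the cached value if still fresh, otherwise loads and caches it. */
    synchronized V get(K key) {
        long now = clockMs.getAsLong();
        Cached<V> hit = entries.get(key);
        if (hit != null && now - hit.writtenAtMs < expireMs) {
            return hit.value;
        }
        V value = loader.apply(key);
        loadCount++;
        entries.put(key, new Cached<>(value, now));
        return value;
    }

    /** Number of times the loader was invoked, i.e. cache misses. */
    synchronized int loadCount() {
        return loadCount;
    }

    public static void main(String[] args) {
        LookupCacheSketch<String, String> cache = new LookupCacheSketch<String, String>(
                1000, 60_000, System::currentTimeMillis, key -> "row-for-" + key);
        cache.get("user-1");
        cache.get("user-1"); // served from the cache
        System.out.println("loads: " + cache.loadCount());
    }
}
```

A longer expiry reduces load on HBase but lets lookup joins see stale dimension data for longer, so the two settings are usually tuned together.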

## Types

```java { .api }
// Core configuration options
public class HBaseConnectorOptions {
    public static final ConfigOption<String> TABLE_NAME;
    public static final ConfigOption<String> ZOOKEEPER_QUORUM;
    public static final ConfigOption<String> ZOOKEEPER_ZNODE_PARENT;
    public static final ConfigOption<String> NULL_STRING_LITERAL;
    public static final ConfigOption<MemorySize> SINK_BUFFER_FLUSH_MAX_SIZE;
    public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS;
    public static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL;
    public static final ConfigOption<Boolean> LOOKUP_ASYNC;
    public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS;
    public static final ConfigOption<Duration> LOOKUP_CACHE_TTL;
    public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES;
    public static final ConfigOption<Integer> SINK_PARALLELISM;
}

// Input format for reading HBase data
public class HBaseRowDataInputFormat extends AbstractTableInputFormat<RowData> {
    public HBaseRowDataInputFormat(
        Configuration conf,
        String tableName,
        HBaseTableSchema schema,
        String nullStringLiteral
    );
}

// Abstract base class for HBase input formats
public abstract class AbstractTableInputFormat<T>
        extends RichInputFormat<T, TableInputSplit> {

    protected abstract void initTable() throws IOException;
    protected abstract Scan getScanner();
    protected abstract String getTableName();
    protected abstract T mapResultToOutType(Result r);
}
```