Apache Flink HBase connector library that enables seamless integration between Flink streaming and batch processing applications with Apache HBase.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-hbase_2-11@1.10.3
# Apache Flink HBase Connector

The Apache Flink HBase connector provides comprehensive integration between Apache Flink's stream processing capabilities and Apache HBase's NoSQL database. It supports both DataStream API and Table API operations for reading from and writing to HBase tables, with features like lookup joins, buffered writes, and flexible serialization.

## Package Information

- **Package Name**: org.apache.flink:flink-hbase_2.11
- **Package Type**: maven
- **Language**: Java
- **Version**: 1.10.3
- **Installation**: Add to your Maven pom.xml:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-hbase_${scala.binary.version}</artifactId>
  <version>1.10.3</version>
</dependency>
```

## Core Imports

```java
// DataStream API - Input Format
import org.apache.flink.addons.hbase.HBaseRowInputFormat;
import org.apache.flink.addons.hbase.HBaseTableSchema;

// DataStream API - Sink Function
import org.apache.flink.addons.hbase.HBaseUpsertSinkFunction;

// Table API - Source and Sink
import org.apache.flink.addons.hbase.HBaseTableSource;
import org.apache.flink.addons.hbase.HBaseUpsertTableSink;
import org.apache.flink.addons.hbase.HBaseTableFactory;

// Configuration
import org.apache.flink.addons.hbase.HBaseOptions;
import org.apache.flink.addons.hbase.HBaseWriteOptions;

// Table API Descriptors
import org.apache.flink.table.descriptors.HBase;
```

## Basic Usage

### Reading from HBase (DataStream API)

```java
import org.apache.flink.addons.hbase.HBaseRowInputFormat;
import org.apache.flink.addons.hbase.HBaseTableSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;

// Configure the HBase connection
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum", "localhost:2181");

// Define the table schema
HBaseTableSchema schema = new HBaseTableSchema();
schema.setRowKey("rowkey", String.class);
schema.addColumn("cf1", "col1", String.class);
schema.addColumn("cf1", "col2", Integer.class);

// Create the input format
HBaseRowInputFormat inputFormat = new HBaseRowInputFormat(conf, "my_table", schema);

// Read the table as a DataStream
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Row> hbaseData = env.createInput(inputFormat);
```

### Writing to HBase (DataStream API)

```java
import org.apache.flink.addons.hbase.HBaseUpsertSinkFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// Create a sink function with buffering
HBaseUpsertSinkFunction sinkFunction = new HBaseUpsertSinkFunction(
    "my_table",      // table name
    schema,          // table schema
    conf,            // HBase configuration
    2 * 1024 * 1024, // buffer flush max size (2 MB)
    1000,            // buffer flush max mutations
    5000             // buffer flush interval (5 seconds)
);

// Apply to a DataStream of (isUpsert, row) pairs
DataStream<Tuple2<Boolean, Row>> upsertStream = ...; // your stream of upserts
upsertStream.addSink(sinkFunction);
```

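The three buffering parameters above interact: a flush is triggered whenever the buffered mutations exceed the configured byte size or count, and a timer forces a flush on the interval regardless of volume. The following self-contained sketch models that decision logic; it is illustrative only (`FlushPolicy` and its methods are hypothetical names, not the connector's internal API).

```java
// Illustrative model of the sink's three flush triggers: max bytes,
// max mutation count, and flush interval. Hypothetical class, not
// the connector's actual implementation.
final class FlushPolicy {
    private final long maxSizeBytes;
    private final long maxMutations;
    private final long intervalMillis;

    private long bufferedBytes;
    private long bufferedMutations;
    private long lastFlushMillis;

    FlushPolicy(long maxSizeBytes, long maxMutations, long intervalMillis, long nowMillis) {
        this.maxSizeBytes = maxSizeBytes;
        this.maxMutations = maxMutations;
        this.intervalMillis = intervalMillis;
        this.lastFlushMillis = nowMillis;
    }

    /** Record one buffered mutation; return true if a flush should happen now. */
    boolean add(long mutationBytes, long nowMillis) {
        bufferedBytes += mutationBytes;
        bufferedMutations++;
        return bufferedBytes >= maxSizeBytes
            || bufferedMutations >= maxMutations
            || nowMillis - lastFlushMillis >= intervalMillis;
    }

    /** Reset the counters after the buffered mutations were written out. */
    void onFlushed(long nowMillis) {
        bufferedBytes = 0;
        bufferedMutations = 0;
        lastFlushMillis = nowMillis;
    }
}
```

Tuning note: a larger buffer improves throughput at the cost of latency and a bigger replay window on failure; the interval bounds how stale HBase can get under low traffic.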
### Table API Usage

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.descriptors.HBase;
import org.apache.flink.table.descriptors.Schema;

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

// Register an HBase table; each column family is declared as a ROW type
tableEnv.connect(
        new HBase()
            .version("1.4.3")
            .tableName("my_table")
            .zookeeperQuorum("localhost:2181")
    )
    .withSchema(
        new Schema()
            .field("rowkey", DataTypes.STRING())
            .field("cf1", DataTypes.ROW(
                DataTypes.FIELD("col1", DataTypes.STRING()),
                DataTypes.FIELD("col2", DataTypes.INT())))
    )
    .createTemporaryTable("hbase_table");

// Query the table
Table result = tableEnv.sqlQuery("SELECT rowkey, cf1.col1, cf1.col2 FROM hbase_table WHERE cf1.col2 > 100");
```

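The same table can also be declared with SQL DDL instead of the descriptor API. The sketch below uses the property keys documented for the Flink 1.10 HBase connector; treat it as a template and verify the keys against the documentation for your Flink version.

```sql
CREATE TABLE hbase_table (
  rowkey STRING,
  cf1 ROW<col1 STRING, col2 INT>
) WITH (
  'connector.type' = 'hbase',
  'connector.version' = '1.4.3',
  'connector.table-name' = 'my_table',
  'connector.zookeeper.quorum' = 'localhost:2181',
  'connector.write.buffer-flush.max-size' = '2mb',
  'connector.write.buffer-flush.max-rows' = '1000',
  'connector.write.buffer-flush.interval' = '5s'
);
```
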
## Architecture

The HBase connector is organized into several key components:

- **Input Formats**: For batch and streaming reads (`HBaseRowInputFormat`, `AbstractTableInputFormat`)
- **Sink Functions**: For streaming writes with buffering (`HBaseUpsertSinkFunction`)
- **Table API Integration**: Source/sink factories and descriptors (`HBaseTableFactory`, `HBaseTableSource`)
- **Schema Definition**: Type-safe column family and qualifier mapping (`HBaseTableSchema`)
- **Configuration**: Connection and write performance options (`HBaseOptions`, `HBaseWriteOptions`)
- **Utilities**: Type conversion and HBase operation helpers

## Capabilities

### DataStream API Input Formats

Read data from HBase tables using InputFormat classes with full control over scanning and result mapping.

```java { .api }
class HBaseRowInputFormat extends AbstractTableInputFormat<Row> {
    public HBaseRowInputFormat(Configuration conf, String tableName, HBaseTableSchema schema);
    public void configure(Configuration parameters);
    public String getTableName();
    public TypeInformation<Row> getProducedType();
}
```

[DataStream Input Formats](./input-formats.md)

### DataStream API Sink Functions

Write data to HBase tables with configurable buffering and automatic batching for optimal performance.

```java { .api }
class HBaseUpsertSinkFunction extends RichSinkFunction<Tuple2<Boolean, Row>>
        implements CheckpointedFunction, BufferedMutator.ExceptionListener {
    public HBaseUpsertSinkFunction(String hTableName, HBaseTableSchema schema,
            Configuration conf, long bufferFlushMaxSizeInBytes,
            long bufferFlushMaxMutations, long bufferFlushIntervalMillis);
    public void open(Configuration parameters);
    public void invoke(Tuple2<Boolean, Row> value, Context context);
    public void close();
}
```

[DataStream Sink Functions](./sink-functions.md)

### Table API Sources and Sinks

Integrate HBase with Flink's Table API for SQL-based data processing and lookup joins.

```java { .api }
class HBaseTableSource implements StreamTableSource<Row>, LookupableTableSource<Row> {
    public HBaseTableSource(Configuration conf, String tableName);
    public void addColumn(String family, String qualifier, Class<?> clazz);
    public void setRowKey(String rowKeyName, Class<?> clazz);
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv);
    public TableFunction<Row> getLookupFunction(String[] lookupKeys);
}
```

```java { .api }
class HBaseUpsertTableSink implements UpsertStreamTableSink<Row> {
    public HBaseUpsertTableSink(HBaseTableSchema hbaseTableSchema,
            HBaseOptions hbaseOptions, HBaseWriteOptions writeOptions);
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream);
}
```

[Table API Integration](./table-api.md)

### Schema Configuration

Define type-safe mappings between Flink data types and HBase column families and qualifiers.

```java { .api }
class HBaseTableSchema {
    public void addColumn(String family, String qualifier, Class<?> clazz);
    public void setRowKey(String rowKeyName, Class<?> clazz);
    public void setCharset(String charset);
    public String[] getFamilyNames();
    public TypeInformation<?>[] getQualifierTypes(String family);
}
```

[Schema and Configuration](./schema-config.md)

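Conceptually, the schema is an insertion-ordered map from family name to (qualifier, type) pairs, which is what `getFamilyNames()` and `getQualifierTypes()` read back. The self-contained sketch below models that bookkeeping with plain collections; `SchemaModel` is a hypothetical stand-in, not the connector's class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of the family -> (qualifier -> type) bookkeeping that
// HBaseTableSchema maintains. Hypothetical class for illustration only.
final class SchemaModel {
    // LinkedHashMap preserves the declaration order of families and qualifiers
    private final Map<String, Map<String, Class<?>>> familyMap = new LinkedHashMap<>();

    void addColumn(String family, String qualifier, Class<?> clazz) {
        familyMap.computeIfAbsent(family, f -> new LinkedHashMap<>()).put(qualifier, clazz);
    }

    String[] getFamilyNames() {
        return familyMap.keySet().toArray(new String[0]);
    }

    Class<?>[] getQualifierTypes(String family) {
        return familyMap.getOrDefault(family, new LinkedHashMap<>())
                .values().toArray(new Class<?>[0]);
    }
}
```
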
### Lookup Functions

Enable temporal table joins by looking up dimension data from HBase in real-time.

```java { .api }
class HBaseLookupFunction extends TableFunction<Row> {
    public HBaseLookupFunction(Configuration configuration, String hTableName,
            HBaseTableSchema hbaseTableSchema);
    public void eval(Object rowKey);
    public TypeInformation<Row> getResultType();
}
```

[Lookup Functions](./lookup-functions.md)

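Conceptually, `eval` issues one point GET per probe key and emits the matching dimension row, or nothing when the key is absent. The self-contained sketch below models that per-key semantics with a `Map` standing in for the HBase table; `LookupDemo` is a hypothetical illustration, not the connector's code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Toy stand-in for the lookup function's behavior: one point lookup per
// probe key, emitting the dimension row when the key exists.
// A HashMap replaces the HBase GET for illustration.
final class LookupDemo {
    private final Map<String, String[]> table = new HashMap<>();

    void put(String rowKey, String... columns) {
        table.put(rowKey, columns);
    }

    /** Returns the dimension row for this key, or empty when no row exists. */
    Optional<String[]> eval(String rowKey) {
        return Optional.ofNullable(table.get(rowKey));
    }
}
```

Because every probe row triggers a remote GET, production deployments typically front this with a cache; check your Flink version for the lookup cache options it exposes.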
### Utility Classes

Helper classes for type conversion, configuration serialization, and HBase operation creation.

```java { .api }
class HBaseTypeUtils {
    public static Object deserializeToObject(byte[] value, int typeIdx, Charset stringCharset);
    public static byte[] serializeFromObject(Object value, int typeIdx, Charset stringCharset);
    public static int getTypeIndex(TypeInformation typeInfo);
    public static boolean isSupportedType(Class<?> clazz);
}
```

[Utilities](./utilities.md)

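HBase stores every cell as a raw byte array, and the serialize/deserialize helpers above round-trip Java values through HBase's big-endian byte encoding (the connector delegates to `org.apache.hadoop.hbase.util.Bytes`). The self-contained sketch below reproduces that encoding for `int` and `String` with `java.nio` so it runs without the HBase dependency; `BytesDemo` is an illustration, not the connector's API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrates the big-endian byte encoding HBase uses for cell values.
// Hypothetical demo class; the real connector uses HBase's Bytes utility.
final class BytesDemo {
    static byte[] serializeInt(int value) {
        // ByteBuffer is big-endian by default, matching HBase's Bytes.toBytes(int)
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    static int deserializeInt(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    static byte[] serializeString(String value) {
        // The connector's charset is configurable via HBaseTableSchema.setCharset
        return value.getBytes(StandardCharsets.UTF_8);
    }

    static String deserializeString(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```
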
## Configuration

### Connection Configuration

```java { .api }
class HBaseOptions {
    public static Builder builder();

    static class Builder {
        public Builder setTableName(String tableName);     // Required
        public Builder setZkQuorum(String zkQuorum);       // Required
        public Builder setZkNodeParent(String zkNodeParent); // Optional
        public HBaseOptions build();
    }
}
```

### Write Performance Configuration

```java { .api }
class HBaseWriteOptions {
    public static Builder builder();

    static class Builder {
        public Builder setBufferFlushMaxSizeInBytes(long bufferFlushMaxSizeInBytes);
        public Builder setBufferFlushMaxRows(long bufferFlushMaxRows);
        public Builder setBufferFlushIntervalMillis(long bufferFlushIntervalMillis);
        public HBaseWriteOptions build();
    }
}
```

## Supported Data Types

The connector supports automatic serialization/deserialization for:

- **Basic types**: `byte[]`, `String`, `Byte`, `Short`, `Integer`, `Long`, `Float`, `Double`, `Boolean`
- **Temporal types**: `java.sql.Timestamp`, `java.sql.Date`, `java.sql.Time`
- **Numeric types**: `java.math.BigDecimal`, `java.math.BigInteger`

## Error Handling

Common exceptions and their meanings:

- `RuntimeException`: HBase connection or configuration errors
- `IllegalArgumentException`: Invalid parameters or unsupported data types
- `IOException`: HBase I/O operation failures
- `ValidationException`: Table configuration validation errors
- `TableNotFoundException`: Specified HBase table doesn't exist
- `RetriesExhaustedWithDetailsException`: BufferedMutator operation failures

## Requirements

- **HBase Version**: 1.4.3
- **Flink Version**: 1.10.3
- **Java Version**: 8+
- **Hadoop Configuration**: Properly configured `hbase-site.xml` or programmatic configuration