# HBase Connector

Apache HBase database connectivity for Flink batch processing, providing region-aware table access with distributed processing capabilities.

## Capabilities

### TableInputFormat

Abstract base class for reading from HBase tables with region-aware splitting for optimal distributed processing.

```java { .api }
/**
 * Abstract base class for reading from HBase tables in Flink
 * @param <T> Tuple type representing the HBase table row structure
 */
public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<T, TableInputSplit> {

    /**
     * Default constructor for TableInputFormat
     */
    public TableInputFormat();

    /**
     * Returns the HBase Scan instance for reading table data
     * Subclasses must implement this to define what data to read
     * @return Scan object configured with column families, filters, etc.
     */
    protected abstract Scan getScanner();

    /**
     * Returns the name of the HBase table to read from
     * @return HBase table name as a string
     */
    protected abstract String getTableName();

    /**
     * Maps an HBase Result to a Flink Tuple
     * This method defines how to convert HBase row data to Flink types
     * @param r HBase Result containing row data
     * @return Flink Tuple representing the row
     */
    protected abstract T mapResultToTuple(Result r);

    /**
     * Determines whether to include a specific HBase region in the input split
     * Can be overridden to filter regions based on key ranges
     * @param startKey Start key of the region
     * @param endKey End key of the region
     * @return true to include the region, false to skip it
     */
    protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey);
}
```

**Usage Example:**

```java
import org.apache.flink.addons.hbase.TableInputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

// Custom implementation for reading user data from HBase
public class UserTableInputFormat extends TableInputFormat<Tuple3<String, String, Integer>> {

    @Override
    protected Scan getScanner() {
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
        scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("email"));
        scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"));
        return scan;
    }

    @Override
    protected String getTableName() {
        return "users";
    }

    @Override
    protected Tuple3<String, String, Integer> mapResultToTuple(Result r) {
        String name = Bytes.toString(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
        String email = Bytes.toString(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("email")));
        Integer age = Bytes.toInt(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")));
        return new Tuple3<>(name, email, age);
    }

    @Override
    protected boolean includeRegionInSplit(byte[] startKey, byte[] endKey) {
        // Include all regions by default
        return true;
    }
}

// Use in Flink program
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<String, String, Integer>> users = env.createInput(new UserTableInputFormat());
users.print();
```

### TableInputSplit

Input split representing HBase table region ranges for distributed processing.

```java { .api }
/**
 * Input split representing HBase table region ranges
 */
public class TableInputSplit extends LocatableInputSplit {

    /**
     * Creates a new TableInputSplit for an HBase region
     * @param splitNumber Unique identifier for this split
     * @param hostnames Array of hostnames where this region is located
     * @param tableName Name of the HBase table (as byte array)
     * @param startRow Start row key for this region (as byte array)
     * @param endRow End row key for this region (as byte array)
     */
    TableInputSplit(final int splitNumber, final String[] hostnames,
            final byte[] tableName, final byte[] startRow, final byte[] endRow);

    /**
     * Returns the HBase table name for this split
     * @return Table name as byte array
     */
    public byte[] getTableName();

    /**
     * Returns the start row key for this region split
     * @return Start row key as byte array
     */
    public byte[] getStartRow();

    /**
     * Returns the end row key for this region split
     * @return End row key as byte array
     */
    public byte[] getEndRow();
}
```

## Advanced Usage Patterns

### Custom Row Key Filtering

```java
public class FilteredUserTableInputFormat extends TableInputFormat<Tuple2<String, String>> {

    @Override
    protected Scan getScanner() {
        Scan scan = new Scan();
        // Add row key prefix filter
        scan.setRowPrefixFilter(Bytes.toBytes("user_"));

        // Add column filter
        scan.addColumn(Bytes.toBytes("profile"), Bytes.toBytes("name"));
        scan.addColumn(Bytes.toBytes("profile"), Bytes.toBytes("status"));

        // Set time range for recent data only
        try {
            long oneWeekAgo = System.currentTimeMillis() - (7 * 24 * 60 * 60 * 1000);
            scan.setTimeRange(oneWeekAgo, System.currentTimeMillis());
        } catch (IOException e) {
            throw new RuntimeException("Failed to set time range", e);
        }

        return scan;
    }

    @Override
    protected String getTableName() {
        return "user_profiles";
    }

    @Override
    protected Tuple2<String, String> mapResultToTuple(Result r) {
        String name = Bytes.toString(r.getValue(Bytes.toBytes("profile"), Bytes.toBytes("name")));
        String status = Bytes.toString(r.getValue(Bytes.toBytes("profile"), Bytes.toBytes("status")));
        return new Tuple2<>(name, status);
    }

    @Override
    protected boolean includeRegionInSplit(byte[] startKey, byte[] endKey) {
        // Only include regions that contain user data
        if (startKey.length == 0) return true; // First region
        if (endKey.length == 0) return true; // Last region

        String startKeyStr = Bytes.toString(startKey);
        return startKeyStr.startsWith("user_");
    }
}
```

### Multi-Column Family Access

```java
public class CompleteUserTableInputFormat extends TableInputFormat<Tuple5<String, String, Integer, String, Long>> {

    @Override
    protected Scan getScanner() {
        Scan scan = new Scan();
        // Add multiple column families
        scan.addFamily(Bytes.toBytes("basic"));    // Basic info
        scan.addFamily(Bytes.toBytes("contact"));  // Contact info
        scan.addFamily(Bytes.toBytes("activity")); // Activity data

        // Configure caching for better performance
        scan.setCaching(1000);
        scan.setBatch(100);

        return scan;
    }

    @Override
    protected String getTableName() {
        return "complete_users";
    }

    @Override
    protected Tuple5<String, String, Integer, String, Long> mapResultToTuple(Result r) {
        // Extract from basic column family
        String name = Bytes.toString(r.getValue(Bytes.toBytes("basic"), Bytes.toBytes("name")));
        Integer age = Bytes.toInt(r.getValue(Bytes.toBytes("basic"), Bytes.toBytes("age")));

        // Extract from contact column family
        String email = Bytes.toString(r.getValue(Bytes.toBytes("contact"), Bytes.toBytes("email")));

        // Extract from activity column family
        Long lastLogin = Bytes.toLong(r.getValue(Bytes.toBytes("activity"), Bytes.toBytes("last_login")));

        // Get row key
        String rowKey = Bytes.toString(r.getRow());

        return new Tuple5<>(rowKey, name, age, email, lastLogin);
    }
}
```

### Integration with Flink's Type System

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

public class TypedUserTableInputFormat extends TableInputFormat<Tuple3<String, String, Integer>> {

    // ... implement abstract methods as shown above ...

    // Optional: Override to provide explicit type information
    @Override
    public TypeInformation<Tuple3<String, String, Integer>> getProducedType() {
        return new TupleTypeInfo<>(
                BasicTypeInfo.STRING_TYPE_INFO, // name
                BasicTypeInfo.STRING_TYPE_INFO, // email
                BasicTypeInfo.INT_TYPE_INFO     // age
        );
    }
}
```

## Performance Considerations

### Region-Aware Processing

HBase regions are automatically mapped to Flink parallel tasks for optimal data locality:

```java
// Configure the HBase client for better performance
Configuration hbaseConfig = HBaseConfiguration.create();
hbaseConfig.set("hbase.client.scanner.caching", "1000");
hbaseConfig.set("hbase.client.scanner.timeout.period", "600000");

// These settings are automatically picked up by TableInputFormat
```

### Memory Management

```java
@Override
protected Scan getScanner() {
    Scan scan = new Scan();

    // Configure batch size to control memory usage
    scan.setBatch(100); // Process 100 columns per RPC

    // Configure caching for network efficiency
    scan.setCaching(1000); // Cache 1000 rows per RPC

    // Limit columns to reduce network traffic
    scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("col1"));
    scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("col2"));

    return scan;
}
```

## Common Types

```java { .api }
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.LocatableInputSplit;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
```