# DataStream Input Formats

The HBase connector provides input formats for reading data from HBase tables in both batch and streaming Flink applications. These formats handle table scanning, row key ranges, and automatic mapping of scan results to Flink data types.

## HBaseRowInputFormat

The primary input format for reading HBase tables and converting results to Flink Row objects.

```java { .api }
class HBaseRowInputFormat extends AbstractTableInputFormat<Row> {
    public HBaseRowInputFormat(Configuration conf, String tableName, HBaseTableSchema schema);
    public void configure(Configuration parameters);
    public String getTableName();
    public TypeInformation<Row> getProducedType();
}
```
### Usage Example

```java
import org.apache.flink.addons.hbase.HBaseRowInputFormat;
import org.apache.flink.addons.hbase.HBaseTableSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;

// Configure the HBase connection
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("zookeeper.znode.parent", "/hbase");

// Define the table schema
HBaseTableSchema schema = new HBaseTableSchema();
schema.setRowKey("user_id", String.class);
schema.addColumn("profile", "name", String.class);
schema.addColumn("profile", "age", Integer.class);
schema.addColumn("activity", "last_login", java.sql.Timestamp.class);

// Create the input format
HBaseRowInputFormat inputFormat = new HBaseRowInputFormat(conf, "user_table", schema);

// Use it in a streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Row> userStream = env.createInput(inputFormat);

// Process the data (assuming "age" lands at field index 2 in the produced Row)
userStream.filter(row -> (Integer) row.getField(2) > 18)
          .print();

env.execute("HBase Read Job");
```
## AbstractTableInputFormat<T>

Base class for creating custom HBase input formats with different output types.

```java { .api }
abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, TableInputSplit> {
    // Abstract methods to implement
    public abstract Scan getScanner();
    public abstract String getTableName();
    public abstract T mapResultToOutType(Result r);
    public abstract void configure(Configuration parameters);

    // Implemented methods
    public void open(TableInputSplit split) throws IOException;
    public T nextRecord(T reuse) throws IOException;
    public boolean reachedEnd() throws IOException;
    public void close() throws IOException;
    public void closeInputFormat() throws IOException;
    public TableInputSplit[] createInputSplits(int minNumSplits) throws IOException;
    public InputSplitAssigner getInputSplitAssigner(TableInputSplit[] inputSplits);
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException;
}
```
### Custom Input Format Example

```java
import org.apache.flink.addons.hbase.AbstractTableInputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class CustomHBaseInputFormat extends AbstractTableInputFormat<Tuple3<String, String, Integer>> {

    private final String tableName;
    private final Configuration conf;

    public CustomHBaseInputFormat(Configuration conf, String tableName) {
        this.conf = conf;
        this.tableName = tableName;
    }

    @Override
    public Scan getScanner() {
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("profile"));
        scan.addFamily(Bytes.toBytes("activity"));
        return scan;
    }

    @Override
    public String getTableName() {
        return tableName;
    }

    @Override
    public Tuple3<String, String, Integer> mapResultToOutType(Result r) {
        String rowKey = Bytes.toString(r.getRow());
        String name = Bytes.toString(r.getValue(Bytes.toBytes("profile"), Bytes.toBytes("name")));
        // Guard against missing cells: Bytes.toInt(null) would throw
        byte[] ageBytes = r.getValue(Bytes.toBytes("profile"), Bytes.toBytes("age"));
        Integer age = ageBytes == null ? null : Bytes.toInt(ageBytes);
        return new Tuple3<>(rowKey, name, age);
    }

    @Override
    public void configure(Configuration parameters) {
        // Configuration logic (e.g., prepare the HBase connection settings)
    }
}
```
## TableInputFormat<T>

Abstract input format specialized for Tuple output types.

```java { .api }
abstract class TableInputFormat<T> extends AbstractTableInputFormat<T> {
    // Abstract methods specific to Tuple mapping
    public abstract Scan getScanner();
    public abstract String getTableName();
    public abstract T mapResultToTuple(Result r);

    // Implementation of configure method
    public void configure(Configuration parameters);
}
```
## TableInputSplit

Represents a split of an HBase table for parallel processing. Each split covers a contiguous row key range.

```java { .api }
class TableInputSplit implements InputSplit {
    public byte[] getTableName();
    public byte[] getStartRow();
    public byte[] getEndRow();
    public int getSplitNumber();
    public String[] getLocations() throws IOException;
}
```
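Since a split is bounded by a start row (inclusive) and an end row (exclusive), deciding whether a key belongs to a split comes down to lexicographic comparison of unsigned bytes, the ordering HBase uses for row keys. A minimal, JDK-only sketch (the class and helper names are illustrative, not part of the connector API):

```java
public class SplitRangeDemo {
    // Lexicographic comparison of unsigned bytes, mirroring how HBase orders row keys
    static int compareRowKeys(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) return cmp;
        }
        return Integer.compare(a.length, b.length);
    }

    // True if key falls in [startRow, endRow); an empty endRow means "unbounded"
    static boolean inSplit(byte[] key, byte[] startRow, byte[] endRow) {
        return compareRowKeys(key, startRow) >= 0
            && (endRow.length == 0 || compareRowKeys(key, endRow) < 0);
    }

    public static void main(String[] args) {
        byte[] start = "user_00001".getBytes();
        byte[] end = "user_50000".getBytes();
        System.out.println(inSplit("user_12345".getBytes(), start, end)); // true
        System.out.println(inSplit("user_99999".getBytes(), start, end)); // false
    }
}
```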
## Scan Configuration

The input formats use HBase Scan objects to define which data to read.

### Basic Scanning

```java
// Scan all rows and columns
Scan fullScan = new Scan();

// Scan specific column families
Scan familyScan = new Scan();
familyScan.addFamily(Bytes.toBytes("cf1"));
familyScan.addFamily(Bytes.toBytes("cf2"));

// Scan specific columns
Scan columnScan = new Scan();
columnScan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("col1"));
columnScan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("col2"));
```
### Row Key Range Scanning

```java
// Scan a row key range: the start row is inclusive, the stop row exclusive
Scan rangeScan = new Scan();
rangeScan.setStartRow(Bytes.toBytes("user_00001"));
rangeScan.setStopRow(Bytes.toBytes("user_99999"));

// Scan with a row key prefix
Scan prefixScan = new Scan();
prefixScan.setRowPrefixFilter(Bytes.toBytes("user_2023"));
```
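A prefix scan is equivalent to a range scan whose stop row is the prefix with its last non-0xFF byte incremented, which is essentially what `setRowPrefixFilter` computes internally. A standalone sketch of that calculation (class and method names are illustrative):

```java
import java.util.Arrays;

public class PrefixStopRow {
    /**
     * Compute the smallest row key greater than every key with the given prefix.
     * Trailing 0xFF bytes are dropped and the preceding byte is incremented;
     * an all-0xFF prefix has no upper bound, so an empty array is returned
     * (HBase's convention for "scan to the end of the table").
     */
    static byte[] stopRowForPrefix(byte[] prefix) {
        byte[] stop = Arrays.copyOf(prefix, prefix.length);
        for (int i = stop.length - 1; i >= 0; i--) {
            if (stop[i] != (byte) 0xFF) {
                stop[i]++;
                return Arrays.copyOf(stop, i + 1); // truncate trailing 0xFF bytes
            }
        }
        return new byte[0]; // prefix was all 0xFF: unbounded
    }

    public static void main(String[] args) {
        byte[] stop = stopRowForPrefix("user_2023".getBytes());
        System.out.println(new String(stop)); // prints "user_2024"
    }
}
```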
### Filtering

```java
import org.apache.hadoop.hbase.filter.*;

// Single column value filter
SingleColumnValueFilter filter = new SingleColumnValueFilter(
    Bytes.toBytes("profile"),
    Bytes.toBytes("age"),
    CompareFilter.CompareOp.GREATER,
    Bytes.toBytes(18)
);
Scan filteredScan = new Scan();
filteredScan.setFilter(filter);

// Multiple filters combined with AND semantics
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
filterList.addFilter(new SingleColumnValueFilter(
    Bytes.toBytes("profile"), Bytes.toBytes("active"),
    CompareFilter.CompareOp.EQUAL, Bytes.toBytes(true)));
filterList.addFilter(new SingleColumnValueFilter(
    Bytes.toBytes("profile"), Bytes.toBytes("age"),
    CompareFilter.CompareOp.GREATER, Bytes.toBytes(21)));

Scan combinedScan = new Scan();
combinedScan.setFilter(filterList);
```
## Performance Considerations

### Parallelism

The input formats automatically create splits based on HBase region boundaries, so reads parallelize naturally across regions:

```java
// The configured parallelism controls how many splits are consumed concurrently
// and is passed to createInputSplits as the minimum split count hint
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(8);

DataSet<Row> data = env.createInput(inputFormat);
```
### Caching and Batching

Configure scan caching and batching to trade memory for fewer round trips:

```java
Scan scan = new Scan();
scan.setCaching(1000); // Number of rows fetched per RPC round trip
scan.setBatch(10);     // Maximum number of columns returned per Result
```
### Memory Management

For large scans, limit how much data each scan pulls back:

```java
// Restrict the scan to a specific time range
Scan scan = new Scan();
scan.setTimeRange(startTime, endTime);

// Use filters to reduce data transfer
scan.setFilter(new PageFilter(10000)); // Limits results per region server
```
## Error Handling

Failures typically surface when the job executes rather than when the input format is created: `IOException` indicates HBase connection or read errors, while `IllegalArgumentException` indicates an invalid configuration or schema. Wrap execution and inspect the cause:

```java
try {
    DataStream<Row> stream = env.createInput(inputFormat);
    // Process the stream
    env.execute("HBase Read Job");
} catch (Exception e) {
    log.error("HBase read failed", e);
}
```
## Type Mapping

The input formats handle automatic type conversion from HBase byte arrays:

| Java Type | HBase Storage | Notes |
|-----------|---------------|-------|
| `String` | `byte[]` | UTF-8 encoding |
| `Integer` | `byte[]` | 4-byte big-endian |
| `Long` | `byte[]` | 8-byte big-endian |
| `Double` | `byte[]` | IEEE 754 format |
| `Boolean` | `byte[]` | Single byte (zero = false, non-zero = true) |
| `java.sql.Timestamp` | `byte[]` | Millisecond timestamp stored as a long |
| `byte[]` | `byte[]` | Direct storage |
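The big-endian integer encoding in the table can be reproduced with the JDK alone; the sketch below mirrors the behavior of HBase's `Bytes.toBytes(int)` / `Bytes.toInt(byte[])` (class and method names here are illustrative):

```java
import java.nio.ByteBuffer;

public class TypeEncodingDemo {
    // 4-byte big-endian encoding, matching the table's Integer mapping
    static byte[] encodeInt(int value) {
        return ByteBuffer.allocate(4).putInt(value).array(); // ByteBuffer is big-endian by default
    }

    static int decodeInt(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    public static void main(String[] args) {
        byte[] encoded = encodeInt(18);
        // 18 = 0x00000012, so the most significant bytes are zero
        System.out.println(encoded.length);     // 4
        System.out.println(decodeInt(encoded)); // 18
    }
}
```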