The HBase connector provides input formats for reading data from HBase tables in both batch and streaming Flink applications. These formats handle table scanning, row key ranges, and automatic result mapping to Flink data types.
HBaseRowInputFormat is the primary input format for reading HBase tables and converting scan results to Flink Row objects.
class HBaseRowInputFormat extends AbstractTableInputFormat<Row> {
public HBaseRowInputFormat(Configuration conf, String tableName, HBaseTableSchema schema);
public void configure(Configuration parameters);
public String getTableName();
public TypeInformation<Row> getProducedType();
}

import org.apache.flink.addons.hbase.HBaseRowInputFormat;
import org.apache.flink.addons.hbase.HBaseTableSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
// Configure the HBase connection (HBaseConfiguration.create() picks up hbase-site.xml defaults)
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("zookeeper.znode.parent", "/hbase");
// Define table schema
HBaseTableSchema schema = new HBaseTableSchema();
schema.setRowKey("user_id", String.class);
schema.addColumn("profile", "name", String.class);
schema.addColumn("profile", "age", Integer.class);
schema.addColumn("activity", "last_login", java.sql.Timestamp.class);
// Create input format
HBaseRowInputFormat inputFormat = new HBaseRowInputFormat(conf, "user_table", schema);
// Use in streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Row> userStream = env.createInput(inputFormat);
// Process the data
userStream.filter(row -> (Integer) row.getField(2) > 18)
.print();
env.execute("HBase Read Job");Base class for creating custom HBase input formats with different output types.
abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, TableInputSplit> {
// Abstract methods to implement
public abstract Scan getScanner();
public abstract String getTableName();
public abstract T mapResultToOutType(Result r);
public abstract void configure(Configuration parameters);
// Implemented methods
public void open(TableInputSplit split) throws IOException;
public T nextRecord(T reuse) throws IOException;
public boolean reachedEnd() throws IOException;
public void close() throws IOException;
public void closeInputFormat() throws IOException;
public TableInputSplit[] createInputSplits(int minNumSplits) throws IOException;
public InputSplitAssigner getInputSplitAssigner(TableInputSplit[] inputSplits);
public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException;
}

import org.apache.flink.addons.hbase.AbstractTableInputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
public class CustomHBaseInputFormat extends AbstractTableInputFormat<Tuple3<String, String, Integer>> {
private String tableName;
private Configuration conf;
public CustomHBaseInputFormat(Configuration conf, String tableName) {
this.conf = conf;
this.tableName = tableName;
}
@Override
public Scan getScanner() {
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes("profile"));
scan.addFamily(Bytes.toBytes("activity"));
return scan;
}
@Override
public String getTableName() {
return tableName;
}
@Override
public Tuple3<String, String, Integer> mapResultToOutType(Result r) {
String rowKey = Bytes.toString(r.getRow());
String name = Bytes.toString(r.getValue(Bytes.toBytes("profile"), Bytes.toBytes("name")));
Integer age = Bytes.toInt(r.getValue(Bytes.toBytes("profile"), Bytes.toBytes("age")));
return new Tuple3<>(rowKey, name, age);
}
@Override
public void configure(org.apache.flink.configuration.Configuration parameters) {
// Flink-side configuration hook; fully qualified to avoid clashing with the Hadoop Configuration imported above
}
}

TableInputFormat is an abstract input format specialized for Tuple output types.
abstract class TableInputFormat<T extends Tuple> extends AbstractTableInputFormat<T> {
// Abstract methods specific to Tuple mapping
public abstract Scan getScanner();
public abstract String getTableName();
public abstract T mapResultToTuple(Result r);
// Implementation of configure method
public void configure(Configuration parameters);
}
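A concrete subclass only has to supply the scan, the table name, and the tuple mapping; configure is already provided by the class. The sketch below follows the listing above; the user_events table, the activity:event_count column, and the EventTupleInputFormat name are illustrative, not part of the connector.

import org.apache.flink.addons.hbase.TableInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class EventTupleInputFormat extends TableInputFormat<Tuple2<String, Long>> {

    @Override
    public Scan getScanner() {
        // Read only the single column this format needs
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("activity"), Bytes.toBytes("event_count"));
        return scan;
    }

    @Override
    public String getTableName() {
        return "user_events"; // illustrative table name
    }

    @Override
    public Tuple2<String, Long> mapResultToTuple(Result r) {
        // Emit (row key, long-encoded column value) per result
        String rowKey = Bytes.toString(r.getRow());
        Long count = Bytes.toLong(r.getValue(Bytes.toBytes("activity"), Bytes.toBytes("event_count")));
        return new Tuple2<>(rowKey, count);
    }
}

As with the Row-based format, an instance can then be passed to env.createInput(...).
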
TableInputSplit represents a split of an HBase table for parallel processing.
class TableInputSplit implements InputSplit {
public byte[] getTableName();
public byte[] getStartRow();
public byte[] getEndRow();
public int getSplitNumber();
public String[] getLocations() throws IOException;
}
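Splits are normally created and assigned by the Flink runtime, but they can also be inspected directly, which can help when checking how a table will be parallelized. A rough sketch using the inputFormat built in the first example (assumes HBase is reachable; createInputSplits and getLocations throw IOException, so call this from a method that handles or declares it):

// Configure the format so it can talk to HBase, then ask it for splits.
// The argument is only a hint; splits normally follow region boundaries.
inputFormat.configure(new org.apache.flink.configuration.Configuration());
TableInputSplit[] splits = inputFormat.createInputSplits(4);
for (TableInputSplit split : splits) {
    System.out.println("split " + split.getSplitNumber()
        + ": table=" + Bytes.toString(split.getTableName())
        + ", startRow=" + Bytes.toString(split.getStartRow())
        + ", endRow=" + Bytes.toString(split.getEndRow())
        + ", hosts=" + String.join(",", split.getLocations()));
}
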
The input formats use HBase Scan objects to define which data to read:

// Scan all rows and columns
Scan scan = new Scan();
// Scan specific column families
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes("cf1"));
scan.addFamily(Bytes.toBytes("cf2"));
// Scan specific columns
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("col1"));
scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("col2"));// Scan row key range
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes("user_00001"));
scan.setStopRow(Bytes.toBytes("user_99999"));
// Scan with row key prefix
Scan scan = new Scan();
scan.setRowPrefixFilter(Bytes.toBytes("user_2023"));

Filters can be attached to a scan to push row selection down to the region servers:

import org.apache.hadoop.hbase.filter.*;
// Single column value filter
SingleColumnValueFilter filter = new SingleColumnValueFilter(
Bytes.toBytes("profile"),
Bytes.toBytes("age"),
CompareFilter.CompareOp.GREATER,
Bytes.toBytes(18)
);
Scan scan = new Scan();
scan.setFilter(filter);
// Multiple filters
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
filterList.addFilter(new SingleColumnValueFilter(
Bytes.toBytes("profile"), Bytes.toBytes("active"),
CompareFilter.CompareOp.EQUAL, Bytes.toBytes(true)));
filterList.addFilter(new SingleColumnValueFilter(
Bytes.toBytes("profile"), Bytes.toBytes("age"),
CompareFilter.CompareOp.GREATER, Bytes.toBytes(21)));
Scan scan = new Scan();
scan.setFilter(filterList);

The input formats automatically create splits based on HBase region boundaries for optimal parallel processing:
// Control the minimum number of splits
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(8); // Used as the minNumSplits hint; actual splits still follow region boundaries
DataSet<Row> data = env.createInput(inputFormat);

Configure HBase scan caching to control how many rows each RPC fetches from a region server:
Scan scan = new Scan();
scan.setCaching(1000); // Number of rows to cache per RPC
scan.setBatch(10); // Maximum number of columns returned per Result (useful for wide rows)

For large scans, consider memory usage:
// Limit scan to specific time range
Scan scan = new Scan();
scan.setTimeRange(startTime, endTime);
// Use filters to reduce data transfer
scan.setFilter(new PageFilter(10000)); // Limits rows returned per region server

Common exceptions when using the input formats:
try {
DataStream<Row> stream = env.createInput(inputFormat);
// Process the stream, then submit the job
env.execute("HBase Read Job");
} catch (IOException e) {
// HBase connection or read errors
log.error("HBase read failed", e);
} catch (IllegalArgumentException e) {
// Invalid configuration or schema
log.error("Configuration error", e);
} catch (Exception e) {
// Any other failure surfaced during job execution
log.error("HBase read job failed", e);
}

The input formats handle automatic type conversion between HBase byte arrays and Java types:
| Java Type | HBase Storage | Notes |
|---|---|---|
| String | byte[] | UTF-8 encoded |
| Integer | byte[] | 4-byte big-endian |
| Long | byte[] | 8-byte big-endian |
| Double | byte[] | 8-byte IEEE 754 |
| Boolean | byte[] | Single byte (0 = false, non-zero = true) |
| java.sql.Timestamp | byte[] | 8-byte long, milliseconds since the epoch |
| byte[] | byte[] | Stored as-is |
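These conversions correspond to the org.apache.hadoop.hbase.util.Bytes helpers. The round-trip sketch below shows the encodings that the input formats decode when mapping a Result; the literal values are only examples:

import org.apache.hadoop.hbase.util.Bytes;

// Encode values the way they are laid out in HBase cells
byte[] nameBytes = Bytes.toBytes("alice");          // UTF-8
byte[] ageBytes = Bytes.toBytes(42);                // 4-byte big-endian int
byte[] loginBytes = Bytes.toBytes(1700000000000L);  // 8-byte big-endian long (epoch millis)

// Decode them back, which is essentially what happens per cell when a Result is mapped
String name = Bytes.toString(nameBytes);
int age = Bytes.toInt(ageBytes);
java.sql.Timestamp lastLogin = new java.sql.Timestamp(Bytes.toLong(loginBytes));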