Apache Flink HBase connector library that enables seamless integration between Flink streaming and batch processing applications with Apache HBase.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-hbase_2-11@1.10.0

The Apache Flink HBase connector provides comprehensive integration between Apache Flink's stream processing capabilities and Apache HBase's NoSQL database. It supports both DataStream API and Table API operations for reading from and writing to HBase tables, with features like lookup joins, buffered writes, and flexible serialization.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hbase_${scala.binary.version}</artifactId>
<version>1.10.3</version>
</dependency>

// DataStream API - Input Format
import org.apache.flink.addons.hbase.HBaseRowInputFormat;
import org.apache.flink.addons.hbase.HBaseTableSchema;
// DataStream API - Sink Function
import org.apache.flink.addons.hbase.HBaseUpsertSinkFunction;
// Table API - Source and Sink
import org.apache.flink.addons.hbase.HBaseTableSource;
import org.apache.flink.addons.hbase.HBaseUpsertTableSink;
import org.apache.flink.addons.hbase.HBaseTableFactory;
// Configuration
import org.apache.flink.addons.hbase.HBaseOptions;
import org.apache.flink.addons.hbase.HBaseWriteOptions;
// Table API Descriptors
import org.apache.flink.table.descriptors.HBase;

import org.apache.flink.addons.hbase.HBaseRowInputFormat;
import org.apache.flink.addons.hbase.HBaseTableSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hadoop.conf.Configuration;
// Configure HBase connection
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum", "localhost:2181");
// Define table schema
HBaseTableSchema schema = new HBaseTableSchema();
schema.setRowKey("rowkey", String.class);
schema.addColumn("cf1", "col1", String.class);
schema.addColumn("cf1", "col2", Integer.class);
// Create input format
HBaseRowInputFormat inputFormat = new HBaseRowInputFormat(conf, "my_table", schema);
// Read as DataStream
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Row> hbaseData = env.createInput(inputFormat);

import org.apache.flink.addons.hbase.HBaseUpsertSinkFunction;
import org.apache.flink.types.Row;
// Create sink function with buffering
HBaseUpsertSinkFunction sinkFunction = new HBaseUpsertSinkFunction(
"my_table", // table name
schema, // table schema
conf, // HBase configuration
2 * 1024 * 1024, // buffer flush max size (2MB)
1000, // buffer flush max mutations
5000 // buffer flush interval (5 seconds)
);
// Apply to DataStream
DataStream<Tuple2<Boolean, Row>> upsertStream = ...; // your stream of upserts
upsertStream.addSink(sinkFunction);

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.descriptors.HBase;
import org.apache.flink.table.descriptors.Schema;
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
// Register HBase table
tableEnv.connect(
new HBase()
.version("1.4.3")
.tableName("my_table")
.zookeeperQuorum("localhost:2181")
)
.withSchema(
new Schema()
.field("rowkey", DataTypes.STRING())
.field("cf1_col1", DataTypes.STRING())
.field("cf1_col2", DataTypes.INT())
)
.createTemporaryTable("hbase_table");
// Query the table
Table result = tableEnv.sqlQuery("SELECT * FROM hbase_table WHERE cf1_col2 > 100");

The HBase connector is organized into several key components:
- Input formats (HBaseRowInputFormat, AbstractTableInputFormat)
- Sink functions (HBaseUpsertSinkFunction)
- Table API integration (HBaseTableFactory, HBaseTableSource)
- Schema management (HBaseTableSchema)
- Configuration (HBaseOptions, HBaseWriteOptions)

Read data from HBase tables using InputFormat classes with full control over scanning and result mapping.
class HBaseRowInputFormat extends AbstractTableInputFormat<Row> {
public HBaseRowInputFormat(Configuration conf, String tableName, HBaseTableSchema schema);
public void configure(Configuration parameters);
public String getTableName();
public TypeInformation<Row> getProducedType();
}

Write data to HBase tables with configurable buffering and automatic batching for optimal performance.
class HBaseUpsertSinkFunction extends RichSinkFunction<Tuple2<Boolean, Row>>
implements CheckpointedFunction, BufferedMutator.ExceptionListener {
public HBaseUpsertSinkFunction(String hTableName, HBaseTableSchema schema,
Configuration conf, long bufferFlushMaxSizeInBytes,
long bufferFlushMaxMutations, long bufferFlushIntervalMillis);
public void open(Configuration parameters);
public void invoke(Tuple2<Boolean, Row> value, Context context);
public void close();
}

Integrate HBase with Flink's Table API for SQL-based data processing and lookup joins.
class HBaseTableSource implements StreamTableSource<Row>, LookupableTableSource<Row> {
public HBaseTableSource(Configuration conf, String tableName);
public void addColumn(String family, String qualifier, Class<?> clazz);
public void setRowKey(String rowKeyName, Class<?> clazz);
public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv);
public TableFunction<Row> getLookupFunction(String[] lookupKeys);
}

class HBaseUpsertTableSink implements UpsertStreamTableSink<Row> {
public HBaseUpsertTableSink(HBaseTableSchema hbaseTableSchema,
HBaseOptions hbaseOptions, HBaseWriteOptions writeOptions);
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream);
}

Define type-safe mappings between Flink data types and HBase column families and qualifiers.
class HBaseTableSchema {
public void addColumn(String family, String qualifier, Class<?> clazz);
public void setRowKey(String rowKeyName, Class<?> clazz);
public void setCharset(String charset);
public String[] getFamilyNames();
public TypeInformation<?>[] getQualifierTypes(String family);
}

Enable temporal table joins by looking up dimension data from HBase in real time.
class HBaseLookupFunction extends TableFunction<Row> {
public HBaseLookupFunction(Configuration configuration, String hTableName,
HBaseTableSchema hbaseTableSchema);
public void eval(Object rowKey);
public TypeInformation<Row> getResultType();
}

Helper classes for type conversion, configuration serialization, and HBase operation creation.
class HBaseTypeUtils {
public static Object deserializeToObject(byte[] value, int typeIdx, Charset stringCharset);
public static byte[] serializeFromObject(Object value, int typeIdx, Charset stringCharset);
public static int getTypeIndex(TypeInformation typeInfo);
public static boolean isSupportedType(Class<?> clazz);
}

class HBaseOptions {
public static Builder builder();
static class Builder {
public Builder setTableName(String tableName); // Required
public Builder setZkQuorum(String zkQuorum); // Required
public Builder setZkNodeParent(String zkNodeParent); // Optional
public HBaseOptions build();
}
}

class HBaseWriteOptions {
public static Builder builder();
static class Builder {
public Builder setBufferFlushMaxSizeInBytes(long bufferFlushMaxSizeInBytes);
public Builder setBufferFlushMaxRows(long bufferFlushMaxRows);
public Builder setBufferFlushIntervalMillis(long bufferFlushIntervalMillis);
public HBaseWriteOptions build();
}
}

The connector supports automatic serialization/deserialization for:
- byte[], String, Byte, Short, Integer, Long, Float, Double, Boolean
- java.sql.Timestamp, java.sql.Date, java.sql.Time
- java.math.BigDecimal, java.math.BigInteger

Common exceptions and their meanings:
- RuntimeException: HBase connection or configuration errors
- IllegalArgumentException: Invalid parameters or unsupported data types
- IOException: HBase I/O operation failures
- ValidationException: Table configuration validation errors
- TableNotFoundException: Specified HBase table doesn't exist
- RetriesExhaustedWithDetailsException: BufferedMutator operation failures

hbase-site.xml or programmatic configuration