Apache Flink HBase connector library that integrates Flink streaming and batch processing applications with Apache HBase.
The HBase connector provides comprehensive integration with Flink's Table API, enabling SQL-based data processing, declarative table definitions, and lookup joins. This includes table sources, sinks, factory classes, and descriptor-based configuration.
A table source that provides HBase table data to the Table API with support for both batch and streaming queries, plus lookup functionality for temporal joins.
```java
class HBaseTableSource implements StreamTableSource<Row>, LookupableTableSource<Row> {
    public HBaseTableSource(Configuration conf, String tableName);

    // Schema configuration
    public void addColumn(String family, String qualifier, Class<?> clazz);
    public void setRowKey(String rowKeyName, Class<?> clazz);
    public void setCharset(String charset);

    // Table source methods
    public TypeInformation<Row> getReturnType();
    public TableSchema getTableSchema();
    public DataSet<Row> getDataSet(ExecutionEnvironment execEnv);
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv);
    public HBaseTableSource projectFields(int[] fields);
    public String explainSource();

    // Lookup capabilities
    public TableFunction<Row> getLookupFunction(String[] lookupKeys);
    public AsyncTableFunction<Row> getAsyncLookupFunction(String[] lookupKeys); // throws UnsupportedOperationException
    public boolean isAsyncEnabled(); // returns false
    public boolean isBounded();      // returns true
}
```

```java
import org.apache.flink.addons.hbase.HBaseTableSource;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
// Configure the HBase connection; HBaseConfiguration.create() loads
// hbase-default.xml / hbase-site.xml from the classpath
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "localhost:2181");
// Create table source
HBaseTableSource tableSource = new HBaseTableSource(conf, "user_profiles");
// Define the schema: the row key becomes a top-level field, and each
// column family is exposed as a nested ROW of its qualifiers
tableSource.setRowKey("user_id", String.class);
tableSource.addColumn("profile", "name", String.class);
tableSource.addColumn("profile", "age", Integer.class);
tableSource.addColumn("profile", "email", String.class);
tableSource.addColumn("activity", "last_login", java.sql.Timestamp.class);
tableSource.addColumn("activity", "login_count", Long.class);
// Register with Table Environment
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
tableEnv.registerTableSource("users", tableSource);
// Query the table; qualifiers are addressed as family.qualifier
Table result = tableEnv.sqlQuery(
    "SELECT t.user_id, t.profile.name, t.profile.age FROM users AS t " +
    "WHERE t.profile.age > 21 AND t.activity.login_count > 10"
);

// Project top-level fields (row key and column families) to reduce the data read
int[] projectedFields = {0, 1}; // top-level fields in declaration order: user_id (row key), profile
HBaseTableSource projectedSource = tableSource.projectFields(projectedFields);
tableEnv.registerTableSource("users_projected", projectedSource);
Table projectedResult = tableEnv.sqlQuery("SELECT * FROM users_projected");
```
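The lookup side of the source can also be obtained directly. A minimal sketch reusing the `tableSource` built above; only the synchronous path is available, since `isAsyncEnabled()` returns false:

```java
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Synchronous lookup function keyed on the row key field; calling
// getAsyncLookupFunction instead would throw UnsupportedOperationException.
TableFunction<Row> lookupFunction = tableSource.getLookupFunction(new String[] {"user_id"});
```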
A table sink that handles upsert and delete operations for HBase tables through the Table API.

```java
class HBaseUpsertTableSink implements UpsertStreamTableSink<Row> {
    public HBaseUpsertTableSink(HBaseTableSchema hbaseTableSchema,
                                HBaseOptions hbaseOptions, HBaseWriteOptions writeOptions);

    // Configuration methods
    public void setKeyFields(String[] keys); // ignored: HBase always upserts on the row key
    public void setIsAppendOnly(Boolean isAppendOnly);

    // Table sink methods
    public TypeInformation<Row> getRecordType();
    public TableSchema getTableSchema();
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream);
    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream);
    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes);
}
```

```java
import org.apache.flink.addons.hbase.HBaseUpsertTableSink;
import org.apache.flink.addons.hbase.HBaseTableSchema;
import org.apache.flink.addons.hbase.HBaseOptions;
import org.apache.flink.addons.hbase.HBaseWriteOptions;
// Define HBase table schema
HBaseTableSchema schema = new HBaseTableSchema();
schema.setRowKey("user_id", String.class);
schema.addColumn("profile", "name", String.class);
schema.addColumn("profile", "age", Integer.class);
schema.addColumn("activity", "last_login", java.sql.Timestamp.class);
// Configure HBase connection
HBaseOptions hbaseOptions = HBaseOptions.builder()
.setTableName("user_profiles")
.setZkQuorum("localhost:2181")
.build();
// Configure write options
HBaseWriteOptions writeOptions = HBaseWriteOptions.builder()
.setBufferFlushMaxSizeInBytes(2 * 1024 * 1024) // 2MB
.setBufferFlushMaxRows(1000)
.setBufferFlushIntervalMillis(5000) // 5 seconds
.build();
// Create table sink
HBaseUpsertTableSink tableSink = new HBaseUpsertTableSink(schema, hbaseOptions, writeOptions);
// Register with Table Environment
tableEnv.registerTableSink("user_sink", tableSink);
// Insert data via SQL; qualifiers must be wrapped into a ROW per column family
// to match the sink schema (user_id, profile ROW<name, age>, activity ROW<last_login>).
// source_table stands in for any previously registered table with these columns.
tableEnv.sqlUpdate(
    "INSERT INTO user_sink " +
    "SELECT user_id, ROW(name, age), ROW(CURRENT_TIMESTAMP) " +
    "FROM source_table WHERE age > 18"
);
```
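In this generation of the Table API, `sqlUpdate` only buffers the INSERT statement; the job must be triggered explicitly. A minimal sketch (the job name is arbitrary):

```java
// Submit all buffered statements as one Flink job.
tableEnv.execute("hbase-upsert-example");
```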
Factory class for creating HBase table sources and sinks from connector properties.

```java
class HBaseTableFactory implements
        StreamTableSourceFactory<Row>, StreamTableSinkFactory<Tuple2<Boolean, Row>> {
    public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties);
    public StreamTableSink<Tuple2<Boolean, Row>> createStreamTableSink(Map<String, String> properties);
    public Map<String, String> requiredContext();
    public List<String> supportedProperties();
}
```

```java
import org.apache.flink.addons.hbase.HBaseTableFactory;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
import java.util.Map;
import java.util.HashMap;
// Define connector properties
Map<String, String> properties = new HashMap<>();
properties.put("connector.type", "hbase");
properties.put("connector.version", "1.4.3");
properties.put("connector.table-name", "user_profiles");
properties.put("connector.zookeeper.quorum", "localhost:2181");
properties.put("connector.zookeeper.znode.parent", "/hbase");
// Schema properties: the row key plus one ROW type per column family
properties.put("schema.0.name", "user_id");
properties.put("schema.0.data-type", "STRING");
properties.put("schema.1.name", "profile");
properties.put("schema.1.data-type", "ROW<name STRING, age INT>");
// Create the source via the factory
HBaseTableFactory factory = new HBaseTableFactory();
StreamTableSource<Row> source = factory.createStreamTableSource(properties);
```
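The sink side works the same way; a minimal sketch, assuming the property map also carries the keys the sink requires:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.sinks.StreamTableSink;

// Create the upsert sink from the same property map.
StreamTableSink<Tuple2<Boolean, Row>> sink = factory.createStreamTableSink(properties);
```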
Declarative configuration using Flink's descriptor API for Table DDL.

```java
class HBase extends ConnectorDescriptor {
    public HBase();

    // Required configuration
    public HBase version(String version);
    public HBase tableName(String tableName);
    public HBase zookeeperQuorum(String zookeeperQuorum);

    // Optional configuration
    public HBase zookeeperNodeParent(String zookeeperNodeParent);
    public HBase writeBufferFlushMaxSize(String maxSize);
    public HBase writeBufferFlushMaxRows(int writeBufferFlushMaxRows);
    public HBase writeBufferFlushInterval(String interval);
}
```

```java
import org.apache.flink.table.descriptors.HBase;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.api.DataTypes;
// Create an HBase table using descriptors; each column family is declared
// as a ROW of its qualifiers ("data" is this example's column family, and
// "event_time" avoids the reserved word TIMESTAMP)
tableEnv.connect(
    new HBase()
        .version("1.4.3")
        .tableName("user_events")
        .zookeeperQuorum("zk1:2181,zk2:2181,zk3:2181")
        .zookeeperNodeParent("/hbase")
        .writeBufferFlushMaxSize("4mb")
        .writeBufferFlushMaxRows(2000)
        .writeBufferFlushInterval("10s")
)
.withSchema(
    new Schema()
        .field("rowkey", DataTypes.STRING())
        .field("data", DataTypes.ROW(
            DataTypes.FIELD("event_type", DataTypes.STRING()),
            DataTypes.FIELD("user_id", DataTypes.STRING()),
            DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3)),
            DataTypes.FIELD("properties", DataTypes.STRING())))
)
.createTemporaryTable("events");
// Use the table in SQL; qualifiers are addressed as family.qualifier
Table events = tableEnv.sqlQuery(
    "SELECT e.data.user_id, e.data.event_type, e.data.event_time " +
    "FROM events AS e " +
    "WHERE e.data.event_type = 'login' AND e.data.user_id IS NOT NULL"
);
```

Create HBase tables using SQL DDL statements:
```sql
-- Create HBase source table; the row key is an atomic column and every
-- column family is declared as a ROW of its qualifiers
CREATE TABLE user_profiles (
  user_id STRING,
  profile ROW<name STRING, age INT, email STRING>,
  activity ROW<last_login TIMESTAMP(3), login_count BIGINT>
) WITH (
  'connector.type' = 'hbase',
  'connector.version' = '1.4.3',
  'connector.table-name' = 'user_profiles',
  'connector.zookeeper.quorum' = 'localhost:2181'
);

-- Create HBase sink table with write options (the family layout is illustrative)
CREATE TABLE user_sink (
  user_id STRING,
  profile ROW<name STRING, age INT>,
  activity ROW<registration_time TIMESTAMP(3)>
) WITH (
  'connector.type' = 'hbase',
  'connector.version' = '1.4.3',
  'connector.table-name' = 'users',
  'connector.zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
  'connector.write.buffer-flush.max-size' = '4mb',
  'connector.write.buffer-flush.max-rows' = '2000',
  'connector.write.buffer-flush.interval' = '10s'
);
```
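The DDL is registered through the Table API; a minimal sketch, assuming the `tableEnv` from the earlier examples (this Flink generation accepts DDL via `sqlUpdate`):

```java
// Register the source table with the catalog; no job runs until a query
// or INSERT referencing it is submitted.
tableEnv.sqlUpdate(
    "CREATE TABLE user_profiles (" +
    "  user_id STRING," +
    "  profile ROW<name STRING, age INT, email STRING>," +
    "  activity ROW<last_login TIMESTAMP(3), login_count BIGINT>" +
    ") WITH (" +
    "  'connector.type' = 'hbase'," +
    "  'connector.version' = '1.4.3'," +
    "  'connector.table-name' = 'user_profiles'," +
    "  'connector.zookeeper.quorum' = 'localhost:2181'" +
    ")");
```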
Use HBase as a dimension table for enriching streaming data with lookup joins:

```java
// Register the HBase dimension table
tableEnv.connect(
    new HBase()
        .version("1.4.3")
        .tableName("user_profiles")
        .zookeeperQuorum("localhost:2181")
)
.withSchema(
    new Schema()
        .field("user_id", DataTypes.STRING()) // row key
        .field("profile", DataTypes.ROW(
            DataTypes.FIELD("name", DataTypes.STRING()),
            DataTypes.FIELD("age", DataTypes.INT()),
            DataTypes.FIELD("email", DataTypes.STRING())))
)
.createTemporaryTable("user_dim");
// Perform the lookup join (Blink planner). The probe side must be an
// unbounded stream with a processing-time attribute, for example the
// Kafka-backed user_events table from the SQL variant below;
// FOR SYSTEM_TIME AS OF then performs a lookup per incoming row
Table enrichedEvents = tableEnv.sqlQuery(
    "SELECT " +
    "  e.event_id, " +
    "  e.user_id, " +
    "  e.event_type, " +
    "  u.profile.name, " +
    "  u.profile.email " +
    "FROM user_events AS e " +
    "JOIN user_dim FOR SYSTEM_TIME AS OF e.proc_time AS u " +
    "ON e.user_id = u.user_id"
);
```

The same lookup join expressed entirely in SQL:

```sql
-- Source stream table
CREATE TABLE user_events (
  event_id STRING,
  user_id STRING,
  event_type STRING,
  event_time TIMESTAMP(3),
  proc_time AS PROCTIME()
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'user-events',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  'format.type' = 'json'
);
-- HBase lookup table
CREATE TABLE user_profiles (
  user_id STRING,
  profile ROW<name STRING, age INT, email STRING>
) WITH (
  'connector.type' = 'hbase',
  'connector.version' = '1.4.3',
  'connector.table-name' = 'user_profiles',
  'connector.zookeeper.quorum' = 'localhost:2181'
);
-- Lookup join query
SELECT
  e.event_id,
  e.user_id,
  e.event_type,
  u.profile.name,
  u.profile.email
FROM user_events AS e
JOIN user_profiles FOR SYSTEM_TIME AS OF e.proc_time AS u
ON e.user_id = u.user_id;
```

Property key constants for HBase connector configuration:
```java
class HBaseValidator {
    public static final String CONNECTOR_TYPE_VALUE_HBASE = "hbase";
    public static final String CONNECTOR_VERSION_VALUE_143 = "1.4.3";
    public static final String CONNECTOR_TABLE_NAME = "connector.table-name";
    public static final String CONNECTOR_ZK_QUORUM = "connector.zookeeper.quorum";
    public static final String CONNECTOR_ZK_NODE_PARENT = "connector.zookeeper.znode.parent";
    public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE = "connector.write.buffer-flush.max-size";
    public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS = "connector.write.buffer-flush.max-rows";
    public static final String CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL = "connector.write.buffer-flush.interval";

    public void validate(DescriptorProperties properties);
}
```
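Using the constants instead of raw strings keeps hand-built property maps in sync with the validator. A minimal sketch reusing the factory example's map, assuming `HBaseValidator` lives in `org.apache.flink.table.descriptors` as in the flink-hbase module:

```java
import static org.apache.flink.table.descriptors.HBaseValidator.*;

// Equivalent to the string literals used in the factory example.
properties.put(CONNECTOR_TABLE_NAME, "user_profiles");
properties.put(CONNECTOR_ZK_QUORUM, "localhost:2181");
properties.put(CONNECTOR_ZK_NODE_PARENT, "/hbase");
```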
Complete list of supported connector properties.

Required properties:

| Property | Description | Example |
|---|---|---|
| `connector.type` | Connector type; must be `hbase` | `hbase` |
| `connector.version` | HBase version | `1.4.3` |
| `connector.table-name` | HBase table name | `my_table` |
| `connector.zookeeper.quorum` | ZooKeeper ensemble | `zk1:2181,zk2:2181` |
Optional properties:

| Property | Description | Default | Example |
|---|---|---|---|
| `connector.zookeeper.znode.parent` | ZooKeeper parent znode | `/hbase` | `/hbase-prod` |
| `connector.write.buffer-flush.max-size` | Max write buffer size | `2mb` | `4mb` |
| `connector.write.buffer-flush.max-rows` | Max buffered rows | `1000` | `2000` |
| `connector.write.buffer-flush.interval` | Flush interval | `5s` | `10s` |
```java
// Select by row key range (the predicate is evaluated by Flink after the
// scan; this source does not push filters down to HBase)
Table rangeQuery = tableEnv.sqlQuery(
    "SELECT * FROM users " +
    "WHERE user_id BETWEEN 'user_00001' AND 'user_99999'"
);

// Select by row key prefix
Table prefixQuery = tableEnv.sqlQuery(
    "SELECT * FROM events " +
    "WHERE rowkey LIKE 'user123_%'"
);
```

```java
// Aggregate data from HBase
Table analytics = tableEnv.sqlQuery(
    "SELECT " +
    "  DATE_FORMAT(p.activity.last_login, 'yyyy-MM-dd') AS login_date, " +
    "  COUNT(*) AS user_count, " +
    "  AVG(p.profile.age) AS avg_age " +
    "FROM user_profiles AS p " +
    "WHERE p.activity.last_login >= CURRENT_DATE - INTERVAL '7' DAY " +
    "GROUP BY DATE_FORMAT(p.activity.last_login, 'yyyy-MM-dd')"
);
```

```java
// Handle complex data types in HBase
tableEnv.connect(
    new HBase()
        .version("1.4.3")
        .tableName("complex_data")
        .zookeeperQuorum("localhost:2181")
)
.withSchema(
    new Schema()
        .field("id", DataTypes.STRING()) // row key
        .field("payload", DataTypes.ROW( // "payload" is this example's column family
            DataTypes.FIELD("binary_data", DataTypes.BYTES()),   // raw byte[] data
            DataTypes.FIELD("json_data", DataTypes.STRING()),    // JSON as string
            DataTypes.FIELD("decimal_value", DataTypes.DECIMAL(10, 2)),
            DataTypes.FIELD("timestamp_value", DataTypes.TIMESTAMP(3))))
)
.createTemporaryTable("complex_table");
```

```java
// Optimize HBase source scanning
// Tune the HBase client configuration for scan performance before
// constructing the source
conf.setInt("hbase.client.scanner.caching", 1000);
conf.setInt("hbase.client.scanner.max.result.size", 2 * 1024 * 1024);
conf.setBoolean("hbase.client.scanner.async.prefetch", true);
HBaseTableSource optimizedSource = new HBaseTableSource(conf, "large_table");
```
```java
// High-throughput sink configuration
HBaseWriteOptions highThroughputOptions = HBaseWriteOptions.builder()
    .setBufferFlushMaxSizeInBytes(16 * 1024 * 1024) // 16 MB
    .setBufferFlushMaxRows(10000)
    .setBufferFlushIntervalMillis(30000) // 30 seconds
    .build();
```
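To apply the tuned options, pass them to the sink constructor; a sketch reusing the `schema` and `hbaseOptions` objects from the sink example above:

```java
// Same constructor as before, now with high-throughput write options.
HBaseUpsertTableSink highThroughputSink =
    new HBaseUpsertTableSink(schema, hbaseOptions, highThroughputOptions);
```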
```java
// Configure parallelism for HBase operations
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(8); // ideally matched to the number of HBase regions

// Set a specific parallelism for the HBase write path; hbaseTable and
// hbaseSink stand in for a previously built Table and sink function, and
// toAppendStream requires a StreamTableEnvironment
DataStream<Row> hbaseStream = tableEnv.toAppendStream(hbaseTable, Row.class);
hbaseStream.addSink(hbaseSink).setParallelism(4);
```

Install with Tessl CLI:

```
npx tessl i tessl/maven-org-apache-flink--flink-hbase-2-11
```