The HBase connector integrates with Flink's Table API, enabling SQL-based data processing, declarative table definitions, and lookup joins. The integration consists of table sources, table sinks, factory classes, and descriptor-based configuration.
A table source that provides HBase table data to the Table API with support for both batch and streaming queries, plus lookup functionality for temporal joins.
class HBaseTableSource implements StreamTableSource<Row>, LookupableTableSource<Row> {
public HBaseTableSource(Configuration conf, String tableName);
// Schema configuration
public void addColumn(String family, String qualifier, Class<?> clazz);
public void setRowKey(String rowKeyName, Class<?> clazz);
public void setCharset(String charset);
// Table source methods
public TypeInformation<Row> getReturnType();
public TableSchema getTableSchema();
public DataSet<Row> getDataSet(ExecutionEnvironment execEnv);
public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv);
public HBaseTableSource projectFields(int[] fields);
public String explainSource();
// Lookup capabilities
public TableFunction<Row> getLookupFunction(String[] lookupKeys);
public AsyncTableFunction<Row> getAsyncLookupFunction(String[] lookupKeys); // Throws UnsupportedOperationException
public boolean isAsyncEnabled(); // Returns false
public boolean isBounded(); // Returns true
}

import org.apache.flink.addons.hbase.HBaseTableSource;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.hadoop.conf.Configuration;
// Configure HBase connection
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum", "localhost:2181");
// Create table source
HBaseTableSource tableSource = new HBaseTableSource(conf, "user_profiles");
// Define schema
tableSource.setRowKey("user_id", String.class);
tableSource.addColumn("profile", "name", String.class);
tableSource.addColumn("profile", "age", Integer.class);
tableSource.addColumn("profile", "email", String.class);
tableSource.addColumn("activity", "last_login", java.sql.Timestamp.class);
tableSource.addColumn("activity", "login_count", Long.class);
// Register with Table Environment
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
tableEnv.registerTableSource("users", tableSource);
// Query the table
Table result = tableEnv.sqlQuery(
"SELECT user_id, name, age FROM users WHERE age > 21 AND login_count > 10"
);

// Project specific fields for better performance
int[] projectedFields = {0, 1, 3}; // user_id, name, email
HBaseTableSource projectedSource = tableSource.projectFields(projectedFields);
tableEnv.registerTableSource("users_projected", projectedSource);
Table result = tableEnv.sqlQuery("SELECT * FROM users_projected");A table sink that handles upsert and delete operations for HBase tables through the Table API.
A table sink that handles upsert and delete operations for HBase tables through the Table API.

class HBaseUpsertTableSink implements UpsertStreamTableSink<Row> {
public HBaseUpsertTableSink(HBaseTableSchema hbaseTableSchema,
HBaseOptions hbaseOptions, HBaseWriteOptions writeOptions);
// Configuration methods
public void setKeyFields(String[] keys); // Ignored - HBase always upserts on rowkey
public void setIsAppendOnly(Boolean isAppendOnly);
// Table sink methods
public TypeInformation<Row> getRecordType();
public TableSchema getTableSchema();
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream);
public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream);
public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes);
}

import org.apache.flink.addons.hbase.HBaseUpsertTableSink;
import org.apache.flink.addons.hbase.HBaseTableSchema;
import org.apache.flink.addons.hbase.HBaseOptions;
import org.apache.flink.addons.hbase.HBaseWriteOptions;
// Define HBase table schema
HBaseTableSchema schema = new HBaseTableSchema();
schema.setRowKey("user_id", String.class);
schema.addColumn("profile", "name", String.class);
schema.addColumn("profile", "age", Integer.class);
schema.addColumn("activity", "last_login", java.sql.Timestamp.class);
// Configure HBase connection
HBaseOptions hbaseOptions = HBaseOptions.builder()
.setTableName("user_profiles")
.setZkQuorum("localhost:2181")
.build();
// Configure write options
HBaseWriteOptions writeOptions = HBaseWriteOptions.builder()
.setBufferFlushMaxSizeInBytes(2 * 1024 * 1024) // 2MB
.setBufferFlushMaxRows(1000)
.setBufferFlushIntervalMillis(5000) // 5 seconds
.build();
// Create table sink
HBaseUpsertTableSink tableSink = new HBaseUpsertTableSink(schema, hbaseOptions, writeOptions);
// Register with Table Environment
tableEnv.registerTableSink("user_sink", tableSink);
// Insert data via SQL
tableEnv.sqlUpdate(
"INSERT INTO user_sink " +
"SELECT user_id, name, age, CURRENT_TIMESTAMP " +
"FROM source_table WHERE age > 18"
);

Factory class for creating HBase table sources and sinks using connector properties.
class HBaseTableFactory implements
StreamTableSourceFactory<Row>, StreamTableSinkFactory<Tuple2<Boolean, Row>> {
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties);
public StreamTableSink<Tuple2<Boolean, Row>> createStreamTableSink(Map<String, String> properties);
public Map<String, String> requiredContext();
public List<String> supportedProperties();
}

import org.apache.flink.addons.hbase.HBaseTableFactory;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
import java.util.Map;
import java.util.HashMap;
// Define connector properties
Map<String, String> properties = new HashMap<>();
properties.put("connector.type", "hbase");
properties.put("connector.version", "1.4.3");
properties.put("connector.table-name", "user_profiles");
properties.put("connector.zookeeper.quorum", "localhost:2181");
properties.put("connector.zookeeper.znode.parent", "/hbase");
// Schema properties
properties.put("schema.0.name", "user_id");
properties.put("schema.0.data-type", "VARCHAR");
properties.put("schema.1.name", "name");
properties.put("schema.1.data-type", "VARCHAR");
properties.put("schema.2.name", "age");
properties.put("schema.2.data-type", "INT");
// Create source via factory
HBaseTableFactory factory = new HBaseTableFactory();
StreamTableSource<Row> source = factory.createStreamTableSource(properties);
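The same property map also drives the sink path. A brief sketch; the write.buffer-flush values are optional illustrations, and the imports for StreamTableSink and Tuple2 are assumed:

// Optional write-buffering properties
properties.put("connector.write.buffer-flush.max-size", "4mb");
properties.put("connector.write.buffer-flush.max-rows", "2000");
// Create the sink through the same factory
StreamTableSink<Tuple2<Boolean, Row>> sink = factory.createStreamTableSink(properties);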
Declarative table configuration using Flink's descriptor API.

class HBase extends ConnectorDescriptor {
public HBase();
// Required configuration
public HBase version(String version);
public HBase tableName(String tableName);
public HBase zookeeperQuorum(String zookeeperQuorum);
// Optional configuration
public HBase zookeeperNodeParent(String zookeeperNodeParent);
public HBase writeBufferFlushMaxSize(String maxSize);
public HBase writeBufferFlushMaxRows(int writeBufferFlushMaxRows);
public HBase writeBufferFlushInterval(String interval);
}

import org.apache.flink.table.descriptors.HBase;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.api.DataTypes;
// Create HBase table using descriptors
tableEnv.connect(
new HBase()
.version("1.4.3")
.tableName("user_events")
.zookeeperQuorum("zk1:2181,zk2:2181,zk3:2181")
.zookeeperNodeParent("/hbase")
.writeBufferFlushMaxSize("4mb")
.writeBufferFlushMaxRows(2000)
.writeBufferFlushInterval("10s")
)
.withSchema(
new Schema()
.field("rowkey", DataTypes.STRING())
.field("event_type", DataTypes.STRING())
.field("user_id", DataTypes.STRING())
.field("timestamp", DataTypes.TIMESTAMP(3))
.field("properties", DataTypes.STRING())
)
.createTemporaryTable("events");
// Use the table in SQL
Table events = tableEnv.sqlQuery(
"SELECT user_id, event_type, timestamp " +
"FROM events " +
"WHERE event_type = 'login' AND user_id IS NOT NULL"
);

Create HBase tables using SQL DDL statements:
-- Create HBase source table
CREATE TABLE user_profiles (
user_id STRING,
name STRING,
age INT,
email STRING,
last_login TIMESTAMP(3),
login_count BIGINT
) WITH (
'connector' = 'hbase',
'table-name' = 'user_profiles',
'zookeeper.quorum' = 'localhost:2181'
);
-- Create HBase sink table with write options
CREATE TABLE user_sink (
user_id STRING,
name STRING,
age INT,
registration_time TIMESTAMP(3)
) WITH (
'connector' = 'hbase',
'table-name' = 'users',
'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
'write.buffer-flush.max-size' = '4mb',
'write.buffer-flush.max-rows' = '2000',
'write.buffer-flush.interval' = '10s'
);
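The sink table defined above is written to with a regular INSERT statement. A minimal sketch, assuming a source_table with matching columns:

-- Write into the HBase sink table defined above
INSERT INTO user_sink
SELECT user_id, name, age, CURRENT_TIMESTAMP
FROM source_table
WHERE age > 18;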
Use HBase as a dimension table for enriching streaming data with lookup joins:

// Register HBase dimension table
tableEnv.connect(
new HBase()
.version("1.4.3")
.tableName("user_profiles")
.zookeeperQuorum("localhost:2181")
)
.withSchema(
new Schema()
.field("user_id", DataTypes.STRING())
.field("name", DataTypes.STRING())
.field("age", DataTypes.INT())
.field("email", DataTypes.STRING())
)
.createTemporaryTable("user_dim");
// Perform the lookup join. The processing-time attribute must come from the
// probe-side stream, so this assumes an "events" stream table that declares
// event_id and a proc_time attribute (proc_time AS PROCTIME()).
Table enrichedEvents = tableEnv.sqlQuery(
"SELECT " +
" e.event_id, " +
" e.user_id, " +
" e.event_type, " +
" u.name, " +
" u.email " +
"FROM events e " +
"JOIN user_dim FOR SYSTEM_TIME AS OF e.proc_time AS u " +
"ON e.user_id = u.user_id"
);

The same enrichment expressed entirely in SQL:

-- Source stream table
CREATE TABLE user_events (
event_id STRING,
user_id STRING,
event_type STRING,
event_time TIMESTAMP(3),
proc_time AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'user-events'
);
-- HBase lookup table
CREATE TABLE user_profiles (
user_id STRING,
name STRING,
age INT,
email STRING
) WITH (
'connector' = 'hbase',
'table-name' = 'user_profiles',
'zookeeper.quorum' = 'localhost:2181'
);
-- Lookup join query
SELECT
e.event_id,
e.user_id,
e.event_type,
u.name,
u.email
FROM user_events e
JOIN user_profiles FOR SYSTEM_TIME AS OF e.proc_time AS u
ON e.user_id = u.user_id;
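With the inner join above, events whose user_id has no row in HBase are dropped. A LEFT JOIN keeps such events and pads the profile columns with NULLs:

-- Keep events even when no matching profile row exists
SELECT
e.event_id,
e.user_id,
e.event_type,
u.name,
u.email
FROM user_events e
LEFT JOIN user_profiles FOR SYSTEM_TIME AS OF e.proc_time AS u
ON e.user_id = u.user_id;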
Property key constants for HBase connector configuration:

class HBaseValidator {
public static final String CONNECTOR_TYPE_VALUE_HBASE = "hbase";
public static final String CONNECTOR_VERSION_VALUE_143 = "1.4.3";
public static final String CONNECTOR_TABLE_NAME = "connector.table-name";
public static final String CONNECTOR_ZK_QUORUM = "connector.zookeeper.quorum";
public static final String CONNECTOR_ZK_NODE_PARENT = "connector.zookeeper.znode.parent";
public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE = "connector.write.buffer-flush.max-size";
public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS = "connector.write.buffer-flush.max-rows";
public static final String CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL = "connector.write.buffer-flush.interval";
public void validate(DescriptorProperties properties);
}

Complete list of supported connector properties.

Required properties:

| Property | Description | Example |
|---|---|---|
| connector.type | Connector type (must be "hbase") | "hbase" |
| connector.version | HBase version | "1.4.3" |
| connector.table-name | HBase table name | "my_table" |
| connector.zookeeper.quorum | ZooKeeper ensemble | "zk1:2181,zk2:2181" |

Optional properties:

| Property | Description | Default | Example |
|---|---|---|---|
| connector.zookeeper.znode.parent | ZK parent node | "/hbase" | "/hbase-prod" |
| connector.write.buffer-flush.max-size | Max buffer size | "2mb" | "4mb" |
| connector.write.buffer-flush.max-rows | Max buffered rows | 1000 | 2000 |
| connector.write.buffer-flush.interval | Flush interval | "5s" | "10s" |
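A minimal sketch of assembling the full property set and validating it before use, assuming the HBaseTableFactory and HBaseValidator classes shown above (the schema.* keys from the factory example would also be needed to actually create a source or sink):

import org.apache.flink.table.descriptors.DescriptorProperties;
import java.util.HashMap;
import java.util.Map;

Map<String, String> props = new HashMap<>();
// Required properties
props.put("connector.type", "hbase");
props.put("connector.version", "1.4.3");
props.put("connector.table-name", "user_profiles");
props.put("connector.zookeeper.quorum", "zk1:2181,zk2:2181");
// Optional properties
props.put("connector.zookeeper.znode.parent", "/hbase-prod");
props.put("connector.write.buffer-flush.max-size", "4mb");
props.put("connector.write.buffer-flush.max-rows", "2000");
props.put("connector.write.buffer-flush.interval", "10s");
// Validate the connector properties before handing them to a factory
DescriptorProperties descriptorProperties = new DescriptorProperties(true);
descriptorProperties.putProperties(props);
new HBaseValidator().validate(descriptorProperties);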
// Process HBase table data by row key ranges
Table rangeQuery = tableEnv.sqlQuery(
"SELECT * FROM users " +
"WHERE user_id BETWEEN 'user_00001' AND 'user_99999'"
);
// Use row key prefixes for efficient scanning
Table prefixQuery = tableEnv.sqlQuery(
"SELECT * FROM events " +
"WHERE rowkey LIKE 'user123_%'"
);

// Aggregate data from HBase
Table analytics = tableEnv.sqlQuery(
"SELECT " +
" DATE_FORMAT(last_login, 'yyyy-MM-dd') as login_date, " +
" COUNT(*) as user_count, " +
" AVG(age) as avg_age " +
"FROM user_profiles " +
"WHERE last_login >= CURRENT_DATE - INTERVAL '7' DAY " +
"GROUP BY DATE_FORMAT(last_login, 'yyyy-MM-dd')"
);

// Handle complex data types in HBase
tableEnv.connect(
new HBase()
.version("1.4.3")
.tableName("complex_data")
.zookeeperQuorum("localhost:2181")
)
.withSchema(
new Schema()
.field("id", DataTypes.STRING())
.field("binary_data", DataTypes.BYTES()) // byte[] data
.field("json_data", DataTypes.STRING()) // JSON as string
.field("decimal_value", DataTypes.DECIMAL(10, 2))
.field("timestamp_value", DataTypes.TIMESTAMP(3))
)
.createTemporaryTable("complex_table");// Optimize HBase source scanning
// Configure the HBase client for better scan performance before creating the source
conf.setInt("hbase.client.scanner.caching", 1000);
conf.setInt("hbase.client.scanner.max.result.size", 2 * 1024 * 1024);
conf.setBoolean("hbase.client.scanner.async.prefetch", true);
HBaseTableSource optimizedSource = new HBaseTableSource(conf, "large_table");

// High-throughput sink configuration
HBaseWriteOptions highThroughputOptions = HBaseWriteOptions.builder()
.setBufferFlushMaxSizeInBytes(16 * 1024 * 1024) // 16MB
.setBufferFlushMaxRows(10000)
.setBufferFlushIntervalMillis(30000) // 30 seconds
.build();

// Configure parallelism for HBase operations
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(8); // Match number of HBase regions
// Set a specific parallelism for the HBase write path; tableEnv must be a
// StreamTableEnvironment here, and hbaseTable / hbaseSink are assumed to be defined elsewhere
DataStream<Row> hbaseStream = tableEnv.toAppendStream(hbaseTable, Row.class);
hbaseStream.addSink(hbaseSink).setParallelism(4);