Apache Flink SQL connector that enables seamless integration with HBase 2.2.x databases through Flink's Table API and SQL interface
—
Core SQL operations for creating HBase table mappings, defining column families and qualifiers, and managing table schemas with proper data type mappings.
Creates a logical table mapping to an existing HBase table, defining column families, qualifiers, and data types.
-- Template: map a Flink table onto an existing HBase table.
-- One atomic column holds the row key; every other column is a ROW whose
-- column name is the HBase column family and whose fields are its qualifiers.
CREATE TABLE table_name (
    rowkey STRING,                                 -- Required: row key column
    column_family ROW<qualifier DATA_TYPE, ...>,   -- Column family with nested qualifiers
    PRIMARY KEY (rowkey) NOT ENFORCED              -- Required: primary key on the row key
) WITH (
    'connector' = 'hbase-2.2',                     -- Required: connector identifier
    'table-name' = 'hbase_table_name',             -- Required: actual HBase table name
    'zookeeper.quorum' = 'host1:2181,host2:2181'   -- Required: HBase ZooKeeper quorum
    -- Additional configuration options (e.g. 'zookeeper.znode.parent') go here,
    -- each preceded by a comma
);
Requirements:
Usage Examples:
-- One column family: the row key plus a single ROW-typed column ("info").
CREATE TABLE users (
    user_id STRING,                                  -- HBase row key
    info    ROW<name STRING, age INT, email STRING>, -- column family "info"
    PRIMARY KEY (user_id) NOT ENFORCED
)
WITH (
    'connector'        = 'hbase-2.2',
    'table-name'       = 'users_table',
    'zookeeper.quorum' = 'localhost:2181'
);
-- Several column families: each ROW-typed column maps to one HBase family.
CREATE TABLE product_catalog (
    product_id STRING,  -- HBase row key
    basic_info ROW<name STRING, category STRING, price DECIMAL(10,2)>,
    inventory  ROW<stock_count INT, warehouse_location STRING>,
    metadata   ROW<created_at TIMESTAMP(3), updated_at TIMESTAMP(3)>,
    PRIMARY KEY (product_id) NOT ENFORCED
)
WITH (
    'connector'              = 'hbase-2.2',
    'table-name'             = 'products',
    'zookeeper.quorum'       = 'zk1:2181,zk2:2181,zk3:2181',  -- three-node quorum
    'zookeeper.znode.parent' = '/hbase'
);
Inserts data into HBase table using UPSERT semantics (creates new record or updates existing).
-- Positional form: row key first, then one ROW per column family.
INSERT INTO table_name
VALUES (
    'row_key_value',
    ROW(qualifier1_value, qualifier2_value, ...),
    ROW(qualifier3_value, qualifier4_value, ...)
);
-- Explicit column-list form.
INSERT INTO table_name (rowkey, column_family, ...)
VALUES ('row_key_value', ROW(...), ...);
-- Query form: write the result of a SELECT.
INSERT INTO table_name
SELECT
    rowkey,
    column_family,
    ...
FROM source_table;
Usage Examples:
-- Insert literal values: the row key, then the "info" family as a ROW.
INSERT INTO users
VALUES (
    'user123',
    ROW('Alice Johnson', 28, 'alice@example.com')  -- info(name, age, email)
);
-- Insert into a subset of columns; the "metadata" family is not listed.
-- NOTE(review): confirm the target Flink version accepts INSERT column lists,
-- and check how the HBase sink handles the omitted (NULL) "metadata" family.
INSERT INTO product_catalog (product_id, basic_info, inventory)
VALUES (
    'prod_001',
    ROW('Laptop Computer', 'Electronics', 999.99),  -- basic_info(name, category, price)
    ROW(50, 'Warehouse_A')                          -- inventory(stock_count, warehouse_location)
);
-- Copy active staging rows, packing the source columns into the "info" family.
INSERT INTO users
SELECT
    user_id,
    ROW(full_name, age, email_address)
FROM staging_users
WHERE status = 'active';
Queries data from HBase table with support for projection, filtering, and complex expressions.
-- Qualifiers are addressed with family.qualifier dot notation.
SELECT rowkey, column_family.qualifier, ...
FROM table_name
WHERE condition;
-- Same, with a column alias and a predicate on a qualifier.
SELECT rowkey, column_family.qualifier AS alias
FROM table_name
WHERE column_family.qualifier operator value;
Projection Pushdown: only the column families referenced by a query are scanned from HBase. Pushdown works at column-family granularity (individual qualifiers are not pruned), and WHERE-clause predicates are evaluated by Flink rather than pushed down to HBase.
Usage Examples:
-- Project two "info" qualifiers and filter on one of them.
SELECT
    user_id,
    info.name,
    info.age
FROM users
WHERE info.age > 25;
-- Read qualifiers from several column families in one query.
SELECT
    product_id,
    basic_info.name AS product_name,
    basic_info.price,
    inventory.stock_count,
    metadata.created_at
FROM product_catalog
WHERE
    basic_info.category = 'Electronics'
    AND inventory.stock_count > 0;
-- Group by a qualifier value: product count and average price per category.
SELECT
    basic_info.category,
    COUNT(*)              AS product_count,
    AVG(basic_info.price) AS avg_price
FROM product_catalog
GROUP BY basic_info.category;
Removes records from HBase table by row key.
-- NOTE(review): the Flink releases this Scala 2.11 artifact targets
-- (Flink 1.12-1.14) reject DELETE statements. DELETE FROM first appeared in
-- Flink 1.17, in batch mode only, for connectors that implement row-level delete.
-- With this connector, rows are removed when the HBase sink receives DELETE
-- changelog messages from an updating query. Verify against your Flink version.
-- Delete a single row by key.
DELETE FROM table_name
WHERE rowkey = 'row_key_value';
-- Delete several rows by key.
DELETE FROM table_name
WHERE rowkey IN ('key1', 'key2', 'key3');
Note: DELETE operations in HBase are performed at the row level only. Column-level deletions are not supported through SQL DML.
Usage Examples:
-- NOTE(review): these statements need a Flink version with DELETE support
-- (Flink 1.17+, batch mode, connector with row-level delete). The Scala 2.11
-- Flink builds this artifact targets reject them; there, deletes reach HBase
-- only as DELETE changelog messages from an upstream query. Verify before use.
-- Delete one row by key.
DELETE FROM users
WHERE user_id = 'user123';
-- Delete several rows by key.
DELETE FROM product_catalog
WHERE product_id IN ('prod_001', 'prod_002', 'prod_003');
Performs lookup joins using HBase table as a temporal table for enriching streaming data.
-- Lookup (temporal) join template: each stream row probes the HBase table by
-- row key as of its processing time.
-- Once tables are aliased (s, h), columns must be qualified with the aliases.
-- s.proc_time must be a processing-time attribute, e.g. declared on the
-- stream table as `proc_time AS PROCTIME()`.
SELECT
    s.field,
    h.column_family.qualifier
FROM stream_table AS s
JOIN hbase_table FOR SYSTEM_TIME AS OF s.proc_time AS h
    ON s.lookup_key = h.rowkey;
Usage Examples:
-- Enrich each order with the customer's name and email looked up in HBase.
SELECT
    o.order_id,
    o.product_id,
    o.quantity,
    u.info.name  AS customer_name,
    u.info.email AS customer_email
FROM order_stream AS o
JOIN users FOR SYSTEM_TIME AS OF o.proc_time AS u
    ON o.user_id = u.user_id;
-- Look up the product name and price, and compute each sale's line total.
SELECT
    s.transaction_id,
    s.quantity,
    p.basic_info.name               AS product_name,
    p.basic_info.price * s.quantity AS total_amount
FROM sales_stream AS s
JOIN product_catalog FOR SYSTEM_TIME AS OF s.proc_time AS p
    ON s.product_id = p.product_id;
Column Family Management:
Backward Compatibility:
Usage Examples:
-- Original schema.
CREATE TABLE users_v1 (
    user_id STRING,
    info ROW<name STRING, age INT>,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (...);  -- connector options omitted
-- Evolved schema: extra qualifiers in "info" plus a new "preferences" family.
-- IF EXISTS keeps the script re-runnable.
-- NOTE(review): DROP TABLE removes the Flink table definition; HBase data is
-- presumably left intact by the connector. Confirm for your catalog.
-- NOTE(review): HBase column families must exist before they can be read or
-- written, so add "preferences" to the HBase table first (e.g. via `alter`).
DROP TABLE IF EXISTS users_v1;
CREATE TABLE users_v2 (
    user_id STRING,
    info ROW<name STRING, age INT, email STRING, phone STRING>,
    preferences ROW<language STRING, timezone STRING>,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (...);
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-connector-hbase-2-2-2-11