
tessl/maven-org-apache-flink--flink-sql-connector-hbase-2-2-2-11

Apache Flink SQL connector that enables seamless integration with HBase 2.2.x databases through Flink's Table API and SQL interface


SQL DDL and Table Operations

Core SQL operations for creating HBase table mappings, defining column families and qualifiers, and managing table schemas with proper data type mappings.

Capabilities

CREATE TABLE Statement

Creates a logical table mapping to an existing HBase table, defining column families, qualifiers, and data types.

CREATE TABLE table_name (
    rowkey STRING,                                  -- Required: the single atomic-type field is the row key
    column_family ROW<qualifier DATA_TYPE, ...>,   -- Column family with nested qualifiers
    PRIMARY KEY (rowkey) NOT ENFORCED              -- Optional: if declared, must be on the row key
) WITH (
    'connector' = 'hbase-2.2',                     -- Required: Connector identifier
    'table-name' = 'hbase_table_name',             -- Required: Actual HBase table name
    -- Additional configuration options
);

Requirements:

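  • Exactly one field must have an atomic type (e.g. STRING, BIGINT); it becomes the HBase row key and may have any name
  • Every other field is a column family and must use the ROW type, with one named field per qualifier
  • PRIMARY KEY is optional; if declared, it must be on the row key field and marked NOT ENFORCED
  • The HBase table and its column families must already exist; the connector does not create them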

Usage Examples:

-- Simple table with single column family
CREATE TABLE users (
    user_id STRING,
    info ROW<name STRING, age INT, email STRING>,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'users_table',
    'zookeeper.quorum' = 'localhost:2181'
);

-- Table with multiple column families
CREATE TABLE product_catalog (
    product_id STRING,
    basic_info ROW<name STRING, category STRING, price DECIMAL(10,2)>,
    inventory ROW<stock_count INT, warehouse_location STRING>,
    metadata ROW<created_at TIMESTAMP(3), updated_at TIMESTAMP(3)>,
    PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'products',
    'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
    'zookeeper.znode.parent' = '/hbase'
);
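The row key does not have to be a STRING. As a sketch, the table below uses a BIGINT row key and also sets the null-string-literal option and the sink buffering options. The table, family, and qualifier names (orders, d, status, amount) are hypothetical, and the option values are illustrative rather than recommendations.

```sql
CREATE TABLE orders (
    order_id BIGINT,                          -- row key: any single atomic-type field
    d ROW<status STRING, amount DOUBLE>,      -- column family 'd' with two qualifiers
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'orders',                  -- hypothetical HBase table
    'zookeeper.quorum' = 'localhost:2181',
    'null-string-literal' = 'null',           -- value used to represent NULL in STRING fields
    'sink.buffer-flush.max-rows' = '500',     -- flush once this many mutations are buffered
    'sink.buffer-flush.max-size' = '4mb',     -- ...or once buffered data reaches this size
    'sink.buffer-flush.interval' = '2s'       -- ...or at least this often
);
```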

INSERT Operations (UPSERT)

Writes data to an HBase table with UPSERT semantics: a new row key creates a row, and an existing row key has the written qualifiers overwritten.

INSERT INTO table_name VALUES (
    'row_key_value',
    ROW(qualifier1_value, qualifier2_value, ...),
    ROW(qualifier3_value, qualifier4_value, ...)
);

INSERT INTO table_name (rowkey, column_family, ...)
VALUES ('row_key_value', ROW(...), ...);

INSERT INTO table_name
SELECT rowkey, column_family, ...
FROM source_table;

Usage Examples:

-- Direct value insertion
INSERT INTO users VALUES (
    'user123',
    ROW('Alice Johnson', 28, 'alice@example.com')
);

-- Column-specific insertion
INSERT INTO product_catalog (product_id, basic_info, inventory)
VALUES (
    'prod_001',
    ROW('Laptop Computer', 'Electronics', 999.99),
    ROW(50, 'Warehouse_A')
);

-- Batch insertion from another table
INSERT INTO users
SELECT 
    user_id,
    ROW(full_name, age, email_address)
FROM staging_users
WHERE status = 'active';
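Because the sink upserts by row key, it can also consume an updating query such as a running aggregate: each change to a group overwrites the same HBase row instead of adding a new one. A sketch, assuming a hypothetical HBase table user_order_counts with a column family stats already exists, and that order_stream is a streaming table with a user_id column:

```sql
CREATE TABLE user_order_counts (
    user_id STRING,
    stats ROW<order_count BIGINT>,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'user_order_counts',
    'zookeeper.quorum' = 'localhost:2181'
);

-- Every new order for a user emits an updated count; the sink overwrites
-- stats:order_count for that user_id row rather than appending a row
INSERT INTO user_order_counts
SELECT user_id, ROW(COUNT(*))
FROM order_stream
GROUP BY user_id;
```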

SELECT Operations

Queries data from an HBase table with support for projection, filtering, and nested-field expressions.

SELECT rowkey, column_family.qualifier, ...
FROM table_name
WHERE condition;

SELECT rowkey, column_family.qualifier AS alias
FROM table_name
WHERE column_family.qualifier operator value;

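Projection Pushdown: Only the column families (top-level fields) that a query references are fetched from HBase. WHERE predicates, including those on the row key or on qualifiers, are evaluated by Flink after the scan rather than pushed down to HBase.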
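One way to check what a scan actually fetches is to print the optimized plan. In the sketch below, the source scan node should list only product_id and inventory, because basic_info and metadata are not referenced. Node names and plan formatting differ between Flink versions, so treat this as a tool for inspection, not an expected output.

```sql
-- Print the optimized plan; the TableSourceScan node shows the fields handed to the HBase source
EXPLAIN PLAN FOR
SELECT product_id, inventory.stock_count
FROM product_catalog
WHERE inventory.stock_count > 0;
```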

Usage Examples:

-- Simple projection and filtering
SELECT user_id, info.name, info.age
FROM users
WHERE info.age > 25;

-- Complex nested field access
SELECT 
    product_id,
    basic_info.name AS product_name,
    basic_info.price,
    inventory.stock_count,
    metadata.created_at
FROM product_catalog
WHERE basic_info.category = 'Electronics'
  AND inventory.stock_count > 0;

-- Aggregation queries
SELECT 
    basic_info.category,
    COUNT(*) AS product_count,
    AVG(basic_info.price) AS avg_price
FROM product_catalog
GROUP BY basic_info.category;

DELETE Operations

Removes records from an HBase table by row key. This connector does not support DELETE FROM statements; instead, the HBase sink runs in upsert mode and turns every DELETE message in its input changelog into an HBase Delete for that row key.

INSERT INTO table_name
SELECT rowkey, column_family, ...
FROM changelog_source;   -- an updating query or changelog source that emits DELETE rows

Note: Deletions are addressed by row key only; the SQL interface offers no way to target an individual qualifier.
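The HBase sink removes a row when its input changelog carries a DELETE for that row key. One source of such messages is an upsert-kafka table, where a Kafka record with a null value (a tombstone) is read as a DELETE for its key. A sketch under these assumptions: the table name users_changelog, the topic, and the broker address are hypothetical, and the upsert-kafka connector (Flink 1.12 or later) must be on the classpath.

```sql
-- Changelog source: tombstone records become DELETE messages for user_id
CREATE TABLE users_changelog (
    user_id STRING,
    name STRING,
    age INT,
    email STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'users-changelog',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);

-- Inserts and updates become HBase Puts; DELETEs remove the matching user_id row
INSERT INTO users
SELECT user_id, ROW(name, age, email)
FROM users_changelog;
```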

Temporal Table Joins (Lookup)

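Performs lookup joins that use an HBase table as a temporal table to enrich streaming data. The stream side needs a processing-time attribute for FOR SYSTEM_TIME AS OF, and the join condition must match a stream field against the HBase row key, because each lookup is a get by row key.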

SELECT 
    s.field,
    h.column_family.qualifier
FROM stream_table AS s
JOIN hbase_table FOR SYSTEM_TIME AS OF s.proc_time AS h
ON s.lookup_key = h.rowkey;

Usage Examples:

-- Enrich order stream with user information
SELECT 
    o.order_id,
    o.product_id,
    o.quantity,
    u.info.name AS customer_name,
    u.info.email AS customer_email
FROM order_stream o
JOIN users FOR SYSTEM_TIME AS OF o.proc_time AS u
ON o.user_id = u.user_id;

-- Product catalog lookup
SELECT 
    sale.transaction_id,
    sale.quantity,
    p.basic_info.name AS product_name,
    p.basic_info.price * sale.quantity AS total_amount
FROM sales_stream sale
JOIN product_catalog FOR SYSTEM_TIME AS OF sale.proc_time AS p
ON sale.product_id = p.product_id;
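The examples above assume the stream table exposes a processing-time attribute named proc_time. A sketch of such a source for order_stream, assuming a Kafka topic orders carrying JSON records (topic, broker address, and group id are hypothetical), followed by a LEFT JOIN variant that keeps orders with no matching HBase row:

```sql
CREATE TABLE order_stream (
    order_id STRING,
    product_id STRING,
    user_id STRING,
    quantity INT,
    proc_time AS PROCTIME()               -- processing-time attribute for FOR SYSTEM_TIME AS OF
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'order-enrichment',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

-- Orders whose user_id has no row in HBase are kept, with NULL customer fields
SELECT
    o.order_id,
    u.info.name AS customer_name
FROM order_stream AS o
LEFT JOIN users FOR SYSTEM_TIME AS OF o.proc_time AS u
ON o.user_id = u.user_id;
```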

Schema Evolution Considerations

Column Family Management:

  • Adding new qualifiers: Modify the ROW type definition and recreate the table
  • Column family structure changes require table recreation; a newly mapped column family must also already exist in the HBase table
  • Data type changes may require custom conversion logic

Backward Compatibility:

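  • Existing HBase data remains accessible after schema changes: dropping or recreating the Flink table changes only catalog metadata, not the data stored in HBase
  • Qualifiers missing from a row are read as NULL automatically
  • HBase stores untyped bytes, so a declared type that does not match how a value was written can fail during deserialization or silently return wrong values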

Usage Examples:

-- Original schema
CREATE TABLE users_v1 (
    user_id STRING,
    info ROW<name STRING, age INT>,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (...);

-- Evolved schema with additional fields
DROP TABLE users_v1;
CREATE TABLE users_v2 (
    user_id STRING,
    info ROW<name STRING, age INT, email STRING, phone STRING>,
    preferences ROW<language STRING, timezone STRING>,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (...);
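Because missing qualifiers come back as NULL, rows that lack a newly added qualifier can be found with a plain predicate. A sketch against users_v2, assuming writers under the v1 schema never set an email; note that rows written with an explicit NULL email also match.

```sql
-- A row with no info:email cell reads email as NULL
SELECT user_id, info.name
FROM users_v2
WHERE info.email IS NULL;
```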

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-sql-connector-hbase-2-2-2-11
