Apache Flink SQL connector that enables seamless integration with HBase 2.2.x databases through Flink's Table API and SQL interface
—
Complete data type mapping between Flink's type system and HBase storage format, including support for complex nested types and proper serialization handling.
Comprehensive mapping of Flink logical types to HBase byte storage with proper serialization and deserialization.
// Primitive numeric types
BOOLEAN // 1 byte (true=0xFF, false=0x00; any non-zero byte reads back as true)
TINYINT // 1 byte signed integer (-128 to 127)
SMALLINT // 2 byte signed integer (-32,768 to 32,767)
INTEGER // 4 byte signed integer
BIGINT // 8 byte signed integer
// Floating point types
FLOAT // 4 byte IEEE 754 floating point
DOUBLE // 8 byte IEEE 754 floating point
DECIMAL(precision, scale) // Variable length decimal number
// String and character types
CHAR(length) // Fixed length character string
VARCHAR(length) // Variable length character string
// Binary types
BINARY(length) // Fixed length binary data
VARBINARY(length) // Variable length binary data
// Temporal types
DATE // Date without time (stored as epoch days)
TIME(precision) // Time without date (precision 0-3)
TIMESTAMP(precision) // Date and time (precision 0-3)
// Interval types (limited support)
INTERVAL YEAR TO MONTH // Year-month intervals
INTERVAL DAY TO SECOND // Day-time intervals
// Complex types
ROW<field_name field_type, ...> // Nested row type for column families
Usage Examples:
-- Example table covering the commonly used scalar types.
-- The plain STRING column is the HBase row key; each ROW-typed column is one
-- HBase column family, and its nested fields are the qualifiers in that family.
CREATE TABLE type_showcase (
    row_id STRING,

    -- Family "primitives": boolean and numeric qualifiers
    primitives ROW<
        flag BOOLEAN,
        tiny_num TINYINT,
        small_num SMALLINT,
        int_num INTEGER,
        big_num BIGINT,
        float_num FLOAT,
        double_num DOUBLE,
        decimal_num DECIMAL(15, 2)
    >,

    -- Family "text_data": character and binary qualifiers
    text_data ROW<
        fixed_char CHAR(10),
        var_string VARCHAR(255),
        binary_data VARBINARY(1024)
    >,

    -- Family "time_data": temporal qualifiers (precision 3 = milliseconds)
    time_data ROW<
        event_date DATE,
        event_time TIME(3),
        event_timestamp TIMESTAMP(3)
    >,

    PRIMARY KEY (row_id) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'type_demo',
    'zookeeper.quorum' = 'localhost:2181'
);
How Flink table schema maps to HBase table structure with column families and qualifiers.
-- Flink table structure
-- The plain STRING column is the HBase row key; each ROW column is a column family.
CREATE TABLE flink_table (
    rowkey STRING,              -- Maps to HBase row key
    family1 ROW<                -- Maps to HBase column family 'family1'
        qualifier1 STRING,      -- Maps to family1:qualifier1
        qualifier2 INTEGER      -- Maps to family1:qualifier2
    >,
    family2 ROW<                -- Maps to HBase column family 'family2'
        qual_a DOUBLE,          -- Maps to family2:qual_a
        qual_b TIMESTAMP(3)     -- Maps to family2:qual_b
    >,
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    -- The mapping above only applies once the table is bound to the HBase connector.
    'connector' = 'hbase-2.2',
    'table-name' = 'flink_table',          -- existing HBase table; adjust to your environment
    'zookeeper.quorum' = 'localhost:2181'
);
HBase Table Structure:
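Row Key: rowkey value (STRING, stored as UTF-8 bytes)
Column Family: family1
- family1:qualifier1 -> STRING (UTF-8 bytes)
- family1:qualifier2 -> INTEGER (4-byte big-endian int)
Column Family: family2
- family2:qual_a -> DOUBLE (8-byte IEEE 754, big-endian)
- family2:qual_b -> TIMESTAMP(3) (8-byte big-endian long, milliseconds since epoch)
Usage Examples: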
-- E-commerce product table mapping.
-- HBase column families take the names of the ROW-typed fields (basic_info,
-- inventory, metadata), and each nested field becomes a qualifier in that family.
CREATE TABLE products (
    product_id STRING,                  -- Row key, e.g. "PROD_12345"
    basic_info ROW<                     -- Column family: basic_info
        name VARCHAR(200),              -- basic_info:name
        description VARCHAR(1000),      -- basic_info:description
        category VARCHAR(50),           -- basic_info:category
        price DECIMAL(10,2)             -- basic_info:price
    >,
    inventory ROW<                      -- Column family: inventory
        quantity INTEGER,               -- inventory:quantity
        warehouse_location VARCHAR(50), -- inventory:warehouse_location
        last_updated TIMESTAMP(3)       -- inventory:last_updated
    >,
    metadata ROW<                       -- Column family: metadata
        created_at TIMESTAMP(3),        -- metadata:created_at
        updated_at TIMESTAMP(3),        -- metadata:updated_at
        version INTEGER                 -- metadata:version
    >,
    PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'product_catalog',
    'zookeeper.quorum' = 'localhost:2181'
);
Detailed serialization format for each data type showing how Flink types are converted to HBase bytes.
// Serialization specifications
// Boolean: 1 byte (HBase Bytes.toBytes(boolean); any non-zero byte decodes as true)
true -> 0xFF
false -> 0x00
// Integer types: Big-endian byte order
TINYINT -> 1 byte signed
SMALLINT -> 2 bytes signed big-endian
INTEGER -> 4 bytes signed big-endian
BIGINT -> 8 bytes signed big-endian
// Floating point: IEEE 754 big-endian
FLOAT -> 4 bytes IEEE 754 big-endian
DOUBLE -> 8 bytes IEEE 754 big-endian
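// Decimal: Variable length (HBase Bytes.toBytes(BigDecimal))
DECIMAL -> 4-byte big-endian scale followed by the two's-complement bytes of the unscaled value (e.g. 999.99 -> 0x00000002 0x01869F)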
// String types: UTF-8 encoding
CHAR -> UTF-8 bytes of the string value (the connector adds no padding of its own)
VARCHAR -> UTF-8 bytes (variable length)
// Binary types: Raw bytes
BINARY -> Raw bytes, written as-is (the connector adds no padding of its own)
VARBINARY -> Raw bytes (variable length)
// Temporal types: Numeric representation
DATE -> 4 bytes (epoch days as INTEGER)
TIME -> 4 bytes (milliseconds since midnight as INTEGER)
TIMESTAMP -> 8 bytes (milliseconds since epoch as BIGINT)
Custom Serialization Example:
-- Table with various serialization examples.
-- Each value is CAST to its column's exact type. Flink does not implicitly narrow
-- INT literals to TINYINT/SMALLINT when writing to a sink, and the casts also make
-- the stored encoding of every field explicit.
INSERT INTO type_showcase VALUES (
    'sample_001',
    ROW(
        true,                               -- BOOLEAN: 0xFF (HBase Bytes.toBytes(boolean))
        CAST(127 AS TINYINT),               -- TINYINT: 0x7F
        CAST(32767 AS SMALLINT),            -- SMALLINT: 0x7FFF
        2147483647,                         -- INTEGER: 0x7FFFFFFF
        9223372036854775807,                -- BIGINT: 0x7FFFFFFFFFFFFFFF
        CAST(3.14159 AS FLOAT),             -- FLOAT: 4-byte IEEE 754, big-endian
        CAST(2.718281828 AS DOUBLE),        -- DOUBLE: 8-byte IEEE 754, big-endian
        CAST(999.99 AS DECIMAL(15, 2))      -- DECIMAL: scale 0x00000002 + unscaled 99999 (0x01869F)
    ),
    ROW(
        'FIXED ',                           -- CHAR(10): UTF-8 bytes of the value (no connector padding)
        'Variable text',                    -- VARCHAR: UTF-8 bytes
        CAST('Binary data' AS VARBINARY(1024)) -- Raw bytes
    ),
    ROW(
        DATE '2023-12-25',                  -- Date: days since epoch (4-byte int)
        TIME '14:30:45.123',                -- Time: milliseconds since midnight (4-byte int)
        TIMESTAMP '2023-12-25 14:30:45.123' -- Timestamp: milliseconds since epoch (8-byte long)
    )
);
How null values are represented and handled in HBase storage.
// Null value representation options
'null-string-literal' = 'null' // Default null representation
'null-string-literal' = 'N/A' // Custom null representation
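'null-string-literal' = '' // Empty string for nulls
Null Handling Behavior:
- 'null-string-literal' applies only to string columns (CHAR/VARCHAR/STRING). A NULL string is written as the literal's bytes, and a stored value equal to the literal reads back as NULL.
- For all other types, a NULL is written as an empty byte array, and empty bytes read back as NULL.
- A qualifier that is missing from an HBase row reads back as NULL for any type.
Usage Examples: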
-- Table with custom null handling
CREATE TABLE null_demo (
    id STRING,
    data ROW<
        required_field STRING,  -- Expected to be non-null by convention (not declared NOT NULL, so not enforced)
        optional_field STRING,  -- May be null: a NULL is stored as the 'null-string-literal' bytes ('NULL')
        numeric_field INTEGER   -- May be null: a NULL is stored as an empty byte array; a missing qualifier also reads as NULL
    >,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'null_test',
    'zookeeper.quorum' = 'localhost:2181',
    'null-string-literal' = 'NULL'  -- Null marker for string fields only; a real string 'NULL' also reads back as NULL
);
-- Write one row that leaves both optional fields NULL.
INSERT INTO null_demo VALUES (
    'test_001',
    ROW(
        'Required Value',  -- required_field
        NULL,              -- optional_field: NULL
        NULL               -- numeric_field: NULL
    )
);
-- Read rows back, substituting defaults wherever a field came back NULL
SELECT
    n.id,
    n.data.required_field,
    COALESCE(n.data.optional_field, 'Not provided') AS optional_with_default,
    COALESCE(n.data.numeric_field, 0)               AS numeric_with_default
FROM null_demo AS n;
Automatic type conversions and explicit casting between compatible types.
// Automatic conversions (widening)
TINYINT -> SMALLINT -> INTEGER -> BIGINT // Integer widening
FLOAT -> DOUBLE // Floating point widening
CHAR -> VARCHAR // String length expansion
// Explicit casting required (narrowing or cross-type)
CAST(bigint_value AS INTEGER) // Integer narrowing
CAST(string_value AS INTEGER) // String to number
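CAST(timestamp_value AS BIGINT) // Timestamp to epoch number: support and unit (seconds vs. milliseconds) vary by Flink version, so verify before relying on it
CAST(binary_data AS STRING) // Binary to string (UTF-8)
Usage Examples: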
-- Type conversion examples.
-- NOTE(review): assumes a conversion_demo table (not defined on this page) with a ROW
-- column `data` containing string_number, timestamp_field and big_number; confirm.
-- NOTE(review): whether CAST(TIMESTAMP AS BIGINT) is allowed, and whether it yields
-- seconds or milliseconds, depends on the Flink version; verify `epoch_millis`.
SELECT
    c.id,
    CAST(c.data.string_number AS INTEGER)  AS parsed_int,
    CAST(c.data.timestamp_field AS BIGINT) AS epoch_millis,
    CAST(c.data.big_number AS DOUBLE)      AS as_double
FROM conversion_demo AS c;
-- Insert with type casting: one product row, one ROW(...) per column family.
-- NOTE(review): CURRENT_TIMESTAMP's exact type (TIMESTAMP(3) vs TIMESTAMP_LTZ(3))
-- depends on the Flink version; it is assigned to TIMESTAMP(3) columns here.
INSERT INTO products VALUES (
    'PROD_001',
    -- basic_info
    ROW(
        'Sample Product',
        'Product description',
        'Electronics',
        CAST('299.99' AS DECIMAL(10,2))   -- STRING -> DECIMAL
    ),
    -- inventory
    ROW(
        CAST('100' AS INTEGER),           -- STRING -> INTEGER
        'Warehouse A',
        CURRENT_TIMESTAMP                 -- last_updated
    ),
    -- metadata
    ROW(
        CURRENT_TIMESTAMP,                -- created_at
        CURRENT_TIMESTAMP,                -- updated_at
        1                                 -- version
    )
);
Managing schema changes and data type evolution in HBase tables.
Adding New Qualifiers:
-- Original schema
CREATE TABLE users_v1 (
    user_id STRING,
    profile ROW<name STRING, email STRING>,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (...);

-- Evolved schema with additional qualifiers.
-- For a connector table like this, DROP TABLE removes only the Flink catalog entry;
-- the HBase table and its data stay in place. IF EXISTS keeps the script re-runnable.
DROP TABLE IF EXISTS users_v1;
CREATE TABLE users_v2 (
    user_id STRING,
    profile ROW<
        name STRING,
        email STRING,
        phone STRING,   -- New qualifier (rows written before the change read it as NULL)
        age INTEGER     -- New qualifier (rows written before the change read it as NULL)
    >,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (...);
Data Type Changes:
-- Careful type evolution requiring data migration
-- OLD: age stored as STRING
-- NEW: age stored as INTEGER
-- Migration approach: copy every user. Convert age only when it is a plain
-- non-negative integer of at most 9 digits (always fits in INTEGER); otherwise
-- write NULL. Filtering in WHERE instead would silently drop the whole user
-- (name, email, phone) whenever age is missing or malformed.
INSERT INTO users_new
SELECT
    user_id,
    ROW(
        profile.name,
        profile.email,
        profile.phone,
        CASE
            -- Flink SQL provides REGEXP(string, regex) as a function, not as an infix
            -- operator. REGEXP(NULL, ...) is NULL, so missing ages fall through to ELSE.
            WHEN REGEXP(profile.age, '^[0-9]{1,9}$') THEN CAST(profile.age AS INTEGER)
            ELSE CAST(NULL AS INTEGER)
        END
    )
FROM users_old;
Type-specific performance characteristics and optimization recommendations.
Storage Efficiency:
Serialization Performance:
Query Performance:
Optimization Example:
-- Optimized table design for performance
CREATE TABLE optimized_metrics (
    -- Use efficient row key design
    metric_key STRING,                -- Format: "server_001_cpu_20231225_1430"
    -- Group related small values in one family.
    -- VALUES is a reserved keyword in Flink SQL, so this field must be backtick-quoted;
    -- the HBase column family is still named "values".
    `values` ROW<
        cpu_percent DOUBLE,           -- Efficient numeric storage
        memory_bytes BIGINT,          -- Efficient numeric storage
        active_connections INTEGER    -- Efficient numeric storage
    >,
    -- Separate family for optional/sparse data
    metadata ROW<
        hostname VARCHAR(100),        -- String only when needed
        os_version VARCHAR(50),       -- String only when needed
        last_restart TIMESTAMP(3)     -- Efficient temporal storage
    >,
    PRIMARY KEY (metric_key) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'server_metrics',
    'zookeeper.quorum' = 'localhost:2181'
);
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-connector-hbase-2-2-2-11