Apache Flink SQL connector that enables seamless integration with HBase 2.2.x databases through Flink's Table API and SQL interface.
—
UPSERT sink operations with intelligent buffering strategies, exactly-once semantics through checkpointing, and comprehensive error handling.
HBase sink supports UPSERT semantics, handling INSERT, UPDATE_AFTER, and DELETE operations from Flink's changelog streams.
-- Supported changelog operations
INSERT INTO hbase_table VALUES (...); -- Creates new row or updates existing
UPDATE hbase_table SET ... WHERE rowkey = '...'; -- Updates existing row (becomes UPDATE_AFTER)
DELETE FROM hbase_table WHERE rowkey = '...'; -- Deletes row by row key
Operation Mapping:
INSERT and UPDATE_AFTER: Converted to HBase PUT operations
DELETE: Converted to HBase DELETE operations
UPDATE_BEFORE: Ignored (not needed for UPSERT semantics)
Usage Examples:
-- Streaming UPSERT from changelog source
INSERT INTO user_profiles
SELECT
user_id,
ROW(name, email, age) AS info,
ROW(last_login, total_orders) AS activity
FROM user_changelog_stream;
-- Batch UPSERT operation
INSERT INTO product_inventory
SELECT
product_id,
ROW(name, category, price) AS basic_info,
ROW(stock_count, warehouse_location) AS inventory
FROM product_updates
WHERE update_type IN ('INSERT', 'UPDATE');
-- Delete operation
DELETE FROM expired_sessions
WHERE session_id IN (
SELECT session_id FROM session_cleanup_stream
);
Intelligent buffering strategies to optimize write performance and reduce HBase load.
WITH (
'sink.buffer-flush.max-size' = '2mb', -- Buffer size threshold (default: 2MB)
'sink.buffer-flush.max-rows' = '1000', -- Buffer row count threshold (default: 1000)
'sink.buffer-flush.interval' = '1s' -- Time-based flush interval (default: 1s)
)
Buffer Flush Triggers:
Usage Examples:
-- High-throughput buffering for batch loads
CREATE TABLE batch_sink (
id STRING,
data ROW<value STRING, timestamp BIGINT>,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'hbase-2.2',
'table-name' = 'batch_data',
'zookeeper.quorum' = 'localhost:2181',
'sink.buffer-flush.max-size' = '64mb', -- Large buffer for batch
'sink.buffer-flush.max-rows' = '100000', -- High row count
'sink.buffer-flush.interval' = '30s' -- Longer flush interval
);
-- Low-latency streaming for real-time updates
CREATE TABLE realtime_sink (
event_id STRING,
event_data ROW<type STRING, payload STRING, timestamp TIMESTAMP(3)>,
PRIMARY KEY (event_id) NOT ENFORCED
) WITH (
'connector' = 'hbase-2.2',
'table-name' = 'events',
'zookeeper.quorum' = 'localhost:2181',
'sink.buffer-flush.max-size' = '100kb', -- Small buffer for low latency
'sink.buffer-flush.max-rows' = '10', -- Low row count
'sink.buffer-flush.interval' = '100ms' -- Fast flush interval
);
Control sink parallelism for optimal write throughput and resource utilization.
WITH (
'sink.parallelism' = '4' -- Number of parallel sink operators
)
Parallelism Considerations:
Usage Examples:
-- High-throughput sink with multiple parallel writers
CREATE TABLE parallel_sink (
partition_key STRING,
metrics ROW<cpu_usage DOUBLE, memory_usage DOUBLE, timestamp TIMESTAMP(3)>,
PRIMARY KEY (partition_key) NOT ENFORCED
) WITH (
'connector' = 'hbase-2.2',
'table-name' = 'system_metrics',
'zookeeper.quorum' = 'localhost:2181',
'sink.parallelism' = '8', -- 8 parallel writers
'sink.buffer-flush.max-size' = '8mb',
'sink.buffer-flush.max-rows' = '5000'
);
-- Single-writer sink for ordered operations
CREATE TABLE ordered_sink (
sequence_id STRING,
ordered_data ROW<value STRING, order_timestamp TIMESTAMP(3)>,
PRIMARY KEY (sequence_id) NOT ENFORCED
) WITH (
'connector' = 'hbase-2.2',
'table-name' = 'ordered_events',
'zookeeper.quorum' = 'localhost:2181',
'sink.parallelism' = '1' -- Single writer for ordering
);
Checkpoint integration ensures exactly-once processing guarantees with HBase.
Checkpoint Behavior:
Configuration Example:
-- Sink with exactly-once guarantees
CREATE TABLE exactly_once_sink (
transaction_id STRING,
transaction ROW<amount DECIMAL(15,2), currency STRING, timestamp TIMESTAMP(3)>,
PRIMARY KEY (transaction_id) NOT ENFORCED
) WITH (
'connector' = 'hbase-2.2',
'table-name' = 'financial_transactions',
'zookeeper.quorum' = 'localhost:2181',
'sink.buffer-flush.max-size' = '4mb',
'sink.buffer-flush.max-rows' = '2000',
'sink.buffer-flush.interval' = '5s'
);
Flink Job Configuration (for exactly-once):
// Configure checkpointing for exactly-once
env.enableCheckpointing(60000); // Checkpoint every 60 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
Comprehensive error handling for write failures and network issues.
Error Categories:
Error Handling Strategies:
-- Robust sink with retry and error handling
CREATE TABLE robust_sink (
record_id STRING,
payload ROW<data STRING, metadata STRING>,
PRIMARY KEY (record_id) NOT ENFORCED
) WITH (
'connector' = 'hbase-2.2',
'table-name' = 'reliable_data',
'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
'sink.buffer-flush.max-size' = '2mb',
'sink.buffer-flush.max-rows' = '1000',
'sink.buffer-flush.interval' = '10s'
);
Dead Letter Queue Pattern:
-- Main sink with error handling
INSERT INTO main_hbase_sink
SELECT * FROM input_stream
WHERE is_valid_record(data);
-- Error records to dead letter queue
INSERT INTO error_sink
SELECT *, 'validation_failed' AS error_reason
FROM input_stream
WHERE NOT is_valid_record(data);
Buffer Size Tuning:
Row Count Tuning:
Flush Interval Tuning:
Complete Performance Configuration:
CREATE TABLE tuned_sink (
key STRING,
large_payload ROW<
json_data STRING, -- Large JSON payloads
binary_data VARBINARY, -- Binary attachments
metadata ROW<
size_bytes BIGINT,
content_type STRING,
created_at TIMESTAMP(3)
>
>,
PRIMARY KEY (key) NOT ENFORCED
) WITH (
'connector' = 'hbase-2.2',
'table-name' = 'large_objects',
'zookeeper.quorum' = 'localhost:2181',
-- Tuned for large records
'sink.buffer-flush.max-size' = '32mb', -- Large buffer for big records
'sink.buffer-flush.max-rows' = '100', -- Few rows due to size
'sink.buffer-flush.interval' = '15s', -- Longer interval for batching
'sink.parallelism' = '4' -- Moderate parallelism
);
Key Metrics to Monitor:
Monitoring Query Example:
-- Monitor sink performance
SELECT
window_start,
window_end,
COUNT(*) AS records_written,
COUNT(DISTINCT rowkey) AS unique_keys,
COUNT(*) / EXTRACT(EPOCH FROM (window_end - window_start)) AS records_per_second
FROM TABLE(
TUMBLE(TABLE sink_monitoring, DESCRIPTOR(proc_time), INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end;
High Memory Usage:
Related options: sink.buffer-flush.max-size, sink.parallelism
Slow Write Performance:
Related options: sink.buffer-flush.max-size, sink.parallelism
Data Loss Concerns:
Connection Issues:
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-connector-hbase-2-2-2-11