
tessl/maven-org-apache-flink--flink-sql-connector-hbase-2-2-2-11

Apache Flink SQL connector that enables seamless integration with HBase 2.2.x databases through Flink's Table API and SQL interface


docs/sink-operations.md

Sink Operations and Buffering

UPSERT sink operations with intelligent buffering strategies, exactly-once semantics through checkpointing, and comprehensive error handling.

Capabilities

UPSERT Operations

The HBase sink writes with UPSERT semantics, converting the INSERT, UPDATE_AFTER, and DELETE rows of Flink's changelog streams into HBase mutations.

-- Flink SQL (1.11) submits sink jobs with INSERT INTO only; UPDATE_AFTER and
-- DELETE rows reach the sink from changelog-producing queries (e.g. GROUP BY
-- aggregations or CDC sources), not from UPDATE/DELETE DML statements.
INSERT INTO hbase_table
SELECT ... FROM changelog_source;   -- +I/+U rows become PUTs, -D rows become DELETEs

Operation Mapping:

  • INSERT and UPDATE_AFTER: Converted to HBase PUT operations
  • DELETE: Converted to HBase DELETE operations
  • UPDATE_BEFORE: Ignored (not needed for UPSERT semantics)
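The mapping above can be sketched as a small decision function. This is an illustrative model, not the connector's code; the RowKind enum here is a stand-in for Flink's org.apache.flink.types.RowKind.

```java
// Hypothetical sketch of the changelog-to-mutation mapping described above.
public class ChangelogMapping {
    public enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Returns the HBase mutation type for a changelog row, or null when the row is ignored. */
    public static String toMutation(RowKind kind) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                return "PUT";      // upsert the full row under its row key
            case DELETE:
                return "DELETE";   // delete the row by row key
            default:
                return null;       // UPDATE_BEFORE carries no work for an upsert sink
        }
    }

    public static void main(String[] args) {
        System.out.println(toMutation(RowKind.INSERT));        // PUT
        System.out.println(toMutation(RowKind.DELETE));        // DELETE
        System.out.println(toMutation(RowKind.UPDATE_BEFORE)); // null
    }
}
```

Ignoring UPDATE_BEFORE is safe precisely because PUTs overwrite the full row under its key: the UPDATE_AFTER row alone determines the final state.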

Usage Examples:

-- Streaming UPSERT from changelog source
INSERT INTO user_profiles
SELECT 
    user_id,
    ROW(name, email, age) AS info,
    ROW(last_login, total_orders) AS activity
FROM user_changelog_stream;

-- Batch UPSERT operation
INSERT INTO product_inventory
SELECT 
    product_id,
    ROW(name, category, price) AS basic_info,
    ROW(stock_count, warehouse_location) AS inventory
FROM product_updates
WHERE update_type IN ('INSERT', 'UPDATE');

-- Delete operation: DELETE FROM is not supported in Flink SQL 1.11; instead,
-- -D (delete) changelog rows flowing through the query remove rows by key
INSERT INTO expired_sessions
SELECT * FROM session_cleanup_stream;   -- delete rows map to HBase DELETEs

Buffering Configuration

Configurable buffering batches mutations to optimize write performance and reduce load on HBase.

WITH (
    'sink.buffer-flush.max-size' = '2mb',          -- Buffer size threshold (default: 2MB)
    'sink.buffer-flush.max-rows' = '1000',         -- Buffer row count threshold (default: 1000)  
    'sink.buffer-flush.interval' = '1s'            -- Time-based flush interval (default: 1s)
)

Buffer Flush Triggers:

  1. Size Threshold: When buffered mutations exceed max-size
  2. Row Count Threshold: When buffered row count exceeds max-rows
  3. Time Interval: When flush interval expires since last flush
  4. Checkpoint: On Flink checkpoint to ensure exactly-once semantics
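The four triggers combine with OR semantics: a flush fires as soon as any one condition holds. A minimal sketch of that logic, assuming the default thresholds (this is not the connector's actual implementation):

```java
// Illustrative model of the flush-trigger logic described above.
// Thresholds mirror the connector defaults: 2 MB, 1000 rows, 1 s.
public class FlushDecision {
    static final long MAX_SIZE_BYTES = 2L * 1024 * 1024; // sink.buffer-flush.max-size
    static final long MAX_ROWS = 1000;                   // sink.buffer-flush.max-rows
    static final long INTERVAL_MS = 1000;                // sink.buffer-flush.interval

    /** A flush fires when any one threshold is crossed, or on a checkpoint barrier. */
    public static boolean shouldFlush(long bufferedBytes, long bufferedRows,
                                      long msSinceLastFlush, boolean checkpointBarrier) {
        return bufferedBytes >= MAX_SIZE_BYTES
                || bufferedRows >= MAX_ROWS
                || msSinceLastFlush >= INTERVAL_MS
                || checkpointBarrier;
    }

    public static void main(String[] args) {
        System.out.println(shouldFlush(512, 10, 50, false));       // false: nothing hit
        System.out.println(shouldFlush(3_000_000, 10, 50, false)); // true: size threshold
        System.out.println(shouldFlush(512, 10, 50, true));        // true: checkpoint barrier
    }
}
```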

Usage Examples:

-- High-throughput buffering for batch loads
CREATE TABLE batch_sink (
    id STRING,
    data ROW<`value` STRING, `timestamp` BIGINT>,   -- backtick reserved words
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'batch_data',
    'zookeeper.quorum' = 'localhost:2181',
    'sink.buffer-flush.max-size' = '64mb',    -- Large buffer for batch
    'sink.buffer-flush.max-rows' = '100000',  -- High row count
    'sink.buffer-flush.interval' = '30s'      -- Longer flush interval
);

-- Low-latency streaming for real-time updates
CREATE TABLE realtime_sink (
    event_id STRING,
    event_data ROW<type STRING, payload STRING, `timestamp` TIMESTAMP(3)>,
    PRIMARY KEY (event_id) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'events',
    'zookeeper.quorum' = 'localhost:2181',
    'sink.buffer-flush.max-size' = '100kb',   -- Small buffer for low latency
    'sink.buffer-flush.max-rows' = '10',      -- Low row count
    'sink.buffer-flush.interval' = '100ms'    -- Fast flush interval
);

Parallelism Configuration

Control sink parallelism for optimal write throughput and resource utilization.

WITH (
    'sink.parallelism' = '4'                       -- Number of parallel sink operators
)

Parallelism Considerations:

  • Higher parallelism increases write throughput
  • Each parallel instance maintains separate buffers
  • HBase region distribution affects optimal parallelism
  • Memory usage scales with parallelism
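Because each parallel sink task keeps its own buffer, worst-case buffered memory is roughly parallelism × sink.buffer-flush.max-size. A back-of-the-envelope helper (illustrative only, not part of the connector):

```java
// Memory estimate implied by the list above: each parallel sink task
// keeps its own buffer, so worst-case buffered bytes scale linearly.
public class BufferMemory {
    public static long worstCaseBufferBytes(int parallelism, long maxSizeBytes) {
        return (long) parallelism * maxSizeBytes;
    }

    public static void main(String[] args) {
        long mb = 1024 * 1024;
        // 8 parallel writers with an 8 MB buffer each can hold up to 64 MB of mutations.
        System.out.println(worstCaseBufferBytes(8, 8 * mb) / mb); // 64
    }
}
```

This estimate feeds directly into the "High Memory Usage" troubleshooting advice below: reducing either factor reduces peak heap pressure.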

Usage Examples:

-- High-throughput sink with multiple parallel writers
CREATE TABLE parallel_sink (
    partition_key STRING,
    metrics ROW<cpu_usage DOUBLE, memory_usage DOUBLE, `timestamp` TIMESTAMP(3)>,
    PRIMARY KEY (partition_key) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'system_metrics',
    'zookeeper.quorum' = 'localhost:2181',
    'sink.parallelism' = '8',                     -- 8 parallel writers
    'sink.buffer-flush.max-size' = '8mb',
    'sink.buffer-flush.max-rows' = '5000'
);

-- Single-writer sink for ordered operations
CREATE TABLE ordered_sink (
    sequence_id STRING,
    ordered_data ROW<value STRING, order_timestamp TIMESTAMP(3)>,
    PRIMARY KEY (sequence_id) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'ordered_events',
    'zookeeper.quorum' = 'localhost:2181',
    'sink.parallelism' = '1'                      -- Single writer for ordering
);

Exactly-Once Semantics

Flushing buffered mutations on every checkpoint, combined with idempotent upsert writes, yields effectively exactly-once results in HBase.

Checkpoint Behavior:

  • Buffers are flushed when a checkpoint barrier arrives, so all pending records are persisted before the checkpoint completes
  • On failure, Flink restores from the last successful checkpoint and replays records written since then
  • Replayed PUT and DELETE mutations are idempotent per row key, so duplicates overwrite rather than accumulate
  • The guarantee is effective exactly-once for upsert-style data, not a two-phase-commit transactional guarantee

Configuration Example:

-- Sink with exactly-once guarantees
CREATE TABLE exactly_once_sink (
    transaction_id STRING,
    transaction ROW<amount DECIMAL(15,2), currency STRING, `timestamp` TIMESTAMP(3)>,
    PRIMARY KEY (transaction_id) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'financial_transactions',
    'zookeeper.quorum' = 'localhost:2181',
    'sink.buffer-flush.max-size' = '4mb',
    'sink.buffer-flush.max-rows' = '2000',
    'sink.buffer-flush.interval' = '5s'
);

Flink Job Configuration (for exactly-once):

// Requires org.apache.flink.streaming.api.CheckpointingMode
env.enableCheckpointing(60000); // checkpoint every 60 seconds; buffers flush on each barrier
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // 30 s pause between checkpoints

Error Handling and Recovery

Comprehensive error handling for write failures and network issues.

Error Categories:

  1. Transient Errors: Connection timeouts, region server unavailability
  2. Permanent Errors: Table not found, schema mismatches, permission denied
  3. Data Errors: Serialization failures, constraint violations

Error Handling Strategies:

-- Resilient sink: a multi-node ZooKeeper quorum plus checkpoint-based replay;
-- this connector version exposes no explicit retry options
CREATE TABLE robust_sink (
    record_id STRING,
    payload ROW<data STRING, metadata STRING>,
    PRIMARY KEY (record_id) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'reliable_data',
    'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
    'sink.buffer-flush.max-size' = '2mb',
    'sink.buffer-flush.max-rows' = '1000',
    'sink.buffer-flush.interval' = '10s'
);

Dead Letter Queue Pattern:

-- Main sink; is_valid_record is a user-defined validation function
INSERT INTO main_hbase_sink
SELECT * FROM input_stream
WHERE is_valid_record(data);

-- Error records to dead letter queue  
INSERT INTO error_sink
SELECT *, 'validation_failed' AS error_reason
FROM input_stream  
WHERE NOT is_valid_record(data);

Performance Tuning Guidelines

Buffer Size Tuning:

  • Start with default 2MB buffer size
  • Increase for batch workloads (up to 64MB)
  • Decrease for low-latency requirements (down to 100KB)
  • Monitor memory usage and adjust accordingly

Row Count Tuning:

  • Default 1000 rows works for most scenarios
  • Increase for small records (up to 100K rows)
  • Decrease for large records (down to 10 rows)
  • Balance with memory constraints
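Whichever threshold is crossed first triggers the flush, so a reasonable max-rows sits near max-size divided by the average serialized record size; the defaults (2 MB, 1000 rows) line up for roughly 2 KB records. A sketch of that rule of thumb (illustrative, not a connector API):

```java
// Rule of thumb from the tuning notes above: choose max-rows near
// max-size / average record size so both thresholds fire at similar times.
public class RowCountSizing {
    public static long suggestedMaxRows(long maxSizeBytes, long avgRecordBytes) {
        return Math.max(1, maxSizeBytes / avgRecordBytes);
    }

    public static void main(String[] args) {
        // 2 MB buffer with ~2 KB records -> ~1000 rows (close to the defaults).
        System.out.println(suggestedMaxRows(2L * 1024 * 1024, 2L * 1024)); // 1024
    }
}
```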

Flush Interval Tuning:

  • Default 1 second provides good balance
  • Decrease for real-time applications (100ms-500ms)
  • Increase for batch processing (10s-60s)
  • Consider checkpoint interval alignment

Complete Performance Configuration:

CREATE TABLE tuned_sink (
    `key` STRING,                -- KEY is a reserved word; backtick it
    large_payload ROW<
        json_data STRING,        -- Large JSON payloads
        binary_data VARBINARY,   -- Binary attachments
        metadata ROW<
            size_bytes BIGINT,
            content_type STRING,
            created_at TIMESTAMP(3)
        >
    >,
    PRIMARY KEY (`key`) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'large_objects',
    'zookeeper.quorum' = 'localhost:2181',
    
    -- Tuned for large records
    'sink.buffer-flush.max-size' = '32mb',      -- Large buffer for big records
    'sink.buffer-flush.max-rows' = '100',       -- Few rows due to size
    'sink.buffer-flush.interval' = '15s',       -- Longer interval for batching
    'sink.parallelism' = '4'                    -- Moderate parallelism
);

Monitoring and Metrics

Key Metrics to Monitor:

  • Buffer flush frequency and size
  • Write throughput (records/second)
  • Write latency (end-to-end)
  • Error rates and types
  • Memory usage per sink task

Monitoring Query Example (against a side stream of the records routed to the sink; the sink's internal counters are exposed through Flink's metrics system, not SQL):

-- Approximate sink throughput over 1-minute windows
SELECT
    TUMBLE_START(proc_time, INTERVAL '1' MINUTE) AS window_start,
    TUMBLE_END(proc_time, INTERVAL '1' MINUTE) AS window_end,
    COUNT(*) AS records_written,
    COUNT(DISTINCT rowkey) AS unique_keys,
    COUNT(*) / 60.0 AS records_per_second     -- fixed 1-minute window
FROM sink_monitoring
GROUP BY TUMBLE(proc_time, INTERVAL '1' MINUTE);

Troubleshooting Common Issues

High Memory Usage:

  • Reduce sink.buffer-flush.max-size
  • Decrease sink.parallelism
  • Monitor heap usage and adjust JVM settings

Slow Write Performance:

  • Increase sink.buffer-flush.max-size
  • Increase sink.parallelism
  • Check HBase region distribution
  • Verify network connectivity

Data Loss Concerns:

  • Enable Flink checkpointing
  • Verify exactly-once configuration
  • Monitor checkpoint success rates
  • Check HBase write acknowledgments

Connection Issues:

  • Verify Zookeeper connectivity
  • Check HBase region server health
  • Review network configuration
  • Monitor connection pool exhaustion

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-sql-connector-hbase-2-2-2-11
