
tessl/maven-org-apache-flink--flink-connector-hbase-1-4-2-11

Apache Flink HBase 1.4 Connector provides bidirectional data integration between Flink streaming applications and HBase NoSQL database.


Sink Operations

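Writing data to HBase tables with configurable buffering and batching in the Apache Flink HBase 1.4 Connector. Buffered writes are flushed on checkpoints, which gives at-least-once delivery; because every write is an upsert keyed by row key, records replayed after a failure overwrite the same rows.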

Capabilities

HBaseDynamicTableSink

Table sink implementation that enables writing data to HBase tables through Flink's Table API and SQL with configurable write options and change data capture support.

/**
 * HBase table sink implementation for writing data to HBase tables
 * Supports UPSERT operations with configurable buffering and batching
 */
@Internal
public class HBaseDynamicTableSink implements DynamicTableSink {
    
    /**
     * Creates a new HBase dynamic table sink
     * @param tableName Name of the HBase table to write to
     * @param hbaseTableSchema Schema mapping for the HBase table
     * @param hbaseConf Hadoop configuration for HBase connection
     * @param writeOptions Configuration for buffering and write performance
     * @param nullStringLiteral String representation for null values
     */
    public HBaseDynamicTableSink(
        String tableName,
        HBaseTableSchema hbaseTableSchema,
        Configuration hbaseConf,
        HBaseWriteOptions writeOptions,
        String nullStringLiteral
    );
    
    /**
     * Returns the sink runtime provider with configured HBase sink function
     * @param context Sink context for runtime configuration
     * @return SinkFunctionProvider with HBaseSinkFunction and parallelism settings
     */
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context);
    
    /**
     * Returns the supported changelog mode for this sink
     * @param requestedMode The changelog mode requested by the planner
     * @return ChangelogMode supporting INSERT, UPDATE_AFTER, and DELETE operations
     */
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode);
    
    /**
     * Creates a copy of this table sink for parallel execution
     * @return New HBaseDynamicTableSink instance with same configuration
     */
    public DynamicTableSink copy();
    
    /**
     * Returns a string summary of this sink
     * @return "HBase" identifier string
     */
    public String asSummaryString();
    
    // Testing methods
    /**
     * Returns the HBase table schema for testing purposes
     * @return HBaseTableSchema instance with column family mappings
     */
    @VisibleForTesting
    public HBaseTableSchema getHBaseTableSchema();
    
    /**
     * Returns the write options configuration for testing purposes
     * @return HBaseWriteOptions instance with buffering settings
     */
    @VisibleForTesting
    public HBaseWriteOptions getWriteOptions();
    
    /**
     * Returns the Hadoop configuration for testing purposes
     * @return Configuration instance with HBase connection settings
     */
    @VisibleForTesting
    public Configuration getConfiguration();
    
    /**
     * Returns the table name for testing purposes
     * @return String name of the target HBase table
     */
    @VisibleForTesting
    public String getTableName();
}

Usage Example:

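// Example: Writing streaming data to HBase
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Create HBase sink table
// Field names that collide with SQL keywords are escaped with backticks
tableEnv.executeSql(
    "CREATE TABLE user_activity_sink (" +
    "  user_id STRING," +
    "  activity ROW<event_type STRING, `timestamp` TIMESTAMP(3), `value` DOUBLE>," +
    "  metadata ROW<`source` STRING, processed_time TIMESTAMP(3)>," +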
    "  PRIMARY KEY (user_id) NOT ENFORCED" +
    ") WITH (" +
    "  'connector' = 'hbase-1.4'," +
    "  'table-name' = 'user_events'," +
    "  'zookeeper.quorum' = 'localhost:2181'," +
    "  'sink.buffer-flush.max-size' = '4mb'," +
    "  'sink.buffer-flush.max-rows' = '2000'," +
    "  'sink.buffer-flush.interval' = '2s'" +
    ")"
);

// Insert data into HBase
tableEnv.executeSql(
    "INSERT INTO user_activity_sink " +
    "SELECT user_id, activity, metadata FROM source_stream"
);

Changelog Mode Support

UPSERT Operations

The HBase sink supports UPSERT (INSERT/UPDATE/DELETE) operations through Flink's changelog mode, mapping to HBase's natural key-value storage model.

/**
 * Supported row change types:
 * - INSERT: Creates new HBase row or overwrites existing
 * - UPDATE_AFTER: Updates existing HBase row (same as INSERT in HBase)
 * - DELETE: Removes HBase row
 * 
 * UPDATE_BEFORE operations are filtered out as HBase doesn't need them
 */
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
    ChangelogMode.Builder builder = ChangelogMode.newBuilder();
    for (RowKind kind : requestedMode.getContainedKinds()) {
        if (kind != RowKind.UPDATE_BEFORE) {
            builder.addContainedKind(kind);
        }
    }
    return builder.build();
}
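The effect of these row change types on the table can be illustrated without a cluster. The following is a minimal sketch, not the connector's code: `UpsertApplySketch` and its `RowChange` enum are hypothetical stand-ins for the sink and Flink's `RowKind`, and a plain map plays the role of the HBase table.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Stand-alone sketch; a map stands in for an HBase table (row key -> value). */
final class UpsertApplySketch {

    /** Hypothetical stand-in for Flink's RowKind. */
    enum RowChange { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    private final Map<String, String> rows = new LinkedHashMap<>();

    /** Applies one change event to the in-memory table. */
    void apply(RowChange kind, String rowKey, String value) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                // Both behave as a put: create the row or overwrite it.
                rows.put(rowKey, value);
                break;
            case DELETE:
                rows.remove(rowKey);
                break;
            case UPDATE_BEFORE:
                // Not needed for a key-value store; the following UPDATE_AFTER overwrites the row.
                break;
        }
    }

    Map<String, String> rows() {
        return rows;
    }

    public static void main(String[] args) {
        UpsertApplySketch table = new UpsertApplySketch();
        table.apply(RowChange.INSERT, "order-1", "NEW");
        table.apply(RowChange.UPDATE_BEFORE, "order-1", "NEW");
        table.apply(RowChange.UPDATE_AFTER, "order-1", "PAID");
        System.out.println(table.rows());
    }
}
```

Treating INSERT and UPDATE_AFTER identically is what makes the UPDATE_BEFORE events redundant for this kind of store.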

Changelog Example:

-- Example: Processing CDC stream to HBase
CREATE TABLE orders_cdc (
    order_id STRING,
    customer_id STRING,
    amount DECIMAL(10,2),
    status STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
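    'connector' = 'kafka',
    'topic' = 'orders-cdc',
    -- Placeholder broker address; adjust for your environment
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'debezium-json'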
);

CREATE TABLE orders_hbase (
    order_id STRING,
    order_info ROW<customer_id STRING, amount DECIMAL(10,2), status STRING>,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 'orders',
    'zookeeper.quorum' = 'localhost:2181'
);

-- Process CDC events: INSERT, UPDATE, DELETE automatically handled
INSERT INTO orders_hbase
SELECT 
    order_id,
    ROW(customer_id, amount, status) as order_info
FROM orders_cdc;

Write Performance Configuration

Buffer Configuration

The sink provides multiple buffering strategies to optimize write throughput and latency trade-offs.

Buffering Options:

  1. Size-based flushing: Flush when buffered data reaches the configured memory size (sink.buffer-flush.max-size)
  2. Count-based flushing: Flush when the buffer reaches the configured row count (sink.buffer-flush.max-rows)
  3. Time-based flushing: Flush at a fixed interval regardless of buffer size (sink.buffer-flush.interval)
  4. Combined strategies: Set several triggers together; the buffer is flushed as soon as any one of them fires

-- Example: High-throughput configuration
CREATE TABLE high_volume_sink (
    -- Table schema
) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 'events',
    'zookeeper.quorum' = 'localhost:2181',
    -- Large buffer for high throughput
    'sink.buffer-flush.max-size' = '10mb',
    'sink.buffer-flush.max-rows' = '5000',
    'sink.buffer-flush.interval' = '5s',
    -- High parallelism for write scaling
    'sink.parallelism' = '8'
);

-- Example: Low-latency configuration
CREATE TABLE low_latency_sink (
    -- Table schema
) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 'realtime_data',
    'zookeeper.quorum' = 'localhost:2181',
    -- Small buffer for low latency
    'sink.buffer-flush.max-size' = '100kb',
    'sink.buffer-flush.max-rows' = '100',
    'sink.buffer-flush.interval' = '500ms'
);
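How the size, count, and interval triggers interact can be sketched in plain Java. This is an illustration under stated assumptions, not the connector's implementation: `FlushBufferSketch` is a hypothetical class, the clock is passed in explicitly so the behavior is deterministic, and the interval is checked when a record arrives rather than by a background timer.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical write buffer that flushes when any one of three thresholds is reached. */
final class FlushBufferSketch {
    private final long maxBytes;
    private final int maxRows;
    private final long intervalMillis;

    private final List<byte[]> buffer = new ArrayList<>();
    private long bufferedBytes = 0;
    private long lastFlushMillis;
    private int flushCount = 0;

    FlushBufferSketch(long maxBytes, int maxRows, long intervalMillis, long nowMillis) {
        this.maxBytes = maxBytes;
        this.maxRows = maxRows;
        this.intervalMillis = intervalMillis;
        this.lastFlushMillis = nowMillis;
    }

    /** Buffers a record, then flushes if any threshold is met. Returns true if a flush happened. */
    boolean add(byte[] record, long nowMillis) {
        buffer.add(record);
        bufferedBytes += record.length;
        boolean due = bufferedBytes >= maxBytes
                || buffer.size() >= maxRows
                || nowMillis - lastFlushMillis >= intervalMillis;
        if (due) {
            flush(nowMillis);
        }
        return due;
    }

    /** A real sink would send the buffered mutations here; the sketch only resets its state. */
    void flush(long nowMillis) {
        buffer.clear();
        bufferedBytes = 0;
        lastFlushMillis = nowMillis;
        flushCount++;
    }

    int pendingRows() {
        return buffer.size();
    }

    int flushCount() {
        return flushCount;
    }
}
```

Larger thresholds trade latency for fewer, bigger round trips; smaller ones do the opposite, which is the difference between the two configurations above.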

Parallelism Control

The sink supports configurable parallelism to scale write operations across multiple HBase region servers.

CREATE TABLE scalable_sink (
    -- Table schema
) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 'large_table',
    'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
    -- Scale write operations
    'sink.parallelism' = '12',
    -- Optimize for distributed writes
    'sink.buffer-flush.max-size' = '8mb',
    'sink.buffer-flush.max-rows' = '4000'
);

Data Conversion and Serialization

Row Key Mapping

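The sink uses the table's single atomic-typed column, declared as the primary key, as the HBase row key.

Conversion Rules:

  • Simple primary keys: The key column's value is serialized to bytes and used as the row key
  • Composite primary keys: Not mapped automatically, because the connector expects exactly one row key column; concatenate the parts in the query before writing
  • Complex types: Not usable as a row key; only atomic types are accepted

// Example: Row key conversion
// Source key parts: ("user_123", "2023-01-01")
// Row key column computed in the query, e.g. CONCAT_WS('|', part1, part2)
// HBase row key: "user_123|2023-01-01"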
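A composite key like the one in the example above can also be flattened in application code before the record reaches the sink. The following is a minimal sketch; `RowKeySketch`, the `|` separator, and the UTF-8 encoding are assumptions made for illustration, not connector behavior.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that joins key parts into a single row key. */
final class RowKeySketch {

    /** Joins the parts with the separator, rejecting parts that contain it to keep keys unambiguous. */
    static String compose(String separator, String... parts) {
        for (String part : parts) {
            if (part.contains(separator)) {
                throw new IllegalArgumentException("Key part contains separator: " + part);
            }
        }
        return String.join(separator, parts);
    }

    /** Encodes the row key as UTF-8 bytes. */
    static byte[] toBytes(String rowKey) {
        return rowKey.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(compose("|", "user_123", "2023-01-01"));
    }
}
```

Because HBase sorts rows by the bytes of the key, the order of the parts determines which prefix scans are efficient.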

Column Family Mapping

Flink ROW types are mapped to HBase column families with individual fields becoming column qualifiers.

-- Flink schema
CREATE TABLE user_data (
    user_id STRING,
    profile ROW<name STRING, age INT, email STRING>,
    settings ROW<theme STRING, notifications BOOLEAN>,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (...);

-- Maps to HBase structure:
-- Row key: user_id value
-- Column family 'profile': columns 'name', 'age', 'email'  
-- Column family 'settings': columns 'theme', 'notifications'
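The mapping from nested fields to cells can be sketched as a walk over a two-level structure. `ColumnFamilyMappingSketch` is a hypothetical helper that uses nested maps in place of Flink ROW values and prints cells in `family:qualifier=value` form; it illustrates the layout only and does not use the connector's classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration: outer map keys are column families, inner keys are qualifiers. */
final class ColumnFamilyMappingSketch {

    /** Flattens family -> (qualifier -> value) into one "family:qualifier=value" entry per cell. */
    static List<String> toCells(Map<String, Map<String, Object>> families) {
        List<String> cells = new ArrayList<>();
        for (Map.Entry<String, Map<String, Object>> family : families.entrySet()) {
            for (Map.Entry<String, Object> column : family.getValue().entrySet()) {
                cells.add(family.getKey() + ":" + column.getKey() + "=" + column.getValue());
            }
        }
        return cells;
    }

    public static void main(String[] args) {
        Map<String, Object> profile = new LinkedHashMap<>();
        profile.put("name", "Ada");
        profile.put("age", 36);
        Map<String, Map<String, Object>> row = new LinkedHashMap<>();
        row.put("profile", profile);
        System.out.println(toCells(row));
    }
}
```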

Type Serialization

Values of supported atomic Flink data types are serialized to HBase-compatible byte arrays.

Supported Type Conversions:

  • Primitive types: Fixed-width byte serialization
  • Timestamp types: Long milliseconds representation
  • Decimal types: Precision-preserving byte encoding
  • String types: UTF-8 byte encoding
  • Complex types: Nested ARRAY, MAP, and ROW fields inside a column family are not supported; flatten them or encode them to STRING or BYTES first
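To make the byte layouts concrete, here is a small sketch using only the JDK. It assumes big-endian fixed-width encodings for numeric values (the layout produced by HBase's `Bytes` utility) and UTF-8 for strings; `ByteEncodingSketch` is a hypothetical name and the code does not call the connector or the HBase client.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical encoders illustrating fixed-width and UTF-8 byte layouts. */
final class ByteEncodingSketch {

    /** 4-byte big-endian encoding of an int. */
    static byte[] encodeInt(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    /** 8-byte big-endian encoding of a long, e.g. a timestamp in epoch milliseconds. */
    static byte[] encodeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    /** UTF-8 encoding of a string. */
    static byte[] encodeString(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    /** Reads back a long written by encodeLong. */
    static long decodeLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        long epochMillis = 1672531200000L;
        System.out.println(decodeLong(encodeLong(epochMillis)) == epochMillis);
    }
}
```

Any reader of the table, including non-Flink clients, has to apply the same layout to decode the cells.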

Error Handling and Reliability

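Delivery Guarantees

The sink integrates with Flink's checkpointing mechanism: buffered mutations are flushed when a checkpoint is taken, which gives at-least-once delivery. Because each write is an upsert keyed by row key, records replayed after a failure overwrite the same cells, so the table converges to the same state.

Reliability Features:

  • Buffer flush aligned with Flink checkpoints
  • Retries on temporary failures, handled by the HBase client according to its configuration
  • Write failures are rethrown so the job fails and Flink's restart strategy takes over
  • One HBase connection per sink subtask, opened and closed with the task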
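Checkpoint-aligned flushing combined with keyed writes means that replaying a batch after a recovery leaves the table in the same state as applying it once. A minimal sketch of that property, using a map as a stand-in for the table; `ReplaySketch` is a hypothetical name and nothing here touches Flink or HBase.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration of idempotent, keyed upserts. */
final class ReplaySketch {

    /** Applies each {rowKey, value} pair as a put and returns the same table instance. */
    static Map<String, String> applyAll(Map<String, String> table, List<String[]> upserts) {
        for (String[] upsert : upserts) {
            table.put(upsert[0], upsert[1]);
        }
        return table;
    }

    public static void main(String[] args) {
        List<String[]> batch = Arrays.asList(
                new String[] {"k1", "a"},
                new String[] {"k2", "b"},
                new String[] {"k1", "c"});
        Map<String, String> once = applyAll(new TreeMap<>(), batch);
        Map<String, String> twice = applyAll(applyAll(new TreeMap<>(), batch), batch);
        System.out.println(once.equals(twice));
    }
}
```

The property holds only when the replayed batch is identical and applied in the same order, which is what replay from a checkpoint provides for a deterministic pipeline.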

Exception Handling

How common failure scenarios are handled:

// Common error scenarios and handling:

// 1. Connection failures
// - Retried by the HBase client according to its retry settings
// - Once retries are exhausted, the error fails the task

// 2. Table schema mismatches
// - The Flink schema is validated when the sink is created
// - A missing HBase table or column family surfaces as an error at runtime

// 3. Write buffer pressure
// - Buffer size, row count, and flush interval are configurable
// - The buffer is flushed whenever any configured threshold is reached

// 4. HBase cluster unavailability
// - Failed writes fail the job
// - Recovery relies on Flink's restart strategies and the last checkpoint
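Retry policies for transient connection failures usually space attempts out with a growing delay. The sketch below shows generic capped exponential backoff; it is not the connector's or the HBase client's actual retry logic, and `BackoffSketch` with its parameters is hypothetical.

```java
/** Hypothetical helper computing capped exponential backoff delays. */
final class BackoffSketch {

    /** Returns the delay before each attempt: base, 2*base, 4*base, ... capped at capMillis. */
    static long[] delays(long baseMillis, long capMillis, int attempts) {
        long[] result = new long[attempts];
        long delay = baseMillis;
        for (int i = 0; i < attempts; i++) {
            result[i] = Math.min(delay, capMillis);
            // Stop doubling once the next value would exceed the cap; this also avoids overflow.
            delay = delay > capMillis / 2 ? capMillis : delay * 2;
        }
        return result;
    }

    public static void main(String[] args) {
        for (long delay : delays(100, 1000, 5)) {
            System.out.println(delay);
        }
    }
}
```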

Error Configuration:

CREATE TABLE resilient_sink (
    -- Table schema
) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 'critical_data',
    -- Multiple ZK nodes for high availability
    'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
    -- Small buffer and short interval to limit the amount of unflushed data
    'sink.buffer-flush.max-size' = '2mb',
    'sink.buffer-flush.interval' = '1s'
);

Monitoring and Metrics

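The sink runs as a regular Flink operator, so its write performance and health can be monitored through Flink's metric system. Which of the metrics below are exposed depends on the Flink and connector version: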

Available Metrics:

  • Write throughput (records/second, bytes/second)
  • Buffer utilization and flush frequency
  • Error rates and retry counts
  • Connection pool status
  • Latency percentiles for write operations

Integration with Flink Metrics:

// Metrics are automatically registered with Flink's metric system
// Available in Flink UI and external monitoring systems
// - numRecordsOut: Total records written
// - numBytesOut: Total bytes written  
// - currentSendTime: Current write latency
// - bufferUsage: Current buffer utilization percentage

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-hbase-1-4-2-11
