Apache Flink HBase 1.4 Connector provides bidirectional data integration between Flink streaming applications and HBase NoSQL database.
—
Writing data to HBase tables with configurable buffering, batching, and exactly-once processing guarantees in the Apache Flink HBase 1.4 Connector.
Table sink implementation that enables writing data to HBase tables through Flink's Table API and SQL with configurable write options and change data capture support.
/**
 * HBase table sink implementation for writing data to HBase tables.
 * Supports UPSERT operations with configurable buffering and batching.
 * The sink filters UPDATE_BEFORE rows out of the requested changelog mode,
 * so only INSERT, UPDATE_AFTER, and DELETE changes are written to HBase.
 */
@Internal
public class HBaseDynamicTableSink implements DynamicTableSink {
/**
 * Creates a new HBase dynamic table sink.
 * @param tableName Name of the HBase table to write to
 * @param hbaseTableSchema Schema mapping between Flink columns and HBase
 *        row key / column families
 * @param hbaseConf Hadoop configuration for the HBase connection
 *        (e.g. ZooKeeper quorum)
 * @param writeOptions Configuration for buffering, batching, and write
 *        parallelism
 * @param nullStringLiteral String representation used when writing null
 *        string values to HBase
 */
public HBaseDynamicTableSink(
String tableName,
HBaseTableSchema hbaseTableSchema,
Configuration hbaseConf,
HBaseWriteOptions writeOptions,
String nullStringLiteral
);
/**
 * Returns the sink runtime provider with the configured HBase sink function.
 * @param context Sink context for runtime configuration
 * @return SinkFunctionProvider wrapping an HBaseSinkFunction, including any
 *         parallelism configured via the write options
 */
public SinkRuntimeProvider getSinkRuntimeProvider(Context context);
/**
 * Returns the changelog mode supported by this sink.
 * @param requestedMode The changelog mode requested by the planner
 * @return ChangelogMode containing the requested row kinds except
 *         UPDATE_BEFORE (i.e. INSERT, UPDATE_AFTER, and DELETE)
 */
public ChangelogMode getChangelogMode(ChangelogMode requestedMode);
/**
 * Creates a copy of this table sink for parallel execution.
 * @return New HBaseDynamicTableSink instance with the same configuration
 */
public DynamicTableSink copy();
/**
 * Returns a human-readable summary of this sink for plan explanations.
 * @return "HBase" identifier string
 */
public String asSummaryString();
// Accessors exposed for unit tests only; not part of the public sink contract.
/**
 * Returns the HBase table schema for testing purposes.
 * @return HBaseTableSchema instance with column family mappings
 */
@VisibleForTesting
public HBaseTableSchema getHBaseTableSchema();
/**
 * Returns the write options configuration for testing purposes.
 * @return HBaseWriteOptions instance with buffering settings
 */
@VisibleForTesting
public HBaseWriteOptions getWriteOptions();
/**
 * Returns the Hadoop configuration for testing purposes.
 * @return Configuration instance with HBase connection settings
 */
@VisibleForTesting
public Configuration getConfiguration();
/**
 * Returns the table name for testing purposes.
 * @return String name of the target HBase table
 */
@VisibleForTesting
public String getTableName();
}

Usage Example:
// Example: Writing streaming data to HBase
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Create HBase sink table
tableEnv.executeSql(
"CREATE TABLE user_activity_sink (" +
" user_id STRING," +
" activity ROW<event_type STRING, `timestamp` TIMESTAMP(3), `value` DOUBLE>," +
" metadata ROW<source STRING, processed_time TIMESTAMP(3)>," +
" PRIMARY KEY (user_id) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'hbase-1.4'," +
" 'table-name' = 'user_events'," +
" 'zookeeper.quorum' = 'localhost:2181'," +
" 'sink.buffer-flush.max-size' = '4mb'," +
" 'sink.buffer-flush.max-rows' = '2000'," +
" 'sink.buffer-flush.interval' = '2s'" +
")"
);
// Insert data into HBase
tableEnv.executeSql(
"INSERT INTO user_activity_sink " +
"SELECT user_id, activity, metadata FROM source_stream"
);

The HBase sink supports UPSERT (INSERT/UPDATE/DELETE) operations through Flink's changelog mode, mapping to HBase's natural key-value storage model.
/**
 * Builds the changelog mode supported by this sink.
 *
 * Supported row change types:
 * - INSERT: Creates a new HBase row or overwrites an existing one
 * - UPDATE_AFTER: Updates an existing HBase row (same as INSERT in HBase)
 * - DELETE: Removes an HBase row
 *
 * UPDATE_BEFORE operations are dropped because HBase upserts by row key
 * and does not need the pre-update image.
 *
 * @param requestedMode the changelog mode requested by the planner
 * @return the requested mode minus UPDATE_BEFORE
 */
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
    final ChangelogMode.Builder supportedMode = ChangelogMode.newBuilder();
    requestedMode.getContainedKinds().stream()
            .filter(rowKind -> rowKind != RowKind.UPDATE_BEFORE)
            .forEach(supportedMode::addContainedKind);
    return supportedMode.build();
}

Changelog Example:
-- Example: Processing CDC stream to HBase
CREATE TABLE orders_cdc (
order_id STRING,
customer_id STRING,
amount DECIMAL(10,2),
status STRING,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'orders-cdc',
'format' = 'debezium-json'
);
CREATE TABLE orders_hbase (
order_id STRING,
order_info ROW<customer_id STRING, amount DECIMAL(10,2), status STRING>,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'orders',
'zookeeper.quorum' = 'localhost:2181'
);
-- Process CDC events: INSERT, UPDATE, DELETE automatically handled
INSERT INTO orders_hbase
SELECT
order_id,
ROW(customer_id, amount, status) as order_info
FROM orders_cdc;

The sink provides multiple buffering strategies to optimize write throughput and latency trade-offs.
Buffering Options:
-- Example: High-throughput configuration
CREATE TABLE high_volume_sink (
-- Table schema
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'events',
'zookeeper.quorum' = 'localhost:2181',
-- Large buffer for high throughput
'sink.buffer-flush.max-size' = '10mb',
'sink.buffer-flush.max-rows' = '5000',
'sink.buffer-flush.interval' = '5s',
-- High parallelism for write scaling
'sink.parallelism' = '8'
);
-- Example: Low-latency configuration
CREATE TABLE low_latency_sink (
-- Table schema
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'realtime_data',
'zookeeper.quorum' = 'localhost:2181',
-- Small buffer for low latency
'sink.buffer-flush.max-size' = '100kb',
'sink.buffer-flush.max-rows' = '100',
'sink.buffer-flush.interval' = '500ms'
);

The sink supports configurable parallelism to scale write operations across multiple HBase region servers.
CREATE TABLE scalable_sink (
-- Table schema
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'large_table',
'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
-- Scale write operations
'sink.parallelism' = '12',
-- Optimize for distributed writes
'sink.buffer-flush.max-size' = '8mb',
'sink.buffer-flush.max-rows' = '4000'
);

The sink automatically handles conversion from Flink's primary key to HBase row key format.
Conversion Rules:
// Example: Row key conversion
// Flink primary key: ("user_123", "2023-01-01")
// HBase row key: "user_123|2023-01-01"

Flink ROW types are mapped to HBase column families with individual fields becoming column qualifiers.
-- Flink schema
CREATE TABLE user_data (
user_id STRING,
profile ROW<name STRING, age INT, email STRING>,
settings ROW<theme STRING, notifications BOOLEAN>,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (...);
-- Maps to HBase structure:
-- Row key: user_id value
-- Column family 'profile': columns 'name', 'age', 'email'
-- Column family 'settings': columns 'theme', 'notifications'

All Flink data types are automatically serialized to HBase-compatible byte arrays.
Supported Type Conversions:
The sink integrates with Flink's checkpointing mechanism to provide exactly-once processing guarantees.
Reliability Features:
Comprehensive error handling for various failure scenarios:
// Common error scenarios and handling:
// 1. Connection failures
// - Automatic retry with exponential backoff
// - Connection pool management
// - Failover to backup region servers
// 2. Table schema mismatches
// - Schema validation during sink creation
// - Clear error messages for incompatible types
// - Graceful handling of missing column families
// 3. Write buffer overflows
// - Configurable buffer sizes and timeouts
// - Automatic flushing on resource pressure
// - Memory usage monitoring and alerts
// 4. HBase cluster unavailability
// - Circuit breaker pattern for failures
// - Graceful degradation and recovery
// - Integration with Flink's restart strategies

Error Configuration:
CREATE TABLE resilient_sink (
-- Table schema
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'critical_data',
'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
-- Aggressive retry for critical data
'sink.buffer-flush.max-size' = '2mb',
'sink.buffer-flush.interval' = '1s'
-- Multiple ZK nodes (zookeeper.quorum above) provide high availability
);

The sink provides comprehensive metrics for monitoring write performance and health:
Available Metrics:
Integration with Flink Metrics:
// Metrics are automatically registered with Flink's metric system
// Available in Flink UI and external monitoring systems
// - numRecordsOut: Total records written
// - numBytesOut: Total bytes written
// - currentSendTime: Current write latency
// - bufferUsage: Current buffer utilization percentage

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-hbase-1-4-2-11