Apache Flink HBase 1.4 Connector provides bidirectional data integration between Flink streaming applications and HBase NoSQL database.
The connector supports reading data from HBase tables with batch (scan) and lookup operations, region-aware splitting of scans, and configurable lookup caching.
Table source implementation that enables reading data from HBase tables through Flink's Table API and SQL.
/**
 * HBase table source implementation for reading data from HBase tables
 * through Flink's Table API and SQL.
 *
 * <p>Extends {@code AbstractHBaseDynamicTableSource} with HBase 1.4 specific
 * behavior: it supplies the HBase 1.4 input format for scan reads and exposes
 * the configured lookup options.
 */
@Internal
public class HBaseDynamicTableSource extends AbstractHBaseDynamicTableSource {

    /**
     * Creates a new HBase dynamic table source.
     *
     * @param conf Hadoop configuration used to open the HBase connection
     * @param tableName name of the HBase table to read from
     * @param hbaseSchema mapping between HBase column families/qualifiers and
     *     the Flink table schema
     * @param nullStringLiteral string literal that is interpreted as SQL NULL
     *     when read from HBase string cells
     * @param lookupOptions configuration for lookup joins (cache max rows,
     *     cache TTL, max retries)
     */
    public HBaseDynamicTableSource(
            Configuration conf,
            String tableName,
            HBaseTableSchema hbaseSchema,
            String nullStringLiteral,
            HBaseLookupOptions lookupOptions
    );

    /**
     * Creates a copy of this table source with the same configuration.
     * Called by the planner; instances must not be shared between copies.
     *
     * @return a new {@code HBaseDynamicTableSource} instance with identical settings
     */
    public DynamicTableSource copy();

    /**
     * Returns the input format used for batch/scan reads of the HBase table.
     *
     * @return an {@code HBaseRowDataInputFormat} configured for this table
     */
    public InputFormat<RowData, ?> getInputFormat();

    /**
     * Returns the lookup options configuration; exposed for testing only.
     *
     * @return the {@code HBaseLookupOptions} with caching and retry settings
     */
    @VisibleForTesting
    public HBaseLookupOptions getLookupOptions();
}

Usage Example:
// Example: Reading from HBase table in Flink job
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Create HBase source table
tableEnv.executeSql(
"CREATE TABLE user_profiles (" +
" user_id STRING," +
" profile ROW<name STRING, age INT, email STRING>," +
" activity ROW<last_login TIMESTAMP(3), login_count BIGINT>," +
" PRIMARY KEY (user_id) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'hbase-1.4'," +
" 'table-name' = 'user_table'," +
" 'zookeeper.quorum' = 'localhost:2181'" +
")"
);
// Query HBase data
Table result = tableEnv.sqlQuery(
"SELECT user_id, profile.name, activity.login_count " +
"FROM user_profiles " +
"WHERE activity.login_count > 10"
);

Input format implementation that handles the actual reading of data from HBase tables and conversion to Flink's RowData format.
/**
 * InputFormat that reads rows from an HBase table and emits them as
 * {@code RowData} for integration with Flink's Table API.
 *
 * <p>Connection handling, splitting, and iteration are inherited from
 * {@code AbstractTableInputFormat}; this class contributes the scan
 * configuration and the HBase-Result-to-RowData conversion.
 */
public class HBaseRowDataInputFormat extends AbstractTableInputFormat<RowData> {

    /**
     * Creates a new HBase row data input format.
     *
     * @param conf Hadoop configuration used to open the HBase connection
     * @param tableName name of the HBase table to read from
     * @param schema HBase table schema driving the RowData conversion
     * @param nullStringLiteral string literal that is interpreted as SQL NULL
     *     when read from HBase string cells
     */
    public HBaseRowDataInputFormat(
            Configuration conf,
            String tableName,
            HBaseTableSchema schema,
            String nullStringLiteral
    );

    /**
     * Initializes the table connection and the serialization components
     * needed to convert HBase results to RowData.
     *
     * @throws IOException if the connection cannot be established
     */
    protected void initTable() throws IOException;

    /**
     * Creates the HBase {@code Scan} used to read the table.
     *
     * @return a configured Scan object for this table
     */
    protected Scan getScanner();

    /**
     * Returns the name of the HBase table being read.
     *
     * @return the configured table name
     */
    public String getTableName();

    /**
     * Converts one HBase {@code Result} to Flink's RowData format.
     *
     * @param res HBase Result object produced by the table scan
     * @return RowData representation of the HBase row
     */
    protected RowData mapResultToOutType(Result res);
}

Base class providing common functionality for all HBase input formats, including connection management, splitting, and error handling.
/**
 * Abstract InputFormat for reading data from HBase tables.
 *
 * <p>Provides the common scaffolding shared by all HBase input formats:
 * opening/closing connections, iterating scan results, and creating
 * region-based input splits. Subclasses supply the table name, the scan
 * configuration, and the conversion from HBase {@code Result} to the
 * output type {@code T}.
 */
@Internal
public abstract class AbstractTableInputFormat<T>
        extends RichInputFormat<T, TableInputSplit> {

    /**
     * Opens a scanner for reading the given table split.
     *
     * @param split the input split to read from
     * @throws IOException if the split cannot be opened
     */
    public void open(TableInputSplit split) throws IOException;

    /**
     * Reads the next record from the HBase table.
     *
     * @param reuse reusable object for the result (may be null)
     * @return the next record of type T, or null if the end is reached
     * @throws IOException if reading fails
     */
    public T nextRecord(T reuse) throws IOException;

    /**
     * Checks whether the end of the input has been reached.
     *
     * @return true if no more records are available
     * @throws IOException if the status check fails
     */
    public boolean reachedEnd() throws IOException;

    /**
     * Closes all connections and resources held by this format.
     *
     * @throws IOException if cleanup fails
     */
    public void close() throws IOException;

    /**
     * Creates input splits for parallel reading based on HBase regions.
     *
     * @param minNumSplits minimum number of splits to create
     * @return array of TableInputSplit objects for parallel execution
     * @throws IOException if split creation fails
     */
    public TableInputSplit[] createInputSplits(int minNumSplits) throws IOException;

    /**
     * Returns an input split assigner for the given splits.
     *
     * @param inputSplits array of input splits to assign
     * @return InputSplitAssigner managing split distribution across subtasks
     */
    public InputSplitAssigner getInputSplitAssigner(TableInputSplit[] inputSplits);

    /**
     * Returns statistics about the input data. Not supported by this format.
     *
     * @param cachedStatistics previously cached statistics (ignored)
     * @return null, since statistics are not supported
     */
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics);

    // Abstract methods to be implemented by subclasses:
    // initialize the table connection
    protected abstract void initTable() throws IOException;
    // build the Scan used to read the table
    protected abstract Scan getScanner();
    // name of the HBase table being read
    protected abstract String getTableName();
    // convert one HBase Result into the output type
    protected abstract T mapResultToOutType(Result r);
}

The HBase connector automatically creates input splits based on HBase table regions for optimal parallel processing.
Split Creation Process: one input split is created per HBase table region, bounded by the scan's start and stop keys.
Performance Benefits: splits allow parallel reads across subtasks and can be assigned with awareness of region server locations.
// Example: The connector automatically handles splitting
// No manual configuration required - splits are created based on:
// - HBase table regions
// - Scan start/stop keys
// - Region server locations

HBase tables can be used as dimension tables in lookup joins with configurable caching and retry mechanisms.
Lookup Join Example:
-- Orders stream joined with customer dimension table in HBase
SELECT
o.order_id,
o.amount,
c.customer.name,
c.customer.segment
FROM orders_stream o
JOIN customer_hbase FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.customer_id;

Lookup operations support configurable caching to reduce HBase load and improve performance.
-- Configure lookup caching
CREATE TABLE customer_dim (
customer_id STRING,
customer ROW<name STRING, segment STRING, region STRING>,
PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'customers',
'zookeeper.quorum' = 'localhost:2181',
'lookup.cache.max-rows' = '50000',
'lookup.cache.ttl' = '10min',
'lookup.max-retries' = '3'
);

The source operations include comprehensive error handling for connection failures and timeouts.
Automatic Recovery Features:
// Common exceptions and their handling:
// TableNotFoundException: Thrown when HBase table doesn't exist
try {
// Table operations
} catch (TableNotFoundException e) {
throw new RuntimeException("HBase table '" + tableName + "' not found.", e);
}
// IOException: Connection and I/O failures
// Automatically retried with configurable limits
// Timeout exceptions: Scanner automatically recreated
// Progress tracked by current row key for resumption

Configuration for Resilience:
CREATE TABLE resilient_source (
-- Table definition
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'my_table',
'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
'lookup.max-retries' = '5',
-- Multiple Zookeeper nodes for failover
-- Retry configuration for lookup operations
);

Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-flink--flink-connector-hbase-1-4-2-11