CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-connector-hbase-1-4-2-11

Apache Flink HBase 1.4 Connector provides bidirectional data integration between Flink streaming applications and HBase NoSQL database.

Pending
Overview
Eval results
Files

source-operations.mddocs/

Source Operations

Reading data from HBase tables with support for batch and lookup operations, region-aware splitting, and configurable caching in the Apache Flink HBase 1.4 Connector.

Capabilities

HBaseDynamicTableSource

Table source implementation that enables reading data from HBase tables through Flink's Table API and SQL.

/**
 * HBase table source implementation for reading data from HBase tables
 * Extends AbstractHBaseDynamicTableSource with HBase 1.4 specific functionality
 */
@Internal
public class HBaseDynamicTableSource extends AbstractHBaseDynamicTableSource {
    
    /**
     * Creates a new HBase dynamic table source
     * @param conf Hadoop configuration for HBase connection
     * @param tableName Name of the HBase table to read from
     * @param hbaseSchema Schema mapping for the HBase table
     * @param nullStringLiteral String representation for null values
     * @param lookupOptions Configuration for lookup operations and caching
     */
    public HBaseDynamicTableSource(
        Configuration conf,
        String tableName,
        HBaseTableSchema hbaseSchema,
        String nullStringLiteral,
        HBaseLookupOptions lookupOptions
    );
    
    /**
     * Creates a copy of this table source for parallel execution
     * @return New HBaseDynamicTableSource instance with same configuration
     */
    public DynamicTableSource copy();
    
    /**
     * Returns the input format for reading HBase data
     * @return HBaseRowDataInputFormat configured for this table
     */
    public InputFormat<RowData, ?> getInputFormat();
    
    /**
     * Returns the lookup options configuration for testing purposes
     * @return HBaseLookupOptions instance with caching and retry settings
     */
    @VisibleForTesting
    public HBaseLookupOptions getLookupOptions();
}

Usage Example:

// Example: Reading from HBase table in Flink job
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Create HBase source table
tableEnv.executeSql(
    "CREATE TABLE user_profiles (" +
    "  user_id STRING," +
    "  profile ROW<name STRING, age INT, email STRING>," +
    "  activity ROW<last_login TIMESTAMP(3), login_count BIGINT>," +
    "  PRIMARY KEY (user_id) NOT ENFORCED" +
    ") WITH (" +
    "  'connector' = 'hbase-1.4'," +
    "  'table-name' = 'user_table'," +
    "  'zookeeper.quorum' = 'localhost:2181'" +
    ")"
);

// Query HBase data
Table result = tableEnv.sqlQuery(
    "SELECT user_id, profile.name, activity.login_count " +
    "FROM user_profiles " +
    "WHERE activity.login_count > 10"
);

HBaseRowDataInputFormat

Input format implementation that handles the actual reading of data from HBase tables and conversion to Flink's RowData format.

/**
 * InputFormat subclass that wraps access for HBase tables
 * Returns results as RowData for integration with Flink's Table API
 */
public class HBaseRowDataInputFormat extends AbstractTableInputFormat<RowData> {
    
    /**
     * Creates a new HBase row data input format
     * @param conf Hadoop configuration for HBase connection
     * @param tableName Name of the HBase table to read from
     * @param schema HBase table schema for data conversion
     * @param nullStringLiteral String representation for null values
     */
    public HBaseRowDataInputFormat(
        Configuration conf,
        String tableName,
        HBaseTableSchema schema,
        String nullStringLiteral
    );
    
    /**
     * Initializes the table connection and serialization components
     * @throws IOException if connection cannot be established
     */
    protected void initTable() throws IOException;
    
    /**
     * Creates an HBase Scan object for reading data
     * @return Configured Scan object for the table
     */
    protected Scan getScanner();
    
    /**
     * Returns the name of the HBase table being read
     * @return Table name as configured
     */
    public String getTableName();
    
    /**
     * Converts HBase Result to Flink RowData format
     * @param res HBase Result object from table scan
     * @return RowData representation of the HBase row
     */
    protected RowData mapResultToOutType(Result res);
}

AbstractTableInputFormat

Base class providing common functionality for all HBase input formats, including connection management, splitting, and error handling.

/**
 * Abstract InputFormat to read data from HBase tables
 * Provides common functionality for connection management and data reading
 */
@Internal
public abstract class AbstractTableInputFormat<T> 
    extends RichInputFormat<T, TableInputSplit> {
    
    /**
     * Opens a connection for reading a specific table split
     * @param split The input split to read from
     * @throws IOException if the split cannot be opened
     */
    public void open(TableInputSplit split) throws IOException;
    
    /**
     * Reads the next record from the HBase table
     * @param reuse Reusable object for the result (can be null)
     * @return Next record of type T, or null if end is reached
     * @throws IOException if reading fails
     */
    public T nextRecord(T reuse) throws IOException;
    
    /**
     * Checks if the end of the input has been reached
     * @return true if no more records are available
     * @throws IOException if status check fails
     */
    public boolean reachedEnd() throws IOException;
    
    /**
     * Closes all connections and resources
     * @throws IOException if cleanup fails
     */
    public void close() throws IOException;
    
    /**
     * Creates input splits for parallel reading based on HBase regions
     * @param minNumSplits Minimum number of splits to create
     * @return Array of TableInputSplit objects for parallel execution
     * @throws IOException if split creation fails
     */
    public TableInputSplit[] createInputSplits(int minNumSplits) throws IOException;
    
    /**
     * Returns an input split assigner for the given splits
     * @param inputSplits Array of input splits to assign
     * @return InputSplitAssigner for managing split distribution
     */
    public InputSplitAssigner getInputSplitAssigner(TableInputSplit[] inputSplits);
    
    /**
     * Returns statistics about the input data (not implemented)
     * @param cachedStatistics Previously cached statistics
     * @return null (statistics not supported)
     */
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics);
    
    // Abstract methods to be implemented by subclasses
    protected abstract void initTable() throws IOException;
    protected abstract Scan getScanner();
    protected abstract String getTableName();
    protected abstract T mapResultToOutType(Result r);
}

Region-Aware Splitting

The HBase connector automatically creates input splits based on HBase table regions for optimal parallel processing.

Split Creation Process:

  1. Region Discovery: Queries HBase for region start/end keys
  2. Scan Range Mapping: Maps Flink scan ranges to HBase regions
  3. Split Generation: Creates one split per relevant region
  4. Locality Preservation: Assigns splits to nodes hosting the regions

Performance Benefits:

  • Parallel reading across multiple regions
  • Data locality optimization
  • Automatic load balancing
  • Scan pushdown to HBase region servers
// Example: The connector automatically handles splitting
// No manual configuration required - splits are created based on:
// - HBase table regions
// - Scan start/stop keys
// - Region server locations

Lookup Operations

Lookup Join Support

HBase tables can be used as dimension tables in lookup joins with configurable caching and retry mechanisms.

Lookup Join Example:

-- Orders stream joined with customer dimension table in HBase
SELECT 
    o.order_id,
    o.amount,
    c.customer.name,
    c.customer.segment
FROM orders_stream o
JOIN customer_hbase FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.customer_id;

Caching Configuration

Lookup operations support configurable caching to reduce HBase load and improve performance.

-- Configure lookup caching
CREATE TABLE customer_dim (
    customer_id STRING,
    customer ROW<name STRING, segment STRING, region STRING>,
    PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 'customers',
    'zookeeper.quorum' = 'localhost:2181',
    'lookup.cache.max-rows' = '50000',
    'lookup.cache.ttl' = '10min',
    'lookup.max-retries' = '3'
);

Error Handling and Resilience

Connection Management

The source operations include comprehensive error handling for connection failures and timeouts.

Automatic Recovery Features:

  • Connection retry with exponential backoff
  • Scanner recreation on timeout
  • Region failover handling
  • Checkpoint-based recovery

Exception Types

// Common exceptions and their handling:

// TableNotFoundException: Thrown when HBase table doesn't exist
try {
    // Table operations
} catch (TableNotFoundException e) {
    throw new RuntimeException("HBase table '" + tableName + "' not found.", e);
}

// IOException: Connection and I/O failures
// Automatically retried with configurable limits

// Timeout exceptions: Scanner automatically recreated
// Progress tracked by current row key for resumption

Configuration for Resilience:

CREATE TABLE resilient_source (
    -- Table definition
) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 'my_table',
    'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
    'lookup.max-retries' = '5',
    -- Multiple Zookeeper nodes for failover
    -- Retry configuration for lookup operations
);

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-hbase-1-4-2-11

docs

index.md

lookup-options.md

sink-operations.md

source-operations.md

table-factory.md

write-options.md

tile.json