tessl/maven-org-apache-flink--flink-connector-hbase-1-4-2-11

Apache Flink HBase 1.4 Connector provides bidirectional data integration between Flink streaming applications and HBase NoSQL database.

docs/table-factory.md

Table Factory and Configuration

Core factory class for creating HBase table sources and sinks with comprehensive configuration options for the Apache Flink HBase 1.4 Connector.

Capabilities

HBase1DynamicTableFactory

Main factory class that implements both DynamicTableSourceFactory and DynamicTableSinkFactory interfaces to provide HBase table integration with Flink's Table API.

/**
 * HBase connector factory for creating dynamic table sources and sinks
 * Supports connector identifier "hbase-1.4" in Flink SQL DDL statements
 */
@Internal
public class HBase1DynamicTableFactory 
    implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    
    /**
     * Creates a dynamic table source for reading from HBase tables
     * @param context Factory context containing catalog table information and options
     * @return HBaseDynamicTableSource configured for the specified table
     */
    public DynamicTableSource createDynamicTableSource(Context context);
    
    /**
     * Creates a dynamic table sink for writing to HBase tables
     * @param context Factory context containing catalog table information and options
     * @return HBaseDynamicTableSink configured for the specified table
     */
    public DynamicTableSink createDynamicTableSink(Context context);
    
    /**
     * Returns the unique identifier for this connector factory
     * @return String identifier "hbase-1.4" used in CREATE TABLE statements
     */
    public String factoryIdentifier();
    
    /**
     * Returns the set of required configuration options
     * @return Set containing TABLE_NAME and ZOOKEEPER_QUORUM options
     */
    public Set<ConfigOption<?>> requiredOptions();
    
    /**
     * Returns the set of optional configuration options
     * @return Set containing all optional configuration parameters
     */
    public Set<ConfigOption<?>> optionalOptions();
}

Usage Example:

CREATE TABLE my_hbase_table (
    rowkey STRING,
    cf1 ROW<col1 STRING, col2 BIGINT>,
    cf2 ROW<status BOOLEAN, `timestamp` TIMESTAMP(3)>,
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 'my_table',
    'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
    'zookeeper.znode.parent' = '/hbase'
);

Configuration Options

Required Options

Configuration parameters that must be specified when creating HBase tables.

// Required: Name of the HBase table to connect to
public static final ConfigOption<String> TABLE_NAME =
    ConfigOptions.key("table-name")
        .stringType()
        .noDefaultValue()
        .withDescription("The name of HBase table to connect.");

// Required: HBase Zookeeper quorum for cluster connection
public static final ConfigOption<String> ZOOKEEPER_QUORUM =
    ConfigOptions.key("zookeeper.quorum")
        .stringType()
        .noDefaultValue()
        .withDescription("The HBase Zookeeper quorum.");

Optional Options

Configuration parameters with default values that can be customized for specific use cases.

// Optional: Zookeeper root directory for HBase cluster
public static final ConfigOption<String> ZOOKEEPER_ZNODE_PARENT =
    ConfigOptions.key("zookeeper.znode.parent")
        .stringType()
        .defaultValue("/hbase")
        .withDescription("The root dir in Zookeeper for HBase cluster.");

// Optional: Null value representation for string fields
public static final ConfigOption<String> NULL_STRING_LITERAL =
    ConfigOptions.key("null-string-literal")
        .stringType()
        .defaultValue("null")
        .withDescription("Representation for null values for string fields.");

// Optional: Maximum buffer size for write operations
public static final ConfigOption<MemorySize> SINK_BUFFER_FLUSH_MAX_SIZE =
    ConfigOptions.key("sink.buffer-flush.max-size")
        .memoryType()
        .defaultValue(MemorySize.parse("2mb"))
        .withDescription("Maximum size in memory of buffered rows for each writing request.");

// Optional: Maximum number of buffered rows
public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS =
    ConfigOptions.key("sink.buffer-flush.max-rows")
        .intType()
        .defaultValue(1000)
        .withDescription("Maximum number of rows to buffer for each writing request.");

// Optional: Buffer flush interval
public static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL =
    ConfigOptions.key("sink.buffer-flush.interval")
        .durationType()
        .defaultValue(Duration.ofSeconds(1))
        .withDescription("The interval to flush any buffered rows.");
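The three `sink.buffer-flush.*` options work together: a buffered batch is written out as soon as any one of the thresholds is crossed. The following is an illustrative sketch (not connector source code) of that trigger logic; the class and method names are hypothetical.

```java
// Illustrative sketch: how the three sink.buffer-flush.* thresholds
// typically interact. A flush is triggered when ANY threshold is crossed.
public class BufferFlushPolicy {
    private final long maxSizeBytes;   // sink.buffer-flush.max-size
    private final int maxRows;         // sink.buffer-flush.max-rows
    private final long intervalMillis; // sink.buffer-flush.interval

    public BufferFlushPolicy(long maxSizeBytes, int maxRows, long intervalMillis) {
        this.maxSizeBytes = maxSizeBytes;
        this.maxRows = maxRows;
        this.intervalMillis = intervalMillis;
    }

    /** Returns true if the buffered batch should be flushed now. */
    public boolean shouldFlush(long bufferedBytes, int bufferedRows, long millisSinceLastFlush) {
        return bufferedBytes >= maxSizeBytes
                || bufferedRows >= maxRows
                || millisSinceLastFlush >= intervalMillis;
    }
}
```

With the defaults (2mb, 1000 rows, 1s), whichever limit is reached first drives the flush, so a low-traffic stream is still flushed at least once per interval.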

// Optional: Sink operator parallelism
public static final ConfigOption<Integer> SINK_PARALLELISM =
    FactoryUtil.SINK_PARALLELISM;

// Optional: Enable async lookup operations
public static final ConfigOption<Boolean> LOOKUP_ASYNC =
    ConfigOptions.key("lookup.async")
        .booleanType()
        .defaultValue(false)
        .withDescription("whether to set async lookup.");

// Optional: Maximum lookup cache size
public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
    ConfigOptions.key("lookup.cache.max-rows")
        .longType()
        .defaultValue(-1L)
        .withDescription("the max number of rows of lookup cache.");

// Optional: Lookup cache time-to-live
public static final ConfigOption<Duration> LOOKUP_CACHE_TTL =
    ConfigOptions.key("lookup.cache.ttl")
        .durationType()
        .defaultValue(Duration.ofSeconds(0))
        .withDescription("the cache time to live.");

// Optional: Maximum lookup retry attempts
public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES =
    ConfigOptions.key("lookup.max-retries")
        .intType()
        .defaultValue(3)
        .withDescription("the max retry times if lookup database failed.");
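With the defaults above (`lookup.cache.max-rows` = -1, `lookup.cache.ttl` = 0s), the lookup cache is effectively disabled; both must be set to positive values for caching to take effect. The sketch below (not connector source code; all names are hypothetical) illustrates the resulting semantics: an entry is dropped either when it expires or when the cache is full and it is the least recently used.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch of lookup.cache.max-rows and lookup.cache.ttl
// semantics: caching only applies when both are positive; entries are
// evicted on TTL expiry or by LRU order when the cache is full.
public class LookupCacheSketch<K, V> {
    private final long maxRows;   // lookup.cache.max-rows
    private final long ttlMillis; // lookup.cache.ttl

    private static final class Entry<V> {
        final V value;
        final long insertedAt;
        Entry(V value, long insertedAt) { this.value = value; this.insertedAt = insertedAt; }
    }

    private final LinkedHashMap<K, Entry<V>> cache;

    public LookupCacheSketch(long maxRows, long ttlMillis) {
        this.maxRows = maxRows;
        this.ttlMillis = ttlMillis;
        // access-order LinkedHashMap gives simple LRU eviction
        this.cache = new LinkedHashMap<K, Entry<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Entry<V>> eldest) {
                return size() > LookupCacheSketch.this.maxRows;
            }
        };
    }

    public void put(K key, V value, long nowMillis) {
        if (maxRows > 0 && ttlMillis > 0) {
            cache.put(key, new Entry<>(value, nowMillis));
        }
    }

    /** Returns the cached value, or null on miss/expiry (caller then queries HBase). */
    public V get(K key, long nowMillis) {
        Entry<V> e = cache.get(key);
        if (e == null) return null;
        if (nowMillis - e.insertedAt >= ttlMillis) {
            cache.remove(key);
            return null;
        }
        return e.value;
    }
}
```

A cache miss or an expired entry means the lookup falls through to HBase, with `lookup.max-retries` bounding the retry attempts on failure.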

Configuration Example:

CREATE TABLE orders (
    order_id STRING,
    customer ROW<id STRING, name STRING>,
    order_details ROW<amount DECIMAL(10,2), status STRING, created_at TIMESTAMP(3)>,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 'orders_table',
    'zookeeper.quorum' = 'zk1:2181,zk2:2181',
    'zookeeper.znode.parent' = '/hbase',
    'null-string-literal' = 'NULL',
    'sink.buffer-flush.max-size' = '4mb',
    'sink.buffer-flush.max-rows' = '2000',
    'sink.buffer-flush.interval' = '2s',
    'lookup.cache.max-rows' = '10000',
    'lookup.cache.ttl' = '5min',
    'lookup.max-retries' = '5'
);

Integration with Flink Table API

Table Schema Requirements

HBase tables in Flink require a primary key definition that maps to the HBase row key. The schema supports column families through nested ROW types.

Schema Validation:

  • Primary key must be defined (maps to HBase row key)
  • Column families represented as ROW types
  • All Flink data types supported through serialization

Supported Data Types:

  • Primitive types: STRING, BIGINT, INT, DOUBLE, FLOAT, BOOLEAN, etc.
  • Timestamp types: TIMESTAMP(3), TIMESTAMP_LTZ(3)
  • Complex types: ROW (for column families), ARRAY, MAP
  • Decimal types: DECIMAL(precision, scale)
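Conceptually, each top-level ROW field in the Flink schema names an HBase column family, and each nested field inside it becomes a qualifier, so every cell is addressed as `family:qualifier`. A minimal sketch of that expansion (illustrative only; not connector source code):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: expanding one column family's nested ROW field
// names into HBase cell addresses of the form "family:qualifier".
public class ColumnFamilyMapping {
    public static List<String> cellAddresses(String family, List<String> qualifiers) {
        List<String> cells = new ArrayList<>();
        for (String q : qualifiers) {
            cells.add(family + ":" + q);
        }
        return cells;
    }
}
```

For example, `cf1 ROW<col1 STRING, col2 BIGINT>` corresponds to the cells `cf1:col1` and `cf1:col2`, keyed by the table's primary-key column as the HBase row key.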

Error Handling

The factory provides comprehensive error handling and validation:

  • Table validation: Ensures primary key is properly defined
  • Configuration validation: Validates required options are present
  • Connection validation: Verifies HBase cluster connectivity
  • Schema validation: Ensures schema compatibility with HBase storage model
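The configuration-validation step amounts to checking that every required option is present and non-empty before a source or sink is built. A hedged sketch of that check (the class and exception choice are hypothetical; the real factory relies on Flink's FactoryUtil helpers):

```java
import java.util.Map;

// Illustrative sketch of required-option validation: both table-name
// and zookeeper.quorum must be present and non-empty.
public class RequiredOptionCheck {
    public static void validate(Map<String, String> options) {
        for (String key : new String[] {"table-name", "zookeeper.quorum"}) {
            String v = options.get(key);
            if (v == null || v.isEmpty()) {
                throw new IllegalArgumentException("Missing required option: " + key);
            }
        }
    }
}
```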

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-hbase-1-4-2-11
