The Apache Flink HBase 1.4 Connector provides bidirectional data integration between Flink streaming applications and the HBase NoSQL database.
—
Core factory class for creating HBase table sources and sinks with comprehensive configuration options for the Apache Flink HBase 1.4 Connector.
Main factory class that implements both DynamicTableSourceFactory and DynamicTableSinkFactory interfaces to provide HBase table integration with Flink's Table API.
/**
 * HBase connector factory for creating dynamic table sources and sinks.
 *
 * <p>Supports the connector identifier "hbase-1.4" in Flink SQL DDL statements.
 * This is an API outline: method bodies are omitted.
 */
@Internal
public class HBase1DynamicTableFactory
        implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    /**
     * Creates a dynamic table source for reading from HBase tables.
     *
     * @param context Factory context containing catalog table information and options
     * @return HBaseDynamicTableSource configured for the specified table
     */
    @Override
    public DynamicTableSource createDynamicTableSource(Context context);

    /**
     * Creates a dynamic table sink for writing to HBase tables.
     *
     * @param context Factory context containing catalog table information and options
     * @return HBaseDynamicTableSink configured for the specified table
     */
    @Override
    public DynamicTableSink createDynamicTableSink(Context context);

    /**
     * Returns the unique identifier for this connector factory.
     *
     * @return String identifier "hbase-1.4" used in CREATE TABLE statements
     */
    @Override
    public String factoryIdentifier();

    /**
     * Returns the set of required configuration options.
     *
     * @return Set containing TABLE_NAME and ZOOKEEPER_QUORUM options
     */
    @Override
    public Set<ConfigOption<?>> requiredOptions();

    /**
     * Returns the set of optional configuration options.
     *
     * @return Set containing all optional configuration parameters
     */
    @Override
    public Set<ConfigOption<?>> optionalOptions();
}

Usage Example:
-- Maps an HBase table to Flink: the primary key column is the HBase row key,
-- and each ROW<...> column is one column family with its qualifiers.
CREATE TABLE my_hbase_table (
    rowkey STRING,
    cf1 ROW<col1 STRING, col2 BIGINT>,
    -- TIMESTAMP is a reserved keyword in Flink SQL, so the qualifier name is escaped with backticks.
    cf2 ROW<status BOOLEAN, `timestamp` TIMESTAMP(3)>,
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 'my_table',
    'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
    'zookeeper.znode.parent' = '/hbase'
);

Configuration parameters that must be specified when creating HBase tables.
// Required: Name of the HBase table to connect to.
// Key "table-name"; string option with no default value.
public static final ConfigOption<String> TABLE_NAME =
ConfigOptions.key("table-name")
.stringType()
.noDefaultValue()
.withDescription("The name of HBase table to connect.");
// Required: HBase Zookeeper quorum for cluster connection
public static final ConfigOption<String> ZOOKEEPER_QUORUM =
ConfigOptions.key("zookeeper.quorum")
.stringType()
.noDefaultValue()
.withDescription("The HBase Zookeeper quorum.");

Configuration parameters with default values that can be customized for specific use cases.
// Optional: Zookeeper root directory for the HBase cluster.
// Key "zookeeper.znode.parent"; string option; defaults to "/hbase".
public static final ConfigOption<String> ZOOKEEPER_ZNODE_PARENT =
ConfigOptions.key("zookeeper.znode.parent")
.stringType()
.defaultValue("/hbase")
.withDescription("The root dir in Zookeeper for HBase cluster.");
// Optional: Null value representation for string fields.
// Key "null-string-literal"; string option; defaults to the literal text "null".
public static final ConfigOption<String> NULL_STRING_LITERAL =
ConfigOptions.key("null-string-literal")
.stringType()
.defaultValue("null")
.withDescription("Representation for null values for string fields.");
// Optional: Maximum buffer size for write operations.
// Key "sink.buffer-flush.max-size"; memory-size option; defaults to 2mb.
public static final ConfigOption<MemorySize> SINK_BUFFER_FLUSH_MAX_SIZE =
ConfigOptions.key("sink.buffer-flush.max-size")
.memoryType()
.defaultValue(MemorySize.parse("2mb"))
.withDescription("Maximum size in memory of buffered rows for each writing request.");
// Optional: Maximum number of buffered rows.
// Key "sink.buffer-flush.max-rows"; integer option; defaults to 1000.
public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS =
ConfigOptions.key("sink.buffer-flush.max-rows")
.intType()
.defaultValue(1000)
.withDescription("Maximum number of rows to buffer for each writing request.");
// Optional: Buffer flush interval.
// Key "sink.buffer-flush.interval"; duration option; defaults to 1 second.
public static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL =
ConfigOptions.key("sink.buffer-flush.interval")
.durationType()
.defaultValue(Duration.ofSeconds(1))
.withDescription("The interval to flush any buffered rows.");
// Optional: Sink operator parallelism.
// Re-exports the shared FactoryUtil.SINK_PARALLELISM option instead of declaring a
// connector-specific key, so its key, type and default come from FactoryUtil.
public static final ConfigOption<Integer> SINK_PARALLELISM =
FactoryUtil.SINK_PARALLELISM;
// Optional: Enable async lookup operations.
// Key "lookup.async"; boolean option; defaults to false.
public static final ConfigOption<Boolean> LOOKUP_ASYNC =
ConfigOptions.key("lookup.async")
.booleanType()
.defaultValue(false)
.withDescription("whether to set async lookup.");
// Optional: Maximum lookup cache size (in rows).
// Key "lookup.cache.max-rows"; long option; defaults to -1.
// NOTE(review): -1 presumably means the lookup cache is disabled — confirm against the lookup function.
public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
ConfigOptions.key("lookup.cache.max-rows")
.longType()
.defaultValue(-1L)
.withDescription("the max number of rows of lookup cache.");
// Optional: Lookup cache time-to-live.
// Key "lookup.cache.ttl"; duration option; defaults to 0 seconds.
// NOTE(review): how a 0-second TTL is interpreted (no expiry vs. no caching) is not shown here — confirm.
public static final ConfigOption<Duration> LOOKUP_CACHE_TTL =
ConfigOptions.key("lookup.cache.ttl")
.durationType()
.defaultValue(Duration.ofSeconds(0))
.withDescription("the cache time to live.");
// Optional: Maximum lookup retry attempts
public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES =
ConfigOptions.key("lookup.max-retries")
.intType()
.defaultValue(3)
.withDescription("the max retry times if lookup database failed.");

Configuration Example:
CREATE TABLE orders (
order_id STRING,
customer ROW<id STRING, name STRING>,
order_details ROW<amount DECIMAL(10,2), status STRING, created_at TIMESTAMP(3)>,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'orders_table',
'zookeeper.quorum' = 'zk1:2181,zk2:2181',
'zookeeper.znode.parent' = '/hbase',
'null-string-literal' = 'NULL',
'sink.buffer-flush.max-size' = '4mb',
'sink.buffer-flush.max-rows' = '2000',
'sink.buffer-flush.interval' = '2s',
'lookup.cache.max-rows' = '10000',
'lookup.cache.ttl' = '5min',
'lookup.max-retries' = '5'
);

HBase tables in Flink require a primary key definition that maps to the HBase row key. The schema supports column families through nested ROW types.
Schema Validation:
Supported Data Types:
The factory provides comprehensive error handling and validation:
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-hbase-1-4-2-11