Apache Flink HBase 1.4 Connector provides bidirectional data integration between Flink streaming applications and HBase NoSQL database.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-hbase-1-4-2-11@1.14.0

Apache Flink HBase 1.4 Connector provides comprehensive bidirectional data integration between the Apache Flink stream processing framework and the HBase 1.4 NoSQL database. The connector enables reading from and writing to HBase tables through Flink's Table API and SQL. Writes are upserts keyed on the HBase row key, so records replayed after a failure overwrite rather than duplicate rows, which gives effectively exactly-once results for upsert workloads. This makes the connector suitable for high-performance stream processing applications.
<!-- Maven dependency for the Flink HBase 1.4 connector; the "_2.11" suffix is the artifact's Scala binary version. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hbase-1.4_2.11</artifactId>
    <version>1.14.6</version>
</dependency>
import org.apache.flink.connector.hbase1.HBase1DynamicTableFactory;
import org.apache.flink.connector.hbase1.source.HBaseDynamicTableSource;
import org.apache.flink.connector.hbase1.sink.HBaseDynamicTableSink;
import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
import org.apache.flink.connector.hbase.table.HBaseConnectorOptions;

-- Register an HBase table in Flink SQL. Each ROW-typed column (family1, family2) models one
-- HBase column family whose fields are its qualifiers; the primary key is the row key field.
CREATE TABLE hbase_table (
  rowkey STRING,
  family1 ROW<col1 STRING, col2 BIGINT>,
  family2 ROW<col1 STRING, col2 BOOLEAN>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'my_hbase_table',
  'zookeeper.quorum' = 'localhost:2181'
);

-- Read: qualifiers inside a family are addressed as family.column.
SELECT rowkey, family1.col1, family2.col2
FROM hbase_table
WHERE family1.col2 > 100;

-- Write: the query must produce the same (rowkey, family1, family2) shape as the table.
INSERT INTO hbase_table
SELECT rowkey, family1, family2
FROM source_table;

The Apache Flink HBase 1.4 Connector is built around several key components:
- HBase1DynamicTableFactory serves as the main entry point for creating HBase table sources and sinks through Flink's Table API.
- HBaseDynamicTableSource and HBaseRowDataInputFormat handle reading data from HBase tables, with support for region-aware splitting.
- HBaseDynamicTableSink provides buffered write operations with configurable flushing strategies.

Core factory class for creating HBase table sources and sinks with comprehensive configuration options. It handles connector registration and table creation.
/**
 * Factory that creates both the read side and the write side of an HBase table for
 * {@code 'connector' = 'hbase-1.4'} in a {@code CREATE TABLE ... WITH (...)} clause.
 * It implements both the source-factory and sink-factory interfaces, so a single
 * registered identifier serves reads and writes.
 */
public class HBase1DynamicTableFactory
implements DynamicTableSourceFactory, DynamicTableSinkFactory {
// Builds the read side from the table's options; presumably an HBaseDynamicTableSource — confirm.
public DynamicTableSource createDynamicTableSource(Context context);
// Builds the write side from the table's options; presumably an HBaseDynamicTableSink — confirm.
public DynamicTableSink createDynamicTableSink(Context context);
public String factoryIdentifier(); // Returns "hbase-1.4"
// Options that must be present in the WITH clause. NOTE(review): exact set not shown here (likely includes 'table-name') — confirm.
public Set<ConfigOption<?>> requiredOptions();
// Options that may be omitted. NOTE(review): presumably the remaining HBaseConnectorOptions entries — confirm.
public Set<ConfigOption<?>> optionalOptions();
}Table Factory and Configuration
Reading data from HBase tables with support for batch and lookup operations, region-aware splitting, and configurable caching.
/**
 * Table source that reads rows from an HBase table. Per the surrounding description it serves
 * both scan (batch) reads and lookup joins.
 */
public class HBaseDynamicTableSource extends AbstractHBaseDynamicTableSource {
/**
 * @param conf client configuration used to reach HBase. NOTE(review): presumably the Hadoop/HBase
 *     {@code Configuration}, not Flink's — confirm from the imports of the real class.
 * @param tableName name of the HBase table to read (the 'table-name' option in the DDL example)
 * @param hbaseSchema row key and column-family/qualifier layout, as declared by the DDL's ROW columns
 * @param nullStringLiteral string stored in HBase to represent SQL NULL — inferred from the name; confirm
 * @param lookupOptions caching, retry and async settings used for lookup joins
 */
public HBaseDynamicTableSource(
Configuration conf,
String tableName,
HBaseTableSchema hbaseSchema,
String nullStringLiteral,
HBaseLookupOptions lookupOptions
);
// Returns a copy of this source, as required by the DynamicTableSource contract.
public DynamicTableSource copy();
// Input format used for scan reads. NOTE(review): presumably an HBaseRowDataInputFormat — confirm.
public InputFormat<RowData, ?> getInputFormat();
// Exposes the lookup-join configuration passed to the constructor.
public HBaseLookupOptions getLookupOptions();
}Writing data to HBase tables with configurable buffering, batching, and exactly-once processing guarantees.
/**
 * Table sink that writes rows to an HBase table, buffering writes according to
 * {@link HBaseWriteOptions}.
 */
public class HBaseDynamicTableSink implements DynamicTableSink {
/**
 * Note the argument order differs from HBaseDynamicTableSource's constructor: here the table
 * name comes first and the configuration third.
 *
 * @param tableName name of the HBase table to write
 * @param hbaseTableSchema row key and column-family/qualifier layout of the target table
 * @param hbaseConf client configuration used to reach HBase
 * @param writeOptions buffering/flush thresholds and sink parallelism
 * @param nullStringLiteral string written to HBase for SQL NULL — inferred from the name; confirm
 */
public HBaseDynamicTableSink(
String tableName,
HBaseTableSchema hbaseTableSchema,
Configuration hbaseConf,
HBaseWriteOptions writeOptions,
String nullStringLiteral
);
// Provides the runtime writer for this sink.
public SinkRuntimeProvider getSinkRuntimeProvider(Context context);
// Declares which change kinds the sink accepts. NOTE(review): presumably upsert-style (keyed on rowkey) — confirm.
public ChangelogMode getChangelogMode(ChangelogMode requestedMode);
// Returns a copy of this sink, as required by the DynamicTableSink contract.
public DynamicTableSink copy();
}Configuration options for optimizing write performance through buffering, batching, and parallelism control.
/**
 * Write-side tuning for the HBase sink: buffering, batching and sink parallelism.
 * Instances are assembled through {@link #builder()}.
 */
public class HBaseWriteOptions implements Serializable {
    /** Returns a {@link Builder} used to assemble an {@code HBaseWriteOptions}. */
    public static Builder builder();

    // Buffer-flush thresholds: accumulated size in bytes, number of buffered rows, and interval in millis.
    public long getBufferFlushMaxSizeInBytes();
    public long getBufferFlushMaxRows();
    public long getBufferFlushIntervalMillis();

    // Boxed so it can be absent. NOTE(review): null presumably means "use the framework default" — confirm.
    public Integer getParallelism();

    /**
     * Fluent builder for {@link HBaseWriteOptions}; every setter returns the builder itself.
     * It is a static nested class, referenced as {@code HBaseWriteOptions.Builder}. A top-level
     * class cannot be declared {@code static}, which is why it must live inside the enclosing class.
     */
    public static class Builder {
        public Builder setBufferFlushMaxSizeInBytes(long bufferFlushMaxSizeInBytes);
        public Builder setBufferFlushMaxRows(long bufferFlushMaxRows);
        public Builder setBufferFlushIntervalMillis(long bufferFlushIntervalMillis);
        public Builder setParallelism(Integer parallelism);
        public HBaseWriteOptions build();
    }
}
Configuration for lookup join operations with caching, retry mechanisms, and async processing options.
/**
 * Lookup-join settings for the HBase source: result caching, retry count and async mode.
 * Instances are assembled through {@link #builder()}.
 */
public class HBaseLookupOptions implements Serializable {
    /** Returns a {@link Builder} used to assemble an {@code HBaseLookupOptions}. */
    public static Builder builder();

    // Cache bounds: maximum number of cached entries and entry expiry in milliseconds.
    public long getCacheMaxSize();
    public long getCacheExpireMs();

    // Number of retries for a failed lookup.
    public int getMaxRetryTimes();

    // Whether lookups run asynchronously instead of blocking the operator.
    public boolean getLookupAsync();

    /**
     * Fluent builder for {@link HBaseLookupOptions}; every setter returns the builder itself.
     * It is a static nested class, referenced as {@code HBaseLookupOptions.Builder}. A top-level
     * class cannot be declared {@code static}.
     */
    public static class Builder {
        public Builder setCacheMaxSize(long cacheMaxSize);
        public Builder setCacheExpireMs(long cacheExpireMs);
        public Builder setMaxRetryTimes(int maxRetryTimes);
        public Builder setLookupAsync(boolean lookupAsync);
        public HBaseLookupOptions build();
    }
}
// Core configuration options
/**
 * {@code ConfigOption} constants for the HBase connector. Each corresponds to one key of the
 * {@code CREATE TABLE ... WITH (...)} clause; for example, TABLE_NAME and ZOOKEEPER_QUORUM back the
 * 'table-name' and 'zookeeper.quorum' keys used in the DDL example.
 */
public class HBaseConnectorOptions {
// Connection and table identification.
public static final ConfigOption<String> TABLE_NAME;
public static final ConfigOption<String> ZOOKEEPER_QUORUM;
public static final ConfigOption<String> ZOOKEEPER_ZNODE_PARENT;
// String used to represent SQL NULL — inferred from the name; confirm.
public static final ConfigOption<String> NULL_STRING_LITERAL;
// Sink flush thresholds: buffered size (MemorySize), buffered row count, and time interval (Duration).
// Note: MAX_ROWS is Integer-typed here, while HBaseWriteOptions#getBufferFlushMaxRows returns long.
public static final ConfigOption<MemorySize> SINK_BUFFER_FLUSH_MAX_SIZE;
public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS;
public static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL;
// Lookup-join settings: async mode, cache size, cache TTL (Duration) and retry count.
public static final ConfigOption<Boolean> LOOKUP_ASYNC;
public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS;
public static final ConfigOption<Duration> LOOKUP_CACHE_TTL;
public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES;
// Parallelism of the sink operator.
public static final ConfigOption<Integer> SINK_PARALLELISM;
}
// Input format for reading HBase data
/**
 * Concrete input format that emits Flink {@code RowData} records read from an HBase table.
 * Per the component overview, it is the scan-side reader used with HBaseDynamicTableSource and
 * supports region-aware splitting.
 */
public class HBaseRowDataInputFormat extends AbstractTableInputFormat<RowData> {
/**
 * Takes the same first four arguments, in the same order, as HBaseDynamicTableSource's
 * constructor. It has no lookup options because it only performs scans.
 */
public HBaseRowDataInputFormat(
Configuration conf,
String tableName,
HBaseTableSchema schema,
String nullStringLiteral
);
}
// Abstract base class for HBase input formats
/**
 * Base class for HBase-backed input formats: a {@code RichInputFormat} whose splits are
 * {@code TableInputSplit}s. Subclasses supply four things: the table handle, the {@code Scan} to
 * run, the table name, and the conversion from each HBase {@code Result} to the output type {@code T}.
 *
 * @param <T> record type produced by the input format (e.g. {@code RowData})
 */
public abstract class AbstractTableInputFormat<T>
extends RichInputFormat<T, TableInputSplit> {
// Initializes the HBase table handle; may throw IOException. NOTE(review): when it is invoked (configure/open) is not shown — confirm.
protected abstract void initTable() throws IOException;
// Scan describing what to read. NOTE(review): presumably narrowed to each split's key range by the base class — confirm.
protected abstract Scan getScanner();
protected abstract String getTableName();
// Converts one HBase Result into an output record of type T.
protected abstract T mapResultToOutType(Result r);
}