or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.md input-formats.md lookup-functions.md schema-config.md sink-functions.md table-api.md utilities.md
tile.json

table-api.mddocs/

Table API Integration

The HBase connector provides comprehensive integration with Flink's Table API, enabling SQL-based data processing, declarative table definitions, and lookup joins. This includes table sources, sinks, factory classes, and descriptor-based configuration.

HBaseTableSource

A table source that provides HBase table data to the Table API with support for both batch and streaming queries, plus lookup functionality for temporal joins.

class HBaseTableSource implements StreamTableSource<Row>, LookupableTableSource<Row> {
    public HBaseTableSource(Configuration conf, String tableName);
    
    // Schema configuration
    public void addColumn(String family, String qualifier, Class<?> clazz);
    public void setRowKey(String rowKeyName, Class<?> clazz);
    public void setCharset(String charset);
    
    // Table source methods
    public TypeInformation<Row> getReturnType();
    public TableSchema getTableSchema();
    public DataSet<Row> getDataSet(ExecutionEnvironment execEnv);
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv);
    public HBaseTableSource projectFields(int[] fields); 
    public String explainSource();
    
    // Lookup capabilities
    public TableFunction<Row> getLookupFunction(String[] lookupKeys);
    public AsyncTableFunction<Row> getAsyncLookupFunction(String[] lookupKeys); // Throws UnsupportedOperationException
    public boolean isAsyncEnabled(); // Returns false
    public boolean isBounded(); // Returns true
}

Basic Table Source Usage

import org.apache.flink.addons.hbase.HBaseTableSource;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.hadoop.conf.Configuration;

// Configure HBase connection
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum", "localhost:2181");

// Create table source
HBaseTableSource tableSource = new HBaseTableSource(conf, "user_profiles");

// Define schema
tableSource.setRowKey("user_id", String.class);
tableSource.addColumn("profile", "name", String.class);
tableSource.addColumn("profile", "age", Integer.class);
tableSource.addColumn("profile", "email", String.class);
tableSource.addColumn("activity", "last_login", java.sql.Timestamp.class);
tableSource.addColumn("activity", "login_count", Long.class);

// Register with Table Environment
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
tableEnv.registerTableSource("users", tableSource);

// Query the table
Table result = tableEnv.sqlQuery(
    "SELECT user_id, name, age FROM users WHERE age > 21 AND login_count > 10"
);

Column Projection

// Project specific fields for better performance
int[] projectedFields = {0, 1, 3}; // user_id, name, email
HBaseTableSource projectedSource = tableSource.projectFields(projectedFields);

tableEnv.registerTableSource("users_projected", projectedSource);
Table result = tableEnv.sqlQuery("SELECT * FROM users_projected");

HBaseUpsertTableSink

A table sink that handles upsert and delete operations for HBase tables through the Table API.

class HBaseUpsertTableSink implements UpsertStreamTableSink<Row> {
    public HBaseUpsertTableSink(HBaseTableSchema hbaseTableSchema, 
        HBaseOptions hbaseOptions, HBaseWriteOptions writeOptions);
    
    // Configuration methods
    public void setKeyFields(String[] keys); // Ignored - HBase always upserts on rowkey
    public void setIsAppendOnly(Boolean isAppendOnly);
    
    // Table sink methods
    public TypeInformation<Row> getRecordType();
    public TableSchema getTableSchema();
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream);
    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream);
    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes);
}

Basic Table Sink Usage

import org.apache.flink.addons.hbase.HBaseUpsertTableSink;
import org.apache.flink.addons.hbase.HBaseTableSchema;
import org.apache.flink.addons.hbase.HBaseOptions;
import org.apache.flink.addons.hbase.HBaseWriteOptions;

// Define HBase table schema
HBaseTableSchema schema = new HBaseTableSchema();
schema.setRowKey("user_id", String.class);
schema.addColumn("profile", "name", String.class);
schema.addColumn("profile", "age", Integer.class);
schema.addColumn("activity", "last_login", java.sql.Timestamp.class);

// Configure HBase connection
HBaseOptions hbaseOptions = HBaseOptions.builder()
    .setTableName("user_profiles")
    .setZkQuorum("localhost:2181")
    .build();

// Configure write options  
HBaseWriteOptions writeOptions = HBaseWriteOptions.builder()
    .setBufferFlushMaxSizeInBytes(2 * 1024 * 1024) // 2MB
    .setBufferFlushMaxRows(1000)
    .setBufferFlushIntervalMillis(5000) // 5 seconds
    .build();

// Create table sink
HBaseUpsertTableSink tableSink = new HBaseUpsertTableSink(schema, hbaseOptions, writeOptions);

// Register with Table Environment
tableEnv.registerTableSink("user_sink", tableSink);

// Insert data via SQL
tableEnv.sqlUpdate(
    "INSERT INTO user_sink " +
    "SELECT user_id, name, age, CURRENT_TIMESTAMP " +
    "FROM source_table WHERE age > 18"
);

HBaseTableFactory

Factory class for creating HBase table sources and sinks using connector properties.

class HBaseTableFactory implements 
    StreamTableSourceFactory<Row>, StreamTableSinkFactory<Tuple2<Boolean, Row>> {
    
    public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties);
    public StreamTableSink<Tuple2<Boolean, Row>> createStreamTableSink(Map<String, String> properties);
    public Map<String, String> requiredContext();
    public List<String> supportedProperties();
}

Using Table Factory with Properties

import org.apache.flink.table.descriptors.ConnectorDescriptor;
import java.util.Map;
import java.util.HashMap;

// Define connector properties
Map<String, String> properties = new HashMap<>();
properties.put("connector.type", "hbase");
properties.put("connector.version", "1.4.3");
properties.put("connector.table-name", "user_profiles");
properties.put("connector.zookeeper.quorum", "localhost:2181");
properties.put("connector.zookeeper.znode.parent", "/hbase");

// Schema properties
properties.put("schema.0.name", "user_id");
properties.put("schema.0.data-type", "VARCHAR");
properties.put("schema.1.name", "name");
properties.put("schema.1.data-type", "VARCHAR");
properties.put("schema.2.name", "age");
properties.put("schema.2.data-type", "INT");

// Create source via factory
HBaseTableFactory factory = new HBaseTableFactory();
StreamTableSource<Row> source = factory.createStreamTableSource(properties);

Table Descriptors

Declarative table configuration using Flink's descriptor API, a programmatic alternative to SQL DDL.

HBase Descriptor

class HBase extends ConnectorDescriptor {
    public HBase();
    
    // Required configuration
    public HBase version(String version);
    public HBase tableName(String tableName);
    public HBase zookeeperQuorum(String zookeeperQuorum);
    
    // Optional configuration
    public HBase zookeeperNodeParent(String zookeeperNodeParent);
    public HBase writeBufferFlushMaxSize(String maxSize);
    public HBase writeBufferFlushMaxRows(int writeBufferFlushMaxRows);
    public HBase writeBufferFlushInterval(String interval);
}

Descriptor Usage Example

import org.apache.flink.table.descriptors.HBase;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.api.DataTypes;

// Create HBase table using descriptors
tableEnv.connect(
    new HBase()
        .version("1.4.3")
        .tableName("user_events") 
        .zookeeperQuorum("zk1:2181,zk2:2181,zk3:2181")
        .zookeeperNodeParent("/hbase")
        .writeBufferFlushMaxSize("4mb")
        .writeBufferFlushMaxRows(2000)
        .writeBufferFlushInterval("10s")
)
.withSchema(
    new Schema()
        .field("rowkey", DataTypes.STRING())
        .field("event_type", DataTypes.STRING())
        .field("user_id", DataTypes.STRING())
        .field("timestamp", DataTypes.TIMESTAMP(3))
        .field("properties", DataTypes.STRING())
)
.createTemporaryTable("events");

// Use the table in SQL
Table events = tableEnv.sqlQuery(
    "SELECT user_id, event_type, timestamp " +
    "FROM events " + 
    "WHERE event_type = 'login' AND user_id IS NOT NULL"
);

SQL DDL Support

Define Flink tables backed by HBase using SQL DDL statements:

-- Create HBase source table
CREATE TABLE user_profiles (
    user_id STRING,
    name STRING,
    age INT,
    email STRING,
    last_login TIMESTAMP(3),
    login_count BIGINT
) WITH (
    'connector' = 'hbase',
    'table-name' = 'user_profiles',
    'zookeeper.quorum' = 'localhost:2181'
);

-- Create HBase sink table with write options
CREATE TABLE user_sink (
    user_id STRING,
    name STRING, 
    age INT,
    registration_time TIMESTAMP(3)
) WITH (
    'connector' = 'hbase',
    'table-name' = 'users',
    'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
    'write.buffer-flush.max-size' = '4mb',
    'write.buffer-flush.max-rows' = '2000',
    'write.buffer-flush.interval' = '10s'
);

Temporal Table Joins (Lookup)

Use HBase as a dimension table for enriching streaming data with lookup joins:

// Register HBase dimension table
tableEnv.connect(
    new HBase()
        .version("1.4.3")
        .tableName("user_profiles")
        .zookeeperQuorum("localhost:2181")
)
.withSchema(
    new Schema()
        .field("user_id", DataTypes.STRING())
        .field("name", DataTypes.STRING())
        .field("age", DataTypes.INT())
        .field("email", DataTypes.STRING())
)
.createTemporaryTable("user_dim");

// Create a processing time temporal table
tableEnv.sqlUpdate(
    "CREATE VIEW user_dim_temporal AS " +
    "SELECT *, PROCTIME() as proc_time FROM user_dim"
);

// Perform lookup join
Table enrichedEvents = tableEnv.sqlQuery(
    "SELECT " +
    "    e.event_id, " +
    "    e.user_id, " +
    "    e.event_type, " +
    "    u.name, " +
    "    u.email " +
    "FROM events e " +
    "JOIN user_dim_temporal FOR SYSTEM_TIME AS OF e.proc_time AS u " +
    "ON e.user_id = u.user_id"
);

Lookup Join with SQL DDL

-- Source stream table
CREATE TABLE user_events (
    event_id STRING,
    user_id STRING,
    event_type STRING,
    event_time TIMESTAMP(3),
    proc_time AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'user-events'
);

-- HBase lookup table
CREATE TABLE user_profiles (
    user_id STRING,
    name STRING,
    age INT,
    email STRING
) WITH (
    'connector' = 'hbase',
    'table-name' = 'user_profiles',
    'zookeeper.quorum' = 'localhost:2181'
);

-- Lookup join query
SELECT 
    e.event_id,
    e.user_id,
    e.event_type,
    u.name,
    u.email
FROM user_events e
JOIN user_profiles FOR SYSTEM_TIME AS OF e.proc_time AS u
ON e.user_id = u.user_id;

HBaseValidator Constants

Property key constants for HBase connector configuration:

class HBaseValidator {
    public static final String CONNECTOR_TYPE_VALUE_HBASE = "hbase";
    public static final String CONNECTOR_VERSION_VALUE_143 = "1.4.3";
    public static final String CONNECTOR_TABLE_NAME = "connector.table-name";
    public static final String CONNECTOR_ZK_QUORUM = "connector.zookeeper.quorum";
    public static final String CONNECTOR_ZK_NODE_PARENT = "connector.zookeeper.znode.parent";
    public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE = "connector.write.buffer-flush.max-size";
    public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS = "connector.write.buffer-flush.max-rows";
    public static final String CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL = "connector.write.buffer-flush.interval";
    
    public void validate(DescriptorProperties properties);
}

Configuration Properties

Complete list of supported connector properties:

Required Properties

| Property | Description | Example |
|----------|-------------|---------|
| `connector.type` | Connector type (must be "hbase") | `"hbase"` |
| `connector.version` | HBase version | `"1.4.3"` |
| `connector.table-name` | HBase table name | `"my_table"` |
| `connector.zookeeper.quorum` | ZooKeeper ensemble | `"zk1:2181,zk2:2181"` |

Optional Properties

| Property | Description | Default | Example |
|----------|-------------|---------|---------|
| `connector.zookeeper.znode.parent` | ZK parent node | `"/hbase"` | `"/hbase-prod"` |
| `connector.write.buffer-flush.max-size` | Max buffer size | `"2mb"` | `"4mb"` |
| `connector.write.buffer-flush.max-rows` | Max buffered rows | `1000` | `2000` |
| `connector.write.buffer-flush.interval` | Flush interval | `"5s"` | `"10s"` |

Advanced Patterns

Partitioned Table Processing

// Process HBase table data by row key ranges
Table rangeQuery = tableEnv.sqlQuery(
    "SELECT * FROM users " +
    "WHERE user_id BETWEEN 'user_00001' AND 'user_99999'"
);

// Use row key prefixes for efficient scanning
Table prefixQuery = tableEnv.sqlQuery(
    "SELECT * FROM events " + 
    "WHERE rowkey LIKE 'user123_%'"
);

Aggregations and Analytics

// Aggregate data from HBase
Table analytics = tableEnv.sqlQuery(
    "SELECT " +
    "    DATE_FORMAT(last_login, 'yyyy-MM-dd') as login_date, " +
    "    COUNT(*) as user_count, " +
    "    AVG(age) as avg_age " +
    "FROM user_profiles " +
    "WHERE last_login >= CURRENT_DATE - INTERVAL '7' DAY " +
    "GROUP BY DATE_FORMAT(last_login, 'yyyy-MM-dd')"
);

Complex Data Types

// Handle complex data types in HBase
tableEnv.connect(
    new HBase()
        .version("1.4.3")
        .tableName("complex_data")
        .zookeeperQuorum("localhost:2181")
)
.withSchema(
    new Schema()
        .field("id", DataTypes.STRING())
        .field("binary_data", DataTypes.BYTES()) // byte[] data
        .field("json_data", DataTypes.STRING())  // JSON as string
        .field("decimal_value", DataTypes.DECIMAL(10, 2))
        .field("timestamp_value", DataTypes.TIMESTAMP(3))
)
.createTemporaryTable("complex_table");

Performance Tuning

Source Performance

// Optimize HBase source scanning
HBaseTableSource optimizedSource = new HBaseTableSource(conf, "large_table");

// Configure HBase client for better scan performance
conf.setInt("hbase.client.scanner.caching", 1000);
conf.setInt("hbase.client.scanner.max.result.size", 2 * 1024 * 1024);
conf.setBoolean("hbase.client.scanner.async.prefetch", true);

Sink Performance

// High-throughput sink configuration
HBaseWriteOptions highThroughputOptions = HBaseWriteOptions.builder()
    .setBufferFlushMaxSizeInBytes(16 * 1024 * 1024) // 16MB
    .setBufferFlushMaxRows(10000)
    .setBufferFlushIntervalMillis(30000) // 30 seconds
    .build();

Parallelism Configuration

// Configure parallelism for HBase operations
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(8); // Match number of HBase regions

// Set specific parallelism for HBase operations
DataStream<Row> hbaseStream = tableEnv.toAppendStream(hbaseTable, Row.class);
hbaseStream.addSink(hbaseSink).setParallelism(4);