tessl/maven-org-apache-flink--flink-hbase_2-11

Apache Flink HBase connector library that enables seamless integration of Flink streaming and batch processing applications with Apache HBase.

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.flink/flink-hbase_2.11@1.10.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-hbase_2-11@1.10.0

Apache Flink HBase Connector

The Apache Flink HBase connector provides comprehensive integration between Apache Flink's stream processing capabilities and Apache HBase, a distributed NoSQL database. It supports both DataStream API and Table API operations for reading from and writing to HBase tables, with features such as lookup joins, buffered writes, and flexible serialization.

Package Information

  • Package Name: org.apache.flink:flink-hbase_2.11
  • Package Type: maven
  • Language: Java
  • Version: 1.10.3
  • Installation: Add to your Maven pom.xml (with the scala.binary.version property set to 2.11):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hbase_${scala.binary.version}</artifactId>
    <version>1.10.3</version>
</dependency>

Core Imports

// DataStream API - Input Format
import org.apache.flink.addons.hbase.HBaseRowInputFormat;
import org.apache.flink.addons.hbase.HBaseTableSchema;

// DataStream API - Sink Function  
import org.apache.flink.addons.hbase.HBaseUpsertSinkFunction;

// Table API - Source and Sink
import org.apache.flink.addons.hbase.HBaseTableSource;
import org.apache.flink.addons.hbase.HBaseUpsertTableSink;
import org.apache.flink.addons.hbase.HBaseTableFactory;

// Configuration
import org.apache.flink.addons.hbase.HBaseOptions;
import org.apache.flink.addons.hbase.HBaseWriteOptions;

// Table API Descriptors
import org.apache.flink.table.descriptors.HBase;

Basic Usage

Reading from HBase (DataStream API)

import org.apache.flink.addons.hbase.HBaseRowInputFormat;
import org.apache.flink.addons.hbase.HBaseTableSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Configure the HBase connection (picks up hbase-site.xml from the classpath if present)
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "localhost:2181");

// Define the table schema
HBaseTableSchema schema = new HBaseTableSchema();
schema.setRowKey("rowkey", String.class);
schema.addColumn("cf1", "col1", String.class);
schema.addColumn("cf1", "col2", Integer.class);

// Create the input format
HBaseRowInputFormat inputFormat = new HBaseRowInputFormat(conf, "my_table", schema);

// Read as a DataStream
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Row> hbaseData = env.createInput(inputFormat);

Writing to HBase (DataStream API)

import org.apache.flink.addons.hbase.HBaseUpsertSinkFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// Create the sink function with buffering (reuses schema and conf from above)
HBaseUpsertSinkFunction sinkFunction = new HBaseUpsertSinkFunction(
    "my_table",           // table name
    schema,               // table schema
    conf,                 // HBase configuration
    2 * 1024 * 1024,      // buffer flush max size (2 MB)
    1000,                 // buffer flush max mutations
    5000                  // buffer flush interval (5 seconds)
);

// Apply to a DataStream of (change flag, row) pairs
DataStream<Tuple2<Boolean, Row>> upsertStream = ...; // your stream of upserts
upsertStream.addSink(sinkFunction);

Table API Usage

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.descriptors.HBase;
import org.apache.flink.table.descriptors.Schema;

TableEnvironment tableEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

// Register an HBase table; each column family must be declared as a ROW type
tableEnv.connect(
    new HBase()
        .version("1.4.3")
        .tableName("my_table")
        .zookeeperQuorum("localhost:2181")
)
.withSchema(
    new Schema()
        .field("rowkey", DataTypes.STRING())
        .field("cf1", DataTypes.ROW(
            DataTypes.FIELD("col1", DataTypes.STRING()),
            DataTypes.FIELD("col2", DataTypes.INT())))
)
.createTemporaryTable("hbase_table");

// Query the table; nested fields are addressed as family.qualifier
Table result = tableEnv.sqlQuery(
    "SELECT rowkey, h.cf1.col1, h.cf1.col2 FROM hbase_table AS h WHERE h.cf1.col2 > 100");

Architecture

The HBase connector is organized into several key components:

  • Input Formats: For batch and streaming reads (HBaseRowInputFormat, AbstractTableInputFormat)
  • Sink Functions: For streaming writes with buffering (HBaseUpsertSinkFunction)
  • Table API Integration: Source/sink factories and descriptors (HBaseTableFactory, HBaseTableSource)
  • Schema Definition: Type-safe column family and qualifier mapping (HBaseTableSchema)
  • Configuration: Connection and write performance options (HBaseOptions, HBaseWriteOptions)
  • Utilities: Type conversion and HBase operation helpers

Capabilities

DataStream API Input Formats

Read data from HBase tables using InputFormat classes with full control over scanning and result mapping.

class HBaseRowInputFormat extends AbstractTableInputFormat<Row> {
    public HBaseRowInputFormat(Configuration conf, String tableName, HBaseTableSchema schema);
    public void configure(Configuration parameters);
    public String getTableName();
    public TypeInformation<Row> getProducedType();
}
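
The input format also works with the batch DataSet API. A minimal sketch, reusing the conf and schema objects built in the Basic Usage section:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.types.Row;

// Batch read; conf and schema are the objects from Basic Usage
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Row> rows = batchEnv.createInput(new HBaseRowInputFormat(conf, "my_table", schema));
rows.first(10).print();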

See docs/input-formats.md for details.

DataStream API Sink Functions

Write data to HBase tables with configurable buffering and automatic batching for optimal performance.

class HBaseUpsertSinkFunction extends RichSinkFunction<Tuple2<Boolean, Row>>
    implements CheckpointedFunction, BufferedMutator.ExceptionListener {
    public HBaseUpsertSinkFunction(String hTableName, HBaseTableSchema schema, 
        Configuration conf, long bufferFlushMaxSizeInBytes, 
        long bufferFlushMaxMutations, long bufferFlushIntervalMillis);
    public void open(Configuration parameters);
    public void invoke(Tuple2<Boolean, Row> value, Context context);
    public void close();
}
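
The Tuple2<Boolean, Row> element type encodes the change kind: true yields an HBase Put (upsert), false a Delete. A sketch of producing such a stream (events and its getters are hypothetical; the Row field order must match the HBaseTableSchema):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// events is a hypothetical DataStream of domain objects
DataStream<Tuple2<Boolean, Row>> upserts = events
    .map(e -> {
        Row row = new Row(3);
        row.setField(0, e.getKey());   // row key
        row.setField(1, e.getName());  // cf1:col1
        row.setField(2, e.getCount()); // cf1:col2
        return Tuple2.of(true, row);   // true = upsert, false = delete
    })
    .returns(Types.TUPLE(Types.BOOLEAN, Types.ROW(Types.STRING, Types.STRING, Types.INT)));
upserts.addSink(sinkFunction); // the sink built in Basic Usage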

See docs/sink-functions.md for details.

Table API Sources and Sinks

Integrate HBase with Flink's Table API for SQL-based data processing and lookup joins.

class HBaseTableSource implements StreamTableSource<Row>, LookupableTableSource<Row> {
    public HBaseTableSource(Configuration conf, String tableName);
    public void addColumn(String family, String qualifier, Class<?> clazz);
    public void setRowKey(String rowKeyName, Class<?> clazz);
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv);
    public TableFunction<Row> getLookupFunction(String[] lookupKeys);
}
class HBaseUpsertTableSink implements UpsertStreamTableSink<Row> {
    public HBaseUpsertTableSink(HBaseTableSchema hbaseTableSchema, 
        HBaseOptions hbaseOptions, HBaseWriteOptions writeOptions);
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream);
}
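
A sketch of wiring the source into a table environment programmatically (registerTableSource is the Flink 1.10 API; later releases prefer DDL):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "localhost:2181");

HBaseTableSource source = new HBaseTableSource(conf, "dim_table");
source.setRowKey("rowkey", String.class);
source.addColumn("cf1", "col1", String.class);

// makes the table available to SQL queries and lookup joins
tableEnv.registerTableSource("dim_table", source);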

See docs/table-api.md for details.

Schema Configuration

Define type-safe mappings between Flink data types and HBase column families and qualifiers.

class HBaseTableSchema {
    public void addColumn(String family, String qualifier, Class<?> clazz);
    public void setRowKey(String rowKeyName, Class<?> clazz);
    public void setCharset(String charset);
    public String[] getFamilyNames();
    public TypeInformation<?>[] getQualifierTypes(String family);
}
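
A short sketch of building a schema, including a non-default charset for string fields:

HBaseTableSchema schema = new HBaseTableSchema();
schema.setRowKey("id", Integer.class);
schema.addColumn("info", "name", String.class);
schema.addColumn("info", "score", Double.class);
schema.setCharset("ISO-8859-1"); // string fields are encoded/decoded with this charset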

See docs/schema-config.md for details.

Lookup Functions

Enable temporal table joins by looking up dimension data from HBase in real-time.

class HBaseLookupFunction extends TableFunction<Row> {
    public HBaseLookupFunction(Configuration configuration, String hTableName, 
        HBaseTableSchema hbaseTableSchema);
    public void eval(Object rowKey);
    public TypeInformation<Row> getResultType();
}
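
Lookup functions are typically used indirectly, through a lookup join in SQL. A sketch, assuming a hypothetical orders table with a processing-time attribute proc_time and the hbase_table registered earlier:

Table enriched = tableEnv.sqlQuery(
    "SELECT o.order_id, h.cf1.col1 AS user_name " +
    "FROM orders AS o " +
    "JOIN hbase_table FOR SYSTEM_TIME AS OF o.proc_time AS h " +
    "ON o.user_id = h.rowkey");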

See docs/lookup-functions.md for details.

Utility Classes

Helper classes for type conversion, configuration serialization, and HBase operation creation.

class HBaseTypeUtils {
    public static Object deserializeToObject(byte[] value, int typeIdx, Charset stringCharset);
    public static byte[] serializeFromObject(Object value, int typeIdx, Charset stringCharset);
    public static int getTypeIndex(TypeInformation typeInfo);
    public static boolean isSupportedType(Class<?> clazz);
}
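
A round-trip sketch using the type utilities:

import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.typeinfo.Types;

int typeIndex = HBaseTypeUtils.getTypeIndex(Types.STRING);
byte[] bytes = HBaseTypeUtils.serializeFromObject("hello", typeIndex, StandardCharsets.UTF_8);
Object value = HBaseTypeUtils.deserializeToObject(bytes, typeIndex, StandardCharsets.UTF_8); // "hello"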

See docs/utilities.md for details.

Configuration

Connection Configuration

class HBaseOptions {
    public static Builder builder();
    
    static class Builder {
        public Builder setTableName(String tableName);        // Required
        public Builder setZkQuorum(String zkQuorum);          // Required  
        public Builder setZkNodeParent(String zkNodeParent);  // Optional
        public HBaseOptions build();
    }
}

Write Performance Configuration

class HBaseWriteOptions {
    public static Builder builder();
    
    static class Builder {
        public Builder setBufferFlushMaxSizeInBytes(long bufferFlushMaxSizeInBytes);
        public Builder setBufferFlushMaxRows(long bufferFlushMaxRows);
        public Builder setBufferFlushIntervalMillis(long bufferFlushIntervalMillis);
        public HBaseWriteOptions build();
    }
}
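
A sketch combining both builders to construct the upsert table sink (schema is an HBaseTableSchema as shown above; the values are illustrative):

HBaseOptions options = HBaseOptions.builder()
    .setTableName("my_table")
    .setZkQuorum("localhost:2181")
    .setZkNodeParent("/hbase")
    .build();

HBaseWriteOptions writeOptions = HBaseWriteOptions.builder()
    .setBufferFlushMaxSizeInBytes(2 * 1024 * 1024) // 2 MB
    .setBufferFlushMaxRows(1000)
    .setBufferFlushIntervalMillis(5000)
    .build();

HBaseUpsertTableSink sink = new HBaseUpsertTableSink(schema, options, writeOptions);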

Supported Data Types

The connector supports automatic serialization/deserialization for:

  • Primitive types: byte[], String, Byte, Short, Integer, Long, Float, Double, Boolean
  • Temporal types: java.sql.Timestamp, java.sql.Date, java.sql.Time
  • Numeric types: java.math.BigDecimal, java.math.BigInteger

Error Handling

Common exceptions and their meanings:

  • RuntimeException: HBase connection or configuration errors
  • IllegalArgumentException: Invalid parameters or unsupported data types
  • IOException: HBase I/O operation failures
  • ValidationException: Table configuration validation errors
  • TableNotFoundException: Specified HBase table doesn't exist
  • RetriesExhaustedWithDetailsException: BufferedMutator operation failures

Requirements

  • HBase Version: 1.4.3
  • Flink Version: 1.10.3
  • Java Version: 8+
  • Hadoop Configuration: Properly configured hbase-site.xml or programmatic configuration
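
If no hbase-site.xml is on the classpath, the connection can be configured programmatically. A minimal sketch using standard HBase configuration keys:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3");          // ZooKeeper hosts
conf.set("hbase.zookeeper.property.clientPort", "2181");    // ZooKeeper client port
conf.set("zookeeper.znode.parent", "/hbase");               // HBase root znode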