DataStream Input Formats

The HBase connector provides input formats for reading data from HBase tables in both batch and streaming Flink applications. These formats handle table scanning, row key ranges, and automatic result mapping to Flink data types.

HBaseRowInputFormat

The primary input format for reading HBase tables and converting results to Flink Row objects.

class HBaseRowInputFormat extends AbstractTableInputFormat<Row> {
    public HBaseRowInputFormat(Configuration conf, String tableName, HBaseTableSchema schema);
    public void configure(Configuration parameters);
    public String getTableName();
    public TypeInformation<Row> getProducedType();
}

Usage Example

import org.apache.flink.addons.hbase.HBaseRowInputFormat;
import org.apache.flink.addons.hbase.HBaseTableSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Configure HBase connection; HBaseConfiguration.create() loads
// hbase-default.xml and hbase-site.xml from the classpath
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("zookeeper.znode.parent", "/hbase");

// Define table schema  
HBaseTableSchema schema = new HBaseTableSchema();
schema.setRowKey("user_id", String.class);
schema.addColumn("profile", "name", String.class);
schema.addColumn("profile", "age", Integer.class);
schema.addColumn("activity", "last_login", java.sql.Timestamp.class);

// Create input format
HBaseRowInputFormat inputFormat = new HBaseRowInputFormat(conf, "user_table", schema);

// Use in streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Row> userStream = env.createInput(inputFormat);

// Filter on the age column; this assumes age is exposed at field
// index 2 (the exact field layout follows from the schema definition)
userStream.filter(row -> (Integer) row.getField(2) > 18)
          .print();

env.execute("HBase Read Job");

AbstractTableInputFormat<T>

Base class for creating custom HBase input formats with different output types.

abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, TableInputSplit> {
    // Abstract methods to implement
    public abstract Scan getScanner();
    public abstract String getTableName();
    public abstract T mapResultToOutType(Result r);
    public abstract void configure(Configuration parameters); // Flink's org.apache.flink.configuration.Configuration
    
    // Implemented methods
    public void open(TableInputSplit split) throws IOException;
    public T nextRecord(T reuse) throws IOException;
    public boolean reachedEnd() throws IOException;
    public void close() throws IOException;
    public void closeInputFormat() throws IOException;
    public TableInputSplit[] createInputSplits(int minNumSplits) throws IOException;
    public InputSplitAssigner getInputSplitAssigner(TableInputSplit[] inputSplits);
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException;
}

Custom Input Format Example

import org.apache.flink.addons.hbase.AbstractTableInputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class CustomHBaseInputFormat extends AbstractTableInputFormat<Tuple3<String, String, Integer>> {
    private String tableName;
    private Configuration conf;
    
    public CustomHBaseInputFormat(Configuration conf, String tableName) {
        this.conf = conf;
        this.tableName = tableName;
    }
    
    @Override
    public Scan getScanner() {
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("profile"));
        scan.addFamily(Bytes.toBytes("activity"));
        return scan;
    }
    
    @Override
    public String getTableName() {
        return tableName;
    }
    
    @Override
    public Tuple3<String, String, Integer> mapResultToOutType(Result r) {
        String rowKey = Bytes.toString(r.getRow());
        String name = Bytes.toString(r.getValue(Bytes.toBytes("profile"), Bytes.toBytes("name")));
        byte[] ageBytes = r.getValue(Bytes.toBytes("profile"), Bytes.toBytes("age"));
        // Guard against missing cells; Flink tuple fields must not be null
        int age = ageBytes != null ? Bytes.toInt(ageBytes) : -1;
        return new Tuple3<>(rowKey, name, age);
    }
    
    @Override
    public void configure(org.apache.flink.configuration.Configuration parameters) {
        // Set up the HBase connection here; note the parameter is Flink's
        // Configuration, while the `conf` field holds the Hadoop/HBase one
    }
}

TableInputFormat<T>

Abstract input format specialized for Tuple output types.

abstract class TableInputFormat<T extends Tuple> extends AbstractTableInputFormat<T> {
    // Abstract methods specific to Tuple mapping
    public abstract Scan getScanner();
    public abstract String getTableName();
    public abstract T mapResultToTuple(Result r);
    
    // Implementation of configure method
    public void configure(Configuration parameters);
}
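
Tuple Input Format Example

A minimal sketch of a TableInputFormat subclass that produces Tuple2 records; the class, table, and column names here are illustrative, not part of the connector API:

import org.apache.flink.addons.hbase.TableInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class UserNameInputFormat extends TableInputFormat<Tuple2<String, String>> {
    @Override
    public Scan getScanner() {
        // Restrict the scan to the single column we map
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("profile"), Bytes.toBytes("name"));
        return scan;
    }
    
    @Override
    public String getTableName() {
        return "user_table";
    }
    
    @Override
    public Tuple2<String, String> mapResultToTuple(Result r) {
        String rowKey = Bytes.toString(r.getRow());
        String name = Bytes.toString(r.getValue(Bytes.toBytes("profile"), Bytes.toBytes("name")));
        return new Tuple2<>(rowKey, name);
    }
}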

TableInputSplit

Represents a split of an HBase table for parallel processing.

class TableInputSplit extends LocatableInputSplit {
    public byte[] getTableName();
    public byte[] getStartRow();
    public byte[] getEndRow();
    public int getSplitNumber();
    public String[] getHostnames();
}
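
Splits are created and assigned by the Flink runtime, but they can also be inspected directly, for example to check how a table will be partitioned. A diagnostic sketch, assuming the input format has already been configured so the table connection exists:

// Print the region-based splits the input format would create;
// createInputSplits throws IOException
TableInputSplit[] splits = inputFormat.createInputSplits(4);
for (TableInputSplit split : splits) {
    System.out.printf("split %d: %s -> %s%n",
        split.getSplitNumber(),
        Bytes.toString(split.getStartRow()),
        Bytes.toString(split.getEndRow()));
}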

Scan Configuration

The input formats use HBase Scan objects to define which data to read:

Basic Scanning

// Scan all rows and columns
Scan scan = new Scan();

// Scan specific column families
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes("cf1"));
scan.addFamily(Bytes.toBytes("cf2"));

// Scan specific columns
Scan scan = new Scan();  
scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("col1"));
scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("col2"));

Row Key Range Scanning

// Scan row key range
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes("user_00001"));
scan.setStopRow(Bytes.toBytes("user_99999"));

// Scan with row key prefix
Scan scan = new Scan();
scan.setRowPrefixFilter(Bytes.toBytes("user_2023"));
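
Note that the stop row is exclusive. To include the boundary row itself, append a trailing zero byte to the stop key:

// Make the stop row effectively inclusive by appending a zero byte
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes("user_00001"));
scan.setStopRow(Bytes.add(Bytes.toBytes("user_99999"), new byte[] {0}));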

Filtering

import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;

// Single column value filter
SingleColumnValueFilter filter = new SingleColumnValueFilter(
    Bytes.toBytes("profile"),
    Bytes.toBytes("age"),
    CompareFilter.CompareOp.GREATER,
    Bytes.toBytes(18)
);
Scan scan = new Scan();
scan.setFilter(filter);

// Multiple filters
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
filterList.addFilter(new SingleColumnValueFilter(
    Bytes.toBytes("profile"), Bytes.toBytes("active"), 
    CompareFilter.CompareOp.EQUAL, Bytes.toBytes(true)));
filterList.addFilter(new SingleColumnValueFilter(
    Bytes.toBytes("profile"), Bytes.toBytes("age"),
    CompareFilter.CompareOp.GREATER, Bytes.toBytes(21)));

Scan scan = new Scan();
scan.setFilter(filterList);
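
By default, SingleColumnValueFilter also emits rows in which the tested column is missing entirely. To drop such rows, enable filter-if-missing:

// Exclude rows that lack the tested column altogether
SingleColumnValueFilter ageFilter = new SingleColumnValueFilter(
    Bytes.toBytes("profile"), Bytes.toBytes("age"),
    CompareFilter.CompareOp.GREATER, Bytes.toBytes(21));
ageFilter.setFilterIfMissing(true);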

Performance Considerations

Parallelism

The input formats automatically create one split per HBase region, so parallel subtasks read disjoint row key ranges:

// Control the minimum number of splits
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(8); // Passed to createInputSplits as the minimum-split hint

DataSet<Row> data = env.createInput(inputFormat);

Caching and Batching

Configure HBase scan caching for better performance:

Scan scan = new Scan();
scan.setCaching(1000);  // Number of rows fetched per RPC round trip
scan.setBatch(10);      // Maximum number of cells returned per Result (for wide rows)

Memory Management

For large scans, consider memory usage:

// Limit scan to specific time range
Scan scan = new Scan();
scan.setTimeRange(startTime, endTime);

// Use filters to reduce data transfer
scan.setFilter(new PageFilter(10000)); // Soft row limit, applied per region server

Error Handling

Read and configuration errors surface when the job executes, not when the input format is constructed:

try {
    DataStream<Row> stream = env.createInput(inputFormat);
    // Process stream
    env.execute("HBase Read Job");
} catch (Exception e) {
    // Wraps HBase connection/read failures (IOException) as well as
    // invalid configuration or schema errors (IllegalArgumentException)
    log.error("HBase read job failed", e);
}

Type Mapping

The input formats handle automatic type conversion from HBase byte arrays:

Java Type            HBase Storage    Notes
String               byte[]           UTF-8 encoding
Integer              byte[]           4-byte big-endian
Long                 byte[]           8-byte big-endian
Double               byte[]           IEEE 754 format
Boolean              byte[]           Single byte (0 or 1)
java.sql.Timestamp   byte[]           Long timestamp
byte[]               byte[]           Direct storage
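
These conversions follow the HBase Bytes utility conventions. A manual decoding sketch (variable and column names are illustrative; each line assumes the cell was written with the matching encoding):

import org.apache.hadoop.hbase.util.Bytes;
import java.sql.Timestamp;

// `value` is a cell value obtained from an HBase Result
byte[] value = result.getValue(Bytes.toBytes("profile"), Bytes.toBytes("age"));

String    s = Bytes.toString(value);               // UTF-8
int       i = Bytes.toInt(value);                  // 4-byte big-endian
long      l = Bytes.toLong(value);                 // 8-byte big-endian
double    d = Bytes.toDouble(value);               // IEEE 754
boolean   b = Bytes.toBoolean(value);              // single byte (0 or 1)
Timestamp t = new Timestamp(Bytes.toLong(value));  // long timestamp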