CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-batch-connectors

Apache Flink batch processing connectors for Avro, JDBC, Hadoop, HBase, and HCatalog data sources

Pending
Overview
Eval results
Files

docs/hbase.md

HBase Connector

Apache HBase database connectivity for Flink batch processing, providing region-aware table access with distributed processing capabilities.

Capabilities

TableInputFormat

Abstract base class for reading from HBase tables with region-aware splitting for optimal distributed processing.

/**
 * Abstract base class for reading from HBase tables in Flink.
 *
 * <p>This listing is a signature summary: method bodies are omitted. A concrete
 * subclass supplies the {@code Scan} to execute, the name of the table, and the
 * conversion from an HBase {@code Result} to the produced tuple. It may also
 * override {@code includeRegionInSplit} to leave regions out of the generated splits.
 *
 * @param <T> Tuple type representing the HBase table row structure
 */
public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<T, TableInputSplit> {
    
    /**
     * Default (no-argument) constructor for TableInputFormat.
     */
    public TableInputFormat();
    
    /**
     * Returns the HBase Scan instance for reading table data.
     * Subclasses must implement this to define what data to read
     * (for example column families, columns, filters, time range).
     *
     * @return Scan object configured by the subclass
     */
    protected abstract Scan getScanner();
    
    /**
     * Returns the name of the HBase table to read from.
     *
     * @return HBase table name as a string
     */
    protected abstract String getTableName();
    
    /**
     * Maps an HBase Result to a Flink Tuple.
     * This method defines how HBase row data is converted to the tuple type {@code T}.
     *
     * @param r HBase Result containing row data
     * @return Flink Tuple representing the row
     */
    protected abstract T mapResultToTuple(Result r);
    
    /**
     * Determines whether to include a specific HBase region in the input split.
     * Not abstract, so subclasses may override it to filter regions based on
     * key ranges; the default behaviour is not shown in this listing.
     *
     * @param startKey Start key of the region
     * @param endKey End key of the region
     * @return true to include the region, false to skip it
     */
    protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey);
}

Usage Example:

import org.apache.flink.addons.hbase.TableInputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

// Custom implementation for reading user data from HBase
/**
 * Reads the {@code info:name}, {@code info:email} and {@code info:age} columns of the
 * {@code users} table and emits them as {@code (name, email, age)} tuples.
 */
public class UserTableInputFormat extends TableInputFormat<Tuple3<String, String, Integer>> {

    // Column coordinates are encoded once here instead of once per scanned row.
    private static final byte[] INFO_FAMILY = Bytes.toBytes("info");
    private static final byte[] NAME_QUALIFIER = Bytes.toBytes("name");
    private static final byte[] EMAIL_QUALIFIER = Bytes.toBytes("email");
    private static final byte[] AGE_QUALIFIER = Bytes.toBytes("age");

    /**
     * Builds a scan restricted to the three columns this format maps.
     *
     * @return scan over info:name, info:email and info:age
     */
    @Override
    protected Scan getScanner() {
        Scan scan = new Scan();
        scan.addColumn(INFO_FAMILY, NAME_QUALIFIER);
        scan.addColumn(INFO_FAMILY, EMAIL_QUALIFIER);
        scan.addColumn(INFO_FAMILY, AGE_QUALIFIER);
        return scan;
    }

    /**
     * @return the name of the table to read, {@code "users"}
     */
    @Override
    protected String getTableName() {
        return "users";
    }

    /**
     * Converts one HBase row into a {@code (name, email, age)} tuple.
     *
     * @param r row returned by the scan
     * @return tuple holding the decoded name, email and age
     * @throws IllegalStateException if the row has no {@code info:age} cell
     */
    @Override
    protected Tuple3<String, String, Integer> mapResultToTuple(Result r) {
        // Bytes.toString tolerates a missing (null) cell and yields null.
        String name = Bytes.toString(r.getValue(INFO_FAMILY, NAME_QUALIFIER));
        String email = Bytes.toString(r.getValue(INFO_FAMILY, EMAIL_QUALIFIER));

        // Result.getValue returns null for an absent column and Bytes.toInt cannot
        // decode null, so fail with the offending row key instead of a bare NPE.
        byte[] rawAge = r.getValue(INFO_FAMILY, AGE_QUALIFIER);
        if (rawAge == null) {
            throw new IllegalStateException(
                "Row '" + Bytes.toStringBinary(r.getRow()) + "' has no value for column info:age");
        }
        Integer age = Bytes.toInt(rawAge);

        return new Tuple3<>(name, email, age);
    }

    /**
     * Keeps every region of the table in the generated splits.
     *
     * @param startKey start key of the region
     * @param endKey end key of the region
     * @return always {@code true}
     */
    @Override
    protected boolean includeRegionInSplit(byte[] startKey, byte[] endKey) {
        // Include all regions by default
        return true;
    }
}

// Wire the custom input format into a batch job and print the rows it yields
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
UserTableInputFormat userTableFormat = new UserTableInputFormat();
DataSet<Tuple3<String, String, Integer>> users = env.createInput(userTableFormat);
users.print();

TableInputSplit

Input split representing HBase table region ranges for distributed processing.

/**
 * Input split representing HBase table region ranges.
 *
 * <p>This listing is a signature summary: method bodies are omitted. Each split
 * carries the table name plus the start and end row keys of the range it covers.
 */
public class TableInputSplit extends LocatableInputSplit {
    
    /**
     * Creates a new TableInputSplit for an HBase region.
     *
     * <p>Declared without an access modifier, i.e. package-private, so code outside
     * this class's package cannot call it directly.
     *
     * @param splitNumber Unique identifier for this split
     * @param hostnames Array of hostnames where this region is located
     * @param tableName Name of the HBase table (as byte array)
     * @param startRow Start row key for this region (as byte array)
     * @param endRow End row key for this region (as byte array)
     */
    TableInputSplit(final int splitNumber, final String[] hostnames, 
                   final byte[] tableName, final byte[] startRow, final byte[] endRow);
    
    /**
     * Returns the HBase table name for this split.
     *
     * @return Table name as byte array
     */
    public byte[] getTableName();
    
    /**
     * Returns the start row key for this region split.
     *
     * @return Start row key as byte array
     */
    public byte[] getStartRow();
    
    /**
     * Returns the end row key for this region split.
     *
     * @return End row key as byte array
     */
    public byte[] getEndRow();
}

Advanced Usage Patterns

Custom Row Key Filtering

/**
 * Reads {@code profile:name} and {@code profile:status} for rows of the
 * {@code user_profiles} table whose key starts with {@code user_}, limited to cells
 * written during the last seven days.
 */
public class FilteredUserTableInputFormat extends TableInputFormat<Tuple2<String, String>> {

    /** Row-key prefix shared by all rows this format is interested in. */
    private static final byte[] ROW_PREFIX = Bytes.toBytes("user_");

    /** Smallest key that sorts after every key starting with {@link #ROW_PREFIX}. */
    private static final byte[] ROW_PREFIX_STOP = exclusiveUpperBound(ROW_PREFIX);

    private static final byte[] PROFILE_FAMILY = Bytes.toBytes("profile");
    private static final byte[] NAME_QUALIFIER = Bytes.toBytes("name");
    private static final byte[] STATUS_QUALIFIER = Bytes.toBytes("status");

    /** Seven days in milliseconds; the leading long literal keeps the product in long arithmetic. */
    private static final long ONE_WEEK_MILLIS = 7L * 24 * 60 * 60 * 1000;

    /**
     * Builds the scan: row-key prefix filter, two profile columns, one-week time range.
     *
     * @return the configured scan
     * @throws RuntimeException wrapping the IOException if the time range is rejected
     */
    @Override
    protected Scan getScanner() {
        Scan scan = new Scan();
        // Add row key prefix filter
        scan.setRowPrefixFilter(ROW_PREFIX);

        // Add column filter
        scan.addColumn(PROFILE_FAMILY, NAME_QUALIFIER);
        scan.addColumn(PROFILE_FAMILY, STATUS_QUALIFIER);

        // Set time range for recent data only. The clock is read once so both
        // bounds are derived from the same instant.
        long now = System.currentTimeMillis();
        try {
            scan.setTimeRange(now - ONE_WEEK_MILLIS, now);
        } catch (IOException e) {
            throw new RuntimeException("Failed to set time range", e);
        }

        return scan;
    }

    /**
     * @return the name of the table to read, {@code "user_profiles"}
     */
    @Override
    protected String getTableName() {
        return "user_profiles";
    }

    /**
     * Converts one HBase row into a {@code (name, status)} tuple.
     *
     * @param r row returned by the scan
     * @return tuple of the decoded name and status; a field is null when its cell is absent
     */
    @Override
    protected Tuple2<String, String> mapResultToTuple(Result r) {
        String name = Bytes.toString(r.getValue(PROFILE_FAMILY, NAME_QUALIFIER));
        String status = Bytes.toString(r.getValue(PROFILE_FAMILY, STATUS_QUALIFIER));
        return new Tuple2<>(name, status);
    }

    /**
     * Includes a region only if its key range can contain rows starting with {@code user_}.
     *
     * <p>A region covers {@code [startKey, endKey)}, with an empty key meaning
     * "unbounded" on that side. The wanted rows cover
     * {@code [ROW_PREFIX, ROW_PREFIX_STOP)}. Checking only whether the start key
     * itself begins with the prefix would wrongly skip a region such as
     * {@code ["a", "z")} that holds every {@code user_} row, so the two ranges are
     * tested for overlap instead.
     *
     * @param startKey inclusive start key of the region, empty for the first region
     * @param endKey exclusive end key of the region, empty for the last region
     * @return true if the region may hold rows with the prefix, false otherwise
     */
    @Override
    protected boolean includeRegionInSplit(byte[] startKey, byte[] endKey) {
        boolean startsBeforePrefixRangeEnds =
            startKey.length == 0 || Bytes.compareTo(startKey, ROW_PREFIX_STOP) < 0;
        boolean endsAfterPrefixRangeStarts =
            endKey.length == 0 || Bytes.compareTo(endKey, ROW_PREFIX) > 0;
        return startsBeforePrefixRangeEnds && endsAfterPrefixRangeStarts;
    }

    /**
     * Returns the prefix with its last byte incremented, i.e. the exclusive upper
     * bound of all keys starting with the prefix. Only valid when the last byte is
     * not 0xFF, which holds for {@code "user_"}.
     */
    private static byte[] exclusiveUpperBound(byte[] prefix) {
        byte[] stop = prefix.clone();
        stop[stop.length - 1]++;
        return stop;
    }
}

Multi-Column Family Access

/**
 * Reads the {@code basic}, {@code contact} and {@code activity} column families of the
 * {@code complete_users} table and emits
 * {@code (rowKey, name, age, email, lastLogin)} tuples.
 */
public class CompleteUserTableInputFormat extends TableInputFormat<Tuple5<String, String, Integer, String, Long>> {

    // Column coordinates are encoded once here instead of once per scanned row.
    private static final byte[] BASIC_FAMILY = Bytes.toBytes("basic");
    private static final byte[] CONTACT_FAMILY = Bytes.toBytes("contact");
    private static final byte[] ACTIVITY_FAMILY = Bytes.toBytes("activity");
    private static final byte[] NAME_QUALIFIER = Bytes.toBytes("name");
    private static final byte[] AGE_QUALIFIER = Bytes.toBytes("age");
    private static final byte[] EMAIL_QUALIFIER = Bytes.toBytes("email");
    private static final byte[] LAST_LOGIN_QUALIFIER = Bytes.toBytes("last_login");

    /**
     * Builds a scan over the three column families with caching and batching configured.
     *
     * @return the configured scan
     */
    @Override
    protected Scan getScanner() {
        Scan scan = new Scan();
        // Add multiple column families
        scan.addFamily(BASIC_FAMILY);      // Basic info
        scan.addFamily(CONTACT_FAMILY);    // Contact info
        scan.addFamily(ACTIVITY_FAMILY);   // Activity data

        // Configure caching for better performance
        scan.setCaching(1000);
        scan.setBatch(100);

        return scan;
    }

    /**
     * @return the name of the table to read, {@code "complete_users"}
     */
    @Override
    protected String getTableName() {
        return "complete_users";
    }

    /**
     * Converts one HBase row into a {@code (rowKey, name, age, email, lastLogin)} tuple.
     *
     * @param r row returned by the scan
     * @return the decoded tuple; the String fields are null when their cell is absent
     * @throws IllegalStateException if {@code basic:age} or {@code activity:last_login} is absent
     */
    @Override
    protected Tuple5<String, String, Integer, String, Long> mapResultToTuple(Result r) {
        // Extract from basic column family
        String name = Bytes.toString(r.getValue(BASIC_FAMILY, NAME_QUALIFIER));
        Integer age = Bytes.toInt(requireValue(r, BASIC_FAMILY, AGE_QUALIFIER));

        // Extract from contact column family
        String email = Bytes.toString(r.getValue(CONTACT_FAMILY, EMAIL_QUALIFIER));

        // Extract from activity column family
        Long lastLogin = Bytes.toLong(requireValue(r, ACTIVITY_FAMILY, LAST_LOGIN_QUALIFIER));

        // Get row key
        String rowKey = Bytes.toString(r.getRow());

        return new Tuple5<>(rowKey, name, age, email, lastLogin);
    }

    /**
     * Fetches a cell value that the numeric decoders need to be present.
     * Result.getValue returns null for an absent column, and Bytes.toInt/toLong
     * cannot decode null, so the failure is reported with the row and column.
     */
    private static byte[] requireValue(Result r, byte[] family, byte[] qualifier) {
        byte[] value = r.getValue(family, qualifier);
        if (value == null) {
            throw new IllegalStateException(
                "Row '" + Bytes.toStringBinary(r.getRow()) + "' has no value for column "
                    + Bytes.toString(family) + ":" + Bytes.toString(qualifier));
        }
        return value;
    }
}

Integration with Flink's Type System

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

public class TypedUserTableInputFormat extends TableInputFormat<Tuple3<String, String, Integer>> {
    
    // ... implement abstract methods as shown above ...
    
    // Optional: Override to provide explicit type information
    @Override
    public TypeInformation<Tuple3<String, String, Integer>> getProducedType() {
        return new TupleTypeInfo<>(
            BasicTypeInfo.STRING_TYPE_INFO,  // name
            BasicTypeInfo.STRING_TYPE_INFO,  // email
            BasicTypeInfo.INT_TYPE_INFO      // age
        );
    }
}

Performance Considerations

Region-Aware Processing

HBase regions are automatically mapped to Flink parallel tasks for optimal data locality:

// Configure HBase client for better performance:
// scanner caching of 1000 and a scanner timeout period of 600000 (presumably milliseconds).
Configuration hbaseConfig = HBaseConfiguration.create();
hbaseConfig.set("hbase.client.scanner.caching", "1000");
hbaseConfig.set("hbase.client.scanner.timeout.period", "600000");

// NOTE(review): this snippet never hands hbaseConfig to a TableInputFormat, so it is
// unclear how these in-memory settings would be "automatically picked up". Confirm
// whether they must instead be placed in an hbase-site.xml on the job's classpath.

Memory Management

/**
 * Builds a scan tuned to bound memory use and network traffic:
 * only two columns of family cf1 are requested, with explicit caching and batch sizes.
 *
 * @return the configured scan
 */
@Override
protected Scan getScanner() {
    final Scan boundedScan = new Scan();

    // Request only the columns that are needed, to reduce network traffic
    final byte[] family = Bytes.toBytes("cf1");
    boundedScan.addColumn(family, Bytes.toBytes("col1"));
    boundedScan.addColumn(family, Bytes.toBytes("col2"));

    // Caching: number of rows fetched per round trip (network efficiency)
    boundedScan.setCaching(1000);

    // Batch: upper bound on values returned per result (memory control)
    boundedScan.setBatch(100);

    return boundedScan;
}

Common Types

import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.LocatableInputSplit;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-batch-connectors

docs

avro.md

hadoop.md

hbase.md

hcatalog.md

index.md

jdbc.md

tile.json