Apache Flink batch processing connectors for Avro, JDBC, Hadoop, HBase, and HCatalog data sources
—
Apache HBase database connectivity for Flink batch processing, providing region-aware table access with distributed processing capabilities.
Abstract base class for reading from HBase tables with region-aware splitting for optimal distributed processing.
/**
 * Abstract base class for reading from HBase tables in Flink.
 * Creates one input split per HBase region (see TableInputSplit) so reads
 * are distributed with data locality; subclasses only describe what to scan
 * and how to turn each HBase row into a Flink tuple.
 * @param <T> Tuple type representing the HBase table row structure
 */
public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<T, TableInputSplit> {
/**
 * Default constructor for TableInputFormat.
 * Shown as a declaration only — configuration happens through the abstract
 * methods below, not through constructor arguments.
 */
public TableInputFormat();
/**
 * Returns the HBase Scan instance for reading table data.
 * Subclasses must implement this to define what data to read
 * (column families, qualifiers, filters, time ranges, caching).
 * @return Scan object configured with column families, filters, etc.
 */
protected abstract Scan getScanner();
/**
 * Returns the name of the HBase table to read from.
 * @return HBase table name as a string
 */
protected abstract String getTableName();
/**
 * Maps an HBase Result to a Flink Tuple.
 * This method defines how to convert HBase row data to Flink types;
 * it is invoked once per row returned by the scan.
 * @param r HBase Result containing row data
 * @return Flink Tuple representing the row
 */
protected abstract T mapResultToTuple(Result r);
/**
 * Determines whether to include a specific HBase region in the input split.
 * Can be overridden to filter regions based on key ranges; empty keys mark
 * the first/last region of the table.
 * @param startKey Start key of the region
 * @param endKey End key of the region
 * @return true to include the region, false to skip it
 */
protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey);
}

Usage Example:
import org.apache.flink.addons.hbase.TableInputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
// Custom implementation for reading user data from HBase
/**
 * Example {@code TableInputFormat} that reads the "users" table and emits
 * (name, email, age) tuples taken from the {@code info} column family.
 */
public class UserTableInputFormat extends TableInputFormat<Tuple3<String, String, Integer>> {

    // Encode family/qualifier names once instead of on every call.
    private static final byte[] INFO_FAMILY = Bytes.toBytes("info");
    private static final byte[] NAME_QUALIFIER = Bytes.toBytes("name");
    private static final byte[] EMAIL_QUALIFIER = Bytes.toBytes("email");
    private static final byte[] AGE_QUALIFIER = Bytes.toBytes("age");

    @Override
    protected Scan getScanner() {
        // Restrict the scan to exactly the three columns mapped below.
        Scan scan = new Scan();
        scan.addColumn(INFO_FAMILY, NAME_QUALIFIER);
        scan.addColumn(INFO_FAMILY, EMAIL_QUALIFIER);
        scan.addColumn(INFO_FAMILY, AGE_QUALIFIER);
        return scan;
    }

    @Override
    protected String getTableName() {
        return "users";
    }

    @Override
    protected Tuple3<String, String, Integer> mapResultToTuple(Result r) {
        // NOTE(review): Bytes.toInt throws NPE when the "age" cell is absent —
        // assumes every row carries all three columns; confirm against the schema.
        String userName = Bytes.toString(r.getValue(INFO_FAMILY, NAME_QUALIFIER));
        String userEmail = Bytes.toString(r.getValue(INFO_FAMILY, EMAIL_QUALIFIER));
        Integer userAge = Bytes.toInt(r.getValue(INFO_FAMILY, AGE_QUALIFIER));
        return new Tuple3<>(userName, userEmail, userAge);
    }

    @Override
    protected boolean includeRegionInSplit(byte[] startKey, byte[] endKey) {
        // Scan every region of the table.
        return true;
    }
}
// Use in Flink program
// Create the batch execution environment and wire in the HBase source.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Each included HBase region becomes one input split processed in parallel.
DataSet<Tuple3<String, String, Integer>> users = env.createInput(new UserTableInputFormat());
users.print();

Input split representing HBase table region ranges for distributed processing.
/**
 * Input split representing HBase table region ranges.
 * One split corresponds to one HBase region; the hostnames give Flink the
 * locality information needed to schedule the split near its region server.
 */
public class TableInputSplit extends LocatableInputSplit {
/**
 * Creates a new TableInputSplit for an HBase region.
 * Package-private: splits are created by TableInputFormat, not by users.
 * @param splitNumber Unique identifier for this split
 * @param hostnames Array of hostnames where this region is located
 * @param tableName Name of the HBase table (as byte array)
 * @param startRow Start row key for this region (as byte array)
 * @param endRow End row key for this region (as byte array)
 */
TableInputSplit(final int splitNumber, final String[] hostnames,
final byte[] tableName, final byte[] startRow, final byte[] endRow);
/**
 * Returns the HBase table name for this split.
 * @return Table name as byte array
 */
public byte[] getTableName();
/**
 * Returns the start row key for this region split.
 * @return Start row key as byte array
 */
public byte[] getStartRow();
/**
 * Returns the end row key for this region split.
 * @return End row key as byte array
 */
public byte[] getEndRow();
}

public class FilteredUserTableInputFormat extends TableInputFormat<Tuple2<String, String>> {
@Override
protected Scan getScanner() {
    Scan scan = new Scan();
    // Only scan rows whose key starts with "user_".
    scan.setRowPrefixFilter(Bytes.toBytes("user_"));
    // Limit the scan to the two columns mapped into the tuple.
    scan.addColumn(Bytes.toBytes("profile"), Bytes.toBytes("name"));
    scan.addColumn(Bytes.toBytes("profile"), Bytes.toBytes("status"));
    // Restrict to data written within the last week. Sample the clock once
    // so both range endpoints are consistent (the previous version called
    // System.currentTimeMillis() twice), and use long arithmetic for the
    // millisecond constant so the expression cannot silently overflow int
    // if the window is ever widened beyond ~24 days.
    try {
        long now = System.currentTimeMillis();
        long oneWeekAgo = now - 7L * 24 * 60 * 60 * 1000;
        scan.setTimeRange(oneWeekAgo, now);
    } catch (IOException e) {
        // setTimeRange declares IOException for invalid ranges; surface it.
        throw new RuntimeException("Failed to set time range", e);
    }
    return scan;
}
@Override
protected String getTableName() {
// Source table holding one row per user profile.
return "user_profiles";
}
@Override
protected Tuple2<String, String> mapResultToTuple(Result r) {
    // Extract (name, status) from the "profile" family for this row.
    byte[] profileFamily = Bytes.toBytes("profile");
    String profileName = Bytes.toString(r.getValue(profileFamily, Bytes.toBytes("name")));
    String profileStatus = Bytes.toString(r.getValue(profileFamily, Bytes.toBytes("status")));
    return new Tuple2<>(profileName, profileStatus);
}
@Override
protected boolean includeRegionInSplit(byte[] startKey, byte[] endKey) {
    // The first and last regions of a table have empty boundary keys;
    // always keep those.
    if (startKey.length == 0 || endKey.length == 0) {
        return true;
    }
    // Otherwise keep only regions whose start key falls in the user_* range.
    return Bytes.toString(startKey).startsWith("user_");
}
}

public class CompleteUserTableInputFormat extends TableInputFormat<Tuple5<String, String, Integer, String, Long>> {
@Override
protected Scan getScanner() {
    Scan scan = new Scan();
    // Read every column from the three families this format consumes:
    // basic info, contact info, and activity data.
    for (String family : new String[] {"basic", "contact", "activity"}) {
        scan.addFamily(Bytes.toBytes(family));
    }
    // Tune RPC behavior: fetch 1000 rows per round trip, 100 columns per batch.
    scan.setCaching(1000);
    scan.setBatch(100);
    return scan;
}
@Override
protected String getTableName() {
// Table storing the full user record across three column families.
return "complete_users";
}
@Override
protected Tuple5<String, String, Integer, String, Long> mapResultToTuple(Result r) {
    // The row key doubles as the first tuple field.
    String rowKey = Bytes.toString(r.getRow());
    // "basic" family: display name and age.
    byte[] basicFamily = Bytes.toBytes("basic");
    String userName = Bytes.toString(r.getValue(basicFamily, Bytes.toBytes("name")));
    Integer userAge = Bytes.toInt(r.getValue(basicFamily, Bytes.toBytes("age")));
    // "contact" family: e-mail address.
    String userEmail = Bytes.toString(r.getValue(Bytes.toBytes("contact"), Bytes.toBytes("email")));
    // "activity" family: last-login timestamp.
    // NOTE(review): Bytes.toInt/toLong throw NPE when a cell is absent —
    // assumes every row is fully populated; confirm against the writer.
    Long lastLogin = Bytes.toLong(r.getValue(Bytes.toBytes("activity"), Bytes.toBytes("last_login")));
    return new Tuple5<>(rowKey, userName, userAge, userEmail, lastLogin);
}
}

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
public class TypedUserTableInputFormat extends TableInputFormat<Tuple3<String, String, Integer>> {
// ... implement abstract methods as shown above ...
// Optional: Override to provide explicit type information
@Override
public TypeInformation<Tuple3<String, String, Integer>> getProducedType() {
    // Explicitly declare the tuple's field types (name, email, age)
    // rather than relying on reflective type extraction.
    TypeInformation<String> stringType = BasicTypeInfo.STRING_TYPE_INFO;
    return new TupleTypeInfo<>(stringType, stringType, BasicTypeInfo.INT_TYPE_INFO);
}
}

HBase regions are automatically mapped to Flink parallel tasks for optimal data locality:
// Configure HBase client for better performance.
// NOTE(review): these keys are read from the client configuration on the
// classpath — verify the Configuration is the one the job actually uses.
Configuration hbaseConfig = HBaseConfiguration.create();
// Fetch 1000 rows per scanner RPC instead of the default.
hbaseConfig.set("hbase.client.scanner.caching", "1000");
// Allow long-running scans: 600000 ms = 10 minutes before scanner timeout.
hbaseConfig.set("hbase.client.scanner.timeout.period", "600000");
// These settings are automatically picked up by TableInputFormat

@Override
protected Scan getScanner() {
// Tuning example: trade client memory for fewer RPC round trips.
Scan scan = new Scan();
// Configure batch size to control memory usage
scan.setBatch(100); // Process 100 columns per RPC
// Configure caching for network efficiency
scan.setCaching(1000); // Cache 1000 rows per RPC
// Limit columns to reduce network traffic
scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("col1"));
scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("col2"));
return scan;
}

import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.LocatableInputSplit;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;