The HBase connector provides utility classes for type conversion, configuration management, and HBase operation helpers. These utilities handle the low-level details of data serialization, configuration serialization, and HBase client operations.
Utility class for converting between Java objects and HBase byte arrays with support for various data types and character encodings.
// Static helpers that convert between Java objects and the byte arrays stored in HBase.
class HBaseTypeUtils {
// Core conversion methods
// Turns an HBase byte array back into a Java object. typeIdx selects the target type
// (obtain it from getTypeIndex); stringCharset is, per its name, the charset applied to string values.
public static Object deserializeToObject(byte[] value, int typeIdx, Charset stringCharset);
// Turns a Java object into the byte array to store in HBase, using the same
// typeIdx / stringCharset conventions as deserializeToObject.
public static byte[] serializeFromObject(Object value, int typeIdx, Charset stringCharset);
// Type system utilities
// Returns the integer type index for a Flink TypeInformation, as consumed by the two conversion methods.
public static int getTypeIndex(TypeInformation typeInfo);
// Reports whether values of the given class can be handled by this utility.
public static boolean isSupportedType(Class<?> clazz);
}
import org.apache.flink.addons.hbase.util.HBaseTypeUtils;
import java.nio.charset.StandardCharsets;
// Resolve the type index of each type once; it is reused for both directions
int stringTypeIdx = HBaseTypeUtils.getTypeIndex(Types.STRING);
int intTypeIdx = HBaseTypeUtils.getTypeIndex(Types.INT);
int doubleTypeIdx = HBaseTypeUtils.getTypeIndex(Types.DOUBLE);

// Java objects -> HBase byte arrays
String stringValue = "Hello World";
byte[] stringBytes =
    HBaseTypeUtils.serializeFromObject(stringValue, stringTypeIdx, StandardCharsets.UTF_8);
Integer intValue = 42;
byte[] intBytes =
    HBaseTypeUtils.serializeFromObject(intValue, intTypeIdx, StandardCharsets.UTF_8);
Double doubleValue = 3.14159;
byte[] doubleBytes =
    HBaseTypeUtils.serializeFromObject(doubleValue, doubleTypeIdx, StandardCharsets.UTF_8);

// HBase byte arrays -> Java objects (the caller casts to the expected type)
String deserializedString = (String)
    HBaseTypeUtils.deserializeToObject(stringBytes, stringTypeIdx, StandardCharsets.UTF_8);
Integer deserializedInt = (Integer)
    HBaseTypeUtils.deserializeToObject(intBytes, intTypeIdx, StandardCharsets.UTF_8);
Double deserializedDouble = (Double)
    HBaseTypeUtils.deserializeToObject(doubleBytes, doubleTypeIdx, StandardCharsets.UTF_8);
import java.sql.Timestamp;
import java.sql.Date;
import java.sql.Time;
// Resolve the type index of each temporal type once
int timestampTypeIdx = HBaseTypeUtils.getTypeIndex(Types.SQL_TIMESTAMP);
int dateTypeIdx = HBaseTypeUtils.getTypeIndex(Types.SQL_DATE);
int timeTypeIdx = HBaseTypeUtils.getTypeIndex(Types.SQL_TIME);

// Temporal values -> HBase byte arrays
Timestamp timestamp = new Timestamp(System.currentTimeMillis());
byte[] timestampBytes =
    HBaseTypeUtils.serializeFromObject(timestamp, timestampTypeIdx, StandardCharsets.UTF_8);
Date date = Date.valueOf("2023-12-25");
byte[] dateBytes =
    HBaseTypeUtils.serializeFromObject(date, dateTypeIdx, StandardCharsets.UTF_8);
Time time = Time.valueOf("14:30:00");
byte[] timeBytes =
    HBaseTypeUtils.serializeFromObject(time, timeTypeIdx, StandardCharsets.UTF_8);

// HBase byte arrays -> temporal values
Timestamp deserializedTimestamp = (Timestamp)
    HBaseTypeUtils.deserializeToObject(timestampBytes, timestampTypeIdx, StandardCharsets.UTF_8);
Date deserializedDate = (Date)
    HBaseTypeUtils.deserializeToObject(dateBytes, dateTypeIdx, StandardCharsets.UTF_8);
Time deserializedTime = (Time)
    HBaseTypeUtils.deserializeToObject(timeBytes, timeTypeIdx, StandardCharsets.UTF_8);
import java.math.BigDecimal;
import java.math.BigInteger;
// Resolve the type index of each big-number type once
int bigDecimalTypeIdx = HBaseTypeUtils.getTypeIndex(Types.BIG_DEC);
int bigIntegerTypeIdx = HBaseTypeUtils.getTypeIndex(Types.BIG_INT);

// Big numeric values -> HBase byte arrays
BigDecimal bigDecimal = new BigDecimal("12345.6789");
byte[] bigDecimalBytes =
    HBaseTypeUtils.serializeFromObject(bigDecimal, bigDecimalTypeIdx, StandardCharsets.UTF_8);
BigInteger bigInteger = new BigInteger("123456789012345678901234567890");
byte[] bigIntegerBytes =
    HBaseTypeUtils.serializeFromObject(bigInteger, bigIntegerTypeIdx, StandardCharsets.UTF_8);

// HBase byte arrays -> big numeric values
BigDecimal deserializedBigDecimal = (BigDecimal)
    HBaseTypeUtils.deserializeToObject(bigDecimalBytes, bigDecimalTypeIdx, StandardCharsets.UTF_8);
BigInteger deserializedBigInteger = (BigInteger)
    HBaseTypeUtils.deserializeToObject(bigIntegerBytes, bigIntegerTypeIdx, StandardCharsets.UTF_8);
// Check if a type is supported
// isSupportedType takes the Java class of a value; the trailing comments show the expected result
boolean isStringSupported = HBaseTypeUtils.isSupportedType(String.class); // true
boolean isIntSupported = HBaseTypeUtils.isSupportedType(Integer.class); // true
boolean isBooleanSupported = HBaseTypeUtils.isSupportedType(Boolean.class); // true
// MyCustomClass stands in for any application-defined type
boolean isCustomSupported = HBaseTypeUtils.isSupportedType(MyCustomClass.class); // false
// Get type index for supported types
// (the index is the typeIdx argument of serializeFromObject / deserializeToObject)
int stringTypeIndex = HBaseTypeUtils.getTypeIndex(Types.STRING);
int intTypeIndex = HBaseTypeUtils.getTypeIndex(Types.INT);
int booleanTypeIndex = HBaseTypeUtils.getTypeIndex(Types.BOOLEAN);
// Validate types before processing
/**
 * Checks every qualifier type of every column family in the schema.
 *
 * @param schema table schema whose qualifier types are inspected
 * @throws IllegalArgumentException on the first qualifier type that
 *     {@code HBaseTypeUtils.isSupportedType} rejects
 */
public void validateSchema(HBaseTableSchema schema) {
    for (String familyName : schema.getFamilyNames()) {
        for (TypeInformation<?> qualifierType : schema.getQualifierTypes(familyName)) {
            Class<?> typeClass = qualifierType.getTypeClass();
            if (HBaseTypeUtils.isSupportedType(typeClass)) {
                continue;
            }
            throw new IllegalArgumentException(
                "Unsupported type in family " + familyName + ": " + typeClass.getName());
        }
    }
}
Utility for serializing and deserializing Hadoop Configuration objects for distributed processing.
// Static helpers for shipping a Hadoop Configuration as a byte array.
class HBaseConfigurationUtil {
// Serializes the given Configuration into a byte array.
public static byte[] serializeConfiguration(Configuration conf);
// Rebuilds a Configuration from bytes produced by serializeConfiguration.
// NOTE(review): presumably the entries are loaded into targetConfig and that instance is returned — confirm.
public static Configuration deserializeConfiguration(byte[] serializedConfig, Configuration targetConfig);
}
import org.apache.flink.addons.hbase.util.HBaseConfigurationUtil;
import org.apache.hadoop.conf.Configuration;
// Build the client-side HBase configuration
Configuration conf = new Configuration();
// ZooKeeper connection settings
conf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("zookeeper.znode.parent", "/hbase");
// Client tuning
conf.setInt("hbase.client.scanner.caching", 1000);
conf.setLong("hbase.rpc.timeout", 60000);

// Turn the configuration into bytes so it can be distributed
byte[] serializedConfig = HBaseConfigurationUtil.serializeConfiguration(conf);

// Later, on the task managers: restore the entries into a fresh Configuration
Configuration deserializedConf =
    HBaseConfigurationUtil.deserializeConfiguration(serializedConfig, new Configuration());

// Read values back to verify the round trip (second argument is the fallback default)
String zkQuorum = deserializedConf.get("hbase.zookeeper.quorum");
int scannerCaching = deserializedConf.getInt("hbase.client.scanner.caching", 100);
long rpcTimeout = deserializedConf.getLong("hbase.rpc.timeout", 30000);
// Pattern for distributing HBase configuration in Flink jobs
/**
 * Map function that carries an HBase configuration to the task managers as bytes and
 * opens an HBase connection there.
 *
 * <p>{@code Configuration} in this class is Hadoop's
 * {@code org.apache.hadoop.conf.Configuration}; Flink's configuration type is spelled out
 * with its full name where the Flink API requires it.
 */
public class DistributedHBaseProcessor extends RichMapFunction<Row, Row> {
    private static final long serialVersionUID = 1L;

    // Shipped with the function instead of the Configuration object itself.
    private final byte[] serializedConfig;
    // Rebuilt in open(); transient so they are not part of the serialized function.
    private transient Configuration hbaseConfig;
    private transient Connection hbaseConnection;

    /**
     * @param config Hadoop/HBase configuration to distribute; serialized immediately
     */
    public DistributedHBaseProcessor(Configuration config) {
        // Serialize configuration in constructor (on job manager)
        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(config);
    }

    /**
     * Restores the HBase configuration and creates the connection.
     *
     * @param parameters Flink configuration passed by the runtime; fully qualified because
     *     the simple name {@code Configuration} refers to Hadoop's class here, which would
     *     not override {@code RichFunction#open}
     */
    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        super.open(parameters);
        // Deserialize configuration on task manager
        this.hbaseConfig = HBaseConfigurationUtil.deserializeConfiguration(
            serializedConfig, new Configuration());
        // Create HBase connection
        this.hbaseConnection = ConnectionFactory.createConnection(hbaseConfig);
    }

    @Override
    public Row map(Row value) throws Exception {
        // Use HBase connection for processing
        // ...
        return value;
    }

    /** Closes the HBase connection; {@code super.close()} runs even if that fails. */
    @Override
    public void close() throws Exception {
        try {
            if (hbaseConnection != null) {
                hbaseConnection.close();
            }
        } finally {
            super.close();
        }
    }
}
Helper class for creating HBase operations and converting between HBase Result objects and Flink Row objects.
// Builds HBase client operations from a table schema and converts HBase Result objects to Flink Rows.
class HBaseReadWriteHelper {
// Creates a helper bound to the given table schema.
public HBaseReadWriteHelper(HBaseTableSchema hbaseTableSchema);
// Operation creation
// Get for the single row identified by rowKey.
public Get createGet(Object rowKey);
// Scan over the table.
public Scan createScan();
// Put built from the values of a Row.
public Put createPutMutation(Row row);
// Delete built from a Row.
public Delete createDeleteMutation(Row row);
// Result conversion
// Converts an HBase Result into a Flink Row.
public Row parseToRow(Result result);
// Variant that receives the row key explicitly.
// NOTE(review): presumably avoids re-reading the key from the Result — confirm.
public Row parseToRow(Result result, Object rowKey);
}
import org.apache.flink.addons.hbase.util.HBaseReadWriteHelper;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Result;
// Create helper with schema
HBaseTableSchema schema = new HBaseTableSchema();
schema.setRowKey("user_id", String.class);
schema.addColumn("profile", "name", String.class);
schema.addColumn("profile", "age", Integer.class);
HBaseReadWriteHelper helper = new HBaseReadWriteHelper(schema);

// Create Get operation for point queries
String userId = "user123";
Get get = helper.createGet(userId);

// Execute get and parse result
Table table = connection.getTable(TableName.valueOf("users"));
Result result = table.get(get);
Row userRow = helper.parseToRow(result);

// Create Scan operation for range queries
Scan scan = helper.createScan();
// try-with-resources closes the scanner even if processing a row throws
try (ResultScanner scanner = table.getScanner(scan)) {
    for (Result scanResult : scanner) {
        Row row = helper.parseToRow(scanResult);
        // Row layout for this schema: field 0 is the row key, field 1 is a nested Row
        // holding the qualifiers of the "profile" family in declaration order.
        Row profile = (Row) row.getField(1);
        String name = (String) profile.getField(0);
        Integer age = (Integer) profile.getField(1);
    }
}
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.flink.types.Row;
// Create Row for insertion.
// The row mirrors the schema: the row key plus one nested Row per column family.
Row userRow = Row.of(
    "user456",              // user_id (row key)
    Row.of("Jane Doe", 28)  // "profile" family: name, age
);

// Create Put mutation
Put put = helper.createPutMutation(userRow);

// Execute put
table.put(put);

// Create Row for deletion (only row key needed; the family field can stay null)
Row deleteRow = Row.of("user456", null);

// Create Delete mutation
Delete delete = helper.createDeleteMutation(deleteRow);

// Execute delete
table.delete(delete);
import java.util.List;
import java.util.ArrayList;
import java.util.Arrays;
// Batch write operations.
// Each row is the row key plus a nested Row with the "profile" family values (name, age).
List<Row> rows = Arrays.asList(
    Row.of("user001", Row.of("Alice", 25)),
    Row.of("user002", Row.of("Bob", 30)),
    Row.of("user003", Row.of("Charlie", 35))
);
List<Put> puts = new ArrayList<>(rows.size());
for (Row row : rows) {
    puts.add(helper.createPutMutation(row));
}

// Execute batch put
table.put(puts);

// Batch read operations
List<Get> gets = Arrays.asList(
    helper.createGet("user001"),
    helper.createGet("user002"),
    helper.createGet("user003")
);
Result[] results = table.get(gets);
for (Result result : results) {
    // Skip results that carry no cells
    if (!result.isEmpty()) {
        Row row = helper.parseToRow(result);
        // Process row
    }
}
// Extend HBaseTypeUtils for custom type handling
/**
 * JSON helpers layered on top of {@code HBaseTypeUtils}: objects are written as a JSON
 * string, which is then stored through the STRING conversion in UTF-8.
 */
public class ExtendedTypeUtils {
    // One shared mapper: constructing an ObjectMapper per call is costly, and a mapper
    // that is not reconfigured after creation can be reused across calls.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * Custom serialization for complex types.
     *
     * @param jsonObject object to write as JSON
     * @return bytes of the JSON text as produced by the STRING conversion
     * @throws RuntimeException if JSON writing or byte conversion fails (cause attached)
     */
    public static byte[] serializeJson(Object jsonObject) {
        try {
            String json = MAPPER.writeValueAsString(jsonObject);
            return HBaseTypeUtils.serializeFromObject(json,
                HBaseTypeUtils.getTypeIndex(Types.STRING), StandardCharsets.UTF_8);
        } catch (Exception e) {
            throw new RuntimeException("JSON serialization failed", e);
        }
    }

    /**
     * Custom deserialization for complex types.
     *
     * @param bytes bytes previously produced by {@link #serializeJson(Object)}
     * @param valueType class to bind the JSON to
     * @return the reconstructed object
     * @throws RuntimeException if byte conversion or JSON parsing fails (cause attached)
     */
    public static <T> T deserializeJson(byte[] bytes, Class<T> valueType) {
        try {
            String json = (String) HBaseTypeUtils.deserializeToObject(bytes,
                HBaseTypeUtils.getTypeIndex(Types.STRING), StandardCharsets.UTF_8);
            return MAPPER.readValue(json, valueType);
        } catch (Exception e) {
            throw new RuntimeException("JSON deserialization failed", e);
        }
    }
}

// Usage example
MyCustomObject obj = new MyCustomObject("value1", 123);
byte[] serialized = ExtendedTypeUtils.serializeJson(obj);
MyCustomObject deserialized = ExtendedTypeUtils.deserializeJson(serialized, MyCustomObject.class);
// Utility class for common HBase configurations
/** Factory methods for a few ready-made HBase client configurations. */
public class HBaseConfigTemplates {

    /** Configuration pointing at a local ZooKeeper, with the development timeouts and retry count. */
    public static Configuration createDevelopmentConfig() {
        Configuration devConf = zookeeperConfig("localhost:2181");
        // Development-friendly settings
        devConf.setLong("hbase.rpc.timeout", 30000);
        devConf.setLong("hbase.client.operation.timeout", 60000);
        devConf.setInt("hbase.client.retries.number", 3);
        return devConf;
    }

    /**
     * Configuration for the given quorum with the production timeouts, retries and task limits.
     *
     * @param zkQuorum value stored under {@code hbase.zookeeper.quorum}
     */
    public static Configuration createProductionConfig(String zkQuorum) {
        Configuration prodConf = zookeeperConfig(zkQuorum);
        // Production-optimized settings
        prodConf.setLong("hbase.rpc.timeout", 120000);
        prodConf.setLong("hbase.client.operation.timeout", 300000);
        prodConf.setInt("hbase.client.retries.number", 10);
        prodConf.setLong("hbase.client.pause", 1000);
        // Performance tuning
        prodConf.setInt("hbase.client.ipc.pool.size", 10);
        prodConf.setInt("hbase.client.max.total.tasks", 200);
        prodConf.setInt("hbase.client.max.perserver.tasks", 20);
        return prodConf;
    }

    /**
     * Production configuration plus scanner and write-buffer settings.
     *
     * @param zkQuorum value stored under {@code hbase.zookeeper.quorum}
     */
    public static Configuration createHighPerformanceConfig(String zkQuorum) {
        Configuration fastConf = createProductionConfig(zkQuorum);
        // High-performance settings
        fastConf.setInt("hbase.client.scanner.caching", 2000);
        fastConf.setLong("hbase.client.scanner.max.result.size", 8 * 1024 * 1024);
        fastConf.setBoolean("hbase.client.scanner.async.prefetch", true);
        fastConf.setLong("hbase.client.write.buffer", 16 * 1024 * 1024);
        return fastConf;
    }

    // ZooKeeper entries shared by every template; only the quorum differs.
    private static Configuration zookeeperConfig(String quorum) {
        Configuration base = new Configuration();
        base.set("hbase.zookeeper.quorum", quorum);
        base.set("hbase.zookeeper.property.clientPort", "2181");
        base.set("zookeeper.znode.parent", "/hbase");
        return base;
    }
}
// Comprehensive schema validation
/** Static checks that reject an incomplete or unsupported {@code HBaseTableSchema}. */
public class SchemaValidationUtils {

    /**
     * Runs all checks in order: row key, column families, data types, charset.
     *
     * @param schema schema to validate
     * @throws IllegalArgumentException describing the first check that fails
     */
    public static void validateTableSchema(HBaseTableSchema schema) {
        validateRowKey(schema);
        validateColumnFamilies(schema);
        validateDataTypes(schema);
        validateCharset(schema);
    }

    // The row key must be defined, carry type information, and use a supported type.
    private static void validateRowKey(HBaseTableSchema schema) {
        if (schema.getRowKeyIndex() < 0) {
            throw new IllegalArgumentException("Row key must be defined");
        }
        TypeInformation<?> rowKeyType = schema.getRowKeyTypeInfo()
            .orElseThrow(() -> new IllegalArgumentException("Row key type information missing"));
        Class<?> rowKeyClass = rowKeyType.getTypeClass();
        if (!HBaseTypeUtils.isSupportedType(rowKeyClass)) {
            throw new IllegalArgumentException(
                "Unsupported row key type: " + rowKeyClass.getSimpleName());
        }
    }

    // At least one family; every family needs a non-blank name and at least one qualifier.
    private static void validateColumnFamilies(HBaseTableSchema schema) {
        String[] families = schema.getFamilyNames();
        if (families == null || families.length == 0) {
            throw new IllegalArgumentException("At least one column family must be defined");
        }
        for (String family : families) {
            if (family == null || family.trim().isEmpty()) {
                throw new IllegalArgumentException("Column family name cannot be null or empty");
            }
            byte[][] qualifiers = schema.getQualifierKeys(family);
            if (qualifiers == null || qualifiers.length == 0) {
                throw new IllegalArgumentException(
                    "Column family '" + family + "' has no qualifiers");
            }
        }
    }

    // Every qualifier type of every family must be accepted by HBaseTypeUtils.
    private static void validateDataTypes(HBaseTableSchema schema) {
        for (String family : schema.getFamilyNames()) {
            for (TypeInformation<?> type : schema.getQualifierTypes(family)) {
                Class<?> typeClass = type.getTypeClass();
                if (!HBaseTypeUtils.isSupportedType(typeClass)) {
                    throw new IllegalArgumentException(
                        "Unsupported type in family '" + family + "': " + typeClass.getSimpleName());
                }
            }
        }
    }

    // The configured charset name must resolve to a charset available in this JVM.
    private static void validateCharset(HBaseTableSchema schema) {
        String charset = schema.getStringCharset();
        try {
            Charset.forName(charset);
        } catch (IllegalArgumentException e) {
            // Charset.forName reports null, illegal and unsupported names through
            // IllegalArgumentException or its subclasses; anything else is not a charset
            // problem and is left to propagate unchanged.
            throw new IllegalArgumentException("Invalid charset: " + charset, e);
        }
    }
}
// Utility for monitoring HBase operations
/** Example of wrapping HBase calls with latency and error metrics. */
public class HBaseMonitoringUtils {
    // Collaborators used by performMonitoredGet; declared here so the example is self-contained.
    private final HBaseReadWriteHelper helper;
    private final Table table;

    /**
     * @param helper builds the Get and converts the Result
     * @param table HBase table the Get is executed against
     */
    public HBaseMonitoringUtils(HBaseReadWriteHelper helper, Table table) {
        this.helper = helper;
        this.table = table;
    }

    /**
     * Counts an operation when created and records its elapsed milliseconds when closed;
     * intended for use in a try-with-resources statement.
     */
    public static class OperationTimer implements AutoCloseable {
        private final long startNanos;
        private final Histogram latencyHistogram;

        /**
         * @param operationName prefix for the metric names ({@code _latency}, {@code _count})
         * @param metricGroup group the metrics are looked up in
         */
        public OperationTimer(String operationName, MetricGroup metricGroup) {
            // nanoTime is monotonic; wall-clock time can jump and produce negative or
            // inflated durations.
            this.startNanos = System.nanoTime();
            // NOTE(review): Flink's MetricGroup#histogram appears to require a Histogram
            // instance as second argument, and registering the same metric name on every
            // call may collide — confirm, and consider registering the metrics once.
            this.latencyHistogram = metricGroup.histogram(operationName + "_latency");
            metricGroup.counter(operationName + "_count").inc();
        }

        @Override
        public void close() {
            long durationMillis = (System.nanoTime() - startNanos) / 1_000_000L;
            latencyHistogram.update(durationMillis);
        }
    }

    /**
     * Usage in HBase operations: performs a Get while recording count, latency and errors.
     *
     * @param rowKey key of the row to fetch
     * @param metricGroup group receiving the {@code hbase_get*} metrics
     * @return the fetched row as converted by the helper
     * @throws RuntimeException wrapping any failure, after the error counter is incremented
     */
    public Row performMonitoredGet(String rowKey, MetricGroup metricGroup) {
        try (OperationTimer timer = new OperationTimer("hbase_get", metricGroup)) {
            Get get = helper.createGet(rowKey);
            Result result = table.get(get);
            return helper.parseToRow(result);
        } catch (Exception e) {
            metricGroup.counter("hbase_get_errors").inc();
            throw new RuntimeException("HBase get failed", e);
        }
    }
}

| Java Type | HBase Storage | Type Index | Notes |
|---|---|---|---|
| String | byte[] | STRING | UTF-8/configurable encoding |
| byte[] | byte[] | PRIMITIVE_ARRAY | Direct binary storage |
| Byte | byte[] | BYTE | Single byte value |
| Short | byte[] | SHORT | 2-byte big-endian |
| Integer | byte[] | INT | 4-byte big-endian |
| Long | byte[] | LONG | 8-byte big-endian |
| Float | byte[] | FLOAT | IEEE 754 format |
| Double | byte[] | DOUBLE | IEEE 754 format |
| Boolean | byte[] | BOOLEAN | Single byte (0x00 = false, 0xFF = true) |
| java.sql.Date | byte[] | SQL_DATE | Long timestamp (epoch milliseconds) |
| java.sql.Time | byte[] | SQL_TIME | Long timestamp (epoch milliseconds) |
| java.sql.Timestamp | byte[] | SQL_TIMESTAMP | Long timestamp (epoch milliseconds) |
| java.math.BigDecimal | byte[] | BIG_DEC | 4-byte scale followed by the unscaled value bytes |
| java.math.BigInteger | byte[] | BIG_INT | Two's-complement bytes (`BigInteger.toByteArray()`) |
// Supported character encodings
// (constants from java.nio.charset.StandardCharsets, usable as the stringCharset argument)
Charset utf8 = StandardCharsets.UTF_8; // Default
Charset utf16 = StandardCharsets.UTF_16; // Unicode
Charset iso88591 = StandardCharsets.ISO_8859_1; // Latin-1
Charset ascii = StandardCharsets.US_ASCII; // ASCII
// Usage in schema
HBaseTableSchema schema = new HBaseTableSchema();
// setCharset takes the charset by name rather than as a Charset object
schema.setCharset("UTF-8"); // Set encoding for string types