or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.md · input-formats.md · lookup-functions.md · schema-config.md · sink-functions.md · table-api.md · utilities.md
tile.json

utilities.mddocs/

Utilities

The HBase connector provides utility classes for type conversion, configuration management, and HBase operation helpers. These utilities handle the low-level details of data serialization, configuration serialization, and HBase client operations.

HBaseTypeUtils

Utility class for converting between Java objects and HBase byte arrays with support for various data types and character encodings.

// API summary: static helpers that convert between Java objects and HBase byte arrays.
class HBaseTypeUtils {
    // Core conversion methods
    // Converts an HBase byte array back into a Java object. typeIdx is the value
    // obtained from getTypeIndex(...); stringCharset is the character encoding
    // (presumably only consulted for String values — confirm against the implementation).
    public static Object deserializeToObject(byte[] value, int typeIdx, Charset stringCharset);
    // Converts a Java object into an HBase byte array; takes the same typeIdx and
    // stringCharset arguments as deserializeToObject.
    public static byte[] serializeFromObject(Object value, int typeIdx, Charset stringCharset);
    
    // Type system utilities
    // Returns the integer type index that the two conversion methods accept as typeIdx.
    public static int getTypeIndex(TypeInformation typeInfo);
    // Reports whether values of the given class can be handled by this utility.
    public static boolean isSupportedType(Class<?> clazz);
}

Type Serialization and Deserialization

import org.apache.flink.addons.hbase.util.HBaseTypeUtils;
import org.apache.flink.api.common.typeinfo.Types;
import java.nio.charset.StandardCharsets;

// Resolve each type index once; the same index is used for both serialization
// and deserialization of a given type.
int stringType = HBaseTypeUtils.getTypeIndex(Types.STRING);
int intType = HBaseTypeUtils.getTypeIndex(Types.INT);
int doubleType = HBaseTypeUtils.getTypeIndex(Types.DOUBLE);

// Serialize Java objects to HBase byte arrays
String stringValue = "Hello World";
byte[] stringBytes = HBaseTypeUtils.serializeFromObject(stringValue,
    stringType, StandardCharsets.UTF_8);

Integer intValue = 42;
byte[] intBytes = HBaseTypeUtils.serializeFromObject(intValue,
    intType, StandardCharsets.UTF_8);

Double doubleValue = 3.14159;
byte[] doubleBytes = HBaseTypeUtils.serializeFromObject(doubleValue,
    doubleType, StandardCharsets.UTF_8);

// Deserialize HBase byte arrays to Java objects.
// deserializeToObject returns Object, so the caller casts to the expected type.
String deserializedString = (String) HBaseTypeUtils.deserializeToObject(stringBytes,
    stringType, StandardCharsets.UTF_8);

Integer deserializedInt = (Integer) HBaseTypeUtils.deserializeToObject(intBytes,
    intType, StandardCharsets.UTF_8);

Double deserializedDouble = (Double) HBaseTypeUtils.deserializeToObject(doubleBytes,
    doubleType, StandardCharsets.UTF_8);

Temporal Data Types

import java.sql.Timestamp;
import java.sql.Date;
import java.sql.Time;

// Serialize temporal types
Timestamp timestamp = new Timestamp(System.currentTimeMillis());
byte[] timestampBytes = HBaseTypeUtils.serializeFromObject(timestamp,
    HBaseTypeUtils.getTypeIndex(Types.SQL_TIMESTAMP), StandardCharsets.UTF_8);

Date date = Date.valueOf("2023-12-25");
byte[] dateBytes = HBaseTypeUtils.serializeFromObject(date,
    HBaseTypeUtils.getTypeIndex(Types.SQL_DATE), StandardCharsets.UTF_8);

Time time = Time.valueOf("14:30:00");
byte[] timeBytes = HBaseTypeUtils.serializeFromObject(time,
    HBaseTypeUtils.getTypeIndex(Types.SQL_TIME), StandardCharsets.UTF_8);

// Deserialize temporal types
Timestamp deserializedTimestamp = (Timestamp) HBaseTypeUtils.deserializeToObject(timestampBytes,
    HBaseTypeUtils.getTypeIndex(Types.SQL_TIMESTAMP), StandardCharsets.UTF_8);

Date deserializedDate = (Date) HBaseTypeUtils.deserializeToObject(dateBytes,
    HBaseTypeUtils.getTypeIndex(Types.SQL_DATE), StandardCharsets.UTF_8);

Time deserializedTime = (Time) HBaseTypeUtils.deserializeToObject(timeBytes,
    HBaseTypeUtils.getTypeIndex(Types.SQL_TIME), StandardCharsets.UTF_8);

Numeric Data Types

import java.math.BigDecimal;
import java.math.BigInteger;

// Serialize big numeric types
BigDecimal bigDecimal = new BigDecimal("12345.6789");
byte[] bigDecimalBytes = HBaseTypeUtils.serializeFromObject(bigDecimal,
    HBaseTypeUtils.getTypeIndex(Types.BIG_DEC), StandardCharsets.UTF_8);

BigInteger bigInteger = new BigInteger("123456789012345678901234567890");
byte[] bigIntegerBytes = HBaseTypeUtils.serializeFromObject(bigInteger,
    HBaseTypeUtils.getTypeIndex(Types.BIG_INT), StandardCharsets.UTF_8);

// Deserialize big numeric types
BigDecimal deserializedBigDecimal = (BigDecimal) HBaseTypeUtils.deserializeToObject(bigDecimalBytes,
    HBaseTypeUtils.getTypeIndex(Types.BIG_DEC), StandardCharsets.UTF_8);

BigInteger deserializedBigInteger = (BigInteger) HBaseTypeUtils.deserializeToObject(bigIntegerBytes,
    HBaseTypeUtils.getTypeIndex(Types.BIG_INT), StandardCharsets.UTF_8);

Type Support Validation

// Check if a type is supported
boolean isStringSupported = HBaseTypeUtils.isSupportedType(String.class);        // true
boolean isIntSupported = HBaseTypeUtils.isSupportedType(Integer.class);          // true
boolean isBooleanSupported = HBaseTypeUtils.isSupportedType(Boolean.class);      // true
boolean isCustomSupported = HBaseTypeUtils.isSupportedType(MyCustomClass.class); // false

// Get type index for supported types
int stringTypeIndex = HBaseTypeUtils.getTypeIndex(Types.STRING);
int intTypeIndex = HBaseTypeUtils.getTypeIndex(Types.INT);
int booleanTypeIndex = HBaseTypeUtils.getTypeIndex(Types.BOOLEAN);

// Validate types before processing
/**
 * Checks every qualifier type declared in the schema against
 * {@code HBaseTypeUtils.isSupportedType} and fails on the first one that is not supported.
 *
 * @param schema the table schema whose column families are inspected
 * @throws IllegalArgumentException if any qualifier type is unsupported; the message
 *         names the column family and the fully-qualified type class
 */
public void validateSchema(HBaseTableSchema schema) {
    for (String familyName : schema.getFamilyNames()) {
        for (TypeInformation<?> qualifierType : schema.getQualifierTypes(familyName)) {
            if (HBaseTypeUtils.isSupportedType(qualifierType.getTypeClass())) {
                continue;
            }
            throw new IllegalArgumentException(
                "Unsupported type in family " + familyName + ": " + qualifierType.getTypeClass().getName());
        }
    }
}

HBaseConfigurationUtil

Utility for serializing and deserializing Hadoop Configuration objects for distributed processing.

// API summary: serializes and deserializes Hadoop Configuration objects so they can
// be shipped to distributed tasks.
class HBaseConfigurationUtil {
    // Serializes the given Hadoop configuration into a byte array.
    public static byte[] serializeConfiguration(Configuration conf);
    // Rebuilds a configuration from bytes produced by serializeConfiguration.
    // NOTE(review): presumably the entries are loaded into targetConfig and that same
    // instance is returned — confirm against the implementation.
    public static Configuration deserializeConfiguration(byte[] serializedConfig, Configuration targetConfig);
}

Configuration Serialization

import org.apache.flink.addons.hbase.util.HBaseConfigurationUtil;
import org.apache.hadoop.conf.Configuration;

// Create and configure HBase configuration
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("zookeeper.znode.parent", "/hbase");
conf.setInt("hbase.client.scanner.caching", 1000);
conf.setLong("hbase.rpc.timeout", 60000);

// Serialize configuration for distribution
byte[] serializedConfig = HBaseConfigurationUtil.serializeConfiguration(conf);

// Later, deserialize configuration on task managers
Configuration targetConf = new Configuration();
Configuration deserializedConf = HBaseConfigurationUtil.deserializeConfiguration(
    serializedConfig, targetConf);

// Verify configuration was properly deserialized
String zkQuorum = deserializedConf.get("hbase.zookeeper.quorum");
int scannerCaching = deserializedConf.getInt("hbase.client.scanner.caching", 100);
long rpcTimeout = deserializedConf.getLong("hbase.rpc.timeout", 30000);

Configuration Distribution Pattern

// Pattern for distributing HBase configuration in Flink jobs
public class DistributedHBaseProcessor extends RichMapFunction<Row, Row> {
    private byte[] serializedConfig;
    private transient Configuration hbaseConfig;
    private transient Connection hbaseConnection;
    
    public DistributedHBaseProcessor(Configuration config) {
        // Serialize configuration in constructor (on job manager)
        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(config);
    }
    
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        
        // Deserialize configuration on task manager
        this.hbaseConfig = HBaseConfigurationUtil.deserializeConfiguration(
            serializedConfig, new Configuration());
        
        // Create HBase connection
        this.hbaseConnection = ConnectionFactory.createConnection(hbaseConfig);
    }
    
    @Override
    public Row map(Row value) throws Exception {
        // Use HBase connection for processing
        // ...
        return value;
    }
    
    @Override
    public void close() throws Exception {
        if (hbaseConnection != null) {
            hbaseConnection.close();
        }
        super.close();
    }
}

HBaseReadWriteHelper

Helper class for creating HBase operations and converting between HBase Result objects and Flink Row objects.

// API summary: creates HBase operations and converts HBase Result objects into Flink Row objects.
class HBaseReadWriteHelper {
    // Builds a helper bound to the given table schema.
    public HBaseReadWriteHelper(HBaseTableSchema hbaseTableSchema);
    
    // Operation creation
    // Creates a Get for the given row key (point lookup).
    public Get createGet(Object rowKey);
    // Creates a Scan (presumably restricted to the columns declared in the schema — confirm).
    public Scan createScan();
    // Creates a Put from the given Row.
    public Put createPutMutation(Row row);
    // Creates a Delete from the given Row.
    public Delete createDeleteMutation(Row row);
    
    // Result conversion
    // Converts an HBase Result into a Row.
    public Row parseToRow(Result result);
    // Same conversion with the row key supplied by the caller
    // (presumably used instead of the key carried by the Result — confirm).
    public Row parseToRow(Result result, Object rowKey);
}

Read Operations

import org.apache.flink.addons.hbase.util.HBaseReadWriteHelper;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Result;

// Create helper with schema
HBaseTableSchema schema = new HBaseTableSchema();
schema.setRowKey("user_id", String.class);
schema.addColumn("profile", "name", String.class);
schema.addColumn("profile", "age", Integer.class);

HBaseReadWriteHelper helper = new HBaseReadWriteHelper(schema);

// Table and ResultScanner hold client resources; try-with-resources releases them
// even when a read throws, which a trailing scanner.close() would not.
try (Table table = connection.getTable(TableName.valueOf("users"))) {
    // Create Get operation for point queries
    String userId = "user123";
    Get get = helper.createGet(userId);

    // Execute get and parse result
    Result result = table.get(get);
    Row userRow = helper.parseToRow(result);

    // Create Scan operation for range queries
    Scan scan = helper.createScan();
    try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result scanResult : scanner) {
            Row row = helper.parseToRow(scanResult);
            // Process row. The helper produces one nested Row per column family:
            // field 0 is the row key (declared first), field 1 is the "profile" family,
            // whose qualifiers appear in declaration order (name, age).
            Row profile = (Row) row.getField(1);
            String name = (String) profile.getField(0);
            Integer age = (Integer) profile.getField(1);
        }
    }
}

Write Operations

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.flink.types.Row;

// Create Row for insertion.
// The helper expects the row key plus ONE nested Row per column family, mirroring
// the schema (user_id, profile(name, age)); a flat (user_id, name, age) row does not
// match that layout.
Row userRow = Row.of(
    "user456",                // user_id (row key)
    Row.of("Jane Doe", 28)    // profile family: name, age
);

// Create Put mutation
Put put = helper.createPutMutation(userRow);

// Execute put
table.put(put);

// Create Row for deletion (only row key needed; the family slot may stay null)
Row deleteRow = Row.of("user456", null);

// Create Delete mutation
Delete delete = helper.createDeleteMutation(deleteRow);

// Execute delete
table.delete(delete);

Batch Operations

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Batch write operations.
// Each row is (row key, nested Row for the "profile" family: name, age).
List<Row> rows = Arrays.asList(
    Row.of("user001", Row.of("Alice", 25)),
    Row.of("user002", Row.of("Bob", 30)),
    Row.of("user003", Row.of("Charlie", 35))
);

List<Put> puts = new ArrayList<>(rows.size());
for (Row row : rows) {
    puts.add(helper.createPutMutation(row));
}

// Execute batch put
table.put(puts);

// Batch read operations
List<Get> gets = Arrays.asList(
    helper.createGet("user001"),
    helper.createGet("user002"),
    helper.createGet("user003")
);

Result[] results = table.get(gets);
for (Result result : results) {
    // A row key with no stored data comes back as an empty Result; skip it.
    if (!result.isEmpty()) {
        Row row = helper.parseToRow(result);
        // Process row
    }
}

Advanced Utility Patterns

Custom Type Conversion

// Extend HBaseTypeUtils for custom type handling
/**
 * JSON helpers layered on top of HBaseTypeUtils: objects are written as a JSON string
 * and that string is stored using the STRING type index with UTF-8 encoding.
 */
public class ExtendedTypeUtils {

    // Building an ObjectMapper is expensive and a configured instance is safe to share
    // between threads, so one instance is reused instead of creating one per call.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * Serializes an arbitrary object to JSON and then to HBase bytes.
     *
     * @param jsonObject the object to write as JSON
     * @return the UTF-8 encoded JSON produced via HBaseTypeUtils
     * @throws RuntimeException wrapping any failure, with the original exception as cause
     */
    public static byte[] serializeJson(Object jsonObject) {
        try {
            String json = MAPPER.writeValueAsString(jsonObject);
            return HBaseTypeUtils.serializeFromObject(json,
                HBaseTypeUtils.getTypeIndex(Types.STRING), StandardCharsets.UTF_8);
        } catch (Exception e) {
            throw new RuntimeException("JSON serialization failed", e);
        }
    }

    /**
     * Reads HBase bytes back as a UTF-8 JSON string and binds it to the requested type.
     *
     * @param bytes     the stored bytes, as produced by {@link #serializeJson(Object)}
     * @param valueType the class to bind the JSON to
     * @return the deserialized instance
     * @throws RuntimeException wrapping any failure, with the original exception as cause
     */
    public static <T> T deserializeJson(byte[] bytes, Class<T> valueType) {
        try {
            String json = (String) HBaseTypeUtils.deserializeToObject(bytes,
                HBaseTypeUtils.getTypeIndex(Types.STRING), StandardCharsets.UTF_8);
            return MAPPER.readValue(json, valueType);
        } catch (Exception e) {
            throw new RuntimeException("JSON deserialization failed", e);
        }
    }
}

// Usage example
MyCustomObject obj = new MyCustomObject("value1", 123);
byte[] serialized = ExtendedTypeUtils.serializeJson(obj);
MyCustomObject deserialized = ExtendedTypeUtils.deserializeJson(serialized, MyCustomObject.class);

Configuration Templates

// Utility class for common HBase configurations
/**
 * Factory methods for three preset Hadoop configurations: development, production,
 * and a high-performance variant that builds on the production preset.
 */
public class HBaseConfigTemplates {

    /** Returns the development preset: local ZooKeeper, short timeouts, few retries. */
    public static Configuration createDevelopmentConfig() {
        Configuration devConf = newZookeeperConfig("localhost:2181");

        // Development-friendly settings
        devConf.setLong("hbase.rpc.timeout", 30000);
        devConf.setLong("hbase.client.operation.timeout", 60000);
        devConf.setInt("hbase.client.retries.number", 3);

        return devConf;
    }

    /**
     * Returns the production preset for the given ZooKeeper quorum.
     *
     * @param zkQuorum value stored under {@code hbase.zookeeper.quorum}
     */
    public static Configuration createProductionConfig(String zkQuorum) {
        Configuration prodConf = newZookeeperConfig(zkQuorum);

        // Production-optimized settings
        prodConf.setLong("hbase.rpc.timeout", 120000);
        prodConf.setLong("hbase.client.operation.timeout", 300000);
        prodConf.setInt("hbase.client.retries.number", 10);
        prodConf.setLong("hbase.client.pause", 1000);

        // Performance tuning
        prodConf.setInt("hbase.client.ipc.pool.size", 10);
        prodConf.setInt("hbase.client.max.total.tasks", 200);
        prodConf.setInt("hbase.client.max.perserver.tasks", 20);

        return prodConf;
    }

    /**
     * Returns the production preset with additional scanner and write-buffer settings.
     *
     * @param zkQuorum value stored under {@code hbase.zookeeper.quorum}
     */
    public static Configuration createHighPerformanceConfig(String zkQuorum) {
        Configuration fastConf = createProductionConfig(zkQuorum);

        // High-performance settings
        fastConf.setInt("hbase.client.scanner.caching", 2000);
        fastConf.setLong("hbase.client.scanner.max.result.size", 8 * 1024 * 1024);
        fastConf.setBoolean("hbase.client.scanner.async.prefetch", true);
        fastConf.setLong("hbase.client.write.buffer", 16 * 1024 * 1024);

        return fastConf;
    }

    // Creates a fresh Configuration carrying the ZooKeeper entries shared by every preset.
    private static Configuration newZookeeperConfig(String quorum) {
        Configuration base = new Configuration();
        base.set("hbase.zookeeper.quorum", quorum);
        base.set("hbase.zookeeper.property.clientPort", "2181");
        base.set("zookeeper.znode.parent", "/hbase");
        return base;
    }
}

Schema Validation Utilities

// Comprehensive schema validation
/**
 * Validation helpers for an HBaseTableSchema. Every check reports a problem by
 * throwing IllegalArgumentException.
 */
public class SchemaValidationUtils {

    /**
     * Runs all checks in a fixed order: row key, column families, data types, charset.
     *
     * @param schema the schema to validate
     * @throws IllegalArgumentException on the first check that fails
     */
    public static void validateTableSchema(HBaseTableSchema schema) {
        validateRowKey(schema);
        validateColumnFamilies(schema);
        validateDataTypes(schema);
        validateCharset(schema);
    }

    // The row key must be declared, carry type information, and use a supported type.
    private static void validateRowKey(HBaseTableSchema schema) {
        if (schema.getRowKeyIndex() < 0) {
            throw new IllegalArgumentException("Row key must be defined");
        }

        TypeInformation<?> keyType = schema.getRowKeyTypeInfo()
            .orElseThrow(() -> new IllegalArgumentException("Row key type information missing"));

        if (HBaseTypeUtils.isSupportedType(keyType.getTypeClass())) {
            return;
        }
        throw new IllegalArgumentException("Unsupported row key type: " +
            keyType.getTypeClass().getSimpleName());
    }

    // At least one family must exist; each needs a non-blank name and at least one qualifier.
    private static void validateColumnFamilies(HBaseTableSchema schema) {
        String[] familyNames = schema.getFamilyNames();
        boolean noFamilies = familyNames == null || familyNames.length == 0;
        if (noFamilies) {
            throw new IllegalArgumentException("At least one column family must be defined");
        }

        for (String familyName : familyNames) {
            boolean blankName = familyName == null || familyName.trim().isEmpty();
            if (blankName) {
                throw new IllegalArgumentException("Column family name cannot be null or empty");
            }

            byte[][] qualifierKeys = schema.getQualifierKeys(familyName);
            boolean noQualifiers = qualifierKeys == null || qualifierKeys.length == 0;
            if (noQualifiers) {
                throw new IllegalArgumentException("Column family '" + familyName + "' has no qualifiers");
            }
        }
    }

    // Every qualifier type in every family must be accepted by HBaseTypeUtils.
    private static void validateDataTypes(HBaseTableSchema schema) {
        for (String familyName : schema.getFamilyNames()) {
            for (TypeInformation<?> qualifierType : schema.getQualifierTypes(familyName)) {
                if (HBaseTypeUtils.isSupportedType(qualifierType.getTypeClass())) {
                    continue;
                }
                throw new IllegalArgumentException("Unsupported type in family '" +
                    familyName + "': " + qualifierType.getTypeClass().getSimpleName());
            }
        }
    }

    // The configured charset name must resolve via Charset.forName.
    private static void validateCharset(HBaseTableSchema schema) {
        String charsetName = schema.getStringCharset();
        try {
            Charset.forName(charsetName);
        } catch (Exception lookupFailure) {
            throw new IllegalArgumentException("Invalid charset: " + charsetName, lookupFailure);
        }
    }
}

Performance Monitoring Utilities

// Utility for monitoring HBase operations
/**
 * Helpers for recording a count and a latency sample around an HBase operation.
 */
public class HBaseMonitoringUtils {

    /**
     * Times one operation: increments "&lt;operationName&gt;_count" on construction and
     * records the elapsed milliseconds into "&lt;operationName&gt;_latency" on close.
     * Intended for use in a try-with-resources statement.
     */
    public static class OperationTimer implements AutoCloseable {
        private final String operationName;
        // Taken from System.nanoTime(): a monotonic source, so the measured duration
        // cannot go negative or jump when the wall clock is adjusted.
        private final long startNanos;
        private final Histogram latencyHistogram;
        private final Counter operationCounter;

        /**
         * @param operationName prefix for the metric names
         * @param metricGroup   group the counter and histogram are obtained from
         */
        public OperationTimer(String operationName, MetricGroup metricGroup) {
            this.operationName = operationName;
            this.startNanos = System.nanoTime();
            // NOTE(review): Flink's MetricGroup.histogram appears to require a Histogram
            // instance as a second argument — confirm this single-argument call against
            // the MetricGroup in use.
            this.latencyHistogram = metricGroup.histogram(operationName + "_latency");
            this.operationCounter = metricGroup.counter(operationName + "_count");
            operationCounter.inc();
        }

        /** Records the elapsed time in milliseconds. */
        @Override
        public void close() {
            long durationMillis = (System.nanoTime() - startNanos) / 1_000_000L;
            latencyHistogram.update(durationMillis);
        }
    }

    /**
     * Usage in HBase operations: performs a Get for the row key while timing it, and
     * counts failures under "hbase_get_errors".
     *
     * @param rowKey      row key to fetch
     * @param metricGroup group used for the timer and the error counter
     * @return the fetched row converted by the helper
     * @throws RuntimeException wrapping any failure, with the original exception as cause
     */
    public Row performMonitoredGet(String rowKey, MetricGroup metricGroup) {
        // NOTE(review): `helper` and `table` are not declared in this class; they are
        // assumed to be supplied by the surrounding code — confirm.
        try (OperationTimer timer = new OperationTimer("hbase_get", metricGroup)) {
            Get get = helper.createGet(rowKey);
            Result result = table.get(get);
            return helper.parseToRow(result);
        } catch (Exception e) {
            metricGroup.counter("hbase_get_errors").inc();
            throw new RuntimeException("HBase get failed", e);
        }
    }
}

Data Type Reference

Complete Type Support Matrix

| Java Type | HBase Storage | Type Index | Notes |
|---|---|---|---|
| String | byte[] | STRING | UTF-8/configurable encoding |
| byte[] | byte[] | PRIMITIVE_ARRAY | Direct binary storage |
| Byte | byte[] | BYTE | Single byte value |
| Short | byte[] | SHORT | 2-byte big-endian |
| Integer | byte[] | INT | 4-byte big-endian |
| Long | byte[] | LONG | 8-byte big-endian |
| Float | byte[] | FLOAT | IEEE 754 format |
| Double | byte[] | DOUBLE | IEEE 754 format |
| Boolean | byte[] | BOOLEAN | Single byte (0/1) |
| java.sql.Date | byte[] | SQL_DATE | Long timestamp |
| java.sql.Time | byte[] | SQL_TIME | Long timestamp |
| java.sql.Timestamp | byte[] | SQL_TIMESTAMP | Long timestamp |
| java.math.BigDecimal | byte[] | BIG_DEC | String representation |
| java.math.BigInteger | byte[] | BIG_INT | String representation |

Character Encoding Support

// Supported character encodings
Charset utf8 = StandardCharsets.UTF_8;           // Default
Charset utf16 = StandardCharsets.UTF_16;         // Unicode
Charset iso88591 = StandardCharsets.ISO_8859_1;  // Latin-1
Charset ascii = StandardCharsets.US_ASCII;       // ASCII

// Usage in schema
HBaseTableSchema schema = new HBaseTableSchema();
schema.setCharset("UTF-8");  // Set encoding for string types