Schema and Configuration

The HBase connector provides schema definition and configuration classes that map Flink data types to HBase column families and qualifiers. This covers connection configuration, write performance tuning, and character encoding.

HBaseTableSchema

The central class for defining the mapping between Flink table schema and HBase table structure.

class HBaseTableSchema {
    public HBaseTableSchema();
    
    // Schema definition
    public void addColumn(String family, String qualifier, Class<?> clazz);
    public void setRowKey(String rowKeyName, Class<?> clazz);
    public void setCharset(String charset);
    
    // Schema introspection
    public String[] getFamilyNames();
    public byte[][] getFamilyKeys();
    public byte[][] getQualifierKeys(String family);
    public TypeInformation<?>[] getQualifierTypes(String family);
    public String getStringCharset();
    public int getRowKeyIndex();
    public Optional<TypeInformation<?>> getRowKeyTypeInfo();
}

Basic Schema Definition

import org.apache.flink.addons.hbase.HBaseTableSchema;

// Create schema for user profile table
HBaseTableSchema schema = new HBaseTableSchema();

// Define row key
schema.setRowKey("user_id", String.class);

// Define column families and qualifiers
schema.addColumn("personal", "first_name", String.class);
schema.addColumn("personal", "last_name", String.class);
schema.addColumn("personal", "birth_date", java.sql.Date.class);
schema.addColumn("personal", "age", Integer.class);

schema.addColumn("contact", "email", String.class);
schema.addColumn("contact", "phone", String.class);
schema.addColumn("contact", "address", String.class);

schema.addColumn("activity", "last_login", java.sql.Timestamp.class);
schema.addColumn("activity", "login_count", Long.class);
schema.addColumn("activity", "is_active", Boolean.class);

schema.addColumn("preferences", "settings", String.class); // JSON as string
schema.addColumn("data", "profile_picture", byte[].class); // Binary data

Character Encoding Configuration

// Set character encoding for string serialization
HBaseTableSchema schema = new HBaseTableSchema();
schema.setCharset("UTF-8"); // Default encoding
// schema.setCharset("ISO-8859-1"); // Alternative encoding

// Add string columns that will use the specified charset
schema.setRowKey("user_id", String.class);
schema.addColumn("profile", "name", String.class);
schema.addColumn("profile", "description", String.class);

// Binary data is not affected by charset setting
schema.addColumn("data", "binary_content", byte[].class);

Schema Introspection

import java.util.Arrays;
import java.util.Optional;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.hadoop.hbase.util.Bytes;

// Examine the structure of the schema built in "Basic Schema Definition" above

// Get all column families
String[] families = schema.getFamilyNames();
System.out.println("Column families: " + Arrays.toString(families));

// Get qualifiers for a specific family
byte[][] qualifiers = schema.getQualifierKeys("personal");
for (byte[] qualifier : qualifiers) {
    System.out.println("Qualifier: " + Bytes.toString(qualifier));
}

// Get types for a family's qualifiers
TypeInformation<?>[] types = schema.getQualifierTypes("personal");
for (int i = 0; i < types.length; i++) {
    System.out.println("Type: " + types[i].getTypeClass().getSimpleName());
}

// Check row key configuration
int rowKeyIndex = schema.getRowKeyIndex();
Optional<TypeInformation<?>> rowKeyType = schema.getRowKeyTypeInfo();
if (rowKeyType.isPresent()) {
    System.out.println("Row key type: " + rowKeyType.get().getTypeClass().getSimpleName());
}
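The introspection methods can also be combined into a lookup table from family:qualifier to a flat field position. This is only a sketch: it assumes non-row-key fields appear family by family in insertion order, with the row key occupying getRowKeyIndex(). Verify that assumption against the row layout your source or sink actually produces.

import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: map "family:qualifier" to a flat field index,
// assuming fields are laid out family by family in insertion order
// with the row key at schema.getRowKeyIndex()
public static Map<String, Integer> buildFieldIndex(HBaseTableSchema schema) {
    Map<String, Integer> index = new HashMap<>();
    int pos = 0;
    for (String family : schema.getFamilyNames()) {
        for (byte[] qualifier : schema.getQualifierKeys(family)) {
            if (pos == schema.getRowKeyIndex()) {
                pos++; // skip the slot occupied by the row key
            }
            index.put(family + ":" + Bytes.toString(qualifier), pos++);
        }
    }
    return index;
}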

HBaseOptions

Connection and basic configuration options for HBase connectivity.

class HBaseOptions {
    public static Builder builder();
    
    static class Builder {
        public Builder setTableName(String tableName);        // Required
        public Builder setZkQuorum(String zkQuorum);          // Required
        public Builder setZkNodeParent(String zkNodeParent);  // Optional, default: "/hbase"
        public HBaseOptions build();
    }
}

Basic Connection Configuration

import org.apache.flink.addons.hbase.HBaseOptions;

// Simple configuration
HBaseOptions basicOptions = HBaseOptions.builder()
    .setTableName("user_profiles")
    .setZkQuorum("localhost:2181")
    .build();

// Production configuration with multiple ZooKeeper nodes
HBaseOptions productionOptions = HBaseOptions.builder()
    .setTableName("user_events")
    .setZkQuorum("zk1.prod.com:2181,zk2.prod.com:2181,zk3.prod.com:2181")
    .setZkNodeParent("/hbase-prod") // Custom ZNode parent
    .build();

// Configuration with custom ZooKeeper port
HBaseOptions customPortOptions = HBaseOptions.builder()
    .setTableName("analytics_data")
    .setZkQuorum("zk-cluster:2182") // Non-standard port
    .setZkNodeParent("/hbase")
    .build();

HBaseWriteOptions

Performance tuning options for write operations. Writes are buffered and flushed when any configured threshold is reached: buffer size in bytes, buffered row count, or elapsed flush interval.

class HBaseWriteOptions {
    public static Builder builder();
    
    static class Builder {
        public Builder setBufferFlushMaxSizeInBytes(long bufferFlushMaxSizeInBytes);
        public Builder setBufferFlushMaxRows(long bufferFlushMaxRows);
        public Builder setBufferFlushIntervalMillis(long bufferFlushIntervalMillis);
        public HBaseWriteOptions build();
    }
}

Write Performance Configuration

import org.apache.flink.addons.hbase.HBaseWriteOptions;

// High-throughput configuration
HBaseWriteOptions highThroughputOptions = HBaseWriteOptions.builder()
    .setBufferFlushMaxSizeInBytes(16 * 1024 * 1024) // 16MB buffer
    .setBufferFlushMaxRows(10000)                   // 10,000 mutations per batch
    .setBufferFlushIntervalMillis(30000)            // 30 second flush interval
    .build();

// Low-latency configuration  
HBaseWriteOptions lowLatencyOptions = HBaseWriteOptions.builder()
    .setBufferFlushMaxSizeInBytes(512 * 1024)       // 512KB buffer
    .setBufferFlushMaxRows(100)                     // 100 mutations per batch
    .setBufferFlushIntervalMillis(1000)             // 1 second flush interval
    .build();

// Balanced configuration
HBaseWriteOptions balancedOptions = HBaseWriteOptions.builder()
    .setBufferFlushMaxSizeInBytes(4 * 1024 * 1024)  // 4MB buffer
    .setBufferFlushMaxRows(2000)                    // 2,000 mutations per batch
    .setBufferFlushIntervalMillis(5000)             // 5 second flush interval
    .build();

// Memory-constrained configuration
HBaseWriteOptions memoryConstrainedOptions = HBaseWriteOptions.builder()
    .setBufferFlushMaxSizeInBytes(256 * 1024)       // 256KB buffer
    .setBufferFlushMaxRows(50)                      // 50 mutations per batch
    .setBufferFlushIntervalMillis(2000)             // 2 second flush interval
    .build();
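In practice, the option objects are consumed together with an HBaseTableSchema when constructing a sink (see sink-functions.md for the sink API). A minimal sketch, assuming an HBaseUpsertTableSink constructor that takes the schema plus both option objects:

import org.apache.flink.addons.hbase.HBaseUpsertTableSink;

// Combine schema, connection options, and write options into one sink
HBaseUpsertTableSink sink = new HBaseUpsertTableSink(
    schema,            // HBaseTableSchema from the examples above
    basicOptions,      // HBaseOptions: table name + ZooKeeper quorum
    balancedOptions);  // HBaseWriteOptions: buffering behavior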

Hadoop Configuration

Advanced HBase configuration using Hadoop Configuration objects for fine-grained control.

Basic Hadoop Configuration

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Create an HBase-aware Configuration; HBaseConfiguration.create() loads
// hbase-default.xml and any hbase-site.xml found on the classpath
Configuration conf = HBaseConfiguration.create();

// Basic connection settings
conf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("zookeeper.znode.parent", "/hbase");

// Cluster configuration
conf.set("hbase.cluster.distributed", "true");
conf.set("hbase.master", "hbase-master:60000");

Performance Tuning Configuration

// Client-side performance tuning
Configuration perfConf = HBaseConfiguration.create();

// Connection settings
perfConf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");

// Write performance
perfConf.setLong("hbase.client.write.buffer", 8 * 1024 * 1024); // 8MB write buffer
perfConf.setInt("hbase.client.max.total.tasks", 200);           // Max concurrent tasks
perfConf.setInt("hbase.client.max.perserver.tasks", 20);        // Max tasks per server
perfConf.setInt("hbase.client.max.perregion.tasks", 5);         // Max tasks per region

// Read performance
perfConf.setInt("hbase.client.scanner.caching", 1000);          // Scanner caching
perfConf.setLong("hbase.client.scanner.max.result.size", 4 * 1024 * 1024); // 4MB max result
perfConf.setBoolean("hbase.client.scanner.async.prefetch", true); // Async prefetch

// Connection pool
perfConf.setInt("hbase.client.ipc.pool.size", 10);              // Connection pool size
perfConf.setInt("hbase.client.ipc.pool.type", 1);               // RoundRobin pool

// Timeout settings
perfConf.setLong("hbase.rpc.timeout", 120000);                  // 2 minute RPC timeout
perfConf.setLong("hbase.client.operation.timeout", 300000);     // 5 minute operation timeout
perfConf.setLong("hbase.client.scanner.timeout.period", 600000); // 10 minute scanner timeout

// Retry settings
perfConf.setInt("hbase.client.retries.number", 10);             // Max retries
perfConf.setLong("hbase.client.pause", 200);                    // Retry pause (ms)
perfConf.setLong("hbase.client.pause.cqtbe", 1000);             // Quota exceeded pause

Security Configuration

// Kerberos authentication
Configuration secureConf = HBaseConfiguration.create();
secureConf.set("hbase.zookeeper.quorum", "secure-zk1:2181,secure-zk2:2181");

// Enable security
secureConf.set("hbase.security.authentication", "kerberos");
secureConf.set("hbase.security.authorization", "true");
secureConf.set("hbase.master.kerberos.principal", "hbase/_HOST@REALM.COM");
secureConf.set("hbase.regionserver.kerberos.principal", "hbase/_HOST@REALM.COM");

// Client principal and keytab
secureConf.set("hbase.client.kerberos.principal", "flink-user@REALM.COM");
secureConf.set("hbase.client.keytab.file", "/path/to/flink-user.keytab");

// HDFS security (if applicable)
secureConf.set("dfs.nameservices", "hdfs-cluster");
secureConf.set("hadoop.security.authentication", "kerberos");

Data Type Mapping

The connector maps a fixed set of Java types to HBase's byte-oriented storage format.

Supported Data Types

HBaseTableSchema schema = new HBaseTableSchema();

// Primitive types
schema.addColumn("primitives", "byte_val", Byte.class);
schema.addColumn("primitives", "short_val", Short.class);
schema.addColumn("primitives", "int_val", Integer.class);
schema.addColumn("primitives", "long_val", Long.class);
schema.addColumn("primitives", "float_val", Float.class);
schema.addColumn("primitives", "double_val", Double.class);
schema.addColumn("primitives", "boolean_val", Boolean.class);

// String and binary types
schema.addColumn("text", "string_val", String.class);
schema.addColumn("binary", "byte_array", byte[].class);

// Temporal types
schema.addColumn("time", "timestamp_val", java.sql.Timestamp.class);
schema.addColumn("time", "date_val", java.sql.Date.class);
schema.addColumn("time", "time_val", java.sql.Time.class);

// Numeric types
schema.addColumn("numbers", "decimal_val", java.math.BigDecimal.class);
schema.addColumn("numbers", "bigint_val", java.math.BigInteger.class);

Type Conversion Examples

import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Timestamp;
import org.apache.flink.types.Row;

// Example data insertion with proper types
Row userRow = Row.of(
    "user123",                                      // String row key
    "John",                                         // String (first_name)
    "Doe",                                          // String (last_name)
    Date.valueOf("1990-05-15"),                     // Date (birth_date)
    33,                                             // Integer (age)
    "john.doe@email.com",                           // String (email)
    "+1-555-123-4567",                              // String (phone)
    true,                                           // Boolean (is_active)
    new Timestamp(System.currentTimeMillis()),      // Timestamp (last_login)
    1247L,                                          // Long (login_count)
    new BigDecimal("99.99"),                        // BigDecimal (balance)
    "profile_data".getBytes(StandardCharsets.UTF_8) // byte[] (binary_data)
);
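For reference, the stored byte representations follow HBase's Bytes utility conventions for these types (fixed-width big-endian encodings for numerics, charset-encoded bytes for strings), so Bytes-compatible tooling such as the HBase shell can interpret them. A roundtrip illustration:

import java.nio.charset.StandardCharsets;
import org.apache.hadoop.hbase.util.Bytes;

// Numeric types use fixed-width big-endian encodings
byte[] ageBytes = Bytes.toBytes(33);        // 4 bytes for an int
byte[] countBytes = Bytes.toBytes(1247L);   // 8 bytes for a long

// Strings are encoded with the schema's configured charset
byte[] nameBytes = "John".getBytes(StandardCharsets.UTF_8);

// Reading the stored bytes back
int age = Bytes.toInt(ageBytes);            // 33
long count = Bytes.toLong(countBytes);      // 1247
String name = new String(nameBytes, StandardCharsets.UTF_8); // "John"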

Configuration Patterns

Environment-Specific Configuration

// Development environment
public static HBaseOptions createDevConfig(String tableName) {
    return HBaseOptions.builder()
        .setTableName(tableName)
        .setZkQuorum("localhost:2181")
        .setZkNodeParent("/hbase")
        .build();
}

// Staging environment
public static HBaseOptions createStagingConfig(String tableName) {
    return HBaseOptions.builder()
        .setTableName("staging_" + tableName)
        .setZkQuorum("staging-zk1:2181,staging-zk2:2181")
        .setZkNodeParent("/hbase-staging")
        .build();
}

// Production environment
public static HBaseOptions createProdConfig(String tableName) {
    return HBaseOptions.builder()
        .setTableName("prod_" + tableName)
        .setZkQuorum("prod-zk1:2181,prod-zk2:2181,prod-zk3:2181")
        .setZkNodeParent("/hbase-prod")
        .build();
}
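These factories can then be selected at startup, for example from an environment variable, so the same job jar runs unchanged across environments ("APP_ENV" is just an illustrative variable name):

// Pick the configuration at startup based on the deployment environment
String env = System.getenv().getOrDefault("APP_ENV", "dev");
HBaseOptions options;
switch (env) {
    case "prod":    options = createProdConfig("user_events");    break;
    case "staging": options = createStagingConfig("user_events"); break;
    default:        options = createDevConfig("user_events");     break;
}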

Workload-Specific Write Options

// Real-time analytics workload
public static HBaseWriteOptions createRealTimeWriteOptions() {
    return HBaseWriteOptions.builder()
        .setBufferFlushMaxSizeInBytes(1 * 1024 * 1024)  // 1MB - small buffer
        .setBufferFlushMaxRows(500)                     // 500 mutations
        .setBufferFlushIntervalMillis(2000)             // 2 second interval
        .build();
}

// Batch processing workload
public static HBaseWriteOptions createBatchWriteOptions() {
    return HBaseWriteOptions.builder()
        .setBufferFlushMaxSizeInBytes(32 * 1024 * 1024) // 32MB - large buffer
        .setBufferFlushMaxRows(20000)                   // 20,000 mutations
        .setBufferFlushIntervalMillis(60000)            // 60 second interval
        .build();
}

// Mixed workload
public static HBaseWriteOptions createMixedWriteOptions() {
    return HBaseWriteOptions.builder()
        .setBufferFlushMaxSizeInBytes(8 * 1024 * 1024)  // 8MB buffer
        .setBufferFlushMaxRows(4000)                    // 4,000 mutations
        .setBufferFlushIntervalMillis(10000)            // 10 second interval  
        .build();
}

Schema Evolution and Versioning

Adding New Columns

// Original schema
HBaseTableSchema v1Schema = new HBaseTableSchema();
v1Schema.setRowKey("user_id", String.class);
v1Schema.addColumn("profile", "name", String.class);
v1Schema.addColumn("profile", "email", String.class);

// Evolved schema - adding new columns
HBaseTableSchema v2Schema = new HBaseTableSchema();
v2Schema.setRowKey("user_id", String.class);
v2Schema.addColumn("profile", "name", String.class);
v2Schema.addColumn("profile", "email", String.class);
// New columns
v2Schema.addColumn("profile", "phone", String.class);        // New optional field
v2Schema.addColumn("preferences", "theme", String.class);    // New column family
v2Schema.addColumn("activity", "last_seen", java.sql.Timestamp.class); // New activity tracking

Handling Schema Changes

// Schema-aware processing that handles missing columns gracefully
public class SchemaAwareProcessor {

    public void processRow(Row row, HBaseTableSchema schema) {
        // Always-present field: the row key
        String userId = (String) row.getField(schema.getRowKeyIndex());

        // Handle potentially missing fields
        String name = getFieldSafely(row, schema, "profile", "name", String.class);
        String phone = getFieldSafely(row, schema, "profile", "phone", String.class);

        // Use defaults for missing fields
        if (phone == null) {
            phone = "N/A";
        }

        // Process with null-safe logic
        processUserData(userId, name, phone);
    }

    private <T> T getFieldSafely(Row row, HBaseTableSchema schema,
                                 String family, String qualifier, Class<T> type) {
        try {
            return type.cast(row.getField(findFieldIndex(schema, family, qualifier)));
        } catch (Exception e) {
            return null; // Field not present in this schema version
        }
    }

    // Conceptual index lookup: assumes non-row-key fields are laid out
    // family by family in insertion order, with the row key occupying
    // schema.getRowKeyIndex(). Verify against your source's actual row layout.
    private int findFieldIndex(HBaseTableSchema schema, String family, String qualifier) {
        int pos = 0;
        for (String f : schema.getFamilyNames()) {
            for (byte[] q : schema.getQualifierKeys(f)) {
                if (pos == schema.getRowKeyIndex()) {
                    pos++; // skip the row key slot
                }
                if (f.equals(family) && qualifier.equals(Bytes.toString(q))) {
                    return pos;
                }
                pos++;
            }
        }
        throw new IllegalArgumentException(family + ":" + qualifier + " not in schema");
    }

    private void processUserData(String userId, String name, String phone) {
        // Downstream business logic goes here
    }
}

Configuration Validation

Schema Validation

public class SchemaValidator {
    
    public static void validateSchema(HBaseTableSchema schema) {
        // Check row key is defined
        if (schema.getRowKeyIndex() < 0) {
            throw new IllegalArgumentException("Row key must be defined");
        }
        
        // Check at least one column family exists
        String[] families = schema.getFamilyNames();
        if (families == null || families.length == 0) {
            throw new IllegalArgumentException("At least one column family must be defined");
        }
        
        // Validate supported types
        for (String family : families) {
            TypeInformation<?>[] types = schema.getQualifierTypes(family);
            for (TypeInformation<?> type : types) {
                if (!HBaseTypeUtils.isSupportedType(type.getTypeClass())) {
                    throw new IllegalArgumentException(
                        "Unsupported type: " + type.getTypeClass().getSimpleName());
                }
            }
        }
    }
}

Connection Validation

public class ConnectionValidator {
    
    public static void validateConfiguration(Configuration conf) {
        // Check required properties
        String zkQuorum = conf.get("hbase.zookeeper.quorum");
        if (zkQuorum == null || zkQuorum.trim().isEmpty()) {
            throw new IllegalArgumentException("hbase.zookeeper.quorum must be set");
        }
        
        // Validate ZooKeeper addresses
        String[] zkNodes = zkQuorum.split(",");
        for (String node : zkNodes) {
            if (!isValidZkAddress(node.trim())) {
                throw new IllegalArgumentException("Invalid ZooKeeper address: " + node);
            }
        }
    }
    
    private static boolean isValidZkAddress(String address) {
        // Accept "hostname" or "hostname:port"; quorum entries may omit the
        // port, which then comes from hbase.zookeeper.property.clientPort
        return address.matches("^[a-zA-Z0-9.-]+(:[0-9]+)?$");
    }
}
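Both validators are intended to run during job setup, before any connection is opened, so misconfiguration fails fast with a clear message instead of surfacing as a runtime connection error:

// Fail fast during job setup, before submitting to the cluster
public static void validateAll(HBaseTableSchema schema, Configuration conf) {
    SchemaValidator.validateSchema(schema);
    ConnectionValidator.validateConfiguration(conf);
    // If we reach this point, schema and connection settings are sane
}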