The HBase connector provides schema definition and configuration classes that map between Flink data types and HBase column families and qualifiers. This includes connection configuration, write performance tuning, and character encoding options.
HBaseTableSchema is the central class for defining the mapping between a Flink table schema and the HBase table structure.
class HBaseTableSchema {
public HBaseTableSchema();
// Schema definition
public void addColumn(String family, String qualifier, Class<?> clazz);
public void setRowKey(String rowKeyName, Class<?> clazz);
public void setCharset(String charset);
// Schema introspection
public String[] getFamilyNames();
public byte[][] getFamilyKeys();
public byte[][] getQualifierKeys(String family);
public TypeInformation<?>[] getQualifierTypes(String family);
public String getStringCharset();
public int getRowKeyIndex();
public Optional<TypeInformation<?>> getRowKeyTypeInfo();
}
import org.apache.flink.addons.hbase.HBaseTableSchema;
// Create schema for user profile table
HBaseTableSchema schema = new HBaseTableSchema();
// Define row key
schema.setRowKey("user_id", String.class);
// Define column families and qualifiers
schema.addColumn("personal", "first_name", String.class);
schema.addColumn("personal", "last_name", String.class);
schema.addColumn("personal", "birth_date", java.sql.Date.class);
schema.addColumn("personal", "age", Integer.class);
schema.addColumn("contact", "email", String.class);
schema.addColumn("contact", "phone", String.class);
schema.addColumn("contact", "address", String.class);
schema.addColumn("activity", "last_login", java.sql.Timestamp.class);
schema.addColumn("activity", "login_count", Long.class);
schema.addColumn("activity", "is_active", Boolean.class);
schema.addColumn("preferences", "settings", String.class); // JSON as string
schema.addColumn("data", "profile_picture", byte[].class); // Binary data// Set character encoding for string serialization
HBaseTableSchema schema = new HBaseTableSchema();
schema.setCharset("UTF-8"); // Default encoding
// schema.setCharset("ISO-8859-1"); // Alternative encoding
// Add string columns that will use the specified charset
schema.setRowKey("user_id", String.class);
schema.addColumn("profile", "name", String.class);
schema.addColumn("profile", "description", String.class);
// Binary data is not affected by charset setting
schema.addColumn("data", "binary_content", byte[].class);// Examine schema structure
// Examine schema structure
HBaseTableSchema schema = // ... configured schema
// Get all column families
String[] families = schema.getFamilyNames();
System.out.println("Column families: " + Arrays.toString(families));
// Get qualifiers for a specific family
byte[][] qualifiers = schema.getQualifierKeys("personal");
for (byte[] qualifier : qualifiers) {
System.out.println("Qualifier: " + Bytes.toString(qualifier));
}
// Get types for a family's qualifiers
TypeInformation<?>[] types = schema.getQualifierTypes("personal");
for (int i = 0; i < types.length; i++) {
System.out.println("Type: " + types[i].getTypeClass().getSimpleName());
}
// Check row key configuration
int rowKeyIndex = schema.getRowKeyIndex();
Optional<TypeInformation<?>> rowKeyType = schema.getRowKeyTypeInfo();
if (rowKeyType.isPresent()) {
System.out.println("Row key type: " + rowKeyType.get().getTypeClass().getSimpleName());
}
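As a convenience, the introspection calls above can be combined into a single helper that prints the whole mapping; this sketch uses only the HBaseTableSchema methods already shown, plus HBase's Bytes utility.
// Print the full family:qualifier -> type mapping in one pass
public static void dumpSchema(HBaseTableSchema schema) {
    for (String family : schema.getFamilyNames()) {
        byte[][] qualifiers = schema.getQualifierKeys(family);
        TypeInformation<?>[] types = schema.getQualifierTypes(family);
        for (int i = 0; i < qualifiers.length; i++) {
            System.out.printf("%s:%s -> %s%n",
                family, Bytes.toString(qualifiers[i]),
                types[i].getTypeClass().getSimpleName());
        }
    }
}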
HBaseOptions defines connection and basic configuration options for HBase connectivity.
class HBaseOptions {
public static Builder builder();
static class Builder {
public Builder setTableName(String tableName); // Required
public Builder setZkQuorum(String zkQuorum); // Required
public Builder setZkNodeParent(String zkNodeParent); // Optional, default: "/hbase"
public HBaseOptions build();
}
}
import org.apache.flink.addons.hbase.HBaseOptions;
// Simple configuration
HBaseOptions basicOptions = HBaseOptions.builder()
.setTableName("user_profiles")
.setZkQuorum("localhost:2181")
.build();
// Production configuration with multiple ZooKeeper nodes
HBaseOptions productionOptions = HBaseOptions.builder()
.setTableName("user_events")
.setZkQuorum("zk1.prod.com:2181,zk2.prod.com:2181,zk3.prod.com:2181")
.setZkNodeParent("/hbase-prod") // Custom ZNode parent
.build();
// Configuration with custom ZooKeeper port
HBaseOptions customPortOptions = HBaseOptions.builder()
.setTableName("analytics_data")
.setZkQuorum("zk-cluster:2182") // Non-standard port
.setZkNodeParent("/hbase")
.build();
HBaseWriteOptions provides performance tuning options for write operations; buffered mutations are flushed when any configured threshold (buffer size, row count, or flush interval) is reached.
class HBaseWriteOptions {
public static Builder builder();
static class Builder {
public Builder setBufferFlushMaxSizeInBytes(long bufferFlushMaxSizeInBytes);
public Builder setBufferFlushMaxRows(long bufferFlushMaxRows);
public Builder setBufferFlushIntervalMillis(long bufferFlushIntervalMillis);
public HBaseWriteOptions build();
}
}
import org.apache.flink.addons.hbase.HBaseWriteOptions;
// High-throughput configuration
HBaseWriteOptions highThroughputOptions = HBaseWriteOptions.builder()
.setBufferFlushMaxSizeInBytes(16 * 1024 * 1024) // 16MB buffer
.setBufferFlushMaxRows(10000) // 10,000 mutations per batch
.setBufferFlushIntervalMillis(30000) // 30 second flush interval
.build();
// Low-latency configuration
HBaseWriteOptions lowLatencyOptions = HBaseWriteOptions.builder()
.setBufferFlushMaxSizeInBytes(512 * 1024) // 512KB buffer
.setBufferFlushMaxRows(100) // 100 mutations per batch
.setBufferFlushIntervalMillis(1000) // 1 second flush interval
.build();
// Balanced configuration
HBaseWriteOptions balancedOptions = HBaseWriteOptions.builder()
.setBufferFlushMaxSizeInBytes(4 * 1024 * 1024) // 4MB buffer
.setBufferFlushMaxRows(2000) // 2,000 mutations per batch
.setBufferFlushIntervalMillis(5000) // 5 second flush interval
.build();
// Memory-constrained configuration
HBaseWriteOptions memoryConstrainedOptions = HBaseWriteOptions.builder()
.setBufferFlushMaxSizeInBytes(256 * 1024) // 256KB buffer
.setBufferFlushMaxRows(50) // 50 mutations per batch
.setBufferFlushIntervalMillis(2000) // 2 second flush interval
.build();
Advanced HBase configuration using Hadoop Configuration objects for fine-grained control.
import org.apache.hadoop.conf.Configuration;
// Create and configure Hadoop Configuration
Configuration conf = new Configuration();
// Basic connection settings
conf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("zookeeper.znode.parent", "/hbase");
// Cluster configuration
conf.set("hbase.cluster.distributed", "true");
conf.set("hbase.master", "hbase-master:60000");// Client-side performance tuning
// Client-side performance tuning
Configuration perfConf = new Configuration();
// Connection settings
perfConf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
// Write performance
perfConf.setLong("hbase.client.write.buffer", 8 * 1024 * 1024); // 8MB write buffer
perfConf.setInt("hbase.client.max.total.tasks", 200); // Max concurrent tasks
perfConf.setInt("hbase.client.max.perserver.tasks", 20); // Max tasks per server
perfConf.setInt("hbase.client.max.perregion.tasks", 5); // Max tasks per region
// Read performance
perfConf.setInt("hbase.client.scanner.caching", 1000); // Scanner caching
perfConf.setLong("hbase.client.scanner.max.result.size", 4 * 1024 * 1024); // 4MB max result
perfConf.setBoolean("hbase.client.scanner.async.prefetch", true); // Async prefetch
// Connection pool
perfConf.setInt("hbase.client.ipc.pool.size", 10); // Connection pool size
perfConf.setInt("hbase.client.ipc.pool.type", 1); // RoundRobin pool
// Timeout settings
perfConf.setLong("hbase.rpc.timeout", 120000); // 2 minute RPC timeout
perfConf.setLong("hbase.client.operation.timeout", 300000); // 5 minute operation timeout
perfConf.setLong("hbase.client.scanner.timeout.period", 600000); // 10 minute scanner timeout
// Retry settings
perfConf.setInt("hbase.client.retries.number", 10); // Max retries
perfConf.setLong("hbase.client.pause", 200); // Retry pause (ms)
perfConf.setLong("hbase.client.pause.cqtbe", 1000); // Quota exceeded pause// Kerberos authentication
Configuration secureConf = new Configuration();
secureConf.set("hbase.zookeeper.quorum", "secure-zk1:2181,secure-zk2:2181");
// Enable security
secureConf.set("hbase.security.authentication", "kerberos");
secureConf.set("hbase.security.authorization", "true");
secureConf.set("hbase.master.kerberos.principal", "hbase/_HOST@REALM.COM");
secureConf.set("hbase.regionserver.kerberos.principal", "hbase/_HOST@REALM.COM");
// Client principal and keytab
secureConf.set("hbase.client.kerberos.principal", "flink-user@REALM.COM");
secureConf.set("hbase.client.keytab.file", "/path/to/flink-user.keytab");
// HDFS security (if applicable)
secureConf.set("dfs.nameservices", "hdfs-cluster");
secureConf.set("hadoop.security.authentication", "kerberos");Comprehensive mapping between Java types and HBase storage formats.
Comprehensive mapping between Java types and HBase storage formats.
HBaseTableSchema schema = new HBaseTableSchema();
// Primitive types
schema.addColumn("primitives", "byte_val", Byte.class);
schema.addColumn("primitives", "short_val", Short.class);
schema.addColumn("primitives", "int_val", Integer.class);
schema.addColumn("primitives", "long_val", Long.class);
schema.addColumn("primitives", "float_val", Float.class);
schema.addColumn("primitives", "double_val", Double.class);
schema.addColumn("primitives", "boolean_val", Boolean.class);
// String and binary types
schema.addColumn("text", "string_val", String.class);
schema.addColumn("binary", "byte_array", byte[].class);
// Temporal types
schema.addColumn("time", "timestamp_val", java.sql.Timestamp.class);
schema.addColumn("time", "date_val", java.sql.Date.class);
schema.addColumn("time", "time_val", java.sql.Time.class);
// Numeric types
schema.addColumn("numbers", "decimal_val", java.math.BigDecimal.class);
schema.addColumn("numbers", "bigint_val", java.math.BigInteger.class);// Example data insertion with proper types
Row userRow = Row.of(
"user123", // String row key
"John", // String (first_name)
"Doe", // String (last_name)
Date.valueOf("1990-05-15"), // Date (birth_date)
33, // Integer (age)
"john.doe@email.com", // String (email)
"+1-555-123-4567", // String (phone)
true, // Boolean (is_active)
new Timestamp(System.currentTimeMillis()), // Timestamp (last_login)
1247L, // Long (login_count)
new BigDecimal("99.99"), // BigDecimal (balance)
"profile_data".getBytes("UTF-8") // byte[] (binary_data)
);
// Development environment
public static HBaseOptions createDevConfig(String tableName) {
return HBaseOptions.builder()
.setTableName(tableName)
.setZkQuorum("localhost:2181")
.setZkNodeParent("/hbase")
.build();
}
// Staging environment
public static HBaseOptions createStagingConfig(String tableName) {
return HBaseOptions.builder()
.setTableName("staging_" + tableName)
.setZkQuorum("staging-zk1:2181,staging-zk2:2181")
.setZkNodeParent("/hbase-staging")
.build();
}
// Production environment
public static HBaseOptions createProdConfig(String tableName) {
return HBaseOptions.builder()
.setTableName("prod_" + tableName)
.setZkQuorum("prod-zk1:2181,prod-zk2:2181,prod-zk3:2181")
.setZkNodeParent("/hbase-prod")
.build();
}
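These factories can be tied together with a small, hypothetical dispatcher keyed on a deployment flag; the method and flag names below are illustrative, not part of the connector.
// Select one of the factories above based on a deployment flag
public static HBaseOptions createConfigFor(String environment, String tableName) {
    switch (environment) {
        case "prod":    return createProdConfig(tableName);
        case "staging": return createStagingConfig(tableName);
        default:        return createDevConfig(tableName);
    }
}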
// Real-time analytics workload
public static HBaseWriteOptions createRealTimeWriteOptions() {
return HBaseWriteOptions.builder()
.setBufferFlushMaxSizeInBytes(1 * 1024 * 1024) // 1MB - small buffer
.setBufferFlushMaxRows(500) // 500 mutations
.setBufferFlushIntervalMillis(2000) // 2 second interval
.build();
}
// Batch processing workload
public static HBaseWriteOptions createBatchWriteOptions() {
return HBaseWriteOptions.builder()
.setBufferFlushMaxSizeInBytes(32 * 1024 * 1024) // 32MB - large buffer
.setBufferFlushMaxRows(20000) // 20,000 mutations
.setBufferFlushIntervalMillis(60000) // 60 second interval
.build();
}
// Mixed workload
public static HBaseWriteOptions createMixedWriteOptions() {
return HBaseWriteOptions.builder()
.setBufferFlushMaxSizeInBytes(8 * 1024 * 1024) // 8MB buffer
.setBufferFlushMaxRows(4000) // 4,000 mutations
.setBufferFlushIntervalMillis(10000) // 10 second interval
.build();
}
// Original schema
HBaseTableSchema v1Schema = new HBaseTableSchema();
v1Schema.setRowKey("user_id", String.class);
v1Schema.addColumn("profile", "name", String.class);
v1Schema.addColumn("profile", "email", String.class);
// Evolved schema - adding new columns
HBaseTableSchema v2Schema = new HBaseTableSchema();
v2Schema.setRowKey("user_id", String.class);
v2Schema.addColumn("profile", "name", String.class);
v2Schema.addColumn("profile", "email", String.class);
// New columns
v2Schema.addColumn("profile", "phone", String.class); // New optional field
v2Schema.addColumn("preferences", "theme", String.class); // New column family
v2Schema.addColumn("activity", "last_seen", java.sql.Timestamp.class); // New activity tracking// Schema-aware processing that handles missing columns gracefully
public class SchemaAwareProcessor {
public void processRow(Row row, HBaseTableSchema schema) {
// Always present fields
String userId = (String) row.getField(schema.getRowKeyIndex());
// Handle potentially missing fields
String name = getFieldSafely(row, schema, "profile", "name", String.class);
String phone = getFieldSafely(row, schema, "profile", "phone", String.class);
// Use defaults for missing fields
if (phone == null) {
phone = "N/A";
}
// Process with null-safe logic
processUserData(userId, name, phone);
}
private <T> T getFieldSafely(Row row, HBaseTableSchema schema,
String family, String qualifier, Class<T> type) {
try {
// Implementation would need schema introspection to find field index
// This is a conceptual example
return type.cast(row.getField(findFieldIndex(schema, family, qualifier)));
} catch (Exception e) {
return null; // Field not present in this schema version
}
}
}
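The findFieldIndex helper above is deliberately left abstract. One possible sketch, under the assumption that fields appear in family/qualifier declaration order with the row key occupying position getRowKeyIndex() in the flat Row; verify this against the actual row layout in use.
// Hypothetical implementation of the findFieldIndex helper referenced above
static int findFieldIndex(HBaseTableSchema schema, String family, String qualifier) {
    int index = 0;
    for (String f : schema.getFamilyNames()) {
        for (byte[] q : schema.getQualifierKeys(f)) {
            if (index == schema.getRowKeyIndex()) {
                index++; // this flat position is reserved for the row key
            }
            if (f.equals(family) && Bytes.toString(q).equals(qualifier)) {
                return index;
            }
            index++;
        }
    }
    return -1; // column not present in this schema version
}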
public class SchemaValidator {
public static void validateSchema(HBaseTableSchema schema) {
// Check row key is defined
if (schema.getRowKeyIndex() < 0) {
throw new IllegalArgumentException("Row key must be defined");
}
// Check at least one column family exists
String[] families = schema.getFamilyNames();
if (families == null || families.length == 0) {
throw new IllegalArgumentException("At least one column family must be defined");
}
// Validate supported types
for (String family : families) {
TypeInformation<?>[] types = schema.getQualifierTypes(family);
for (TypeInformation<?> type : types) {
if (!HBaseTypeUtils.isSupportedType(type.getTypeClass())) {
throw new IllegalArgumentException(
"Unsupported type: " + type.getTypeClass().getSimpleName());
}
}
}
}
}
public class ConnectionValidator {
public static void validateConfiguration(Configuration conf) {
// Check required properties
String zkQuorum = conf.get("hbase.zookeeper.quorum");
if (zkQuorum == null || zkQuorum.trim().isEmpty()) {
throw new IllegalArgumentException("hbase.zookeeper.quorum must be set");
}
// Validate ZooKeeper addresses
String[] zkNodes = zkQuorum.split(",");
for (String node : zkNodes) {
if (!isValidZkAddress(node.trim())) {
throw new IllegalArgumentException("Invalid ZooKeeper address: " + node);
}
}
}
private static boolean isValidZkAddress(String address) {
// Basic validation for hostname:port format
return address.matches("^[a-zA-Z0-9.-]+:[0-9]+$");
}
}
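Finally, the three configuration classes come together when constructing a sink. The sketch below assumes the HBaseUpsertTableSink(schema, options, writeOptions) constructor from the same flink-hbase module; check the signature against the connector version in use.
import org.apache.flink.addons.hbase.HBaseOptions;
import org.apache.flink.addons.hbase.HBaseTableSchema;
import org.apache.flink.addons.hbase.HBaseUpsertTableSink;
import org.apache.flink.addons.hbase.HBaseWriteOptions;
// Define the table mapping
HBaseTableSchema schema = new HBaseTableSchema();
schema.setRowKey("user_id", String.class);
schema.addColumn("profile", "name", String.class);
schema.addColumn("profile", "email", String.class);
// Connection and write tuning
HBaseOptions options = HBaseOptions.builder()
    .setTableName("user_profiles")
    .setZkQuorum("zk1:2181,zk2:2181,zk3:2181")
    .build();
HBaseWriteOptions writeOptions = HBaseWriteOptions.builder()
    .setBufferFlushMaxSizeInBytes(4 * 1024 * 1024)
    .setBufferFlushMaxRows(2000)
    .setBufferFlushIntervalMillis(5000)
    .build();
// Wire schema, connection options, and write options into an upsert sink
HBaseUpsertTableSink sink = new HBaseUpsertTableSink(schema, options, writeOptions);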