0
# Utilities
1
2
The HBase connector provides utility classes for type conversion, configuration management, and HBase operation helpers. These utilities handle the low-level details of data serialization, configuration serialization, and HBase client operations.
3
4
## HBaseTypeUtils
5
6
Utility class for converting between Java objects and HBase byte arrays with support for various data types and character encodings.
7
8
```java { .api }
9
class HBaseTypeUtils {
10
// Core conversion methods
11
public static Object deserializeToObject(byte[] value, int typeIdx, Charset stringCharset);
12
public static byte[] serializeFromObject(Object value, int typeIdx, Charset stringCharset);
13
14
// Type system utilities
15
public static int getTypeIndex(TypeInformation typeInfo);
16
public static boolean isSupportedType(Class<?> clazz);
17
}
18
```
19
20
### Type Serialization and Deserialization
21
22
```java
23
import org.apache.flink.addons.hbase.util.HBaseTypeUtils;
24
import java.nio.charset.StandardCharsets;
25
26
// Serialize Java objects to HBase byte arrays
27
String stringValue = "Hello World";
28
byte[] stringBytes = HBaseTypeUtils.serializeFromObject(stringValue,
29
HBaseTypeUtils.getTypeIndex(Types.STRING), StandardCharsets.UTF_8);
30
31
Integer intValue = 42;
32
byte[] intBytes = HBaseTypeUtils.serializeFromObject(intValue,
33
HBaseTypeUtils.getTypeIndex(Types.INT), StandardCharsets.UTF_8);
34
35
Double doubleValue = 3.14159;
36
byte[] doubleBytes = HBaseTypeUtils.serializeFromObject(doubleValue,
37
HBaseTypeUtils.getTypeIndex(Types.DOUBLE), StandardCharsets.UTF_8);
38
39
// Deserialize HBase byte arrays to Java objects
40
String deserializedString = (String) HBaseTypeUtils.deserializeToObject(stringBytes,
41
HBaseTypeUtils.getTypeIndex(Types.STRING), StandardCharsets.UTF_8);
42
43
Integer deserializedInt = (Integer) HBaseTypeUtils.deserializeToObject(intBytes,
44
HBaseTypeUtils.getTypeIndex(Types.INT), StandardCharsets.UTF_8);
45
46
Double deserializedDouble = (Double) HBaseTypeUtils.deserializeToObject(doubleBytes,
47
HBaseTypeUtils.getTypeIndex(Types.DOUBLE), StandardCharsets.UTF_8);
48
```
49
50
### Temporal Data Types
51
52
```java
53
import java.sql.Timestamp;
54
import java.sql.Date;
55
import java.sql.Time;
56
57
// Serialize temporal types
58
Timestamp timestamp = new Timestamp(System.currentTimeMillis());
59
byte[] timestampBytes = HBaseTypeUtils.serializeFromObject(timestamp,
60
HBaseTypeUtils.getTypeIndex(Types.SQL_TIMESTAMP), StandardCharsets.UTF_8);
61
62
Date date = Date.valueOf("2023-12-25");
63
byte[] dateBytes = HBaseTypeUtils.serializeFromObject(date,
64
HBaseTypeUtils.getTypeIndex(Types.SQL_DATE), StandardCharsets.UTF_8);
65
66
Time time = Time.valueOf("14:30:00");
67
byte[] timeBytes = HBaseTypeUtils.serializeFromObject(time,
68
HBaseTypeUtils.getTypeIndex(Types.SQL_TIME), StandardCharsets.UTF_8);
69
70
// Deserialize temporal types
71
Timestamp deserializedTimestamp = (Timestamp) HBaseTypeUtils.deserializeToObject(timestampBytes,
72
HBaseTypeUtils.getTypeIndex(Types.SQL_TIMESTAMP), StandardCharsets.UTF_8);
73
74
Date deserializedDate = (Date) HBaseTypeUtils.deserializeToObject(dateBytes,
75
HBaseTypeUtils.getTypeIndex(Types.SQL_DATE), StandardCharsets.UTF_8);
76
77
Time deserializedTime = (Time) HBaseTypeUtils.deserializeToObject(timeBytes,
78
HBaseTypeUtils.getTypeIndex(Types.SQL_TIME), StandardCharsets.UTF_8);
79
```
80
81
### Numeric Data Types
82
83
```java
84
import java.math.BigDecimal;
85
import java.math.BigInteger;
86
87
// Serialize big numeric types
88
BigDecimal bigDecimal = new BigDecimal("12345.6789");
89
byte[] bigDecimalBytes = HBaseTypeUtils.serializeFromObject(bigDecimal,
90
HBaseTypeUtils.getTypeIndex(Types.BIG_DEC), StandardCharsets.UTF_8);
91
92
BigInteger bigInteger = new BigInteger("123456789012345678901234567890");
93
byte[] bigIntegerBytes = HBaseTypeUtils.serializeFromObject(bigInteger,
94
HBaseTypeUtils.getTypeIndex(Types.BIG_INT), StandardCharsets.UTF_8);
95
96
// Deserialize big numeric types
97
BigDecimal deserializedBigDecimal = (BigDecimal) HBaseTypeUtils.deserializeToObject(bigDecimalBytes,
98
HBaseTypeUtils.getTypeIndex(Types.BIG_DEC), StandardCharsets.UTF_8);
99
100
BigInteger deserializedBigInteger = (BigInteger) HBaseTypeUtils.deserializeToObject(bigIntegerBytes,
101
HBaseTypeUtils.getTypeIndex(Types.BIG_INT), StandardCharsets.UTF_8);
102
```
103
104
### Type Support Validation
105
106
```java
107
// Check if a type is supported
108
boolean isStringSupported = HBaseTypeUtils.isSupportedType(String.class); // true
109
boolean isIntSupported = HBaseTypeUtils.isSupportedType(Integer.class); // true
110
boolean isBooleanSupported = HBaseTypeUtils.isSupportedType(Boolean.class); // true
111
boolean isCustomSupported = HBaseTypeUtils.isSupportedType(MyCustomClass.class); // false
112
113
// Get type index for supported types
114
int stringTypeIndex = HBaseTypeUtils.getTypeIndex(Types.STRING);
115
int intTypeIndex = HBaseTypeUtils.getTypeIndex(Types.INT);
116
int booleanTypeIndex = HBaseTypeUtils.getTypeIndex(Types.BOOLEAN);
117
118
// Validate types before processing
119
public void validateSchema(HBaseTableSchema schema) {
120
String[] families = schema.getFamilyNames();
121
for (String family : families) {
122
TypeInformation<?>[] types = schema.getQualifierTypes(family);
123
for (TypeInformation<?> type : types) {
124
if (!HBaseTypeUtils.isSupportedType(type.getTypeClass())) {
125
throw new IllegalArgumentException(
126
"Unsupported type in family " + family + ": " + type.getTypeClass().getName());
127
}
128
}
129
}
130
}
131
```
132
133
## HBaseConfigurationUtil
134
135
Utility for serializing and deserializing Hadoop Configuration objects for distributed processing.
136
137
```java { .api }
138
class HBaseConfigurationUtil {
139
public static byte[] serializeConfiguration(Configuration conf);
140
public static Configuration deserializeConfiguration(byte[] serializedConfig, Configuration targetConfig);
141
}
142
```
143
144
### Configuration Serialization
145
146
```java
147
import org.apache.flink.addons.hbase.util.HBaseConfigurationUtil;
148
import org.apache.hadoop.conf.Configuration;
149
150
// Create and configure HBase configuration
151
Configuration conf = new Configuration();
152
conf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
153
conf.set("hbase.zookeeper.property.clientPort", "2181");
154
conf.set("zookeeper.znode.parent", "/hbase");
155
conf.setInt("hbase.client.scanner.caching", 1000);
156
conf.setLong("hbase.rpc.timeout", 60000);
157
158
// Serialize configuration for distribution
159
byte[] serializedConfig = HBaseConfigurationUtil.serializeConfiguration(conf);
160
161
// Later, deserialize configuration on task managers
162
Configuration targetConf = new Configuration();
163
Configuration deserializedConf = HBaseConfigurationUtil.deserializeConfiguration(
164
serializedConfig, targetConf);
165
166
// Verify configuration was properly deserialized
167
String zkQuorum = deserializedConf.get("hbase.zookeeper.quorum");
168
int scannerCaching = deserializedConf.getInt("hbase.client.scanner.caching", 100);
169
long rpcTimeout = deserializedConf.getLong("hbase.rpc.timeout", 30000);
170
```
171
172
### Configuration Distribution Pattern
173
174
```java
175
// Pattern for distributing HBase configuration in Flink jobs
176
public class DistributedHBaseProcessor extends RichMapFunction<Row, Row> {
177
private byte[] serializedConfig;
178
private transient Configuration hbaseConfig;
179
private transient Connection hbaseConnection;
180
181
public DistributedHBaseProcessor(Configuration config) {
182
// Serialize configuration in constructor (on job manager)
183
this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(config);
184
}
185
186
@Override
187
public void open(Configuration parameters) throws Exception {
188
super.open(parameters);
189
190
// Deserialize configuration on task manager
191
this.hbaseConfig = HBaseConfigurationUtil.deserializeConfiguration(
192
serializedConfig, new Configuration());
193
194
// Create HBase connection
195
this.hbaseConnection = ConnectionFactory.createConnection(hbaseConfig);
196
}
197
198
@Override
199
public Row map(Row value) throws Exception {
200
// Use HBase connection for processing
201
// ...
202
return value;
203
}
204
205
@Override
206
public void close() throws Exception {
207
if (hbaseConnection != null) {
208
hbaseConnection.close();
209
}
210
super.close();
211
}
212
}
213
```
214
215
## HBaseReadWriteHelper
216
217
Helper class for creating HBase operations and converting between HBase Result objects and Flink Row objects.
218
219
```java { .api }
220
class HBaseReadWriteHelper {
221
public HBaseReadWriteHelper(HBaseTableSchema hbaseTableSchema);
222
223
// Operation creation
224
public Get createGet(Object rowKey);
225
public Scan createScan();
226
public Put createPutMutation(Row row);
227
public Delete createDeleteMutation(Row row);
228
229
// Result conversion
230
public Row parseToRow(Result result);
231
public Row parseToRow(Result result, Object rowKey);
232
}
233
```
234
235
### Read Operations
236
237
```java
238
import org.apache.flink.addons.hbase.util.HBaseReadWriteHelper;
239
import org.apache.hadoop.hbase.client.Get;
240
import org.apache.hadoop.hbase.client.Scan;
241
import org.apache.hadoop.hbase.client.Result;
242
243
// Create helper with schema
244
HBaseTableSchema schema = new HBaseTableSchema();
245
schema.setRowKey("user_id", String.class);
246
schema.addColumn("profile", "name", String.class);
247
schema.addColumn("profile", "age", Integer.class);
248
249
HBaseReadWriteHelper helper = new HBaseReadWriteHelper(schema);
250
251
// Create Get operation for point queries
252
String userId = "user123";
253
Get get = helper.createGet(userId);
254
255
// Execute get and parse result
256
Table table = connection.getTable(TableName.valueOf("users"));
257
Result result = table.get(get);
258
Row userRow = helper.parseToRow(result);
259
260
// Create Scan operation for range queries
261
Scan scan = helper.createScan();
262
ResultScanner scanner = table.getScanner(scan);
263
264
for (Result scanResult : scanner) {
265
Row row = helper.parseToRow(scanResult);
266
// Process row
267
String name = (String) row.getField(1);
268
Integer age = (Integer) row.getField(2);
269
}
270
scanner.close();
271
```
272
273
### Write Operations
274
275
```java
276
import org.apache.hadoop.hbase.client.Put;
277
import org.apache.hadoop.hbase.client.Delete;
278
import org.apache.flink.types.Row;
279
280
// Create Row for insertion
281
Row userRow = Row.of(
282
"user456", // user_id (row key)
283
"Jane Doe", // name
284
28 // age
285
);
286
287
// Create Put mutation
288
Put put = helper.createPutMutation(userRow);
289
290
// Execute put
291
table.put(put);
292
293
// Create Row for deletion (only row key needed)
294
Row deleteRow = Row.of("user456", null, null);
295
296
// Create Delete mutation
297
Delete delete = helper.createDeleteMutation(deleteRow);
298
299
// Execute delete
300
table.delete(delete);
301
```
302
303
### Batch Operations
304
305
```java
306
import java.util.List;
import java.util.ArrayList;
import java.util.Arrays;
308
309
// Batch write operations
310
List<Row> rows = Arrays.asList(
311
Row.of("user001", "Alice", 25),
312
Row.of("user002", "Bob", 30),
313
Row.of("user003", "Charlie", 35)
314
);
315
316
List<Put> puts = new ArrayList<>();
317
for (Row row : rows) {
318
puts.add(helper.createPutMutation(row));
319
}
320
321
// Execute batch put
322
table.put(puts);
323
324
// Batch read operations
325
List<Get> gets = Arrays.asList(
326
helper.createGet("user001"),
327
helper.createGet("user002"),
328
helper.createGet("user003")
329
);
330
331
Result[] results = table.get(gets);
332
for (Result result : results) {
333
if (!result.isEmpty()) {
334
Row row = helper.parseToRow(result);
335
// Process row
336
}
337
}
338
```
339
340
## Advanced Utility Patterns
341
342
### Custom Type Conversion
343
344
```java
345
// Extend HBaseTypeUtils for custom type handling
346
public class ExtendedTypeUtils {
347
348
// Custom serialization for complex types
349
public static byte[] serializeJson(Object jsonObject) {
350
try {
351
ObjectMapper mapper = new ObjectMapper();
352
String json = mapper.writeValueAsString(jsonObject);
353
return HBaseTypeUtils.serializeFromObject(json,
354
HBaseTypeUtils.getTypeIndex(Types.STRING), StandardCharsets.UTF_8);
355
} catch (Exception e) {
356
throw new RuntimeException("JSON serialization failed", e);
357
}
358
}
359
360
// Custom deserialization for complex types
361
public static <T> T deserializeJson(byte[] bytes, Class<T> valueType) {
362
try {
363
String json = (String) HBaseTypeUtils.deserializeToObject(bytes,
364
HBaseTypeUtils.getTypeIndex(Types.STRING), StandardCharsets.UTF_8);
365
ObjectMapper mapper = new ObjectMapper();
366
return mapper.readValue(json, valueType);
367
} catch (Exception e) {
368
throw new RuntimeException("JSON deserialization failed", e);
369
}
370
}
371
}
372
373
// Usage example
374
MyCustomObject obj = new MyCustomObject("value1", 123);
375
byte[] serialized = ExtendedTypeUtils.serializeJson(obj);
376
MyCustomObject deserialized = ExtendedTypeUtils.deserializeJson(serialized, MyCustomObject.class);
377
```
378
379
### Configuration Templates
380
381
```java
382
// Utility class for common HBase configurations
383
public class HBaseConfigTemplates {
384
385
public static Configuration createDevelopmentConfig() {
386
Configuration conf = new Configuration();
387
conf.set("hbase.zookeeper.quorum", "localhost:2181");
388
conf.set("hbase.zookeeper.property.clientPort", "2181");
389
conf.set("zookeeper.znode.parent", "/hbase");
390
391
// Development-friendly settings
392
conf.setLong("hbase.rpc.timeout", 30000);
393
conf.setLong("hbase.client.operation.timeout", 60000);
394
conf.setInt("hbase.client.retries.number", 3);
395
396
return conf;
397
}
398
399
public static Configuration createProductionConfig(String zkQuorum) {
400
Configuration conf = new Configuration();
401
conf.set("hbase.zookeeper.quorum", zkQuorum);
402
conf.set("hbase.zookeeper.property.clientPort", "2181");
403
conf.set("zookeeper.znode.parent", "/hbase");
404
405
// Production-optimized settings
406
conf.setLong("hbase.rpc.timeout", 120000);
407
conf.setLong("hbase.client.operation.timeout", 300000);
408
conf.setInt("hbase.client.retries.number", 10);
409
conf.setLong("hbase.client.pause", 1000);
410
411
// Performance tuning
412
conf.setInt("hbase.client.ipc.pool.size", 10);
413
conf.setInt("hbase.client.max.total.tasks", 200);
414
conf.setInt("hbase.client.max.perserver.tasks", 20);
415
416
return conf;
417
}
418
419
public static Configuration createHighPerformanceConfig(String zkQuorum) {
420
Configuration conf = createProductionConfig(zkQuorum);
421
422
// High-performance settings
423
conf.setInt("hbase.client.scanner.caching", 2000);
424
conf.setLong("hbase.client.scanner.max.result.size", 8 * 1024 * 1024);
425
conf.setBoolean("hbase.client.scanner.async.prefetch", true);
426
conf.setLong("hbase.client.write.buffer", 16 * 1024 * 1024);
427
428
return conf;
429
}
430
}
431
```
432
433
### Schema Validation Utilities
434
435
```java
436
// Comprehensive schema validation
437
public class SchemaValidationUtils {
438
439
public static void validateTableSchema(HBaseTableSchema schema) {
440
validateRowKey(schema);
441
validateColumnFamilies(schema);
442
validateDataTypes(schema);
443
validateCharset(schema);
444
}
445
446
private static void validateRowKey(HBaseTableSchema schema) {
447
if (schema.getRowKeyIndex() < 0) {
448
throw new IllegalArgumentException("Row key must be defined");
449
}
450
451
Optional<TypeInformation<?>> rowKeyType = schema.getRowKeyTypeInfo();
452
if (!rowKeyType.isPresent()) {
453
throw new IllegalArgumentException("Row key type information missing");
454
}
455
456
if (!HBaseTypeUtils.isSupportedType(rowKeyType.get().getTypeClass())) {
457
throw new IllegalArgumentException("Unsupported row key type: " +
458
rowKeyType.get().getTypeClass().getSimpleName());
459
}
460
}
461
462
private static void validateColumnFamilies(HBaseTableSchema schema) {
463
String[] families = schema.getFamilyNames();
464
if (families == null || families.length == 0) {
465
throw new IllegalArgumentException("At least one column family must be defined");
466
}
467
468
for (String family : families) {
469
if (family == null || family.trim().isEmpty()) {
470
throw new IllegalArgumentException("Column family name cannot be null or empty");
471
}
472
473
byte[][] qualifiers = schema.getQualifierKeys(family);
474
if (qualifiers == null || qualifiers.length == 0) {
475
throw new IllegalArgumentException("Column family '" + family + "' has no qualifiers");
476
}
477
}
478
}
479
480
private static void validateDataTypes(HBaseTableSchema schema) {
481
String[] families = schema.getFamilyNames();
482
for (String family : families) {
483
TypeInformation<?>[] types = schema.getQualifierTypes(family);
484
for (TypeInformation<?> type : types) {
485
if (!HBaseTypeUtils.isSupportedType(type.getTypeClass())) {
486
throw new IllegalArgumentException("Unsupported type in family '" +
487
family + "': " + type.getTypeClass().getSimpleName());
488
}
489
}
490
}
491
}
492
493
private static void validateCharset(HBaseTableSchema schema) {
494
String charset = schema.getStringCharset();
495
try {
496
Charset.forName(charset);
497
} catch (Exception e) {
498
throw new IllegalArgumentException("Invalid charset: " + charset, e);
499
}
500
}
501
}
502
```
503
504
## Performance Monitoring Utilities
505
506
```java
507
// Utility for monitoring HBase operations
508
public class HBaseMonitoringUtils {
509
510
public static class OperationTimer implements AutoCloseable {
511
private final String operationName;
512
private final long startTime;
513
private final Histogram latencyHistogram;
514
private final Counter operationCounter;
515
516
public OperationTimer(String operationName, MetricGroup metricGroup) {
517
this.operationName = operationName;
518
this.startTime = System.currentTimeMillis();
519
this.latencyHistogram = metricGroup.histogram(operationName + "_latency",
        new DescriptiveStatisticsHistogram(1000));
520
this.operationCounter = metricGroup.counter(operationName + "_count");
521
operationCounter.inc();
522
}
523
524
@Override
525
public void close() {
526
long duration = System.currentTimeMillis() - startTime;
527
latencyHistogram.update(duration);
528
}
529
}
530
531
// Usage in HBase operations
532
public Row performMonitoredGet(String rowKey, MetricGroup metricGroup) {
533
try (OperationTimer timer = new OperationTimer("hbase_get", metricGroup)) {
534
Get get = helper.createGet(rowKey);
535
Result result = table.get(get);
536
return helper.parseToRow(result);
537
} catch (Exception e) {
538
metricGroup.counter("hbase_get_errors").inc();
539
throw new RuntimeException("HBase get failed", e);
540
}
541
}
542
}
543
```
544
545
## Data Type Reference
546
547
### Complete Type Support Matrix
548
549
| Java Type | HBase Storage | Type Index | Notes |
550
|-----------|---------------|------------|-------|
551
| `String` | `byte[]` | STRING | UTF-8/configurable encoding |
552
| `byte[]` | `byte[]` | PRIMITIVE_ARRAY | Direct binary storage |
553
| `Byte` | `byte[]` | BYTE | Single byte value |
554
| `Short` | `byte[]` | SHORT | 2-byte big-endian |
555
| `Integer` | `byte[]` | INT | 4-byte big-endian |
556
| `Long` | `byte[]` | LONG | 8-byte big-endian |
557
| `Float` | `byte[]` | FLOAT | IEEE 754 format |
558
| `Double` | `byte[]` | DOUBLE | IEEE 754 format |
559
| `Boolean` | `byte[]` | BOOLEAN | Single byte (0/1) |
560
| `java.sql.Date` | `byte[]` | SQL_DATE | Long timestamp |
561
| `java.sql.Time` | `byte[]` | SQL_TIME | Long timestamp |
562
| `java.sql.Timestamp` | `byte[]` | SQL_TIMESTAMP | Long timestamp |
563
| `java.math.BigDecimal` | `byte[]` | BIG_DEC | Scale (int) + unscaled-value bytes |
| `java.math.BigInteger` | `byte[]` | BIG_INT | Two's-complement byte array |
565
566
### Character Encoding Support
567
568
```java
569
// Supported character encodings
570
Charset utf8 = StandardCharsets.UTF_8; // Default
571
Charset utf16 = StandardCharsets.UTF_16; // Unicode
572
Charset iso88591 = StandardCharsets.ISO_8859_1; // Latin-1
573
Charset ascii = StandardCharsets.US_ASCII; // ASCII
574
575
// Usage in schema
576
HBaseTableSchema schema = new HBaseTableSchema();
577
schema.setCharset("UTF-8"); // Set encoding for string types
578
```