<!-- docs/utilities.md — related pages: index.md, input-formats.md, lookup-functions.md, schema-config.md, sink-functions.md, table-api.md -->

# Utilities

The HBase connector provides utility classes for type conversion, configuration management, and HBase operation helpers. These utilities handle the low-level details of data serialization, configuration serialization, and HBase client operations.

## HBaseTypeUtils

Utility class for converting between Java objects and HBase byte arrays with support for various data types and character encodings.

```java { .api }
class HBaseTypeUtils {
    // Core conversion methods
    public static Object deserializeToObject(byte[] value, int typeIdx, Charset stringCharset);
    public static byte[] serializeFromObject(Object value, int typeIdx, Charset stringCharset);

    // Type system utilities
    public static int getTypeIndex(TypeInformation typeInfo);
    public static boolean isSupportedType(Class<?> clazz);
}
```

### Type Serialization and Deserialization

```java
import org.apache.flink.addons.hbase.util.HBaseTypeUtils;
import java.nio.charset.StandardCharsets;

// Serialize Java objects to HBase byte arrays
String stringValue = "Hello World";
byte[] stringBytes = HBaseTypeUtils.serializeFromObject(stringValue,
    HBaseTypeUtils.getTypeIndex(Types.STRING), StandardCharsets.UTF_8);

Integer intValue = 42;
byte[] intBytes = HBaseTypeUtils.serializeFromObject(intValue,
    HBaseTypeUtils.getTypeIndex(Types.INT), StandardCharsets.UTF_8);

Double doubleValue = 3.14159;
byte[] doubleBytes = HBaseTypeUtils.serializeFromObject(doubleValue,
    HBaseTypeUtils.getTypeIndex(Types.DOUBLE), StandardCharsets.UTF_8);

// Deserialize HBase byte arrays to Java objects
String deserializedString = (String) HBaseTypeUtils.deserializeToObject(stringBytes,
    HBaseTypeUtils.getTypeIndex(Types.STRING), StandardCharsets.UTF_8);

Integer deserializedInt = (Integer) HBaseTypeUtils.deserializeToObject(intBytes,
    HBaseTypeUtils.getTypeIndex(Types.INT), StandardCharsets.UTF_8);

Double deserializedDouble = (Double) HBaseTypeUtils.deserializeToObject(doubleBytes,
    HBaseTypeUtils.getTypeIndex(Types.DOUBLE), StandardCharsets.UTF_8);
```

### Temporal Data Types

```java
import java.sql.Timestamp;
import java.sql.Date;
import java.sql.Time;

// Serialize temporal types
Timestamp timestamp = new Timestamp(System.currentTimeMillis());
byte[] timestampBytes = HBaseTypeUtils.serializeFromObject(timestamp,
    HBaseTypeUtils.getTypeIndex(Types.SQL_TIMESTAMP), StandardCharsets.UTF_8);

Date date = Date.valueOf("2023-12-25");
byte[] dateBytes = HBaseTypeUtils.serializeFromObject(date,
    HBaseTypeUtils.getTypeIndex(Types.SQL_DATE), StandardCharsets.UTF_8);

Time time = Time.valueOf("14:30:00");
byte[] timeBytes = HBaseTypeUtils.serializeFromObject(time,
    HBaseTypeUtils.getTypeIndex(Types.SQL_TIME), StandardCharsets.UTF_8);

// Deserialize temporal types
Timestamp deserializedTimestamp = (Timestamp) HBaseTypeUtils.deserializeToObject(timestampBytes,
    HBaseTypeUtils.getTypeIndex(Types.SQL_TIMESTAMP), StandardCharsets.UTF_8);

Date deserializedDate = (Date) HBaseTypeUtils.deserializeToObject(dateBytes,
    HBaseTypeUtils.getTypeIndex(Types.SQL_DATE), StandardCharsets.UTF_8);

Time deserializedTime = (Time) HBaseTypeUtils.deserializeToObject(timeBytes,
    HBaseTypeUtils.getTypeIndex(Types.SQL_TIME), StandardCharsets.UTF_8);
```

### Numeric Data Types

```java
import java.math.BigDecimal;
import java.math.BigInteger;

// Serialize big numeric types
BigDecimal bigDecimal = new BigDecimal("12345.6789");
byte[] bigDecimalBytes = HBaseTypeUtils.serializeFromObject(bigDecimal,
    HBaseTypeUtils.getTypeIndex(Types.BIG_DEC), StandardCharsets.UTF_8);

BigInteger bigInteger = new BigInteger("123456789012345678901234567890");
byte[] bigIntegerBytes = HBaseTypeUtils.serializeFromObject(bigInteger,
    HBaseTypeUtils.getTypeIndex(Types.BIG_INT), StandardCharsets.UTF_8);

// Deserialize big numeric types
BigDecimal deserializedBigDecimal = (BigDecimal) HBaseTypeUtils.deserializeToObject(bigDecimalBytes,
    HBaseTypeUtils.getTypeIndex(Types.BIG_DEC), StandardCharsets.UTF_8);

BigInteger deserializedBigInteger = (BigInteger) HBaseTypeUtils.deserializeToObject(bigIntegerBytes,
    HBaseTypeUtils.getTypeIndex(Types.BIG_INT), StandardCharsets.UTF_8);
```

### Type Support Validation

```java
// Check if a type is supported
boolean isStringSupported = HBaseTypeUtils.isSupportedType(String.class); // true
boolean isIntSupported = HBaseTypeUtils.isSupportedType(Integer.class); // true
boolean isBooleanSupported = HBaseTypeUtils.isSupportedType(Boolean.class); // true
boolean isCustomSupported = HBaseTypeUtils.isSupportedType(MyCustomClass.class); // false

// Get type index for supported types
int stringTypeIndex = HBaseTypeUtils.getTypeIndex(Types.STRING);
int intTypeIndex = HBaseTypeUtils.getTypeIndex(Types.INT);
int booleanTypeIndex = HBaseTypeUtils.getTypeIndex(Types.BOOLEAN);

// Validate types before processing
public void validateSchema(HBaseTableSchema schema) {
    String[] families = schema.getFamilyNames();
    for (String family : families) {
        TypeInformation<?>[] types = schema.getQualifierTypes(family);
        for (TypeInformation<?> type : types) {
            if (!HBaseTypeUtils.isSupportedType(type.getTypeClass())) {
                throw new IllegalArgumentException(
                    "Unsupported type in family " + family + ": " + type.getTypeClass().getName());
            }
        }
    }
}
```

## HBaseConfigurationUtil

Utility for serializing and deserializing Hadoop Configuration objects for distributed processing.

```java { .api }
class HBaseConfigurationUtil {
    public static byte[] serializeConfiguration(Configuration conf);
    public static Configuration deserializeConfiguration(byte[] serializedConfig, Configuration targetConfig);
}
```

### Configuration Serialization

```java
import org.apache.flink.addons.hbase.util.HBaseConfigurationUtil;
import org.apache.hadoop.conf.Configuration;

// Create and configure HBase configuration
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("zookeeper.znode.parent", "/hbase");
conf.setInt("hbase.client.scanner.caching", 1000);
conf.setLong("hbase.rpc.timeout", 60000);

// Serialize configuration for distribution
byte[] serializedConfig = HBaseConfigurationUtil.serializeConfiguration(conf);

// Later, deserialize configuration on task managers
Configuration targetConf = new Configuration();
Configuration deserializedConf = HBaseConfigurationUtil.deserializeConfiguration(
    serializedConfig, targetConf);

// Verify configuration was properly deserialized
String zkQuorum = deserializedConf.get("hbase.zookeeper.quorum");
int scannerCaching = deserializedConf.getInt("hbase.client.scanner.caching", 100);
long rpcTimeout = deserializedConf.getLong("hbase.rpc.timeout", 30000);
```

### Configuration Distribution Pattern

```java
// Pattern for distributing HBase configuration in Flink jobs
public class DistributedHBaseProcessor extends RichMapFunction<Row, Row> {
    private byte[] serializedConfig;
    private transient Configuration hbaseConfig;
    private transient Connection hbaseConnection;

    public DistributedHBaseProcessor(Configuration config) {
        // Serialize configuration in constructor (on job manager)
        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(config);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        // Deserialize configuration on task manager
        this.hbaseConfig = HBaseConfigurationUtil.deserializeConfiguration(
            serializedConfig, new Configuration());

        // Create HBase connection
        this.hbaseConnection = ConnectionFactory.createConnection(hbaseConfig);
    }

    @Override
    public Row map(Row value) throws Exception {
        // Use HBase connection for processing
        // ...
        return value;
    }

    @Override
    public void close() throws Exception {
        if (hbaseConnection != null) {
            hbaseConnection.close();
        }
        super.close();
    }
}
```

## HBaseReadWriteHelper

Helper class for creating HBase operations and converting between HBase Result objects and Flink Row objects.

```java { .api }
class HBaseReadWriteHelper {
    public HBaseReadWriteHelper(HBaseTableSchema hbaseTableSchema);

    // Operation creation
    public Get createGet(Object rowKey);
    public Scan createScan();
    public Put createPutMutation(Row row);
    public Delete createDeleteMutation(Row row);

    // Result conversion
    public Row parseToRow(Result result);
    public Row parseToRow(Result result, Object rowKey);
}
```

### Read Operations

```java
import org.apache.flink.addons.hbase.util.HBaseReadWriteHelper;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Result;

// Create helper with schema
HBaseTableSchema schema = new HBaseTableSchema();
schema.setRowKey("user_id", String.class);
schema.addColumn("profile", "name", String.class);
schema.addColumn("profile", "age", Integer.class);

HBaseReadWriteHelper helper = new HBaseReadWriteHelper(schema);

// Create Get operation for point queries
String userId = "user123";
Get get = helper.createGet(userId);

// Execute get and parse result
Table table = connection.getTable(TableName.valueOf("users"));
Result result = table.get(get);
Row userRow = helper.parseToRow(result);

// Create Scan operation for range queries
Scan scan = helper.createScan();
ResultScanner scanner = table.getScanner(scan);

for (Result scanResult : scanner) {
    Row row = helper.parseToRow(scanResult);
    // Process row
    String name = (String) row.getField(1);
    Integer age = (Integer) row.getField(2);
}
scanner.close();
```

### Write Operations

```java
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.flink.types.Row;

// Create Row for insertion
Row userRow = Row.of(
    "user456", // user_id (row key)
    "Jane Doe", // name
    28 // age
);

// Create Put mutation
Put put = helper.createPutMutation(userRow);

// Execute put
table.put(put);

// Create Row for deletion (only row key needed)
Row deleteRow = Row.of("user456", null, null);

// Create Delete mutation
Delete delete = helper.createDeleteMutation(deleteRow);

// Execute delete
table.delete(delete);
```

### Batch Operations

```java
import java.util.List;
import java.util.ArrayList;
import java.util.Arrays;

// Batch write operations
List<Row> rows = Arrays.asList(
    Row.of("user001", "Alice", 25),
    Row.of("user002", "Bob", 30),
    Row.of("user003", "Charlie", 35)
);

List<Put> puts = new ArrayList<>();
for (Row row : rows) {
    puts.add(helper.createPutMutation(row));
}

// Execute batch put
table.put(puts);

// Batch read operations
List<Get> gets = Arrays.asList(
    helper.createGet("user001"),
    helper.createGet("user002"),
    helper.createGet("user003")
);

Result[] results = table.get(gets);
for (Result result : results) {
    if (!result.isEmpty()) {
        Row row = helper.parseToRow(result);
        // Process row
    }
}
```

## Advanced Utility Patterns

### Custom Type Conversion

```java
// Extend HBaseTypeUtils for custom type handling
public class ExtendedTypeUtils {

    // Custom serialization for complex types
    public static byte[] serializeJson(Object jsonObject) {
        try {
            ObjectMapper mapper = new ObjectMapper();
            String json = mapper.writeValueAsString(jsonObject);
            return HBaseTypeUtils.serializeFromObject(json,
                HBaseTypeUtils.getTypeIndex(Types.STRING), StandardCharsets.UTF_8);
        } catch (Exception e) {
            throw new RuntimeException("JSON serialization failed", e);
        }
    }

    // Custom deserialization for complex types
    public static <T> T deserializeJson(byte[] bytes, Class<T> valueType) {
        try {
            String json = (String) HBaseTypeUtils.deserializeToObject(bytes,
                HBaseTypeUtils.getTypeIndex(Types.STRING), StandardCharsets.UTF_8);
            ObjectMapper mapper = new ObjectMapper();
            return mapper.readValue(json, valueType);
        } catch (Exception e) {
            throw new RuntimeException("JSON deserialization failed", e);
        }
    }
}

// Usage example
MyCustomObject obj = new MyCustomObject("value1", 123);
byte[] serialized = ExtendedTypeUtils.serializeJson(obj);
MyCustomObject deserialized = ExtendedTypeUtils.deserializeJson(serialized, MyCustomObject.class);
```

### Configuration Templates

```java
// Utility class for common HBase configurations
public class HBaseConfigTemplates {

    public static Configuration createDevelopmentConfig() {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "localhost:2181");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("zookeeper.znode.parent", "/hbase");

        // Development-friendly settings
        conf.setLong("hbase.rpc.timeout", 30000);
        conf.setLong("hbase.client.operation.timeout", 60000);
        conf.setInt("hbase.client.retries.number", 3);

        return conf;
    }

    public static Configuration createProductionConfig(String zkQuorum) {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", zkQuorum);
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("zookeeper.znode.parent", "/hbase");

        // Production-optimized settings
        conf.setLong("hbase.rpc.timeout", 120000);
        conf.setLong("hbase.client.operation.timeout", 300000);
        conf.setInt("hbase.client.retries.number", 10);
        conf.setLong("hbase.client.pause", 1000);

        // Performance tuning
        conf.setInt("hbase.client.ipc.pool.size", 10);
        conf.setInt("hbase.client.max.total.tasks", 200);
        conf.setInt("hbase.client.max.perserver.tasks", 20);

        return conf;
    }

    public static Configuration createHighPerformanceConfig(String zkQuorum) {
        Configuration conf = createProductionConfig(zkQuorum);

        // High-performance settings
        conf.setInt("hbase.client.scanner.caching", 2000);
        conf.setLong("hbase.client.scanner.max.result.size", 8 * 1024 * 1024);
        conf.setBoolean("hbase.client.scanner.async.prefetch", true);
        conf.setLong("hbase.client.write.buffer", 16 * 1024 * 1024);

        return conf;
    }
}
```

### Schema Validation Utilities

```java
// Comprehensive schema validation
public class SchemaValidationUtils {

    public static void validateTableSchema(HBaseTableSchema schema) {
        validateRowKey(schema);
        validateColumnFamilies(schema);
        validateDataTypes(schema);
        validateCharset(schema);
    }

    private static void validateRowKey(HBaseTableSchema schema) {
        if (schema.getRowKeyIndex() < 0) {
            throw new IllegalArgumentException("Row key must be defined");
        }

        Optional<TypeInformation<?>> rowKeyType = schema.getRowKeyTypeInfo();
        if (!rowKeyType.isPresent()) {
            throw new IllegalArgumentException("Row key type information missing");
        }

        if (!HBaseTypeUtils.isSupportedType(rowKeyType.get().getTypeClass())) {
            throw new IllegalArgumentException("Unsupported row key type: " +
                rowKeyType.get().getTypeClass().getSimpleName());
        }
    }

    private static void validateColumnFamilies(HBaseTableSchema schema) {
        String[] families = schema.getFamilyNames();
        if (families == null || families.length == 0) {
            throw new IllegalArgumentException("At least one column family must be defined");
        }

        for (String family : families) {
            if (family == null || family.trim().isEmpty()) {
                throw new IllegalArgumentException("Column family name cannot be null or empty");
            }

            byte[][] qualifiers = schema.getQualifierKeys(family);
            if (qualifiers == null || qualifiers.length == 0) {
                throw new IllegalArgumentException("Column family '" + family + "' has no qualifiers");
            }
        }
    }

    private static void validateDataTypes(HBaseTableSchema schema) {
        String[] families = schema.getFamilyNames();
        for (String family : families) {
            TypeInformation<?>[] types = schema.getQualifierTypes(family);
            for (TypeInformation<?> type : types) {
                if (!HBaseTypeUtils.isSupportedType(type.getTypeClass())) {
                    throw new IllegalArgumentException("Unsupported type in family '" +
                        family + "': " + type.getTypeClass().getSimpleName());
                }
            }
        }
    }

    private static void validateCharset(HBaseTableSchema schema) {
        String charset = schema.getStringCharset();
        try {
            Charset.forName(charset);
        } catch (Exception e) {
            throw new IllegalArgumentException("Invalid charset: " + charset, e);
        }
    }
}
```

## Performance Monitoring Utilities

```java
// Utility for monitoring HBase operations
public class HBaseMonitoringUtils {

    public static class OperationTimer implements AutoCloseable {
        private final String operationName;
        private final long startTime;
        private final Histogram latencyHistogram;
        private final Counter operationCounter;

        public OperationTimer(String operationName, MetricGroup metricGroup) {
            this.operationName = operationName;
            this.startTime = System.currentTimeMillis();
            this.latencyHistogram = metricGroup.histogram(operationName + "_latency");
            this.operationCounter = metricGroup.counter(operationName + "_count");
            operationCounter.inc();
        }

        @Override
        public void close() {
            long duration = System.currentTimeMillis() - startTime;
            latencyHistogram.update(duration);
        }
    }

    // Usage in HBase operations
    public Row performMonitoredGet(String rowKey, MetricGroup metricGroup) {
        try (OperationTimer timer = new OperationTimer("hbase_get", metricGroup)) {
            Get get = helper.createGet(rowKey);
            Result result = table.get(get);
            return helper.parseToRow(result);
        } catch (Exception e) {
            metricGroup.counter("hbase_get_errors").inc();
            throw new RuntimeException("HBase get failed", e);
        }
    }
}
```

## Data Type Reference

### Complete Type Support Matrix

| Java Type | HBase Storage | Type Index | Notes |
|-----------|---------------|------------|-------|
| `String` | `byte[]` | STRING | UTF-8/configurable encoding |
| `byte[]` | `byte[]` | PRIMITIVE_ARRAY | Direct binary storage |
| `Byte` | `byte[]` | BYTE | Single byte value |
| `Short` | `byte[]` | SHORT | 2-byte big-endian |
| `Integer` | `byte[]` | INT | 4-byte big-endian |
| `Long` | `byte[]` | LONG | 8-byte big-endian |
| `Float` | `byte[]` | FLOAT | IEEE 754 format |
| `Double` | `byte[]` | DOUBLE | IEEE 754 format |
| `Boolean` | `byte[]` | BOOLEAN | Single byte (0/1) |
| `java.sql.Date` | `byte[]` | SQL_DATE | Long timestamp |
| `java.sql.Time` | `byte[]` | SQL_TIME | Long timestamp |
| `java.sql.Timestamp` | `byte[]` | SQL_TIMESTAMP | Long timestamp |
| `java.math.BigDecimal` | `byte[]` | BIG_DEC | String representation |
| `java.math.BigInteger` | `byte[]` | BIG_INT | String representation |

### Character Encoding Support

```java
// Supported character encodings
Charset utf8 = StandardCharsets.UTF_8; // Default
Charset utf16 = StandardCharsets.UTF_16; // Unicode
Charset iso88591 = StandardCharsets.ISO_8859_1; // Latin-1
Charset ascii = StandardCharsets.US_ASCII; // ASCII

// Usage in schema
HBaseTableSchema schema = new HBaseTableSchema();
schema.setCharset("UTF-8"); // Set encoding for string types
```