# Schema and Configuration

The HBase connector provides comprehensive schema definition and configuration classes to map between Flink data types and HBase column families and qualifiers. This includes connection configuration, write performance tuning, and character encoding options.

## HBaseTableSchema

The central class for defining the mapping between Flink table schema and HBase table structure.

```java { .api }
class HBaseTableSchema {
    public HBaseTableSchema();

    // Schema definition
    public void addColumn(String family, String qualifier, Class<?> clazz);
    public void setRowKey(String rowKeyName, Class<?> clazz);
    public void setCharset(String charset);

    // Schema introspection
    public String[] getFamilyNames();
    public byte[][] getFamilyKeys();
    public byte[][] getQualifierKeys(String family);
    public TypeInformation<?>[] getQualifierTypes(String family);
    public String getStringCharset();
    public int getRowKeyIndex();
    public Optional<TypeInformation<?>> getRowKeyTypeInfo();
}
```

### Basic Schema Definition

```java
import org.apache.flink.addons.hbase.HBaseTableSchema;

// Create schema for user profile table
HBaseTableSchema schema = new HBaseTableSchema();

// Define row key
schema.setRowKey("user_id", String.class);

// Define column families and qualifiers
schema.addColumn("personal", "first_name", String.class);
schema.addColumn("personal", "last_name", String.class);
schema.addColumn("personal", "birth_date", java.sql.Date.class);
schema.addColumn("personal", "age", Integer.class);

schema.addColumn("contact", "email", String.class);
schema.addColumn("contact", "phone", String.class);
schema.addColumn("contact", "address", String.class);

schema.addColumn("activity", "last_login", java.sql.Timestamp.class);
schema.addColumn("activity", "login_count", Long.class);
schema.addColumn("activity", "is_active", Boolean.class);

schema.addColumn("preferences", "settings", String.class); // JSON as string
schema.addColumn("data", "profile_picture", byte[].class); // Binary data
```

### Character Encoding Configuration

```java
// Set character encoding for string serialization
HBaseTableSchema schema = new HBaseTableSchema();
schema.setCharset("UTF-8"); // Default encoding
// schema.setCharset("ISO-8859-1"); // Alternative encoding

// Add string columns that will use the specified charset
schema.setRowKey("user_id", String.class);
schema.addColumn("profile", "name", String.class);
schema.addColumn("profile", "description", String.class);

// Binary data is not affected by charset setting
schema.addColumn("data", "binary_content", byte[].class);
```

### Schema Introspection

```java
// Examine schema structure
HBaseTableSchema schema = // ... configured schema

// Get all column families
String[] families = schema.getFamilyNames();
System.out.println("Column families: " + Arrays.toString(families));

// Get qualifiers for a specific family
byte[][] qualifiers = schema.getQualifierKeys("personal");
for (byte[] qualifier : qualifiers) {
    System.out.println("Qualifier: " + Bytes.toString(qualifier));
}

// Get types for a family's qualifiers
TypeInformation<?>[] types = schema.getQualifierTypes("personal");
for (int i = 0; i < types.length; i++) {
    System.out.println("Type: " + types[i].getTypeClass().getSimpleName());
}

// Check row key configuration
int rowKeyIndex = schema.getRowKeyIndex();
Optional<TypeInformation<?>> rowKeyType = schema.getRowKeyTypeInfo();
if (rowKeyType.isPresent()) {
    System.out.println("Row key type: " + rowKeyType.get().getTypeClass().getSimpleName());
}
```

## HBaseOptions

Connection and basic configuration options for HBase connectivity.

```java { .api }
class HBaseOptions {
    public static Builder builder();

    static class Builder {
        public Builder setTableName(String tableName); // Required
        public Builder setZkQuorum(String zkQuorum); // Required
        public Builder setZkNodeParent(String zkNodeParent); // Optional, default: "/hbase"
        public HBaseOptions build();
    }
}
```

### Basic Connection Configuration

```java
import org.apache.flink.addons.hbase.HBaseOptions;

// Simple configuration
HBaseOptions basicOptions = HBaseOptions.builder()
    .setTableName("user_profiles")
    .setZkQuorum("localhost:2181")
    .build();

// Production configuration with multiple ZooKeeper nodes
HBaseOptions productionOptions = HBaseOptions.builder()
    .setTableName("user_events")
    .setZkQuorum("zk1.prod.com:2181,zk2.prod.com:2181,zk3.prod.com:2181")
    .setZkNodeParent("/hbase-prod") // Custom ZNode parent
    .build();

// Configuration with custom ZooKeeper port
HBaseOptions customPortOptions = HBaseOptions.builder()
    .setTableName("analytics_data")
    .setZkQuorum("zk-cluster:2182") // Non-standard port
    .setZkNodeParent("/hbase")
    .build();
```

## HBaseWriteOptions

Performance tuning options for write operations with buffering configuration.

```java { .api }
class HBaseWriteOptions {
    public static Builder builder();

    static class Builder {
        public Builder setBufferFlushMaxSizeInBytes(long bufferFlushMaxSizeInBytes);
        public Builder setBufferFlushMaxRows(long bufferFlushMaxRows);
        public Builder setBufferFlushIntervalMillis(long bufferFlushIntervalMillis);
        public HBaseWriteOptions build();
    }
}
```

### Write Performance Configuration

```java
import org.apache.flink.addons.hbase.HBaseWriteOptions;

// High-throughput configuration
HBaseWriteOptions highThroughputOptions = HBaseWriteOptions.builder()
    .setBufferFlushMaxSizeInBytes(16 * 1024 * 1024) // 16MB buffer
    .setBufferFlushMaxRows(10000) // 10,000 mutations per batch
    .setBufferFlushIntervalMillis(30000) // 30 second flush interval
    .build();

// Low-latency configuration
HBaseWriteOptions lowLatencyOptions = HBaseWriteOptions.builder()
    .setBufferFlushMaxSizeInBytes(512 * 1024) // 512KB buffer
    .setBufferFlushMaxRows(100) // 100 mutations per batch
    .setBufferFlushIntervalMillis(1000) // 1 second flush interval
    .build();

// Balanced configuration
HBaseWriteOptions balancedOptions = HBaseWriteOptions.builder()
    .setBufferFlushMaxSizeInBytes(4 * 1024 * 1024) // 4MB buffer
    .setBufferFlushMaxRows(2000) // 2,000 mutations per batch
    .setBufferFlushIntervalMillis(5000) // 5 second flush interval
    .build();

// Memory-constrained configuration
HBaseWriteOptions memoryConstrainedOptions = HBaseWriteOptions.builder()
    .setBufferFlushMaxSizeInBytes(256 * 1024) // 256KB buffer
    .setBufferFlushMaxRows(50) // 50 mutations per batch
    .setBufferFlushIntervalMillis(2000) // 2 second flush interval
    .build();
```

## Hadoop Configuration

Advanced HBase configuration using Hadoop Configuration objects for fine-grained control.

### Basic Hadoop Configuration

```java
import org.apache.hadoop.conf.Configuration;

// Create and configure Hadoop Configuration
Configuration conf = new Configuration();

// Basic connection settings
conf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("zookeeper.znode.parent", "/hbase");

// Cluster configuration
conf.set("hbase.cluster.distributed", "true");
conf.set("hbase.master", "hbase-master:60000");
```

### Performance Tuning Configuration

```java
// Client-side performance tuning
Configuration perfConf = new Configuration();

// Connection settings
perfConf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");

// Write performance
perfConf.setLong("hbase.client.write.buffer", 8 * 1024 * 1024); // 8MB write buffer
perfConf.setInt("hbase.client.max.total.tasks", 200); // Max concurrent tasks
perfConf.setInt("hbase.client.max.perserver.tasks", 20); // Max tasks per server
perfConf.setInt("hbase.client.max.perregion.tasks", 5); // Max tasks per region

// Read performance
perfConf.setInt("hbase.client.scanner.caching", 1000); // Scanner caching
perfConf.setLong("hbase.client.scanner.max.result.size", 4 * 1024 * 1024); // 4MB max result
perfConf.setBoolean("hbase.client.scanner.async.prefetch", true); // Async prefetch

// Connection pool
perfConf.setInt("hbase.client.ipc.pool.size", 10); // Connection pool size
perfConf.setInt("hbase.client.ipc.pool.type", 1); // RoundRobin pool

// Timeout settings
perfConf.setLong("hbase.rpc.timeout", 120000); // 2 minute RPC timeout
perfConf.setLong("hbase.client.operation.timeout", 300000); // 5 minute operation timeout
perfConf.setLong("hbase.client.scanner.timeout.period", 600000); // 10 minute scanner timeout

// Retry settings
perfConf.setInt("hbase.client.retries.number", 10); // Max retries
perfConf.setLong("hbase.client.pause", 200); // Retry pause (ms)
perfConf.setLong("hbase.client.pause.cqtbe", 1000); // Quota exceeded pause
```

### Security Configuration

```java
// Kerberos authentication
Configuration secureConf = new Configuration();
secureConf.set("hbase.zookeeper.quorum", "secure-zk1:2181,secure-zk2:2181");

// Enable security
secureConf.set("hbase.security.authentication", "kerberos");
secureConf.set("hbase.security.authorization", "true");
secureConf.set("hbase.master.kerberos.principal", "hbase/_HOST@REALM.COM");
secureConf.set("hbase.regionserver.kerberos.principal", "hbase/_HOST@REALM.COM");

// Client principal and keytab
secureConf.set("hbase.client.kerberos.principal", "flink-user@REALM.COM");
secureConf.set("hbase.client.keytab.file", "/path/to/flink-user.keytab");

// HDFS security (if applicable)
secureConf.set("dfs.nameservices", "hdfs-cluster");
secureConf.set("hadoop.security.authentication", "kerberos");
```

## Data Type Mapping

Comprehensive mapping between Java types and HBase storage formats.

### Supported Data Types

```java
HBaseTableSchema schema = new HBaseTableSchema();

// Primitive types
schema.addColumn("primitives", "byte_val", Byte.class);
schema.addColumn("primitives", "short_val", Short.class);
schema.addColumn("primitives", "int_val", Integer.class);
schema.addColumn("primitives", "long_val", Long.class);
schema.addColumn("primitives", "float_val", Float.class);
schema.addColumn("primitives", "double_val", Double.class);
schema.addColumn("primitives", "boolean_val", Boolean.class);

// String and binary types
schema.addColumn("text", "string_val", String.class);
schema.addColumn("binary", "byte_array", byte[].class);

// Temporal types
schema.addColumn("time", "timestamp_val", java.sql.Timestamp.class);
schema.addColumn("time", "date_val", java.sql.Date.class);
schema.addColumn("time", "time_val", java.sql.Time.class);

// Numeric types
schema.addColumn("numbers", "decimal_val", java.math.BigDecimal.class);
schema.addColumn("numbers", "bigint_val", java.math.BigInteger.class);
```

### Type Conversion Examples

```java
// Example data insertion with proper types
Row userRow = Row.of(
    "user123", // String row key
    "John", // String (first_name)
    "Doe", // String (last_name)
    Date.valueOf("1990-05-15"), // Date (birth_date)
    33, // Integer (age)
    "john.doe@email.com", // String (email)
    "+1-555-123-4567", // String (phone)
    true, // Boolean (is_active)
    new Timestamp(System.currentTimeMillis()), // Timestamp (last_login)
    1247L, // Long (login_count)
    new BigDecimal("99.99"), // BigDecimal (balance)
    "profile_data".getBytes("UTF-8") // byte[] (binary_data)
);
```

## Configuration Patterns

### Environment-Specific Configuration

```java
// Development environment
public static HBaseOptions createDevConfig(String tableName) {
    return HBaseOptions.builder()
        .setTableName(tableName)
        .setZkQuorum("localhost:2181")
        .setZkNodeParent("/hbase")
        .build();
}

// Staging environment
public static HBaseOptions createStagingConfig(String tableName) {
    return HBaseOptions.builder()
        .setTableName("staging_" + tableName)
        .setZkQuorum("staging-zk1:2181,staging-zk2:2181")
        .setZkNodeParent("/hbase-staging")
        .build();
}

// Production environment
public static HBaseOptions createProdConfig(String tableName) {
    return HBaseOptions.builder()
        .setTableName("prod_" + tableName)
        .setZkQuorum("prod-zk1:2181,prod-zk2:2181,prod-zk3:2181")
        .setZkNodeParent("/hbase-prod")
        .build();
}
```

### Workload-Specific Write Options

```java
// Real-time analytics workload
public static HBaseWriteOptions createRealTimeWriteOptions() {
    return HBaseWriteOptions.builder()
        .setBufferFlushMaxSizeInBytes(1 * 1024 * 1024) // 1MB - small buffer
        .setBufferFlushMaxRows(500) // 500 mutations
        .setBufferFlushIntervalMillis(2000) // 2 second interval
        .build();
}

// Batch processing workload
public static HBaseWriteOptions createBatchWriteOptions() {
    return HBaseWriteOptions.builder()
        .setBufferFlushMaxSizeInBytes(32 * 1024 * 1024) // 32MB - large buffer
        .setBufferFlushMaxRows(20000) // 20,000 mutations
        .setBufferFlushIntervalMillis(60000) // 60 second interval
        .build();
}

// Mixed workload
public static HBaseWriteOptions createMixedWriteOptions() {
    return HBaseWriteOptions.builder()
        .setBufferFlushMaxSizeInBytes(8 * 1024 * 1024) // 8MB buffer
        .setBufferFlushMaxRows(4000) // 4,000 mutations
        .setBufferFlushIntervalMillis(10000) // 10 second interval
        .build();
}
```

## Schema Evolution and Versioning

### Adding New Columns

```java
// Original schema
HBaseTableSchema v1Schema = new HBaseTableSchema();
v1Schema.setRowKey("user_id", String.class);
v1Schema.addColumn("profile", "name", String.class);
v1Schema.addColumn("profile", "email", String.class);

// Evolved schema - adding new columns
HBaseTableSchema v2Schema = new HBaseTableSchema();
v2Schema.setRowKey("user_id", String.class);
v2Schema.addColumn("profile", "name", String.class);
v2Schema.addColumn("profile", "email", String.class);
// New columns
v2Schema.addColumn("profile", "phone", String.class); // New optional field
v2Schema.addColumn("preferences", "theme", String.class); // New column family
v2Schema.addColumn("activity", "last_seen", java.sql.Timestamp.class); // New activity tracking
```

### Handling Schema Changes

```java
// Schema-aware processing that handles missing columns gracefully
public class SchemaAwareProcessor {

    public void processRow(Row row, HBaseTableSchema schema) {
        // Always present fields
        String userId = (String) row.getField(schema.getRowKeyIndex());

        // Handle potentially missing fields
        String name = getFieldSafely(row, schema, "profile", "name", String.class);
        String phone = getFieldSafely(row, schema, "profile", "phone", String.class);

        // Use defaults for missing fields
        if (phone == null) {
            phone = "N/A";
        }

        // Process with null-safe logic
        processUserData(userId, name, phone);
    }

    private <T> T getFieldSafely(Row row, HBaseTableSchema schema,
            String family, String qualifier, Class<T> type) {
        try {
            // Implementation would need schema introspection to find field index
            // This is a conceptual example
            return type.cast(row.getField(findFieldIndex(schema, family, qualifier)));
        } catch (Exception e) {
            return null; // Field not present in this schema version
        }
    }
}
```

## Configuration Validation

### Schema Validation

```java
public class SchemaValidator {

    public static void validateSchema(HBaseTableSchema schema) {
        // Check row key is defined
        if (schema.getRowKeyIndex() < 0) {
            throw new IllegalArgumentException("Row key must be defined");
        }

        // Check at least one column family exists
        String[] families = schema.getFamilyNames();
        if (families == null || families.length == 0) {
            throw new IllegalArgumentException("At least one column family must be defined");
        }

        // Validate supported types
        for (String family : families) {
            TypeInformation<?>[] types = schema.getQualifierTypes(family);
            for (TypeInformation<?> type : types) {
                if (!HBaseTypeUtils.isSupportedType(type.getTypeClass())) {
                    throw new IllegalArgumentException(
                        "Unsupported type: " + type.getTypeClass().getSimpleName());
                }
            }
        }
    }
}
```

### Connection Validation

```java
public class ConnectionValidator {

    public static void validateConfiguration(Configuration conf) {
        // Check required properties
        String zkQuorum = conf.get("hbase.zookeeper.quorum");
        if (zkQuorum == null || zkQuorum.trim().isEmpty()) {
            throw new IllegalArgumentException("hbase.zookeeper.quorum must be set");
        }

        // Validate ZooKeeper addresses
        String[] zkNodes = zkQuorum.split(",");
        for (String node : zkNodes) {
            if (!isValidZkAddress(node.trim())) {
                throw new IllegalArgumentException("Invalid ZooKeeper address: " + node);
            }
        }
    }

    private static boolean isValidZkAddress(String address) {
        // Basic validation for hostname:port format
        return address.matches("^[a-zA-Z0-9.-]+:[0-9]+$");
    }
}
```