# Schema and Configuration

The HBase connector provides comprehensive schema definition and configuration classes to map between Flink data types and HBase column families and qualifiers. This includes connection configuration, write performance tuning, and character encoding options.

## HBaseTableSchema

The central class for defining the mapping between Flink table schema and HBase table structure.

```java { .api }
class HBaseTableSchema {
    public HBaseTableSchema();

    // Schema definition
    public void addColumn(String family, String qualifier, Class<?> clazz);
    public void setRowKey(String rowKeyName, Class<?> clazz);
    public void setCharset(String charset);

    // Schema introspection
    public String[] getFamilyNames();
    public byte[][] getFamilyKeys();
    public byte[][] getQualifierKeys(String family);
    public TypeInformation<?>[] getQualifierTypes(String family);
    public String getStringCharset();
    public int getRowKeyIndex();
    public Optional<TypeInformation<?>> getRowKeyTypeInfo();
}
```

### Basic Schema Definition

```java
import org.apache.flink.addons.hbase.HBaseTableSchema;

// Create schema for user profile table
HBaseTableSchema schema = new HBaseTableSchema();

// Define row key
schema.setRowKey("user_id", String.class);

// Define column families and qualifiers
schema.addColumn("personal", "first_name", String.class);
schema.addColumn("personal", "last_name", String.class);
schema.addColumn("personal", "birth_date", java.sql.Date.class);
schema.addColumn("personal", "age", Integer.class);

schema.addColumn("contact", "email", String.class);
schema.addColumn("contact", "phone", String.class);
schema.addColumn("contact", "address", String.class);

schema.addColumn("activity", "last_login", java.sql.Timestamp.class);
schema.addColumn("activity", "login_count", Long.class);
schema.addColumn("activity", "is_active", Boolean.class);

schema.addColumn("preferences", "settings", String.class); // JSON as string
schema.addColumn("data", "profile_picture", byte[].class); // Binary data
```

### Character Encoding Configuration

```java
// Set character encoding for string serialization
HBaseTableSchema schema = new HBaseTableSchema();
schema.setCharset("UTF-8"); // Default encoding
// schema.setCharset("ISO-8859-1"); // Alternative encoding

// Add string columns that will use the specified charset
schema.setRowKey("user_id", String.class);
schema.addColumn("profile", "name", String.class);
schema.addColumn("profile", "description", String.class);

// Binary data is not affected by charset setting
schema.addColumn("data", "binary_content", byte[].class);
```

### Schema Introspection

```java
// Examine schema structure
HBaseTableSchema schema = // ... configured schema

// Get all column families
String[] families = schema.getFamilyNames();
System.out.println("Column families: " + Arrays.toString(families));

// Get qualifiers for a specific family
byte[][] qualifiers = schema.getQualifierKeys("personal");
for (byte[] qualifier : qualifiers) {
    System.out.println("Qualifier: " + Bytes.toString(qualifier));
}

// Get types for a family's qualifiers
TypeInformation<?>[] types = schema.getQualifierTypes("personal");
for (int i = 0; i < types.length; i++) {
    System.out.println("Type: " + types[i].getTypeClass().getSimpleName());
}

// Check row key configuration
int rowKeyIndex = schema.getRowKeyIndex();
Optional<TypeInformation<?>> rowKeyType = schema.getRowKeyTypeInfo();
if (rowKeyType.isPresent()) {
    System.out.println("Row key type: " + rowKeyType.get().getTypeClass().getSimpleName());
}
```

## HBaseOptions

Connection and basic configuration options for HBase connectivity.

```java { .api }
class HBaseOptions {
    public static Builder builder();

    static class Builder {
        public Builder setTableName(String tableName);       // Required
        public Builder setZkQuorum(String zkQuorum);         // Required
        public Builder setZkNodeParent(String zkNodeParent); // Optional, default: "/hbase"
        public HBaseOptions build();
    }
}
```

### Basic Connection Configuration

```java
import org.apache.flink.addons.hbase.HBaseOptions;

// Simple configuration
HBaseOptions basicOptions = HBaseOptions.builder()
    .setTableName("user_profiles")
    .setZkQuorum("localhost:2181")
    .build();

// Production configuration with multiple ZooKeeper nodes
HBaseOptions productionOptions = HBaseOptions.builder()
    .setTableName("user_events")
    .setZkQuorum("zk1.prod.com:2181,zk2.prod.com:2181,zk3.prod.com:2181")
    .setZkNodeParent("/hbase-prod") // Custom ZNode parent
    .build();

// Configuration with custom ZooKeeper port
HBaseOptions customPortOptions = HBaseOptions.builder()
    .setTableName("analytics_data")
    .setZkQuorum("zk-cluster:2182") // Non-standard port
    .setZkNodeParent("/hbase")
    .build();
```

## HBaseWriteOptions

Performance tuning options for write operations with buffering configuration.

```java { .api }
class HBaseWriteOptions {
    public static Builder builder();

    static class Builder {
        public Builder setBufferFlushMaxSizeInBytes(long bufferFlushMaxSizeInBytes);
        public Builder setBufferFlushMaxRows(long bufferFlushMaxRows);
        public Builder setBufferFlushIntervalMillis(long bufferFlushIntervalMillis);
        public HBaseWriteOptions build();
    }
}
```

### Write Performance Configuration

```java
import org.apache.flink.addons.hbase.HBaseWriteOptions;

// High-throughput configuration
HBaseWriteOptions highThroughputOptions = HBaseWriteOptions.builder()
    .setBufferFlushMaxSizeInBytes(16 * 1024 * 1024) // 16MB buffer
    .setBufferFlushMaxRows(10000)                   // 10,000 mutations per batch
    .setBufferFlushIntervalMillis(30000)            // 30 second flush interval
    .build();

// Low-latency configuration
HBaseWriteOptions lowLatencyOptions = HBaseWriteOptions.builder()
    .setBufferFlushMaxSizeInBytes(512 * 1024) // 512KB buffer
    .setBufferFlushMaxRows(100)               // 100 mutations per batch
    .setBufferFlushIntervalMillis(1000)       // 1 second flush interval
    .build();

// Balanced configuration
HBaseWriteOptions balancedOptions = HBaseWriteOptions.builder()
    .setBufferFlushMaxSizeInBytes(4 * 1024 * 1024) // 4MB buffer
    .setBufferFlushMaxRows(2000)                   // 2,000 mutations per batch
    .setBufferFlushIntervalMillis(5000)            // 5 second flush interval
    .build();

// Memory-constrained configuration
HBaseWriteOptions memoryConstrainedOptions = HBaseWriteOptions.builder()
    .setBufferFlushMaxSizeInBytes(256 * 1024) // 256KB buffer
    .setBufferFlushMaxRows(50)                // 50 mutations per batch
    .setBufferFlushIntervalMillis(2000)       // 2 second flush interval
    .build();
```

## Hadoop Configuration

Advanced HBase configuration using Hadoop Configuration objects for fine-grained control.

### Basic Hadoop Configuration

```java
import org.apache.hadoop.conf.Configuration;

// Create and configure Hadoop Configuration
Configuration conf = new Configuration();

// Basic connection settings
conf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("zookeeper.znode.parent", "/hbase");

// Cluster configuration
conf.set("hbase.cluster.distributed", "true");
conf.set("hbase.master", "hbase-master:60000");
```

### Performance Tuning Configuration

```java
// Client-side performance tuning
Configuration perfConf = new Configuration();

// Connection settings
perfConf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");

// Write performance
perfConf.setLong("hbase.client.write.buffer", 8 * 1024 * 1024); // 8MB write buffer
perfConf.setInt("hbase.client.max.total.tasks", 200);           // Max concurrent tasks
perfConf.setInt("hbase.client.max.perserver.tasks", 20);        // Max tasks per server
perfConf.setInt("hbase.client.max.perregion.tasks", 5);         // Max tasks per region

// Read performance
perfConf.setInt("hbase.client.scanner.caching", 1000);                     // Scanner caching
perfConf.setLong("hbase.client.scanner.max.result.size", 4 * 1024 * 1024); // 4MB max result
perfConf.setBoolean("hbase.client.scanner.async.prefetch", true);          // Async prefetch

// Connection pool
perfConf.setInt("hbase.client.ipc.pool.size", 10);        // Connection pool size
perfConf.set("hbase.client.ipc.pool.type", "RoundRobin"); // Pool type is a string value, not an int

// Timeout settings
perfConf.setLong("hbase.rpc.timeout", 120000);                   // 2 minute RPC timeout
perfConf.setLong("hbase.client.operation.timeout", 300000);      // 5 minute operation timeout
perfConf.setLong("hbase.client.scanner.timeout.period", 600000); // 10 minute scanner timeout

// Retry settings
perfConf.setInt("hbase.client.retries.number", 10); // Max retries
perfConf.setLong("hbase.client.pause", 200);        // Retry pause (ms)
perfConf.setLong("hbase.client.pause.cqtbe", 1000); // Quota exceeded pause
```

### Security Configuration

```java
// Kerberos authentication
Configuration secureConf = new Configuration();
secureConf.set("hbase.zookeeper.quorum", "secure-zk1:2181,secure-zk2:2181");

// Enable security
secureConf.set("hbase.security.authentication", "kerberos");
secureConf.set("hbase.security.authorization", "true");
secureConf.set("hbase.master.kerberos.principal", "hbase/_HOST@REALM.COM");
secureConf.set("hbase.regionserver.kerberos.principal", "hbase/_HOST@REALM.COM");

// Client principal and keytab
secureConf.set("hbase.client.kerberos.principal", "flink-user@REALM.COM");
secureConf.set("hbase.client.keytab.file", "/path/to/flink-user.keytab");

// HDFS security (if applicable)
secureConf.set("dfs.nameservices", "hdfs-cluster");
secureConf.set("hadoop.security.authentication", "kerberos");
```

## Data Type Mapping

Comprehensive mapping between Java types and HBase storage formats.

### Supported Data Types

```java
HBaseTableSchema schema = new HBaseTableSchema();

// Primitive wrapper types
schema.addColumn("primitives", "byte_val", Byte.class);
schema.addColumn("primitives", "short_val", Short.class);
schema.addColumn("primitives", "int_val", Integer.class);
schema.addColumn("primitives", "long_val", Long.class);
schema.addColumn("primitives", "float_val", Float.class);
schema.addColumn("primitives", "double_val", Double.class);
schema.addColumn("primitives", "boolean_val", Boolean.class);

// String and binary types
schema.addColumn("text", "string_val", String.class);
schema.addColumn("binary", "byte_array", byte[].class);

// Temporal types
schema.addColumn("time", "timestamp_val", java.sql.Timestamp.class);
schema.addColumn("time", "date_val", java.sql.Date.class);
schema.addColumn("time", "time_val", java.sql.Time.class);

// Numeric types
schema.addColumn("numbers", "decimal_val", java.math.BigDecimal.class);
schema.addColumn("numbers", "bigint_val", java.math.BigInteger.class);
```

### Type Conversion Examples

```java
// Example data insertion with proper types
Row userRow = Row.of(
    "user123",                                 // String row key
    "John",                                    // String (first_name)
    "Doe",                                     // String (last_name)
    Date.valueOf("1990-05-15"),                // Date (birth_date)
    33,                                        // Integer (age)
    "john.doe@email.com",                      // String (email)
    "+1-555-123-4567",                         // String (phone)
    true,                                      // Boolean (is_active)
    new Timestamp(System.currentTimeMillis()), // Timestamp (last_login)
    1247L,                                     // Long (login_count)
    new BigDecimal("99.99"),                   // BigDecimal (balance)
    "profile_data".getBytes(StandardCharsets.UTF_8) // byte[] (binary_data); avoids checked UnsupportedEncodingException
);
```

329
## Configuration Patterns
330
331
### Environment-Specific Configuration
332
333
```java
334
// Development environment
335
public static HBaseOptions createDevConfig(String tableName) {
336
return HBaseOptions.builder()
337
.setTableName(tableName)
338
.setZkQuorum("localhost:2181")
339
.setZkNodeParent("/hbase")
340
.build();
341
}
342
343
// Staging environment
344
public static HBaseOptions createStagingConfig(String tableName) {
345
return HBaseOptions.builder()
346
.setTableName("staging_" + tableName)
347
.setZkQuorum("staging-zk1:2181,staging-zk2:2181")
348
.setZkNodeParent("/hbase-staging")
349
.build();
350
}
351
352
// Production environment
353
public static HBaseOptions createProdConfig(String tableName) {
354
return HBaseOptions.builder()
355
.setTableName("prod_" + tableName)
356
.setZkQuorum("prod-zk1:2181,prod-zk2:2181,prod-zk3:2181")
357
.setZkNodeParent("/hbase-prod")
358
.build();
359
}
360
```
361
362
### Workload-Specific Write Options
363
364
```java
365
// Real-time analytics workload
366
public static HBaseWriteOptions createRealTimeWriteOptions() {
367
return HBaseWriteOptions.builder()
368
.setBufferFlushMaxSizeInBytes(1 * 1024 * 1024) // 1MB - small buffer
369
.setBufferFlushMaxRows(500) // 500 mutations
370
.setBufferFlushIntervalMillis(2000) // 2 second interval
371
.build();
372
}
373
374
// Batch processing workload
375
public static HBaseWriteOptions createBatchWriteOptions() {
376
return HBaseWriteOptions.builder()
377
.setBufferFlushMaxSizeInBytes(32 * 1024 * 1024) // 32MB - large buffer
378
.setBufferFlushMaxRows(20000) // 20,000 mutations
379
.setBufferFlushIntervalMillis(60000) // 60 second interval
380
.build();
381
}
382
383
// Mixed workload
384
public static HBaseWriteOptions createMixedWriteOptions() {
385
return HBaseWriteOptions.builder()
386
.setBufferFlushMaxSizeInBytes(8 * 1024 * 1024) // 8MB buffer
387
.setBufferFlushMaxRows(4000) // 4,000 mutations
388
.setBufferFlushIntervalMillis(10000) // 10 second interval
389
.build();
390
}
391
```
392
393
## Schema Evolution and Versioning

### Adding New Columns

```java
// Original schema
HBaseTableSchema v1Schema = new HBaseTableSchema();
v1Schema.setRowKey("user_id", String.class);
v1Schema.addColumn("profile", "name", String.class);
v1Schema.addColumn("profile", "email", String.class);

// Evolved schema - adding new columns
HBaseTableSchema v2Schema = new HBaseTableSchema();
v2Schema.setRowKey("user_id", String.class);
v2Schema.addColumn("profile", "name", String.class);
v2Schema.addColumn("profile", "email", String.class);
// New columns
v2Schema.addColumn("profile", "phone", String.class);                  // New optional field
v2Schema.addColumn("preferences", "theme", String.class);              // New column family
v2Schema.addColumn("activity", "last_seen", java.sql.Timestamp.class); // New activity tracking
```

### Handling Schema Changes

```java
// Schema-aware processing that handles missing columns gracefully
public class SchemaAwareProcessor {

    public void processRow(Row row, HBaseTableSchema schema) {
        // Always present fields
        String userId = (String) row.getField(schema.getRowKeyIndex());

        // Handle potentially missing fields
        String name = getFieldSafely(row, schema, "profile", "name", String.class);
        String phone = getFieldSafely(row, schema, "profile", "phone", String.class);

        // Use defaults for missing fields
        if (phone == null) {
            phone = "N/A";
        }

        // Process with null-safe logic
        processUserData(userId, name, phone);
    }

    private <T> T getFieldSafely(Row row, HBaseTableSchema schema,
                                 String family, String qualifier, Class<T> type) {
        try {
            // Implementation would need schema introspection to find field index
            // This is a conceptual example
            return type.cast(row.getField(findFieldIndex(schema, family, qualifier)));
        } catch (Exception e) {
            return null; // Field not present in this schema version
        }
    }
}
```

## Configuration Validation

### Schema Validation

```java
public class SchemaValidator {

    public static void validateSchema(HBaseTableSchema schema) {
        // Check row key is defined
        if (schema.getRowKeyIndex() < 0) {
            throw new IllegalArgumentException("Row key must be defined");
        }

        // Check at least one column family exists
        String[] families = schema.getFamilyNames();
        if (families == null || families.length == 0) {
            throw new IllegalArgumentException("At least one column family must be defined");
        }

        // Validate supported types
        for (String family : families) {
            TypeInformation<?>[] types = schema.getQualifierTypes(family);
            for (TypeInformation<?> type : types) {
                if (!HBaseTypeUtils.isSupportedType(type.getTypeClass())) {
                    throw new IllegalArgumentException(
                        "Unsupported type: " + type.getTypeClass().getSimpleName());
                }
            }
        }
    }
}
```

### Connection Validation

```java
public class ConnectionValidator {

    public static void validateConfiguration(Configuration conf) {
        // Check required properties
        String zkQuorum = conf.get("hbase.zookeeper.quorum");
        if (zkQuorum == null || zkQuorum.trim().isEmpty()) {
            throw new IllegalArgumentException("hbase.zookeeper.quorum must be set");
        }

        // Validate ZooKeeper addresses
        String[] zkNodes = zkQuorum.split(",");
        for (String node : zkNodes) {
            if (!isValidZkAddress(node.trim())) {
                throw new IllegalArgumentException("Invalid ZooKeeper address: " + node);
            }
        }
    }

    private static boolean isValidZkAddress(String address) {
        // Basic validation for hostname:port format
        return address.matches("^[a-zA-Z0-9.-]+:[0-9]+$");
    }
}
```