# Sink Operations

Writing data to HBase tables with configurable buffering, batching, and at-least-once delivery guarantees (effectively exactly-once for idempotent upserts) in the Apache Flink HBase 1.4 Connector.

## Capabilities

### HBaseDynamicTableSink

Table sink implementation that enables writing data to HBase tables through Flink's Table API and SQL, with configurable write options and change-data-capture support.

```java { .api }
/**
 * HBase table sink implementation for writing data to HBase tables.
 * Supports UPSERT operations with configurable buffering and batching.
 */
@Internal
public class HBaseDynamicTableSink implements DynamicTableSink {

    /**
     * Creates a new HBase dynamic table sink.
     * @param tableName name of the HBase table to write to
     * @param hbaseTableSchema schema mapping for the HBase table
     * @param hbaseConf Hadoop configuration for the HBase connection
     * @param writeOptions configuration for buffering and write performance
     * @param nullStringLiteral string representation for null values
     */
    public HBaseDynamicTableSink(
            String tableName,
            HBaseTableSchema hbaseTableSchema,
            Configuration hbaseConf,
            HBaseWriteOptions writeOptions,
            String nullStringLiteral);

    /**
     * Returns the sink runtime provider with the configured HBase sink function.
     * @param context sink context for runtime configuration
     * @return SinkFunctionProvider with HBaseSinkFunction and parallelism settings
     */
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context);

    /**
     * Returns the supported changelog mode for this sink.
     * @param requestedMode the changelog mode requested by the planner
     * @return ChangelogMode supporting INSERT, UPDATE_AFTER, and DELETE operations
     */
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode);

    /**
     * Creates a copy of this table sink for planning purposes.
     * @return new HBaseDynamicTableSink instance with the same configuration
     */
    public DynamicTableSink copy();

    /**
     * Returns a string summary of this sink.
     * @return "HBase" identifier string
     */
    public String asSummaryString();

    // Testing methods

    /**
     * Returns the HBase table schema for testing purposes.
     * @return HBaseTableSchema instance with column family mappings
     */
    @VisibleForTesting
    public HBaseTableSchema getHBaseTableSchema();

    /**
     * Returns the write options configuration for testing purposes.
     * @return HBaseWriteOptions instance with buffering settings
     */
    @VisibleForTesting
    public HBaseWriteOptions getWriteOptions();

    /**
     * Returns the Hadoop configuration for testing purposes.
     * @return Configuration instance with HBase connection settings
     */
    @VisibleForTesting
    public Configuration getConfiguration();

    /**
     * Returns the table name for testing purposes.
     * @return String name of the target HBase table
     */
    @VisibleForTesting
    public String getTableName();
}
```

**Usage Example:**

```java
// Example: writing streaming data to HBase
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Create the HBase sink table
tableEnv.executeSql(
    "CREATE TABLE user_activity_sink (" +
    "  user_id STRING," +
    "  activity ROW<event_type STRING, `timestamp` TIMESTAMP(3), `value` DOUBLE>," +
    "  metadata ROW<source STRING, processed_time TIMESTAMP(3)>," +
    "  PRIMARY KEY (user_id) NOT ENFORCED" +
    ") WITH (" +
    "  'connector' = 'hbase-1.4'," +
    "  'table-name' = 'user_events'," +
    "  'zookeeper.quorum' = 'localhost:2181'," +
    "  'sink.buffer-flush.max-size' = '4mb'," +
    "  'sink.buffer-flush.max-rows' = '2000'," +
    "  'sink.buffer-flush.interval' = '2s'" +
    ")"
);

// Insert data into HBase
tableEnv.executeSql(
    "INSERT INTO user_activity_sink " +
    "SELECT user_id, activity, metadata FROM source_stream"
);
```

## Changelog Mode Support

### UPSERT Operations

The HBase sink supports UPSERT (INSERT/UPDATE/DELETE) operations through Flink's changelog mode, which maps naturally onto HBase's key-value storage model.

```java { .api }
/**
 * Supported row change types:
 * - INSERT: creates a new HBase row or overwrites an existing one
 * - UPDATE_AFTER: updates an existing HBase row (same as INSERT in HBase)
 * - DELETE: removes the HBase row
 *
 * UPDATE_BEFORE operations are filtered out because HBase doesn't need them:
 * an upsert by row key already replaces the previous value.
 */
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
    ChangelogMode.Builder builder = ChangelogMode.newBuilder();
    for (RowKind kind : requestedMode.getContainedKinds()) {
        if (kind != RowKind.UPDATE_BEFORE) {
            builder.addContainedKind(kind);
        }
    }
    return builder.build();
}
```

**Changelog Example:**

```sql
-- Example: processing a CDC stream into HBase
CREATE TABLE orders_cdc (
  order_id STRING,
  customer_id STRING,
  amount DECIMAL(10,2),
  status STRING,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders-cdc',
  'format' = 'debezium-json'
);

CREATE TABLE orders_hbase (
  order_id STRING,
  order_info ROW<customer_id STRING, amount DECIMAL(10,2), status STRING>,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'orders',
  'zookeeper.quorum' = 'localhost:2181'
);

-- Process CDC events: INSERT, UPDATE, and DELETE are handled automatically
INSERT INTO orders_hbase
SELECT
  order_id,
  ROW(customer_id, amount, status) AS order_info
FROM orders_cdc;
```

## Write Performance Configuration

### Buffer Configuration

The sink provides multiple buffering strategies to balance write throughput against latency.

**Buffering Options:**

1. **Size-based flushing**: flush when the buffer reaches a specified memory size
2. **Count-based flushing**: flush when the buffer reaches a specified row count
3. **Time-based flushing**: flush at regular intervals regardless of buffer size
4. **Combined strategies**: use multiple triggers together; the first threshold reached wins

```sql
-- Example: high-throughput configuration
CREATE TABLE high_volume_sink (
  -- Table schema
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'events',
  'zookeeper.quorum' = 'localhost:2181',
  -- Large buffer for high throughput
  'sink.buffer-flush.max-size' = '10mb',
  'sink.buffer-flush.max-rows' = '5000',
  'sink.buffer-flush.interval' = '5s',
  -- High parallelism for write scaling
  'sink.parallelism' = '8'
);

-- Example: low-latency configuration
CREATE TABLE low_latency_sink (
  -- Table schema
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'realtime_data',
  'zookeeper.quorum' = 'localhost:2181',
  -- Small buffer for low latency
  'sink.buffer-flush.max-size' = '100kb',
  'sink.buffer-flush.max-rows' = '100',
  'sink.buffer-flush.interval' = '500ms'
);
```
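
Under combined strategies, a flush fires as soon as any one trigger is satisfied. A minimal plain-Java sketch of that logic (illustrative only: `FlushPolicy` is not part of the connector, and the real logic lives inside the connector's sink function):

```java
/** Simplified model of the sink's three flush triggers (illustrative only). */
public class FlushPolicy {
    private final long maxBytes;   // models sink.buffer-flush.max-size
    private final long maxRows;    // models sink.buffer-flush.max-rows
    private final long intervalMs; // models sink.buffer-flush.interval

    private long bufferedBytes = 0;
    private long bufferedRows = 0;
    private long lastFlushMs;

    public FlushPolicy(long maxBytes, long maxRows, long intervalMs, long nowMs) {
        this.maxBytes = maxBytes;
        this.maxRows = maxRows;
        this.intervalMs = intervalMs;
        this.lastFlushMs = nowMs;
    }

    /** Records a buffered mutation; returns true if any threshold says "flush now". */
    public boolean add(long recordBytes, long nowMs) {
        bufferedBytes += recordBytes;
        bufferedRows += 1;
        return bufferedBytes >= maxBytes
                || bufferedRows >= maxRows
                || nowMs - lastFlushMs >= intervalMs;
    }

    /** Resets the buffer counters after a flush completes. */
    public void flushed(long nowMs) {
        bufferedBytes = 0;
        bufferedRows = 0;
        lastFlushMs = nowMs;
    }

    public long bufferedRows() { return bufferedRows; }
}
```

Tight thresholds favor latency; large ones favor throughput, which is exactly the trade-off the two SQL examples above make.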

### Parallelism Control

The sink supports configurable parallelism to scale writes across multiple HBase region servers.

```sql
CREATE TABLE scalable_sink (
  -- Table schema
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'large_table',
  'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
  -- Scale write operations
  'sink.parallelism' = '12',
  -- Optimize for distributed writes
  'sink.buffer-flush.max-size' = '8mb',
  'sink.buffer-flush.max-rows' = '4000'
);
```

## Data Conversion and Serialization

### Row Key Mapping

The sink derives the HBase row key from the table's primary key column.

**Conversion Rules:**

- The primary key must be a single atomic (non-ROW) column, which becomes the HBase row key
- The key value is serialized to bytes using its type's standard HBase encoding (e.g., UTF-8 for STRING)
- Composite keys are not mapped automatically; concatenate the fields into one key column in the query if needed

```java
// Example: row key conversion
// Flink primary key column: user_id = "user_123" (STRING)
// HBase row key: the UTF-8 bytes of "user_123"
```
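
For string keys this is the same UTF-8 encoding that HBase's `Bytes.toBytes(String)` produces. A minimal sketch (the `RowKeyEncoder` helper is illustrative, not part of the connector):

```java
import java.nio.charset.StandardCharsets;

/** Illustrative helper mirroring HBase's UTF-8 string-to-rowkey encoding. */
public class RowKeyEncoder {
    /** Encodes a string key the way HBase's Bytes.toBytes(String) does: plain UTF-8. */
    public static byte[] encodeString(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a rowkey back to a string, the inverse of encodeString. */
    public static String decodeString(byte[] rowKey) {
        return new String(rowKey, StandardCharsets.UTF_8);
    }
}
```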

### Column Family Mapping

Flink ROW-typed columns map to HBase column families; each field inside the ROW becomes a column qualifier.

```sql
-- Flink schema
CREATE TABLE user_data (
  user_id STRING,
  profile ROW<name STRING, age INT, email STRING>,
  settings ROW<theme STRING, notifications BOOLEAN>,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (...);

-- Maps to the HBase structure:
-- Row key: user_id value
-- Column family 'profile': qualifiers 'name', 'age', 'email'
-- Column family 'settings': qualifiers 'theme', 'notifications'
```

### Type Serialization

All supported Flink data types are serialized to HBase-compatible byte arrays.

**Supported Type Conversions:**

- Primitive types: direct byte serialization
- Timestamp types: epoch-milliseconds long representation
- Decimal types: precision-preserving byte encoding
- String types: UTF-8 byte encoding
- Nested complex types (ARRAY, MAP, nested ROW) are not supported as qualifier values
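
As an example of the timestamp conversion, epoch milliseconds are written as an 8-byte big-endian long, matching HBase's `Bytes.toBytes(long)`. A minimal sketch (the `LongEncoder` helper is illustrative, not the connector's serializer):

```java
import java.nio.ByteBuffer;

/** Minimal sketch of the big-endian long encoding HBase uses for timestamps. */
public class LongEncoder {
    /** Encodes epoch millis as 8 big-endian bytes (ByteBuffer's default order). */
    public static byte[] encode(long epochMillis) {
        return ByteBuffer.allocate(Long.BYTES).putLong(epochMillis).array();
    }

    /** Decodes the 8-byte big-endian representation back to a long. */
    public static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }
}
```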

## Error Handling and Reliability

### Delivery Guarantees

The sink integrates with Flink's checkpointing mechanism: buffered mutations are flushed on every checkpoint, which provides at-least-once delivery. Because writes are idempotent upserts keyed by row key, replayed records after a failure produce the same final state, giving effectively exactly-once results for upsert workloads.

**Reliability Features:**

- Buffered writes flushed on every Flink checkpoint
- Idempotent upserts, so replays after recovery are safe
- Automatic retry of temporary failures by the HBase client
- Connection management via the shared HBase connection
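
Conceptually, flush-on-checkpoint means the write buffer is always empty at the moment a checkpoint completes. A simplified plain-Java model (illustrative: the connector implements this via Flink's `CheckpointedFunction`, and `CheckpointingBuffer` is not a real class):

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of flush-on-checkpoint: no snapshot contains unflushed writes. */
public class CheckpointingBuffer {
    private final List<String> pending = new ArrayList<>();   // buffered mutations
    private final List<String> committed = new ArrayList<>(); // stands in for HBase

    /** Buffers a mutation instead of writing it immediately. */
    public void write(String mutation) {
        pending.add(mutation);
    }

    /** Called when a checkpoint barrier arrives: flush before the snapshot completes. */
    public void snapshotState() {
        committed.addAll(pending); // flush all buffered mutations to HBase
        pending.clear();           // the snapshot then contains no unflushed writes
    }

    public int pendingCount() { return pending.size(); }
    public int committedCount() { return committed.size(); }
}
```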

### Exception Handling

The sink handles several failure scenarios:

```java
// Common error scenarios and handling:

// 1. Connection failures
//    - HBase client retries with backoff (e.g., hbase.client.retries.number)
//    - Errors from asynchronous flushes are rethrown to fail the job

// 2. Table schema mismatches
//    - Schema validation when the sink is created
//    - Clear error messages for incompatible types and missing column families

// 3. Write buffer pressure
//    - Configurable buffer size, row count, and flush interval
//    - A flush is triggered as soon as any threshold is reached

// 4. HBase cluster unavailability
//    - The job fails once client retries are exhausted
//    - Recovery is handled by Flink's restart strategy and checkpoints
```
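
The backoff behavior can be sketched as a capped exponential delay (illustrative: the actual retry pacing is implemented inside the HBase client and tuned with settings such as `hbase.client.pause` and `hbase.client.retries.number`):

```java
/** Illustrative capped exponential backoff, similar in spirit to the HBase client's retry pauses. */
public class Backoff {
    /** Delay doubles each attempt: base * 2^attempt, capped at capMs. */
    public static long delayMs(long baseMs, int attempt, long capMs) {
        long delay = baseMs << Math.min(attempt, 30); // clamp shift to avoid overflow
        return Math.min(delay, capMs);
    }
}
```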

**Error Configuration:**

```sql
CREATE TABLE resilient_sink (
  -- Table schema
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'critical_data',
  -- Multiple ZK nodes for high availability
  'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
  -- Frequent flushes for critical data
  'sink.buffer-flush.max-size' = '2mb',
  'sink.buffer-flush.interval' = '1s'
);
```

## Monitoring and Metrics

The sink exposes metrics for monitoring write performance and health:

**Available Metrics:**

- Write throughput (records/second, bytes/second)
- Buffer utilization and flush frequency
- Error rates and retry counts
- Connection status
- Latency of write operations

**Integration with Flink Metrics:**

```java
// Metrics are registered with Flink's metric system and are visible
// in the Flink UI and any configured external monitoring systems:
// - numRecordsOut: total records written
// - numBytesOut: total bytes written
// - currentSendTime: latency of the most recent write
// - bufferUsage: current buffer utilization percentage
```