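# Sink Operations and Buffering

UPSERT sink operations with configurable write buffering, checkpoint-triggered flushes that provide at-least-once delivery (effectively exactly-once results, because upserts keyed on the row key are idempotent), and failure handling built on Flink's checkpoint and restart mechanism.

## Capabilities

### UPSERT Operations

The HBase sink works in upsert mode. It consumes the INSERT, UPDATE_AFTER, and DELETE messages of a Flink changelog stream and applies them by row key. These messages are produced by the upstream query (for example, a CDC source or an updating aggregation). The HBase connector does not support running `UPDATE` or `DELETE` statements against an HBase table.

```sql { .api }
-- Changelog messages accepted by the sink, all produced by the upstream query:
--   +I (INSERT)       -> creates the row, or overwrites it if the row key already exists
--   +U (UPDATE_AFTER) -> overwrites the existing row with the new values
--   -D (DELETE)       -> deletes the row with the given row key
INSERT INTO hbase_table SELECT ... FROM changelog_source;
```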

**Operation Mapping**:
- `INSERT` and `UPDATE_AFTER`: converted to HBase `Put` mutations
- `DELETE`: converted to HBase `Delete` mutations
- `UPDATE_BEFORE`: not consumed. The sink declares an upsert changelog mode, so the planner does not send these messages

**Usage Examples**:

```sql
-- Streaming UPSERT from a changelog source
INSERT INTO user_profiles
SELECT
  user_id,
  ROW(name, email, age) AS info,
  ROW(last_login, total_orders) AS activity
FROM user_changelog_stream;

-- Batch UPSERT operation
INSERT INTO product_inventory
SELECT
  product_id,
  ROW(name, category, price) AS basic_info,
  ROW(stock_count, warehouse_location) AS inventory
FROM product_updates
WHERE update_type IN ('INSERT', 'UPDATE');

-- Delete operation: DELETE events from a CDC-backed source (for example, a table
-- using the 'debezium-json' format) arrive as -D messages and delete the matching rows
INSERT INTO expired_sessions
SELECT session_id, ROW(user_id, expires_at) AS info  -- illustrative columns
FROM session_cleanup_stream;
```
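
The operation mapping can be modeled in a few lines. This is a simplified sketch, not the connector's code: `ChangeKind` is a hypothetical stand-in for Flink's `RowKind`, `Change` is a hypothetical message type, and a `TreeMap` plays the role of the HBase table.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory model of the sink's upsert mapping; a TreeMap stands in for the HBase table
class UpsertTableModel {
    private final Map<String, String> rows = new TreeMap<>(); // row key -> value, sorted like HBase

    // Applies one changelog message with the sink's operation mapping
    void apply(Change c) {
        switch (c.kind()) {
            case INSERT, UPDATE_AFTER -> rows.put(c.rowKey(), c.value()); // HBase Put
            case DELETE -> rows.remove(c.rowKey());                       // HBase Delete
            case UPDATE_BEFORE -> { }                                     // never sent in upsert mode
        }
    }

    void applyAll(List<Change> changes) {
        changes.forEach(this::apply);
    }

    Map<String, String> snapshot() {
        return new TreeMap<>(rows);
    }

    public static void main(String[] args) {
        UpsertTableModel table = new UpsertTableModel();
        table.applyAll(List.of(
                new Change(ChangeKind.INSERT, "u1", "alice@old.example"),
                new Change(ChangeKind.INSERT, "u2", "bob@example.com"),
                new Change(ChangeKind.UPDATE_AFTER, "u1", "alice@new.example"),
                new Change(ChangeKind.DELETE, "u2", null)));
        System.out.println(table.snapshot()); // prints {u1=alice@new.example}
    }
}

// Hypothetical stand-in for Flink's RowKind
enum ChangeKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

// Hypothetical changelog message; value is ignored for deletes
record Change(ChangeKind kind, String rowKey, String value) {}
```

Applying the same changelog twice, starting from an empty table, gives the same result. This idempotence is what makes replays after a failure harmless.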

### Buffering Configuration

Each sink instance buffers mutations on the client and sends them to HBase in batches. This reduces the number of RPCs and the load on region servers. Three options control when the buffer is flushed, and each can be set to `'0'` to disable that trigger.

```sql { .api }
WITH (
  'sink.buffer-flush.max-size' = '2mb',   -- Buffer size threshold (default: 2mb)
  'sink.buffer-flush.max-rows' = '1000',  -- Buffered row count threshold (default: 1000)
  'sink.buffer-flush.interval' = '1s'     -- Periodic flush interval (default: 1s)
)
```

**Buffer Flush Triggers**:
1. **Size Threshold**: when the buffered mutations exceed `max-size`
2. **Row Count Threshold**: when the number of buffered mutations reaches `max-rows`
3. **Time Interval**: periodically, every `interval`, via a background timer
4. **Checkpoint**: on every Flink checkpoint, all pending mutations are flushed before the checkpoint completes. This flush provides at-least-once delivery

**Usage Examples**:

```sql
-- High-throughput buffering for batch loads
CREATE TABLE batch_sink (
  id STRING,
  data ROW<`value` STRING, `timestamp` BIGINT>,  -- reserved keywords must be quoted
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'batch_data',
  'zookeeper.quorum' = 'localhost:2181',
  'sink.buffer-flush.max-size' = '64mb',    -- Large buffer for batch
  'sink.buffer-flush.max-rows' = '100000',  -- High row count
  'sink.buffer-flush.interval' = '30s'      -- Longer flush interval
);

-- Low-latency streaming for real-time updates
CREATE TABLE realtime_sink (
  event_id STRING,
  event_data ROW<`type` STRING, payload STRING, `timestamp` TIMESTAMP(3)>,
  PRIMARY KEY (event_id) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'events',
  'zookeeper.quorum' = 'localhost:2181',
  'sink.buffer-flush.max-size' = '100kb',  -- Small buffer for low latency
  'sink.buffer-flush.max-rows' = '10',     -- Low row count
  'sink.buffer-flush.interval' = '100ms'   -- Fast flush interval
);
```
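
The four flush triggers can be summarized as a small state machine. The sketch below is a simplified model under stated assumptions, not the connector's implementation. `FlushPolicyModel` is hypothetical, and `mutationBytes` approximates a mutation's buffered size. The size check uses "exceeds" while the row check uses "reaches". The periodic timer and the checkpoint are invoked by hand rather than by Flink.

```java
// Simplified model of the sink's four flush triggers (not the connector's code)
class FlushPolicyModel {
    enum Trigger { NONE, SIZE, ROWS, INTERVAL, CHECKPOINT }

    private final long maxBytes;   // sink.buffer-flush.max-size in bytes (0 disables)
    private final int maxRows;     // sink.buffer-flush.max-rows (0 disables)
    private final long intervalMs; // sink.buffer-flush.interval in ms (0 disables)
    private long bufferedBytes = 0;
    private int bufferedRows = 0;

    FlushPolicyModel(long maxBytes, int maxRows, long intervalMs) {
        this.maxBytes = maxBytes;
        this.maxRows = maxRows;
        this.intervalMs = intervalMs;
    }

    // Buffers one mutation and reports which threshold, if any, caused a flush
    Trigger add(long mutationBytes) {
        bufferedBytes += mutationBytes;
        bufferedRows++;
        if (maxBytes > 0 && bufferedBytes > maxBytes) return flush(Trigger.SIZE);
        if (maxRows > 0 && bufferedRows >= maxRows) return flush(Trigger.ROWS);
        return Trigger.NONE;
    }

    // Called every intervalMs by a background timer in the real sink
    Trigger onTimer() {
        return (intervalMs > 0 && bufferedRows > 0) ? flush(Trigger.INTERVAL) : Trigger.NONE;
    }

    // Checkpoints always drain the buffer before they complete
    Trigger onCheckpoint() {
        return flush(Trigger.CHECKPOINT);
    }

    int bufferedRows() {
        return bufferedRows;
    }

    private Trigger flush(Trigger reason) {
        bufferedBytes = 0;
        bufferedRows = 0;
        return reason;
    }

    public static void main(String[] args) {
        // Defaults: 2mb, 1000 rows, 1s; feed 1 KiB mutations until something flushes
        FlushPolicyModel defaults = new FlushPolicyModel(2L * 1024 * 1024, 1000, 1000);
        int rows = 0;
        Trigger t;
        do {
            t = defaults.add(1024);
            rows++;
        } while (t == Trigger.NONE);
        System.out.println(t + " after " + rows + " rows"); // prints ROWS after 1000 rows
    }
}
```

With the default settings and 1 KiB mutations, the row-count threshold fires first. One thousand rows add up to only about 1 MB, which is below the 2mb size threshold.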

### Parallelism Configuration

`sink.parallelism` sets the number of parallel sink instances. If it is not set, the sink uses the parallelism of the upstream operator it is chained to.

```sql { .api }
WITH (
  'sink.parallelism' = '4'  -- Number of parallel sink operators
)
```

**Parallelism Considerations**:
- Higher parallelism can increase write throughput until the HBase region servers become the bottleneck
- Each parallel instance keeps its own buffer and its own HBase connection
- The number and distribution of the target table's regions affect the optimal parallelism
- Buffer memory scales with parallelism: in total, up to `sink.parallelism` × `sink.buffer-flush.max-size` of buffered data

**Usage Examples**:

```sql
-- High-throughput sink with multiple parallel writers
CREATE TABLE parallel_sink (
  partition_key STRING,
  metrics ROW<cpu_usage DOUBLE, memory_usage DOUBLE, `timestamp` TIMESTAMP(3)>,
  PRIMARY KEY (partition_key) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'system_metrics',
  'zookeeper.quorum' = 'localhost:2181',
  'sink.parallelism' = '8',  -- 8 parallel writers
  'sink.buffer-flush.max-size' = '8mb',
  'sink.buffer-flush.max-rows' = '5000'
);

-- Single-writer sink for ordered operations
CREATE TABLE ordered_sink (
  sequence_id STRING,
  ordered_data ROW<`value` STRING, order_timestamp TIMESTAMP(3)>,
  PRIMARY KEY (sequence_id) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'ordered_events',
  'zookeeper.quorum' = 'localhost:2181',
  'sink.parallelism' = '1'  -- Single writer for ordering
);
```
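
Because every instance buffers independently, the worst-case amount of buffered data comes down to simple arithmetic. This sketch assumes binary units (1mb = 1024 × 1024 bytes, matching how Flink parses memory sizes). `parseSize` is a hypothetical helper that only understands the `kb`/`mb`/`gb` suffixes used in this document. The result counts buffered mutation bytes only, not JVM object overhead or HBase client internals.

```java
import java.util.Locale;

// Worst-case buffered bytes across all parallel sink instances (buffers only, no JVM overhead)
class SinkMemoryEstimate {
    // Hypothetical parser for sizes such as "100kb", "8mb", "64mb" (binary units)
    static long parseSize(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);
        long multiplier = 1;
        if (s.endsWith("kb")) {
            multiplier = 1L << 10;
        } else if (s.endsWith("mb")) {
            multiplier = 1L << 20;
        } else if (s.endsWith("gb")) {
            multiplier = 1L << 30;
        }
        String digits = s.replaceAll("[a-z]+$", ""); // strip the unit suffix
        return Long.parseLong(digits.trim()) * multiplier;
    }

    // Each instance owns its own buffer, so the worst case grows linearly with parallelism
    static long worstCaseBufferedBytes(int parallelism, String maxSize) {
        return parallelism * parseSize(maxSize);
    }

    public static void main(String[] args) {
        long bytes = worstCaseBufferedBytes(8, "8mb"); // parallel_sink settings above
        System.out.println(bytes / (1L << 20) + " MiB"); // prints 64 MiB
    }
}
```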
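
### Delivery Guarantees

The sink integrates with Flink checkpointing to provide at-least-once delivery. Every write is an upsert keyed on the row key, so replaying records after a failure converges to the same final table state. The results are therefore effectively exactly-once, although readers may briefly see older values while records are replayed.

**Checkpoint Behavior**:
- On each checkpoint, the sink flushes all buffered mutations and waits for them to complete before acknowledging the checkpoint
- Buffered mutations are not kept in checkpoint state; after a failure, the job restarts from the last successful checkpoint and the source replays the records that followed it
- Replayed `Put` and `Delete` mutations target the same row keys, so they overwrite data rather than duplicate it
- There is no two-phase commit: mutations flushed by the size, row-count, or interval triggers become visible in HBase before the checkpoint that covers them completes

**Configuration Example**:
```sql
-- Sink relying on checkpoint flushes and idempotent upserts keyed on transaction_id
CREATE TABLE exactly_once_sink (
  transaction_id STRING,
  `transaction` ROW<amount DECIMAL(15,2), currency STRING, `timestamp` TIMESTAMP(3)>,
  PRIMARY KEY (transaction_id) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'financial_transactions',
  'zookeeper.quorum' = 'localhost:2181',
  'sink.buffer-flush.max-size' = '4mb',
  'sink.buffer-flush.max-rows' = '2000',
  'sink.buffer-flush.interval' = '5s'
);
```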
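
**Flink Job Configuration** (checkpointing must be enabled for checkpoint flushes to happen):
```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000); // Checkpoint, and flush the HBase sink, every 60 seconds
// EXACTLY_ONCE applies to Flink's internal state; the HBase sink itself stays at-least-once
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // At least 30 s between checkpoints
```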
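
A toy simulation shows why idempotent upserts turn at-least-once delivery into a correct final state. In the simulation, a job flushes on a checkpoint, keeps writing, and then crashes, losing whatever is still buffered. It then replays from the checkpointed source offset. `ReplayModel` is hypothetical. It models puts only (no deletes), flushes by row count only, and uses a `TreeMap` in place of HBase.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model: checkpoint flushes + crash + source replay, with a TreeMap standing in for HBase
class ReplayModel {
    // Runs once without failures and flushes at the end
    static Map<String, String> runWithoutCrash(List<String[]> source, int flushEvery) {
        Map<String, String> hbase = new TreeMap<>();
        List<String[]> buffer = new ArrayList<>();
        process(source, 0, source.size(), flushEvery, buffer, hbase);
        flush(buffer, hbase);
        return hbase;
    }

    // Checkpoints at offset checkpointAt, crashes at offset crashAt, then replays
    static Map<String, String> runWithCrash(List<String[]> source, int checkpointAt,
                                            int crashAt, int flushEvery) {
        Map<String, String> hbase = new TreeMap<>();
        List<String[]> buffer = new ArrayList<>();
        process(source, 0, checkpointAt, flushEvery, buffer, hbase);
        flush(buffer, hbase);   // checkpoint: drain the buffer, remember the source offset
        process(source, checkpointAt, crashAt, flushEvery, buffer, hbase);
        buffer.clear();         // crash: unflushed mutations are lost
        process(source, checkpointAt, source.size(), flushEvery, buffer, hbase); // replay
        flush(buffer, hbase);
        return hbase;
    }

    // Buffers upserts and flushes whenever the row-count threshold is reached
    private static void process(List<String[]> source, int from, int to, int flushEvery,
                                List<String[]> buffer, Map<String, String> hbase) {
        for (int i = from; i < to; i++) {
            buffer.add(source.get(i));
            if (buffer.size() >= flushEvery) {
                flush(buffer, hbase);
            }
        }
    }

    // Applies buffered upserts in order; a later value for a row key overwrites an earlier one
    private static void flush(List<String[]> buffer, Map<String, String> hbase) {
        for (String[] m : buffer) {
            hbase.put(m[0], m[1]);
        }
        buffer.clear();
    }

    public static void main(String[] args) {
        List<String[]> source = List.of(
                new String[] {"t1", "10.00"}, new String[] {"t2", "5.00"},
                new String[] {"t1", "12.50"}, new String[] {"t3", "7.25"},
                new String[] {"t2", "6.00"});
        System.out.println(runWithCrash(source, 2, 4, 1)); // {t1=12.50, t2=6.00, t3=7.25}
        System.out.println(runWithoutCrash(source, 3));    // {t1=12.50, t2=6.00, t3=7.25}
    }
}
```

Both runs end in the same state. The mutations flushed between the checkpoint and the crash are simply rewritten during replay.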
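
### Error Handling and Recovery

The sink does not retry or skip individual records itself. The HBase client's `BufferedMutator` reports write failures asynchronously. The sink records these failures and rethrows them on the next write or flush, which fails the task. Flink's restart strategy then recovers the job from the last successful checkpoint. Retries for transient errors happen inside the HBase client and follow its own configuration.

**Error Categories**:
1. **Transient Errors**: connection timeouts, region server unavailability, region moves (retried inside the HBase client before a failure reaches the sink)
2. **Permanent Errors**: table or column family not found, schema mismatches, permission denied
3. **Data Errors**: values that cannot be serialized to the declared column types, or mutations rejected by HBase (for example, cells larger than `hbase.client.keyvalue.maxsize`, 10 MB by default)

**Error Handling Strategies**:

```sql
-- Sink with a redundant ZooKeeper quorum and explicit HBase client retries
CREATE TABLE robust_sink (
  record_id STRING,
  payload ROW<data STRING, metadata STRING>,
  PRIMARY KEY (record_id) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'reliable_data',
  'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',  -- Tolerates one ZooKeeper node failure
  'sink.buffer-flush.max-size' = '2mb',
  'sink.buffer-flush.max-rows' = '1000',
  'sink.buffer-flush.interval' = '10s',
  -- 'properties.*' passes options through to the HBase client; the value is illustrative
  'properties.hbase.client.retries.number' = '10'
);
```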
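
Transient errors are the ones worth retrying. The sketch below shows the general policy: bounded retries with exponential backoff for transient failures, and an immediate rethrow for everything else. It is a generic illustration, not the HBase client's retry implementation. `RetryingWriter`, `TransientWriteException`, and the backoff values are hypothetical. Permanent errors are not retried because repeating them only delays the failure. In the real sink, an error that survives the client's retries fails the task, and Flink's restart strategy takes over.

```java
import java.util.function.Supplier;

// Generic retry policy sketch: retry transient failures with exponential backoff, fail fast otherwise
class RetryingWriter {
    private final int maxAttempts;  // total attempts, including the first
    private final long basePauseMs; // pause before the first retry

    RetryingWriter(int maxAttempts, long basePauseMs) {
        this.maxAttempts = maxAttempts;
        this.basePauseMs = basePauseMs;
    }

    // Exponential backoff: base, 2 * base, 4 * base, ...
    long backoffMs(int attempt) {
        return basePauseMs << (attempt - 1);
    }

    // Runs the write; TransientWriteException is retried, any other exception propagates at once
    <T> T write(Supplier<T> attempt) {
        for (int i = 1; ; i++) {
            try {
                return attempt.get();
            } catch (TransientWriteException e) {
                if (i >= maxAttempts) {
                    throw e; // retries exhausted: surface the failure
                }
                pause(backoffMs(i));
            }
        }
    }

    private static void pause(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during backoff", ie);
        }
    }

    public static void main(String[] args) {
        RetryingWriter writer = new RetryingWriter(5, 10);
        int[] calls = {0};
        String result = writer.write(() -> {
            if (++calls[0] < 3) {
                throw new TransientWriteException("region moved"); // simulated transient failure
            }
            return "ok";
        });
        System.out.println(result + " after " + calls[0] + " attempts"); // prints ok after 3 attempts
    }
}

// Hypothetical marker for retryable failures such as timeouts or region moves
class TransientWriteException extends RuntimeException {
    TransientWriteException(String message) {
        super(message);
    }
}
```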
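
**Dead Letter Queue Pattern**:

On a write error, the sink fails the job instead of skipping the record, so invalid records should be filtered out before they reach it. This only catches validation failures; HBase write errors still fail the job. In this example, `is_valid_record` is a user-defined validation function, and the statement set runs both inserts in a single job:

```sql
EXECUTE STATEMENT SET
BEGIN
  -- Valid records go to the HBase sink
  INSERT INTO main_hbase_sink
  SELECT * FROM input_stream
  WHERE is_valid_record(data);

  -- Invalid records go to the dead letter sink
  INSERT INTO error_sink
  SELECT *, 'validation_failed' AS error_reason
  FROM input_stream
  WHERE NOT is_valid_record(data);
END;
```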
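
The same split can be expressed outside SQL as a simple router. `DeadLetterRouter` and its validation rule are hypothetical stand-ins for `is_valid_record`. The rule rejects records without a row key, since HBase rejects empty row keys, and records without a payload.

```java
import java.util.ArrayList;
import java.util.List;

// Routes records to the main sink or a dead letter sink, mirroring the two INSERT statements
class DeadLetterRouter {
    record Input(String rowKey, String data) {}
    record Rejected(Input input, String errorReason) {}

    final List<Input> mainSink = new ArrayList<>();
    final List<Rejected> errorSink = new ArrayList<>();

    // Hypothetical stand-in for is_valid_record: non-empty row key and non-null payload
    static boolean isValidRecord(Input in) {
        return in.rowKey() != null && !in.rowKey().isEmpty() && in.data() != null;
    }

    void route(Input in) {
        if (isValidRecord(in)) {
            mainSink.add(in);
        } else {
            errorSink.add(new Rejected(in, "validation_failed"));
        }
    }

    public static void main(String[] args) {
        DeadLetterRouter router = new DeadLetterRouter();
        router.route(new Input("r1", "{\"ok\":true}"));
        router.route(new Input("", "{\"ok\":false}"));
        // prints 1 valid, 1 rejected
        System.out.println(router.mainSink.size() + " valid, " + router.errorSink.size() + " rejected");
    }
}
```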

### Performance Tuning Guidelines

**Buffer Size Tuning**:
- Start with the default `2mb` buffer size
- Increase for batch workloads (up to `64mb`)
- Decrease for low-latency requirements (down to `100kb`)
- Monitor TaskManager heap usage: every sink instance can hold up to `max-size` of buffered mutations

**Row Count Tuning**:
- The default of 1000 rows works for most scenarios
- Increase for small records (up to 100,000 rows)
- Decrease for large records (down to 10 rows)
- Balance against memory constraints

**Flush Interval Tuning**:
- The default of 1 second balances latency and batching
- Decrease for real-time applications (100ms-500ms)
- Increase for batch processing (10s-60s)
- Align with the checkpoint interval: every checkpoint also flushes the buffer, so buffered data is never held longer than one checkpoint interval, and a flush interval beyond that adds little extra batching

**Complete Performance Configuration**:
```sql
CREATE TABLE tuned_sink (
  `key` STRING,
  large_payload ROW<
    json_data STRING,   -- Large JSON payloads
    binary_data BYTES   -- Binary attachments (VARBINARY without a length means VARBINARY(1))
  >,
  -- Nested ROWs are not supported inside a column family,
  -- so the metadata fields get their own column family
  metadata ROW<
    size_bytes BIGINT,
    content_type STRING,
    created_at TIMESTAMP(3)
  >,
  PRIMARY KEY (`key`) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'large_objects',
  'zookeeper.quorum' = 'localhost:2181',

  -- Tuned for large records
  'sink.buffer-flush.max-size' = '32mb',  -- Large buffer for big records
  'sink.buffer-flush.max-rows' = '100',   -- Few rows due to size
  'sink.buffer-flush.interval' = '15s',   -- Longer interval for batching
  'sink.parallelism' = '4'                -- Moderate parallelism
);
```
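
Which threshold fires first depends on record size and on the arrival rate per sink instance. This estimate makes several assumptions: a steady stream of equally sized records, all three thresholds enabled, and an interval treated as if it were measured from the last flush (the real timer runs on a fixed schedule). `FirstTriggerEstimate` is hypothetical.

```java
// Estimates which flush threshold fires first for a steady stream of equally sized records
class FirstTriggerEstimate {
    enum Trigger { SIZE, ROWS, INTERVAL }

    static Trigger firstTrigger(long avgRecordBytes, double recordsPerSecond,
                                long maxBytes, int maxRows, long intervalMs) {
        long rowsForSize = maxBytes / avgRecordBytes + 1; // first row that pushes the buffer past maxBytes
        long rowsBeforeFlush = Math.min(rowsForSize, maxRows);
        double msToFill = rowsBeforeFlush / recordsPerSecond * 1000.0;
        if (msToFill >= intervalMs) {
            return Trigger.INTERVAL; // the timer fires before either threshold is reached
        }
        return rowsForSize <= maxRows ? Trigger.SIZE : Trigger.ROWS;
    }

    public static void main(String[] args) {
        // tuned_sink: 32mb / 100 rows / 15s, assuming ~1 MiB records at 10 records/s per instance
        System.out.println(firstTrigger(1L << 20, 10, 32L << 20, 100, 15_000)); // prints SIZE
    }
}
```

For the `tuned_sink` settings, with roughly 1 MiB records arriving at 10 per second per instance, the size threshold fires after 33 records (about 3.3 seconds). Under those assumptions, `max-rows = 100` only takes effect for records smaller than roughly 32 MiB / 100 (about 328 KiB).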

### Monitoring and Metrics

**Key Metrics to Monitor**:
- Buffer flush frequency and size
- Write throughput (records/second), for example Flink's built-in `numRecordsInPerSecond` metric on the sink operator
- Write latency (end-to-end)
- Error rates and types
- Memory usage per sink task

**Monitoring Query Example** (assumes a `sink_monitoring` table that mirrors the sink's input and declares a processing-time attribute `proc_time`):
```sql
-- Per-minute write volume
SELECT
  window_start,
  window_end,
  COUNT(*) AS records_written,
  COUNT(DISTINCT rowkey) AS unique_keys,
  CAST(COUNT(*) AS DOUBLE) / TIMESTAMPDIFF(SECOND, window_start, window_end) AS records_per_second
FROM TABLE(
  TUMBLE(TABLE sink_monitoring, DESCRIPTOR(proc_time), INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end;
```
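
For a quick offline check of the same numbers, the query's logic can be reproduced over an in-memory list. `ThroughputWindows` and its event records are hypothetical. Windows are aligned to the epoch, like Flink's `TUMBLE` without an offset, and the rate is the record count divided by the window length in seconds.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// In-memory version of the per-minute tumbling-window throughput query
class ThroughputWindows {
    record Event(long timeMs, String rowKey) {}
    record WindowStats(long windowStartMs, long records, int uniqueKeys, double recordsPerSecond) {}

    static List<WindowStats> aggregate(List<Event> events, long windowMs) {
        Map<Long, List<Event>> byWindow = new TreeMap<>(); // window start -> events, in time order
        for (Event e : events) {
            long start = e.timeMs() - Math.floorMod(e.timeMs(), windowMs); // epoch-aligned start
            byWindow.computeIfAbsent(start, k -> new ArrayList<>()).add(e);
        }
        List<WindowStats> out = new ArrayList<>();
        for (Map.Entry<Long, List<Event>> w : byWindow.entrySet()) {
            Set<String> keys = new HashSet<>();
            for (Event e : w.getValue()) {
                keys.add(e.rowKey()); // COUNT(DISTINCT rowkey)
            }
            long n = w.getValue().size(); // COUNT(*)
            out.add(new WindowStats(w.getKey(), n, keys.size(), n / (windowMs / 1000.0)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(new Event(0, "a"), new Event(30_000, "a"),
                new Event(59_999, "b"), new Event(60_000, "c"));
        for (WindowStats s : aggregate(events, 60_000)) {
            System.out.println(s);
        }
    }
}
```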

### Troubleshooting Common Issues

**High Memory Usage**:
- Reduce `sink.buffer-flush.max-size`
- Decrease `sink.parallelism`
- Monitor heap usage and adjust the TaskManager memory settings

**Slow Write Performance**:
- Increase `sink.buffer-flush.max-size`
- Increase `sink.parallelism`
- Check HBase region distribution (for example, hot-spotting caused by monotonically increasing row keys)
- Verify network connectivity

**Data Loss Concerns**:
- Enable Flink checkpointing; without it, the source has no checkpointed position to rewind to, so mutations lost from the buffer in a failure may never be rewritten
- Make sure the source can replay from checkpointed positions
- Monitor checkpoint success rates
- Check TaskManager logs for HBase write exceptions

**Connection Issues**:
- Verify ZooKeeper connectivity and the `zookeeper.quorum` and `zookeeper.znode.parent` settings
- Check HBase region server health
- Review network configuration
- Monitor connection pool exhaustion