# DataStream Sink Functions

The HBase connector provides sink functions for writing data from Flink DataStreams to HBase tables. These functions support upsert operations, configurable buffering, and automatic batching for high write throughput.

## HBaseUpsertSinkFunction

The primary sink function for writing Flink DataStream data to HBase with upsert semantics and buffering support.
```java { .api }
class HBaseUpsertSinkFunction extends RichSinkFunction<Tuple2<Boolean, Row>>
        implements CheckpointedFunction, BufferedMutator.ExceptionListener {

    public HBaseUpsertSinkFunction(String hTableName, HBaseTableSchema schema,
            Configuration conf, long bufferFlushMaxSizeInBytes,
            long bufferFlushMaxMutations, long bufferFlushIntervalMillis);

    public void open(Configuration parameters) throws Exception;
    public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception;
    public void close() throws Exception;
    public void snapshotState(FunctionSnapshotContext context) throws Exception;
    public void initializeState(FunctionInitializationContext context) throws Exception;
    public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator);
}
```
### Basic Usage

```java
import org.apache.flink.addons.hbase.HBaseUpsertSinkFunction;
import org.apache.flink.addons.hbase.HBaseTableSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Configure the HBase connection. HBaseConfiguration.create() loads the
// HBase defaults plus any hbase-site.xml on the classpath.
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3");
conf.set("hbase.zookeeper.property.clientPort", "2181");

// Define the table schema
HBaseTableSchema schema = new HBaseTableSchema();
schema.setRowKey("user_id", String.class);
schema.addColumn("profile", "name", String.class);
schema.addColumn("profile", "age", Integer.class);
schema.addColumn("activity", "last_login", java.sql.Timestamp.class);

// Create the sink function with buffering configuration
HBaseUpsertSinkFunction sinkFunction = new HBaseUpsertSinkFunction(
    "user_table",        // table name
    schema,              // table schema
    conf,                // HBase configuration
    2 * 1024 * 1024,     // buffer flush max size (2MB)
    1000,                // buffer flush max mutations
    5000                 // buffer flush interval (5 seconds)
);

// Apply to a DataStream
DataStream<Tuple2<Boolean, Row>> upsertStream = ...; // your stream of upserts
upsertStream.addSink(sinkFunction);
```
### Upsert vs Delete Operations

The sink function handles both insert/update and delete operations based on the Boolean flag in the Tuple2: `true` marks an insert/update, `false` marks a delete.

```java
import java.sql.Timestamp;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create sample data with upsert/delete flags
DataStream<Tuple2<Boolean, Row>> operations = env.fromElements(
    // Insert/Update operations (true)
    Tuple2.of(true, Row.of("user001", "John Doe", 25, new Timestamp(System.currentTimeMillis()))),
    Tuple2.of(true, Row.of("user002", "Jane Smith", 30, new Timestamp(System.currentTimeMillis()))),

    // Delete operation (false)
    Tuple2.of(false, Row.of("user003", null, null, null)) // Only the row key is needed for a delete
);

operations.addSink(sinkFunction);
env.execute("HBase Upsert Job");
```
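
Under the hood, each flagged element becomes an HBase mutation: a Put for `true`, a Delete for `false`. The following is a hedged sketch of that conversion; `MutationSketch`, `toMutation`, and its parameters are illustrative only, and the real sink derives column families and qualifiers from the `HBaseTableSchema`.

```java
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;

// Illustrative only: how a flagged element maps to an HBase mutation.
public final class MutationSketch {

    static Mutation toMutation(boolean upsert, byte[] rowKey, byte[] family,
            byte[] qualifier, byte[] value) {
        if (upsert) {
            Put put = new Put(rowKey);
            put.addColumn(family, qualifier, value); // one cell per non-null field
            return put;
        } else {
            return new Delete(rowKey); // the row key alone identifies the delete
        }
    }
}
```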
## Buffering Configuration

The sink function batches writes through HBase's BufferedMutator; the buffer settings trade throughput against latency:

### Buffer Size Configuration

```java
// High-throughput configuration (larger buffers)
HBaseUpsertSinkFunction highThroughputSink = new HBaseUpsertSinkFunction(
    "events_table",
    schema,
    conf,
    10 * 1024 * 1024,   // 10MB buffer size
    5000,               // 5000 mutations per batch
    10000               // 10 second flush interval
);

// Low-latency configuration (smaller buffers)
HBaseUpsertSinkFunction lowLatencySink = new HBaseUpsertSinkFunction(
    "realtime_table",
    schema,
    conf,
    512 * 1024,         // 512KB buffer size
    100,                // 100 mutations per batch
    1000                // 1 second flush interval
);
```
### Buffer Flush Triggers

The buffer is flushed when any of these conditions is met (a conceptual sketch of the policy follows the list):

1. **Size threshold**: buffered data reaches `bufferFlushMaxSizeInBytes`
2. **Mutation count**: the buffer holds `bufferFlushMaxMutations` operations
3. **Time interval**: `bufferFlushIntervalMillis` has elapsed since the last flush
4. **Checkpoint**: a Flink checkpoint triggers an immediate flush
5. **Close**: closing the sink function triggers a final flush
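
Below is a minimal, self-contained sketch of the first three triggers, purely for illustration: the `FlushPolicy` class and its method names are hypothetical, not connector API; internally the sink tracks equivalent counters around its BufferedMutator.

```java
// Hypothetical illustration of the size/count/interval flush triggers;
// not the connector's actual implementation.
public class FlushPolicy {

    private final long maxSizeBytes;
    private final long maxMutations;
    private final long intervalMillis;

    private long bufferedBytes;
    private long bufferedMutations;
    private long lastFlushTime = System.currentTimeMillis();

    public FlushPolicy(long maxSizeBytes, long maxMutations, long intervalMillis) {
        this.maxSizeBytes = maxSizeBytes;
        this.maxMutations = maxMutations;
        this.intervalMillis = intervalMillis;
    }

    /** Record one buffered mutation and report whether a flush is now due. */
    public boolean onMutation(long mutationSizeBytes) {
        bufferedBytes += mutationSizeBytes;
        bufferedMutations++;
        return bufferedBytes >= maxSizeBytes
            || bufferedMutations >= maxMutations
            || System.currentTimeMillis() - lastFlushTime >= intervalMillis;
    }

    /** Reset the counters after the BufferedMutator has flushed. */
    public void afterFlush() {
        bufferedBytes = 0;
        bufferedMutations = 0;
        lastFlushTime = System.currentTimeMillis();
    }
}
```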

## Fault Tolerance and Checkpointing

The sink function integrates with Flink's checkpointing: every checkpoint flushes all buffered mutations to HBase before the checkpoint completes. This provides at-least-once delivery; because the writes are idempotent upserts keyed by row key, records replayed after a failure converge to the same result in HBase.

```java
// Enable checkpointing in your Flink job
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // Checkpoint every 5 seconds

// The sink function automatically participates in checkpointing;
// no additional configuration is needed.
DataStream<Tuple2<Boolean, Row>> stream = ...; // your data stream
stream.addSink(sinkFunction);
```
### State Management

```java { .api }
// Checkpointing methods (automatically called by Flink)
public void snapshotState(FunctionSnapshotContext context) throws Exception;
public void initializeState(FunctionInitializationContext context) throws Exception;
```

The sink does not store buffered mutations in Flink state; instead, `snapshotState` drains the buffer so that a completed checkpoint implies all earlier records have reached HBase (see the sketch below). Between checkpoints it tracks:

- Buffered mutations waiting to be written
- The time of the last flush, for the interval-based trigger
- The most recent asynchronous write failure, which is rethrown on the next write or flush
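
A hedged sketch of that checkpoint interaction follows; the field names (`mutator`, `pendingMutations`) are illustrative placeholders, not the connector's actual internals.

```java
import java.util.concurrent.atomic.AtomicLong;

import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.hadoop.hbase.client.BufferedMutator;

// Illustrative sketch, not the connector's source: flush-on-checkpoint means
// the buffer itself never needs to be written into Flink state.
abstract class FlushOnCheckpointSketch implements CheckpointedFunction {

    protected transient BufferedMutator mutator;
    protected final AtomicLong pendingMutations = new AtomicLong();

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        if (pendingMutations.get() > 0) {
            mutator.flush();          // synchronously drain buffered Puts/Deletes
            pendingMutations.set(0);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // Nothing to restore: a completed checkpoint implies an empty buffer.
    }
}
```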

## Error Handling

The sink function implements `BufferedMutator.ExceptionListener` for handling write failures:

```java { .api }
public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator);
```
### Exception Handling Example

The listener can be overridden to customize failure handling. A sketch (the constructor and field names are illustrative; the Hadoop `Configuration` is built in `open()` because it is not serializable):

```java
import java.io.IOException;

import org.apache.flink.addons.hbase.HBaseTableSchema;
import org.apache.flink.addons.hbase.HBaseUpsertSinkFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomHBaseSink extends RichSinkFunction<Tuple2<Boolean, Row>> {

    private static final Logger LOG = LoggerFactory.getLogger(CustomHBaseSink.class);

    private final String tableName;
    private final HBaseTableSchema schema;
    private final String zkQuorum;
    private final long bufferSize;
    private final long maxMutations;
    private final long flushInterval;

    private transient HBaseUpsertSinkFunction hbaseSink;

    public CustomHBaseSink(String tableName, HBaseTableSchema schema, String zkQuorum,
            long bufferSize, long maxMutations, long flushInterval) {
        this.tableName = tableName;
        this.schema = schema;
        this.zkQuorum = zkQuorum;
        this.bufferSize = bufferSize;
        this.maxMutations = maxMutations;
        this.flushInterval = flushInterval;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", zkQuorum);

        // Create the HBase sink with custom error handling
        hbaseSink = new HBaseUpsertSinkFunction(tableName, schema, conf,
                bufferSize, maxMutations, flushInterval) {
            @Override
            public void onException(RetriesExhaustedWithDetailsException exception,
                    BufferedMutator mutator) {
                // Custom error handling logic
                for (Throwable cause : exception.getCauses()) {
                    if (cause instanceof IOException) {
                        LOG.error("HBase write I/O error", cause);
                    } else {
                        LOG.error("HBase write error", cause);
                    }
                }
                // Rethrow to fail the job; remove this to log and continue instead
                throw new RuntimeException("HBase write failed", exception);
            }
        };

        // The delegate is itself a rich function: wire up its runtime
        // context and open it before first use.
        hbaseSink.setRuntimeContext(getRuntimeContext());
        hbaseSink.open(parameters);
    }

    @Override
    public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
        hbaseSink.invoke(value, context);
    }

    @Override
    public void close() throws Exception {
        if (hbaseSink != null) {
            hbaseSink.close();
        }
        super.close();
    }
}
```

## Advanced Configuration

### Custom HBase Configuration

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// HBaseConfiguration.create() loads hbase-default.xml and any
// hbase-site.xml on the classpath
Configuration conf = HBaseConfiguration.create();

// Connection settings
conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("zookeeper.znode.parent", "/hbase");

// Client-side performance tuning
conf.setLong("hbase.client.write.buffer", 4 * 1024 * 1024);  // 4MB write buffer
conf.setInt("hbase.client.max.total.tasks", 200);            // Max concurrent tasks
conf.setInt("hbase.client.max.perserver.tasks", 20);         // Max tasks per server
conf.setLong("hbase.client.pause", 100);                     // Retry pause time (ms)
conf.setInt("hbase.client.retries.number", 10);              // Max retries

// Timeout settings
conf.setLong("hbase.rpc.timeout", 60000);                // RPC timeout (60s)
conf.setLong("hbase.client.operation.timeout", 120000);  // Operation timeout (120s)
```

### Schema Configuration with Character Encoding

```java
HBaseTableSchema schema = new HBaseTableSchema();
schema.setCharset("UTF-8"); // Encoding used when (de)serializing string values

// Add columns with specific types
schema.setRowKey("id", String.class);
schema.addColumn("cf1", "name", String.class);
schema.addColumn("cf1", "age", Integer.class);
schema.addColumn("cf2", "score", Double.class);
schema.addColumn("cf2", "active", Boolean.class);
schema.addColumn("cf3", "data", byte[].class); // Binary data
```
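
For reference, a hedged illustration of how typed values end up as HBase byte arrays: strings use the configured charset, while numeric and boolean values follow HBase's standard `Bytes` encoding. This mirrors, but is not, the connector's internal code.

```java
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.hbase.util.Bytes;

// Illustration only: typed column values become HBase byte arrays.
byte[] name   = "John Doe".getBytes(StandardCharsets.UTF_8); // String, using the schema charset
byte[] age    = Bytes.toBytes(25);                           // int     -> 4 bytes
byte[] score  = Bytes.toBytes(98.5d);                        // double  -> 8 bytes
byte[] active = Bytes.toBytes(true);                         // boolean -> 1 byte
```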

## Performance Optimization

### Write Throughput Optimization

```java
// For maximum throughput: trade latency and memory for fewer, larger batches
HBaseUpsertSinkFunction throughputOptimized = new HBaseUpsertSinkFunction(
    tableName,
    schema,
    conf,
    16 * 1024 * 1024,   // Large buffer (16MB)
    10000,              // High mutation count
    30000               // Longer flush interval (30s)
);
```

Note that the sink's BufferedMutator already batches writes, so no client-side autoflush setting is needed. Server-side settings such as `hbase.hregion.memstore.flush.size` (e.g. 128MB) and `hbase.regionserver.handler.count` (e.g. 30) can also affect write throughput, but they belong in the cluster's `hbase-site.xml`, not in the client `Configuration` passed to the sink.

### Memory Usage Optimization

```java
// For memory-constrained environments
HBaseUpsertSinkFunction memoryOptimized = new HBaseUpsertSinkFunction(
    tableName,
    schema,
    conf,
    256 * 1024,   // Small buffer (256KB)
    50,           // Low mutation count
    2000          // Short flush interval (2s)
);
```

## Monitoring and Metrics

You can wrap the sink and register custom Flink metrics to monitor write performance. Note that `MetricGroup.histogram()` requires a concrete `Histogram` implementation; `DescriptiveStatisticsHistogram` is one option shipped with Flink:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

public class MonitoredHBaseSink extends RichSinkFunction<Tuple2<Boolean, Row>> {

    private final HBaseUpsertSinkFunction hbaseSink;

    private transient Counter recordsWritten;
    private transient Counter writeErrors;
    private transient Histogram writeLatency;

    public MonitoredHBaseSink(HBaseUpsertSinkFunction hbaseSink) {
        this.hbaseSink = hbaseSink;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        // Open the wrapped sink
        hbaseSink.setRuntimeContext(getRuntimeContext());
        hbaseSink.open(parameters);

        // Register metrics
        recordsWritten = getRuntimeContext()
            .getMetricGroup()
            .counter("records_written");

        writeErrors = getRuntimeContext()
            .getMetricGroup()
            .counter("write_errors");

        // Keeps a sliding window of the most recent 500 samples
        writeLatency = getRuntimeContext()
            .getMetricGroup()
            .histogram("write_latency", new DescriptiveStatisticsHistogram(500));
    }

    @Override
    public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
        long startTime = System.currentTimeMillis();

        try {
            hbaseSink.invoke(value, context);
            recordsWritten.inc();
            writeLatency.update(System.currentTimeMillis() - startTime);
        } catch (Exception e) {
            writeErrors.inc();
            throw e;
        }
    }

    @Override
    public void close() throws Exception {
        hbaseSink.close();
        super.close();
    }
}
```

## Common Patterns

### Conditional Writes

```java
// Filter the stream before writing to HBase
DataStream<Tuple2<Boolean, Row>> filteredStream = sourceStream
    .filter(tuple -> {
        Row row = tuple.f1;
        // Only write records where age > 0
        return row.getField(2) != null && ((Integer) row.getField(2)) > 0;
    });

filteredStream.addSink(sinkFunction);
```

### Data Transformation Before Write

```java
import java.sql.Timestamp;

import org.apache.flink.api.common.typeinfo.Types;

// Transform data before writing
DataStream<Tuple2<Boolean, Row>> transformedStream = sourceStream
    .map(tuple -> {
        Row row = tuple.f1;
        // Add a timestamp column
        Row newRow = Row.of(
            row.getField(0),                            // user_id
            row.getField(1),                            // name
            row.getField(2),                            // age
            new Timestamp(System.currentTimeMillis())   // current timestamp
        );
        return Tuple2.of(tuple.f0, newRow);
    })
    // Lambdas erase generic type information, so declare the result type explicitly
    .returns(Types.TUPLE(Types.BOOLEAN,
        Types.ROW(Types.STRING, Types.STRING, Types.INT, Types.SQL_TIMESTAMP)));

transformedStream.addSink(sinkFunction);
```