The HBase connector provides sink functions for writing data from Flink DataStreams to HBase tables. These functions support upsert operations, configurable buffering, and automatic batching for optimal write performance.
HBaseUpsertSinkFunction is the primary sink function for writing Flink DataStream data to HBase with upsert semantics and buffering support:
class HBaseUpsertSinkFunction extends RichSinkFunction<Tuple2<Boolean, Row>>
implements CheckpointedFunction, BufferedMutator.ExceptionListener {
public HBaseUpsertSinkFunction(String hTableName, HBaseTableSchema schema,
Configuration conf, long bufferFlushMaxSizeInBytes,
long bufferFlushMaxMutations, long bufferFlushIntervalMillis);
public void open(Configuration parameters) throws Exception;
public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception;
public void close() throws Exception;
public void snapshotState(FunctionSnapshotContext context) throws Exception;
public void initializeState(FunctionInitializationContext context) throws Exception;
public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator);
}

import org.apache.flink.addons.hbase.HBaseUpsertSinkFunction;
import org.apache.flink.addons.hbase.HBaseTableSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Configure HBase connection; HBaseConfiguration.create() also loads any
// hbase-site.xml found on the classpath, which a plain new Configuration() skips
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
conf.set("hbase.zookeeper.property.clientPort", "2181");
// Define table schema
HBaseTableSchema schema = new HBaseTableSchema();
schema.setRowKey("user_id", String.class);
schema.addColumn("profile", "name", String.class);
schema.addColumn("profile", "age", Integer.class);
schema.addColumn("activity", "last_login", java.sql.Timestamp.class);
// Create sink function with buffering configuration
HBaseUpsertSinkFunction sinkFunction = new HBaseUpsertSinkFunction(
"user_table", // table name
schema, // table schema
conf, // HBase configuration
2 * 1024 * 1024, // buffer flush max size (2MB)
1000, // buffer flush max mutations
5000 // buffer flush interval (5 seconds)
);
// Apply to DataStream
DataStream<Tuple2<Boolean, Row>> upsertStream = // your stream of upserts
upsertStream.addSink(sinkFunction);

The sink function handles both insert/update and delete operations based on the Boolean flag in the Tuple2:
import java.sql.Timestamp;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create sample data with upsert/delete flags
// Note: with generic Row fields, Flink may need an explicit .returns(...) type
// hint after fromElements (see the type-information note at the end of this section)
DataStream<Tuple2<Boolean, Row>> operations = env.fromElements(
// Insert/Update operations (true)
Tuple2.of(true, Row.of("user001", "John Doe", 25, new Timestamp(System.currentTimeMillis()))),
Tuple2.of(true, Row.of("user002", "Jane Smith", 30, new Timestamp(System.currentTimeMillis()))),
// Delete operation (false)
Tuple2.of(false, Row.of("user003", null, null, null)) // Only row key needed for delete
);
operations.addSink(sinkFunction);
env.execute("HBase Upsert Job");
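Under the hood, each incoming tuple becomes a single HBase mutation. The following is a minimal sketch of that mapping, not the sink's actual code; the helper method and the hard-coded column are illustrative only:

import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical sketch: how the upsert/delete flag maps to an HBase mutation
static Mutation toMutation(Tuple2<Boolean, Row> record) {
    byte[] rowKey = Bytes.toBytes((String) record.f1.getField(0));
    if (record.f0) {
        // true -> upsert: emit a Put containing the non-key fields
        Put put = new Put(rowKey);
        put.addColumn(Bytes.toBytes("profile"), Bytes.toBytes("name"),
                Bytes.toBytes((String) record.f1.getField(1)));
        return put;
    }
    // false -> delete: the whole row is removed, so only the row key matters
    return new Delete(rowKey);
}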
The sink function uses HBase's BufferedMutator for optimal write performance through batching:

// High-throughput configuration (larger buffers)
HBaseUpsertSinkFunction highThroughputSink = new HBaseUpsertSinkFunction(
"events_table",
schema,
conf,
10 * 1024 * 1024, // 10MB buffer size
5000, // 5000 mutations per batch
10000 // 10 second flush interval
);
// Low-latency configuration (smaller buffers)
HBaseUpsertSinkFunction lowLatencySink = new HBaseUpsertSinkFunction(
"realtime_table",
schema,
conf,
512 * 1024, // 512KB buffer size
100, // 100 mutations per batch
1000 // 1 second flush interval
);

The buffer is flushed when any of these conditions is met (a sketch of the trigger logic follows the list):

- the buffered data reaches bufferFlushMaxSizeInBytes
- bufferFlushMaxMutations operations have been buffered
- bufferFlushIntervalMillis has elapsed since the last flush
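Conceptually the trigger logic looks like the sketch below. Field and method names are invented; in the shipped sink the size limit is enforced by the BufferedMutator's write buffer, the count is checked per invoke(), and the interval flush runs on a scheduled timer.

import java.io.IOException;

import org.apache.hadoop.hbase.client.BufferedMutator;

// Illustrative sketch of the three flush triggers (not the actual source)
class FlushPolicy {
    long maxBytes, maxMutations, intervalMillis; // the three constructor knobs
    long bufferedBytes, bufferedMutations, lastFlushMillis;

    void maybeFlush(BufferedMutator mutator) throws IOException {
        boolean sizeReached = maxBytes > 0 && bufferedBytes >= maxBytes;
        boolean countReached = maxMutations > 0 && bufferedMutations >= maxMutations;
        boolean intervalElapsed = intervalMillis > 0
                && System.currentTimeMillis() - lastFlushMillis >= intervalMillis;
        if (sizeReached || countReached || intervalElapsed) {
            mutator.flush(); // hand all buffered mutations to HBase
            bufferedBytes = 0;
            bufferedMutations = 0;
            lastFlushMillis = System.currentTimeMillis();
        }
    }
}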
The sink function integrates with Flink's checkpointing: every checkpoint flushes the buffer, which gives at-least-once delivery and, because upserts are idempotent, effectively exactly-once results in HBase:

// Enable checkpointing in your Flink job
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // Checkpoint every 5 seconds
// The sink function automatically participates in checkpointing
// No additional configuration needed
DataStream<Tuple2<Boolean, Row>> stream = // your data stream
stream.addSink(sinkFunction);

// Checkpointing methods (automatically called by Flink)
public void snapshotState(FunctionSnapshotContext context) throws Exception;
public void initializeState(FunctionInitializationContext context) throws Exception;

Rather than storing buffered mutations in Flink state, the sink flushes all pending writes to HBase inside snapshotState(), so a completed checkpoint implies every record received so far has been written; internally it only tracks the number of in-flight mutations and any asynchronous write failure.
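In outline, the checkpoint hook just drains the buffer and surfaces any recorded failure before the checkpoint completes; a sketch of that behavior (field and helper names illustrative):

// Sketch: block the checkpoint until nothing is in flight, then rethrow any
// asynchronous write failure captured by the exception listener
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    while (numPendingRequests.get() != 0) {
        mutator.flush(); // drain the BufferedMutator
    }
    checkErrorAndRethrow(); // fail the checkpoint if a write failed earlier
}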
The sink function implements BufferedMutator.ExceptionListener for handling write failures:
public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator);

import java.io.IOException;

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CustomHBaseSink extends RichSinkFunction<Tuple2<Boolean, Row>> {
private static final Logger LOG = LoggerFactory.getLogger(CustomHBaseSink.class);
// tableName, schema, conf, bufferSize, maxMutations and flushInterval are
// assumed to be fields initialized in the constructor (omitted for brevity)
private HBaseUpsertSinkFunction hbaseSink;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Create HBase sink with error handling
hbaseSink = new HBaseUpsertSinkFunction(tableName, schema, conf,
bufferSize, maxMutations, flushInterval) {
@Override
public void onException(RetriesExhaustedWithDetailsException exception,
BufferedMutator mutator) {
// Custom error handling logic
for (Throwable cause : exception.getCauses()) {
if (cause instanceof IOException) {
// Handle I/O errors
LOG.error("HBase write I/O error", cause);
} else {
// Handle other errors
LOG.error("HBase write error", cause);
}
}
// Rethrow to fail the job (swallow the exception instead to keep running)
throw new RuntimeException("HBase write failed", exception);
}
};
// Wire up and open the wrapped sink so its BufferedMutator is created
hbaseSink.setRuntimeContext(getRuntimeContext());
hbaseSink.open(parameters);
}
@Override
public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
hbaseSink.invoke(value, context);
}
@Override
public void close() throws Exception {
if (hbaseSink != null) {
hbaseSink.close();
}
super.close();
}
}

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

Configuration conf = HBaseConfiguration.create();
// Connection settings
conf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("zookeeper.znode.parent", "/hbase");
// Performance tuning
conf.setInt("hbase.client.write.buffer", 4 * 1024 * 1024); // 4MB write buffer
conf.setInt("hbase.client.max.total.tasks", 200); // Max concurrent tasks
conf.setInt("hbase.client.max.perserver.tasks", 20); // Max tasks per server
conf.setLong("hbase.client.pause", 100); // Retry pause time
conf.setInt("hbase.client.retries.number", 10); // Max retries
// Timeout settings
conf.setLong("hbase.rpc.timeout", 60000); // RPC timeout (60s)
conf.setLong("hbase.client.operation.timeout", 120000); // Operation timeout (120s)HBaseTableSchema schema = new HBaseTableSchema();
schema.setCharset("UTF-8"); // Set encoding for string values
// Add columns with specific types
schema.setRowKey("id", String.class);
schema.addColumn("cf1", "name", String.class);
schema.addColumn("cf1", "age", Integer.class);
schema.addColumn("cf2", "score", Double.class);
schema.addColumn("cf2", "active", Boolean.class);
schema.addColumn("cf3", "data", byte[].class); // Binary data// For maximum throughput
// For maximum throughput
HBaseUpsertSinkFunction throughputOptimized = new HBaseUpsertSinkFunction(
tableName,
schema,
conf,
16 * 1024 * 1024, // Large buffer (16MB)
10000, // High mutation count
30000 // Longer flush interval (30s)
);
// Server-side write tuning. Note: these are region-server settings; they
// belong in the cluster's hbase-site.xml and have no effect when set on the
// client. Client-side batching is already handled by the BufferedMutator.
conf.setLong("hbase.hregion.memstore.flush.size", 128 * 1024 * 1024); // 128MB memstore
conf.setInt("hbase.regionserver.handler.count", 30); // More RPC handlers

// For memory-constrained environments
HBaseUpsertSinkFunction memoryOptimized = new HBaseUpsertSinkFunction(
tableName,
schema,
conf,
256 * 1024, // Small buffer (256KB)
50, // Low mutation count
2000 // Short flush interval (2s)
);

Access built-in metrics for monitoring sink performance:
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
public class MonitoredHBaseSink extends RichSinkFunction<Tuple2<Boolean, Row>> {
// Wrapped sink; assumed to be created and opened as in CustomHBaseSink above
private HBaseUpsertSinkFunction hbaseSink;
private transient Counter recordsWritten;
private transient Counter writeErrors;
private transient Histogram writeLatency;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Register metrics
recordsWritten = getRuntimeContext()
.getMetricGroup()
.counter("records_written");
writeErrors = getRuntimeContext()
.getMetricGroup()
.counter("write_errors");
// histogram() requires a concrete Histogram implementation;
// DescriptiveStatisticsHistogram keeps a window of recent values (here 500)
writeLatency = getRuntimeContext()
.getMetricGroup()
.histogram("write_latency", new DescriptiveStatisticsHistogram(500));
}
@Override
public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
long startTime = System.currentTimeMillis();
try {
hbaseSink.invoke(value, context);
recordsWritten.inc();
writeLatency.update(System.currentTimeMillis() - startTime);
} catch (Exception e) {
writeErrors.inc();
throw e;
}
}
}

// Filter stream before writing to HBase
DataStream<Tuple2<Boolean, Row>> filteredStream = sourceStream
.filter(tuple -> {
Row row = tuple.f1;
// Only write records where age > 0
return row.getField(2) != null && ((Integer) row.getField(2)) > 0;
});
filteredStream.addSink(sinkFunction);

// Transform data before writing
DataStream<Tuple2<Boolean, Row>> transformedStream = sourceStream
.map(tuple -> {
Row row = tuple.f1;
// Add timestamp column
Row newRow = Row.of(
row.getField(0), // user_id
row.getField(1), // name
row.getField(2), // age
new Timestamp(System.currentTimeMillis()) // current timestamp
);
return Tuple2.of(tuple.f0, newRow);
});
transformedStream.addSink(sinkFunction);
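A caveat for the lambda above: Java erases generic types, so Flink may fail to infer the Tuple2<Boolean, Row> output type and throw an InvalidTypesException at job-graph construction. An explicit type hint resolves this; the Row field types below are assumptions matching the earlier user_table schema:

import org.apache.flink.api.common.typeinfo.Types;

DataStream<Tuple2<Boolean, Row>> typedStream = sourceStream
    .map(tuple -> Tuple2.of(tuple.f0, tuple.f1))
    // explicit type hint: (flag, ROW(user_id, name, age, timestamp))
    .returns(Types.TUPLE(Types.BOOLEAN,
        Types.ROW(Types.STRING, Types.STRING, Types.INT, Types.SQL_TIMESTAMP)));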