DataStream Sink Functions

The HBase connector provides sink functions for writing data from Flink DataStreams to HBase tables. These functions support upsert operations, configurable buffering, and automatic batching for optimal write performance.

HBaseUpsertSinkFunction

The primary sink function for writing Flink DataStream data to HBase with upsert semantics and buffering support.

class HBaseUpsertSinkFunction extends RichSinkFunction<Tuple2<Boolean, Row>>
    implements CheckpointedFunction, BufferedMutator.ExceptionListener {
    
    public HBaseUpsertSinkFunction(String hTableName, HBaseTableSchema schema, 
        Configuration conf, long bufferFlushMaxSizeInBytes, 
        long bufferFlushMaxMutations, long bufferFlushIntervalMillis);
    
    public void open(Configuration parameters) throws Exception;
    public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception;
    public void close() throws Exception;
    public void snapshotState(FunctionSnapshotContext context) throws Exception;
    public void initializeState(FunctionInitializationContext context) throws Exception;
    public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator);
}

Basic Usage

import org.apache.flink.addons.hbase.HBaseUpsertSinkFunction;
import org.apache.flink.addons.hbase.HBaseTableSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Configure HBase connection; HBaseConfiguration.create() also picks up
// hbase-default.xml and any hbase-site.xml on the classpath
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
conf.set("hbase.zookeeper.property.clientPort", "2181");

// Define table schema
HBaseTableSchema schema = new HBaseTableSchema();
schema.setRowKey("user_id", String.class);
schema.addColumn("profile", "name", String.class);
schema.addColumn("profile", "age", Integer.class);
schema.addColumn("activity", "last_login", java.sql.Timestamp.class);

// Create sink function with buffering configuration
HBaseUpsertSinkFunction sinkFunction = new HBaseUpsertSinkFunction(
    "user_table",           // table name
    schema,                 // table schema
    conf,                   // HBase configuration
    2 * 1024 * 1024,        // buffer flush max size (2MB)
    1000,                   // buffer flush max mutations
    5000                    // buffer flush interval (5 seconds)
);

// Apply to DataStream
DataStream<Tuple2<Boolean, Row>> upsertStream = ...; // your stream of upserts
upsertStream.addSink(sinkFunction);

Upsert vs Delete Operations

The sink function handles both insert/update and delete operations based on the Boolean flag in the Tuple2:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.sql.Timestamp;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create sample data with upsert/delete flags; the explicit type hint is
// needed because Row's field types cannot be inferred by reflection
DataStream<Tuple2<Boolean, Row>> operations = env.fromElements(
        // Insert/Update operations (true)
        Tuple2.of(true, Row.of("user001", "John Doe", 25, new Timestamp(System.currentTimeMillis()))),
        Tuple2.of(true, Row.of("user002", "Jane Smith", 30, new Timestamp(System.currentTimeMillis()))),

        // Delete operation (false): only the row key is needed
        Tuple2.of(false, Row.of("user003", null, null, null)))
    .returns(Types.TUPLE(Types.BOOLEAN,
        Types.ROW(Types.STRING, Types.STRING, Types.INT, Types.SQL_TIMESTAMP)));

operations.addSink(sinkFunction);
env.execute("HBase Upsert Job");
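
Under the hood, the Boolean flag selects the HBase mutation type. The following is a minimal sketch of that mapping, assuming the row key is the first field and showing a single column for brevity; the real connector derives the key and all columns from the HBaseTableSchema:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

// Illustrative only: maps the (flag, row) pair to a Put or Delete mutation
void writeRecord(BufferedMutator mutator, Tuple2<Boolean, Row> value) throws java.io.IOException {
    byte[] rowKey = Bytes.toBytes((String) value.f1.getField(0));
    if (value.f0) {
        Put put = new Put(rowKey); // true -> upsert as an HBase Put
        put.addColumn(Bytes.toBytes("profile"), Bytes.toBytes("name"),
            Bytes.toBytes((String) value.f1.getField(1)));
        mutator.mutate(put);
    } else {
        mutator.mutate(new Delete(rowKey)); // false -> delete by row key
    }
}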

Buffering Configuration

The sink function batches writes through HBase's BufferedMutator for high write throughput.
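
A hedged sketch of how such a mutator is obtained from the standard HBase client API; the connector is expected to do the equivalent in open() (here exceptionListener and bufferFlushMaxSizeInBytes stand in for the sink's own fields):

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

// Standard HBase client wiring; a sketch of what the sink's open() sets up
Connection connection = ConnectionFactory.createConnection(conf);
BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("user_table"))
    .listener(exceptionListener)                 // the sink registers itself as the listener
    .writeBufferSize(bufferFlushMaxSizeInBytes); // size-based flushing handled by the client
BufferedMutator mutator = connection.getBufferedMutator(params);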

Buffer Size Configuration

// High-throughput configuration (larger buffers)
HBaseUpsertSinkFunction highThroughputSink = new HBaseUpsertSinkFunction(
    "events_table",
    schema,
    conf,
    10 * 1024 * 1024,    // 10MB buffer size
    5000,                // 5000 mutations per batch
    10000                // 10 second flush interval
);

// Low-latency configuration (smaller buffers)
HBaseUpsertSinkFunction lowLatencySink = new HBaseUpsertSinkFunction(
    "realtime_table", 
    schema,
    conf,
    512 * 1024,          // 512KB buffer size
    100,                 // 100 mutations per batch
    1000                 // 1 second flush interval
);

Buffer Flush Triggers

The buffer is flushed when any of the following conditions is met (a sketch of the count and time triggers follows the list):

  1. Size threshold: Buffer reaches bufferFlushMaxSizeInBytes
  2. Mutation count: Buffer reaches bufferFlushMaxMutations operations
  3. Time interval: bufferFlushIntervalMillis elapsed since last flush
  4. Checkpoint: Flink checkpoint triggers immediate flush
  5. Close: Sink function close triggers final flush
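
The size trigger is enforced by the BufferedMutator itself through its write buffer size. Below is a minimal sketch of how the count and time triggers can be wired together, using illustrative names rather than the connector's actual fields:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative flush policy: count threshold checked per mutation,
// time threshold driven by a background timer
class FlushPolicy {
    private final long bufferFlushMaxMutations;
    private long numPendingMutations;

    FlushPolicy(long bufferFlushMaxMutations, long bufferFlushIntervalMillis, Runnable flush) {
        this.bufferFlushMaxMutations = bufferFlushMaxMutations;
        // Time trigger: flush on a fixed schedule regardless of buffer fill
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleWithFixedDelay(flush, bufferFlushIntervalMillis,
            bufferFlushIntervalMillis, TimeUnit.MILLISECONDS);
    }

    // Count trigger: called after each mutation is handed to the BufferedMutator
    boolean shouldFlushAfterMutation() {
        if (++numPendingMutations >= bufferFlushMaxMutations) {
            numPendingMutations = 0;
            return true; // count threshold reached
        }
        return false;
    }
}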

Fault Tolerance and Checkpointing

The sink function integrates with Flink's checkpointing: every checkpoint flushes the buffer, giving at-least-once delivery, and because writes are idempotent upserts keyed by row key, replays after recovery yield effectively exactly-once results in HBase:

// Enable checkpointing in your Flink job
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // Checkpoint every 5 seconds

// The sink function automatically participates in checkpointing
// No additional configuration needed
DataStream<Tuple2<Boolean, Row>> stream = ...; // your data stream
stream.addSink(sinkFunction);

State Management

// Checkpointing methods (automatically called by Flink)
public void snapshotState(FunctionSnapshotContext context) throws Exception;
public void initializeState(FunctionInitializationContext context) throws Exception;

Rather than storing pending writes in operator state, the sink flushes them at checkpoint time:

  • snapshotState flushes all buffered mutations, so a completed checkpoint covers every record received so far
  • initializeState has nothing to restore; the buffer always starts empty after recovery
  • Asynchronous write failures recorded by the exception listener are rethrown on the next flush
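
A simplified sketch of the checkpoint hooks under this design (field and helper names are illustrative):

import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;

// Drain the buffer at checkpoint time so the checkpoint barrier never
// overtakes unwritten mutations
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    while (numPendingMutations > 0) {
        flush();                     // BufferedMutator.flush()
        checkAsyncErrorAndRethrow(); // illustrative: surface listener failures
    }
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    // Nothing to restore: buffered mutations are never part of the snapshot
}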

Error Handling

The sink function implements BufferedMutator.ExceptionListener for handling write failures:

public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator);

Exception Handling Example

import org.apache.flink.addons.hbase.HBaseTableSchema;
import org.apache.flink.addons.hbase.HBaseUpsertSinkFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class CustomHBaseSink extends RichSinkFunction<Tuple2<Boolean, Row>> {
    private static final Logger LOG = LoggerFactory.getLogger(CustomHBaseSink.class);

    private final String tableName;
    private final HBaseTableSchema schema;
    private final long bufferSize;
    private final long maxMutations;
    private final long flushInterval;

    private transient HBaseUpsertSinkFunction hbaseSink;

    public CustomHBaseSink(String tableName, HBaseTableSchema schema,
            long bufferSize, long maxMutations, long flushInterval) {
        this.tableName = tableName;
        this.schema = schema;
        this.bufferSize = bufferSize;
        this.maxMutations = maxMutations;
        this.flushInterval = flushInterval;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        // Create the HBase sink with custom error handling; the Hadoop
        // Configuration is built here because it is not serializable
        hbaseSink = new HBaseUpsertSinkFunction(tableName, schema,
                HBaseConfiguration.create(), bufferSize, maxMutations, flushInterval) {
            @Override
            public void onException(RetriesExhaustedWithDetailsException exception,
                                    BufferedMutator mutator) {
                // Custom error handling logic
                for (Throwable cause : exception.getCauses()) {
                    if (cause instanceof IOException) {
                        LOG.error("HBase write I/O error", cause); // I/O errors
                    } else {
                        LOG.error("HBase write error", cause);     // other errors
                    }
                }
                // Rethrow to fail the job (omit to log and continue)
                throw new RuntimeException("HBase write failed", exception);
            }
        };

        // Forward the runtime context and lifecycle calls to the delegate
        hbaseSink.setRuntimeContext(getRuntimeContext());
        hbaseSink.open(parameters);
    }

    @Override
    public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
        hbaseSink.invoke(value, context);
    }

    @Override
    public void close() throws Exception {
        if (hbaseSink != null) {
            hbaseSink.close();
        }
        super.close();
    }
}

Advanced Configuration

Custom HBase Configuration

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

Configuration conf = HBaseConfiguration.create();

// Connection settings
conf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("zookeeper.znode.parent", "/hbase");

// Performance tuning
conf.setInt("hbase.client.write.buffer", 4 * 1024 * 1024); // 4MB write buffer
conf.setInt("hbase.client.max.total.tasks", 200);          // Max concurrent tasks
conf.setInt("hbase.client.max.perserver.tasks", 20);       // Max tasks per server
conf.setLong("hbase.client.pause", 100);                   // Retry pause time
conf.setInt("hbase.client.retries.number", 10);            // Max retries

// Timeout settings  
conf.setLong("hbase.rpc.timeout", 60000);                  // RPC timeout (60s)
conf.setLong("hbase.client.operation.timeout", 120000);    // Operation timeout (120s)

Schema Configuration with Character Encoding

HBaseTableSchema schema = new HBaseTableSchema();
schema.setCharset("UTF-8"); // Set encoding for string values

// Add columns with specific types
schema.setRowKey("id", String.class);
schema.addColumn("cf1", "name", String.class);
schema.addColumn("cf1", "age", Integer.class);
schema.addColumn("cf2", "score", Double.class);
schema.addColumn("cf2", "active", Boolean.class);
schema.addColumn("cf3", "data", byte[].class); // Binary data

Performance Optimization

Write Throughput Optimization

// For maximum throughput
HBaseUpsertSinkFunction throughputOptimized = new HBaseUpsertSinkFunction(
    tableName,
    schema, 
    conf,
    16 * 1024 * 1024,     // Large buffer (16MB)
    10000,                // High mutation count
    30000                 // Longer flush interval (30s)
);

// Note: the sink's BufferedMutator already batches writes, so no client-side
// autoflush setting is needed. The following are region-server settings and
// belong in the server's hbase-site.xml, not in this client configuration:
//   hbase.hregion.memstore.flush.size = 134217728   (128MB memstore flush size)
//   hbase.regionserver.handler.count  = 30          (more RPC handlers)

Memory Usage Optimization

// For memory-constrained environments
HBaseUpsertSinkFunction memoryOptimized = new HBaseUpsertSinkFunction(
    tableName,
    schema,
    conf, 
    256 * 1024,           // Small buffer (256KB)
    50,                   // Low mutation count
    2000                  // Short flush interval (2s)
);

Monitoring and Metrics

Register custom Flink metrics to monitor sink performance:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

import com.codahale.metrics.SlidingWindowReservoir;

public class MonitoredHBaseSink extends RichSinkFunction<Tuple2<Boolean, Row>> {
    private HBaseUpsertSinkFunction hbaseSink; // created and opened as shown earlier
    private transient Counter recordsWritten;
    private transient Counter writeErrors;
    private transient Histogram writeLatency;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        // Register metrics
        recordsWritten = getRuntimeContext()
            .getMetricGroup()
            .counter("records_written");

        writeErrors = getRuntimeContext()
            .getMetricGroup()
            .counter("write_errors");

        // Histograms need a backing implementation; this uses the Dropwizard
        // wrapper from the optional flink-metrics-dropwizard dependency
        writeLatency = getRuntimeContext()
            .getMetricGroup()
            .histogram("write_latency", new DropwizardHistogramWrapper(
                new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))));
    }

    @Override
    public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
        long startTime = System.currentTimeMillis();

        try {
            hbaseSink.invoke(value, context);
            recordsWritten.inc();
            writeLatency.update(System.currentTimeMillis() - startTime);
        } catch (Exception e) {
            writeErrors.inc();
            throw e;
        }
    }
}

Common Patterns

Conditional Writes

// Filter stream before writing to HBase
DataStream<Tuple2<Boolean, Row>> filteredStream = sourceStream
    .filter(tuple -> {
        Row row = tuple.f1;
        // Only write records where age > 0
        return row.getField(2) != null && ((Integer) row.getField(2)) > 0;
    });

filteredStream.addSink(sinkFunction);

Data Transformation Before Write

import org.apache.flink.api.common.typeinfo.Types;

// Transform data before writing
DataStream<Tuple2<Boolean, Row>> transformedStream = sourceStream
    .map(tuple -> {
        Row row = tuple.f1;
        // Add timestamp column
        Row newRow = Row.of(
            row.getField(0), // user_id
            row.getField(1), // name
            row.getField(2), // age
            new Timestamp(System.currentTimeMillis()) // current timestamp
        );
        return Tuple2.of(tuple.f0, newRow);
    })
    // Lambdas erase generic types, so declare the result type explicitly
    .returns(Types.TUPLE(Types.BOOLEAN,
        Types.ROW(Types.STRING, Types.STRING, Types.INT, Types.SQL_TIMESTAMP)));

transformedStream.addSink(sinkFunction);