or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md input-formats.md lookup-functions.md schema-config.md sink-functions.md table-api.md utilities.md

sink-functions.md docs/

0

# DataStream Sink Functions

1

2

The HBase connector provides sink functions for writing data from Flink DataStreams to HBase tables. These functions support upsert operations, configurable buffering, and automatic batching for optimal write performance.

3

4

## HBaseUpsertSinkFunction

5

6

The primary sink function for writing Flink DataStream data to HBase with upsert semantics and buffering support.

7

8

```java { .api }

9

class HBaseUpsertSinkFunction extends RichSinkFunction<Tuple2<Boolean, Row>>

10

implements CheckpointedFunction, BufferedMutator.ExceptionListener {

11

12

public HBaseUpsertSinkFunction(String hTableName, HBaseTableSchema schema,

13

Configuration conf, long bufferFlushMaxSizeInBytes,

14

long bufferFlushMaxMutations, long bufferFlushIntervalMillis);

15

16

public void open(Configuration parameters) throws Exception;

17

public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception;

18

public void close() throws Exception;

19

public void snapshotState(FunctionSnapshotContext context) throws Exception;

20

public void initializeState(FunctionInitializationContext context) throws Exception;

21

public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator);

22

}

23

```

24

25

### Basic Usage

26

27

```java

28

import org.apache.flink.addons.hbase.HBaseUpsertSinkFunction;

29

import org.apache.flink.addons.hbase.HBaseTableSchema;

30

import org.apache.flink.streaming.api.datastream.DataStream;

31

import org.apache.flink.api.java.tuple.Tuple2;

32

import org.apache.flink.types.Row;

33

import org.apache.hadoop.conf.Configuration;

34

35

// Configure HBase connection

36

Configuration conf = new Configuration();

37

conf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");

38

conf.set("hbase.zookeeper.property.clientPort", "2181");

39

40

// Define table schema

41

HBaseTableSchema schema = new HBaseTableSchema();

42

schema.setRowKey("user_id", String.class);

43

schema.addColumn("profile", "name", String.class);

44

schema.addColumn("profile", "age", Integer.class);

45

schema.addColumn("activity", "last_login", java.sql.Timestamp.class);

46

47

// Create sink function with buffering configuration

48

HBaseUpsertSinkFunction sinkFunction = new HBaseUpsertSinkFunction(

49

"user_table", // table name

50

schema, // table schema

51

conf, // HBase configuration

52

2 * 1024 * 1024, // buffer flush max size (2MB)

53

1000, // buffer flush max mutations

54

5000 // buffer flush interval (5 seconds)

55

);

56

57

// Apply to DataStream

58

DataStream<Tuple2<Boolean, Row>> upsertStream = // your stream of upserts

59

upsertStream.addSink(sinkFunction);

60

```

61

62

### Upsert vs Delete Operations

63

64

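The sink function handles both insert/update and delete operations based on the Boolean flag (`f0`) in the `Tuple2`: `true` writes the row as an insert/update, while `false` deletes the row identified by the row key: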

65

66

```java

67

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

68

69

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

70

71

// Create sample data with upsert/delete flags

72

DataStream<Tuple2<Boolean, Row>> operations = env.fromElements(

73

// Insert/Update operations (true)

74

Tuple2.of(true, Row.of("user001", "John Doe", 25, new Timestamp(System.currentTimeMillis()))),

75

Tuple2.of(true, Row.of("user002", "Jane Smith", 30, new Timestamp(System.currentTimeMillis()))),

76

77

// Delete operation (false)

78

Tuple2.of(false, Row.of("user003", null, null, null)) // Only row key needed for delete

79

);

80

81

operations.addSink(sinkFunction);

82

env.execute("HBase Upsert Job");

83

```

84

85

## Buffering Configuration

86

87

The sink function uses HBase's BufferedMutator for optimal write performance through batching:

88

89

### Buffer Size Configuration

90

91

```java

92

// High-throughput configuration (larger buffers)

93

HBaseUpsertSinkFunction highThroughputSink = new HBaseUpsertSinkFunction(

94

"events_table",

95

schema,

96

conf,

97

10 * 1024 * 1024, // 10MB buffer size

98

5000, // 5000 mutations per batch

99

10000 // 10 second flush interval

100

);

101

102

// Low-latency configuration (smaller buffers)

103

HBaseUpsertSinkFunction lowLatencySink = new HBaseUpsertSinkFunction(

104

"realtime_table",

105

schema,

106

conf,

107

512 * 1024, // 512KB buffer size

108

100, // 100 mutations per batch

109

1000 // 1 second flush interval

110

);

111

```

112

113

### Buffer Flush Triggers

114

115

The buffer is flushed when any one of these conditions is met:

116

117

1. **Size threshold**: Buffer reaches `bufferFlushMaxSizeInBytes`

118

2. **Mutation count**: Buffer reaches `bufferFlushMaxMutations` operations

119

3. **Time interval**: `bufferFlushIntervalMillis` elapsed since last flush

120

4. **Checkpoint**: Flink checkpoint triggers immediate flush

121

5. **Close**: Sink function close triggers final flush

122

123

## Fault Tolerance and Checkpointing

124

125

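The sink function integrates with Flink's checkpointing to provide at-least-once delivery: all buffered mutations are flushed to HBase before a checkpoint completes, and records replayed after a failure are simply written again. Because upserts and deletes on the same row key are idempotent, the table converges to the same final state, giving effectively-once results: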

126

127

```java

128

// Enable checkpointing in your Flink job

129

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

130

env.enableCheckpointing(5000); // Checkpoint every 5 seconds

131

132

// The sink function automatically participates in checkpointing

133

// No additional configuration needed

134

DataStream<Tuple2<Boolean, Row>> stream = // your data stream

135

stream.addSink(sinkFunction);

136

```

137

138

### State Management

139

140

```java { .api }

141

// Checkpointing methods (automatically called by Flink)

142

public void snapshotState(FunctionSnapshotContext context) throws Exception;

143

public void initializeState(FunctionInitializationContext context) throws Exception;

144

```

145

146

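The sink function keeps the following bookkeeping in memory (it is not persisted in Flink state; pending mutations are flushed to HBase when a checkpoint is taken):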

147

- Buffered mutations waiting to be written

148

- Buffer flush timing

149

- Error recovery information

150

151

## Error Handling

152

153

The sink function implements `BufferedMutator.ExceptionListener` for handling write failures:

154

155

```java { .api }

156

public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator);

157

```

158

159

### Exception Handling Example

160

161

```java

162

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

163

164

public class CustomHBaseSink extends RichSinkFunction<Tuple2<Boolean, Row>> {

165

private HBaseUpsertSinkFunction hbaseSink;

166

167

@Override

168

public void open(Configuration parameters) throws Exception {

169

super.open(parameters);

170

171

// Create HBase sink with error handling

172

hbaseSink = new HBaseUpsertSinkFunction(tableName, schema, conf,

173

bufferSize, maxMutations, flushInterval) {

174

@Override

175

public void onException(RetriesExhaustedWithDetailsException exception,

176

BufferedMutator mutator) {

177

// Custom error handling logic

178

for (Throwable cause : exception.getCauses()) {

179

if (cause instanceof IOException) {

180

// Handle I/O errors

181

LOG.error("HBase write I/O error", cause);

182

} else {

183

// Handle other errors

184

LOG.error("HBase write error", cause);

185

}

186

}

187

// Optionally rethrow to fail the job

188

throw new RuntimeException("HBase write failed", exception);

189

}

190

};

191

}

192

193

@Override

194

public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {

195

hbaseSink.invoke(value, context);

196

}

197

198

@Override

199

public void close() throws Exception {

200

if (hbaseSink != null) {

201

hbaseSink.close();

202

}

203

super.close();

204

}

205

}

206

```

207

208

## Advanced Configuration

209

210

### Custom HBase Configuration

211

212

```java

213

import org.apache.hadoop.conf.Configuration;

214

215

Configuration conf = new Configuration();

216

217

// Connection settings

218

conf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");

219

conf.set("hbase.zookeeper.property.clientPort", "2181");

220

conf.set("zookeeper.znode.parent", "/hbase");

221

222

// Performance tuning

223

conf.setInt("hbase.client.write.buffer", 4 * 1024 * 1024); // 4MB write buffer

224

conf.setInt("hbase.client.max.total.tasks", 200); // Max concurrent tasks

225

conf.setInt("hbase.client.max.perserver.tasks", 20); // Max tasks per server

226

conf.setLong("hbase.client.pause", 100); // Retry pause time

227

conf.setInt("hbase.client.retries.number", 10); // Max retries

228

229

// Timeout settings

230

conf.setLong("hbase.rpc.timeout", 60000); // RPC timeout (60s)

231

conf.setLong("hbase.client.operation.timeout", 120000); // Operation timeout (120s)

232

```

233

234

### Schema Configuration with Character Encoding

235

236

```java

237

HBaseTableSchema schema = new HBaseTableSchema();

238

schema.setCharset("UTF-8"); // Set encoding for string values

239

240

// Add columns with specific types

241

schema.setRowKey("id", String.class);

242

schema.addColumn("cf1", "name", String.class);

243

schema.addColumn("cf1", "age", Integer.class);

244

schema.addColumn("cf2", "score", Double.class);

245

schema.addColumn("cf2", "active", Boolean.class);

246

schema.addColumn("cf3", "data", byte[].class); // Binary data

247

```

248

249

## Performance Optimization

250

251

### Write Throughput Optimization

252

253

```java

254

// For maximum throughput

255

HBaseUpsertSinkFunction throughputOptimized = new HBaseUpsertSinkFunction(

256

tableName,

257

schema,

258

conf,

259

16 * 1024 * 1024, // Large buffer (16MB)

260

10000, // High mutation count

261

30000 // Longer flush interval (30s)

262

);

263

264

// Tune HBase configuration for writes

265

conf.setBoolean("hbase.client.autoflush.on", false);

266

conf.setLong("hbase.hregion.memstore.flush.size", 128 * 1024 * 1024); // 128MB

267

conf.setInt("hbase.regionserver.handler.count", 30); // More handlers

268

```

269

270

### Memory Usage Optimization

271

272

```java

273

// For memory-constrained environments

274

HBaseUpsertSinkFunction memoryOptimized = new HBaseUpsertSinkFunction(

275

tableName,

276

schema,

277

conf,

278

256 * 1024, // Small buffer (256KB)

279

50, // Low mutation count

280

2000 // Short flush interval (2s)

281

);

282

```

283

284

## Monitoring and Metrics

285

286

Register custom Flink metrics in a wrapping sink to monitor write performance:

287

288

```java

289

import org.apache.flink.metrics.Counter;

290

import org.apache.flink.metrics.Histogram;

291

292

public class MonitoredHBaseSink extends RichSinkFunction<Tuple2<Boolean, Row>> {

293

private transient Counter recordsWritten;

294

private transient Counter writeErrors;

295

private transient Histogram writeLatency;

296

297

@Override

298

public void open(Configuration parameters) throws Exception {

299

super.open(parameters);

300

301

// Register metrics

302

recordsWritten = getRuntimeContext()

303

.getMetricGroup()

304

.counter("records_written");

305

306

writeErrors = getRuntimeContext()

307

.getMetricGroup()

308

.counter("write_errors");

309

310

writeLatency = getRuntimeContext()

311

.getMetricGroup()

312

.histogram("write_latency");

313

}

314

315

@Override

316

public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {

317

long startTime = System.currentTimeMillis();

318

319

try {

320

hbaseSink.invoke(value, context);

321

recordsWritten.inc();

322

writeLatency.update(System.currentTimeMillis() - startTime);

323

} catch (Exception e) {

324

writeErrors.inc();

325

throw e;

326

}

327

}

328

}

329

```

330

331

## Common Patterns

332

333

### Conditional Writes

334

335

```java

336

// Filter stream before writing to HBase

337

DataStream<Tuple2<Boolean, Row>> filteredStream = sourceStream

338

.filter(tuple -> {

339

Row row = tuple.f1;

340

// Only write records where age > 0

341

return row.getField(2) != null && ((Integer) row.getField(2)) > 0;

342

});

343

344

filteredStream.addSink(sinkFunction);

345

```

346

347

### Data Transformation Before Write

348

349

```java

350

// Transform data before writing

351

DataStream<Tuple2<Boolean, Row>> transformedStream = sourceStream

352

.map(tuple -> {

353

Row row = tuple.f1;

354

// Add timestamp column

355

Row newRow = Row.of(

356

row.getField(0), // user_id

357

row.getField(1), // name

358

row.getField(2), // age

359

new Timestamp(System.currentTimeMillis()) // current timestamp

360

);

361

return Tuple2.of(tuple.f0, newRow);

362

});

363

364

transformedStream.addSink(sinkFunction);

365

```