# Sink Operations

Writing data to HBase tables with configurable buffering, batching, and exactly-once processing guarantees in the Apache Flink HBase 1.4 Connector.

## Capabilities

### HBaseDynamicTableSink

Table sink implementation that enables writing data to HBase tables through Flink's Table API and SQL with configurable write options and change data capture support.

```java { .api }
/**
 * HBase table sink implementation for writing data to HBase tables.
 * Supports UPSERT operations with configurable buffering and batching.
 */
@Internal
public class HBaseDynamicTableSink implements DynamicTableSink {

    /**
     * Creates a new HBase dynamic table sink.
     *
     * @param tableName name of the HBase table to write to
     * @param hbaseTableSchema schema mapping for the HBase table
     * @param hbaseConf Hadoop configuration for the HBase connection
     * @param writeOptions configuration for buffering and write performance
     * @param nullStringLiteral string representation for null values
     */
    public HBaseDynamicTableSink(
            String tableName,
            HBaseTableSchema hbaseTableSchema,
            Configuration hbaseConf,
            HBaseWriteOptions writeOptions,
            String nullStringLiteral);

    /**
     * Returns the sink runtime provider with the configured HBase sink function.
     *
     * @param context sink context for runtime configuration
     * @return SinkFunctionProvider with HBaseSinkFunction and parallelism settings
     */
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context);

    /**
     * Returns the supported changelog mode for this sink.
     *
     * @param requestedMode the changelog mode requested by the planner
     * @return ChangelogMode supporting INSERT, UPDATE_AFTER, and DELETE operations
     */
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode);

    /**
     * Creates a copy of this table sink for parallel execution.
     *
     * @return new HBaseDynamicTableSink instance with the same configuration
     */
    public DynamicTableSink copy();

    /**
     * Returns a string summary of this sink.
     *
     * @return "HBase" identifier string
     */
    public String asSummaryString();

    // Testing methods

    /**
     * Returns the HBase table schema for testing purposes.
     *
     * @return HBaseTableSchema instance with column family mappings
     */
    @VisibleForTesting
    public HBaseTableSchema getHBaseTableSchema();

    /**
     * Returns the write options configuration for testing purposes.
     *
     * @return HBaseWriteOptions instance with buffering settings
     */
    @VisibleForTesting
    public HBaseWriteOptions getWriteOptions();

    /**
     * Returns the Hadoop configuration for testing purposes.
     *
     * @return Configuration instance with HBase connection settings
     */
    @VisibleForTesting
    public Configuration getConfiguration();

    /**
     * Returns the table name for testing purposes.
     *
     * @return String name of the target HBase table
     */
    @VisibleForTesting
    public String getTableName();
}
```

**Usage Example:**

```java
// Example: Writing streaming data to HBase
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Create HBase sink table
// NOTE: `timestamp` and `value` are reserved keywords in Flink SQL and must be
// escaped with backticks.
tableEnv.executeSql(
    "CREATE TABLE user_activity_sink (" +
    "  user_id STRING," +
    "  activity ROW<event_type STRING, `timestamp` TIMESTAMP(3), `value` DOUBLE>," +
    "  metadata ROW<source STRING, processed_time TIMESTAMP(3)>," +
    "  PRIMARY KEY (user_id) NOT ENFORCED" +
    ") WITH (" +
    "  'connector' = 'hbase-1.4'," +
    "  'table-name' = 'user_events'," +
    "  'zookeeper.quorum' = 'localhost:2181'," +
    "  'sink.buffer-flush.max-size' = '4mb'," +
    "  'sink.buffer-flush.max-rows' = '2000'," +
    "  'sink.buffer-flush.interval' = '2s'" +
    ")"
);

// Insert data into HBase
tableEnv.executeSql(
    "INSERT INTO user_activity_sink " +
    "SELECT user_id, activity, metadata FROM source_stream"
);
```

## Changelog Mode Support

### UPSERT Operations

The HBase sink supports UPSERT (INSERT/UPDATE/DELETE) operations through Flink's changelog mode, mapping to HBase's natural key-value storage model.

```java { .api }
/**
 * Supported row change types:
 * - INSERT: creates a new HBase row or overwrites the existing one
 * - UPDATE_AFTER: updates an existing HBase row (same as INSERT in HBase)
 * - DELETE: removes the HBase row
 *
 * UPDATE_BEFORE operations are filtered out because HBase does not need them.
 */
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
    ChangelogMode.Builder builder = ChangelogMode.newBuilder();
    for (RowKind kind : requestedMode.getContainedKinds()) {
        if (kind != RowKind.UPDATE_BEFORE) {
            builder.addContainedKind(kind);
        }
    }
    return builder.build();
}
```

**Changelog Example:**

```sql
-- Example: Processing CDC stream to HBase
CREATE TABLE orders_cdc (
  order_id STRING,
  customer_id STRING,
  amount DECIMAL(10,2),
  status STRING,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders-cdc',
  'format' = 'debezium-json'
);

CREATE TABLE orders_hbase (
  order_id STRING,
  order_info ROW<customer_id STRING, amount DECIMAL(10,2), status STRING>,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'orders',
  'zookeeper.quorum' = 'localhost:2181'
);

-- Process CDC events: INSERT, UPDATE, DELETE automatically handled
INSERT INTO orders_hbase
SELECT
  order_id,
  ROW(customer_id, amount, status) AS order_info
FROM orders_cdc;
```

## Write Performance Configuration

### Buffer Configuration

The sink provides multiple buffering strategies to optimize write throughput and latency trade-offs.

**Buffering Options:**

1. **Size-based flushing**: Flush when the buffer reaches a specified memory size
2. **Count-based flushing**: Flush when the buffer reaches a specified row count
3. **Time-based flushing**: Flush at regular intervals regardless of buffer size
4. **Combined strategies**: Use multiple triggers for optimal performance

```sql
-- Example: High-throughput configuration
CREATE TABLE high_volume_sink (
  -- Table schema
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'events',
  'zookeeper.quorum' = 'localhost:2181',
  -- Large buffer for high throughput
  'sink.buffer-flush.max-size' = '10mb',
  'sink.buffer-flush.max-rows' = '5000',
  'sink.buffer-flush.interval' = '5s',
  -- High parallelism for write scaling
  'sink.parallelism' = '8'
);

-- Example: Low-latency configuration
CREATE TABLE low_latency_sink (
  -- Table schema
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'realtime_data',
  'zookeeper.quorum' = 'localhost:2181',
  -- Small buffer for low latency
  'sink.buffer-flush.max-size' = '100kb',
  'sink.buffer-flush.max-rows' = '100',
  'sink.buffer-flush.interval' = '500ms'
);
```

### Parallelism Control

The sink supports configurable parallelism to scale write operations across multiple HBase region servers.

```sql
CREATE TABLE scalable_sink (
  -- Table schema
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'large_table',
  'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
  -- Scale write operations
  'sink.parallelism' = '12',
  -- Optimize for distributed writes
  'sink.buffer-flush.max-size' = '8mb',
  'sink.buffer-flush.max-rows' = '4000'
);
```

## Data Conversion and Serialization

### Row Key Mapping

The sink automatically handles conversion from Flink's primary key to HBase row key format.

**Conversion Rules:**

- Simple primary keys: Direct string conversion
- Composite primary keys: Concatenated with separators
- Complex types: Serialized to byte arrays

```java
// Example: Row key conversion
// Flink primary key: ("user_123", "2023-01-01")
// HBase row key: "user_123|2023-01-01"
```

### Column Family Mapping

Flink ROW types are mapped to HBase column families, with individual fields becoming column qualifiers.

```sql
-- Flink schema
CREATE TABLE user_data (
  user_id STRING,
  profile ROW<name STRING, age INT, email STRING>,
  settings ROW<theme STRING, notifications BOOLEAN>,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (...);

-- Maps to HBase structure:
-- Row key: user_id value
-- Column family 'profile': columns 'name', 'age', 'email'
-- Column family 'settings': columns 'theme', 'notifications'
```

### Type Serialization

All Flink data types are automatically serialized to HBase-compatible byte arrays.

**Supported Type Conversions:**

- Primitive types: Direct byte serialization
- Timestamp types: Long milliseconds representation
- Decimal types: Precision-preserving byte encoding
- String types: UTF-8 byte encoding
- Complex types: JSON or binary serialization

## Error Handling and Reliability

### Exactly-Once Guarantees

The sink integrates with Flink's checkpointing mechanism to provide exactly-once processing guarantees.

**Reliability Features:**

- Transactional writes aligned with Flink checkpoints
- Automatic retry on temporary failures
- Dead letter queue support for failed records
- Connection pooling and management

### Exception Handling

Comprehensive error handling for various failure scenarios:

```java
// Common error scenarios and handling:

// 1. Connection failures
// - Automatic retry with exponential backoff
// - Connection pool management
// - Failover to backup region servers

// 2. Table schema mismatches
// - Schema validation during sink creation
// - Clear error messages for incompatible types
// - Graceful handling of missing column families

// 3. Write buffer overflows
// - Configurable buffer sizes and timeouts
// - Automatic flushing on resource pressure
// - Memory usage monitoring and alerts

// 4. HBase cluster unavailability
// - Circuit breaker pattern for failures
// - Graceful degradation and recovery
// - Integration with Flink's restart strategies
```

**Error Configuration:**

```sql
CREATE TABLE resilient_sink (
  -- Table schema
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'critical_data',
  -- Multiple ZK nodes for high availability
  'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
  -- Aggressive flushing for critical data
  'sink.buffer-flush.max-size' = '2mb',
  'sink.buffer-flush.interval' = '1s'
);
```

## Monitoring and Metrics

The sink provides comprehensive metrics for monitoring write performance and health:

**Available Metrics:**

- Write throughput (records/second, bytes/second)
- Buffer utilization and flush frequency
- Error rates and retry counts
- Connection pool status
- Latency percentiles for write operations

**Integration with Flink Metrics:**

```java
// Metrics are automatically registered with Flink's metric system
// Available in Flink UI and external monitoring systems
// - numRecordsOut: Total records written
// - numBytesOut: Total bytes written
// - currentSendTime: Current write latency
// - bufferUsage: Current buffer utilization percentage
```