# Sink Operations and Buffering

UPSERT sink operations with configurable buffering strategies, exactly-once semantics through checkpoint integration, and comprehensive error handling.

## Capabilities

### UPSERT Operations

The HBase sink supports UPSERT semantics, handling INSERT, UPDATE_AFTER, and DELETE operations from Flink's changelog streams.

```sql { .api }
-- Supported changelog operations
INSERT INTO hbase_table VALUES (...);            -- Creates a new row or overwrites an existing one
UPDATE hbase_table SET ... WHERE rowkey = '...'; -- Updates an existing row (arrives as UPDATE_AFTER)
DELETE FROM hbase_table WHERE rowkey = '...';    -- Deletes the row by row key
```

**Operation Mapping**:

- `INSERT` and `UPDATE_AFTER`: Converted to HBase `Put` operations
- `DELETE`: Converted to HBase `Delete` operations
- `UPDATE_BEFORE`: Ignored (not needed for UPSERT semantics)
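
The mapping above can be sketched as a small function. This is an illustrative model only, not the connector's actual code; the string names stand in for Flink's `RowKind` values and HBase mutation classes.

```python
# Illustrative sketch (not connector internals): how changelog RowKinds
# map onto HBase mutation types under UPSERT semantics.
def to_mutation(row_kind: str):
    """Return the mutation type for a changelog record, or None to drop it."""
    if row_kind in ("INSERT", "UPDATE_AFTER"):
        return "PUT"      # write the latest image of the row
    if row_kind == "DELETE":
        return "DELETE"   # remove the row by its row key
    if row_kind == "UPDATE_BEFORE":
        return None       # retraction unnecessary; the next PUT overwrites
    raise ValueError(f"unknown RowKind: {row_kind}")
```

Because a `PUT` always carries the full latest row image, the retraction half of an update (`UPDATE_BEFORE`) can be safely dropped.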

**Usage Examples**:

```sql
-- Streaming UPSERT from a changelog source
INSERT INTO user_profiles
SELECT
  user_id,
  ROW(name, email, age) AS info,
  ROW(last_login, total_orders) AS activity
FROM user_changelog_stream;

-- Batch UPSERT operation
INSERT INTO product_inventory
SELECT
  product_id,
  ROW(name, category, price) AS basic_info,
  ROW(stock_count, warehouse_location) AS inventory
FROM product_updates
WHERE update_type IN ('INSERT', 'UPDATE');

-- Delete operation
DELETE FROM expired_sessions
WHERE session_id IN (
  SELECT session_id FROM session_cleanup_stream
);
```

### Buffering Configuration

Buffering strategies to optimize write performance and reduce load on HBase.

```sql { .api }
WITH (
  'sink.buffer-flush.max-size' = '2mb',  -- Buffer size threshold (default: 2mb)
  'sink.buffer-flush.max-rows' = '1000', -- Buffer row count threshold (default: 1000)
  'sink.buffer-flush.interval' = '1s'    -- Time-based flush interval (default: 1s)
)
```

**Buffer Flush Triggers**:

1. **Size Threshold**: When the buffered mutations exceed max-size
2. **Row Count Threshold**: When the buffered row count exceeds max-rows
3. **Time Interval**: When the flush interval has elapsed since the last flush
4. **Checkpoint**: On every Flink checkpoint, to preserve exactly-once semantics
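
The first three triggers amount to a simple "any threshold crossed" predicate, which can be sketched as follows. The semantics are assumed from the option descriptions above, not lifted from the connector's source:

```python
# Illustrative sketch: a buffer flushes when ANY of the three configured
# thresholds is crossed (checkpoint-driven flushes happen independently).
def should_flush(buffered_bytes: int, buffered_rows: int,
                 seconds_since_flush: float,
                 max_size: int = 2 * 1024 * 1024,  # 'sink.buffer-flush.max-size' = '2mb'
                 max_rows: int = 1000,             # 'sink.buffer-flush.max-rows' = '1000'
                 interval_s: float = 1.0) -> bool: # 'sink.buffer-flush.interval' = '1s'
    return (buffered_bytes >= max_size
            or buffered_rows >= max_rows
            or seconds_since_flush >= interval_s)
```

Note the triggers are independent: a stream of rare but huge records flushes on size, a stream of tiny frequent records flushes on row count, and a trickle flushes on the interval.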

**Usage Examples**:

```sql
-- High-throughput buffering for batch loads
CREATE TABLE batch_sink (
  id STRING,
  `data` ROW<`value` STRING, `timestamp` BIGINT>,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'batch_data',
  'zookeeper.quorum' = 'localhost:2181',
  'sink.buffer-flush.max-size' = '64mb',   -- Large buffer for batch
  'sink.buffer-flush.max-rows' = '100000', -- High row count
  'sink.buffer-flush.interval' = '30s'     -- Longer flush interval
);

-- Low-latency streaming for real-time updates
CREATE TABLE realtime_sink (
  event_id STRING,
  event_data ROW<`type` STRING, payload STRING, `timestamp` TIMESTAMP(3)>,
  PRIMARY KEY (event_id) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'events',
  'zookeeper.quorum' = 'localhost:2181',
  'sink.buffer-flush.max-size' = '100kb', -- Small buffer for low latency
  'sink.buffer-flush.max-rows' = '10',    -- Low row count
  'sink.buffer-flush.interval' = '100ms'  -- Fast flush interval
);
```

### Parallelism Configuration

Control sink parallelism for optimal write throughput and resource utilization.

```sql { .api }
WITH (
  'sink.parallelism' = '4' -- Number of parallel sink operators
)
```

**Parallelism Considerations**:

- Higher parallelism increases write throughput
- Each parallel instance maintains its own buffer
- HBase region distribution affects the optimal parallelism
- Memory usage scales with parallelism
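
Because each parallel instance keeps its own buffer, a quick back-of-the-envelope bound on pending sink memory is parallelism times max-size. This is a rough estimate under that assumption, ignoring serialization overhead:

```python
# Rough worst-case estimate: every parallel sink task can hold up to
# max-size bytes of pending mutations at once (overhead not included).
def worst_case_buffer_mb(parallelism: int, max_size_mb: float) -> float:
    return parallelism * max_size_mb

# e.g. 'sink.parallelism' = '8' with 'sink.buffer-flush.max-size' = '8mb'
# can hold up to ~64 MB of pending mutations across the job.
```

When raising either knob, check that the task managers' heap has headroom for the product, not just for one buffer.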

**Usage Examples**:

```sql
-- High-throughput sink with multiple parallel writers
CREATE TABLE parallel_sink (
  partition_key STRING,
  metrics ROW<cpu_usage DOUBLE, memory_usage DOUBLE, `timestamp` TIMESTAMP(3)>,
  PRIMARY KEY (partition_key) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'system_metrics',
  'zookeeper.quorum' = 'localhost:2181',
  'sink.parallelism' = '8', -- 8 parallel writers
  'sink.buffer-flush.max-size' = '8mb',
  'sink.buffer-flush.max-rows' = '5000'
);

-- Single-writer sink for ordered operations
CREATE TABLE ordered_sink (
  sequence_id STRING,
  ordered_data ROW<`value` STRING, order_timestamp TIMESTAMP(3)>,
  PRIMARY KEY (sequence_id) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'ordered_events',
  'zookeeper.quorum' = 'localhost:2181',
  'sink.parallelism' = '1' -- Single writer preserves ordering
);
```

### Exactly-Once Semantics

Checkpoint integration yields effectively exactly-once results with HBase: buffered mutations are flushed on every checkpoint, and replayed writes after a recovery are idempotent.

**Checkpoint Behavior**:

- Buffers are flushed when a checkpoint barrier arrives, so no buffered mutation outlives a completed checkpoint
- On failure, the job restarts from the last successful checkpoint and replays records processed since then
- Replayed records produce the same PUT/DELETE mutations keyed by row key, so duplicates converge to an identical final state
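
The guarantee leans on idempotence rather than transactions: replaying keyed PUT/DELETE mutations leaves the table in the same final state. A toy model (plain dict, not HBase) makes this concrete:

```python
# Toy model: applying the same keyed mutations twice converges to the
# same table state, which is why replay after checkpoint recovery is safe.
def apply(table: dict, mutations):
    for kind, key, value in mutations:
        if kind == "PUT":
            table[key] = value        # upsert by row key
        elif kind == "DELETE":
            table.pop(key, None)      # delete is a no-op if already gone
    return table

ops = [("PUT", "u1", "a"), ("PUT", "u2", "b"), ("DELETE", "u1", None)]
once = apply({}, ops)
twice = apply(apply({}, ops), ops)  # simulate a full replay
# once == twice == {"u2": "b"}
```

This is why dropping `UPDATE_BEFORE` records and replaying after recovery are both harmless: every mutation is a full overwrite or removal by key.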

**Configuration Example**:

```sql
-- Sink with exactly-once guarantees
CREATE TABLE exactly_once_sink (
  transaction_id STRING,
  `transaction` ROW<amount DECIMAL(15,2), currency STRING, `timestamp` TIMESTAMP(3)>,
  PRIMARY KEY (transaction_id) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'financial_transactions',
  'zookeeper.quorum' = 'localhost:2181',
  'sink.buffer-flush.max-size' = '4mb',
  'sink.buffer-flush.max-rows' = '2000',
  'sink.buffer-flush.interval' = '5s'
);
```

**Flink Job Configuration** (for exactly-once):

```java
// Configure checkpointing for exactly-once
env.enableCheckpointing(60000); // Checkpoint every 60 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // 30s pause between checkpoints
```

179

180

### Error Handling and Recovery

181

182

Comprehensive error handling for write failures and network issues.

183

184

**Error Categories**:

185

1. **Transient Errors**: Connection timeouts, region server unavailability

186

2. **Permanent Errors**: Table not found, schema mismatches, permission denied

187

3. **Data Errors**: Serialization failures, constraint violations
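
The distinction matters operationally: transient failures are worth retrying with backoff, while permanent and data errors should fail fast or be routed aside. A hedged sketch of that policy (the error names are illustrative, not the connector's actual exception hierarchy):

```python
import time

# Error names here are placeholders for transient failure classes.
TRANSIENT = {"ConnectionTimeout", "RegionServerUnavailable"}

def write_with_retry(write, record, attempts: int = 3, backoff_s: float = 0.1):
    """Retry transient failures with exponential backoff; re-raise the rest."""
    for attempt in range(attempts):
        try:
            return write(record)
        except RuntimeError as err:
            if str(err) not in TRANSIENT or attempt == attempts - 1:
                raise  # permanent/data error, or retries exhausted
            time.sleep(backoff_s * (2 ** attempt))  # 0.1s, 0.2s, 0.4s, ...
```

In practice retries for region-level failures are largely handled inside the HBase client itself; a wrapper like this belongs at the job level only for coarse-grained resilience.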

**Error Handling Strategies**:

```sql
-- Resilient sink: HA ZooKeeper quorum plus conservative flush settings
CREATE TABLE robust_sink (
  record_id STRING,
  payload ROW<`data` STRING, metadata STRING>,
  PRIMARY KEY (record_id) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'reliable_data',
  'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
  'sink.buffer-flush.max-size' = '2mb',
  'sink.buffer-flush.max-rows' = '1000',
  'sink.buffer-flush.interval' = '10s'
);
```

**Dead Letter Queue Pattern**:

```sql
-- Main sink with error handling
INSERT INTO main_hbase_sink
SELECT * FROM input_stream
WHERE is_valid_record(`data`);

-- Error records to dead letter queue
INSERT INTO error_sink
SELECT *, 'validation_failed' AS error_reason
FROM input_stream
WHERE NOT is_valid_record(`data`);
```

### Performance Tuning Guidelines

**Buffer Size Tuning**:

- Start with the default 2MB buffer size
- Increase for batch workloads (up to 64MB)
- Decrease for low-latency requirements (down to 100KB)
- Monitor memory usage and adjust accordingly

**Row Count Tuning**:

- The default of 1000 rows works for most scenarios
- Increase for small records (up to 100K rows)
- Decrease for large records (down to 10 rows)
- Balance against memory constraints

**Flush Interval Tuning**:

- The default of 1 second provides a good balance
- Decrease for real-time applications (100ms-500ms)
- Increase for batch processing (10s-60s)
- Align with the checkpoint interval where possible
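
The three knobs interact: for a given record size and arrival rate, whichever threshold is reached first sets the actual flush cadence. A quick estimator under the simplifying assumptions of a steady rate and uniform record size:

```python
# Back-of-the-envelope estimator (assumes steady arrival rate and
# uniform record size) for which flush trigger fires first.
def first_trigger(record_bytes: int, records_per_s: float,
                  max_size: int, max_rows: int, interval_s: float) -> str:
    t_size = max_size / (record_bytes * records_per_s)  # seconds to fill max-size
    t_rows = max_rows / records_per_s                   # seconds to reach max-rows
    times = {"size": t_size, "rows": t_rows, "interval": interval_s}
    return min(times, key=times.get)

# 1 KB records at 10k rec/s with the defaults (2mb / 1000 rows / 1s):
# t_size ~ 0.21 s, t_rows = 0.1 s, so the row-count threshold fires first.
```

If the interval consistently fires first, the size and row thresholds are effectively dead configuration; if size fires first, consider raising it to batch more per flush.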

**Complete Performance Configuration**:

```sql
CREATE TABLE tuned_sink (
  `key` STRING,
  large_payload ROW<
    json_data STRING,       -- Large JSON payloads
    binary_data VARBINARY,  -- Binary attachments
    metadata ROW<
      size_bytes BIGINT,
      content_type STRING,
      created_at TIMESTAMP(3)
    >
  >,
  PRIMARY KEY (`key`) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'large_objects',
  'zookeeper.quorum' = 'localhost:2181',

  -- Tuned for large records
  'sink.buffer-flush.max-size' = '32mb', -- Large buffer for big records
  'sink.buffer-flush.max-rows' = '100',  -- Few rows due to record size
  'sink.buffer-flush.interval' = '15s',  -- Longer interval for batching
  'sink.parallelism' = '4'               -- Moderate parallelism
);
```

### Monitoring and Metrics

**Key Metrics to Monitor**:

- Buffer flush frequency and size
- Write throughput (records/second)
- Write latency (end-to-end)
- Error rates and types
- Memory usage per sink task

**Monitoring Query Example**:

```sql
-- Monitor sink performance over 1-minute tumbling windows
SELECT
  window_start,
  window_end,
  COUNT(*) AS records_written,
  COUNT(DISTINCT rowkey) AS unique_keys,
  COUNT(*) / 60.0 AS records_per_second -- window length is fixed at 60 seconds
FROM TABLE(
  TUMBLE(TABLE sink_monitoring, DESCRIPTOR(proc_time), INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end;
```

### Troubleshooting Common Issues

**High Memory Usage**:

- Reduce `sink.buffer-flush.max-size`
- Decrease `sink.parallelism`
- Monitor heap usage and adjust JVM settings

**Slow Write Performance**:

- Increase `sink.buffer-flush.max-size`
- Increase `sink.parallelism`
- Check HBase region distribution
- Verify network connectivity

**Data Loss Concerns**:

- Enable Flink checkpointing
- Verify the exactly-once configuration
- Monitor checkpoint success rates
- Check HBase write acknowledgments

**Connection Issues**:

- Verify ZooKeeper connectivity
- Check HBase region server health
- Review network configuration
- Monitor for connection pool exhaustion