<!-- docs/hbase.md -->

# HBase Connector

Apache HBase database connectivity for Flink batch processing, providing region-aware table access with distributed processing capabilities.

## Capabilities

### TableInputFormat

Abstract base class for reading from HBase tables with region-aware splitting for optimal distributed processing.

```java { .api }
/**
 * Abstract base class for reading from HBase tables in Flink
 * @param <T> Tuple type representing the HBase table row structure
 */
public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<T, TableInputSplit> {

    /**
     * Default constructor for TableInputFormat
     */
    public TableInputFormat();

    /**
     * Returns the HBase Scan instance for reading table data
     * Subclasses must implement this to define what data to read
     * @return Scan object configured with column families, filters, etc.
     */
    protected abstract Scan getScanner();

    /**
     * Returns the name of the HBase table to read from
     * @return HBase table name as a string
     */
    protected abstract String getTableName();

    /**
     * Maps an HBase Result to a Flink Tuple
     * This method defines how to convert HBase row data to Flink types
     * @param r HBase Result containing row data
     * @return Flink Tuple representing the row
     */
    protected abstract T mapResultToTuple(Result r);

    /**
     * Determines whether to include a specific HBase region in the input split
     * Can be overridden to filter regions based on key ranges
     * @param startKey Start key of the region
     * @param endKey End key of the region
     * @return true to include the region, false to skip it
     */
    protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey);
}
```

**Usage Example:**

```java
import org.apache.flink.addons.hbase.TableInputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

// Custom implementation for reading user data from HBase
public class UserTableInputFormat extends TableInputFormat<Tuple3<String, String, Integer>> {

    @Override
    protected Scan getScanner() {
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
        scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("email"));
        scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"));
        return scan;
    }

    @Override
    protected String getTableName() {
        return "users";
    }

    @Override
    protected Tuple3<String, String, Integer> mapResultToTuple(Result r) {
        String name = Bytes.toString(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
        String email = Bytes.toString(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("email")));
        Integer age = Bytes.toInt(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")));
        return new Tuple3<>(name, email, age);
    }

    @Override
    protected boolean includeRegionInSplit(byte[] startKey, byte[] endKey) {
        // Include all regions by default
        return true;
    }
}

// Use in Flink program
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<String, String, Integer>> users = env.createInput(new UserTableInputFormat());
users.print();
```

### TableInputSplit

Input split representing HBase table region ranges for distributed processing.

```java { .api }
/**
 * Input split representing HBase table region ranges
 */
public class TableInputSplit extends LocatableInputSplit {

    /**
     * Creates a new TableInputSplit for an HBase region
     * @param splitNumber Unique identifier for this split
     * @param hostnames Array of hostnames where this region is located
     * @param tableName Name of the HBase table (as byte array)
     * @param startRow Start row key for this region (as byte array)
     * @param endRow End row key for this region (as byte array)
     */
    TableInputSplit(final int splitNumber, final String[] hostnames,
            final byte[] tableName, final byte[] startRow, final byte[] endRow);

    /**
     * Returns the HBase table name for this split
     * @return Table name as byte array
     */
    public byte[] getTableName();

    /**
     * Returns the start row key for this region split
     * @return Start row key as byte array
     */
    public byte[] getStartRow();

    /**
     * Returns the end row key for this region split
     * @return End row key as byte array
     */
    public byte[] getEndRow();
}
```

## Advanced Usage Patterns

### Custom Row Key Filtering

```java
public class FilteredUserTableInputFormat extends TableInputFormat<Tuple2<String, String>> {

    @Override
    protected Scan getScanner() {
        Scan scan = new Scan();
        // Add row key prefix filter
        scan.setRowPrefixFilter(Bytes.toBytes("user_"));

        // Add column filter
        scan.addColumn(Bytes.toBytes("profile"), Bytes.toBytes("name"));
        scan.addColumn(Bytes.toBytes("profile"), Bytes.toBytes("status"));

        // Set time range for recent data only
        try {
            long oneWeekAgo = System.currentTimeMillis() - (7 * 24 * 60 * 60 * 1000);
            scan.setTimeRange(oneWeekAgo, System.currentTimeMillis());
        } catch (IOException e) {
            throw new RuntimeException("Failed to set time range", e);
        }

        return scan;
    }

    @Override
    protected String getTableName() {
        return "user_profiles";
    }

    @Override
    protected Tuple2<String, String> mapResultToTuple(Result r) {
        String name = Bytes.toString(r.getValue(Bytes.toBytes("profile"), Bytes.toBytes("name")));
        String status = Bytes.toString(r.getValue(Bytes.toBytes("profile"), Bytes.toBytes("status")));
        return new Tuple2<>(name, status);
    }

    @Override
    protected boolean includeRegionInSplit(byte[] startKey, byte[] endKey) {
        // Only include regions that contain user data
        if (startKey.length == 0) return true; // First region
        if (endKey.length == 0) return true; // Last region

        String startKeyStr = Bytes.toString(startKey);
        return startKeyStr.startsWith("user_");
    }
}
```

### Multi-Column Family Access

```java
public class CompleteUserTableInputFormat extends TableInputFormat<Tuple5<String, String, Integer, String, Long>> {

    @Override
    protected Scan getScanner() {
        Scan scan = new Scan();
        // Add multiple column families
        scan.addFamily(Bytes.toBytes("basic"));    // Basic info
        scan.addFamily(Bytes.toBytes("contact"));  // Contact info
        scan.addFamily(Bytes.toBytes("activity")); // Activity data

        // Configure caching for better performance
        scan.setCaching(1000);
        scan.setBatch(100);

        return scan;
    }

    @Override
    protected String getTableName() {
        return "complete_users";
    }

    @Override
    protected Tuple5<String, String, Integer, String, Long> mapResultToTuple(Result r) {
        // Extract from basic column family
        String name = Bytes.toString(r.getValue(Bytes.toBytes("basic"), Bytes.toBytes("name")));
        Integer age = Bytes.toInt(r.getValue(Bytes.toBytes("basic"), Bytes.toBytes("age")));

        // Extract from contact column family
        String email = Bytes.toString(r.getValue(Bytes.toBytes("contact"), Bytes.toBytes("email")));

        // Extract from activity column family
        Long lastLogin = Bytes.toLong(r.getValue(Bytes.toBytes("activity"), Bytes.toBytes("last_login")));

        // Get row key
        String rowKey = Bytes.toString(r.getRow());

        return new Tuple5<>(rowKey, name, age, email, lastLogin);
    }
}
```

### Integration with Flink's Type System

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

public class TypedUserTableInputFormat extends TableInputFormat<Tuple3<String, String, Integer>> {

    // ... implement abstract methods as shown above ...

    // Optional: Override to provide explicit type information
    @Override
    public TypeInformation<Tuple3<String, String, Integer>> getProducedType() {
        return new TupleTypeInfo<>(
                BasicTypeInfo.STRING_TYPE_INFO, // name
                BasicTypeInfo.STRING_TYPE_INFO, // email
                BasicTypeInfo.INT_TYPE_INFO     // age
        );
    }
}
```

## Performance Considerations

### Region-Aware Processing

HBase regions are automatically mapped to Flink parallel tasks for optimal data locality:

```java
// Configure HBase client for better performance
Configuration hbaseConfig = HBaseConfiguration.create();
hbaseConfig.set("hbase.client.scanner.caching", "1000");
hbaseConfig.set("hbase.client.scanner.timeout.period", "600000");

// These settings are automatically picked up by TableInputFormat
```

### Memory Management

```java
@Override
protected Scan getScanner() {
    Scan scan = new Scan();

    // Configure batch size to control memory usage
    scan.setBatch(100); // Process 100 columns per RPC

    // Configure caching for network efficiency
    scan.setCaching(1000); // Cache 1000 rows per RPC

    // Limit columns to reduce network traffic
    scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("col1"));
    scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("col2"));

    return scan;
}
```

## Common Types

```java { .api }
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.LocatableInputSplit;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
```