# Apache Flink HBase Connector

The Apache Flink HBase connector integrates Apache Flink's stream processing with Apache HBase, a distributed NoSQL database. It supports both the DataStream API and the Table API for reading from and writing to HBase tables, and provides features such as lookup joins, buffered writes, and flexible serialization.

## Package Information

- **Package Name**: org.apache.flink:flink-hbase_2.11
- **Package Type**: maven
- **Language**: Java
- **Version**: 1.10.3
- **Installation**: Add to your Maven pom.xml:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-hbase_${scala.binary.version}</artifactId>
  <version>1.10.3</version>
</dependency>
```

## Core Imports

```java
// DataStream API - Input Format
import org.apache.flink.addons.hbase.HBaseRowInputFormat;
import org.apache.flink.addons.hbase.HBaseTableSchema;

// DataStream API - Sink Function
import org.apache.flink.addons.hbase.HBaseUpsertSinkFunction;

// Table API - Source and Sink
import org.apache.flink.addons.hbase.HBaseTableSource;
import org.apache.flink.addons.hbase.HBaseUpsertTableSink;
import org.apache.flink.addons.hbase.HBaseTableFactory;

// Configuration
import org.apache.flink.addons.hbase.HBaseOptions;
import org.apache.flink.addons.hbase.HBaseWriteOptions;

// Table API Descriptors
import org.apache.flink.table.descriptors.HBase;
```

## Basic Usage

### Reading from HBase (DataStream API)

```java
import org.apache.flink.addons.hbase.HBaseRowInputFormat;
import org.apache.flink.addons.hbase.HBaseTableSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;

// Configure HBase connection
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum", "localhost:2181");

// Define table schema
HBaseTableSchema schema = new HBaseTableSchema();
schema.setRowKey("rowkey", String.class);
schema.addColumn("cf1", "col1", String.class);
schema.addColumn("cf1", "col2", Integer.class);

// Create input format
HBaseRowInputFormat inputFormat = new HBaseRowInputFormat(conf, "my_table", schema);

// Read as DataStream
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Row> hbaseData = env.createInput(inputFormat);
```

### Writing to HBase (DataStream API)

```java
import org.apache.flink.addons.hbase.HBaseUpsertSinkFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// Create sink function with buffering
HBaseUpsertSinkFunction sinkFunction = new HBaseUpsertSinkFunction(
    "my_table",       // table name
    schema,           // table schema
    conf,             // HBase configuration
    2 * 1024 * 1024,  // buffer flush max size (2 MB)
    1000,             // buffer flush max mutations
    5000              // buffer flush interval (5 seconds)
);

// Apply to a DataStream of (isUpsert, row) pairs
DataStream<Tuple2<Boolean, Row>> upsertStream = ...; // your stream of upserts
upsertStream.addSink(sinkFunction);
```

### Table API Usage

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.descriptors.HBase;
import org.apache.flink.table.descriptors.Schema;

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

// Register HBase table
tableEnv.connect(
        new HBase()
            .version("1.4.3")
            .tableName("my_table")
            .zookeeperQuorum("localhost:2181")
    )
    .withSchema(
        new Schema()
            .field("rowkey", DataTypes.STRING())
            .field("cf1_col1", DataTypes.STRING())
            .field("cf1_col2", DataTypes.INT())
    )
    .createTemporaryTable("hbase_table");

// Query the table
Table result = tableEnv.sqlQuery("SELECT * FROM hbase_table WHERE cf1_col2 > 100");
```

## Architecture

The HBase connector is organized into several key components:

- **Input Formats**: For batch and streaming reads (`HBaseRowInputFormat`, `AbstractTableInputFormat`)
- **Sink Functions**: For streaming writes with buffering (`HBaseUpsertSinkFunction`)
- **Table API Integration**: Source/sink factories and descriptors (`HBaseTableFactory`, `HBaseTableSource`)
- **Schema Definition**: Type-safe column family and qualifier mapping (`HBaseTableSchema`)
- **Configuration**: Connection and write performance options (`HBaseOptions`, `HBaseWriteOptions`)
- **Utilities**: Type conversion and HBase operation helpers

## Capabilities

### DataStream API Input Formats

Read data from HBase tables using InputFormat classes with full control over scanning and result mapping.

```java { .api }
class HBaseRowInputFormat extends AbstractTableInputFormat<Row> {
    public HBaseRowInputFormat(Configuration conf, String tableName, HBaseTableSchema schema);
    public void configure(Configuration parameters);
    public String getTableName();
    public TypeInformation<Row> getProducedType();
}
```

[DataStream Input Formats](./input-formats.md)

### DataStream API Sink Functions

Write data to HBase tables with configurable buffering and automatic batching for optimal performance.

```java { .api }
class HBaseUpsertSinkFunction extends RichSinkFunction<Tuple2<Boolean, Row>>
        implements CheckpointedFunction, BufferedMutator.ExceptionListener {
    public HBaseUpsertSinkFunction(String hTableName, HBaseTableSchema schema,
                                   Configuration conf, long bufferFlushMaxSizeInBytes,
                                   long bufferFlushMaxMutations, long bufferFlushIntervalMillis);
    public void open(Configuration parameters);
    public void invoke(Tuple2<Boolean, Row> value, Context context);
    public void close();
}
```

[DataStream Sink Functions](./sink-functions.md)

### Table API Sources and Sinks

Integrate HBase with Flink's Table API for SQL-based data processing and lookup joins.

```java { .api }
class HBaseTableSource implements StreamTableSource<Row>, LookupableTableSource<Row> {
    public HBaseTableSource(Configuration conf, String tableName);
    public void addColumn(String family, String qualifier, Class<?> clazz);
    public void setRowKey(String rowKeyName, Class<?> clazz);
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv);
    public TableFunction<Row> getLookupFunction(String[] lookupKeys);
}
```

```java { .api }
class HBaseUpsertTableSink implements UpsertStreamTableSink<Row> {
    public HBaseUpsertTableSink(HBaseTableSchema hbaseTableSchema,
                                HBaseOptions hbaseOptions, HBaseWriteOptions writeOptions);
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream);
}
```

[Table API Integration](./table-api.md)

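As a rough sketch of how these classes fit together — not taken from the official docs, with the table and column names (`users`, `info`, and so on) invented for illustration — an `HBaseTableSource` might be built and registered like this:

```java
import org.apache.flink.addons.hbase.HBaseTableSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.hadoop.conf.Configuration;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum", "localhost:2181");

// Build the source: set the row key, then one column per family/qualifier pair
HBaseTableSource source = new HBaseTableSource(conf, "users");
source.setRowKey("rowkey", String.class);
source.addColumn("info", "name", String.class);
source.addColumn("info", "age", Integer.class);

// Register so the table can be queried via SQL (Flink 1.10-era API)
tableEnv.registerTableSource("users", source);
```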
### Schema Configuration

Define type-safe mappings between Flink data types and HBase column families and qualifiers.

```java { .api }
class HBaseTableSchema {
    public void addColumn(String family, String qualifier, Class<?> clazz);
    public void setRowKey(String rowKeyName, Class<?> clazz);
    public void setCharset(String charset);
    public String[] getFamilyNames();
    public TypeInformation<?>[] getQualifierTypes(String family);
}
```

[Schema and Configuration](./schema-config.md)

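A short sketch of schema construction, with hypothetical family and qualifier names (`profile`, `stats`, …) chosen for illustration; the charset governs how string values are (de)serialized:

```java
import org.apache.flink.addons.hbase.HBaseTableSchema;

HBaseTableSchema schema = new HBaseTableSchema();
schema.setRowKey("user_id", String.class);           // exactly one row key
schema.setCharset("UTF-8");                          // charset for String fields
schema.addColumn("profile", "name", String.class);
schema.addColumn("profile", "age", Integer.class);
schema.addColumn("stats", "last_login", java.sql.Timestamp.class);
```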
### Lookup Functions

Enable temporal table joins by looking up dimension data from HBase in real time.

```java { .api }
class HBaseLookupFunction extends TableFunction<Row> {
    public HBaseLookupFunction(Configuration configuration, String hTableName,
                               HBaseTableSchema hbaseTableSchema);
    public void eval(Object rowKey);
    public TypeInformation<Row> getResultType();
}
```

[Lookup Functions](./lookup-functions.md)

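In SQL, a lookup function is typically exercised through a temporal table join. A hedged sketch, assuming the Blink planner, a probe table `orders` with a processing-time attribute `proctime` (both illustrative, not from the original docs), and the `hbase_table` registered in the Table API Usage example:

```java
import org.apache.flink.table.api.Table;

// "orders", "order_id" and "proctime" are assumed names for illustration
Table enriched = tableEnv.sqlQuery(
    "SELECT o.order_id, h.cf1_col1 " +
    "FROM orders AS o " +
    "JOIN hbase_table FOR SYSTEM_TIME AS OF o.proctime AS h " +
    "ON o.order_id = h.rowkey");
```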
### Utility Classes

Helper classes for type conversion, configuration serialization, and HBase operation creation.

```java { .api }
class HBaseTypeUtils {
    public static Object deserializeToObject(byte[] value, int typeIdx, Charset stringCharset);
    public static byte[] serializeFromObject(Object value, int typeIdx, Charset stringCharset);
    public static int getTypeIndex(TypeInformation typeInfo);
    public static boolean isSupportedType(Class<?> clazz);
}
```

[Utilities](./utilities.md)

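A sketch of a serialize/deserialize round trip using only the methods listed above; the `util` package path is an assumption, so verify it against the artifact if the import fails to resolve:

```java
import java.nio.charset.StandardCharsets;
import org.apache.flink.addons.hbase.util.HBaseTypeUtils; // package path assumed
import org.apache.flink.api.common.typeinfo.Types;

// Resolve the internal type index for String, then round-trip a value
int idx = HBaseTypeUtils.getTypeIndex(Types.STRING);
byte[] bytes = HBaseTypeUtils.serializeFromObject("hello", idx, StandardCharsets.UTF_8);
Object back = HBaseTypeUtils.deserializeToObject(bytes, idx, StandardCharsets.UTF_8);
```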
## Configuration

### Connection Configuration

```java { .api }
class HBaseOptions {
    public static Builder builder();

    static class Builder {
        public Builder setTableName(String tableName);       // Required
        public Builder setZkQuorum(String zkQuorum);         // Required
        public Builder setZkNodeParent(String zkNodeParent); // Optional
        public HBaseOptions build();
    }
}
```

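Using only the builder methods listed above, connection options might be assembled like this (`/hbase` is an assumed znode parent, not a documented default):

```java
import org.apache.flink.addons.hbase.HBaseOptions;

HBaseOptions options = HBaseOptions.builder()
    .setTableName("my_table")
    .setZkQuorum("localhost:2181")
    .setZkNodeParent("/hbase")  // optional; omit to use the cluster default
    .build();
```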
### Write Performance Configuration

```java { .api }
class HBaseWriteOptions {
    public static Builder builder();

    static class Builder {
        public Builder setBufferFlushMaxSizeInBytes(long bufferFlushMaxSizeInBytes);
        public Builder setBufferFlushMaxRows(long bufferFlushMaxRows);
        public Builder setBufferFlushIntervalMillis(long bufferFlushIntervalMillis);
        public HBaseWriteOptions build();
    }
}
```

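The two option objects come together when constructing an `HBaseUpsertTableSink`. A sketch, assuming a `schema` built as in the Basic Usage section; the flush thresholds shown are illustrative, and whichever limit is hit first triggers a flush:

```java
import org.apache.flink.addons.hbase.HBaseOptions;
import org.apache.flink.addons.hbase.HBaseUpsertTableSink;
import org.apache.flink.addons.hbase.HBaseWriteOptions;

HBaseOptions options = HBaseOptions.builder()
    .setTableName("my_table")
    .setZkQuorum("localhost:2181")
    .build();

HBaseWriteOptions writeOptions = HBaseWriteOptions.builder()
    .setBufferFlushMaxSizeInBytes(2 * 1024 * 1024)  // flush after 2 MB buffered
    .setBufferFlushMaxRows(1000)                    // ...or after 1000 rows
    .setBufferFlushIntervalMillis(5000)             // ...or every 5 seconds
    .build();

// "schema" as built in the Basic Usage section
HBaseUpsertTableSink sink = new HBaseUpsertTableSink(schema, options, writeOptions);
```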
## Supported Data Types

The connector supports automatic serialization/deserialization for:

- **Primitive types**: `byte[]`, `String`, `Byte`, `Short`, `Integer`, `Long`, `Float`, `Double`, `Boolean`
- **Temporal types**: `java.sql.Timestamp`, `java.sql.Date`, `java.sql.Time`
- **Numeric types**: `java.math.BigDecimal`, `java.math.BigInteger`

## Error Handling

Common exceptions and their meanings:

- `RuntimeException`: HBase connection or configuration errors
- `IllegalArgumentException`: Invalid parameters or unsupported data types
- `IOException`: HBase I/O operation failures
- `ValidationException`: Table configuration validation errors
- `TableNotFoundException`: Specified HBase table doesn't exist
- `RetriesExhaustedWithDetailsException`: BufferedMutator operation failures

## Requirements

- **HBase Version**: 1.4.3
- **Flink Version**: 1.10.3
- **Java Version**: 8+
- **Hadoop Configuration**: Properly configured `hbase-site.xml` or programmatic configuration