or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md · input-formats.md · lookup-functions.md · schema-config.md · sink-functions.md · table-api.md · utilities.md

docs/input-formats.md

0

# DataStream Input Formats

1

2

The HBase connector provides input formats for reading data from HBase tables in both batch and streaming Flink applications. These formats handle table scanning, row key ranges, and automatic result mapping to Flink data types.

3

4

## HBaseRowInputFormat

5

6

The primary input format for reading HBase tables and converting results to Flink Row objects.

7

8

```java { .api }

9

class HBaseRowInputFormat extends AbstractTableInputFormat<Row> {

10

public HBaseRowInputFormat(Configuration conf, String tableName, HBaseTableSchema schema);

11

public void configure(Configuration parameters);

12

public String getTableName();

13

public TypeInformation<Row> getProducedType();

14

}

15

```

16

17

### Usage Example

18

19

```java

20

import org.apache.flink.addons.hbase.HBaseRowInputFormat;

21

import org.apache.flink.addons.hbase.HBaseTableSchema;

22

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

23

import org.apache.hadoop.conf.Configuration;

24

25

// Configure HBase connection

26

Configuration conf = new Configuration();

27

conf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");

28

conf.set("hbase.zookeeper.property.clientPort", "2181");

29

conf.set("zookeeper.znode.parent", "/hbase");

30

31

// Define table schema

32

HBaseTableSchema schema = new HBaseTableSchema();

33

schema.setRowKey("user_id", String.class);

34

schema.addColumn("profile", "name", String.class);

35

schema.addColumn("profile", "age", Integer.class);

36

schema.addColumn("activity", "last_login", java.sql.Timestamp.class);

37

38

// Create input format

39

HBaseRowInputFormat inputFormat = new HBaseRowInputFormat(conf, "user_table", schema);

40

41

// Use in streaming environment

42

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

43

DataStream<Row> userStream = env.createInput(inputFormat);

44

45

// Process the data

46

userStream.filter(row -> (Integer) row.getField(2) > 18)

47

.print();

48

49

env.execute("HBase Read Job");

50

```

51

52

## `AbstractTableInputFormat<T>`

53

54

Base class for creating custom HBase input formats with different output types.

55

56

```java { .api }

57

abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, TableInputSplit> {

58

// Abstract methods to implement

59

public abstract Scan getScanner();

60

public abstract String getTableName();

61

public abstract T mapResultToOutType(Result r);

62

public abstract void configure(Configuration parameters);

63

64

// Implemented methods

65

public void open(TableInputSplit split) throws IOException;

66

public T nextRecord(T reuse) throws IOException;

67

public boolean reachedEnd() throws IOException;

68

public void close() throws IOException;

69

public void closeInputFormat() throws IOException;

70

public TableInputSplit[] createInputSplits(int minNumSplits) throws IOException;

71

public InputSplitAssigner getInputSplitAssigner(TableInputSplit[] inputSplits);

72

public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException;

73

}

74

```

75

76

### Custom Input Format Example

77

78

```java

79

import org.apache.flink.addons.hbase.AbstractTableInputFormat;

80

import org.apache.flink.api.java.tuple.Tuple3;

81

import org.apache.hadoop.hbase.client.Result;

82

import org.apache.hadoop.hbase.client.Scan;

83

import org.apache.hadoop.hbase.util.Bytes;

84

85

public class CustomHBaseInputFormat extends AbstractTableInputFormat<Tuple3<String, String, Integer>> {

86

private String tableName;

87

private Configuration conf;

88

89

public CustomHBaseInputFormat(Configuration conf, String tableName) {

90

this.conf = conf;

91

this.tableName = tableName;

92

}

93

94

@Override

95

public Scan getScanner() {

96

Scan scan = new Scan();

97

scan.addFamily(Bytes.toBytes("profile"));

98

scan.addFamily(Bytes.toBytes("activity"));

99

return scan;

100

}

101

102

@Override

103

public String getTableName() {

104

return tableName;

105

}

106

107

@Override

108

public Tuple3<String, String, Integer> mapResultToOutType(Result r) {

109

String rowKey = Bytes.toString(r.getRow());

110

String name = Bytes.toString(r.getValue(Bytes.toBytes("profile"), Bytes.toBytes("name")));

111

Integer age = Bytes.toInt(r.getValue(Bytes.toBytes("profile"), Bytes.toBytes("age")));

112

return new Tuple3<>(rowKey, name, age);

113

}

114

115

@Override

116

public void configure(Configuration parameters) {

117

// Configuration logic

118

}

119

}

120

```

121

122

## `TableInputFormat<T>`

123

124

Abstract input format specialized for Tuple output types.

125

126

```java { .api }

127

abstract class TableInputFormat<T> extends AbstractTableInputFormat<T> {

128

// Abstract methods specific to Tuple mapping

129

public abstract Scan getScanner();

130

public abstract String getTableName();

131

public abstract T mapResultToTuple(Result r);

132

133

// Implementation of configure method

134

public void configure(Configuration parameters);

135

}

136

```

137

138

## TableInputSplit

139

140

Represents a split of an HBase table for parallel processing.

141

142

```java { .api }

143

class TableInputSplit implements InputSplit {

144

public byte[] getTableName();

145

public byte[] getStartRow();

146

public byte[] getEndRow();

147

public int getSplitNumber();

148

public String[] getLocations() throws IOException;

149

}

150

```

151

152

## Scan Configuration

153

154

The input formats use HBase Scan objects to define which data to read:

155

156

### Basic Scanning

157

158

```java

159

// Scan all rows and columns

160

Scan scan = new Scan();

161

162

// Scan specific column families

163

Scan scan = new Scan();

164

scan.addFamily(Bytes.toBytes("cf1"));

165

scan.addFamily(Bytes.toBytes("cf2"));

166

167

// Scan specific columns

168

Scan scan = new Scan();

169

scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("col1"));

170

scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("col2"));

171

```

172

173

### Row Key Range Scanning

174

175

```java

176

// Scan row key range

177

Scan scan = new Scan();

178

scan.setStartRow(Bytes.toBytes("user_00001"));

179

scan.setStopRow(Bytes.toBytes("user_99999"));

180

181

// Scan with row key prefix

182

Scan scan = new Scan();

183

scan.setRowPrefixFilter(Bytes.toBytes("user_2023"));

184

```

185

186

### Filtering

187

188

```java

189

import org.apache.hadoop.hbase.filter.*;

190

191

// Single column value filter

192

SingleColumnValueFilter filter = new SingleColumnValueFilter(

193

Bytes.toBytes("profile"),

194

Bytes.toBytes("age"),

195

CompareFilter.CompareOp.GREATER,

196

Bytes.toBytes(18)

197

);

198

Scan scan = new Scan();

199

scan.setFilter(filter);

200

201

// Multiple filters

202

FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);

203

filterList.addFilter(new SingleColumnValueFilter(

204

Bytes.toBytes("profile"), Bytes.toBytes("active"),

205

CompareFilter.CompareOp.EQUAL, Bytes.toBytes(true)));

206

filterList.addFilter(new SingleColumnValueFilter(

207

Bytes.toBytes("profile"), Bytes.toBytes("age"),

208

CompareFilter.CompareOp.GREATER, Bytes.toBytes(21)));

209

210

Scan scan = new Scan();

211

scan.setFilter(filterList);

212

```

213

214

## Performance Considerations

215

216

### Parallelism

217

218

The input formats automatically create splits based on HBase region boundaries for optimal parallel processing:

219

220

```java

221

// Control the minimum number of splits

222

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

223

env.setParallelism(8); // This will influence split creation

224

225

DataSet<Row> data = env.createInput(inputFormat);

226

```

227

228

### Caching and Batching

229

230

Configure HBase scan caching for better performance:

231

232

```java

233

Scan scan = new Scan();

234

scan.setCaching(1000); // Number of rows to cache per RPC

235

scan.setBatch(10); // Number of columns to retrieve per RPC

236

```

237

238

### Memory Management

239

240

For large scans, consider memory usage:

241

242

```java

243

// Limit scan to specific time range

244

Scan scan = new Scan();

245

scan.setTimeRange(startTime, endTime);

246

247

// Use filters to reduce data transfer

248

scan.setFilter(new PageFilter(10000)); // Limit results per region

249

```

250

251

## Error Handling

252

253

Common exceptions when using input formats:

254

255

```java

256

try {

257

DataStream<Row> stream = env.createInput(inputFormat);

258

// Process stream

259

} catch (IOException e) {

260

// HBase connection or read errors

261

log.error("HBase read failed", e);

262

} catch (IllegalArgumentException e) {

263

// Invalid configuration or schema

264

log.error("Configuration error", e);

265

}

266

```

267

268

## Type Mapping

269

270

The input formats handle automatic type conversion from HBase byte arrays:

271

272

| Java Type | HBase Storage | Notes |

273

|-----------|---------------|-------|

274

| `String` | `byte[]` | UTF-8 encoding |

275

| `Integer` | `byte[]` | 4-byte big-endian |

276

| `Long` | `byte[]` | 8-byte big-endian |

277

| `Double` | `byte[]` | IEEE 754 format |

278

| `Boolean` | `byte[]` | Single byte (0 or 1) |

279

| `java.sql.Timestamp` | `byte[]` | Long timestamp |

280

| `byte[]` | `byte[]` | Direct storage |