or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md · lookup-options.md · sink-operations.md · source-operations.md · table-factory.md · write-options.md

docs/source-operations.md

# Source Operations

Reading data from HBase tables with support for batch and lookup operations, region-aware splitting, and configurable caching in the Apache Flink HBase 1.4 Connector.

## Capabilities

### HBaseDynamicTableSource

Table source implementation that enables reading data from HBase tables through Flink's Table API and SQL.

```java { .api }
/**
 * HBase table source implementation for reading data from HBase tables
 * Extends AbstractHBaseDynamicTableSource with HBase 1.4 specific functionality
 */
@Internal
public class HBaseDynamicTableSource extends AbstractHBaseDynamicTableSource {

    /**
     * Creates a new HBase dynamic table source
     * @param conf Hadoop configuration for HBase connection
     * @param tableName Name of the HBase table to read from
     * @param hbaseSchema Schema mapping for the HBase table
     * @param nullStringLiteral String representation for null values
     * @param lookupOptions Configuration for lookup operations and caching
     */
    public HBaseDynamicTableSource(
            Configuration conf,
            String tableName,
            HBaseTableSchema hbaseSchema,
            String nullStringLiteral,
            HBaseLookupOptions lookupOptions
    );

    /**
     * Creates a copy of this table source for parallel execution
     * @return New HBaseDynamicTableSource instance with same configuration
     */
    public DynamicTableSource copy();

    /**
     * Returns the input format for reading HBase data
     * @return HBaseRowDataInputFormat configured for this table
     */
    public InputFormat<RowData, ?> getInputFormat();

    /**
     * Returns the lookup options configuration for testing purposes
     * @return HBaseLookupOptions instance with caching and retry settings
     */
    @VisibleForTesting
    public HBaseLookupOptions getLookupOptions();
}
```

**Usage Example:**

```java
// Example: Reading from HBase table in Flink job
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Create HBase source table
tableEnv.executeSql(
    "CREATE TABLE user_profiles (" +
    "  user_id STRING," +
    "  profile ROW<name STRING, age INT, email STRING>," +
    "  activity ROW<last_login TIMESTAMP(3), login_count BIGINT>," +
    "  PRIMARY KEY (user_id) NOT ENFORCED" +
    ") WITH (" +
    "  'connector' = 'hbase-1.4'," +
    "  'table-name' = 'user_table'," +
    "  'zookeeper.quorum' = 'localhost:2181'" +
    ")"
);

// Query HBase data
Table result = tableEnv.sqlQuery(
    "SELECT user_id, profile.name, activity.login_count " +
    "FROM user_profiles " +
    "WHERE activity.login_count > 10"
);
```

### HBaseRowDataInputFormat

Input format implementation that handles the actual reading of data from HBase tables and conversion to Flink's RowData format.

```java { .api }
/**
 * InputFormat subclass that wraps access for HBase tables
 * Returns results as RowData for integration with Flink's Table API
 */
public class HBaseRowDataInputFormat extends AbstractTableInputFormat<RowData> {

    /**
     * Creates a new HBase row data input format
     * @param conf Hadoop configuration for HBase connection
     * @param tableName Name of the HBase table to read from
     * @param schema HBase table schema for data conversion
     * @param nullStringLiteral String representation for null values
     */
    public HBaseRowDataInputFormat(
            Configuration conf,
            String tableName,
            HBaseTableSchema schema,
            String nullStringLiteral
    );

    /**
     * Initializes the table connection and serialization components
     * @throws IOException if connection cannot be established
     */
    protected void initTable() throws IOException;

    /**
     * Creates an HBase Scan object for reading data
     * @return Configured Scan object for the table
     */
    protected Scan getScanner();

    /**
     * Returns the name of the HBase table being read
     * @return Table name as configured
     */
    public String getTableName();

    /**
     * Converts HBase Result to Flink RowData format
     * @param res HBase Result object from table scan
     * @return RowData representation of the HBase row
     */
    protected RowData mapResultToOutType(Result res);
}
```

### AbstractTableInputFormat

Base class providing common functionality for all HBase input formats, including connection management, splitting, and error handling.

```java { .api }
/**
 * Abstract InputFormat to read data from HBase tables
 * Provides common functionality for connection management and data reading
 */
@Internal
public abstract class AbstractTableInputFormat<T>
        extends RichInputFormat<T, TableInputSplit> {

    /**
     * Opens a connection for reading a specific table split
     * @param split The input split to read from
     * @throws IOException if the split cannot be opened
     */
    public void open(TableInputSplit split) throws IOException;

    /**
     * Reads the next record from the HBase table
     * @param reuse Reusable object for the result (can be null)
     * @return Next record of type T, or null if end is reached
     * @throws IOException if reading fails
     */
    public T nextRecord(T reuse) throws IOException;

    /**
     * Checks if the end of the input has been reached
     * @return true if no more records are available
     * @throws IOException if status check fails
     */
    public boolean reachedEnd() throws IOException;

    /**
     * Closes all connections and resources
     * @throws IOException if cleanup fails
     */
    public void close() throws IOException;

    /**
     * Creates input splits for parallel reading based on HBase regions
     * @param minNumSplits Minimum number of splits to create
     * @return Array of TableInputSplit objects for parallel execution
     * @throws IOException if split creation fails
     */
    public TableInputSplit[] createInputSplits(int minNumSplits) throws IOException;

    /**
     * Returns an input split assigner for the given splits
     * @param inputSplits Array of input splits to assign
     * @return InputSplitAssigner for managing split distribution
     */
    public InputSplitAssigner getInputSplitAssigner(TableInputSplit[] inputSplits);

    /**
     * Returns statistics about the input data (not implemented)
     * @param cachedStatistics Previously cached statistics
     * @return null (statistics not supported)
     */
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics);

    // Abstract methods to be implemented by subclasses
    protected abstract void initTable() throws IOException;
    protected abstract Scan getScanner();
    protected abstract String getTableName();
    protected abstract T mapResultToOutType(Result r);
}
```

## Region-Aware Splitting

The HBase connector automatically creates input splits based on HBase table regions for optimal parallel processing.

**Split Creation Process:**

1. **Region Discovery**: Queries HBase for region start/end keys
2. **Scan Range Mapping**: Maps Flink scan ranges to HBase regions
3. **Split Generation**: Creates one split per relevant region
4. **Locality Preservation**: Assigns splits to nodes hosting the regions

**Performance Benefits:**

- Parallel reading across multiple regions
- Data locality optimization
- Automatic load balancing
- Scan pushdown to HBase region servers

```java
// Example: The connector automatically handles splitting
// No manual configuration required - splits are created based on:
// - HBase table regions
// - Scan start/stop keys
// - Region server locations
```

## Lookup Operations

### Lookup Join Support

HBase tables can be used as dimension tables in lookup joins with configurable caching and retry mechanisms.

**Lookup Join Example:**

```sql
-- Orders stream joined with customer dimension table in HBase
SELECT
    o.order_id,
    o.amount,
    c.customer.name,
    c.customer.segment
FROM orders_stream o
JOIN customer_hbase FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.customer_id;
```

### Caching Configuration

Lookup operations support configurable caching to reduce HBase load and improve performance.

```sql
-- Configure lookup caching
CREATE TABLE customer_dim (
    customer_id STRING,
    customer ROW<name STRING, segment STRING, region STRING>,
    PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 'customers',
    'zookeeper.quorum' = 'localhost:2181',
    'lookup.cache.max-rows' = '50000',
    'lookup.cache.ttl' = '10min',
    'lookup.max-retries' = '3'
);
```

## Error Handling and Resilience

### Connection Management

The source operations include comprehensive error handling for connection failures and timeouts.

**Automatic Recovery Features:**

- Connection retry with exponential backoff
- Scanner recreation on timeout
- Region failover handling
- Checkpoint-based recovery

### Exception Types

```java
// Common exceptions and their handling:

// TableNotFoundException: Thrown when HBase table doesn't exist
try {
    // Table operations
} catch (TableNotFoundException e) {
    throw new RuntimeException("HBase table '" + tableName + "' not found.", e);
}

// IOException: Connection and I/O failures
// Automatically retried with configurable limits

// Timeout exceptions: Scanner automatically recreated
// Progress tracked by current row key for resumption
```

**Configuration for Resilience:**

```sql
CREATE TABLE resilient_source (
    -- Table definition
) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 'my_table',
    -- Multiple Zookeeper nodes for failover
    'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
    -- Retry configuration for lookup operations
    'lookup.max-retries' = '5'
);
```