# Source Operations

The Apache Flink HBase 1.4 connector reads data from HBase tables with support for batch and lookup operations, region-aware splitting, and configurable caching.

## Capabilities

### HBaseDynamicTableSource

Table source implementation that enables reading data from HBase tables through Flink's Table API and SQL.
```java { .api }
/**
 * HBase table source implementation for reading data from HBase tables.
 * Extends AbstractHBaseDynamicTableSource with HBase 1.4 specific functionality.
 */
@Internal
public class HBaseDynamicTableSource extends AbstractHBaseDynamicTableSource {

    /**
     * Creates a new HBase dynamic table source.
     * @param conf Hadoop configuration for the HBase connection
     * @param tableName Name of the HBase table to read from
     * @param hbaseSchema Schema mapping for the HBase table
     * @param nullStringLiteral String representation for null values
     * @param lookupOptions Configuration for lookup operations and caching
     */
    public HBaseDynamicTableSource(
        Configuration conf,
        String tableName,
        HBaseTableSchema hbaseSchema,
        String nullStringLiteral,
        HBaseLookupOptions lookupOptions
    );

    /**
     * Creates a copy of this table source for parallel execution.
     * @return New HBaseDynamicTableSource instance with the same configuration
     */
    public DynamicTableSource copy();

    /**
     * Returns the input format for reading HBase data.
     * @return HBaseRowDataInputFormat configured for this table
     */
    public InputFormat<RowData, ?> getInputFormat();

    /**
     * Returns the lookup options configuration for testing purposes.
     * @return HBaseLookupOptions instance with caching and retry settings
     */
    @VisibleForTesting
    public HBaseLookupOptions getLookupOptions();
}
```
**Usage Example:**

```java
// Example: reading from an HBase table in a Flink job
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Create the HBase source table
tableEnv.executeSql(
    "CREATE TABLE user_profiles (" +
    "  user_id STRING," +
    "  profile ROW<name STRING, age INT, email STRING>," +
    "  activity ROW<last_login TIMESTAMP(3), login_count BIGINT>," +
    "  PRIMARY KEY (user_id) NOT ENFORCED" +
    ") WITH (" +
    "  'connector' = 'hbase-1.4'," +
    "  'table-name' = 'user_table'," +
    "  'zookeeper.quorum' = 'localhost:2181'" +
    ")"
);

// Query HBase data
Table result = tableEnv.sqlQuery(
    "SELECT user_id, profile.name, activity.login_count " +
    "FROM user_profiles " +
    "WHERE activity.login_count > 10"
);
```
### HBaseRowDataInputFormat

Input format implementation that handles the actual reading of data from HBase tables and conversion to Flink's RowData format.
```java { .api }
/**
 * InputFormat subclass that wraps access to HBase tables.
 * Returns results as RowData for integration with Flink's Table API.
 */
public class HBaseRowDataInputFormat extends AbstractTableInputFormat<RowData> {

    /**
     * Creates a new HBase row data input format.
     * @param conf Hadoop configuration for the HBase connection
     * @param tableName Name of the HBase table to read from
     * @param schema HBase table schema for data conversion
     * @param nullStringLiteral String representation for null values
     */
    public HBaseRowDataInputFormat(
        Configuration conf,
        String tableName,
        HBaseTableSchema schema,
        String nullStringLiteral
    );

    /**
     * Initializes the table connection and serialization components.
     * @throws IOException if the connection cannot be established
     */
    protected void initTable() throws IOException;

    /**
     * Creates an HBase Scan object for reading data.
     * @return Configured Scan object for the table
     */
    protected Scan getScanner();

    /**
     * Returns the name of the HBase table being read.
     * @return Table name as configured
     */
    public String getTableName();

    /**
     * Converts an HBase Result to Flink's RowData format.
     * @param res HBase Result object from the table scan
     * @return RowData representation of the HBase row
     */
    protected RowData mapResultToOutType(Result res);
}
```
### AbstractTableInputFormat

Base class providing common functionality for all HBase input formats, including connection management, splitting, and error handling.
```java { .api }
/**
 * Abstract InputFormat to read data from HBase tables.
 * Provides common functionality for connection management and data reading.
 */
@Internal
public abstract class AbstractTableInputFormat<T>
        extends RichInputFormat<T, TableInputSplit> {

    /**
     * Opens a connection for reading a specific table split.
     * @param split The input split to read from
     * @throws IOException if the split cannot be opened
     */
    public void open(TableInputSplit split) throws IOException;

    /**
     * Reads the next record from the HBase table.
     * @param reuse Reusable object for the result (can be null)
     * @return Next record of type T, or null if the end is reached
     * @throws IOException if reading fails
     */
    public T nextRecord(T reuse) throws IOException;

    /**
     * Checks if the end of the input has been reached.
     * @return true if no more records are available
     * @throws IOException if the status check fails
     */
    public boolean reachedEnd() throws IOException;

    /**
     * Closes all connections and resources.
     * @throws IOException if cleanup fails
     */
    public void close() throws IOException;

    /**
     * Creates input splits for parallel reading based on HBase regions.
     * @param minNumSplits Minimum number of splits to create
     * @return Array of TableInputSplit objects for parallel execution
     * @throws IOException if split creation fails
     */
    public TableInputSplit[] createInputSplits(int minNumSplits) throws IOException;

    /**
     * Returns an input split assigner for the given splits.
     * @param inputSplits Array of input splits to assign
     * @return InputSplitAssigner for managing split distribution
     */
    public InputSplitAssigner getInputSplitAssigner(TableInputSplit[] inputSplits);

    /**
     * Returns statistics about the input data (not implemented).
     * @param cachedStatistics Previously cached statistics
     * @return null (statistics are not supported)
     */
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics);

    // Abstract methods to be implemented by subclasses
    protected abstract void initTable() throws IOException;
    protected abstract Scan getScanner();
    protected abstract String getTableName();
    protected abstract T mapResultToOutType(Result r);
}
```
## Region-Aware Splitting

The HBase connector automatically creates input splits based on HBase table regions for optimal parallel processing.

**Split Creation Process:**

1. **Region Discovery**: Queries HBase for region start/end keys
2. **Scan Range Mapping**: Maps Flink scan ranges to HBase regions
3. **Split Generation**: Creates one split per relevant region
4. **Locality Preservation**: Assigns splits to nodes hosting the regions
**Performance Benefits:**

- Parallel reading across multiple regions
- Data locality optimization
- Automatic load balancing
- Scan pushdown to HBase region servers
```java
// Example: the connector handles splitting automatically.
// No manual configuration is required - splits are created based on:
// - HBase table regions
// - Scan start/stop keys
// - Region server locations
```
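The core of the split-creation process above — intersecting the requested scan range with each region's key range — can be sketched with plain string keys standing in for HBase's byte-array row keys. This is an illustrative simplification, not the connector's actual code; the `RegionSplitSketch` class and its `computeSplits` helper are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of region-aware split creation: each HBase region that
// overlaps the scan range contributes one split, clipped to that range.
// The empty string stands in for an unbounded boundary, as in HBase region keys.
public class RegionSplitSketch {

    /** A split is represented here as a [startKey, endKey) pair. */
    public static List<String[]> computeSplits(
            String[] regionStarts, String[] regionEnds,
            String scanStart, String scanStop) {
        List<String[]> splits = new ArrayList<>();
        for (int i = 0; i < regionStarts.length; i++) {
            String start = regionStarts[i];
            String end = regionEnds[i];
            // Skip regions that lie entirely outside the scan range.
            if (!scanStop.isEmpty() && !start.isEmpty() && start.compareTo(scanStop) >= 0) continue;
            if (!end.isEmpty() && end.compareTo(scanStart) <= 0) continue;
            // Clip the region boundaries to the scan range.
            String splitStart = (start.isEmpty() || scanStart.compareTo(start) > 0) ? scanStart : start;
            String splitEnd = (end.isEmpty() || (!scanStop.isEmpty() && scanStop.compareTo(end) < 0))
                    ? scanStop : end;
            splits.add(new String[] {splitStart, splitEnd});
        }
        return splits;
    }

    public static void main(String[] args) {
        // Three regions: ["", "g"), ["g", "p"), ["p", "")
        String[] starts = {"", "g", "p"};
        String[] ends = {"g", "p", ""};
        // A scan over ["c", "k") overlaps only the first two regions.
        for (String[] s : computeSplits(starts, ends, "c", "k")) {
            System.out.println(s[0] + " -> " + s[1]);  // prints "c -> g" then "g -> k"
        }
    }
}
```

In the real connector the split also records the region's host for locality-aware assignment; that dimension is omitted here for brevity.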
## Lookup Operations

### Lookup Join Support

HBase tables can be used as dimension tables in lookup joins with configurable caching and retry mechanisms.

**Lookup Join Example:**
```sql
-- Orders stream joined with a customer dimension table in HBase
SELECT
  o.order_id,
  o.amount,
  c.customer.name,
  c.customer.segment
FROM orders_stream o
JOIN customer_hbase FOR SYSTEM_TIME AS OF o.proc_time AS c
  ON o.customer_id = c.customer_id;
```
### Caching Configuration

Lookup operations support configurable caching to reduce HBase load and improve performance.
```sql
-- Configure lookup caching
CREATE TABLE customer_dim (
  customer_id STRING,
  customer ROW<name STRING, segment STRING, region STRING>,
  PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'customers',
  'zookeeper.quorum' = 'localhost:2181',
  'lookup.cache.max-rows' = '50000',
  'lookup.cache.ttl' = '10min',
  'lookup.max-retries' = '3'
);
```
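The semantics behind `lookup.cache.max-rows` and `lookup.cache.ttl` — a bounded cache whose entries expire after a time-to-live — can be sketched with a small stdlib-only class. This is purely illustrative of the option semantics; `LookupCacheSketch` is hypothetical and is not the connector's internal cache implementation.

```java
import java.util.LinkedHashMap;

// Illustrative sketch of lookup-cache semantics: entries expire after a TTL,
// and the least recently used entry is evicted once the row limit is reached.
public class LookupCacheSketch {
    private static class Entry {
        final String value;
        final long insertedAt;
        Entry(String value, long insertedAt) { this.value = value; this.insertedAt = insertedAt; }
    }

    private final int maxRows;
    private final long ttlMillis;
    // Access-order LinkedHashMap gives simple LRU-style iteration order.
    private final LinkedHashMap<String, Entry> cache = new LinkedHashMap<>(16, 0.75f, true);

    public LookupCacheSketch(int maxRows, long ttlMillis) {
        this.maxRows = maxRows;
        this.ttlMillis = ttlMillis;
    }

    /** Returns the cached value, or null if the key is absent or its TTL expired. */
    public String get(String key, long now) {
        Entry e = cache.get(key);
        if (e == null) return null;
        if (now - e.insertedAt >= ttlMillis) {
            cache.remove(key);  // expired: force a fresh HBase lookup
            return null;
        }
        return e.value;
    }

    /** Caches a looked-up row, evicting the least recently used entry at capacity. */
    public void put(String key, String value, long now) {
        if (cache.size() >= maxRows && !cache.containsKey(key)) {
            cache.remove(cache.keySet().iterator().next());
        }
        cache.put(key, new Entry(value, now));
    }

    public int size() { return cache.size(); }

    public static void main(String[] args) {
        LookupCacheSketch c = new LookupCacheSketch(2, 1000);  // 2 rows, 1s TTL
        c.put("a", "1", 0);
        c.put("b", "2", 0);
        c.put("c", "3", 0);                    // evicts "a": max-rows reached
        System.out.println(c.get("a", 10));    // null (evicted)
        System.out.println(c.get("b", 10));    // 2 (cached)
        System.out.println(c.get("b", 2000));  // null (TTL expired)
    }
}
```

A cache miss (or expiry) in the real connector triggers a fresh `Get` against HBase, with `lookup.max-retries` bounding retry attempts on failure.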
## Error Handling and Resilience

### Connection Management

The source operations include comprehensive error handling for connection failures and timeouts.

**Automatic Recovery Features:**

- Connection retry with exponential backoff
- Scanner recreation on timeout
- Region failover handling
- Checkpoint-based recovery
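The retry-with-exponential-backoff pattern from the list above can be sketched generically: each failed attempt doubles the wait before the next try. The `RetryWithBackoff` helper and its parameters are hypothetical illustrations, not connector API.

```java
import java.util.concurrent.Callable;

// Generic sketch of retry with exponential backoff for transient failures,
// such as momentary HBase connection problems. The delay doubles per attempt.
public class RetryWithBackoff {

    public static <T> T retry(Callable<T> action, int maxRetries, long baseDelayMillis)
            throws Exception {
        Exception last = null;
        long delay = baseDelayMillis;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxRetries) {
                    Thread.sleep(delay);  // back off before the next attempt
                    delay *= 2;           // exponential growth: 1x, 2x, 4x, ...
                }
            }
        }
        throw last;  // all attempts exhausted
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated action: fails twice, then succeeds on the third attempt.
        String result = retry(() -> {
            if (++calls[0] < 3) throw new RuntimeException("transient failure");
            return "connected";
        }, 5, 1);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```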
### Exception Types

```java
// Common exceptions and their handling:

// TableNotFoundException: thrown when the HBase table doesn't exist
try {
    // Table operations
} catch (TableNotFoundException e) {
    throw new RuntimeException("HBase table '" + tableName + "' not found.", e);
}

// IOException: connection and I/O failures,
// automatically retried with configurable limits

// Timeout exceptions: the scanner is automatically recreated;
// progress is tracked by the current row key for resumption
```
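The timeout-recovery strategy noted above — track the last emitted row key and reopen the scan just past it — can be sketched against a `TreeMap` standing in for the HBase table. The failure is simulated and the `ResumableScanSketch` class is hypothetical; it only illustrates why row-key tracking lets a recreated scanner resume without losing or repeating rows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Sketch of timeout recovery via row-key tracking: when a scan fails mid-way,
// a new scan is opened starting just after the last successfully read key.
public class ResumableScanSketch {

    /** Scans all rows, simulating one scanner failure after {@code failAfter} rows. */
    public static List<String> scanAll(TreeMap<String, String> table, int failAfter) {
        List<String> out = new ArrayList<>();
        String lastKey = null;   // progress marker: last row key emitted
        boolean failed = false;
        while (true) {
            // (Re)open the scan: from the start, or just after lastKey.
            SortedMap<String, String> view =
                    lastKey == null ? table : table.tailMap(lastKey, false);
            try {
                int read = 0;
                for (Map.Entry<String, String> e : view.entrySet()) {
                    if (!failed && ++read > failAfter) {
                        throw new RuntimeException("scanner timeout");  // simulated
                    }
                    out.add(e.getValue());
                    lastKey = e.getKey();
                }
                return out;  // scan completed
            } catch (RuntimeException timeout) {
                failed = true;  // recreate the scanner and resume from lastKey
            }
        }
    }

    public static void main(String[] args) {
        TreeMap<String, String> table = new TreeMap<>();
        table.put("row1", "a");
        table.put("row2", "b");
        table.put("row3", "c");
        // Fails after two rows, resumes, and still yields every row exactly once.
        System.out.println(scanAll(table, 2));  // prints [a, b, c]
    }
}
```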
**Configuration for Resilience:**

```sql
CREATE TABLE resilient_source (
  -- Table definition
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'my_table',
  -- Multiple ZooKeeper nodes for failover
  'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
  -- Retry configuration for lookup operations
  'lookup.max-retries' = '5'
);
```