# Apache Flink HBase 1.4 Connector

The Apache Flink HBase 1.4 Connector integrates the Apache Flink stream processing framework with the HBase 1.4 NoSQL database in both directions. It reads from and writes to HBase tables through Flink's Table API and SQL, and targets high-throughput streaming applications that need exactly-once processing guarantees.

## Package Information

- **Package Name**: flink-connector-hbase-1.4_2.11
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-connector-hbase-1.4_2.11
- **Version**: 1.14.6
- **Installation**: Add to Maven dependencies:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hbase-1.4_2.11</artifactId>
  <version>1.14.6</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.connector.hbase1.HBase1DynamicTableFactory;
import org.apache.flink.connector.hbase1.source.HBaseDynamicTableSource;
import org.apache.flink.connector.hbase1.sink.HBaseDynamicTableSink;
import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
import org.apache.flink.connector.hbase.table.HBaseConnectorOptions;
```

## Basic Usage

### Creating HBase Table in Flink SQL

```sql
CREATE TABLE hbase_table (
  rowkey STRING,
  family1 ROW<col1 STRING, col2 BIGINT>,
  family2 ROW<col1 STRING, col2 BOOLEAN>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'my_hbase_table',
  'zookeeper.quorum' = 'localhost:2181'
);
```
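
When the table definition is generated from application code, the `WITH` clause can be rendered from a map of options. The following is a minimal sketch in plain Java, not part of the connector: `HBaseDdlSketch`, `renderWithClause`, and `exampleOptions` are hypothetical names. The keys `sink.buffer-flush.max-rows` and `lookup.cache.max-rows` are assumed to be the SQL option keys behind `SINK_BUFFER_FLUSH_MAX_ROWS` and `LOOKUP_CACHE_MAX_ROWS`, and their values are purely illustrative; check both against the connector's option reference before relying on them.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper (not part of the connector) that renders a SQL WITH clause. */
final class HBaseDdlSketch {

    /** Renders the options in insertion order, doubling single quotes to escape them. */
    static String renderWithClause(Map<String, String> options) {
        StringJoiner joiner = new StringJoiner(",\n", "WITH (\n", "\n)");
        for (Map.Entry<String, String> option : options.entrySet()) {
            joiner.add("  '" + escape(option.getKey()) + "' = '" + escape(option.getValue()) + "'");
        }
        return joiner.toString();
    }

    private static String escape(String text) {
        return text.replace("'", "''");
    }

    /** The three options from the DDL above plus two assumed tuning keys. */
    static Map<String, String> exampleOptions() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "hbase-1.4");
        options.put("table-name", "my_hbase_table");
        options.put("zookeeper.quorum", "localhost:2181");
        options.put("sink.buffer-flush.max-rows", "1000"); // assumed key, illustrative value
        options.put("lookup.cache.max-rows", "5000");      // assumed key, illustrative value
        return options;
    }
}
```

The rendered clause can be appended to the column list of a `CREATE TABLE` statement and passed to the table environment as a single SQL string.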

### Reading from HBase

```sql
SELECT rowkey, family1.col1, family2.col2
FROM hbase_table
WHERE family1.col2 > 100;
```

### Writing to HBase

```sql
INSERT INTO hbase_table
SELECT rowkey, family1, family2
FROM source_table;
```

## Architecture

The Apache Flink HBase 1.4 Connector is built around several key components:

- **Dynamic Table Factory**: `HBase1DynamicTableFactory` serves as the main entry point for creating HBase table sources and sinks through Flink's Table API
- **Source Integration**: `HBaseDynamicTableSource` and `HBaseRowDataInputFormat` handle reading data from HBase tables with support for region-aware splitting
- **Sink Integration**: `HBaseDynamicTableSink` provides buffered write operations with configurable flushing strategies
- **Configuration System**: Comprehensive configuration options for connection settings, performance tuning, and caching
- **Type System**: Full integration with Flink's type system and automatic serialization/deserialization of HBase data

## Capabilities

### Table Factory and Configuration

Core factory class for creating HBase table sources and sinks with comprehensive configuration options. Handles connector registration and table creation.

```java { .api }
public class HBase1DynamicTableFactory
        implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    public DynamicTableSource createDynamicTableSource(Context context);
    public DynamicTableSink createDynamicTableSink(Context context);
    public String factoryIdentifier(); // Returns "hbase-1.4"
    public Set<ConfigOption<?>> requiredOptions();
    public Set<ConfigOption<?>> optionalOptions();
}
```

[Table Factory and Configuration](./table-factory.md)

### Source Operations

Reading data from HBase tables with support for batch and lookup operations, region-aware splitting, and configurable caching.

```java { .api }
public class HBaseDynamicTableSource extends AbstractHBaseDynamicTableSource {
    public HBaseDynamicTableSource(
        Configuration conf,
        String tableName,
        HBaseTableSchema hbaseSchema,
        String nullStringLiteral,
        HBaseLookupOptions lookupOptions
    );

    public DynamicTableSource copy();
    public InputFormat<RowData, ?> getInputFormat();
    public HBaseLookupOptions getLookupOptions();
}
```

[Source Operations](./source-operations.md)
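
To make "region-aware splitting" concrete, the sketch below models how a scan range could be cut into one split per overlapping region, each clipped to the scan boundaries. It is a simplified illustration under stated assumptions, not the connector's code: `RegionSplitSketch` and `Range` are hypothetical names, row keys are modelled as `String` values compared lexicographically (HBase itself works on byte arrays), and an empty string stands for an open boundary.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of region-aware splitting; not the connector's implementation. */
final class RegionSplitSketch {

    /** A key range [start, end); an empty string means unbounded on that side. */
    static final class Range {
        final String start;
        final String end;

        Range(String start, String end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Returns one split per region that overlaps the scan, clipped to the scan range. */
    static List<Range> split(List<Range> regions, Range scan) {
        List<Range> splits = new ArrayList<>();
        for (Range region : regions) {
            boolean endsBeforeScan = !region.end.isEmpty() && !scan.start.isEmpty()
                    && region.end.compareTo(scan.start) <= 0;
            boolean startsAfterScan = !region.start.isEmpty() && !scan.end.isEmpty()
                    && region.start.compareTo(scan.end) >= 0;
            if (endsBeforeScan || startsAfterScan) {
                continue; // region lies entirely outside the scan range
            }
            splits.add(new Range(laterStart(region.start, scan.start),
                                 earlierEnd(region.end, scan.end)));
        }
        return splits;
    }

    /** Picks the larger lower bound; an empty string is treated as minus infinity. */
    private static String laterStart(String a, String b) {
        if (a.isEmpty()) return b;
        if (b.isEmpty()) return a;
        return a.compareTo(b) >= 0 ? a : b;
    }

    /** Picks the smaller upper bound; an empty string is treated as plus infinity. */
    private static String earlierEnd(String a, String b) {
        if (a.isEmpty()) return b;
        if (b.isEmpty()) return a;
        return a.compareTo(b) <= 0 ? a : b;
    }
}
```

With three regions split at `g` and `p`, a scan over `[c, k)` touches only the first two regions, so only two splits are produced and each can be read by a separate parallel task.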

### Sink Operations

Writing data to HBase tables with configurable buffering, batching, and exactly-once processing guarantees.

```java { .api }
public class HBaseDynamicTableSink implements DynamicTableSink {
    public HBaseDynamicTableSink(
        String tableName,
        HBaseTableSchema hbaseTableSchema,
        Configuration hbaseConf,
        HBaseWriteOptions writeOptions,
        String nullStringLiteral
    );

    public SinkRuntimeProvider getSinkRuntimeProvider(Context context);
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode);
    public DynamicTableSink copy();
}
```

[Sink Operations](./sink-operations.md)

### Write Options and Performance Tuning

Configuration options for optimizing write performance through buffering, batching, and parallelism control.

```java { .api }
public class HBaseWriteOptions implements Serializable {
    public static Builder builder();

    public long getBufferFlushMaxSizeInBytes();
    public long getBufferFlushMaxRows();
    public long getBufferFlushIntervalMillis();
    public Integer getParallelism();

    // Builder is a static nested class of HBaseWriteOptions
    public static class Builder {
        public Builder setBufferFlushMaxSizeInBytes(long bufferFlushMaxSizeInBytes);
        public Builder setBufferFlushMaxRows(long bufferFlushMaxRows);
        public Builder setBufferFlushIntervalMillis(long bufferFlushIntervalMillis);
        public Builder setParallelism(Integer parallelism);
        public HBaseWriteOptions build();
    }
}
```

[Write Options and Performance](./write-options.md)
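
The three buffer settings (maximum size in bytes, maximum rows, flush interval) can be read as independent triggers: whichever threshold is reached first causes a flush. The sketch below models that interaction in plain Java. It is an illustration under assumptions, not the connector's sink: `BufferedWriterSketch` and its methods are hypothetical names, time is passed in explicitly instead of using a timer thread, and a value of `0` is assumed to disable the corresponding trigger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical model of a sink buffer with size, row-count and interval triggers. */
final class BufferedWriterSketch {
    private final long maxBytes;
    private final long maxRows;
    private final long intervalMillis;
    private final Consumer<List<byte[]>> flusher;
    private final List<byte[]> buffer = new ArrayList<>();
    private long bufferedBytes;
    private long lastFlushMillis;

    BufferedWriterSketch(long maxBytes, long maxRows, long intervalMillis,
                         long nowMillis, Consumer<List<byte[]>> flusher) {
        this.maxBytes = maxBytes;
        this.maxRows = maxRows;
        this.intervalMillis = intervalMillis;
        this.lastFlushMillis = nowMillis;
        this.flusher = flusher;
    }

    /** Buffers one record and flushes if the row or byte threshold is reached. */
    void write(byte[] record, long nowMillis) {
        buffer.add(record);
        bufferedBytes += record.length;
        boolean rowsReached = maxRows > 0 && buffer.size() >= maxRows;
        boolean bytesReached = maxBytes > 0 && bufferedBytes >= maxBytes;
        if (rowsReached || bytesReached) {
            flush(nowMillis);
        }
    }

    /** Would be called by a periodic timer; flushes pending records once the interval elapsed. */
    void onTimer(long nowMillis) {
        if (intervalMillis > 0 && !buffer.isEmpty()
                && nowMillis - lastFlushMillis >= intervalMillis) {
            flush(nowMillis);
        }
    }

    /** Hands a copy of the buffered records to the flusher and resets the counters. */
    void flush(long nowMillis) {
        if (!buffer.isEmpty()) {
            flusher.accept(new ArrayList<>(buffer));
            buffer.clear();
            bufferedBytes = 0;
        }
        lastFlushMillis = nowMillis;
    }

    int pending() {
        return buffer.size();
    }
}
```

Larger thresholds generally mean fewer, bigger batches and higher throughput, at the cost of records sitting in memory longer before they become visible in HBase.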

### Lookup Options and Caching

Configuration for lookup join operations with caching, retry mechanisms, and async processing options.

```java { .api }
public class HBaseLookupOptions implements Serializable {
    public static Builder builder();

    public long getCacheMaxSize();
    public long getCacheExpireMs();
    public int getMaxRetryTimes();
    public boolean getLookupAsync();

    // Builder is a static nested class of HBaseLookupOptions
    public static class Builder {
        public Builder setCacheMaxSize(long cacheMaxSize);
        public Builder setCacheExpireMs(long cacheExpireMs);
        public Builder setMaxRetryTimes(int maxRetryTimes);
        public Builder setLookupAsync(boolean lookupAsync);
        public HBaseLookupOptions build();
    }
}
```

[Lookup Options and Caching](./lookup-options.md)
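
The cache settings above bound a lookup cache in two ways: by entry count (`cacheMaxSize`) and by entry age (`cacheExpireMs`). The sketch below illustrates that combination in plain Java. It is a simplified model under assumptions, not the connector's cache: `LookupCacheSketch` is a hypothetical name, eviction drops the oldest written entry, expiry is measured from the time an entry was written, and retries and async lookups are left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical size- and TTL-bounded lookup cache; not the connector's implementation. */
final class LookupCacheSketch<K, V> {

    /** A cached value together with the time it was written. */
    private static final class Timed<T> {
        final T value;
        final long writtenAtMillis;

        Timed(T value, long writtenAtMillis) {
            this.value = value;
            this.writtenAtMillis = writtenAtMillis;
        }
    }

    private final long expireMs;
    private final Function<K, V> loader;
    private final LinkedHashMap<K, Timed<V>> cache;
    private int loadCount;

    LookupCacheSketch(final long maxSize, long expireMs, Function<K, V> loader) {
        this.expireMs = expireMs;
        this.loader = loader;
        // Insertion-ordered map that drops its oldest entry once maxSize is exceeded.
        this.cache = new LinkedHashMap<K, Timed<V>>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Timed<V>> eldest) {
                return size() > maxSize;
            }
        };
    }

    /** Returns the cached value if it is still fresh, otherwise reloads and caches it. */
    V lookup(K key, long nowMillis) {
        Timed<V> hit = cache.get(key);
        if (hit != null && nowMillis - hit.writtenAtMillis < expireMs) {
            return hit.value;
        }
        V value = loader.apply(key); // stands in for a point read against HBase
        loadCount++;
        cache.remove(key);           // re-insert so the refreshed entry counts as newest
        cache.put(key, new Timed<>(value, nowMillis));
        return value;
    }

    /** Number of times the loader was invoked, i.e. cache misses. */
    int loadCount() {
        return loadCount;
    }
}
```

A longer expiry reduces load on HBase during lookup joins but lets the join see stale dimension rows for longer, so the two settings are usually tuned together.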

## Types

```java { .api }
// Core configuration options
public class HBaseConnectorOptions {
    public static final ConfigOption<String> TABLE_NAME;
    public static final ConfigOption<String> ZOOKEEPER_QUORUM;
    public static final ConfigOption<String> ZOOKEEPER_ZNODE_PARENT;
    public static final ConfigOption<String> NULL_STRING_LITERAL;
    public static final ConfigOption<MemorySize> SINK_BUFFER_FLUSH_MAX_SIZE;
    public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS;
    public static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL;
    public static final ConfigOption<Boolean> LOOKUP_ASYNC;
    public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS;
    public static final ConfigOption<Duration> LOOKUP_CACHE_TTL;
    public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES;
    public static final ConfigOption<Integer> SINK_PARALLELISM;
}

// Input format for reading HBase data
public class HBaseRowDataInputFormat extends AbstractTableInputFormat<RowData> {
    public HBaseRowDataInputFormat(
        Configuration conf,
        String tableName,
        HBaseTableSchema schema,
        String nullStringLiteral
    );
}

// Abstract base class for HBase input formats
public abstract class AbstractTableInputFormat<T>
        extends RichInputFormat<T, TableInputSplit> {

    protected abstract void initTable() throws IOException;
    protected abstract Scan getScanner();
    protected abstract String getTableName();
    protected abstract T mapResultToOutType(Result r);
}
```