# Table Factory and Configuration

Core factory class for creating HBase table sources and sinks, with comprehensive configuration options, for the Apache Flink HBase 1.4 Connector.

## Capabilities

### HBase1DynamicTableFactory

Main factory class that implements both the `DynamicTableSourceFactory` and `DynamicTableSinkFactory` interfaces to provide HBase table integration with Flink's Table API.

```java { .api }

/**
 * HBase connector factory for creating dynamic table sources and sinks.
 * Supports the connector identifier "hbase-1.4" in Flink SQL DDL statements.
 */
@Internal
public class HBase1DynamicTableFactory
        implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    /**
     * Creates a dynamic table source for reading from HBase tables.
     *
     * @param context factory context containing catalog table information and options
     * @return HBaseDynamicTableSource configured for the specified table
     */
    public DynamicTableSource createDynamicTableSource(Context context);

    /**
     * Creates a dynamic table sink for writing to HBase tables.
     *
     * @param context factory context containing catalog table information and options
     * @return HBaseDynamicTableSink configured for the specified table
     */
    public DynamicTableSink createDynamicTableSink(Context context);

    /**
     * Returns the unique identifier for this connector factory.
     *
     * @return the string identifier "hbase-1.4" used in CREATE TABLE statements
     */
    public String factoryIdentifier();

    /**
     * Returns the set of required configuration options.
     *
     * @return set containing the TABLE_NAME and ZOOKEEPER_QUORUM options
     */
    public Set<ConfigOption<?>> requiredOptions();

    /**
     * Returns the set of optional configuration options.
     *
     * @return set containing all optional configuration parameters
     */
    public Set<ConfigOption<?>> optionalOptions();
}
```

**Usage Example:**

```sql
CREATE TABLE my_hbase_table (
    rowkey STRING,
    cf1 ROW<col1 STRING, col2 BIGINT>,
    cf2 ROW<status BOOLEAN, `timestamp` TIMESTAMP(3)>,
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 'my_table',
    'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
    'zookeeper.znode.parent' = '/hbase'
);
```

Note that `timestamp` is escaped with backticks because TIMESTAMP is a reserved keyword in Flink SQL.
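In the schema above, each top-level ROW field names an HBase column family and its nested fields name the column qualifiers, while the PRIMARY KEY column supplies the row key. A minimal plain-Java sketch of that flattening (the `flatten` helper and the `family:qualifier` string keys are illustrative, not the connector's internal representation):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CellMapping {
    /**
     * Flattens a (family -> (qualifier -> value)) view of a row into
     * HBase-style "family:qualifier" cell keys. Illustrative only.
     */
    static Map<String, String> flatten(Map<String, Map<String, String>> families) {
        Map<String, String> cells = new LinkedHashMap<>();
        for (Map.Entry<String, Map<String, String>> family : families.entrySet()) {
            for (Map.Entry<String, String> qualifier : family.getValue().entrySet()) {
                cells.put(family.getKey() + ":" + qualifier.getKey(), qualifier.getValue());
            }
        }
        return cells;
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> row = new LinkedHashMap<>();
        row.put("cf1", new LinkedHashMap<>(Map.of("col1", "hello")));
        System.out.println(flatten(row)); // prints {cf1:col1=hello}
    }
}
```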

## Configuration Options

### Required Options

Configuration parameters that must be specified when creating HBase tables.

```java { .api }
// Required: name of the HBase table to connect to
public static final ConfigOption<String> TABLE_NAME =
        ConfigOptions.key("table-name")
                .stringType()
                .noDefaultValue()
                .withDescription("The name of HBase table to connect.");

// Required: HBase Zookeeper quorum for cluster connection
public static final ConfigOption<String> ZOOKEEPER_QUORUM =
        ConfigOptions.key("zookeeper.quorum")
                .stringType()
                .noDefaultValue()
                .withDescription("The HBase Zookeeper quorum.");
```
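Both options are declared with `noDefaultValue()`, so table creation fails when either key is missing from the `WITH` clause. A hedged sketch of that presence check (the `checkRequired` helper is hypothetical; the real factory performs this validation through Flink's `FactoryUtil` helper):

```java
import java.util.Map;
import java.util.Set;

public class RequiredOptionsCheck {
    // The two keys from the factory's requiredOptions() set
    static final Set<String> REQUIRED = Set.of("table-name", "zookeeper.quorum");

    /** Hypothetical sketch: throws when a required option is absent. */
    static void checkRequired(Map<String, String> withOptions) {
        for (String key : REQUIRED) {
            if (!withOptions.containsKey(key)) {
                throw new IllegalArgumentException("Missing required option: " + key);
            }
        }
    }
}
```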

### Optional Options

Configuration parameters with default values that can be customized for specific use cases.

```java { .api }
// Optional: Zookeeper root directory for the HBase cluster
public static final ConfigOption<String> ZOOKEEPER_ZNODE_PARENT =
        ConfigOptions.key("zookeeper.znode.parent")
                .stringType()
                .defaultValue("/hbase")
                .withDescription("The root dir in Zookeeper for HBase cluster.");

// Optional: null value representation for string fields
public static final ConfigOption<String> NULL_STRING_LITERAL =
        ConfigOptions.key("null-string-literal")
                .stringType()
                .defaultValue("null")
                .withDescription("Representation for null values for string fields.");

// Optional: maximum buffer size for write operations
public static final ConfigOption<MemorySize> SINK_BUFFER_FLUSH_MAX_SIZE =
        ConfigOptions.key("sink.buffer-flush.max-size")
                .memoryType()
                .defaultValue(MemorySize.parse("2mb"))
                .withDescription("Maximum size in memory of buffered rows for each writing request.");

// Optional: maximum number of buffered rows
public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS =
        ConfigOptions.key("sink.buffer-flush.max-rows")
                .intType()
                .defaultValue(1000)
                .withDescription("Maximum number of rows to buffer for each writing request.");

// Optional: buffer flush interval
public static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL =
        ConfigOptions.key("sink.buffer-flush.interval")
                .durationType()
                .defaultValue(Duration.ofSeconds(1))
                .withDescription("The interval to flush any buffered rows.");

// Optional: sink operator parallelism
public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;

// Optional: enable async lookup operations
public static final ConfigOption<Boolean> LOOKUP_ASYNC =
        ConfigOptions.key("lookup.async")
                .booleanType()
                .defaultValue(false)
                .withDescription("whether to set async lookup.");

// Optional: maximum lookup cache size
public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
        ConfigOptions.key("lookup.cache.max-rows")
                .longType()
                .defaultValue(-1L)
                .withDescription("the max number of rows of lookup cache.");

// Optional: lookup cache time-to-live
public static final ConfigOption<Duration> LOOKUP_CACHE_TTL =
        ConfigOptions.key("lookup.cache.ttl")
                .durationType()
                .defaultValue(Duration.ofSeconds(0))
                .withDescription("the cache time to live.");

// Optional: maximum lookup retry attempts
public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES =
        ConfigOptions.key("lookup.max-retries")
                .intType()
                .defaultValue(3)
                .withDescription("the max retry times if lookup database failed.");
```
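The three `sink.buffer-flush.*` options act as independent triggers: buffered mutations are flushed when their total size reaches `max-size`, when their count reaches `max-rows`, or when the `interval` timer fires, whichever happens first. A minimal sketch of the size and row-count part of that decision (constant and method names are illustrative, not the sink's internals):

```java
public class FlushDecision {
    // Defaults from the options above: 2mb and 1000 rows
    static final long MAX_SIZE_BYTES = 2L * 1024 * 1024;
    static final int MAX_ROWS = 1000;

    /**
     * True when either the byte or the row threshold is reached.
     * The sink.buffer-flush.interval timer is a third, time-based
     * trigger not modeled in this sketch.
     */
    static boolean shouldFlush(long bufferedBytes, int bufferedRows) {
        return bufferedBytes >= MAX_SIZE_BYTES || bufferedRows >= MAX_ROWS;
    }
}
```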

**Configuration Example:**

```sql
CREATE TABLE orders (
    order_id STRING,
    customer ROW<id STRING, name STRING>,
    order_details ROW<amount DECIMAL(10,2), status STRING, created_at TIMESTAMP(3)>,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 'orders_table',
    'zookeeper.quorum' = 'zk1:2181,zk2:2181',
    'zookeeper.znode.parent' = '/hbase',
    'null-string-literal' = 'NULL',
    'sink.buffer-flush.max-size' = '4mb',
    'sink.buffer-flush.max-rows' = '2000',
    'sink.buffer-flush.interval' = '2s',
    'lookup.cache.max-rows' = '10000',
    'lookup.cache.ttl' = '5min',
    'lookup.max-retries' = '5'
);
```
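The example sets `lookup.cache.max-rows` and `lookup.cache.ttl` together, so a cached row is dropped either when the cache is full or when the entry outlives its TTL. A sketch of such a bounded TTL cache, built on an access-ordered `LinkedHashMap` (an assumption for illustration, not the connector's actual cache implementation):

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of a size- and TTL-bounded lookup cache; not the connector's implementation. */
public class LookupCache<K, V> {

    private static class Entry<V> {
        final V value;
        final long writeTimeMillis;

        Entry(V value, long writeTimeMillis) {
            this.value = value;
            this.writeTimeMillis = writeTimeMillis;
        }
    }

    private final int maxRows;    // mirrors lookup.cache.max-rows
    private final long ttlMillis; // mirrors lookup.cache.ttl
    private final Map<K, Entry<V>> entries;

    public LookupCache(int maxRows, long ttlMillis) {
        this.maxRows = maxRows;
        this.ttlMillis = ttlMillis;
        // An access-ordered LinkedHashMap evicts the least recently used
        // entry once the row limit is exceeded.
        this.entries = new LinkedHashMap<K, Entry<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Entry<V>> eldest) {
                return size() > LookupCache.this.maxRows;
            }
        };
    }

    public void put(K key, V value, long nowMillis) {
        entries.put(key, new Entry<>(value, nowMillis));
    }

    /** Returns null when the key is absent or the cached entry has expired. */
    public V get(K key, long nowMillis) {
        Entry<V> e = entries.get(key);
        if (e == null) {
            return null;
        }
        if (nowMillis - e.writeTimeMillis > ttlMillis) {
            entries.remove(key);
            return null;
        }
        return e.value;
    }
}
```

With the defaults (`max-rows = -1`, `ttl = 0s`) the connector disables caching entirely; both options must be positive for the cache to take effect.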

## Integration with Flink Table API

### Table Schema Requirements

HBase tables in Flink require a primary key definition that maps to the HBase row key. The schema supports column families through nested ROW types.

**Schema Validation:**

- Primary key must be defined (it maps to the HBase row key)
- Column families are represented as ROW types
- All Flink data types are supported through serialization

**Supported Data Types:**

- Primitive types: STRING, BIGINT, INT, DOUBLE, FLOAT, BOOLEAN, etc.
- Timestamp types: TIMESTAMP(3), TIMESTAMP_LTZ(3)
- Complex types: ROW (for column families), ARRAY, MAP
- Decimal types: DECIMAL(precision, scale)
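All of these types ultimately reach HBase as byte arrays; a BIGINT, for instance, is serialized as an 8-byte big-endian value, matching what HBase's `Bytes.toBytes(long)` utility produces. A stdlib-only sketch of that layout using `java.nio` in place of the HBase utility class:

```java
import java.nio.ByteBuffer;

public class BigintEncoding {
    /** Serializes a BIGINT as HBase stores it: 8 bytes, big-endian. */
    static byte[] encode(long value) {
        // ByteBuffer is big-endian by default, matching HBase's byte layout
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    /** Deserializes the 8-byte big-endian form back to a long. */
    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }
}
```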

### Error Handling

The factory provides comprehensive error handling and validation:

- **Table validation**: ensures the primary key is properly defined
- **Configuration validation**: validates that all required options are present
- **Connection validation**: verifies HBase cluster connectivity
- **Schema validation**: ensures schema compatibility with the HBase storage model