
# Factory Registration

Factory classes enable automatic connector discovery through Flink's Service Provider Interface (SPI). These factories handle connector instantiation, validate configuration, and provide the entry points for using the Hive connector in Flink applications.

## Capabilities

### Hive Catalog Factory

Creates HiveCatalog instances for integrating with the Hive metastore, enabling unified metadata management across Flink and Hive systems.

```java { .api }
/**
 * Factory for creating HiveCatalog instances through Flink's catalog
 * discovery mechanism.
 * Factory identifier: "hive"
 */
public class HiveCatalogFactory implements CatalogFactory {

    /** Factory identifier constant */
    public static final String IDENTIFIER = "hive";

    /**
     * Returns the unique identifier for this catalog factory.
     * @return "hive"
     */
    public String factoryIdentifier();

    /**
     * Returns the set of configuration options supported by this factory.
     * Required options: default-database
     * @return Set of required configuration options
     */
    public Set<ConfigOption<?>> requiredOptions();

    /**
     * Returns the set of optional configuration options.
     * Optional options: hive-conf-dir, hive-version, hadoop-conf-dir, property-version
     * @return Set of optional configuration options
     */
    public Set<ConfigOption<?>> optionalOptions();

    /**
     * Creates a new HiveCatalog instance based on the provided context.
     * @param context Factory context containing configuration and class loader
     * @return Configured HiveCatalog instance
     */
    public Catalog createCatalog(Context context);
}
```

**Usage Example:**

```sql
-- SQL DDL usage (automatic factory discovery)
CREATE CATALOG hive_catalog WITH (
  'type' = 'hive',
  'hive-conf-dir' = '/opt/hive/conf',
  'hive-version' = '3.1.2',
  'hadoop-conf-dir' = '/opt/hadoop/conf',
  'default-database' = 'default'
);

USE CATALOG hive_catalog;
```

**Configuration Options:**

- `default-database` (required): Default database name
- `hive-conf-dir` (optional): Path to the Hive configuration directory containing hive-site.xml
- `hive-version` (optional): Hive version for compatibility; defaults to "3.1.2"
- `hadoop-conf-dir` (optional): Path to the Hadoop configuration directory
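The required/optional split drives the validation Flink performs before a factory is invoked: every required key must be present, and keys outside both sets are rejected. The following is an illustrative plain-Java sketch of that rule, not Flink's actual `FactoryUtil` code:

```java
import java.util.Map;
import java.util.Set;

public class OptionValidation {
    /**
     * Illustrative check mirroring what a factory's required/optional
     * option sets imply: every required key must be present, and no
     * key outside the union of both sets is accepted.
     */
    static void validate(Map<String, String> options,
                         Set<String> required, Set<String> optional) {
        for (String key : required) {
            if (!options.containsKey(key)) {
                throw new IllegalArgumentException("Missing required option: " + key);
            }
        }
        for (String key : options.keySet()) {
            if (!required.contains(key) && !optional.contains(key)) {
                throw new IllegalArgumentException("Unsupported option: " + key);
            }
        }
    }

    public static void main(String[] args) {
        // Option sets as documented for the Hive catalog factory above
        Set<String> required = Set.of("default-database");
        Set<String> optional = Set.of("hive-conf-dir", "hive-version",
                "hadoop-conf-dir", "property-version");
        validate(Map.of("default-database", "default",
                "hive-conf-dir", "/opt/hive/conf"), required, optional);
        System.out.println("options valid");
    }
}
```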

### Hive Dynamic Table Factory

Creates HiveTableSource and HiveTableSink instances using Flink's modern dynamic table interface, supporting both batch and streaming operations.

```java { .api }
/**
 * Dynamic table factory for creating Hive table sources and sinks.
 * Factory identifier: "hive"
 */
public class HiveDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    /** Factory identifier constant */
    public static final String IDENTIFIER = "hive";

    /**
     * Constructor requiring a HiveConf instance.
     * @param hiveConf Hive configuration instance
     */
    public HiveDynamicTableFactory(HiveConf hiveConf);

    /**
     * Returns the unique identifier for this table factory.
     * @return "hive"
     */
    public String factoryIdentifier();

    /**
     * Returns required configuration options.
     * @return Set of required configuration options
     */
    public Set<ConfigOption<?>> requiredOptions();

    /**
     * Returns optional configuration options.
     * @return Set of optional configuration options
     */
    public Set<ConfigOption<?>> optionalOptions();

    /**
     * Creates a DynamicTableSource for reading from Hive tables.
     * @param context Factory context with table schema and options
     * @return Configured HiveTableSource instance
     */
    public DynamicTableSource createDynamicTableSource(Context context);

    /**
     * Creates a DynamicTableSink for writing to Hive tables.
     * @param context Factory context with table schema and options
     * @return Configured HiveTableSink instance
     */
    public DynamicTableSink createDynamicTableSink(Context context);
}
```

**Usage Example:**

```sql
-- Creating a Hive table with streaming source enabled
CREATE TABLE hive_stream_table (
  id BIGINT,
  name STRING,
  event_time TIMESTAMP(3),
  partition_date STRING
) PARTITIONED BY (partition_date)
WITH (
  'connector' = 'hive',
  'streaming-source.enable' = 'true',
  'streaming-source.partition.include' = 'latest',
  'streaming-source.monitor-interval' = '1 min'
);
```

### Hive Module Factory

Creates HiveModule instances that provide access to Hive built-in functions within Flink SQL environments.

```java { .api }
/**
 * Factory for creating HiveModule instances to access Hive built-in functions.
 * Factory identifier: "hive"
 */
public class HiveModuleFactory implements ModuleFactory {

    /** Factory identifier constant */
    public static final String IDENTIFIER = "hive";

    /**
     * Returns the unique identifier for this module factory.
     * @return "hive"
     */
    public String factoryIdentifier();

    /**
     * Returns required configuration options.
     * @return Set of required configuration options (empty for HiveModule)
     */
    public Set<ConfigOption<?>> requiredOptions();

    /**
     * Returns optional configuration options.
     * Optional options: hive-version
     * @return Set of optional configuration options
     */
    public Set<ConfigOption<?>> optionalOptions();

    /**
     * Creates a new HiveModule instance.
     * @param context Factory context containing configuration
     * @return Configured HiveModule instance
     */
    public Module createModule(Context context);
}
```

**Usage Example:**

```sql
-- Load the Hive module to access Hive built-in functions
LOAD MODULE hive WITH ('hive-version' = '3.1.2');

-- Use Hive built-in functions
SELECT id, name, substr(name, 1, 3) AS name_prefix FROM my_table;
```

### Hive Dialect Factory

Creates a Hive SQL dialect parser for executing Hive-compatible SQL statements within Flink.

```java { .api }
/**
 * Factory for creating the Hive SQL dialect parser.
 * Factory identifier: "hive"
 */
public class HiveDialectFactory implements SqlDialectFactory {

    /**
     * Returns the unique identifier for this dialect factory.
     * @return "hive"
     */
    public String factoryIdentifier();

    /**
     * Creates a new SqlDialect instance for parsing Hive SQL.
     * @param context Factory context
     * @return Configured Hive SQL dialect
     */
    public SqlDialect createSqlDialect(Context context);
}
```

**Usage Example:**

```java
// Programmatic usage
TableEnvironment tableEnv = TableEnvironment.create(settings);
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

// Execute Hive-compatible SQL
tableEnv.executeSql("CREATE TABLE hive_table AS SELECT * FROM source_table");
```

### HiveServer2 Endpoint Factory

Creates HiveServer2-compatible endpoints for JDBC/ODBC access to Flink through the SQL Gateway.

```java { .api }
/**
 * Factory for creating HiveServer2-compatible endpoints.
 */
public class HiveServer2EndpointFactory implements SqlGatewayEndpointFactory {

    /**
     * Returns the unique identifier for this endpoint factory.
     * @return Factory identifier string
     */
    public String factoryIdentifier();

    /**
     * Creates a new HiveServer2 endpoint.
     * @param context Factory context with configuration
     * @return Configured HiveServer2 endpoint
     */
    public SqlGatewayEndpoint createSqlGatewayEndpoint(Context context);
}
```

**Configuration Options:**

- `thrift.host`: Thrift server host address
- `thrift.port`: Thrift server port number
- `thrift.worker.threads.min`: Minimum number of worker threads
- `thrift.worker.threads.max`: Maximum number of worker threads
- `thrift.max.message.size`: Maximum message size
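These endpoint options are typically supplied in the Flink configuration when starting the SQL Gateway. The fragment below is an illustrative sketch; the `sql-gateway.endpoint.hiveserver2.*` key prefix and the values shown are assumptions to verify against the SQL Gateway documentation for your Flink version:

```yaml
# Illustrative flink-conf.yaml fragment (verify exact keys for your version)
sql-gateway.endpoint.type: hiveserver2
sql-gateway.endpoint.hiveserver2.thrift.host: 0.0.0.0
sql-gateway.endpoint.hiveserver2.thrift.port: 10000
sql-gateway.endpoint.hiveserver2.thrift.worker.threads.min: 5
sql-gateway.endpoint.hiveserver2.thrift.worker.threads.max: 512
```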

### Legacy Table Factory

Provides backward compatibility with the older Flink table factory interface.

```java { .api }
/**
 * Legacy table factory for backward compatibility.
 * @deprecated Use HiveDynamicTableFactory instead
 */
@Deprecated
public class HiveTableFactory implements TableSourceFactory<Row>, TableSinkFactory<Row> {

    /**
     * Creates a table source using the legacy interface.
     * @param properties Configuration properties
     * @return Configured table source
     */
    public TableSource<Row> createTableSource(Map<String, String> properties);

    /**
     * Creates a table sink using the legacy interface.
     * @param properties Configuration properties
     * @return Configured table sink
     */
    public TableSink<Row> createTableSink(Map<String, String> properties);
}
```

## Factory Discovery

All factories are automatically discovered through Java's Service Provider Interface (SPI) mechanism. The connector JAR includes the necessary service registration files in `META-INF/services/` that enable Flink to find and instantiate the appropriate factory classes based on the specified `type` or `connector` identifier.

**Service Registration Files:**

- `META-INF/services/org.apache.flink.table.factories.Factory`
- `META-INF/services/org.apache.flink.table.factories.CatalogFactory`
- `META-INF/services/org.apache.flink.table.factories.ModuleFactory`

This automatic discovery means that simply including the connector JAR on the classpath makes all Hive integration capabilities available without additional configuration steps.
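The discovery flow can be sketched with `java.util.ServiceLoader` semantics: load all registered factories, then match on `factoryIdentifier()`. The interface and classes below are simplified stand-ins for illustration, not Flink's real `Factory` hierarchy, and the candidate list is hardcoded where Flink would use `ServiceLoader.load(...)`:

```java
import java.util.List;
import java.util.NoSuchElementException;

public class DiscoveryExample {
    /** Simplified stand-in for Flink's Factory interface. */
    interface Factory {
        String factoryIdentifier();
    }

    static class HiveFactory implements Factory {
        public String factoryIdentifier() { return "hive"; }
    }

    static class JdbcFactory implements Factory {
        public String factoryIdentifier() { return "jdbc"; }
    }

    /**
     * Selects the factory whose identifier matches the requested one.
     * In Flink the candidates come from ServiceLoader, populated by the
     * META-INF/services registration files listed above.
     */
    static Factory discover(String identifier, List<Factory> candidates) {
        return candidates.stream()
                .filter(f -> f.factoryIdentifier().equals(identifier))
                .findFirst()
                .orElseThrow(() -> new NoSuchElementException(
                        "No factory for identifier: " + identifier));
    }

    public static void main(String[] args) {
        List<Factory> loaded = List.of(new HiveFactory(), new JdbcFactory());
        Factory f = discover("hive", loaded);
        System.out.println(f.getClass().getSimpleName()); // prints "HiveFactory"
    }
}
```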