# Factory Registration

Factory classes that enable automatic connector discovery through Flink's Service Provider Interface (SPI). These factories handle connector instantiation, configuration validation, and provide the entry points for using the Hive connector in Flink applications.

## Capabilities

### Hive Catalog Factory

Creates HiveCatalog instances for integrating with the Hive metastore, enabling unified metadata management across Flink and Hive systems.
```java { .api }
/**
 * Factory for creating HiveCatalog instances through Flink's catalog discovery mechanism.
 * Factory identifier: "hive"
 */
public class HiveCatalogFactory implements CatalogFactory {

    /** Factory identifier constant. */
    public static final String IDENTIFIER = "hive";

    /**
     * Returns the unique identifier for this catalog factory.
     * @return "hive"
     */
    public String factoryIdentifier();

    /**
     * Returns the set of configuration options required by this factory.
     * Required options: default-database
     * @return set of required configuration options
     */
    public Set<ConfigOption<?>> requiredOptions();

    /**
     * Returns the set of optional configuration options.
     * Optional options: hive-conf-dir, hive-version, hadoop-conf-dir, property-version
     * @return set of optional configuration options
     */
    public Set<ConfigOption<?>> optionalOptions();

    /**
     * Creates a new HiveCatalog instance based on the provided context.
     * @param context factory context containing configuration and class loader
     * @return configured HiveCatalog instance
     */
    public Catalog createCatalog(Context context);
}
```
**Usage Example:**

```sql
-- SQL DDL usage (automatic factory discovery)
CREATE CATALOG hive_catalog WITH (
    'type' = 'hive',
    'hive-conf-dir' = '/opt/hive/conf',
    'hive-version' = '3.1.2',
    'hadoop-conf-dir' = '/opt/hadoop/conf',
    'default-database' = 'default'
);

USE CATALOG hive_catalog;
```
**Configuration Options:**

- `default-database` (required): Database to use as the catalog's default database
- `hive-conf-dir` (optional): Path to the Hive configuration directory containing hive-site.xml
- `hive-version` (optional): Hive version for compatibility, defaults to "3.1.2"
- `hadoop-conf-dir` (optional): Path to the Hadoop configuration directory
### Hive Dynamic Table Factory

Creates HiveTableSource and HiveTableSink instances using Flink's modern dynamic table interface, supporting both batch and streaming operations.
```java { .api }
/**
 * Dynamic table factory for creating Hive table sources and sinks.
 * Factory identifier: "hive"
 */
public class HiveDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    /** Factory identifier constant. */
    public static final String IDENTIFIER = "hive";

    /**
     * Constructor requiring a HiveConf instance.
     * @param hiveConf Hive configuration instance
     */
    public HiveDynamicTableFactory(HiveConf hiveConf);

    /**
     * Returns the unique identifier for this table factory.
     * @return "hive"
     */
    public String factoryIdentifier();

    /**
     * Returns required configuration options.
     * @return set of required configuration options
     */
    public Set<ConfigOption<?>> requiredOptions();

    /**
     * Returns optional configuration options.
     * @return set of optional configuration options
     */
    public Set<ConfigOption<?>> optionalOptions();

    /**
     * Creates a DynamicTableSource for reading from Hive tables.
     * @param context factory context with table schema and options
     * @return configured HiveTableSource instance
     */
    public DynamicTableSource createDynamicTableSource(Context context);

    /**
     * Creates a DynamicTableSink for writing to Hive tables.
     * @param context factory context with table schema and options
     * @return configured HiveTableSink instance
     */
    public DynamicTableSink createDynamicTableSink(Context context);
}
```
**Usage Example:**

```sql
-- Creating a Hive table with streaming source enabled
CREATE TABLE hive_stream_table (
    id BIGINT,
    name STRING,
    event_time TIMESTAMP(3),
    partition_date STRING
) PARTITIONED BY (partition_date)
WITH (
    'connector' = 'hive',
    'streaming-source.enable' = 'true',
    'streaming-source.partition.include' = 'latest',
    'streaming-source.monitor-interval' = '1 min'
);
```
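The sink side can likewise be configured for streaming writes. The following DDL is a sketch using Flink's documented partition-commit options; the exact option values (trigger, delay, commit policies) should be tuned to the target pipeline:

```sql
-- Streaming write into a partitioned Hive table (illustrative option values)
CREATE TABLE hive_sink_table (
    id BIGINT,
    name STRING,
    partition_date STRING
) PARTITIONED BY (partition_date)
WITH (
    'connector' = 'hive',
    'sink.partition-commit.trigger' = 'partition-time',
    'sink.partition-commit.delay' = '1 h',
    'sink.partition-commit.policy.kind' = 'metastore,success-file'
);
```

With `partition-time` as the commit trigger, a partition is committed to the metastore (and marked with a success file) once the watermark passes the partition time plus the configured delay.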
### Hive Module Factory

Creates HiveModule instances that provide access to Hive built-in functions within Flink SQL environments.
```java { .api }
/**
 * Factory for creating HiveModule instances to access Hive built-in functions.
 * Factory identifier: "hive"
 */
public class HiveModuleFactory implements ModuleFactory {

    /** Factory identifier constant. */
    public static final String IDENTIFIER = "hive";

    /**
     * Returns the unique identifier for this module factory.
     * @return "hive"
     */
    public String factoryIdentifier();

    /**
     * Returns required configuration options.
     * @return set of required configuration options (empty for HiveModule)
     */
    public Set<ConfigOption<?>> requiredOptions();

    /**
     * Returns optional configuration options.
     * Optional options: hive-version
     * @return set of optional configuration options
     */
    public Set<ConfigOption<?>> optionalOptions();

    /**
     * Creates a new HiveModule instance.
     * @param context factory context containing configuration
     * @return configured HiveModule instance
     */
    public Module createModule(Context context);
}
```
**Usage Example:**

```sql
-- Load the Hive module to access Hive functions
LOAD MODULE hive WITH ('hive-version' = '3.1.2');

-- Use Hive built-in functions
SELECT id, name, substr(name, 1, 3) AS name_prefix FROM my_table;
```
### Hive Dialect Factory

Creates a Hive SQL dialect parser for executing Hive-compatible SQL statements within Flink.
```java { .api }
/**
 * Factory for creating the Hive SQL dialect parser.
 * Factory identifier: "hive"
 */
public class HiveDialectFactory implements SqlDialectFactory {

    /**
     * Returns the unique identifier for this dialect factory.
     * @return "hive"
     */
    public String factoryIdentifier();

    /**
     * Creates a new SqlDialect instance for parsing Hive SQL.
     * @param context factory context
     * @return configured Hive SQL dialect
     */
    public SqlDialect createSqlDialect(Context context);
}
```
**Usage Example:**

```java
// Programmatic usage
TableEnvironment tableEnv = TableEnvironment.create(settings);
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

// Execute Hive-compatible SQL
tableEnv.executeSql("CREATE TABLE hive_table AS SELECT * FROM source_table");
```
### HiveServer2 Endpoint Factory

Creates HiveServer2-compatible endpoints for JDBC/ODBC access to Flink through the SQL Gateway.
```java { .api }
/**
 * Factory for creating HiveServer2-compatible endpoints.
 */
public class HiveServer2EndpointFactory implements SqlGatewayEndpointFactory {

    /**
     * Returns the unique identifier for this endpoint factory.
     * @return factory identifier string
     */
    public String factoryIdentifier();

    /**
     * Creates a new HiveServer2 endpoint.
     * @param context factory context with configuration
     * @return configured HiveServer2 endpoint
     */
    public SqlGatewayEndpoint createSqlGatewayEndpoint(Context context);
}
```
**Configuration Options:**

- `thrift.host`: Thrift server host address
- `thrift.port`: Thrift server port number
- `thrift.worker.threads.min`: Minimum number of worker threads
- `thrift.worker.threads.max`: Maximum number of worker threads
- `thrift.max.message.size`: Maximum message size
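In the SQL Gateway these options are set under the `sql-gateway.endpoint.hiveserver2.*` prefix. The fragment below is a sketch following Flink's SQL Gateway option naming; verify the exact keys against the target Flink version's documentation:

```yaml
# Select the HiveServer2-compatible endpoint for the SQL Gateway
sql-gateway.endpoint.type: hiveserver2
# Thrift server binding (illustrative value)
sql-gateway.endpoint.hiveserver2.thrift.port: 10000
# Hive configuration directory used by the endpoint's Hive catalog
sql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir: /opt/hive/conf
```

Once the gateway starts with this configuration, Hive JDBC clients and BI tools that speak the HiveServer2 Thrift protocol can connect to Flink on the configured port.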
### Legacy Table Factory

Provides backward compatibility with the older Flink table factory interface.
```java { .api }
/**
 * Legacy table factory for backward compatibility.
 * @deprecated Use HiveDynamicTableFactory instead
 */
@Deprecated
public class HiveTableFactory implements TableSourceFactory<Row>, TableSinkFactory<Row> {

    /**
     * Creates a table source using the legacy interface.
     * @param properties configuration properties
     * @return configured table source
     */
    public TableSource<Row> createTableSource(Map<String, String> properties);

    /**
     * Creates a table sink using the legacy interface.
     * @param properties configuration properties
     * @return configured table sink
     */
    public TableSink<Row> createTableSink(Map<String, String> properties);
}
```
## Factory Discovery

All factories are automatically discovered through Java's Service Provider Interface (SPI) mechanism. The connector JAR includes the necessary service registration files in `META-INF/services/` that enable Flink to find and instantiate the appropriate factory classes based on the specified `type` or `connector` identifier.

**Service Registration Files:**

- `META-INF/services/org.apache.flink.table.factories.Factory`
- `META-INF/services/org.apache.flink.table.factories.CatalogFactory`
- `META-INF/services/org.apache.flink.table.factories.ModuleFactory`

This automatic discovery means that simply including the connector JAR on the classpath makes all Hive integration capabilities available without additional configuration steps.
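The lookup Flink performs can be sketched in plain Java: enumerate all registered factory implementations, then pick the one whose identifier matches the requested `type`/`connector` value. The `Factory` interface and classes below are simplified stand-ins for Flink's real `org.apache.flink.table.factories.Factory` hierarchy; in Flink, the candidate list comes from `java.util.ServiceLoader` reading the `META-INF/services` files above rather than a hand-built list:

```java
import java.util.List;
import java.util.Optional;

public class FactoryDiscoverySketch {

    /** Simplified stand-in for Flink's org.apache.flink.table.factories.Factory. */
    interface Factory {
        String factoryIdentifier();
    }

    /** Stand-in for HiveCatalogFactory, registered under identifier "hive". */
    static class HiveCatalogFactory implements Factory {
        public String factoryIdentifier() { return "hive"; }
    }

    /** Stand-in for another factory registered on the same classpath. */
    static class JdbcCatalogFactory implements Factory {
        public String factoryIdentifier() { return "jdbc"; }
    }

    /**
     * Mimics Flink's discovery step: filter candidates by identifier.
     * In Flink the candidates come from ServiceLoader.load(Factory.class).
     */
    static Optional<Factory> discover(List<Factory> candidates, String identifier) {
        return candidates.stream()
                .filter(f -> f.factoryIdentifier().equals(identifier))
                .findFirst();
    }

    public static void main(String[] args) {
        // Stand-in for the factories found via META-INF/services registration
        List<Factory> registered =
                List.of(new HiveCatalogFactory(), new JdbcCatalogFactory());

        // 'type' = 'hive' in CREATE CATALOG resolves to the Hive factory
        Factory match = discover(registered, "hive").orElseThrow();
        System.out.println(match.getClass().getSimpleName()); // prints "HiveCatalogFactory"
    }
}
```

The same matching happens independently for catalogs, modules, dialects, and endpoints, which is why all five factories can share the identifier `"hive"` without conflict: each is looked up against a different factory interface.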