0
# Catalog Integration
1
2
The catalog integration provides complete Hive metastore support, covering schema discovery, table management, and metadata operations. It enables transparent access to existing Hive data warehouses, with full support for databases, tables, partitions, functions, and statistics.
3
4
## Capabilities
5
6
### HiveCatalog
7
8
`HiveCatalog` is the main catalog implementation. It connects to the Hive metastore and exposes database, table, partition, function, and statistics operations.
9
10
```java { .api }
11
/**
12
* Hive catalog implementation providing full metastore integration
13
* Extends AbstractCatalog to provide Flink-compatible catalog operations
14
*/
15
public class HiveCatalog extends AbstractCatalog {
16
17
/**
18
* Create HiveCatalog instance
19
* @param catalogName Name for this catalog instance
20
* @param defaultDatabase Default database to use
21
* @param hiveConfDir Directory containing hive-site.xml (optional)
22
* @param hadoopConfDir Directory containing Hadoop configuration (optional)
23
* @param hiveVersion Hive version for compatibility (e.g., "2.3.9")
24
*/
25
public HiveCatalog(String catalogName, String defaultDatabase,
26
String hiveConfDir, String hadoopConfDir, String hiveVersion);
27
28
// Database operations
29
/**
30
* List all databases in the Hive metastore
31
* @return List of database names
32
* @throws CatalogException If the metastore is not accessible or the operation fails
34
*/
35
public List<String> listDatabases() throws CatalogException;
36
37
/**
38
* Check if database exists
39
* @param databaseName Database name to check
40
* @return true if database exists
41
* @throws CatalogException If operation fails
42
*/
43
public boolean databaseExists(String databaseName) throws CatalogException;
44
45
/**
46
* Create new database
47
* @param databaseName Name of database to create
48
* @param database Database metadata
49
* @param ignoreIfExists Whether to ignore if database already exists
50
* @throws DatabaseAlreadyExistException If database exists and ignoreIfExists is false
51
* @throws CatalogException If operation fails
52
*/
53
public void createDatabase(String databaseName, CatalogDatabase database, boolean ignoreIfExists)
54
throws DatabaseAlreadyExistException, CatalogException;
55
56
/**
57
* Get database metadata
58
* @param databaseName Database name
59
* @return Database metadata
60
* @throws DatabaseNotExistException If database doesn't exist
61
* @throws CatalogException If operation fails
62
*/
63
public CatalogDatabase getDatabase(String databaseName)
64
throws DatabaseNotExistException, CatalogException;
65
66
// Table operations
67
/**
68
* List all tables in a database
69
* @param databaseName Database name
70
* @return List of table names
71
* @throws DatabaseNotExistException If database doesn't exist
72
* @throws CatalogException If operation fails
73
*/
74
public List<String> listTables(String databaseName)
75
throws DatabaseNotExistException, CatalogException;
76
77
/**
78
* List tables matching a pattern
79
* @param databaseName Database name
80
* @param tableNamePattern SQL-like pattern for table names
81
* @return List of matching table names
82
* @throws DatabaseNotExistException If database doesn't exist
83
* @throws CatalogException If operation fails
84
*/
85
public List<String> listTables(String databaseName, String tableNamePattern)
86
throws DatabaseNotExistException, CatalogException;
87
88
/**
89
* Check if table exists
90
* @param tablePath Table path (database.table)
91
* @return true if table exists
92
* @throws CatalogException If operation fails
93
*/
94
public boolean tableExists(ObjectPath tablePath) throws CatalogException;
95
96
/**
97
* Get table metadata and schema
98
* @param tablePath Table path (database.table)
99
* @return Table metadata including schema
100
* @throws TableNotExistException If table doesn't exist
101
* @throws CatalogException If operation fails
102
*/
103
public CatalogBaseTable getTable(ObjectPath tablePath)
104
throws TableNotExistException, CatalogException;
105
106
/**
107
* Create new table
108
* @param tablePath Table path (database.table)
109
* @param table Table metadata and schema
110
* @param ignoreIfExists Whether to ignore if table already exists
111
* @throws TableAlreadyExistException If table exists and ignoreIfExists is false
112
* @throws DatabaseNotExistException If database doesn't exist
113
* @throws CatalogException If operation fails
114
*/
115
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
116
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException;
117
118
/**
119
* Drop table
120
* @param tablePath Table path (database.table)
121
* @param ignoreIfNotExists Whether to ignore if table doesn't exist
122
* @throws TableNotExistException If table doesn't exist and ignoreIfNotExists is false
123
* @throws CatalogException If operation fails
124
*/
125
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
126
throws TableNotExistException, CatalogException;
127
128
/**
129
* Rename table
130
* @param tablePath Current table path
131
* @param newTableName New table name
* @param ignoreIfNotExists Whether to ignore if table doesn't exist
132
* @throws TableNotExistException If table doesn't exist and ignoreIfNotExists is false
133
* @throws TableAlreadyExistException If new name already exists
134
* @throws CatalogException If operation fails
135
*/
136
public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
137
throws TableNotExistException, TableAlreadyExistException, CatalogException;
138
139
// Partition operations
140
/**
141
* List all partitions for a partitioned table
142
* @param tablePath Table path (database.table)
143
* @return List of partition specifications
144
* @throws TableNotExistException If table doesn't exist
145
* @throws TableNotPartitionedException If table is not partitioned
146
* @throws CatalogException If operation fails
147
*/
148
public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
149
throws TableNotExistException, TableNotPartitionedException, CatalogException;
150
151
/**
152
* List partitions matching partial specification
153
* @param tablePath Table path (database.table)
154
* @param partitionSpec Partial partition specification
155
* @return List of matching partition specifications
156
* @throws TableNotExistException If table doesn't exist
157
* @throws TableNotPartitionedException If table is not partitioned
158
* @throws PartitionSpecInvalidException If partition spec is invalid
159
* @throws CatalogException If operation fails
160
*/
161
public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath,
162
CatalogPartitionSpec partitionSpec)
163
throws TableNotExistException, TableNotPartitionedException,
164
PartitionSpecInvalidException, CatalogException;
165
166
/**
167
* Get partition metadata
168
* @param tablePath Table path (database.table)
169
* @param partitionSpec Partition specification
170
* @return Partition metadata
171
* @throws PartitionNotExistException If partition doesn't exist
172
* @throws CatalogException If operation fails
173
*/
174
public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
175
throws PartitionNotExistException, CatalogException;
176
177
/**
178
* Check if partition exists
179
* @param tablePath Table path (database.table)
180
* @param partitionSpec Partition specification
181
* @return true if partition exists
182
* @throws CatalogException If operation fails
183
*/
184
public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
185
throws CatalogException;
186
187
/**
188
* Create new partition
189
* @param tablePath Table path (database.table)
190
* @param partitionSpec Partition specification
191
* @param partition Partition metadata
192
* @param ignoreIfExists Whether to ignore if partition already exists
193
* @throws TableNotExistException If table doesn't exist
194
* @throws TableNotPartitionedException If table is not partitioned
195
* @throws PartitionSpecInvalidException If partition spec is invalid
196
* @throws PartitionAlreadyExistsException If partition exists and ignoreIfExists is false
197
* @throws CatalogException If operation fails
198
*/
199
public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,
200
CatalogPartition partition, boolean ignoreIfExists)
201
throws TableNotExistException, TableNotPartitionedException,
202
PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException;
203
204
/**
205
* Drop partition
206
* @param tablePath Table path (database.table)
207
* @param partitionSpec Partition specification
208
* @param ignoreIfNotExists Whether to ignore if partition doesn't exist
209
* @throws PartitionNotExistException If partition doesn't exist and ignoreIfNotExists is false
210
* @throws CatalogException If operation fails
211
*/
212
public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,
213
boolean ignoreIfNotExists)
214
throws PartitionNotExistException, CatalogException;
215
216
// Function operations
217
/**
218
* List user-defined functions in database
219
* @param databaseName Database name
220
* @return List of function names
221
* @throws DatabaseNotExistException If database doesn't exist
222
* @throws CatalogException If operation fails
223
*/
224
public List<String> listFunctions(String databaseName)
225
throws DatabaseNotExistException, CatalogException;
226
227
/**
228
* Get function metadata
229
* @param functionPath Function path (database.function)
230
* @return Function metadata
231
* @throws FunctionNotExistException If function doesn't exist
232
* @throws CatalogException If operation fails
233
*/
234
public CatalogFunction getFunction(ObjectPath functionPath)
235
throws FunctionNotExistException, CatalogException;
236
237
/**
238
* Check if function exists
239
* @param functionPath Function path (database.function)
240
* @return true if function exists
241
* @throws CatalogException If operation fails
242
*/
243
public boolean functionExists(ObjectPath functionPath) throws CatalogException;
244
245
/**
246
* Create user-defined function
247
* @param functionPath Function path (database.function)
248
* @param function Function metadata
249
* @param ignoreIfExists Whether to ignore if function already exists
250
* @throws FunctionAlreadyExistException If function exists and ignoreIfExists is false
251
* @throws DatabaseNotExistException If database doesn't exist
252
* @throws CatalogException If operation fails
253
*/
254
public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
255
throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException;
256
257
/**
258
* Alter function metadata
259
* @param functionPath Function path (database.function)
260
* @param newFunction New function metadata
261
* @param ignoreIfNotExists Whether to ignore if function doesn't exist
262
* @throws FunctionNotExistException If function doesn't exist and ignoreIfNotExists is false
263
* @throws CatalogException If operation fails
264
*/
265
public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
266
throws FunctionNotExistException, CatalogException;
267
268
/**
269
* Drop function
270
* @param functionPath Function path (database.function)
271
* @param ignoreIfNotExists Whether to ignore if function doesn't exist
272
* @throws FunctionNotExistException If function doesn't exist and ignoreIfNotExists is false
273
* @throws CatalogException If operation fails
274
*/
275
public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
276
throws FunctionNotExistException, CatalogException;
277
278
// Statistics operations
279
/**
280
* Get table statistics for cost-based optimization
281
* @param tablePath Table path (database.table)
282
* @return Table statistics
283
* @throws TableNotExistException If table doesn't exist
284
* @throws CatalogException If operation fails
285
*/
286
public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
287
throws TableNotExistException, CatalogException;
288
289
/**
290
* Get column statistics for cost-based optimization
291
* @param tablePath Table path (database.table)
292
* @return Column statistics
293
* @throws TableNotExistException If table doesn't exist
294
* @throws CatalogException If operation fails
295
*/
296
public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
297
throws TableNotExistException, CatalogException;
298
299
/**
300
* Get partition statistics
301
* @param tablePath Table path (database.table)
302
* @param partitionSpec Partition specification
303
* @return Partition statistics
304
* @throws PartitionNotExistException If partition doesn't exist
305
* @throws CatalogException If operation fails
306
*/
307
public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath,
308
CatalogPartitionSpec partitionSpec)
309
throws PartitionNotExistException, CatalogException;
310
}
311
```
312
313
**Usage Examples:**
314
315
```java
316
import org.apache.flink.table.catalog.hive.HiveCatalog;
317
import org.apache.flink.table.catalog.ObjectPath;
318
319
// Create and configure Hive catalog
320
String catalogName = "production_hive";
321
String defaultDatabase = "analytics";
322
String hiveConfDir = "/etc/hive/conf"; // Contains hive-site.xml
323
String hadoopConfDir = "/etc/hadoop/conf"; // Contains core-site.xml, hdfs-site.xml
324
String hiveVersion = "2.3.9";
325
326
HiveCatalog catalog = new HiveCatalog(
327
catalogName,
328
defaultDatabase,
329
hiveConfDir,
330
hadoopConfDir,
331
hiveVersion
332
);
333
334
// Register with Table Environment
335
TableEnvironment tableEnv = TableEnvironment.create(settings);
336
tableEnv.registerCatalog("hive", catalog);
337
tableEnv.useCatalog("hive");
338
339
// Database operations
340
List<String> databases = catalog.listDatabases();
341
System.out.println("Available databases: " + databases);
342
343
boolean dbExists = catalog.databaseExists("user_data");
344
if (!dbExists) {
345
CatalogDatabase newDb = new CatalogDatabaseImpl(
346
Map.of("description", "User analytics data"),
347
"Database for user analytics"
348
);
349
catalog.createDatabase("user_data", newDb, false);
350
}
351
352
// Table operations
353
ObjectPath tablePath = new ObjectPath("user_data", "events");
354
if (catalog.tableExists(tablePath)) {
355
CatalogBaseTable table = catalog.getTable(tablePath);
356
TableSchema schema = table.getSchema();
357
System.out.println("Table schema: " + schema);
358
359
// List partitions if table is partitioned
360
if (table instanceof CatalogTable) {
361
CatalogTable catalogTable = (CatalogTable) table;
362
if (catalogTable.isPartitioned()) {
363
List<CatalogPartitionSpec> partitions = catalog.listPartitions(tablePath);
364
System.out.println("Partitions: " + partitions.size());
365
}
366
}
367
}
368
369
// Query through catalog
370
Table result = tableEnv.sqlQuery("SELECT * FROM hive.user_data.events WHERE event_date = '2024-01-01'");
371
```
372
373