0
# Catalog and Metadata Management
1
2
This document covers multi-catalog support and the management of database and table metadata in Apache Flink's Table API (the `flink-table-uber-blink` distribution).
3
4
## Catalog Operations
5
6
### Catalog Registration
7
8
```java { .api }
9
interface TableEnvironment {
10
void registerCatalog(String catalogName, Catalog catalog);
11
Optional<Catalog> getCatalog(String catalogName);
12
void useCatalog(String catalogName);
13
String getCurrentCatalog();
14
void useDatabase(String databaseName);
15
String getCurrentDatabase();
16
}
17
```
18
19
**Usage:**
20
21
```java
22
// Register Hive catalog
23
HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", "/path/to/hive-conf");
24
tEnv.registerCatalog("myhive", hiveCatalog);
25
26
// Register JDBC catalog (the base URL must not include a database name;
// the database is given separately as the default database)
JdbcCatalog jdbcCatalog = new PostgresCatalog("mypg", "testdb", "user", "pass", "jdbc:postgresql://localhost:5432/");
tEnv.registerCatalog("mypg", jdbcCatalog);
29
30
// Switch catalog context
31
tEnv.useCatalog("myhive");
32
tEnv.useDatabase("production");
33
```
34
35
## Built-in Catalogs
36
37
### Generic In-Memory Catalog
38
39
```java { .api }
40
class GenericInMemoryCatalog implements Catalog {
41
GenericInMemoryCatalog(String name);
42
GenericInMemoryCatalog(String name, String defaultDatabase);
43
}
44
```
45
46
**Usage:**
47
48
```java
49
// Create and register in-memory catalog
50
GenericInMemoryCatalog memoryCatalog = new GenericInMemoryCatalog("memory_catalog", "my_db");
51
tEnv.registerCatalog("memory", memoryCatalog);
52
```
53
54
### Hive Catalog
55
56
```java { .api }
57
class HiveCatalog implements Catalog {
58
HiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir);
59
HiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir, String hiveVersion);
60
HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf);
61
HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf, String hiveVersion);
62
}
63
```
64
65
**Usage:**
66
67
```java
68
// Register Hive catalog
69
HiveCatalog hive = new HiveCatalog(
70
"myhive", // catalog name
71
"default", // default database
72
"/opt/hive/conf", // hive conf directory
73
"2.3.4" // hive version
74
);
75
tEnv.registerCatalog("myhive", hive);
76
77
// Use Hive tables
78
tEnv.useCatalog("myhive");
79
Table hiveTable = tEnv.from("hive_database.hive_table");
80
```
81
82
### JDBC Catalogs
83
84
```java { .api }
85
class JdbcCatalog implements Catalog {
86
JdbcCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl);
87
}
88
89
class PostgresCatalog extends JdbcCatalog {
90
PostgresCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl);
91
}
92
93
class MySqlCatalog extends JdbcCatalog {
94
MySqlCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl);
95
}
96
```
97
98
**Usage:**
99
100
```java
101
// PostgreSQL catalog
102
PostgresCatalog pgCatalog = new PostgresCatalog(
103
"mypg",
104
"postgres",
105
"user",
106
"password",
107
"jdbc:postgresql://localhost:5432/"
108
);
109
tEnv.registerCatalog("mypg", pgCatalog);
110
111
// MySQL catalog
112
MySqlCatalog mysqlCatalog = new MySqlCatalog(
113
"mysql",
114
"test",
115
"root",
116
"root",
117
"jdbc:mysql://localhost:3306"
118
);
119
tEnv.registerCatalog("mysql", mysqlCatalog);
120
```
121
122
## Metadata Listing
123
124
### Listing Operations
125
126
```java { .api }
127
interface TableEnvironment {
128
String[] listCatalogs();
129
String[] listDatabases();
130
String[] listTables();
131
String[] listViews();
132
String[] listUserDefinedFunctions();
133
String[] listFunctions();
134
String[] listModules();
135
}
136
```
137
138
**Usage:**
139
140
```java
141
// List all metadata
142
String[] catalogs = tEnv.listCatalogs();
143
String[] databases = tEnv.listDatabases();
144
String[] tables = tEnv.listTables();
145
String[] views = tEnv.listViews();
146
String[] functions = tEnv.listUserDefinedFunctions();
147
148
// Print metadata hierarchy (note: useCatalog/useDatabase change the session's current catalog and database)
149
for (String catalog : catalogs) {
150
System.out.println("Catalog: " + catalog);
151
tEnv.useCatalog(catalog);
152
153
for (String database : tEnv.listDatabases()) {
154
System.out.println(" Database: " + database);
155
tEnv.useDatabase(database);
156
157
for (String table : tEnv.listTables()) {
158
System.out.println(" Table: " + table);
159
}
160
}
161
}
162
```
163
164
## Database Operations
165
166
### Database Management
167
168
```java { .api }
169
interface Catalog {
170
boolean databaseExists(String databaseName);
171
void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists);
172
void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade);
173
void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists);
174
List<String> listDatabases();
175
CatalogDatabase getDatabase(String databaseName);
176
}
177
```
178
179
**Usage:**
180
181
```java
182
// Create database
183
CatalogDatabase newDatabase = new CatalogDatabaseImpl(
184
Map.of("location", "/path/to/database"),
185
"My custom database"
186
);
187
188
Catalog catalog = tEnv.getCatalog("myhive").get();
189
catalog.createDatabase("my_db", newDatabase, false);
190
191
// Check database existence
192
boolean exists = catalog.databaseExists("my_db");
193
194
// Get database metadata
195
CatalogDatabase dbMeta = catalog.getDatabase("my_db");
196
Map<String, String> properties = dbMeta.getProperties();
197
String comment = dbMeta.getComment();
198
```
199
200
### Database SQL Operations
201
202
```sql
203
-- Create database
204
CREATE DATABASE [IF NOT EXISTS] database_name
205
[COMMENT 'comment']
206
[WITH (key1=val1, key2=val2, ...)];
207
208
-- Drop database
209
DROP DATABASE [IF EXISTS] database_name [RESTRICT|CASCADE];
210
211
-- Show databases
212
SHOW DATABASES;
213
214
-- Describe database
215
DESCRIBE DATABASE database_name;
216
DESC DATABASE database_name;
217
218
-- Use database
219
USE database_name;
220
```
221
222
## Table Operations
223
224
### Table Management
225
226
```java { .api }
227
interface Catalog {
228
boolean tableExists(ObjectPath tablePath);
229
void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists);
230
void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists);
231
void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists);
232
List<String> listTables(String databaseName);
233
CatalogBaseTable getTable(ObjectPath tablePath);
234
CatalogTableStatistics getTableStatistics(ObjectPath tablePath);
CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath);
237
}
238
```
239
240
**Usage:**
241
242
```java
243
ObjectPath tablePath = new ObjectPath("my_db", "my_table");
244
245
// Check table existence
246
boolean tableExists = catalog.tableExists(tablePath);
247
248
// Get table metadata
249
CatalogBaseTable table = catalog.getTable(tablePath);
250
Map<String, String> options = table.getOptions();
251
Schema schema = table.getUnresolvedSchema(); // getSchema() returning TableSchema is deprecated
252
String comment = table.getComment();
253
254
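// Get table statistics
CatalogTableStatistics stats = catalog.getTableStatistics(tablePath);
long rowCount = stats.getRowCount();

// Column statistics are retrieved separately from table statistics
CatalogColumnStatistics columnStats = catalog.getTableColumnStatistics(tablePath);
Map<String, CatalogColumnStatisticsDataBase> columnStatsData = columnStats.getColumnStatisticsData();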
258
```
259
260
### Object Path Handling
261
262
```java { .api }
263
class ObjectPath {
264
ObjectPath(String databaseName, String objectName);
265
String getDatabaseName();
266
String getObjectName();
267
String getFullName();
268
269
static ObjectPath fromString(String fullName);
270
}
271
```
272
273
**Usage:**
274
275
```java
276
// Create object paths
277
ObjectPath path1 = new ObjectPath("database", "table");
278
ObjectPath path2 = ObjectPath.fromString("database.table");
279
280
// Multi-part identifiers
281
String fullName = "catalog.database.table";
282
// Parse into components for object path
283
String[] parts = fullName.split("\\.");
284
ObjectPath path = new ObjectPath(parts[1], parts[2]); // database.table
285
```
286
287
## Custom Catalog Implementation
288
289
### Catalog Interface Implementation
290
291
```java
292
public class CustomCatalog implements Catalog {
293
private final String catalogName;
294
private final String defaultDatabase;
295
296
public CustomCatalog(String catalogName, String defaultDatabase) {
297
this.catalogName = catalogName;
298
this.defaultDatabase = defaultDatabase;
299
}
300
301
@Override
302
public void open() throws CatalogException {
303
// Initialize catalog connection
304
}
305
306
@Override
307
public void close() throws CatalogException {
308
// Clean up resources
309
}
310
311
@Override
312
public String getDefaultDatabase() throws CatalogException {
313
return defaultDatabase;
314
}
315
316
@Override
317
public boolean databaseExists(String databaseName) throws CatalogException {
318
// Implementation to check database existence
319
return checkDatabaseExists(databaseName);
320
}
321
322
@Override
323
public List<String> listDatabases() throws CatalogException {
324
// Implementation to list databases
325
return getDatabaseList();
326
}
327
328
@Override
329
public boolean tableExists(ObjectPath tablePath) throws CatalogException {
330
// Implementation to check table existence
331
return checkTableExists(tablePath);
332
}
333
334
@Override
335
public List<String> listTables(String databaseName) throws CatalogException {
336
// Implementation to list tables
337
return getTableList(databaseName);
338
}
339
340
@Override
341
public CatalogBaseTable getTable(ObjectPath tablePath) throws CatalogException {
342
// Implementation to get table metadata
343
return loadTableMetadata(tablePath);
344
}
345
346
// Implement other required methods...
347
}
348
```
349
350
## Multi-Catalog Queries
351
352
### Cross-Catalog Operations
353
354
```sql
355
-- Query across multiple catalogs
356
SELECT
357
h.user_id,
358
h.purchase_amount,
359
p.user_name,
360
p.email
361
FROM myhive.sales.purchases h
362
JOIN mypg.users.profiles p ON h.user_id = p.user_id
363
WHERE h.purchase_date >= CURRENT_DATE - INTERVAL '7' DAY;
364
365
-- Insert from one catalog to another
366
INSERT INTO myhive.warehouse.sales_summary
367
SELECT
368
user_id,
369
SUM(amount) as total_amount,
370
COUNT(*) as transaction_count
371
FROM mypg.transactional.orders
372
WHERE order_date >= CURRENT_DATE - INTERVAL '1' DAY
373
GROUP BY user_id;
374
```
375
376
### Catalog Resolution
377
378
```java
379
// Fully qualified table names
380
Table hiveTable = tEnv.from("myhive.production.user_events");
381
Table pgTable = tEnv.from("mypg.analytics.user_profiles");
382
383
// Join across catalogs. Columns are referenced by plain field name; the Table API
// join requires the two tables' field names to be distinct (rename them first if they overlap)
Table joined = hiveTable
.join(pgTable, $("user_id").isEqual($("id")))
.select($("user_id"), $("event_type"), $("name"), $("email"));
387
```
388
389
## Configuration
390
391
### Catalog Configuration
392
393
```java
394
// Configure the Hive metastore connection through a HiveConf
// (HiveCatalog has no constructor that accepts a plain Map of properties)
HiveConf hiveConf = new HiveConf();
hiveConf.set("hive.metastore.uris", "thrift://localhost:9083");

HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", hiveConf, "2.3.4");

// Alternatively, point the catalog at separate Hive and Hadoop configuration directories
HiveCatalog hiveCatalogFromDirs = new HiveCatalog("myhive", "default", "/opt/hive/conf", "/etc/hadoop/conf", "2.3.4");

// JDBC catalogs take only a name, default database, credentials and base URL;
// the constructor does not accept extra properties such as connection-pool settings
PostgresCatalog pgCatalog = new PostgresCatalog("mypg", "postgres", "user", "pass", "jdbc:postgresql://localhost:5432/");
407
```
408
409
### Metadata Cache Configuration
410
411
```java
412
Configuration config = tEnv.getConfig().getConfiguration();
413
414
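// NOTE: the keys below are illustrative only and are not documented Flink
// configuration options; setting them has no effect unless a custom catalog
// reads them. Check the configuration reference for your Flink version.

// Configure metadata cache
config.setString("table.catalog.cache.expiration-time", "10 min");
config.setBoolean("table.catalog.cache.enabled", true);

// Configure Hive metastore client
config.setString("table.catalog.hive.metastore.client.factory", "org.apache.hadoop.hive.metastore.HiveMetaStoreClientFactory");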
420
```
421
422
## Error Handling
423
424
```java { .api }
425
// Unchecked exception for general catalog failures
class CatalogException extends RuntimeException {
CatalogException(String message);
CatalogException(String message, Throwable cause);
}

// Checked exceptions declared by Catalog methods; they do NOT extend CatalogException
class DatabaseNotExistException extends Exception {}
class DatabaseAlreadyExistException extends Exception {}
class TableNotExistException extends Exception {}
class TableAlreadyExistException extends Exception {}
434
```
435
436
## Types
437
438
```java { .api }
439
interface CatalogDatabase {
440
Map<String, String> getProperties();
441
String getComment();
442
CatalogDatabase copy();
443
CatalogDatabase copy(Map<String, String> properties);
444
Optional<String> getDescription();
445
Optional<String> getDetailedDescription();
446
}
447
448
interface CatalogBaseTable {
449
Map<String, String> getOptions();
450
String getComment();
451
CatalogBaseTable copy();
452
Optional<String> getDescription();
453
Optional<String> getDetailedDescription();
454
TableSchema getSchema(); // Deprecated
455
Schema getUnresolvedSchema();
456
}
457
458
interface CatalogTable extends CatalogBaseTable {
static CatalogTable of(Schema schema, String comment, List<String> partitionKeys, Map<String, String> options);

boolean isPartitioned();
List<String> getPartitionKeys();
CatalogTable copy(Map<String, String> options);
}

// Legacy implementation based on the deprecated TableSchema
class CatalogTableImpl implements CatalogTable {
CatalogTableImpl(TableSchema tableSchema, Map<String, String> properties, String comment);
}

interface CatalogView extends CatalogBaseTable {
String getOriginalQuery();
String getExpandedQuery();
}

// Legacy implementation based on the deprecated TableSchema
class CatalogViewImpl implements CatalogView {
CatalogViewImpl(String originalQuery, String expandedQuery, TableSchema schema, Map<String, String> properties, String comment);
}
472
473
interface CatalogFunction {
474
String getClassName();
475
FunctionLanguage getFunctionLanguage();
476
List<String> getFunctionResources();
477
}
478
479
enum FunctionLanguage {
480
JAVA,
SCALA,
PYTHON
483
}
484
```