# Catalog and Metadata Management

The catalog system manages metadata for databases, tables, and functions in Flink. It provides a centralized registry for these objects and supports multiple catalog backends, from purely in-memory catalogs (such as `GenericInMemoryCatalog`) to catalogs that persist metadata in an external system (such as `HiveCatalog`).

## Capabilities

### Catalog Registration and Management

Register and manage multiple catalogs within a table environment.

```java { .api }
11
/**
12
* Register a catalog instance with the specified name
13
* @param catalogName Name to register catalog under
14
* @param catalog Catalog implementation instance
15
*/
16
public void registerCatalog(String catalogName, Catalog catalog);
17
18
/**
19
* Get a registered catalog by name
20
* @param catalogName Name of the catalog to retrieve
21
* @return Optional containing the catalog if found
22
*/
23
public Optional<Catalog> getCatalog(String catalogName);
24
25
/**
26
* Set the current catalog for table resolution
27
* @param catalogName Name of catalog to set as current
28
*/
29
public void useCatalog(String catalogName);
30
31
/**
32
* Set the current database within the current catalog
33
* @param databaseName Name of database to set as current
34
*/
35
public void useDatabase(String databaseName);
36
37
/**
38
* Get the name of the current catalog
39
* @return Current catalog name
40
*/
41
public String getCurrentCatalog();
42
43
/**
44
* Get the name of the current database
45
* @return Current database name
46
*/
47
public String getCurrentDatabase();
48
```

**Usage Examples:**

```java
53
// Register custom catalog
54
Catalog hiveCatalog = new HiveCatalog("my_hive", "default", hiveConf);
55
tableEnv.registerCatalog("hive_catalog", hiveCatalog);
56
57
// Register in-memory catalog
58
Catalog memoryCatalog = new GenericInMemoryCatalog("memory_catalog");
59
tableEnv.registerCatalog("memory", memoryCatalog);
60
61
// Switch catalog context
62
tableEnv.useCatalog("hive_catalog");
63
tableEnv.useDatabase("production_db");
64
65
// Now table references resolve to hive_catalog.production_db
66
Table prodTable = tableEnv.from("orders"); // resolves to hive_catalog.production_db.orders
67
68
// Fully qualified table access
69
Table specificTable = tableEnv.from("memory.default.temp_table");
70
```

### Catalog Listing Operations

List the available catalogs, databases, tables, and functions for discovery and exploration. These methods are defined on `TableEnvironment`.

```java { .api }
77
/**
78
* List all registered catalog names
79
* @return Array of catalog names
80
*/
81
public String[] listCatalogs();
82
83
/**
84
* List all databases in the current catalog
85
* @return Array of database names
86
*/
87
public String[] listDatabases();
88
89
/**
90
* List all databases in the specified catalog
91
* @param catalogName Name of catalog to list databases from
92
* @return Array of database names
93
*/
94
public String[] listDatabases(String catalogName);
95
96
/**
97
* List all tables in the current database
98
* @return Array of table names
99
*/
100
public String[] listTables();
101
102
/**
103
* List all tables in the specified database
104
* @param databaseName Database name to list tables from
105
* @return Array of table names
106
*/
107
public String[] listTables(String databaseName);
108
109
/**
110
* List all functions in the current catalog and database
111
* @return Array of function names
112
*/
113
public String[] listFunctions();
114
```

**Usage Examples:**

```java
119
// Discover available catalogs
120
String[] catalogs = tableEnv.listCatalogs();
121
System.out.println("Available catalogs: " + Arrays.toString(catalogs));
122
123
// List databases in current catalog
124
String[] databases = tableEnv.listDatabases();
125
for (String db : databases) {
126
System.out.println("Database: " + db);
127
128
// List tables in each database
129
String[] tables = tableEnv.listTables(db);
130
for (String table : tables) {
131
System.out.println(" Table: " + table);
132
}
133
}
134
135
// List functions
136
String[] functions = tableEnv.listFunctions();
137
System.out.println("Available functions: " + Arrays.toString(functions));
138
```

### Built-in Catalog Implementations

Flink ships several catalog implementations for different storage backends. The in-memory catalog shown below does not persist metadata, which makes it suitable for testing and for temporary objects.

```java { .api }
145
/**
146
* In-memory catalog for testing and temporary metadata storage
147
*/
148
public class GenericInMemoryCatalog implements Catalog {
149
/**
150
* Creates an in-memory catalog with default database
151
* @param catalogName Name of the catalog
152
* @param defaultDatabase Name of the default database
153
*/
154
public GenericInMemoryCatalog(String catalogName, String defaultDatabase);
155
156
/**
157
* Creates an in-memory catalog with "default" as default database
158
* @param catalogName Name of the catalog
159
*/
160
public GenericInMemoryCatalog(String catalogName);
161
}
162
163
/**
164
* Factory for creating in-memory catalogs
165
*/
166
public class GenericInMemoryCatalogFactory implements CatalogFactory {
167
public static final String IDENTIFIER = "generic_in_memory";
168
}
169
```

**Usage Examples:**

```java
174
// Create and register in-memory catalog
175
GenericInMemoryCatalog memoryCatalog = new GenericInMemoryCatalog(
176
"test_catalog",
177
"test_db"
178
);
179
tableEnv.registerCatalog("test", memoryCatalog);
180
181
// Use catalog factory for configuration-based creation
182
Map<String, String> properties = new HashMap<>();
183
properties.put("type", "generic_in_memory");
184
properties.put("default-database", "my_database");
185
186
// Register through SQL DDL
187
tableEnv.executeSql(
188
"CREATE CATALOG test_catalog WITH (" +
189
" 'type' = 'generic_in_memory'," +
190
" 'default-database' = 'my_database'" +
191
")"
192
);
193
```

### Catalog Interface Operations

The core `Catalog` interface, which custom catalog backends implement.

```java { .api }
200
public interface Catalog {
201
/**
202
* Open the catalog and establish connections
203
* @throws CatalogException if opening fails
204
*/
205
void open() throws CatalogException;
206
207
/**
208
* Close the catalog and clean up resources
209
* @throws CatalogException if closing fails
210
*/
211
void close() throws CatalogException;
212
213
/**
214
* Get the default database name
215
* @return Default database name
216
*/
217
String getDefaultDatabase();
218
219
/**
220
* List all database names
221
* @return List of database names
222
* @throws CatalogException if listing fails
223
*/
224
List<String> listDatabases() throws CatalogException;
225
226
/**
227
* Get database metadata
228
* @param databaseName Name of database to retrieve
229
* @return CatalogDatabase with metadata
230
* @throws CatalogException if database not found or error occurs
231
*/
232
CatalogDatabase getDatabase(String databaseName) throws CatalogException;
233
234
/**
235
* Check if database exists
236
* @param databaseName Name of database to check
237
* @return true if database exists
238
* @throws CatalogException if check fails
239
*/
240
boolean databaseExists(String databaseName) throws CatalogException;
241
242
/**
243
* Create a new database
244
* @param databaseName Name of database to create
245
* @param database Database metadata
246
* @param ignoreIfExists If true, don't throw error if database already exists
247
* @throws CatalogException if creation fails
248
*/
249
void createDatabase(String databaseName, CatalogDatabase database, boolean ignoreIfExists)
250
throws CatalogException;
251
}
252
```

### Table Management Operations

Methods of the `Catalog` interface for managing table metadata. Tables are addressed by an `ObjectPath`, which combines a database name and an object name.

```java { .api }
259
/**
260
* List all tables in the specified database
261
* @param databaseName Database name
262
* @return List of table names
263
* @throws CatalogException if listing fails
264
*/
265
List<String> listTables(String databaseName) throws CatalogException;
266
267
/**
268
* Get table metadata
269
* @param tablePath Object path identifying the table
270
* @return CatalogTable with complete metadata
271
* @throws CatalogException if table not found or error occurs
272
*/
273
CatalogTable getTable(ObjectPath tablePath) throws CatalogException;
274
275
/**
276
* Check if table exists
277
* @param tablePath Object path identifying the table
278
* @return true if table exists
279
* @throws CatalogException if check fails
280
*/
281
boolean tableExists(ObjectPath tablePath) throws CatalogException;
282
283
/**
284
* Create a new table
285
* @param tablePath Object path for the new table
286
* @param table Table metadata
287
* @param ignoreIfExists If true, don't throw error if table already exists
288
* @throws CatalogException if creation fails
289
*/
290
void createTable(ObjectPath tablePath, CatalogTable table, boolean ignoreIfExists)
291
throws CatalogException;
292
293
/**
294
* Drop an existing table
295
* @param tablePath Object path identifying the table to drop
296
* @param ignoreIfNotExists If true, don't throw error if table doesn't exist
297
* @throws CatalogException if drop fails
298
*/
299
void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws CatalogException;
300
```

**Usage Examples:**

```java
305
// Create table through catalog interface
306
ObjectPath tablePath = new ObjectPath("my_database", "my_table");
307
308
// Define table schema
309
Schema schema = Schema.newBuilder()
310
.column("id", DataTypes.BIGINT())
311
.column("name", DataTypes.STRING())
312
.column("created_at", DataTypes.TIMESTAMP(3))
313
.primaryKey("id")
314
.build();
315
316
// Create catalog table
317
Map<String, String> properties = new HashMap<>();
318
properties.put("connector", "filesystem");
319
properties.put("path", "/path/to/data");
320
properties.put("format", "parquet");
321
322
CatalogTable catalogTable = CatalogTable.of(
323
schema,
324
"Customer data table",
325
Collections.emptyList(),
326
properties
327
);
328
329
// Create table in catalog
330
catalog.createTable(tablePath, catalogTable, false);
331
332
// List tables
333
List<String> tables = catalog.listTables("my_database");
334
System.out.println("Tables: " + tables);
335
336
// Check if table exists
337
boolean exists = catalog.tableExists(tablePath);
338
System.out.println("Table exists: " + exists);
339
```

### Function Management

Methods of the `Catalog` interface for managing user-defined functions. Like tables, functions are addressed by an `ObjectPath`.

```java { .api }
346
/**
347
* List all functions in the specified database
348
* @param databaseName Database name
349
* @return List of function names
350
* @throws CatalogException if listing fails
351
*/
352
List<String> listFunctions(String databaseName) throws CatalogException;
353
354
/**
355
* Get function metadata
356
* @param functionPath Object path identifying the function
357
* @return CatalogFunction with metadata
358
* @throws CatalogException if function not found or error occurs
359
*/
360
CatalogFunction getFunction(ObjectPath functionPath) throws CatalogException;
361
362
/**
363
* Check if function exists
364
* @param functionPath Object path identifying the function
365
* @return true if function exists
366
* @throws CatalogException if check fails
367
*/
368
boolean functionExists(ObjectPath functionPath) throws CatalogException;
369
370
/**
371
* Create a new function
372
* @param functionPath Object path for the new function
373
* @param function Function metadata
374
* @param ignoreIfExists If true, don't throw error if function already exists
375
* @throws CatalogException if creation fails
376
*/
377
void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
378
throws CatalogException;
379
```

### Context-Resolved Objects

Tables and functions as they look after resolution against the catalog context. Each resolved object carries its identifier together with its complete metadata.

```java { .api }
386
public interface ContextResolvedTable {
387
/**
388
* Get the identifier for this table
389
* @return Table identifier
390
*/
391
Identifier getIdentifier();
392
393
/**
394
* Get the resolved table
395
* @return CatalogTable with full metadata
396
*/
397
CatalogTable getTable();
398
399
/**
400
* Get the resolved schema
401
* @return ResolvedSchema for the table
402
*/
403
ResolvedSchema getResolvedSchema();
404
405
/**
406
* Check if this is a temporary table
407
* @return true if temporary
408
*/
409
boolean isTemporary();
410
}
411
412
public interface ContextResolvedFunction {
413
/**
414
* Get the function identifier
415
* @return Function identifier
416
*/
417
Identifier getIdentifier();
418
419
/**
420
* Get the catalog function metadata
421
* @return CatalogFunction with metadata
422
*/
423
CatalogFunction getCatalogFunction();
424
425
/**
426
* Get the function definition
427
* @return FunctionDefinition for execution
428
*/
429
FunctionDefinition getFunctionDefinition();
430
}
431
```

### Database and Table Metadata Types

Metadata structures that describe databases and tables.

```java { .api }
438
public interface CatalogDatabase {
439
/**
440
* Get database properties
441
* @return Map of property key-value pairs
442
*/
443
Map<String, String> getProperties();
444
445
/**
446
* Get database comment/description
447
* @return Database description
448
*/
449
String getComment();
450
}
451
452
public interface CatalogTable extends CatalogBaseTable {
453
/**
454
* Check if this table is partitioned
455
* @return true if partitioned
456
*/
457
boolean isPartitioned();
458
459
/**
460
* Get partition keys for partitioned tables
461
* @return List of partition key column names
462
*/
463
List<String> getPartitionKeys();
464
465
/**
466
* Create a copy of this table with new properties
467
* @param properties New properties map
468
* @return New CatalogTable with updated properties
469
*/
470
CatalogTable copy(Map<String, String> properties);
471
}
472
473
public class CatalogTableImpl implements CatalogTable {
474
/**
475
* Creates a catalog table implementation
476
* @param schema Table schema
477
* @param partitionKeys Partition key columns
478
* @param properties Table properties
479
* @param comment Table description
480
*/
481
public CatalogTableImpl(
482
Schema schema,
483
List<String> partitionKeys,
484
Map<String, String> properties,
485
String comment
486
);
487
}
488
```

**Usage Examples:**

```java
493
// Create database metadata
494
Map<String, String> dbProps = new HashMap<>();
495
dbProps.put("location", "/warehouse/analytics");
496
dbProps.put("owner", "analytics_team");
497
498
CatalogDatabase database = new CatalogDatabaseImpl(
499
dbProps,
500
"Analytics database for business intelligence"
501
);
502
503
// Create table metadata with partitioning
504
Schema tableSchema = Schema.newBuilder()
505
.column("transaction_id", DataTypes.BIGINT())
506
.column("customer_id", DataTypes.BIGINT())
507
.column("amount", DataTypes.DECIMAL(10, 2))
508
.column("transaction_date", DataTypes.DATE())
509
.column("region", DataTypes.STRING())
510
.build();
511
512
List<String> partitionKeys = Arrays.asList("transaction_date", "region");
513
514
Map<String, String> tableProps = new HashMap<>();
515
tableProps.put("connector", "filesystem");
516
tableProps.put("path", "/data/transactions");
517
tableProps.put("format", "parquet");
518
519
CatalogTable partitionedTable = new CatalogTableImpl(
520
tableSchema,
521
partitionKeys,
522
tableProps,
523
"Daily transaction data partitioned by date and region"
524
);
525
526
// Create in catalog
527
ObjectPath tablePath = new ObjectPath("analytics", "transactions");
528
catalog.createTable(tablePath, partitionedTable, false);
529
```