0
# Catalog System
1
2
Flink's catalog system provides pluggable metadata management for databases, tables, partitions, and functions, and allows multiple user-defined catalogs to be registered side by side. It supports persistent metadata storage, schema evolution, and integration with external metadata systems such as Hive Metastore.
3
4
## Capabilities
5
6
### Catalog Management
7
8
Manage multiple catalogs and switch between different metadata repositories.
9
10
```java { .api }
11
/**
12
* Registers a catalog under a unique name
13
* @param catalogName Name for the catalog
14
* @param catalog Catalog implementation to register
15
*/
16
void registerCatalog(String catalogName, Catalog catalog);
17
18
/**
19
* Gets a registered catalog by name
20
* @param catalogName Name of the catalog to retrieve
21
* @return Optional containing the catalog if found
22
*/
23
Optional<Catalog> getCatalog(String catalogName);
24
25
/**
26
* Sets the current catalog for table operations
27
* @param catalogName Name of the catalog to use as current
28
*/
29
void useCatalog(String catalogName);
30
31
/**
32
* Gets the name of the current catalog
33
* @return Current catalog name
34
*/
35
String getCurrentCatalog();
36
37
/**
38
* Lists all registered catalog names
39
* @return Array of catalog names
40
*/
41
String[] listCatalogs();
42
```
43
44
**Usage Examples:**
45
46
```java
47
// Register different catalog types
48
HiveCatalog hiveCatalog = new HiveCatalog(
49
"my_hive",
50
"default",
51
"path/to/hive-conf",
52
"2.3.4"
53
);
54
tableEnv.registerCatalog("hive_catalog", hiveCatalog);
55
56
JdbcCatalog jdbcCatalog = new JdbcCatalog(
57
"my_jdbc_catalog",
58
"default",
59
"postgres",
60
"jdbc:postgresql://localhost:5432/metadata",
61
"username",
62
"password"
63
);
64
tableEnv.registerCatalog("postgres_catalog", jdbcCatalog);
65
66
// Switch between catalogs
67
tableEnv.useCatalog("hive_catalog");
68
String[] hiveTables = tableEnv.listTables();
69
70
tableEnv.useCatalog("postgres_catalog");
71
String[] postgresTables = tableEnv.listTables();
72
73
// Get current context
74
String currentCatalog = tableEnv.getCurrentCatalog();
75
```
76
77
### Database Operations
78
79
Create, drop, list, and inspect databases within a catalog.
80
81
```java { .api }
82
interface Catalog {
83
/**
84
* Creates a database in the catalog
85
* @param name Database name
86
* @param database Database metadata
87
* @param ignoreIfExists Skip creation if database already exists
88
*/
89
void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
90
throws DatabaseAlreadyExistException, CatalogException;
91
92
/**
93
* Drops a database from the catalog
94
* @param name Database name to drop
95
* @param ignoreIfNotExists Skip error if database doesn't exist
96
* @param cascade Drop all tables in the database
97
*/
98
void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
99
throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException;
100
101
/**
102
* Lists all databases in the catalog
103
* @return List of database names
104
*/
105
List<String> listDatabases() throws CatalogException;
106
107
/**
108
* Gets database metadata
109
* @param databaseName Name of the database
110
* @return CatalogDatabase containing metadata
111
*/
112
CatalogDatabase getDatabase(String databaseName)
113
throws DatabaseNotExistException, CatalogException;
114
115
/**
116
* Checks if a database exists
117
* @param databaseName Name of the database to check
118
* @return true if database exists
119
*/
120
boolean databaseExists(String databaseName) throws CatalogException;
121
}
122
```
123
124
**Usage Examples:**
125
126
```java
127
// Create database with properties
128
Map<String, String> dbProperties = new HashMap<>();
129
dbProperties.put("owner", "analytics_team");
130
dbProperties.put("created_date", "2024-01-01");
131
132
CatalogDatabase analyticsDb = new CatalogDatabaseImpl(
133
dbProperties,
134
"Database for analytics workflows"
135
);
136
137
Catalog catalog = tableEnv.getCatalog("hive_catalog").get();
138
catalog.createDatabase("analytics", analyticsDb, false);
139
140
// Use the new database
141
tableEnv.useDatabase("analytics");
142
143
// List databases
144
List<String> databases = catalog.listDatabases();
145
for (String db : databases) {
146
System.out.println("Database: " + db);
147
}
148
```
149
150
### Table Operations
151
152
Comprehensive table management with metadata, partitioning, and constraints.
153
154
```java { .api }
155
interface Catalog {
156
/**
157
* Creates a table in the catalog
158
* @param tablePath Path to the table (database.table)
159
* @param table Table metadata and schema
160
* @param ignoreIfExists Skip creation if table already exists
161
*/
162
void createTable(ObjectPath tablePath, CatalogTable table, boolean ignoreIfExists)
163
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException;
164
165
/**
166
* Drops a table from the catalog
167
* @param tablePath Path to the table to drop
168
* @param ignoreIfNotExists Skip error if table doesn't exist
169
*/
170
void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
171
throws TableNotExistException, CatalogException;
172
173
/**
174
* Lists all tables in a database
175
* @param databaseName Database name
176
* @return List of table names
177
*/
178
List<String> listTables(String databaseName)
179
throws DatabaseNotExistException, CatalogException;
180
181
/**
182
* Gets table metadata
183
* @param tablePath Path to the table
184
* @return CatalogTable containing metadata
185
*/
186
CatalogTable getTable(ObjectPath tablePath)
187
throws TableNotExistException, CatalogException;
188
189
/**
190
* Checks if a table exists
191
* @param tablePath Path to the table to check
192
* @return true if table exists
193
*/
194
boolean tableExists(ObjectPath tablePath) throws CatalogException;
195
196
/**
197
* Renames a table
198
* @param tablePath Current table path
199
* @param newTableName New table name
* @param ignoreIfNotExists Skip error if table doesn't exist
200
*/
201
void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
202
throws TableNotExistException, TableAlreadyExistException, CatalogException;
203
204
/**
205
* Alters table metadata
206
* @param tablePath Path to the table
207
* @param newTable New table metadata
208
* @param ignoreIfNotExists Skip error if table doesn't exist
209
*/
210
void alterTable(ObjectPath tablePath, CatalogTable newTable, boolean ignoreIfNotExists)
211
throws TableNotExistException, CatalogException;
212
}
213
```
214
215
**Usage Examples:**
216
217
```java
218
// Create table with comprehensive metadata
219
Schema schema = Schema.newBuilder()
220
.column("order_id", DataTypes.BIGINT())
221
.column("customer_id", DataTypes.BIGINT())
222
.column("product_id", DataTypes.BIGINT())
223
.column("quantity", DataTypes.INT())
224
.column("unit_price", DataTypes.DECIMAL(10, 2))
225
.column("order_date", DataTypes.DATE())
226
.column("region", DataTypes.STRING())
227
.primaryKey("order_id")
228
.build();
229
230
Map<String, String> properties = new HashMap<>();
231
properties.put("connector", "kafka");
232
properties.put("topic", "orders");
233
properties.put("properties.bootstrap.servers", "localhost:9092");
234
properties.put("format", "json");
235
236
CatalogTable ordersTable = CatalogTable.of(
237
schema,
238
"Orders table for e-commerce analytics",
239
Arrays.asList("region", "order_date"), // Partition keys
240
properties
241
);
242
243
ObjectPath tablePath = new ObjectPath("analytics", "orders");
244
catalog.createTable(tablePath, ordersTable, false);
245
246
// List and inspect tables
247
List<String> tables = catalog.listTables("analytics");
248
for (String tableName : tables) {
249
ObjectPath path = new ObjectPath("analytics", tableName);
250
CatalogTable table = catalog.getTable(path);
251
System.out.println("Table: " + tableName);
252
System.out.println("Schema: " + table.getUnresolvedSchema());
253
System.out.println("Properties: " + table.getOptions());
254
}
255
```
256
257
### Function Management
258
259
Create, drop, list, and look up user-defined functions and their metadata in a catalog.
260
261
```java { .api }
262
interface Catalog {
263
/**
264
* Creates a function in the catalog
265
* @param functionPath Path to the function (database.function)
266
* @param function Function metadata
267
* @param ignoreIfExists Skip creation if function already exists
268
*/
269
void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
270
throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException;
271
272
/**
273
* Drops a function from the catalog
274
* @param functionPath Path to the function to drop
275
* @param ignoreIfNotExists Skip error if function doesn't exist
276
*/
277
void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
278
throws FunctionNotExistException, CatalogException;
279
280
/**
281
* Lists all functions in a database
282
* @param databaseName Database name
283
* @return List of function names
284
*/
285
List<String> listFunctions(String databaseName)
286
throws DatabaseNotExistException, CatalogException;
287
288
/**
289
* Gets function metadata
290
* @param functionPath Path to the function
291
* @return CatalogFunction containing metadata
292
*/
293
CatalogFunction getFunction(ObjectPath functionPath)
294
throws FunctionNotExistException, CatalogException;
295
296
/**
297
* Checks if a function exists
298
* @param functionPath Path to the function to check
299
* @return true if function exists
300
*/
301
boolean functionExists(ObjectPath functionPath) throws CatalogException;
302
}
303
```
304
305
**Usage Examples:**
306
307
```java
308
// Register UDF in catalog
309
CatalogFunction myFunction = new CatalogFunctionImpl(
310
"com.company.functions.MyCustomFunction",
311
FunctionLanguage.JAVA,
312
Arrays.asList("dependency1.jar", "dependency2.jar"),
313
"Custom function for business logic"
314
);
315
316
ObjectPath functionPath = new ObjectPath("analytics", "my_custom_function");
317
catalog.createFunction(functionPath, myFunction, false);
318
319
// Use function in SQL
320
tableEnv.useCatalog("hive_catalog");
321
tableEnv.useDatabase("analytics");
322
Table result = tableEnv.sqlQuery(
323
"SELECT customer_id, my_custom_function(customer_data) as processed_data " +
324
"FROM customers"
325
);
326
```
327
328
### Partition Management
329
330
Work with partitioned tables: list all partitions, filter them by a partial partition specification, and look up or check individual partitions.
331
332
```java { .api }
333
interface Catalog {
334
/**
335
* Lists all partitions of a partitioned table
336
* @param tablePath Path to the partitioned table
337
* @return List of partition specifications
338
*/
339
List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
340
throws TableNotExistException, TableNotPartitionedException, CatalogException;
341
342
/**
343
* Lists partitions matching a partial specification
344
* @param tablePath Path to the partitioned table
345
* @param partitionSpec Partial partition specification for filtering
346
* @return List of matching partition specifications
347
*/
348
List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
349
throws TableNotExistException, TableNotPartitionedException, CatalogException;
350
351
/**
352
* Gets partition metadata
353
* @param tablePath Path to the partitioned table
354
* @param partitionSpec Partition specification
355
* @return CatalogPartition containing metadata
356
*/
357
CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
358
throws PartitionNotExistException, CatalogException;
359
360
/**
361
* Checks if a partition exists
362
* @param tablePath Path to the partitioned table
363
* @param partitionSpec Partition specification to check
364
* @return true if partition exists
365
*/
366
boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
367
throws CatalogException;
368
}
369
```
370
371
**Usage Examples:**
372
373
```java
374
// Work with partitioned tables
375
ObjectPath partitionedTable = new ObjectPath("analytics", "daily_sales");
376
377
// List all partitions
378
List<CatalogPartitionSpec> allPartitions = catalog.listPartitions(partitionedTable);
379
for (CatalogPartitionSpec spec : allPartitions) {
380
System.out.println("Partition: " + spec.getPartitionSpec());
381
}
382
383
// List partitions for specific year
384
CatalogPartitionSpec yearFilter = new CatalogPartitionSpec(
385
Collections.singletonMap("year", "2024")
386
);
387
List<CatalogPartitionSpec> yearPartitions = catalog.listPartitions(partitionedTable, yearFilter);
388
389
// Check specific partition
390
CatalogPartitionSpec specificPartition = new CatalogPartitionSpec(
391
Map.of("year", "2024", "month", "01", "day", "15")
392
);
393
boolean exists = catalog.partitionExists(partitionedTable, specificPartition);
394
```
395
396
### Object Path Resolution
397
398
Navigate catalog hierarchies with full path resolution and validation.
399
400
```java { .api }
401
class ObjectPath {
402
/**
403
* Creates an object path for database.object
404
* @param databaseName Database name
405
* @param objectName Object name (table, function, etc.)
406
*/
407
ObjectPath(String databaseName, String objectName);
408
409
/**
410
* Gets the database name
411
* @return Database name
412
*/
413
String getDatabaseName();
414
415
/**
416
* Gets the object name
417
* @return Object name
418
*/
419
String getObjectName();
420
421
/**
422
* Gets the full path as a string
423
* @return String representation of the path
424
*/
425
String getFullName();
426
}
427
428
class ObjectIdentifier {
429
/**
430
* Creates a full object identifier
431
* @param catalogName Catalog name
432
* @param databaseName Database name
433
* @param objectName Object name
434
*/
435
static ObjectIdentifier of(String catalogName, String databaseName, String objectName);
436
437
/**
438
* Gets the catalog name
439
* @return Catalog name
440
*/
441
String getCatalogName();
442
443
/**
444
* Gets the database name
445
* @return Database name
446
*/
447
String getDatabaseName();
448
449
/**
450
* Gets the object name
451
* @return Object name
452
*/
453
String getObjectName();
454
}
455
```
456
457
**Usage Examples:**
458
459
```java
460
// Object path resolution
461
ObjectPath tablePath = new ObjectPath("sales_db", "orders");
462
ObjectIdentifier fullIdentifier = ObjectIdentifier.of("hive_catalog", "sales_db", "orders");
463
464
// Use in catalog operations
465
CatalogTable table = catalog.getTable(tablePath);
466
String fullPath = fullIdentifier.getCatalogName() + "." +
467
fullIdentifier.getDatabaseName() + "." +
468
fullIdentifier.getObjectName();
469
```
470
471
## Types
472
473
### Catalog Interfaces
474
475
```java { .api }
476
interface Catalog {
477
// Database operations
478
void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists);
479
void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade);
480
List<String> listDatabases();
481
CatalogDatabase getDatabase(String databaseName);
482
boolean databaseExists(String databaseName);
483
484
// Table operations
485
void createTable(ObjectPath tablePath, CatalogTable table, boolean ignoreIfExists);
486
void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists);
487
List<String> listTables(String databaseName);
488
CatalogTable getTable(ObjectPath tablePath);
489
boolean tableExists(ObjectPath tablePath);
490
491
// Function operations
492
void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists);
493
void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists);
494
List<String> listFunctions(String databaseName);
495
CatalogFunction getFunction(ObjectPath functionPath);
496
boolean functionExists(ObjectPath functionPath);
497
}
498
```
499
500
### Catalog Metadata Types
501
502
```java { .api }
503
interface CatalogDatabase {
504
Map<String, String> getProperties();
505
String getComment();
506
CatalogDatabase copy();
507
CatalogDatabase copy(Map<String, String> properties);
508
}
509
510
interface CatalogTable extends CatalogBaseTable {
511
boolean isPartitioned();
512
List<String> getPartitionKeys();
513
CatalogTable copy();
514
CatalogTable copy(Map<String, String> options);
515
}
516
517
interface CatalogFunction {
518
String getClassName();
519
FunctionLanguage getLanguage();
520
List<String> getFunctionResources();
521
String getDescription();
522
CatalogFunction copy();
523
}
524
525
enum FunctionLanguage {
526
JAVA,
527
PYTHON,
528
SCALA
529
}
530
```
531
532
### Exception Types
533
534
```java { .api }
535
class CatalogException extends Exception { }
536
class DatabaseAlreadyExistException extends CatalogException { }
537
class DatabaseNotExistException extends CatalogException { }
538
class DatabaseNotEmptyException extends CatalogException { }
539
class TableAlreadyExistException extends CatalogException { }
540
class TableNotExistException extends CatalogException { }
541
class FunctionAlreadyExistException extends CatalogException { }
542
class FunctionNotExistException extends CatalogException { }
543
class PartitionNotExistException extends CatalogException { }
544
class TableNotPartitionedException extends CatalogException { }
545
```