# Catalog Integration

The catalog integration system connects to Flink's catalog system for metadata management, table registration, schema information, and function discovery. It supports multiple catalog types, schema evolution, and metadata persistence across different storage systems.

## Capabilities

### CatalogManager Integration

Integration with Flink's central catalog management system for metadata operations:
```java { .api }
/**
 * Catalog manager integration (from flink-table-api-java)
 * Provides centralized catalog and database management
 */
public interface CatalogManager {

    /**
     * Registers a catalog with the catalog manager
     * @param catalogName Name of the catalog
     * @param catalog Catalog instance to register
     */
    void registerCatalog(String catalogName, Catalog catalog);

    /**
     * Gets a registered catalog by name
     * @param catalogName Name of the catalog
     * @return Optional catalog instance
     */
    Optional<Catalog> getCatalog(String catalogName);

    /**
     * Lists all registered catalog names
     * @return Set of catalog names
     */
    Set<String> listCatalogs();

    /**
     * Sets the current catalog
     * @param catalogName Name of catalog to set as current
     */
    void setCurrentCatalog(String catalogName);

    /**
     * Gets the current catalog name
     * @return Current catalog name
     */
    String getCurrentCatalog();

    /**
     * Sets the current database within current catalog
     * @param databaseName Database name to set as current
     */
    void setCurrentDatabase(String databaseName);

    /**
     * Gets the current database name
     * @return Current database name
     */
    String getCurrentDatabase();

    /**
     * Resolves object identifier to qualified name
     * @param identifier Object identifier to resolve
     * @return Fully qualified object identifier
     */
    ObjectIdentifier qualifyIdentifier(UnresolvedIdentifier identifier);
}
```
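
The identifier-qualification behavior described above can be illustrated with a minimal, self-contained sketch. `MiniCatalogManager` and its `String`-based identifiers are hypothetical stand-ins, not Flink classes; the point is how partial names are completed from the current catalog and database:

```java
import java.util.*;

// Hypothetical sketch of the CatalogManager contract above; not Flink's
// implementation. Shows how 1-, 2-, and 3-part identifiers are qualified
// against the current catalog and database.
class MiniCatalogManager {
    private final Map<String, Object> catalogs = new HashMap<>();
    private String currentCatalog;
    private String currentDatabase;

    public void registerCatalog(String name, Object catalog) {
        catalogs.put(name, catalog);
    }

    public Optional<Object> getCatalog(String name) {
        return Optional.ofNullable(catalogs.get(name));
    }

    public Set<String> listCatalogs() {
        return Collections.unmodifiableSet(catalogs.keySet());
    }

    public void setCurrentCatalog(String name) {
        if (!catalogs.containsKey(name)) {
            throw new IllegalArgumentException("Unknown catalog: " + name);
        }
        currentCatalog = name;
    }

    public void setCurrentDatabase(String name) {
        currentDatabase = name;
    }

    // Fill in missing catalog/database parts from the current context,
    // mirroring what qualifyIdentifier() does with ObjectIdentifier.
    public String qualifyIdentifier(String... parts) {
        switch (parts.length) {
            case 1: return currentCatalog + "." + currentDatabase + "." + parts[0];
            case 2: return currentCatalog + "." + parts[0] + "." + parts[1];
            case 3: return parts[0] + "." + parts[1] + "." + parts[2];
            default: throw new IllegalArgumentException("1 to 3 parts expected");
        }
    }
}
```

With a current catalog `hive_catalog` and database `default`, a bare table name `users` resolves to `hive_catalog.default.users`, while fully qualified names pass through unchanged.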

### FunctionCatalog Integration

Integration with Flink's function catalog for user-defined function management:

```java { .api }
/**
 * Function catalog integration (from flink-table-api-java)
 * Manages function registration, lookup, and resolution
 */
public interface FunctionCatalog {

    /**
     * Registers a temporary system function
     * @param name Function name
     * @param functionDefinition Function definition
     */
    void registerTemporarySystemFunction(String name, FunctionDefinition functionDefinition);

    /**
     * Registers a temporary catalog function
     * @param objectIdentifier Function identifier with catalog/database/name
     * @param functionDefinition Function definition
     * @param ignoreIfExists Whether to ignore if function already exists
     */
    void registerTemporaryCatalogFunction(
            ObjectIdentifier objectIdentifier,
            FunctionDefinition functionDefinition,
            boolean ignoreIfExists);

    /**
     * Drops a temporary system function
     * @param name Function name to drop
     * @return True if function was dropped, false if not found
     */
    boolean dropTemporarySystemFunction(String name);

    /**
     * Drops a temporary catalog function
     * @param identifier Function identifier
     * @param ignoreIfNotExist Whether to ignore if function doesn't exist
     * @return True if function was dropped
     */
    boolean dropTemporaryCatalogFunction(ObjectIdentifier identifier, boolean ignoreIfNotExist);

    /**
     * Looks up function by identifier
     * @param objectIdentifier Function identifier
     * @return Optional function lookup result
     */
    Optional<FunctionLookup.Result> lookupFunction(ObjectIdentifier objectIdentifier);

    /**
     * Lists user-defined functions in given catalog and database
     * @param catalogName Catalog name
     * @param databaseName Database name
     * @return Array of function names
     */
    String[] listFunctions(String catalogName, String databaseName);

    /**
     * Lists temporary functions
     * @return Array of temporary function names
     */
    String[] listTemporaryFunctions();
}
```
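
A key property of function resolution is that temporary functions shadow permanent catalog functions of the same name. The following self-contained sketch (hypothetical `MiniFunctionCatalog`, not Flink code) demonstrates that lookup order:

```java
import java.util.*;
import java.util.function.Function;

// Hypothetical sketch of FunctionCatalog lookup precedence: temporary
// functions are consulted before permanent catalog functions.
class MiniFunctionCatalog {
    private final Map<String, Function<Integer, Integer>> temporary = new HashMap<>();
    private final Map<String, Function<Integer, Integer>> permanent = new HashMap<>();

    public void registerTemporarySystemFunction(String name, Function<Integer, Integer> fn) {
        temporary.put(name, fn);
    }

    public void registerCatalogFunction(String name, Function<Integer, Integer> fn) {
        permanent.put(name, fn);
    }

    public boolean dropTemporarySystemFunction(String name) {
        return temporary.remove(name) != null;
    }

    // Lookup checks temporary registrations first, then the catalog.
    public Optional<Function<Integer, Integer>> lookupFunction(String name) {
        if (temporary.containsKey(name)) {
            return Optional.of(temporary.get(name));
        }
        return Optional.ofNullable(permanent.get(name));
    }
}
```

Registering a temporary function named `scale` hides a permanent `scale` until the temporary one is dropped, after which the permanent definition becomes visible again.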

### Table Source and Sink Integration

Integration with table sources and sinks through the catalog system:

```scala { .api }
/**
 * Table source utilities for catalog integration
 */
object TableSourceUtil {

  /**
   * Validates table source capabilities and configuration
   * @param tableSource Table source to validate
   * @param schema Expected table schema
   * @throws ValidationException if source is invalid
   */
  def validateTableSource(tableSource: TableSource[_], schema: TableSchema): Unit

  /**
   * Creates dynamic table source from catalog table
   * @param catalogTable Catalog table definition
   * @param context Dynamic table context
   * @return Dynamic table source instance
   */
  def createDynamicTableSource(
      catalogTable: CatalogBaseTable,
      context: DynamicTableSource.Context): DynamicTableSource

  /**
   * Extracts watermark strategy from table source
   * @param tableSource Table source with watermark information
   * @return Optional watermark strategy
   */
  def extractWatermarkStrategy(tableSource: TableSource[_]): Option[WatermarkStrategy[_]]

  /**
   * Validates partition information for partitioned sources
   * @param partitions Partition specifications
   * @param tableSchema Table schema with partition keys
   * @throws ValidationException if partitions are invalid
   */
  def validatePartitions(
      partitions: java.util.List[CatalogPartitionSpec],
      tableSchema: TableSchema): Unit

  /**
   * Creates table source from legacy table factory
   * @param properties Table properties from catalog
   * @param isTemporary Whether this is a temporary table
   * @return Legacy table source instance
   */
  def createTableSource(
      properties: java.util.Map[String, String],
      isTemporary: Boolean): TableSource[_]
}
```

### Schema Evolution Support

Support for schema evolution and compatibility checking:

```java { .api }
/**
 * Schema evolution utilities for catalog integration
 */
public class SchemaEvolutionUtil {

    /**
     * Checks schema compatibility between versions
     * @param oldSchema Previous schema version
     * @param newSchema New schema version
     * @return Compatibility check result
     */
    public static CompatibilityResult checkCompatibility(
            TableSchema oldSchema,
            TableSchema newSchema);

    /**
     * Evolves schema with backward compatibility
     * @param currentSchema Current schema
     * @param evolutionSpec Schema evolution specification
     * @return Evolved schema
     * @throws SchemaEvolutionException if evolution is not compatible
     */
    public static TableSchema evolveSchema(
            TableSchema currentSchema,
            SchemaEvolutionSpec evolutionSpec) throws SchemaEvolutionException;

    /**
     * Validates column changes for compatibility
     * @param oldColumn Old column definition
     * @param newColumn New column definition
     * @return True if change is compatible
     */
    public static boolean isColumnChangeCompatible(
            TableColumn oldColumn,
            TableColumn newColumn);

    /**
     * Creates schema projection for subset of columns
     * @param originalSchema Original table schema
     * @param selectedFields Selected field names
     * @return Projected schema with selected fields only
     */
    public static TableSchema projectSchema(
            TableSchema originalSchema,
            List<String> selectedFields);
}
```
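
The compatibility rules a checker like `checkCompatibility` might apply can be sketched concretely. In this hypothetical, self-contained example (not Flink's implementation), a schema is modeled as an ordered column-to-type map; adding a new column is backward compatible, while removing a column or changing its type is not:

```java
import java.util.*;

// Hypothetical sketch of backward-compatibility rules for schema evolution.
class MiniSchemaCompat {
    // Schemas modeled as ordered column-name -> type-name maps.
    public static boolean isBackwardCompatible(
            LinkedHashMap<String, String> oldSchema,
            LinkedHashMap<String, String> newSchema) {
        for (Map.Entry<String, String> col : oldSchema.entrySet()) {
            String newType = newSchema.get(col.getKey());
            // Every existing column must survive with an unchanged type.
            if (newType == null || !newType.equals(col.getValue())) {
                return false;
            }
        }
        return true; // columns only present in newSchema are allowed
    }
}
```

Real checkers distinguish many more cases (nullable widening, type promotion, renames via column IDs), but the shape of the check is the same: iterate the old columns and verify each still has a compatible counterpart.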

## Catalog Types and Configuration

### Built-in Catalog Support

Support for various catalog implementations:

```java { .api }
/**
 * Built-in catalog types supported by Flink
 */
public enum CatalogType {
    GENERIC_IN_MEMORY, // GenericInMemoryCatalog - for testing and temporary use
    HIVE,              // HiveCatalog - Apache Hive Metastore integration
    JDBC,              // JdbcCatalog - JDBC-based catalog storage
    ELASTICSEARCH,     // ElasticsearchCatalog - for Elasticsearch integration
    CUSTOM             // Custom catalog implementations
}

/**
 * Catalog configuration properties
 */
public class CatalogProperties {
    public static final String CATALOG_TYPE = "type";
    public static final String CATALOG_DEFAULT_DATABASE = "default-database";
    public static final String CATALOG_HIVE_CONF_DIR = "hive-conf-dir";
    public static final String CATALOG_HIVE_VERSION = "hive-version";
    public static final String CATALOG_JDBC_URL = "base-url";
    public static final String CATALOG_JDBC_USERNAME = "username";
    public static final String CATALOG_JDBC_PASSWORD = "password";
}
```
286
287
### Catalog Factory Integration
288
289
Integration with Flink's catalog factory system:
290
291
```java { .api }
292
/**
293
* Catalog factory for creating catalog instances
294
*/
295
public interface CatalogFactory extends Factory {
296
297
/**
298
* Creates catalog instance from configuration
299
* @param context Factory context with configuration
300
* @return Created catalog instance
301
*/
302
Catalog createCatalog(Context context);
303
304
/**
305
* Returns factory identifier
306
* @return Unique factory identifier
307
*/
308
String factoryIdentifier();
309
310
/**
311
* Returns required configuration options
312
* @return Set of required options
313
*/
314
Set<ConfigOption<?>> requiredOptions();
315
316
/**
317
* Returns optional configuration options
318
* @return Set of optional options
319
*/
320
Set<ConfigOption<?>> optionalOptions();
321
}
322
```
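
Before `createCatalog()` runs, the supplied properties are typically validated against the factory's `requiredOptions()`. A self-contained sketch of that validation step (hypothetical helper, not Flink's factory utilities):

```java
import java.util.*;

// Hypothetical sketch: check user-supplied catalog properties against the
// keys a factory declares as required, collecting any that are absent.
class FactoryOptionValidator {
    public static List<String> missingOptions(
            Set<String> requiredKeys, Map<String, String> properties) {
        List<String> missing = new ArrayList<>();
        for (String key : requiredKeys) {
            if (!properties.containsKey(key)) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

A factory would raise a validation error listing the missing keys rather than failing later with an obscure null value inside `createCatalog()`.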

## Metadata Management

### Table Metadata Operations

Operations for managing table metadata through catalogs:

```java
// Create table through catalog
CatalogTable catalogTable = CatalogTable.of(
    Schema.newBuilder()
        .column("id", DataTypes.BIGINT().notNull()) // primary key columns must be non-nullable
        .column("name", DataTypes.STRING())
        .column("ts", DataTypes.TIMESTAMP(3))
        .watermark("ts", "ts - INTERVAL '5' SECOND")
        .primaryKey("id")
        .build(),
    "Table for user data",
    Collections.emptyList(),
    tableProperties
);

ObjectPath tablePath = new ObjectPath("default_database", "users");
catalog.createTable(tablePath, catalogTable, false);

// Query table metadata
CatalogBaseTable table = catalog.getTable(tablePath);
Schema unresolvedSchema = table.getUnresolvedSchema();
ResolvedSchema schema = unresolvedSchema.resolve(schemaResolver); // resolver obtained from the planner context
```

### Database Operations

Database-level operations through catalog integration:

```java
// Create database
CatalogDatabase database = new CatalogDatabaseImpl(
    Collections.singletonMap("location", "/path/to/database"),
    "User database for analytics"
);

catalog.createDatabase("analytics_db", database, false);

// List databases and tables
List<String> databases = catalog.listDatabases();
List<String> tables = catalog.listTables("analytics_db");
```

### Partition Management

Support for partitioned table management:

```java { .api }
/**
 * Partition management through catalog integration
 */
public interface PartitionManager {

    /**
     * Creates partition in catalog table
     * @param tablePath Table object path
     * @param partitionSpec Partition specification
     * @param partition Partition metadata
     * @param ignoreIfExists Whether to ignore if partition exists
     */
    void createPartition(
            ObjectPath tablePath,
            CatalogPartitionSpec partitionSpec,
            CatalogPartition partition,
            boolean ignoreIfExists);

    /**
     * Lists partitions for table
     * @param tablePath Table object path
     * @return List of partition specifications
     */
    List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath);

    /**
     * Gets partition metadata
     * @param tablePath Table object path
     * @param partitionSpec Partition specification
     * @return Partition metadata
     */
    CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec);

    /**
     * Drops partition from table
     * @param tablePath Table object path
     * @param partitionSpec Partition specification
     * @param ignoreIfNotExists Whether to ignore if partition doesn't exist
     */
    void dropPartition(
            ObjectPath tablePath,
            CatalogPartitionSpec partitionSpec,
            boolean ignoreIfNotExists);
}
```
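
Partition specs are essentially column-to-value maps, and a partial spec selects every partition that agrees on the given values. A self-contained sketch of that bookkeeping (hypothetical `MiniPartitionStore`, not Flink code):

```java
import java.util.*;

// Hypothetical sketch: partitions keyed by their spec (column -> value);
// a partial spec matches all partitions that contain its entries.
class MiniPartitionStore {
    private final List<Map<String, String>> partitions = new ArrayList<>();

    public void createPartition(Map<String, String> spec, boolean ignoreIfExists) {
        if (partitions.contains(spec)) {
            if (ignoreIfExists) {
                return;
            }
            throw new IllegalStateException("Partition exists: " + spec);
        }
        partitions.add(new LinkedHashMap<>(spec));
    }

    // An empty spec matches everything; a full spec matches exactly one.
    public List<Map<String, String>> listPartitions(Map<String, String> partialSpec) {
        List<Map<String, String>> result = new ArrayList<>();
        for (Map<String, String> spec : partitions) {
            if (spec.entrySet().containsAll(partialSpec.entrySet())) {
                result.add(spec);
            }
        }
        return result;
    }
}
```

This mirrors the `ignoreIfExists` semantics in the interface above: re-creating an existing partition is either a no-op or an error, depending on the flag.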

## Advanced Catalog Features

### Statistics Integration

Integration with table and column statistics:

```java { .api }
/**
 * Statistics management through catalog
 */
public interface StatisticsManager {

    /**
     * Gets table statistics from catalog
     * @param objectPath Table path
     * @return Table statistics or empty if not available
     */
    Optional<CatalogTableStatistics> getTableStatistics(ObjectPath objectPath);

    /**
     * Gets column statistics from catalog
     * @param objectPath Table path
     * @return Column statistics or empty if not available
     */
    Optional<CatalogColumnStatistics> getTableColumnStatistics(ObjectPath objectPath);

    /**
     * Updates table statistics in catalog
     * @param objectPath Table path
     * @param tableStatistics Updated table statistics
     * @param ignoreIfNotExists Whether to ignore if table doesn't exist
     */
    void alterTableStatistics(
            ObjectPath objectPath,
            CatalogTableStatistics tableStatistics,
            boolean ignoreIfNotExists);

    /**
     * Updates column statistics in catalog
     * @param objectPath Table path
     * @param columnStatistics Updated column statistics
     * @param ignoreIfNotExists Whether to ignore if table doesn't exist
     */
    void alterTableColumnStatistics(
            ObjectPath objectPath,
            CatalogColumnStatistics columnStatistics,
            boolean ignoreIfNotExists);
}
```

### View Management

Support for view creation and management:

```java
// Create view through catalog.
// CatalogView.of(schema, comment, originalQuery, expandedQuery, options)
CatalogView catalogView = CatalogView.of(
    Schema.newBuilder()
        .column("user_id", DataTypes.BIGINT())
        .column("user_name", DataTypes.STRING())
        .build(),
    "Active users view",
    "SELECT id AS user_id, name AS user_name FROM users WHERE active = true",
    "SELECT `id` AS `user_id`, `name` AS `user_name` FROM `default_database`.`users` WHERE `active` = true",
    Collections.emptyMap()
);

ObjectPath viewPath = new ObjectPath("default_database", "active_users");
catalog.createTable(viewPath, catalogView, false);
```
496
497
## Error Handling and Validation
498
499
Common catalog integration error scenarios:
500
501
```java
502
try {
503
// Catalog operations that may fail
504
catalog.createTable(tablePath, catalogTable, false);
505
} catch (TableAlreadyExistException e) {
506
// Handle table already exists
507
} catch (DatabaseNotExistException e) {
508
// Handle database doesn't exist
509
} catch (CatalogException e) {
510
// Handle general catalog errors
511
}
512
513
// Function catalog error handling
514
try {
515
functionCatalog.registerTemporaryCatalogFunction(identifier, functionDef, false);
516
} catch (FunctionAlreadyExistException e) {
517
// Handle function already exists
518
} catch (ValidationException e) {
519
// Handle function validation errors
520
}
521
```

## Configuration Examples

### Hive Catalog Configuration

```java
// Properties as they would appear in a catalog descriptor or CREATE CATALOG DDL
Map<String, String> hiveProperties = new HashMap<>();
hiveProperties.put(CatalogProperties.CATALOG_TYPE, "hive");
hiveProperties.put(CatalogProperties.CATALOG_DEFAULT_DATABASE, "default");
hiveProperties.put(CatalogProperties.CATALOG_HIVE_CONF_DIR, "/opt/hive/conf");
hiveProperties.put(CatalogProperties.CATALOG_HIVE_VERSION, "3.1.2");

// Programmatic registration: HiveCatalog(catalogName, defaultDatabase, hiveConfDir)
HiveCatalog hiveCatalog = new HiveCatalog("hive_catalog", "default", "/opt/hive/conf");
catalogManager.registerCatalog("hive_catalog", hiveCatalog);
catalogManager.setCurrentCatalog("hive_catalog");
```

### JDBC Catalog Configuration

```java
Map<String, String> jdbcProperties = new HashMap<>();
jdbcProperties.put(CatalogProperties.CATALOG_TYPE, "jdbc");
jdbcProperties.put(CatalogProperties.CATALOG_DEFAULT_DATABASE, "analytics");
jdbcProperties.put(CatalogProperties.CATALOG_JDBC_URL, "jdbc:postgresql://localhost:5432/");
jdbcProperties.put(CatalogProperties.CATALOG_JDBC_USERNAME, "catalog_user");
jdbcProperties.put(CatalogProperties.CATALOG_JDBC_PASSWORD, "catalog_password");

// Programmatic registration: JdbcCatalog(catalogName, defaultDatabase, username, password, baseUrl);
// the base URL must not include a database name
JdbcCatalog jdbcCatalog = new JdbcCatalog(
    "jdbc_catalog", "analytics", "catalog_user", "catalog_password",
    "jdbc:postgresql://localhost:5432/");
catalogManager.registerCatalog("jdbc_catalog", jdbcCatalog);
```