Apache Flink's Table API Planner Blink module provides a sophisticated SQL and Table API query planning and execution engine, with advanced query optimization, code generation, and comprehensive support for both streaming and batch workloads.
—
The catalog integration system provides seamless integration with Flink's catalog system for metadata management, table registration, schema information, and function discovery. It supports multiple catalog types, schema evolution, and metadata persistence across different storage systems.
Integration with Flink's central catalog management system for metadata operations:
/**
 * Catalog manager integration (from flink-table-api-java)
 * Provides centralized catalog and database management.
 * The planner resolves all table/function references through this component.
 */
public interface CatalogManager {
/**
 * Registers a catalog with the catalog manager
 * @param catalogName Name of the catalog
 * @param catalog Catalog instance to register
 */
void registerCatalog(String catalogName, Catalog catalog);
/**
 * Gets a registered catalog by name
 * @param catalogName Name of the catalog
 * @return Optional catalog instance (empty if no catalog is registered under that name)
 */
Optional<Catalog> getCatalog(String catalogName);
/**
 * Lists all registered catalog names
 * @return Set of catalog names
 */
Set<String> listCatalogs();
/**
 * Sets the current catalog
 * @param catalogName Name of catalog to set as current
 */
void setCurrentCatalog(String catalogName);
/**
 * Gets the current catalog name
 * @return Current catalog name
 */
String getCurrentCatalog();
/**
 * Sets the current database within current catalog
 * @param databaseName Database name to set as current
 */
void setCurrentDatabase(String databaseName);
/**
 * Gets the current database name
 * @return Current database name
 */
String getCurrentDatabase();
/**
 * Resolves object identifier to qualified name.
 * NOTE(review): presumably fills in missing catalog/database parts from the
 * current catalog and current database — confirm against flink-table-api-java.
 * @param identifier Object identifier to resolve
 * @return Fully qualified object identifier
 */
ObjectIdentifier qualifyIdentifier(UnresolvedIdentifier identifier);
}
Integration with Flink's function catalog for user-defined function management:
/**
 * Function catalog integration (from flink-table-api-java)
 * Manages function registration, lookup, and resolution.
 * NOTE(review): temporary functions typically shadow permanent catalog
 * functions of the same name during lookup — confirm against the Flink docs.
 */
public interface FunctionCatalog {
/**
 * Registers a temporary system function (name is not catalog/database qualified)
 * @param name Function name
 * @param functionDefinition Function definition
 */
void registerTemporarySystemFunction(String name, FunctionDefinition functionDefinition);
/**
 * Registers a temporary catalog function
 * @param objectIdentifier Function identifier with catalog/database/name
 * @param functionDefinition Function definition
 * @param ignoreIfExists Whether to ignore if function already exists
 */
void registerTemporaryCatalogFunction(
ObjectIdentifier objectIdentifier,
FunctionDefinition functionDefinition,
boolean ignoreIfExists
);
/**
 * Drops a temporary system function
 * @param name Function name to drop
 * @return True if function was dropped, false if not found
 */
boolean dropTemporarySystemFunction(String name);
/**
 * Drops a temporary catalog function
 * @param identifier Function identifier
 * @param ignoreIfNotExist Whether to ignore if function doesn't exist
 * @return True if function was dropped
 */
boolean dropTemporaryCatalogFunction(ObjectIdentifier identifier, boolean ignoreIfNotExist);
/**
 * Looks up function by identifier
 * @param objectIdentifier Function identifier
 * @return Optional function lookup result (empty if no matching function is found)
 */
Optional<FunctionLookup.Result> lookupFunction(ObjectIdentifier objectIdentifier);
/**
 * Lists user-defined functions in given catalog and database
 * @param catalogName Catalog name
 * @param databaseName Database name
 * @return Array of function names
 */
String[] listFunctions(String catalogName, String databaseName);
/**
 * Lists temporary functions
 * @return Array of temporary function names
 */
String[] listTemporaryFunctions();
}
Integration with table sources and sinks through the catalog system:
/**
 * Table source utilities for catalog integration.
 * NOTE(review): method bodies are omitted in this documentation excerpt —
 * only the signatures and contracts are shown.
 */
object TableSourceUtil {
/**
 * Validates table source capabilities and configuration
 * @param tableSource Table source to validate
 * @param schema Expected table schema
 * @throws ValidationException if source is invalid
 */
def validateTableSource(tableSource: TableSource[_], schema: TableSchema): Unit
/**
 * Creates dynamic table source from catalog table
 * @param catalogTable Catalog table definition
 * @param context Dynamic table context
 * @return Dynamic table source instance
 */
def createDynamicTableSource(
catalogTable: CatalogBaseTable,
context: DynamicTableSource.Context
): DynamicTableSource
/**
 * Extracts watermark strategy from table source
 * @param tableSource Table source with watermark information
 * @return Optional watermark strategy (None if the source defines no watermark)
 */
def extractWatermarkStrategy(tableSource: TableSource[_]): Option[WatermarkStrategy[_]]
/**
 * Validates partition information for partitioned sources
 * @param partitions Partition specifications
 * @param tableSchema Table schema with partition keys
 * @throws ValidationException if partitions are invalid
 */
def validatePartitions(
partitions: java.util.List[CatalogPartitionSpec],
tableSchema: TableSchema
): Unit
/**
 * Creates table source from legacy table factory
 * @param properties Table properties from catalog
 * @param isTemporary Whether this is a temporary table
 * @return Legacy table source instance
 */
def createTableSource(
properties: java.util.Map[String, String],
isTemporary: Boolean
): TableSource[_]
}
Support for schema evolution and compatibility checking:
/**
 * Schema evolution utilities for catalog integration.
 * NOTE(review): the methods below are shown as signatures only (no bodies) —
 * this is illustrative documentation, not compilable Java.
 */
public class SchemaEvolutionUtil {
/**
 * Checks schema compatibility between versions
 * @param oldSchema Previous schema version
 * @param newSchema New schema version
 * @return Compatibility check result
 */
public static CompatibilityResult checkCompatibility(
TableSchema oldSchema,
TableSchema newSchema
);
/**
 * Evolves schema with backward compatibility
 * @param currentSchema Current schema
 * @param evolutionSpec Schema evolution specification
 * @return Evolved schema
 * @throws SchemaEvolutionException if evolution is not compatible
 */
public static TableSchema evolveSchema(
TableSchema currentSchema,
SchemaEvolutionSpec evolutionSpec
) throws SchemaEvolutionException;
/**
 * Validates column changes for compatibility
 * @param oldColumn Old column definition
 * @param newColumn New column definition
 * @return True if change is compatible
 */
public static boolean isColumnChangeCompatible(
TableColumn oldColumn,
TableColumn newColumn
);
/**
 * Creates schema projection for subset of columns
 * @param originalSchema Original table schema
 * @param selectedFields Selected field names
 * @return Projected schema with selected fields only
 */
public static TableSchema projectSchema(
TableSchema originalSchema,
List<String> selectedFields
);
}
Support for various catalog implementations:
/**
 * Built-in catalog types supported by Flink.
 * NOTE(review): Flink itself ships GenericInMemoryCatalog, HiveCatalog and
 * JdbcCatalog; an "ElasticsearchCatalog" is not part of the Flink
 * distribution — confirm whether this refers to a third-party implementation.
 */
public enum CatalogType {
GENERIC_IN_MEMORY, // GenericInMemoryCatalog - for testing and temporary use
HIVE, // HiveCatalog - Apache Hive Metastore integration
JDBC, // JdbcCatalog - JDBC-based catalog storage
ELASTICSEARCH, // ElasticsearchCatalog - for Elasticsearch integration
CUSTOM // Custom catalog implementations
}
/**
 * Catalog configuration properties (string keys used in catalog DDL/config maps).
 * NOTE(review): the "default-database.jdbc.*" prefixes on the JDBC keys look
 * unusual — Flink's JdbcCatalog factory uses options named base-url, username
 * and password; verify these key names against the targeted Flink version.
 */
public class CatalogProperties {
public static final String CATALOG_TYPE = "type";
public static final String CATALOG_DEFAULT_DATABASE = "default-database";
public static final String CATALOG_HIVE_CONF_DIR = "hive-conf-dir";
public static final String CATALOG_HIVE_VERSION = "hive-version";
public static final String CATALOG_JDBC_URL = "default-database.jdbc.url";
public static final String CATALOG_JDBC_USERNAME = "default-database.jdbc.username";
public static final String CATALOG_JDBC_PASSWORD = "default-database.jdbc.password";
}
Integration with Flink's catalog factory system:
/**
 * Catalog factory for creating catalog instances.
 * Discovered via Java SPI and matched by its factory identifier
 * (the value of the "type" option in the catalog configuration).
 */
public interface CatalogFactory extends Factory {
/**
 * Creates catalog instance from configuration
 * @param context Factory context with configuration
 * @return Created catalog instance
 */
Catalog createCatalog(Context context);
/**
 * Returns factory identifier
 * @return Unique factory identifier
 */
String factoryIdentifier();
/**
 * Returns required configuration options
 * @return Set of required options
 */
Set<ConfigOption<?>> requiredOptions();
/**
 * Returns optional configuration options
 * @return Set of optional options
 */
Set<ConfigOption<?>> optionalOptions();
}
Operations for managing table metadata through catalogs:
// Create table through catalog.
// CatalogTable.of(schema, comment, partitionKeys, options): this table is
// unpartitioned (empty list) and carries the given table properties.
CatalogTable catalogTable = CatalogTable.of(
Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.column("name", DataTypes.STRING())
.column("ts", DataTypes.TIMESTAMP(3))
.watermark("ts", "ts - INTERVAL '5' SECOND")
.primaryKey("id")
.build(),
"Table for user data",
Collections.emptyList(),
tableProperties
);
ObjectPath tablePath = new ObjectPath("default_database", "users");
catalog.createTable(tablePath, catalogTable, false);
// Query table metadata
CatalogBaseTable table = catalog.getTable(tablePath);
// NOTE(review): Schema.resolve(...) expects a SchemaResolver, not a type
// factory — confirm that `typeFactory` here really is a resolver.
TableSchema schema = table.getUnresolvedSchema().resolve(typeFactory);
Database-level operations through catalog integration:
// Create database with custom properties and a description comment.
CatalogDatabase database = new CatalogDatabaseImpl(
Collections.singletonMap("location", "/path/to/database"),
"User database for analytics"
);
// Third argument (false): fail if the database already exists.
catalog.createDatabase("analytics_db", database, false);
// List databases and tables
List<String> databases = catalog.listDatabases();
List<String> tables = catalog.listTables("analytics_db");
Support for partitioned table management:
/**
 * Partition management through catalog integration.
 * Mirrors the partition-related subset of Flink's Catalog API.
 */
public interface PartitionManager {
/**
 * Creates partition in catalog table
 * @param tablePath Table object path
 * @param partitionSpec Partition specification
 * @param partition Partition metadata
 * @param ignoreIfExists Whether to ignore if partition exists
 */
void createPartition(
ObjectPath tablePath,
CatalogPartitionSpec partitionSpec,
CatalogPartition partition,
boolean ignoreIfExists
);
/**
 * Lists partitions for table
 * @param tablePath Table object path
 * @return List of partition specifications
 */
List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath);
/**
 * Gets partition metadata
 * @param tablePath Table object path
 * @param partitionSpec Partition specification
 * @return Partition metadata
 */
CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec);
/**
 * Drops partition from table
 * @param tablePath Table object path
 * @param partitionSpec Partition specification
 * @param ignoreIfNotExists Whether to ignore if partition doesn't exist
 */
void dropPartition(
ObjectPath tablePath,
CatalogPartitionSpec partitionSpec,
boolean ignoreIfNotExists
);
}
Integration with table and column statistics:
/**
 * Statistics management through catalog.
 * Statistics feed the planner's cost-based optimization; absent statistics
 * are represented by an empty Optional rather than null.
 */
public interface StatisticsManager {
/**
 * Gets table statistics from catalog
 * @param objectPath Table path
 * @return Table statistics or empty if not available
 */
Optional<CatalogTableStatistics> getTableStatistics(ObjectPath objectPath);
/**
 * Gets column statistics from catalog
 * @param objectPath Table path
 * @return Column statistics or empty if not available
 */
Optional<CatalogColumnStatistics> getTableColumnStatistics(ObjectPath objectPath);
/**
 * Updates table statistics in catalog
 * @param objectPath Table path
 * @param tableStatistics Updated table statistics
 * @param ignoreIfNotExists Whether to ignore if table doesn't exist
 */
void alterTableStatistics(
ObjectPath objectPath,
CatalogTableStatistics tableStatistics,
boolean ignoreIfNotExists
);
/**
 * Updates column statistics in catalog
 * @param objectPath Table path
 * @param columnStatistics Updated column statistics
 * @param ignoreIfNotExists Whether to ignore if table doesn't exist
 */
void alterTableColumnStatistics(
ObjectPath objectPath,
CatalogColumnStatistics columnStatistics,
boolean ignoreIfNotExists
);
}
Support for view creation and management:
// Create view through catalog
CatalogView catalogView = CatalogView.of(
Schema.newBuilder()
.column("user_id", DataTypes.BIGINT())
.column("user_name", DataTypes.STRING())
.build(),
"Active users view",
"SELECT id as user_id, name as user_name FROM users WHERE active = true",
Collections.emptyList(),
"Comment for active users view"
);
ObjectPath viewPath = new ObjectPath("default_database", "active_users");
catalog.createTable(viewPath, catalogView, false);Common catalog integration error scenarios:
try {
// Catalog operations that may fail
catalog.createTable(tablePath, catalogTable, false);
} catch (TableAlreadyExistException e) {
// Handle table already exists
// NOTE(review): empty catch blocks are placeholders — real code must
// handle or rethrow; silently swallowing exceptions hides failures.
} catch (DatabaseNotExistException e) {
// Handle database doesn't exist
} catch (CatalogException e) {
// Handle general catalog errors
}
// Function catalog error handling
try {
functionCatalog.registerTemporaryCatalogFunction(identifier, functionDef, false);
} catch (FunctionAlreadyExistException e) {
// Handle function already exists
} catch (ValidationException e) {
// Handle function validation errors
}
Map<String, String> hiveProperties = new HashMap<>();
// Hive catalog configuration as a property map (e.g. for DDL / factory use).
// NOTE(review): hiveProperties is populated but never passed anywhere below —
// the HiveCatalog is constructed directly from hiveConfDir instead; either
// use the map with the factory mechanism or drop it.
hiveProperties.put(CatalogProperties.CATALOG_TYPE, "hive");
hiveProperties.put(CatalogProperties.CATALOG_DEFAULT_DATABASE, "default");
hiveProperties.put(CatalogProperties.CATALOG_HIVE_CONF_DIR, "/opt/hive/conf");
hiveProperties.put(CatalogProperties.CATALOG_HIVE_VERSION, "3.1.2");
// Direct programmatic construction: (catalog name, default database, conf dir).
HiveCatalog hiveCatalog = new HiveCatalog("hive_catalog", "default", hiveConfDir);
catalogManager.registerCatalog("hive_catalog", hiveCatalog);
catalogManager.setCurrentCatalog("hive_catalog");
Map<String, String> jdbcProperties = new HashMap<>();
// JDBC catalog configuration as a property map (e.g. for DDL / factory use).
// NOTE(review): jdbcProperties is populated but never used — the JdbcCatalog
// below is constructed directly from jdbcUrl/username/password instead.
jdbcProperties.put(CatalogProperties.CATALOG_TYPE, "jdbc");
jdbcProperties.put(CatalogProperties.CATALOG_DEFAULT_DATABASE, "analytics");
jdbcProperties.put(CatalogProperties.CATALOG_JDBC_URL, "jdbc:postgresql://localhost:5432/metadata");
jdbcProperties.put(CatalogProperties.CATALOG_JDBC_USERNAME, "catalog_user");
jdbcProperties.put(CatalogProperties.CATALOG_JDBC_PASSWORD, "catalog_password");
// Direct programmatic construction: (catalog name, default database, url, user, password).
JdbcCatalog jdbcCatalog = new JdbcCatalog("jdbc_catalog", "analytics", jdbcUrl, username, password);
catalogManager.registerCatalog("jdbc_catalog", jdbcCatalog);
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-planner-blink-2-12