This document covers the rich type definitions, schema inference, and catalog management capabilities of the Apache Flink Table API (flink-table-uber-blink module, Blink planner).
class DataTypes {
// Numeric types
static DataType TINYINT();
static DataType SMALLINT();
static DataType INT();
static DataType BIGINT();
static DataType FLOAT();
static DataType DOUBLE();
static DataType DECIMAL(int precision, int scale);
// String types
static DataType CHAR(int length);
static DataType VARCHAR(int length);
static DataType STRING();
// Binary types
static DataType BINARY(int length);
static DataType VARBINARY(int length);
static DataType BYTES();
// Boolean type
static DataType BOOLEAN();
// Date/Time types
static DataType DATE();
static DataType TIME();
static DataType TIME(int precision);
static DataType TIMESTAMP();
static DataType TIMESTAMP(int precision);
static DataType TIMESTAMP_WITH_TIME_ZONE();
static DataType TIMESTAMP_WITH_TIME_ZONE(int precision);
static DataType TIMESTAMP_WITH_LOCAL_TIME_ZONE();
static DataType TIMESTAMP_WITH_LOCAL_TIME_ZONE(int precision);
static DataType INTERVAL(DataTypes.Resolution resolution);
static DataType INTERVAL(DataTypes.Resolution upperResolution, DataTypes.Resolution lowerResolution);
// Complex types
static DataType ARRAY(DataType elementType);
static DataType MAP(DataType keyType, DataType valueType);
static DataType ROW(DataTypes.Field... fields);
static DataType MULTISET(DataType elementType);
static DataType RAW(Class<?> clazz, TypeSerializer<?> serializer);
// Field factory for ROW types
static DataTypes.Field FIELD(String name, DataType dataType);
static DataTypes.Field FIELD(String name, DataType dataType, String description);
}
// Array type
DataType arrayType = DataTypes.ARRAY(DataTypes.STRING());
// Map type
DataType mapType = DataTypes.MAP(DataTypes.STRING(), DataTypes.INT());
// Row type
DataType rowType = DataTypes.ROW(
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("age", DataTypes.INT()),
DataTypes.FIELD("address", DataTypes.ROW(
DataTypes.FIELD("street", DataTypes.STRING()),
DataTypes.FIELD("city", DataTypes.STRING())
))
);
class Schema {
static Schema.Builder newBuilder();
interface Builder {
Builder column(String columnName, DataType dataType);
Builder column(String columnName, AbstractDataType<?> abstractDataType);
Builder columnByExpression(String columnName, String expression);
Builder columnByMetadata(String columnName, DataType dataType);
Builder columnByMetadata(String columnName, DataType dataType, String metadataKey, boolean isVirtual);
Builder watermark(String columnName, String watermarkExpression);
Builder primaryKey(String... columnNames);
Builder primaryKeyNamed(String constraintName, String... columnNames);
Schema build();
}
}
Usage:
Schema schema = Schema.newBuilder()
.column("user_id", DataTypes.BIGINT())
.column("name", DataTypes.STRING())
.column("email", DataTypes.STRING())
.column("created_at", DataTypes.TIMESTAMP(3))
.columnByMetadata("metadata_timestamp", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3), "timestamp")
.watermark("created_at", "created_at - INTERVAL '5' SECOND")
.primaryKey("user_id")
.build();
class ResolvedSchema {
List<Column> getColumns();
List<String> getColumnNames();
List<DataType> getColumnDataTypes();
List<Column> getPhysicalColumns();
List<Column> getComputedColumns();
List<Column> getMetadataColumns();
Optional<UniqueConstraint> getPrimaryKey();
List<UniqueConstraint> getUniqueConstraints();
List<WatermarkSpec> getWatermarkSpecs();
Column getColumn(int index);
Optional<Column> getColumn(String name);
int getColumnIndex(String name);
}
abstract class Column {
String getName();
DataType getDataType();
String getComment();
boolean isPhysical();
boolean isComputed();
boolean isMetadata();
}
class Column.PhysicalColumn extends Column;
class Column.ComputedColumn extends Column {
ResolvedExpression getExpression();
}
class Column.MetadataColumn extends Column {
String getMetadataKey();
boolean isVirtual();
}
class TypeInference {
static DataType inferDataType(Object object);
static DataType inferDataType(Class<?> clazz);
static DataType inferDataType(TypeInformation<?> typeInfo);
}
Usage:
// Infer from POJO class
DataType userType = TypeInference.inferDataType(User.class);
// Infer from TypeInformation
TypeInformation<Tuple2<String, Integer>> tupleTypeInfo = Types.TUPLE(Types.STRING, Types.INT);
DataType tupleType = TypeInference.inferDataType(tupleTypeInfo);
class DataTypes {
static <T> DataType of(Class<T> clazz);
static <T> DataType of(TypeInformation<T> typeInformation);
static DataType RAW(Class<?> clazz, TypeSerializer<?> serializer);
}
interface Catalog {
// Database operations
boolean databaseExists(String databaseName);
void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists);
void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade);
void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists);
List<String> listDatabases();
CatalogDatabase getDatabase(String databaseName);
// Table operations
boolean tableExists(ObjectPath tablePath);
void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists);
void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists);
void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists);
List<String> listTables(String databaseName);
CatalogBaseTable getTable(ObjectPath tablePath);
// Function operations
boolean functionExists(ObjectPath functionPath);
void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists);
void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists);
void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists);
List<String> listFunctions(String databaseName);
CatalogFunction getFunction(ObjectPath functionPath);
}
interface TableEnvironment {
void registerCatalog(String catalogName, Catalog catalog);
Optional<Catalog> getCatalog(String catalogName);
void useCatalog(String catalogName);
String getCurrentCatalog();
void useDatabase(String databaseName);
String getCurrentDatabase();
String[] listCatalogs();
String[] listDatabases();
String[] listTables();
String[] listViews();
String[] listUserDefinedFunctions();
String[] listFunctions();
String[] listModules();
}
Usage:
// Register catalogs
HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", "/path/to/hive-conf");
tEnv.registerCatalog("myhive", hiveCatalog);
JdbcCatalog pgCatalog = new PostgresCatalog("mypg", "testdb", "username", "password", "jdbc:postgresql://localhost:5432/testdb");
tEnv.registerCatalog("mypg", pgCatalog);
// Switch contexts
tEnv.useCatalog("myhive");
tEnv.useDatabase("production");
// List metadata
String[] catalogs = tEnv.listCatalogs();
String[] databases = tEnv.listDatabases();
String[] tables = tEnv.listTables();
class GenericInMemoryCatalog implements Catalog {
GenericInMemoryCatalog(String name);
GenericInMemoryCatalog(String name, String defaultDatabase);
}
class HiveCatalog implements Catalog {
HiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir);
HiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir, String hiveVersion);
HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf);
}
class JdbcCatalog implements Catalog {
JdbcCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl);
}
class PostgresCatalog extends JdbcCatalog;
class MySqlCatalog extends JdbcCatalog;
class TableDescriptor {
static TableDescriptor forConnector(String connector);
TableDescriptor schema(Schema schema);
TableDescriptor option(String key, String value);
TableDescriptor option(ConfigOption<String> option, String value);
TableDescriptor partitionedBy(String... partitionKeys);
TableDescriptor comment(String comment);
TableDescriptor like(String likeTablePath);
TableDescriptor like(String likeTablePath, LikeOption... likeOptions);
}
Usage:
TableDescriptor descriptor = TableDescriptor
.forConnector("kafka")
.schema(Schema.newBuilder()
.column("user_id", DataTypes.BIGINT())
.column("message", DataTypes.STRING())
.column("ts", DataTypes.TIMESTAMP(3))
.watermark("ts", "ts - INTERVAL '5' SECOND")
.build())
.option("topic", "user-events")
.option("properties.bootstrap.servers", "localhost:9092")
.option("format", "json")
.comment("User event stream from Kafka");
tEnv.createTable("user_events", descriptor);
// Primitive mappings
int -> DataTypes.INT()
long -> DataTypes.BIGINT()
double -> DataTypes.DOUBLE()
boolean -> DataTypes.BOOLEAN()
String -> DataTypes.STRING()
// Object mappings
Integer -> DataTypes.INT()
Long -> DataTypes.BIGINT()
Double -> DataTypes.DOUBLE()
Boolean -> DataTypes.BOOLEAN()
BigDecimal -> DataTypes.DECIMAL()
LocalDate -> DataTypes.DATE()
LocalTime -> DataTypes.TIME()
LocalDateTime -> DataTypes.TIMESTAMP()
Instant -> DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
// Collection mappings
T[] -> DataTypes.ARRAY(...)
List<T> -> DataTypes.ARRAY(...)
Map<K,V> -> DataTypes.MAP(...)
Row -> DataTypes.ROW(...)
// Handle schema changes
Schema oldSchema = Schema.newBuilder()
.column("id", DataTypes.INT())
.column("name", DataTypes.STRING())
.build();
Schema newSchema = Schema.newBuilder()
.column("id", DataTypes.BIGINT()) // Type widening
.column("name", DataTypes.STRING())
.column("email", DataTypes.STRING()) // New column
.build();
class TableConfigOptions {
ConfigOption<String> TABLE_LOCAL_TIME_ZONE;
ConfigOption<Integer> TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
ConfigOption<Boolean> TABLE_EXEC_LEGACY_TYPE_SYSTEM;
}
abstract class DataType {
LogicalType getLogicalType();
Class<?> getConversionClass();
boolean isNullable();
DataType nullable();
DataType notNull();
DataType bridgedTo(Class<?> newConversionClass);
}
abstract class LogicalType {
LogicalTypeRoot getTypeRoot();
String asSummaryString();
List<LogicalType> getChildren();
boolean isNullable();
LogicalType copy(boolean isNullable);
}
class ObjectPath {
ObjectPath(String databaseName, String objectName);
String getDatabaseName();
String getObjectName();
String getFullName();
}
interface CatalogBaseTable {
Map<String, String> getOptions();
TableSchema getSchema(); // Deprecated
String getComment();
CatalogBaseTable copy(Map<String, String> options);
Optional<String> getDescription();
Optional<String> getDetailedDescription();
}