Comprehensive Table/SQL distribution for Apache Flink with the Blink planner, providing optimized table processing in both batch and streaming modes.
This document covers the rich type definitions, schema inference, and catalog management capabilities of Apache Flink Table Uber Blink.
class DataTypes {
// Numeric types
static DataType TINYINT();
static DataType SMALLINT();
static DataType INT();
static DataType BIGINT();
static DataType FLOAT();
static DataType DOUBLE();
static DataType DECIMAL(int precision, int scale);
// String types
static DataType CHAR(int length);
static DataType VARCHAR(int length);
static DataType STRING();
// Binary types
static DataType BINARY(int length);
static DataType VARBINARY(int length);
static DataType BYTES();
// Boolean type
static DataType BOOLEAN();
// Date/Time types
static DataType DATE();
static DataType TIME();
static DataType TIME(int precision);
static DataType TIMESTAMP();
static DataType TIMESTAMP(int precision);
static DataType TIMESTAMP_WITH_TIME_ZONE();
static DataType TIMESTAMP_WITH_TIME_ZONE(int precision);
static DataType TIMESTAMP_WITH_LOCAL_TIME_ZONE();
static DataType TIMESTAMP_WITH_LOCAL_TIME_ZONE(int precision);
static DataType INTERVAL(DataTypes.Resolution resolution);
static DataType INTERVAL(DataTypes.Resolution upperResolution, DataTypes.Resolution lowerResolution);
// Resolutions are created via DataTypes.YEAR(), MONTH(), DAY(), HOUR(), MINUTE(), SECOND()
// Complex types
static DataType ARRAY(DataType elementType);
static DataType MAP(DataType keyType, DataType valueType);
static DataType ROW(DataTypes.Field... fields);
static DataType MULTISET(DataType elementType);
static DataType RAW(Class<?> clazz, TypeSerializer<?> serializer);
// Field factory for ROW types
static DataTypes.Field FIELD(String name, DataType dataType);
static DataTypes.Field FIELD(String name, DataType dataType, String description);
}

Usage:
// Array type
DataType arrayType = DataTypes.ARRAY(DataTypes.STRING());
// Map type
DataType mapType = DataTypes.MAP(DataTypes.STRING(), DataTypes.INT());
// Row type
DataType rowType = DataTypes.ROW(
    DataTypes.FIELD("name", DataTypes.STRING()),
    DataTypes.FIELD("age", DataTypes.INT()),
    DataTypes.FIELD("address", DataTypes.ROW(
        DataTypes.FIELD("street", DataTypes.STRING()),
        DataTypes.FIELD("city", DataTypes.STRING())
    ))
);
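The same factory style covers scalar and interval types; a minimal sketch (the variable names are only illustrative):

// Decimal with explicit precision and scale
DataType priceType = DataTypes.DECIMAL(10, 2);
// Interval types built from Resolution factories
DataType dayInterval = DataTypes.INTERVAL(DataTypes.DAY());
DataType yearToMonth = DataTypes.INTERVAL(DataTypes.YEAR(), DataTypes.MONTH());
// Nullability is part of the type
DataType requiredName = DataTypes.STRING().notNull();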
class Schema {
static Schema.Builder newBuilder();
interface Builder {
Builder column(String columnName, DataType dataType);
Builder column(String columnName, AbstractDataType<?> abstractDataType);
Builder columnByExpression(String columnName, String expression);
Builder columnByMetadata(String columnName, DataType dataType);
Builder columnByMetadata(String columnName, DataType dataType, String metadataKey, boolean isVirtual);
Builder watermark(String columnName, String watermarkExpression);
Builder primaryKey(String... columnNames);
Builder primaryKeyNamed(String constraintName, String... columnNames);
Schema build();
}
}

Usage:
Schema schema = Schema.newBuilder()
    .column("user_id", DataTypes.BIGINT())
    .column("name", DataTypes.STRING())
    .column("email", DataTypes.STRING())
    .column("created_at", DataTypes.TIMESTAMP(3))
    // Metadata columns are declared once, via columnByMetadata; declaring the
    // same name with column(...) as well would conflict
    .columnByMetadata("metadata_timestamp", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3), "timestamp", true)
    .watermark("created_at", "created_at - INTERVAL '5' SECOND")
    .primaryKey("user_id")
    .build();

class ResolvedSchema {
List<Column> getColumns();
List<String> getColumnNames();
List<DataType> getColumnDataTypes();
List<Column> getPhysicalColumns();
List<Column> getComputedColumns();
List<Column> getMetadataColumns();
Optional<UniqueConstraint> getPrimaryKey();
List<UniqueConstraint> getUniqueConstraints();
List<WatermarkSpec> getWatermarkSpecs();
Column getColumn(int index);
Optional<Column> getColumn(String name);
int getColumnIndex(String name);
}
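A ResolvedSchema is normally obtained from a resolved table rather than constructed by hand; a minimal sketch, assuming a table named user_events is already registered in a TableEnvironment tEnv:

ResolvedSchema resolved = tEnv.from("user_events").getResolvedSchema();
List<String> columnNames = resolved.getColumnNames();
resolved.getPrimaryKey()
    .ifPresent(pk -> System.out.println("Primary key: " + pk.getColumns()));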
abstract class Column {
String getName();
DataType getDataType();
String getComment();
boolean isPhysical();
boolean isComputed();
boolean isMetadata();
}
class Column.PhysicalColumn extends Column;
class Column.ComputedColumn extends Column {
ResolvedExpression getExpression();
}
class Column.MetadataColumn extends Column {
String getMetadataKey();
boolean isVirtual();
}
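Because the column kind is encoded in the subclass, callers typically branch with instanceof; a short sketch over the columns of the resolved schema from the previous example:

for (Column column : resolved.getColumns()) {
    if (column instanceof Column.ComputedColumn) {
        System.out.println(column.getName() + " <- "
            + ((Column.ComputedColumn) column).getExpression().asSummaryString());
    } else if (column instanceof Column.MetadataColumn) {
        System.out.println(column.getName() + " reads metadata key "
            + ((Column.MetadataColumn) column).getMetadataKey());
    } else {
        System.out.println(column.getName() + " is physical");
    }
}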
class TypeInference {
static DataType inferDataType(Object object);
static DataType inferDataType(Class<?> clazz);
static DataType inferDataType(TypeInformation<?> typeInfo);
}

Usage:
// Infer from POJO class
DataType userType = TypeInference.inferDataType(User.class);
// Infer from TypeInformation
TypeInformation<Tuple2<String, Integer>> tupleTypeInfo = Types.TUPLE(Types.STRING, Types.INT);
DataType tupleType = TypeInference.inferDataType(tupleTypeInfo);

class DataTypes {
static <T> DataType of(Class<T> clazz);
static <T> DataType of(TypeInformation<T> typeInformation);
static DataType RAW(Class<?> clazz, TypeSerializer<?> serializer);
}

interface Catalog {
// Database operations
boolean databaseExists(String databaseName);
void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists);
void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade);
void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists);
List<String> listDatabases();
CatalogDatabase getDatabase(String databaseName);
// Table operations
boolean tableExists(ObjectPath tablePath);
void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists);
void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists);
void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists);
List<String> listTables(String databaseName);
CatalogBaseTable getTable(ObjectPath tablePath);
// Function operations
boolean functionExists(ObjectPath functionPath);
void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists);
void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists);
void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists);
List<String> listFunctions(String databaseName);
CatalogFunction getFunction(ObjectPath functionPath);
}
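The interface can also be driven programmatically, outside SQL DDL; a minimal sketch, assuming the myhive catalog registered below and Flink's stock CatalogDatabaseImpl implementation of CatalogDatabase:

Catalog catalog = tEnv.getCatalog("myhive")
    .orElseThrow(() -> new IllegalStateException("catalog not registered"));
catalog.createDatabase("analytics",
    new CatalogDatabaseImpl(Collections.emptyMap(), "analytics database"),
    true); // ignoreIfExists
List<String> analyticsTables = catalog.listTables("analytics");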
interface TableEnvironment {
void registerCatalog(String catalogName, Catalog catalog);
Optional<Catalog> getCatalog(String catalogName);
void useCatalog(String catalogName);
String getCurrentCatalog();
void useDatabase(String databaseName);
String getCurrentDatabase();
String[] listCatalogs();
String[] listDatabases();
String[] listTables();
String[] listViews();
String[] listUserDefinedFunctions();
String[] listFunctions();
String[] listModules();
}

Usage:
// Register catalogs
HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", "/path/to/hive-conf");
tEnv.registerCatalog("myhive", hiveCatalog);
JdbcCatalog pgCatalog = new PostgresCatalog("mypg", "testdb", "username", "password", "jdbc:postgresql://localhost:5432");
tEnv.registerCatalog("mypg", pgCatalog);
// Switch contexts
tEnv.useCatalog("myhive");
tEnv.useDatabase("production");
// List metadata
String[] catalogs = tEnv.listCatalogs();
String[] databases = tEnv.listDatabases();
String[] tables = tEnv.listTables();

class GenericInMemoryCatalog implements Catalog {
GenericInMemoryCatalog(String name);
GenericInMemoryCatalog(String name, String defaultDatabase);
}
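A minimal sketch of an in-memory catalog for tests; all metadata is volatile and lost when the session ends:

GenericInMemoryCatalog scratch = new GenericInMemoryCatalog("scratch", "default");
tEnv.registerCatalog("scratch", scratch);
tEnv.useCatalog("scratch");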
class HiveCatalog implements Catalog {
HiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir);
HiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir, String hiveVersion);
HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf);
}

class JdbcCatalog implements Catalog {
JdbcCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl);
}
class PostgresCatalog extends JdbcCatalog;
class MySqlCatalog extends JdbcCatalog;

class TableDescriptor {
static TableDescriptor.Builder forConnector(String connector);
interface Builder {
Builder schema(Schema schema);
Builder option(String key, String value);
Builder option(ConfigOption<String> option, String value);
Builder partitionedBy(String... partitionKeys);
Builder comment(String comment);
TableDescriptor build();
}
}

Usage:
TableDescriptor descriptor = TableDescriptor
    .forConnector("kafka")
    .schema(Schema.newBuilder()
        .column("user_id", DataTypes.BIGINT())
        .column("message", DataTypes.STRING())
        .column("ts", DataTypes.TIMESTAMP(3))
        .watermark("ts", "ts - INTERVAL '5' SECOND")
        .build())
    .option("topic", "user-events")
    .option("properties.bootstrap.servers", "localhost:9092")
    .option("format", "json")
    .comment("User event stream from Kafka")
    .build();
tEnv.createTable("user_events", descriptor);

// Primitive mappings
int -> DataTypes.INT()
long -> DataTypes.BIGINT()
double -> DataTypes.DOUBLE()
boolean -> DataTypes.BOOLEAN()
String -> DataTypes.STRING()
// Object mappings
Integer -> DataTypes.INT()
Long -> DataTypes.BIGINT()
Double -> DataTypes.DOUBLE()
Boolean -> DataTypes.BOOLEAN()
BigDecimal -> DataTypes.DECIMAL(...)
LocalDate -> DataTypes.DATE()
LocalTime -> DataTypes.TIME()
LocalDateTime -> DataTypes.TIMESTAMP()
Instant -> DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
// Collection mappings
T[] -> DataTypes.ARRAY(...)
List<T> -> DataTypes.ARRAY(...)
Map<K,V> -> DataTypes.MAP(...)
Row -> DataTypes.ROW(...)
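These mappings drive class-based inference; a minimal sketch, assuming a simple POJO with public fields (the User class is illustrative):

public class User {
    public long id;            // -> BIGINT
    public String name;        // -> STRING
    public LocalDate birthday; // -> DATE
}

DataType inferredType = DataTypes.of(User.class);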
// Handle schema changes
Schema oldSchema = Schema.newBuilder()
    .column("id", DataTypes.INT())
    .column("name", DataTypes.STRING())
    .build();
Schema newSchema = Schema.newBuilder()
    .column("id", DataTypes.BIGINT())    // Type widening
    .column("name", DataTypes.STRING())
    .column("email", DataTypes.STRING()) // New column
    .build();
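Whether such an evolution is lossless can be checked at the type level; a sketch using Flink's internal LogicalTypeCasts utility, shown only to illustrate the widening rule:

boolean widening = LogicalTypeCasts.supportsImplicitCast(
    DataTypes.INT().getLogicalType(),      // old column type
    DataTypes.BIGINT().getLogicalType());  // new column type
// true: every INT value fits in BIGINT, so the change is safe for readers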
class TableConfigOptions {
ConfigOption<String> TABLE_LOCAL_TIME_ZONE;
ConfigOption<Integer> TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
ConfigOption<Boolean> TABLE_EXEC_LEGACY_TYPE_SYSTEM;
}
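These options are set through the table environment's configuration; a minimal sketch using the documented option keys:

tEnv.getConfig().getConfiguration()
    .setString("table.local-time-zone", "America/Los_Angeles");
tEnv.getConfig().getConfiguration()
    .setInteger("table.exec.resource.default-parallelism", 4);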
abstract class DataType {
LogicalType getLogicalType();
Class<?> getConversionClass();
boolean isNullable();
DataType nullable();
DataType notNull();
DataType bridgedTo(Class<?> newConversionClass);
}
abstract class LogicalType {
LogicalTypeRoot getTypeRoot();
String asSummaryString();
List<LogicalType> getChildren();
boolean isNullable();
LogicalType copy(boolean isNullable);
}
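A short sketch of moving between the two layers:

DataType dt = DataTypes.TIMESTAMP(3).notNull();
LogicalType lt = dt.getLogicalType();
System.out.println(lt.asSummaryString());    // TIMESTAMP(3) NOT NULL
System.out.println(dt.getConversionClass()); // class java.time.LocalDateTime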
class ObjectPath {
ObjectPath(String databaseName, String objectName);
String getDatabaseName();
String getObjectName();
String getFullName();
}
interface CatalogBaseTable {
Map<String, String> getOptions();
TableSchema getSchema(); // Deprecated
String getComment();
CatalogBaseTable copy(Map<String, String> options);
Optional<String> getDescription();
Optional<String> getDetailedDescription();
}
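ObjectPath and CatalogBaseTable come together when reading metadata back out of a catalog; a minimal sketch, reusing the catalog variable from the earlier example:

ObjectPath path = new ObjectPath("production", "user_events");
if (catalog.tableExists(path)) {
    CatalogBaseTable table = catalog.getTable(path);
    table.getOptions().forEach((k, v) -> System.out.println(k + " = " + v));
}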
Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-uber-blink-2-12