or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

catalog-management.md  complex-event-processing.md  core-table-operations.md  datastream-integration.md  index.md  sql-processing.md  type-system.md  user-defined-functions.md  window-operations.md
tile.json

docs/type-system.md

Type System and Schema Management

This document covers the rich type definitions, schema inference, and catalog management capabilities in Apache Flink Table Uber Blink.

Data Types

Basic Data Types

class DataTypes {
    // Numeric types
    static DataType TINYINT();
    static DataType SMALLINT();
    static DataType INT();
    static DataType BIGINT();
    static DataType FLOAT();
    static DataType DOUBLE();
    static DataType DECIMAL(int precision, int scale);
    
    // String types
    static DataType CHAR(int length);
    static DataType VARCHAR(int length);
    static DataType STRING();
    
    // Binary types
    static DataType BINARY(int length);
    static DataType VARBINARY(int length);
    static DataType BYTES();
    
    // Boolean type
    static DataType BOOLEAN();
    
    // Date/Time types
    static DataType DATE();
    static DataType TIME();
    static DataType TIME(int precision);
    static DataType TIMESTAMP();
    static DataType TIMESTAMP(int precision);
    static DataType TIMESTAMP_WITH_TIME_ZONE();
    static DataType TIMESTAMP_WITH_TIME_ZONE(int precision);
    static DataType TIMESTAMP_WITH_LOCAL_TIME_ZONE();
    static DataType TIMESTAMP_WITH_LOCAL_TIME_ZONE(int precision);
    static DataType INTERVAL(DataTypes.Resolution resolution);
    static DataType INTERVAL(DataTypes.Day day);
    static DataType INTERVAL(DataTypes.Year year);
    
    // Complex types
    static DataType ARRAY(DataType elementType);
    static DataType MAP(DataType keyType, DataType valueType);
    static DataType ROW(DataTypes.Field... fields);
    static DataType MULTISET(DataType elementType);
    static DataType RAW(Class<?> clazz, TypeSerializer<?> serializer);
    
    // Field factory for ROW types
    static DataTypes.Field FIELD(String name, DataType dataType);
    static DataTypes.Field FIELD(String name, DataType dataType, String description);
}

Complex Type Construction

// Array type
DataType arrayType = DataTypes.ARRAY(DataTypes.STRING());

// Map type  
DataType mapType = DataTypes.MAP(DataTypes.STRING(), DataTypes.INT());

// Row type
DataType rowType = DataTypes.ROW(
    DataTypes.FIELD("name", DataTypes.STRING()),
    DataTypes.FIELD("age", DataTypes.INT()),
    DataTypes.FIELD("address", DataTypes.ROW(
        DataTypes.FIELD("street", DataTypes.STRING()),
        DataTypes.FIELD("city", DataTypes.STRING())
    ))
);

Schema Definition

Schema Builder

class Schema {
    static Schema.Builder newBuilder();
    
    interface Builder {
        Builder column(String columnName, DataType dataType);
        Builder column(String columnName, AbstractDataType<?> abstractDataType);
        Builder columnByExpression(String columnName, String expression);
        Builder columnByMetadata(String columnName, DataType dataType);
        Builder columnByMetadata(String columnName, DataType dataType, String metadataKey, boolean isVirtual);
        Builder watermark(String columnName, String watermarkExpression);
        Builder primaryKey(String... columnNames);
        Builder primaryKeyNamed(String constraintName, String... columnNames);  
        Schema build();
    }
}

Usage:

Schema schema = Schema.newBuilder()
    .column("user_id", DataTypes.BIGINT())
    .column("name", DataTypes.STRING())
    .column("email", DataTypes.STRING())
    .column("created_at", DataTypes.TIMESTAMP(3))
    .columnByMetadata("metadata_timestamp", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3), "timestamp", true)
    .watermark("created_at", "created_at - INTERVAL '5' SECOND")
    .primaryKey("user_id")
    .build();

Resolved Schema

class ResolvedSchema {
    List<Column> getColumns();
    List<String> getColumnNames();
    List<DataType> getColumnDataTypes();
    List<Column> getPhysicalColumns();
    List<Column> getComputedColumns();
    List<Column> getMetadataColumns();
    Optional<UniqueConstraint> getPrimaryKey();
    List<UniqueConstraint> getUniqueConstraints();
    List<WatermarkSpec> getWatermarkSpecs();
    Column getColumn(int index);
    Optional<Column> getColumn(String name);
    int getColumnIndex(String name);
}

Column Types

abstract class Column {
    String getName();
    DataType getDataType();
    String getComment();
    boolean isPhysical();
    boolean isComputed();
    boolean isMetadata();
}

class Column.PhysicalColumn extends Column;
class Column.ComputedColumn extends Column {
    ResolvedExpression getExpression();
}
class Column.MetadataColumn extends Column {
    String getMetadataKey();
    boolean isVirtual();
}

Type Inference

Automatic Type Inference

class TypeInference {
    static DataType inferDataType(Object object);
    static DataType inferDataType(Class<?> clazz);
    static DataType inferDataType(TypeInformation<?> typeInfo);
}

Usage:

// Infer from POJO class
DataType userType = TypeInference.inferDataType(User.class);

// Infer from TypeInformation
TypeInformation<Tuple2<String, Integer>> tupleTypeInfo = Types.TUPLE(Types.STRING, Types.INT);
DataType tupleType = TypeInference.inferDataType(tupleTypeInfo);

Custom Type Serialization

class DataTypes {
    static <T> DataType of(Class<T> clazz);
    static <T> DataType of(TypeInformation<T> typeInformation);
    static DataType RAW(Class<?> clazz, TypeSerializer<?> serializer);
}

Catalog System

Catalog Interface

interface Catalog {
    // Database operations
    boolean databaseExists(String databaseName);
    void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists);
    void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade);
    void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists);
    List<String> listDatabases();
    CatalogDatabase getDatabase(String databaseName);
    
    // Table operations
    boolean tableExists(ObjectPath tablePath);
    void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists);
    void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists);
    void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists);
    List<String> listTables(String databaseName);
    CatalogBaseTable getTable(ObjectPath tablePath);
    
    // Function operations
    boolean functionExists(ObjectPath functionPath);
    void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists);
    void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists);
    void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists);
    List<String> listFunctions(String databaseName);
    CatalogFunction getFunction(ObjectPath functionPath);
}

Catalog Management

interface TableEnvironment {
    void registerCatalog(String catalogName, Catalog catalog);
    Optional<Catalog> getCatalog(String catalogName);
    void useCatalog(String catalogName);
    String getCurrentCatalog();
    void useDatabase(String databaseName);
    String getCurrentDatabase();
    String[] listCatalogs();
    String[] listDatabases();
    String[] listTables();
    String[] listViews();
    String[] listUserDefinedFunctions();
    String[] listFunctions();
    String[] listModules();
}

Usage:

// Register catalogs
HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", "/path/to/hive-conf");
tEnv.registerCatalog("myhive", hiveCatalog);

JdbcCatalog pgCatalog = new PostgresCatalog("mypg", "testdb", "username", "password", "jdbc:postgresql://localhost:5432/testdb");
tEnv.registerCatalog("mypg", pgCatalog);

// Switch contexts
tEnv.useCatalog("myhive");
tEnv.useDatabase("production");

// List metadata
String[] catalogs = tEnv.listCatalogs();
String[] databases = tEnv.listDatabases();
String[] tables = tEnv.listTables();

Built-in Catalogs

Generic In-Memory Catalog

class GenericInMemoryCatalog implements Catalog {
    GenericInMemoryCatalog(String name);
    GenericInMemoryCatalog(String name, String defaultDatabase);
}

Hive Catalog

class HiveCatalog implements Catalog {
    HiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir);
    HiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir, String hiveVersion);
    HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf);
}

JDBC Catalogs

class JdbcCatalog implements Catalog {
    JdbcCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl);
}

class PostgresCatalog extends JdbcCatalog;
class MySqlCatalog extends JdbcCatalog;

Table Descriptors

Table Descriptor Builder

class TableDescriptor {
    static TableDescriptor forConnector(String connector);
    
    TableDescriptor schema(Schema schema);
    TableDescriptor option(String key, String value);
    TableDescriptor option(ConfigOption<String> option, String value);
    TableDescriptor partitionedBy(String... partitionKeys);
    TableDescriptor comment(String comment);
    TableDescriptor like(String likeTablePath);
    TableDescriptor like(String likeTablePath, LikeOption... likeOptions);
}

Usage:

TableDescriptor descriptor = TableDescriptor
    .forConnector("kafka")
    .schema(Schema.newBuilder()
        .column("user_id", DataTypes.BIGINT())
        .column("message", DataTypes.STRING())
        .column("ts", DataTypes.TIMESTAMP(3))
        .watermark("ts", "ts - INTERVAL '5' SECOND")
        .build())
    .option("topic", "user-events")
    .option("properties.bootstrap.servers", "localhost:9092")
    .option("format", "json")
    .comment("User event stream from Kafka");

tEnv.createTable("user_events", descriptor);

Type Conversion

Java Type Mapping

// Primitive mappings
int -> DataTypes.INT()
long -> DataTypes.BIGINT()  
double -> DataTypes.DOUBLE()
boolean -> DataTypes.BOOLEAN()
String -> DataTypes.STRING()

// Object mappings
Integer -> DataTypes.INT()
Long -> DataTypes.BIGINT()
Double -> DataTypes.DOUBLE()
Boolean -> DataTypes.BOOLEAN()
BigDecimal -> DataTypes.DECIMAL(precision, scale)  // DECIMAL requires explicit precision and scale
LocalDate -> DataTypes.DATE()
LocalTime -> DataTypes.TIME()
LocalDateTime -> DataTypes.TIMESTAMP()
Instant -> DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()

// Collection mappings
T[] -> DataTypes.ARRAY(...)
List<T> -> DataTypes.ARRAY(...)
Map<K,V> -> DataTypes.MAP(...)
Row -> DataTypes.ROW(...)

Schema Evolution

// Handle schema changes
Schema oldSchema = Schema.newBuilder()
    .column("id", DataTypes.INT())
    .column("name", DataTypes.STRING())
    .build();

Schema newSchema = Schema.newBuilder()
    .column("id", DataTypes.BIGINT())        // Type widening
    .column("name", DataTypes.STRING())
    .column("email", DataTypes.STRING())     // New column
    .build();

Configuration

class TableConfigOptions {
    ConfigOption<String> TABLE_LOCAL_TIME_ZONE;
    ConfigOption<Integer> TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
    ConfigOption<Boolean> TABLE_EXEC_LEGACY_TYPE_SYSTEM;
}

Types

abstract class DataType {
    LogicalType getLogicalType();
    Class<?> getConversionClass();
    boolean isNullable();
    DataType nullable();
    DataType notNull();
    DataType bridgedTo(Class<?> newConversionClass);
}

abstract class LogicalType {
    LogicalTypeRoot getTypeRoot();
    String asSummaryString();
    List<LogicalType> getChildren();
    boolean isNullable();
    LogicalType copy(boolean isNullable);
}

class ObjectPath {
    ObjectPath(String databaseName, String objectName);
    String getDatabaseName();
    String getObjectName();
    String getFullName();
}

interface CatalogBaseTable {
    Map<String, String> getOptions();
    TableSchema getSchema();  // Deprecated
    String getComment();
    CatalogBaseTable copy(Map<String, String> options);
    Optional<String> getDescription();
    Optional<String> getDetailedDescription();
}