Apache Flink's Table API and SQL module for unified stream and batch processing
—
Apache Flink's Table API provides a comprehensive type system supporting primitive types, complex nested structures, temporal types, and user-defined types. The type system bridges logical type definitions with physical data representations and provides full serialization support.
Basic numeric, boolean, and binary data types for fundamental data representation.
/**
 * Factory methods for primitive SQL data types: boolean, fixed-width signed
 * integers, IEEE 754 floating point, and fixed-precision decimals. Each
 * factory returns an immutable {@code DataType} usable in schema definitions
 * and SQL DDL.
 */
class DataTypes {
/**
 * Boolean data type
 * @return DataType for boolean values
 */
static DataType BOOLEAN();
/**
 * 8-bit signed integer data type
 * @return DataType for tinyint values (-128 to 127)
 */
static DataType TINYINT();
/**
 * 16-bit signed integer data type
 * @return DataType for smallint values (-32,768 to 32,767)
 */
static DataType SMALLINT();
/**
 * 32-bit signed integer data type
 * @return DataType for int values (-2,147,483,648 to 2,147,483,647)
 */
static DataType INT();
/**
 * 64-bit signed integer data type
 * @return DataType for bigint values
 */
static DataType BIGINT();
/**
 * 32-bit IEEE 754 floating point data type
 * @return DataType for float values
 */
static DataType FLOAT();
/**
 * 64-bit IEEE 754 floating point data type
 * @return DataType for double values
 */
static DataType DOUBLE();
/**
 * Fixed precision and scale decimal data type
 * @param precision Total number of digits (NOTE(review): Flink restricts precision to 1..38 — confirm against the target Flink version)
 * @param scale Number of digits after decimal point (0 &lt;= scale &lt;= precision)
 * @return DataType for decimal values
 */
static DataType DECIMAL(int precision, int scale);
}

Usage Examples:
// Primitive types in schema definition
Schema schema = Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.column("active", DataTypes.BOOLEAN())
.column("score", DataTypes.FLOAT())
.column("balance", DataTypes.DECIMAL(10, 2))
.build();
// Using in SQL DDL — this CREATE TABLE is the DDL equivalent of the builder schema above
tableEnv.executeSql(
"CREATE TABLE accounts (" +
" account_id BIGINT," +
" is_active BOOLEAN," +
" credit_score FLOAT," +
" balance DECIMAL(10, 2)" +
") WITH (...)"
);

Text and binary data types with length specifications.
/**
 * Factory methods for character-string and binary data types. Fixed-length
 * variants (CHAR/BINARY) take an exact length; variable-length variants
 * (VARCHAR/VARBINARY) take a maximum length; STRING/BYTES are the unbounded
 * shorthands.
 */
class DataTypes {
/**
 * Fixed-length character string data type
 * @param length Fixed length of the string
 * @return DataType for char values
 */
static DataType CHAR(int length);
/**
 * Variable-length character string data type
 * @param length Maximum length of the string
 * @return DataType for varchar values
 */
static DataType VARCHAR(int length);
/**
 * Variable-length character string with maximum length
 * @return DataType for string values
 */
static DataType STRING();
/**
 * Fixed-length binary data type
 * @param length Fixed length of the binary data
 * @return DataType for binary values
 */
static DataType BINARY(int length);
/**
 * Variable-length binary data type
 * @param length Maximum length of the binary data
 * @return DataType for varbinary values
 */
static DataType VARBINARY(int length);
/**
 * Variable-length binary data with maximum length
 * @return DataType for bytes values
 */
static DataType BYTES();
}

Usage Examples:
// String types in table definition
Schema userSchema = Schema.newBuilder()
.column("username", DataTypes.VARCHAR(50))   // bounded variable-length string
.column("first_name", DataTypes.STRING())    // unbounded string
.column("country_code", DataTypes.CHAR(2))   // fixed two-character code
.column("profile_image", DataTypes.BYTES())  // unbounded binary payload
.build();

Date, time, timestamp, and interval types for temporal data processing.
/**
 * Factory methods for temporal data types: dates, times, timestamps (with and
 * without time zone), and year-month / day-time intervals. Precision
 * parameters control fractional-second digits.
 */
class DataTypes {
/**
 * Date data type (year-month-day)
 * @return DataType for date values
 */
static DataType DATE();
/**
 * Time data type (hour:minute:second[.fractional])
 * @return DataType for time values
 */
static DataType TIME();
/**
 * Time data type with precision
 * @param precision Precision of fractional seconds (0-9)
 * @return DataType for time values with specified precision
 */
static DataType TIME(int precision);
/**
 * Timestamp data type (date and time without timezone)
 * @return DataType for timestamp values
 */
static DataType TIMESTAMP();
/**
 * Timestamp data type with precision
 * @param precision Precision of fractional seconds (0-9)
 * @return DataType for timestamp values with specified precision
 */
static DataType TIMESTAMP(int precision);
/**
 * Timestamp with time zone data type
 * @return DataType for timestamp values with timezone
 */
static DataType TIMESTAMP_WITH_TIME_ZONE();
/**
 * Timestamp with time zone and precision
 * @param precision Precision of fractional seconds (0-9)
 * @return DataType for timestamp values with timezone and precision
 */
static DataType TIMESTAMP_WITH_TIME_ZONE(int precision);
/**
 * Timestamp with local time zone data type
 * @return DataType for local timestamp values
 */
static DataType TIMESTAMP_WITH_LOCAL_TIME_ZONE();
/**
 * Timestamp with local time zone and precision
 * @param precision Precision of fractional seconds (0-9)
 * @return DataType for local timestamp values with precision
 */
static DataType TIMESTAMP_WITH_LOCAL_TIME_ZONE(int precision);
/**
 * Day-time interval data type
 * @param resolution Day-time resolution (DAY, HOUR, MINUTE, SECOND)
 * @return DataType for day-time interval values
 */
// NOTE(review): the usage example below passes DataTypes.SECOND() directly;
// confirm the resolution parameter type against the actual Flink DataTypes
// API (Flink declares INTERVAL over DataTypes.Resolution values).
static DataType INTERVAL(DataTypes.DayTimeResolution resolution);
/**
 * Year-month interval data type
 * @param resolution Year-month resolution (YEAR, MONTH)
 * @return DataType for year-month interval values
 */
static DataType INTERVAL(DataTypes.YearMonthResolution resolution);
}

Usage Examples:
// Temporal types in event table
Schema eventSchema = Schema.newBuilder()
.column("event_id", DataTypes.BIGINT())
.column("event_date", DataTypes.DATE())
.column("event_time", DataTypes.TIME(3))                         // millisecond precision
.column("created_at", DataTypes.TIMESTAMP(3))
.column("created_at_utc", DataTypes.TIMESTAMP_WITH_TIME_ZONE(3))
.column("session_duration", DataTypes.INTERVAL(DataTypes.SECOND()))
// Watermark: tolerate events arriving up to 5 seconds late
.watermark("created_at", $("created_at").minus(lit(5).seconds()))
.build();

Nested structures including arrays, maps, and row types for complex data modeling.
/**
 * Factory methods for constructed (nested) data types: arrays, maps, rows
 * (structs), and multisets. ROW types are built from named FIELD definitions.
 */
class DataTypes {
/**
 * Array data type
 * @param elementType Data type of array elements
 * @return DataType for array values
 */
static DataType ARRAY(DataType elementType);
/**
 * Map data type
 * @param keyType Data type of map keys
 * @param valueType Data type of map values
 * @return DataType for map values
 */
static DataType MAP(DataType keyType, DataType valueType);
/**
 * Row data type (structured type)
 * @param fields Fields defining the row structure
 * @return DataType for row values
 */
static DataType ROW(Field... fields);
/**
 * Creates a field for row data type
 * @param name Field name
 * @param dataType Field data type
 * @return Field definition
 */
static Field FIELD(String name, DataType dataType);
/**
 * Multiset data type (bag of elements)
 * @param elementType Data type of multiset elements
 * @return DataType for multiset values
 */
static DataType MULTISET(DataType elementType);
}

Usage Examples:
// Complex nested structure: a reusable ROW type for postal addresses
DataType addressType = DataTypes.ROW(
DataTypes.FIELD("street", DataTypes.STRING()),
DataTypes.FIELD("city", DataTypes.STRING()),
DataTypes.FIELD("zipcode", DataTypes.STRING()),
DataTypes.FIELD("country", DataTypes.STRING())
);
Schema customerSchema = Schema.newBuilder()
.column("customer_id", DataTypes.BIGINT())
.column("name", DataTypes.STRING())
.column("addresses", DataTypes.ARRAY(addressType))        // array of ROW values
.column("preferences", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
.column("order_history", DataTypes.ARRAY(DataTypes.BIGINT()))
.build();
// Accessing nested fields in queries
// NOTE(review): at(1) — array access appears 1-based here; confirm indexing base
Table customers = tableEnv.from("customers");
Table result = customers.select(
$("customer_id"),
$("name"),
$("addresses").at(1).get("city").as("primary_city"),
$("preferences").get("language").as("preferred_language")
);

Comprehensive schema definition with columns, constraints, and watermarks.
/**
 * Declarative table schema: physical columns, computed (expression) columns,
 * connector metadata columns, event-time watermarks, and primary-key
 * constraints. Instances are built via the fluent {@code Builder}.
 */
class Schema {
/**
 * Creates a new schema builder
 * @return Builder for constructing schemas
 */
static Builder newBuilder();
interface Builder {
/**
 * Adds a physical column to the schema
 * @param columnName Name of the column
 * @param dataType Data type of the column
 * @return Builder for method chaining
 */
Builder column(String columnName, AbstractDataType<?> dataType);
/**
 * Adds a computed column defined by an expression
 * @param columnName Name of the computed column
 * @param expression SQL expression defining the column
 * @return Builder for method chaining
 */
Builder columnByExpression(String columnName, String expression);
/**
 * Adds a metadata column
 * @param columnName Name of the metadata column
 * @param dataType Data type of the metadata
 * @return Builder for method chaining
 */
Builder columnByMetadata(String columnName, AbstractDataType<?> dataType);
/**
 * Adds a metadata column with explicit metadata key
 * @param columnName Name of the metadata column
 * @param dataType Data type of the metadata
 * @param metadataKey Key for accessing the metadata
 * @return Builder for method chaining
 */
Builder columnByMetadata(String columnName, AbstractDataType<?> dataType, String metadataKey);
/**
 * Adds a watermark specification for event time
 * @param columnName Name of the time column
 * @param watermarkExpression Expression defining the watermark
 * @return Builder for method chaining
 */
Builder watermark(String columnName, Expression watermarkExpression);
/**
 * Defines primary key constraints
 * @param columnNames Names of columns forming the primary key
 * @return Builder for method chaining
 */
Builder primaryKey(String... columnNames);
/**
 * Builds the schema
 * @return Constructed Schema
 */
Schema build();
}
}

Usage Examples:
// Complete schema exercising all builder features
Schema orderSchema = Schema.newBuilder()
// Physical columns — stored in the external system
.column("order_id", DataTypes.BIGINT())
.column("customer_id", DataTypes.BIGINT())
.column("product_id", DataTypes.BIGINT())
.column("quantity", DataTypes.INT())
.column("unit_price", DataTypes.DECIMAL(10, 2))
.column("order_time", DataTypes.TIMESTAMP(3))
// Computed columns — derived at read time from SQL expressions
.columnByExpression("total_amount", "quantity * unit_price")
.columnByExpression("order_year", "EXTRACT(YEAR FROM order_time)")
// Metadata columns (for Kafka integration) — surfaced from connector metadata keys
.columnByMetadata("kafka_partition", DataTypes.INT(), "partition")
.columnByMetadata("kafka_offset", DataTypes.BIGINT(), "offset")
.columnByMetadata("kafka_timestamp", DataTypes.TIMESTAMP(3), "timestamp")
// Watermark for event time processing — 30s out-of-orderness bound
.watermark("order_time", $("order_time").minus(lit(30).seconds()))
// Primary key constraint
.primaryKey("order_id")
.build();

Methods for type conversion between logical and physical representations.
/**
 * Describes a logical SQL type together with the Java conversion class used
 * for its physical, in-flight representation. Instances are immutable: the
 * nullability and bridging methods return new DataType copies.
 */
abstract class DataType {
/**
 * Gets the logical type of this data type
 * @return LogicalType representing the logical structure
 */
LogicalType getLogicalType();
/**
 * Gets the conversion class for physical representation
 * @return Class used for Java object conversion
 */
Class<?> getConversionClass();
/**
 * Creates a non-nullable version of this data type
 * @return DataType that doesn't accept null values
 */
DataType notNull();
/**
 * Creates a nullable version of this data type
 * @return DataType that accepts null values
 */
DataType nullable();
/**
 * Bridges this data type to a different conversion class
 * @param newConversionClass New class for physical representation
 * @return DataType with different conversion class
 */
DataType bridgedTo(Class<?> newConversionClass);
}

Usage Examples:
// Custom type bridging: force BigDecimal as the physical class and forbid nulls
DataType customDecimalType = DataTypes.DECIMAL(20, 4)
.bridgedTo(java.math.BigDecimal.class)
.notNull();
// Type inspection
DataType stringType = DataTypes.STRING();
LogicalType logicalType = stringType.getLogicalType();
Class<?> conversionClass = stringType.getConversionClass(); // String.class
boolean isNullable = logicalType.isNullable();

Special types for JSON data and raw binary serialized objects.
/**
 * Factory methods for special data types: JSON documents and RAW types that
 * carry arbitrary serialized Java objects through the pipeline.
 */
class DataTypes {
/**
 * JSON data type for structured JSON documents
 * @return DataType for JSON values
 */
// NOTE(review): a JSON() factory does not exist in the standard Flink
// DataTypes API (JSON is usually handled via STRING plus JSON functions) —
// confirm this method against the target Flink version.
static DataType JSON();
/**
 * Raw data type for arbitrary serialized objects
 * @param originatingClass Class of the original object
 * @param serializer TypeSerializer for the object
 * @return DataType for raw serialized values
 */
static <T> DataType RAW(Class<T> originatingClass, TypeSerializer<T> serializer);
/**
 * Raw data type with type information
 * @param typeInformation TypeInformation for the object
 * @return DataType for raw serialized values
 */
static <T> DataType RAW(TypeInformation<T> typeInformation);
}

Usage Examples:
// JSON column for flexible document storage
Schema documentSchema = Schema.newBuilder()
.column("doc_id", DataTypes.BIGINT())
.column("metadata", DataTypes.JSON())
.column("content", DataTypes.JSON())
.build();
// Raw type for custom objects — serialized opaquely via TypeInformation
public class CustomEvent {
public String eventType;
public long timestamp;
public Map<String, Object> payload;
}
DataType customEventType = DataTypes.RAW(
TypeInformation.of(CustomEvent.class)
);

abstract class DataType {
// Condensed reference view of DataType (documented in full earlier in this
// file): a logical SQL type bridged to a physical Java conversion class.
LogicalType getLogicalType();                    // logical SQL type definition
Class<?> getConversionClass();                   // Java class used at runtime
DataType notNull();                              // non-nullable copy
DataType nullable();                             // nullable copy
DataType bridgedTo(Class<?> newConversionClass); // swap physical representation
}
/** DataType subclass for primitive and atomic (non-nested) types. */
class AtomicDataType extends DataType {
// For primitive and atomic types
}
/** DataType subclass for element collections (ARRAY and MULTISET). */
class CollectionDataType extends DataType {
DataType getElementDataType(); // type of the contained elements
// For arrays and multisets
}
/** DataType subclass for field-structured types (ROW/struct). */
class FieldsDataType extends DataType {
List<DataType> getFieldDataTypes(); // one entry per field, in declaration order
// For row/struct types
}
/** DataType subclass for key/value types (MAP). */
class KeyValueDataType extends DataType {
DataType getKeyDataType();   // type of map keys
DataType getValueDataType(); // type of map values
// For map types
}

abstract class LogicalType {
// Logical-type contract: nullability, the type-root classification, a
// printable summary, and predicate helpers for type checking.
boolean isNullable();          // whether NULL values are accepted
LogicalTypeRoot getTypeRoot(); // coarse classification (see LogicalTypeRoot)
String asSummaryString();      // human-readable type description
// Type checking methods
boolean is(LogicalTypeRoot... typeRoots);        // matches any given root
boolean isAnyOf(LogicalTypeFamily... families);  // matches any given family
}
/**
 * Coarse classification of every logical type, used by
 * {@code LogicalType#getTypeRoot()} and the {@code is(...)} predicates.
 */
enum LogicalTypeRoot {
CHAR, VARCHAR, BOOLEAN, BINARY, VARBINARY,
DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT,
FLOAT, DOUBLE, DATE, TIME_WITHOUT_TIME_ZONE,
TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_TIME_ZONE,
TIMESTAMP_WITH_LOCAL_TIME_ZONE, INTERVAL_YEAR_MONTH,
INTERVAL_DAY_TIME, ARRAY, MULTISET, MAP, ROW,
DISTINCT_TYPE, STRUCTURED_TYPE, NULL, RAW, SYMBOL,
UNRESOLVED
}

class ResolvedSchema {
// Fully-resolved schema: column metadata, primary key, and watermark specs.
List<Column> getColumns();                 // all columns, in declaration order
Optional<Column> getColumn(String name);   // lookup by name, empty if absent
List<String> getColumnNames();             // names only, same order as getColumns()
List<DataType> getColumnDataTypes();       // types only, same order as getColumns()
Optional<UniqueConstraint> getPrimaryKey();// empty when no primary key declared
List<WatermarkSpec> getWatermarkSpecs();   // event-time watermark definitions
}
/**
 * A single resolved schema column. Exactly one of the isPhysical/isComputed/
 * isMetadata classifications is expected to apply — confirm with the
 * concrete subclasses.
 */
abstract class Column {
String getName();      // column name
DataType getDataType();// resolved data type
String getComment();   // optional user comment; NOTE(review): confirm null vs empty when unset
boolean isPhysical();  // stored in the external system
boolean isComputed();  // derived from an expression
boolean isMetadata();  // surfaced from connector metadata
}
/** Resolved watermark definition: which rowtime column, and the expression. */
class WatermarkSpec {
String getRowtimeAttribute();               // name of the event-time column
ResolvedExpression getWatermarkExpression();// resolved watermark expression
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table