Apache Flink's Table API and SQL module for unified stream and batch processing
npx @tessl/cli install tessl/maven-org-apache-flink--flink-table@1.20.0

Apache Flink's Table API and SQL module provides unified stream and batch processing capabilities through both Table API and SQL interfaces. It offers language-integrated query APIs for Java, Scala, and Python with intuitive composition of queries using relational operators such as selection, filter, and join.
The Table API is built around the core concept of Tables - pipeline descriptions that can be optimized and executed on either bounded or unbounded data streams. This enables both real-time streaming analytics and traditional batch processing with the same unified API.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>1.20.2</version>
</dependency>

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

For DataStream integration:
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import static org.apache.flink.table.api.Expressions.*;
// Create table environment
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// Execute SQL queries
tableEnv.executeSql("CREATE TABLE source_table (id INT, name STRING) WITH (...)");
Table result = tableEnv.sqlQuery("SELECT id, UPPER(name) as name FROM source_table WHERE id > 10");
// Table API operations
Table filtered = result
.select($("id"), $("name"))
.where($("id").isGreater(5));
// Execute and collect results
filtered.execute().print();

Apache Flink Table API is built around several key architectural components:
Core entry point and central context for creating Table and SQL API programs. Handles catalog management, SQL execution, and table operations.
interface TableEnvironment {
// Factory methods
static TableEnvironment create(EnvironmentSettings settings);
static TableEnvironment create(Configuration configuration);
// SQL execution
Table sqlQuery(String query);
TableResult executeSql(String statement);
// Table creation and access
Table from(String path);
Table from(TableDescriptor descriptor);
void createTemporaryTable(String path, TableDescriptor descriptor);
void createTable(String path, TableDescriptor descriptor);
// Catalog and database management
void useCatalog(String catalogName);
void useDatabase(String databaseName);
String[] listTables();
String[] listCatalogs();
String[] listDatabases();
}

Core table abstraction providing a fluent API for data transformations, joins, aggregations, and window operations.
interface Table extends Explainable<Table>, Executable {
// Schema access
ResolvedSchema getResolvedSchema();
// Basic transformations
Table select(Expression... fields);
Table filter(Expression predicate);
Table where(Expression predicate);
Table as(Expression... fields);
Table as(String field, String... fields);
Table distinct();
// Column operations
Table addColumns(Expression... fields);
Table addOrReplaceColumns(Expression... fields);
Table renameColumns(Expression... fields);
Table dropColumns(Expression... fields);
// Aggregations
GroupedTable groupBy(Expression... fields);
AggregatedTable aggregate(Expression aggregateFunction);
FlatAggregateTable flatAggregate(Expression tableAggregateFunction);
// Joins
Table join(Table right);
Table join(Table right, Expression joinPredicate);
Table leftOuterJoin(Table right);
Table leftOuterJoin(Table right, Expression joinPredicate);
Table rightOuterJoin(Table right);
Table rightOuterJoin(Table right, Expression joinPredicate);
Table fullOuterJoin(Table right);
Table fullOuterJoin(Table right, Expression joinPredicate);
// Lateral joins
Table joinLateral(Expression tableFunctionCall);
Table joinLateral(Expression tableFunctionCall, Expression joinPredicate);
Table leftOuterJoinLateral(Expression tableFunctionCall);
Table leftOuterJoinLateral(Expression tableFunctionCall, Expression joinPredicate);
// Set operations
Table union(Table right);
Table unionAll(Table right);
Table intersect(Table right);
Table intersectAll(Table right);
Table minus(Table right);
Table minusAll(Table right);
// Function operations
Table map(Expression mapFunction);
Table flatMap(Expression tableFunction);
// Ordering and limiting
Table orderBy(Expression... fields);
Table offset(int offset);
Table fetch(int fetch);
Table limit(int fetch);
Table limit(int offset, int fetch);
// Window operations
GroupWindowedTable window(GroupWindow groupWindow);
OverWindowedTable window(OverWindow... overWindows);
// Temporal operations
TemporalTableFunction createTemporalTableFunction(Expression timeAttribute, Expression primaryKey);
// Insert operations
TablePipeline insertInto(String tablePath);
TablePipeline insertInto(String tablePath, boolean overwrite);
TablePipeline insertInto(TableDescriptor descriptor);
TablePipeline insertInto(TableDescriptor descriptor, boolean overwrite);
TableResult executeInsert(String tablePath);
TableResult executeInsert(String tablePath, boolean overwrite);
TableResult executeInsert(TableDescriptor descriptor);
TableResult executeInsert(TableDescriptor descriptor, boolean overwrite);
// Execution
TableResult execute();
String explain();
}

Integration layer between Table API and Flink's DataStream API, enabling conversion between tables and data streams for complex pipelines.
interface StreamTableEnvironment extends TableEnvironment {
// Factory methods
static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment);
static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings);
// DataStream conversion
<T> Table fromDataStream(DataStream<T> dataStream);
<T> Table fromDataStream(DataStream<T> dataStream, Schema schema);
DataStream<Row> toDataStream(Table table);
<T> DataStream<T> toDataStream(Table table, Class<T> targetClass);
<T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);
// Changelog conversion
Table fromChangelogStream(DataStream<Row> dataStream);
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);
DataStream<Row> toChangelogStream(Table table);
DataStream<Row> toChangelogStream(Table table, Schema targetSchema);
DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);
// Temporary view creation
<T> void createTemporaryView(String path, DataStream<T> dataStream);
<T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);
// Statement set creation
StreamStatementSet createStatementSet();
}

Comprehensive type system supporting primitive types, complex nested structures, and user-defined types with full serialization support.
class DataTypes {
// Primitive types
static DataType BOOLEAN();
static DataType TINYINT();
static DataType SMALLINT();
static DataType INT();
static DataType BIGINT();
static DataType FLOAT();
static DataType DOUBLE();
static DataType DECIMAL(int precision, int scale);
// String and binary types
static DataType CHAR(int length);
static DataType VARCHAR(int length);
static DataType STRING();
static DataType BINARY(int length);
static DataType VARBINARY(int length);
static DataType BYTES();
// Temporal types
static DataType DATE();
static DataType TIME();
static DataType TIMESTAMP();
static DataType TIMESTAMP_WITH_TIME_ZONE();
static DataType TIMESTAMP_WITH_LOCAL_TIME_ZONE();
// Complex types
static DataType ARRAY(DataType elementType);
static DataType MAP(DataType keyType, DataType valueType);
static DataType ROW(Field... fields);
}
class Schema {
static Builder newBuilder();
Column[] getColumns();
List<UniqueConstraint> getPrimaryKey();
List<WatermarkSpec> getWatermarkSpecs();
}

Framework for creating custom scalar, table, and aggregate functions with support for multiple programming languages.
abstract class UserDefinedFunction implements FunctionDefinition {
// Context and configuration access
FunctionContext getFunctionContext();
}
abstract class ScalarFunction extends UserDefinedFunction {
// Implementation provided by user
// public ReturnType eval(InputType1 input1, InputType2 input2, ...);
}
abstract class TableFunction<T> extends UserDefinedFunction {
// Emit results using collect()
// public void eval(InputType1 input1, InputType2 input2, ...);
protected void collect(T result);
}
abstract class AggregateFunction<T, ACC> extends ImperativeAggregateFunction<T, ACC> {
public abstract ACC createAccumulator();
public abstract T getValue(ACC accumulator);
// public void accumulate(ACC accumulator, InputType1 input1, InputType2 input2, ...);
}

Direct SQL query execution with support for DDL, DML, and query operations, including statement batching and result handling.
interface TableEnvironment {
// SQL execution
Table sqlQuery(String query);
TableResult executeSql(String statement);
StatementSet createStatementSet();
// Function registration
void createTemporarySystemFunction(String name, UserDefinedFunction function);
void createFunction(String path, Class<? extends UserDefinedFunction> functionClass);
}
interface TableResult {
ResultKind getResultKind();
ResolvedSchema getResolvedSchema();
CloseableIterator<Row> collect();
void print();
}
interface StatementSet {
StatementSet addInsertSql(String statement);
StatementSet addInsert(String targetPath, Table table);
TableResult execute();
}

Pluggable metadata management system for tables, functions, databases, and user-defined catalogs with persistent storage support.
interface Catalog {
// Database operations
void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists);
void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade);
List<String> listDatabases();
CatalogDatabase getDatabase(String databaseName);
// Table operations
void createTable(ObjectPath tablePath, CatalogTable table, boolean ignoreIfExists);
void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists);
List<String> listTables(String databaseName);
CatalogTable getTable(ObjectPath tablePath);
// Function operations
void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists);
void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists);
List<String> listFunctions(String databaseName);
}
class ObjectIdentifier {
static ObjectIdentifier of(String catalogName, String databaseName, String objectName);
String getCatalogName();
String getDatabaseName();
String getObjectName();
}

abstract class DataType {
LogicalType getLogicalType();
Class<?> getConversionClass();
DataType notNull();
DataType nullable();
DataType bridgedTo(Class<?> newConversionClass);
}
// Concrete DataType subclasses
class AtomicDataType extends DataType { }
class CollectionDataType extends DataType { }
class FieldsDataType extends DataType { }
class KeyValueDataType extends DataType { }

class EnvironmentSettings {
static EnvironmentSettings.Builder newInstance();
interface Builder {
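// NOTE(review): planner selectors are deprecated since Flink 1.14 (only one planner remains) — confirm they still exist in 1.20
Builder useBlinkPlanner();
Builder useAnyPlanner();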
Builder inStreamingMode();
Builder inBatchMode();
Builder withConfiguration(Configuration configuration);
EnvironmentSettings build();
}
}
class TableConfig {
Configuration getConfiguration();
String getSqlDialect();
ZoneId getLocalTimeZone();
}

enum ResultKind {
SUCCESS,
SUCCESS_WITH_CONTENT
}
enum SqlDialect {
DEFAULT,
HIVE
}
enum ExplainFormat {
TEXT,
JSON
}
class Row {
Object getField(int pos);
Object getField(String name);
int getArity();
RowKind getKind();
}

class TableException extends RuntimeException { }
class TableRuntimeException extends RuntimeException { }
class ValidationException extends TableException { }
class TableNotExistException extends TableException { }
class AmbiguousTableFactoryException extends TableException { }