or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

catalog-system.md, datastream-integration.md, index.md, sql-execution.md, table-environment.md, table-operations.md, type-system.md, user-defined-functions.md
tile.json

tessl/maven-org-apache-flink--flink-table

Apache Flink's Table API and SQL module for unified stream and batch processing

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.flink/flink-table@1.20.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-table@1.20.0

docs/index.md

Apache Flink Table API

Apache Flink's Table API and SQL module provides unified stream and batch processing capabilities through both Table API and SQL interfaces. It offers language-integrated query APIs for Java, Scala, and Python with intuitive composition of queries using relational operators such as selection, filter, and join.

The Table API is built around the core concept of Tables - pipeline descriptions that can be optimized and executed on either bounded or unbounded data streams. This enables both real-time streaming analytics and traditional batch processing with the same unified API.

Package Information

  • Package Name: org.apache.flink:flink-table (parent module; applications depend on its submodules, such as flink-table-api-java shown below)
  • Package Type: maven
  • Language: Java (with Scala support)
  • Version: 1.20.2
  • Installation: Add to Maven dependencies:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java</artifactId>
      <version>1.20.2</version>
    </dependency>

Core Imports

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

For DataStream integration:

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Basic Usage

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import static org.apache.flink.table.api.Expressions.*;

// Create a table environment in streaming mode
EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()
    .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Register a source table. The built-in 'datagen' connector keeps the snippet runnable;
// 'number-of-rows' makes the source bounded so that print() below terminates.
// Replace the WITH clause with the options of your real connector.
tableEnv.executeSql(
    "CREATE TABLE source_table (id INT, name STRING) "
        + "WITH ('connector' = 'datagen', 'number-of-rows' = '100')");

// SQL query: filter rows and transform a column
Table result = tableEnv.sqlQuery("SELECT id, UPPER(name) AS name FROM source_table WHERE id > 10");

// Continue with Table API operations on the SQL result.
// The predicate narrows the SQL filter further (a threshold <= 10 would be a no-op).
Table filtered = result
    .select($("id"), $("name"))
    .where($("id").isGreater(100));

// Execute the pipeline and print the collected results
filtered.execute().print();

Architecture

Apache Flink Table API is built around several key architectural components:

  • Table Environment: Central context for creating Table and SQL API programs, managing catalogs and configuration
  • Table Abstraction: Core abstraction representing data transformation pipelines (not actual data)
  • Type System: Comprehensive data type system supporting primitives, complex types, and user-defined types
  • Expression API: Type-safe expression system for table transformations and computations
  • Catalog System: Pluggable metadata management for tables, functions, and databases
  • Connector Framework: Extensible source/sink architecture for data integration
  • Function Framework: Support for scalar, table, and aggregate user-defined functions
  • Query Planning: Advanced query optimization and execution planning with Calcite integration

Capabilities

Table Environment

Core entry point and central context for creating Table and SQL API programs. Handles catalog management, SQL execution, and table operations.

/**
 * Summary of the core TableEnvironment methods: environment creation, SQL execution,
 * table registration/lookup and catalog navigation.
 */
interface TableEnvironment {
    // Factory methods
    // Create a pure Table/SQL environment (no DataStream bridge) from settings or a Configuration.
    static TableEnvironment create(EnvironmentSettings settings);
    static TableEnvironment create(Configuration configuration);
    
    // SQL execution
    // sqlQuery: turns a query into a Table that can be refined further before execution.
    // executeSql: executes a single statement (e.g. DDL such as CREATE TABLE) and returns its result.
    Table sqlQuery(String query);
    TableResult executeSql(String statement);
    
    // Table creation and access
    // from(...) looks up a registered table or builds one from a descriptor;
    // createTemporaryTable registers it for this session only, createTable in the catalog.
    Table from(String path);
    Table from(TableDescriptor descriptor);
    void createTemporaryTable(String path, TableDescriptor descriptor);
    void createTable(String path, TableDescriptor descriptor);
    
    // Catalog and database management
    // useCatalog/useDatabase set the current namespace that the list* methods operate on.
    void useCatalog(String catalogName);
    void useDatabase(String databaseName);
    String[] listTables();
    String[] listCatalogs();
    String[] listDatabases();
}

See the Table Environment reference (table-environment.md).

Table Operations

Core table abstraction providing fluent API for data transformations, joins, aggregations, and window operations.

interface Table extends Explainable<Table>, Executable {
    // Schema access
    ResolvedSchema getResolvedSchema();
    
    // Basic transformations
    Table select(Expression... fields);
    Table filter(Expression predicate);
    Table where(Expression predicate);
    Table as(Expression... fields);
    Table as(String field, String... fields);
    Table distinct();
    
    // Column operations
    Table addColumns(Expression... fields);
    Table addOrReplaceColumns(Expression... fields);
    Table renameColumns(Expression... fields);
    Table dropColumns(Expression... fields);
    
    // Aggregations
    GroupedTable groupBy(Expression... fields);
    AggregatedTable aggregate(Expression aggregateFunction);
    AggregatedTable flatAggregate(Expression tableAggregateFunction);
    
    // Joins
    Table join(Table right);
    Table join(Table right, Expression joinPredicate);
    Table leftOuterJoin(Table right);
    Table leftOuterJoin(Table right, Expression joinPredicate);
    Table rightOuterJoin(Table right);
    Table rightOuterJoin(Table right, Expression joinPredicate);
    Table fullOuterJoin(Table right);
    Table fullOuterJoin(Table right, Expression joinPredicate);
    
    // Lateral joins
    Table joinLateral(Expression tableFunctionCall);
    Table joinLateral(Expression tableFunctionCall, Expression joinPredicate);
    Table leftOuterJoinLateral(Expression tableFunctionCall);
    Table leftOuterJoinLateral(Expression tableFunctionCall, Expression joinPredicate);
    
    // Set operations
    Table union(Table right);
    Table unionAll(Table right);
    Table intersect(Table right);
    Table intersectAll(Table right);
    Table minus(Table right);
    Table minusAll(Table right);
    
    // Function operations
    Table map(Expression mapFunction);
    Table flatMap(Expression tableFunction);
    
    // Ordering and limiting
    Table orderBy(Expression... fields);
    Table offset(int offset);
    Table fetch(int fetch);
    Table limit(int fetch);
    Table limit(int offset, int fetch);
    
    // Window operations
    GroupWindowedTable window(GroupWindow groupWindow);
    OverWindowedTable window(OverWindow... overWindows);
    
    // Temporal operations
    TemporalTableFunction createTemporalTableFunction(Expression timeAttribute, Expression primaryKey);
    
    // Insert operations
    TablePipeline insertInto(String tablePath);
    TablePipeline insertInto(String tablePath, boolean overwrite);
    TablePipeline insertInto(TableDescriptor descriptor);
    TablePipeline insertInto(TableDescriptor descriptor, boolean overwrite);
    TableResult executeInsert(String tablePath);
    TableResult executeInsert(String tablePath, boolean overwrite);
    TableResult executeInsert(TableDescriptor descriptor);
    TableResult executeInsert(TableDescriptor descriptor, boolean overwrite);
    
    // Execution
    TableResult execute();
    String explain();
}

See the Table Operations reference (table-operations.md).

DataStream Integration

Integration layer between Table API and Flink's DataStream API, enabling conversion between tables and data streams for complex pipelines.

/**
 * TableEnvironment variant bridging to the DataStream API: it is created from a
 * StreamExecutionEnvironment and converts between Tables and DataStreams.
 */
interface StreamTableEnvironment extends TableEnvironment {
    // Factory methods
    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment);
    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings);
    
    // DataStream conversion
    // The Schema overload lets the caller declare columns explicitly instead of deriving them from T.
    // toDataStream(table) yields Row records; the other overloads convert to a target class/data type.
    <T> Table fromDataStream(DataStream<T> dataStream);
    <T> Table fromDataStream(DataStream<T> dataStream, Schema schema);
    DataStream<Row> toDataStream(Table table);
    <T> DataStream<T> toDataStream(Table table, Class<T> targetClass);
    <T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);
    
    // Changelog conversion
    // These overloads work on Row streams whose RowKind marks each record's change type;
    // ChangelogMode restricts which kinds of changes are expected.
    Table fromChangelogStream(DataStream<Row> dataStream);
    Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);
    Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);
    DataStream<Row> toChangelogStream(Table table);
    DataStream<Row> toChangelogStream(Table table, Schema targetSchema);
    DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);
    
    // Temporary view creation
    // Registers a DataStream under a path so that SQL queries can reference it.
    <T> void createTemporaryView(String path, DataStream<T> dataStream);
    <T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);
    
    // Statement set creation (returns the DataStream-aware StreamStatementSet)
    StreamStatementSet createStatementSet();
}

See the DataStream Integration reference (datastream-integration.md).

Type System

Comprehensive type system supporting primitive types, complex nested structures, and user-defined types with full serialization support.

/**
 * Static factory methods for DataType instances, used in schemas, casts and
 * function signatures.
 */
class DataTypes {
    // Primitive types (DECIMAL takes explicit precision and scale)
    static DataType BOOLEAN();
    static DataType TINYINT();
    static DataType SMALLINT();
    static DataType INT();
    static DataType BIGINT();
    static DataType FLOAT();
    static DataType DOUBLE();
    static DataType DECIMAL(int precision, int scale);
    
    // String and binary types (STRING and BYTES are the unbounded variants)
    static DataType CHAR(int length);
    static DataType VARCHAR(int length);
    static DataType STRING();
    static DataType BINARY(int length);
    static DataType VARBINARY(int length);
    static DataType BYTES();
    
    // Temporal types
    static DataType DATE();
    static DataType TIME();
    static DataType TIMESTAMP();
    static DataType TIMESTAMP_WITH_TIME_ZONE();
    static DataType TIMESTAMP_WITH_LOCAL_TIME_ZONE();
    
    // Complex types (nested: built from other DataTypes)
    static DataType ARRAY(DataType elementType);
    static DataType MAP(DataType keyType, DataType valueType);
    static DataType ROW(Field... fields);
}

class Schema {
    static Builder newBuilder();
    Column[] getColumns();
    List<UniqueConstraint> getPrimaryKey();
    List<WatermarkSpec> getWatermarkSpecs();
}

See the Type System reference (type-system.md).

User-Defined Functions

Framework for creating custom scalar, table, and aggregate functions with support for multiple programming languages.

/**
 * Base class of all user-defined functions.
 * There is no getter for the function context. Instead, the runtime passes a
 * FunctionContext to open(), which is called before the function's evaluation methods.
 * close() is the matching teardown hook. Override both to acquire and release resources.
 */
abstract class UserDefinedFunction implements FunctionDefinition {
    // Lifecycle hooks; the FunctionContext gives access to e.g. job parameters and metrics
    void open(FunctionContext context) throws Exception;
    void close() throws Exception;
}

/**
 * Maps zero or more input values to exactly one output value per call.
 */
abstract class ScalarFunction extends UserDefinedFunction {
    // Implementation provided by user
    // Declare one or more public eval methods; they are located by name, not by an abstract signature.
    // public ReturnType eval(InputType1 input1, InputType2 input2, ...);
}

/**
 * Maps input values to zero, one or many output rows of type T per call.
 */
abstract class TableFunction<T> extends UserDefinedFunction {
    // Emit results using collect()
    // eval returns void; each collect(...) call inside eval emits one output row.
    // public void eval(InputType1 input1, InputType2 input2, ...);
    protected void collect(T result);
}

/**
 * Aggregates many input rows into a single value of type T, keeping
 * intermediate state in an accumulator of type ACC.
 */
abstract class AggregateFunction<T, ACC> extends ImperativeAggregateFunction<T, ACC> {
    // Creates an empty accumulator
    public abstract ACC createAccumulator();
    // Computes the aggregation result from the current accumulator
    public abstract T getValue(ACC accumulator);
    // User-provided: folds one input row into the accumulator
    // public void accumulate(ACC accumulator, InputType1 input1, InputType2 input2, ...);
}

See the User-Defined Functions reference (user-defined-functions.md).

SQL Execution

Direct SQL query execution with support for DDL, DML, and query operations, including statement batching and result handling.

/**
 * SQL- and function-related subset of TableEnvironment.
 */
interface TableEnvironment {
    // SQL execution
    // createStatementSet groups several statements so they are submitted together.
    Table sqlQuery(String query);
    TableResult executeSql(String statement);
    StatementSet createStatementSet();
    
    // Function registration
    // System functions are temporary and referenced by name without a catalog/database path;
    // createFunction registers the function class under a catalog path.
    void createTemporarySystemFunction(String name, UserDefinedFunction function);
    void createFunction(String path, Class<? extends UserDefinedFunction> functionClass);
}

/**
 * Result of executing a statement or table pipeline.
 */
interface TableResult {
    // Whether the result carries rows (see ResultKind)
    ResultKind getResultKind();
    ResolvedSchema getResolvedSchema();
    // Returns a CloseableIterator; close it when done (e.g. with try-with-resources)
    CloseableIterator<Row> collect();
    // Prints the result rows
    void print();
}

/**
 * Collects several INSERT statements and submits them together via execute().
 */
interface StatementSet {
    // Both add methods return this StatementSet to allow chaining
    StatementSet addInsertSql(String statement);
    StatementSet addInsert(String targetPath, Table table);
    TableResult execute();
}

See the SQL Execution reference (sql-execution.md).

Catalog System

Pluggable metadata management system for tables, functions, databases, and user-defined catalogs with persistent storage support.

/**
 * Pluggable metadata store for databases, tables and functions (main methods only).
 * Tables and views are both handled as CatalogBaseTable; CatalogTable is only one subtype.
 * Most methods also declare checked exceptions, e.g. getTable and dropTable throw
 * TableNotExistException. These are omitted here for brevity.
 */
interface Catalog {
    // Database operations
    void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists);
    void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade);
    List<String> listDatabases();
    CatalogDatabase getDatabase(String databaseName);
    
    // Table operations (tables and views)
    void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists);
    void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists);
    List<String> listTables(String databaseName);
    CatalogBaseTable getTable(ObjectPath tablePath);
    
    // Function operations
    void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists);
    void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists);
    List<String> listFunctions(String databaseName);
}

/**
 * Fully qualified three-part identifier of a catalog object: catalog, database and object name.
 */
class ObjectIdentifier {
    static ObjectIdentifier of(String catalogName, String databaseName, String objectName);
    String getCatalogName();
    String getDatabaseName();
    String getObjectName();
}

See the Catalog System reference (catalog-system.md).

Types

Core Data Types

/**
 * A logical type plus the Java class used to exchange values of that type.
 */
abstract class DataType {
    // The SQL-level logical type
    LogicalType getLogicalType();
    // The Java class used to represent values of this type
    Class<?> getConversionClass();
    // Return a DataType with nullability adjusted
    DataType notNull();
    DataType nullable();
    // Return a DataType with the same logical type but a different conversion class
    DataType bridgedTo(Class<?> newConversionClass);
}

// Concrete DataType kinds, distinguished by whether/how they nest other types
// (these are not "primitive wrappers"):
//   AtomicDataType     - types without nested types (e.g. INT, STRING)
//   CollectionDataType - types with one element type (e.g. ARRAY)
//   FieldsDataType     - types with named fields (e.g. ROW)
//   KeyValueDataType   - MAP, with a key type and a value type
class AtomicDataType extends DataType { }
class CollectionDataType extends DataType { }
class FieldsDataType extends DataType { }
class KeyValueDataType extends DataType { }

Configuration and Settings

/**
 * Settings used to create a TableEnvironment.
 * Flink 1.20 ships a single planner, so the builder has no planner selection:
 * the former useBlinkPlanner() and useAnyPlanner() methods do not exist in this version.
 */
class EnvironmentSettings {
    static EnvironmentSettings.Builder newInstance();
    
    interface Builder {
        // Runtime mode: unbounded (streaming) or bounded (batch) execution
        Builder inStreamingMode();
        Builder inBatchMode();
        // Extra configuration applied to the created environment
        Builder withConfiguration(Configuration configuration);
        EnvironmentSettings build();
    }
}

/**
 * Per-environment configuration of the Table API.
 */
class TableConfig {
    Configuration getConfiguration();
    // Returns the SqlDialect enum (DEFAULT or HIVE), not a String
    SqlDialect getSqlDialect();
    // Session time zone, e.g. for TIMESTAMP_WITH_LOCAL_TIME_ZONE values
    ZoneId getLocalTimeZone();
}

Result and Execution Types

// Kind of a TableResult (see TableResult.getResultKind()).
enum ResultKind {
    // Statement succeeded without producing rows (e.g. DDL)
    SUCCESS,
    // Statement succeeded and produced rows to collect or print
    SUCCESS_WITH_CONTENT
}

// SQL dialect used to parse statements (see TableConfig).
enum SqlDialect {
    // Flink's own SQL dialect
    DEFAULT,
    // Hive-compatible dialect
    HIVE
}

// Output format of an explain (query plan) string.
enum ExplainFormat {
    TEXT,
    JSON
}

/**
 * Generic record type exchanged between the Table API and DataStream / collect().
 */
class Row {
    // Field access by position or by field name
    Object getField(int pos);
    Object getField(String name);
    // Number of fields
    int getArity();
    // Change flag of the row, used by changelog conversions
    RowKind getKind();
}

Exception Types

// Unchecked Table API exceptions. ValidationException and AmbiguousTableFactoryException
// extend RuntimeException directly; they are not subclasses of TableException.
class TableException extends RuntimeException { }
class TableRuntimeException extends RuntimeException { }
class ValidationException extends RuntimeException { }
// Catalog exception: checked, thrown e.g. by Catalog.getTable / dropTable for missing tables
class TableNotExistException extends Exception { }
class AmbiguousTableFactoryException extends RuntimeException { }