or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

catalog-management.mdcomplex-event-processing.mdcore-table-operations.mddatastream-integration.mdindex.mdsql-processing.mdtype-system.mduser-defined-functions.mdwindow-operations.md
tile.json

core-table-operations.mddocs/

Core Table Operations

This document covers the essential table environment setup, table creation, and basic table operations in Apache Flink Table Uber Blink.

Table Environment Creation

The TableEnvironment is the main entry point for all table operations.

Pure Table Environment

static TableEnvironment create(EnvironmentSettings settings);
static TableEnvironment create(Configuration configuration);

Usage:

// Create with settings builder
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
TableEnvironment tEnv = TableEnvironment.create(settings);

// Create with configuration
Configuration config = new Configuration();
config.setString("table.sql-dialect", "default");
TableEnvironment tEnv = TableEnvironment.create(config);

Environment Settings

class EnvironmentSettings {
    static EnvironmentSettings.Builder newInstance();
    
    interface Builder {
        Builder useBlinkPlanner();
        Builder inStreamingMode();
        Builder inBatchMode();
        Builder withBuiltInCatalogName(String catalogName);
        Builder withBuiltInDatabaseName(String databaseName);
        Builder withConfiguration(Configuration configuration);
        EnvironmentSettings build();
    }
}

SQL Execution

Direct SQL Execution

TableResult executeSql(String statement);
Table sqlQuery(String query);

Usage:

// Execute DDL/DML statements
TableResult result = tEnv.executeSql(
    "CREATE TABLE users (id INT, name STRING, age INT) WITH ('connector' = 'filesystem', 'path' = '/data/users', 'format' = 'json')"
);

// Execute queries returning tables
Table userTable = tEnv.sqlQuery("SELECT * FROM users WHERE age > 18");

Table Creation and Management

Creating Tables

void createTable(String path, TableDescriptor descriptor);
void createTemporaryTable(String path, TableDescriptor descriptor);
void createTemporaryView(String path, Table view);
void dropTable(String path);
void dropTemporaryTable(String path);
void dropTemporaryView(String path);
boolean tableExists(String path);

Table Access

Table from(String path);
Table scan(String... tablePath);
Table fromValues(Object... values);
Table fromValues(AbstractDataType<?> rowType, Object... values);
Table fromValues(DataType rowType, Object... values);

Usage:

// Create table with descriptor
TableDescriptor descriptor = TableDescriptor.forConnector("filesystem")
    .schema(Schema.newBuilder()
        .column("id", DataTypes.INT())
        .column("name", DataTypes.STRING())
        .column("age", DataTypes.INT())
        .build())
    .option("path", "/data/users")
    .option("format", "json")
    .build();

tEnv.createTable("users", descriptor);

// Access existing table
Table userTable = tEnv.from("users");

// Create table from values
Table valuesTable = tEnv.fromValues(
    DataTypes.ROW(
        DataTypes.FIELD("id", DataTypes.INT()),
        DataTypes.FIELD("name", DataTypes.STRING())
    ),
    Row.of(1, "Alice"),
    Row.of(2, "Bob")
);

// Create temporary view
tEnv.createTemporaryView("user_view", userTable.where($"age".isGreater(18)));

Basic Table Operations

The Table interface provides fluent API for data transformations.

Selection and Projection

interface Table {
    Table select(Expression... fields);
    Table addColumns(Expression... fields);
    Table addOrReplaceColumns(Expression... fields);
    Table renameColumns(Expression... fields);
    Table dropColumns(Expression... fields);
}

Filtering and Sorting

interface Table {
    Table where(Expression predicate);
    Table filter(Expression predicate);
    Table distinct();
    Table orderBy(Expression... fields);
    Table offset(int offset);
    Table fetch(int fetch);
    Table limit(int fetch);
}

Grouping and Aggregation

interface Table {
    GroupedTable groupBy(Expression... fields);
    WindowGroupedTable window(GroupWindow window);
}

interface GroupedTable {
    Table select(Expression... fields);
    AggregatedTable aggregate(Expression aggregateFunction);
    FlatAggregateTable flatAggregate(Expression tableAggregateFunction);
}

Usage:

Table users = tEnv.from("users");

// Select specific columns
Table result = users.select($("name"), $("age"));

// Filter data
Table adults = users.where($("age").isGreaterOrEqual(18));

// Group and aggregate
Table ageGroups = users
    .groupBy($("age"))
    .select($("age"), $("age").count().as("count"));

Joins

interface Table {
    Table join(Table right);
    Table join(Table right, Expression joinPredicate);
    Table leftOuterJoin(Table right);
    Table leftOuterJoin(Table right, Expression joinPredicate);
    Table rightOuterJoin(Table right);
    Table rightOuterJoin(Table right, Expression joinPredicate);
    Table fullOuterJoin(Table right);
    Table fullOuterJoin(Table right, Expression joinPredicate);
}

Usage:

Table users = tEnv.from("users");
Table orders = tEnv.from("orders");

// Inner join
Table userOrders = users.join(orders, $("users.id").isEqual($("orders.user_id")));

// Left outer join
Table usersWithOrders = users.leftOuterJoin(orders, $("users.id").isEqual($("orders.user_id")));

Set Operations

interface Table {
    Table unionAll(Table right);
    Table union(Table right);
    Table intersect(Table right);
    Table intersectAll(Table right);
    Table minus(Table right);
    Table minusAll(Table right);
}

Table Execution

interface Table {
    TableResult execute();
    void executeInsert(String tablePath);
    TableResult executeInsert(String tablePath, boolean overwrite);
    CloseableIterator<Row> execute().collect();
}

Statement Sets (Multi-Sink Operations)

For executing multiple DML statements as a single job:

interface TableEnvironment {
    StatementSet createStatementSet();
}

interface StatementSet {
    StatementSet addInsertSql(String statement);
    StatementSet addInsert(String targetPath, Table table);
    StatementSet addInsert(String targetPath, Table table, boolean overwrite);
    String explain(ExplainDetail... extraDetails);
    TableResult execute();
}

Usage:

// Create statement set for multi-sink execution
StatementSet statementSet = tEnv.createStatementSet();

// Add multiple insert operations
statementSet.addInsert("sink_table_1", processedData);
statementSet.addInsertSql("INSERT INTO sink_table_2 SELECT * FROM source WHERE condition = true");

// Execute all operations as single job
TableResult result = statementSet.execute();
Table result = users.select($("name"), $("age")).where($("age").isGreater(21));

// Execute and print results
result.execute().print();

// Collect results as iterator
try (CloseableIterator<Row> iterator = result.execute().collect()) {
    while (iterator.hasNext()) {
        Row row = iterator.next();
        System.out.println(row);
    }
}

// Insert into another table
result.executeInsert("young_adults");

Schema Information

interface Table {
    ResolvedSchema getResolvedSchema();
    TableSchema getSchema();  // Deprecated
    String explain();
    String explain(ExplainDetail... extraDetails);
}

Table Configuration

interface TableEnvironment {
    TableConfig getConfig();
    Configuration getConfiguration();
}

class TableConfig {
    Configuration getConfiguration();
    ZoneId getLocalTimeZone();
    void setLocalTimeZone(ZoneId zoneId);
}

Error Handling

Common exceptions when working with table operations:

class TableException extends RuntimeException;
class SqlParserException extends TableException;
class ValidationException extends TableException;
class CodeGenException extends TableException;

Types

interface Table extends Serializable;

class ResolvedSchema {
    List<Column> getColumns();
    List<String> getColumnNames();
    List<DataType> getColumnDataTypes();
    Optional<UniqueConstraint> getPrimaryKey();
    List<UniqueConstraint> getUniqueConstraints();
}

enum ExplainDetail {
    CHANGELOG_MODE,
    ESTIMATED_COST,
    JSON_EXECUTION_PLAN
}

interface CloseableIterator<T> extends Iterator<T>, AutoCloseable;