CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-table

Apache Flink's Table API and SQL module for unified stream and batch processing

Pending
Overview
Eval results
Files

table-environment.mddocs/

Table Environment

TableEnvironment is the entry point and central context for creating Table and SQL API programs. It provides a unified interface for both streaming and batch processing, managing catalogs, databases, functions, and configuration.

Capabilities

Environment Creation

Creates table environments with specific settings for streaming or batch processing.

/**
 * Creates a table environment that is the entry point for Table and SQL API programs
 * @param settings The environment settings used to instantiate the TableEnvironment
 * @return TableEnvironment instance
 */
static TableEnvironment create(EnvironmentSettings settings);

/**
 * Creates a table environment with configuration
 * @param configuration The configuration for the table environment
 * @return TableEnvironment instance
 */
static TableEnvironment create(Configuration configuration);

Usage Examples:

// Streaming environment
EnvironmentSettings streamingSettings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()
    .build();
TableEnvironment streamingEnv = TableEnvironment.create(streamingSettings);

// Batch environment
EnvironmentSettings batchSettings = EnvironmentSettings
    .newInstance()
    .inBatchMode()
    .build();
TableEnvironment batchEnv = TableEnvironment.create(batchSettings);

// With custom configuration
Configuration config = new Configuration();
config.setString("table.exec.mini-batch.enabled", "true");
TableEnvironment configEnv = TableEnvironment.create(config);

SQL Query Execution

Executes SQL queries and statements with support for both queries and DDL/DML operations.

/**
 * Evaluates a SQL query on registered tables and returns the result as a Table
 * @param query SQL query string
 * @return Table representing the query result
 */
Table sqlQuery(String query);

/**
 * Executes a single SQL statement and returns the execution result
 * @param statement SQL statement (DDL, DML, or query)
 * @return TableResult containing execution information and data
 */
TableResult executeSql(String statement);

Usage Examples:

// Query execution
Table result = tableEnv.sqlQuery(
    "SELECT customer_id, SUM(amount) as total " +
    "FROM orders " +
    "WHERE order_date >= '2024-01-01' " +
    "GROUP BY customer_id"
);

// DDL execution
tableEnv.executeSql(
    "CREATE TABLE orders (" +
    "  order_id BIGINT," +
    "  customer_id BIGINT," +
    "  amount DECIMAL(10,2)," +
    "  order_date DATE" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'orders'" +
    ")"
);

// DML execution
TableResult insertResult = tableEnv.executeSql(
    "INSERT INTO target_table SELECT * FROM source_table WHERE active = true"
);

Table Registration and Access

Manages table creation, registration, and access within the table environment.

/**
 * Creates a Table from a registered table path
 * @param path Table path in catalog.database.table format
 * @return Table instance for the specified path
 */
Table from(String path);

/**
 * Creates a Table from a table descriptor
 * @param descriptor TableDescriptor defining the table structure and properties
 * @return Table instance based on the descriptor
 */
Table from(TableDescriptor descriptor);

/**
 * Creates a temporary table in the current catalog and database
 * @param path Table path (can be simple name or catalog.database.table)
 * @param descriptor TableDescriptor defining the table
 */
void createTemporaryTable(String path, TableDescriptor descriptor);

/**
 * Creates a persistent table in the catalog
 * @param path Table path in catalog.database.table format
 * @param descriptor TableDescriptor defining the table
 */
void createTable(String path, TableDescriptor descriptor);

/**
 * Creates a temporary view from a Table
 * @param path View name or path
 * @param table Table to register as a view
 */
void createTemporaryView(String path, Table table);

Usage Examples:

// Access existing table
Table orders = tableEnv.from("default_catalog.orders_db.orders");

// Create table from descriptor
TableDescriptor descriptor = TableDescriptor
    .forConnector("kafka")
    .schema(Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("name", DataTypes.STRING())
        .build())
    .option("topic", "users")
    .build();

tableEnv.createTemporaryTable("users", descriptor);
Table users = tableEnv.from("users");

// Create view from existing table
Table activeUsers = users.filter($("active").isEqual(true));
tableEnv.createTemporaryView("active_users", activeUsers);

Catalog and Database Management

Manages catalog and database contexts for table operations.

/**
 * Sets the current catalog for table operations
 * @param catalogName Name of the catalog to use
 */
void useCatalog(String catalogName);

/**
 * Sets the current database within the current catalog
 * @param databaseName Name of the database to use
 */
void useDatabase(String databaseName);

/**
 * Lists all available catalogs
 * @return Array of catalog names
 */
String[] listCatalogs();

/**
 * Lists all databases in the current catalog
 * @return Array of database names
 */
String[] listDatabases();

/**
 * Lists all tables in the current database
 * @return Array of table names
 */
String[] listTables();

/**
 * Lists all user-defined functions in the current database
 * @return Array of function names
 */
String[] listUserDefinedFunctions();

/**
 * Lists all temporary tables in the current session
 * @return Array of temporary table names
 */
String[] listTemporaryTables();

/**
 * Lists all temporary views in the current session
 * @return Array of temporary view names
 */
String[] listTemporaryViews();

Usage Examples:

// Catalog and database navigation
tableEnv.useCatalog("my_catalog");
tableEnv.useDatabase("analytics");

// List available resources
String[] catalogs = tableEnv.listCatalogs();
String[] databases = tableEnv.listDatabases();
String[] tables = tableEnv.listTables();

// Current context information
String currentCatalog = tableEnv.getCurrentCatalog();
String currentDatabase = tableEnv.getCurrentDatabase();

Table Creation from Values

Creates tables directly from values without external data sources.

/**
 * Creates a table from a list of rows with automatic schema inference
 * @param rows List of Row objects containing the data
 * @return Table containing the specified data
 */
Table fromValues(Row... rows);

/**
 * Creates a table from a list of rows with automatic schema inference
 * @param rows Collection of Row objects containing the data
 * @return Table containing the specified data
 */
Table fromValues(Collection<Row> rows);

/**
 * Creates a table from values with explicit schema
 * @param rowDataType DataType defining the schema for the rows
 * @param rows List of Row objects containing the data
 * @return Table with specified schema and data
 */
Table fromValues(AbstractDataType<?> rowDataType, Row... rows);

/**
 * Creates a table from values with explicit schema
 * @param rowDataType DataType defining the schema for the rows
 * @param rows Collection of Row objects containing the data
 * @return Table with specified schema and data
 */
Table fromValues(AbstractDataType<?> rowDataType, Collection<Row> rows);

/**
 * Creates a table from expression values
 * @param expressions List of expressions representing row values
 * @return Table containing the expression values
 */
Table fromValues(Expression... expressions);

/**
 * Creates a table from expression values
 * @param expressions Collection of expressions representing row values
 * @return Table containing the expression values
 */
Table fromValues(Collection<Expression> expressions);

/**
 * Creates a table from expression values with explicit data type
 * @param rowDataType DataType defining the schema for the expressions
 * @param expressions List of expressions representing row values
 * @return Table with specified schema and expression values
 */
Table fromValues(AbstractDataType<?> rowDataType, Expression... expressions);

/**
 * Creates a table from expression values with explicit data type
 * @param rowDataType DataType defining the schema for the expressions
 * @param expressions Collection of expressions representing row values
 * @return Table with specified schema and expression values
 */
Table fromValues(AbstractDataType<?> rowDataType, Collection<Expression> expressions);

Usage Examples:

import static org.apache.flink.table.api.Expressions.*;

// Create table from Row objects
Row row1 = Row.of(1L, "Alice", 25);
Row row2 = Row.of(2L, "Bob", 30);
Row row3 = Row.of(3L, "Charlie", 35);

Table fromRowsTable = tableEnv.fromValues(row1, row2, row3);

// Create table with explicit schema
AbstractDataType<?> rowType = DataTypes.ROW(
    DataTypes.FIELD("id", DataTypes.BIGINT()),
    DataTypes.FIELD("name", DataTypes.STRING()),
    DataTypes.FIELD("age", DataTypes.INT())
);

Table typedTable = tableEnv.fromValues(rowType, row1, row2, row3);

// Create table from expressions
Table expressionTable = tableEnv.fromValues(
    row(1, "Alice", 25),
    row(2, "Bob", 30),
    row(3, "Charlie", 35)
);

// Create table with explicit data type for expressions
Table typedExpressionTable = tableEnv.fromValues(
    rowType,
    row(1, "Alice", 25),
    row(2, "Bob", 30),
    row(3, "Charlie", 35)
);

// Use in complex queries
Table result = expressionTable
    .filter($("age").isGreater(25))
    .select($("name"), $("age"));

Function Management

Manages user-defined functions registration and lifecycle.

/**
 * Registers a temporary system function that can be used in SQL and Table API
 * @param name Function name for SQL usage
 * @param function UserDefinedFunction implementation
 */
void createTemporarySystemFunction(String name, UserDefinedFunction function);

/**
 * Creates a persistent function in the catalog
 * @param path Function path in catalog.database.function format
 * @param functionClass Class implementing the function
 */
void createFunction(String path, Class<? extends UserDefinedFunction> functionClass);

/**
 * Drops a temporary system function
 * @param name Function name to drop
 * @return true if function existed and was dropped
 */
boolean dropTemporarySystemFunction(String name);

/**
 * Drops a persistent function from the catalog
 * @param path Function path in catalog.database.function format
 */
void dropFunction(String path);

Usage Examples:

// Register scalar function
ScalarFunction myUpper = new ScalarFunction() {
    public String eval(String input) {
        return input != null ? input.toUpperCase() : null;
    }
};
tableEnv.createTemporarySystemFunction("my_upper", myUpper);

// Use in SQL
Table result = tableEnv.sqlQuery("SELECT my_upper(name) FROM users");

// Register aggregate function
tableEnv.createTemporarySystemFunction("my_avg", new MyAverageFunction());

// Create persistent function
tableEnv.createFunction("my_catalog.my_db.my_function", MyFunction.class);

Configuration and Context

Manages table environment configuration and context information.

/**
 * Gets the table configuration for this environment
 * @return TableConfig instance for accessing and modifying settings
 */
TableConfig getConfig();

/**
 * Gets the current catalog name
 * @return Current catalog name
 */
String getCurrentCatalog();

/**
 * Gets the current database name
 * @return Current database name
 */
String getCurrentDatabase();

Usage Examples:

// Access configuration
TableConfig config = tableEnv.getConfig();
config.getConfiguration().setString("table.exec.mini-batch.enabled", "true");

// Get current context
String catalog = tableEnv.getCurrentCatalog();
String database = tableEnv.getCurrentDatabase();

Statement Set Operations

Creates and manages statement sets for batch execution of multiple operations.

/**
 * Creates a StatementSet for batch execution of multiple statements
 * @return StatementSet instance for adding multiple operations
 */
StatementSet createStatementSet();

Usage Examples:

// Batch multiple operations
StatementSet stmtSet = tableEnv.createStatementSet();
stmtSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source WHERE type = 'A'");
stmtSet.addInsertSql("INSERT INTO sink2 SELECT * FROM source WHERE type = 'B'");
stmtSet.addInsert("sink3", processedTable);

// Execute all statements together
TableResult result = stmtSet.execute();

Types

Environment Settings

class EnvironmentSettings {
    static Builder newInstance();
    
    interface Builder {
        Builder useBlinkPlanner();
        Builder useAnyPlanner();
        Builder inStreamingMode();
        Builder inBatchMode();
        Builder withConfiguration(Configuration configuration);
        EnvironmentSettings build();
    }
}

Table Configuration

class TableConfig {
    Configuration getConfiguration();
    void setSqlDialect(SqlDialect sqlDialect);
    SqlDialect getSqlDialect();
    void setLocalTimeZone(ZoneId zoneId);
    ZoneId getLocalTimeZone();
}

Statement Set

interface StatementSet {
    StatementSet addInsertSql(String statement);
    StatementSet addInsert(String targetPath, Table table);
    StatementSet add(ModifyOperation modifyOperation);
    String explain();
    TableResult execute();
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table

docs

catalog-system.md

datastream-integration.md

index.md

sql-execution.md

table-environment.md

table-operations.md

type-system.md

user-defined-functions.md

tile.json