Apache Flink's Table API and SQL module for unified stream and batch processing
—
TableEnvironment is the entry point and central context for creating Table and SQL API programs. It provides a unified interface for both streaming and batch processing, managing catalogs, databases, functions, and configuration.
Creates table environments with specific settings for streaming or batch processing.
/**
* Creates a table environment that is the entry point for Table and SQL API programs
* @param settings The environment settings used to instantiate the TableEnvironment
* @return TableEnvironment instance
*/
static TableEnvironment create(EnvironmentSettings settings);
/**
* Creates a table environment with configuration
* @param configuration The configuration for the table environment
* @return TableEnvironment instance
*/
static TableEnvironment create(Configuration configuration);

Usage Examples:
// Streaming environment
EnvironmentSettings streamingSettings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build();
TableEnvironment streamingEnv = TableEnvironment.create(streamingSettings);
// Batch environment
EnvironmentSettings batchSettings = EnvironmentSettings
.newInstance()
.inBatchMode()
.build();
TableEnvironment batchEnv = TableEnvironment.create(batchSettings);
// With custom configuration
Configuration config = new Configuration();
config.setString("table.exec.mini-batch.enabled", "true");
TableEnvironment configEnv = TableEnvironment.create(config);

Executes SQL queries and statements with support for both queries and DDL/DML operations.
/**
* Evaluates a SQL query on registered tables and returns the result as a Table
* @param query SQL query string
* @return Table representing the query result
*/
Table sqlQuery(String query);
/**
* Executes a single SQL statement and returns the execution result
* @param statement SQL statement (DDL, DML, or query)
* @return TableResult containing execution information and data
*/
TableResult executeSql(String statement);

Usage Examples:
// Query execution
Table result = tableEnv.sqlQuery(
"SELECT customer_id, SUM(amount) as total " +
"FROM orders " +
"WHERE order_date >= '2024-01-01' " +
"GROUP BY customer_id"
);
// DDL execution
tableEnv.executeSql(
"CREATE TABLE orders (" +
" order_id BIGINT," +
" customer_id BIGINT," +
" amount DECIMAL(10,2)," +
" order_date DATE" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'orders'" +
")"
);
// DML execution
TableResult insertResult = tableEnv.executeSql(
"INSERT INTO target_table SELECT * FROM source_table WHERE active = true"
);

Manages table creation, registration, and access within the table environment.
/**
* Creates a Table from a registered table path
* @param path Table path in catalog.database.table format
* @return Table instance for the specified path
*/
Table from(String path);
/**
* Creates a Table from a table descriptor
* @param descriptor TableDescriptor defining the table structure and properties
* @return Table instance based on the descriptor
*/
Table from(TableDescriptor descriptor);
/**
* Creates a temporary table in the current catalog and database
* @param path Table path (can be simple name or catalog.database.table)
* @param descriptor TableDescriptor defining the table
*/
void createTemporaryTable(String path, TableDescriptor descriptor);
/**
* Creates a persistent table in the catalog
* @param path Table path in catalog.database.table format
* @param descriptor TableDescriptor defining the table
*/
void createTable(String path, TableDescriptor descriptor);
/**
* Creates a temporary view from a Table
* @param path View name or path
* @param table Table to register as a view
*/
void createTemporaryView(String path, Table table);

Usage Examples:
// Access existing table
Table orders = tableEnv.from("default_catalog.orders_db.orders");
// Create table from descriptor
TableDescriptor descriptor = TableDescriptor
.forConnector("kafka")
.schema(Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.column("name", DataTypes.STRING())
.build())
.option("topic", "users")
.build();
tableEnv.createTemporaryTable("users", descriptor);
Table users = tableEnv.from("users");
// Create view from existing table
Table activeUsers = users.filter($("active").isEqual(true));
tableEnv.createTemporaryView("active_users", activeUsers);

Manages catalog and database contexts for table operations.
/**
* Sets the current catalog for table operations
* @param catalogName Name of the catalog to use
*/
void useCatalog(String catalogName);
/**
* Sets the current database within the current catalog
* @param databaseName Name of the database to use
*/
void useDatabase(String databaseName);
/**
* Lists all available catalogs
* @return Array of catalog names
*/
String[] listCatalogs();
/**
* Lists all databases in the current catalog
* @return Array of database names
*/
String[] listDatabases();
/**
* Lists all tables in the current database
* @return Array of table names
*/
String[] listTables();
/**
* Lists all user-defined functions in the current database
* @return Array of function names
*/
String[] listUserDefinedFunctions();
/**
* Lists all temporary tables in the current session
* @return Array of temporary table names
*/
String[] listTemporaryTables();
/**
* Lists all temporary views in the current session
* @return Array of temporary view names
*/
String[] listTemporaryViews();

Usage Examples:
// Catalog and database navigation
tableEnv.useCatalog("my_catalog");
tableEnv.useDatabase("analytics");
// List available resources
String[] catalogs = tableEnv.listCatalogs();
String[] databases = tableEnv.listDatabases();
String[] tables = tableEnv.listTables();
// Current context information
String currentCatalog = tableEnv.getCurrentCatalog();
String currentDatabase = tableEnv.getCurrentDatabase();

Creates tables directly from values without external data sources.
/**
* Creates a table from a list of rows with automatic schema inference
* @param rows List of Row objects containing the data
* @return Table containing the specified data
*/
Table fromValues(Row... rows);
/**
* Creates a table from a list of rows with automatic schema inference
* @param rows Collection of Row objects containing the data
* @return Table containing the specified data
*/
Table fromValues(Collection<Row> rows);
/**
* Creates a table from values with explicit schema
* @param rowDataType DataType defining the schema for the rows
* @param rows List of Row objects containing the data
* @return Table with specified schema and data
*/
Table fromValues(AbstractDataType<?> rowDataType, Row... rows);
/**
* Creates a table from values with explicit schema
* @param rowDataType DataType defining the schema for the rows
* @param rows Collection of Row objects containing the data
* @return Table with specified schema and data
*/
Table fromValues(AbstractDataType<?> rowDataType, Collection<Row> rows);
/**
* Creates a table from expression values
* @param expressions List of expressions representing row values
* @return Table containing the expression values
*/
Table fromValues(Expression... expressions);
/**
* Creates a table from expression values
* @param expressions Collection of expressions representing row values
* @return Table containing the expression values
*/
Table fromValues(Collection<Expression> expressions);
/**
* Creates a table from expression values with explicit data type
* @param rowDataType DataType defining the schema for the expressions
* @param expressions List of expressions representing row values
* @return Table with specified schema and expression values
*/
Table fromValues(AbstractDataType<?> rowDataType, Expression... expressions);
/**
* Creates a table from expression values with explicit data type
* @param rowDataType DataType defining the schema for the expressions
* @param expressions Collection of expressions representing row values
* @return Table with specified schema and expression values
*/
Table fromValues(AbstractDataType<?> rowDataType, Collection<Expression> expressions);

Usage Examples:
import static org.apache.flink.table.api.Expressions.*;
// Create table from Row objects
Row row1 = Row.of(1L, "Alice", 25);
Row row2 = Row.of(2L, "Bob", 30);
Row row3 = Row.of(3L, "Charlie", 35);
Table fromRowsTable = tableEnv.fromValues(row1, row2, row3);
// Create table with explicit schema
AbstractDataType<?> rowType = DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("age", DataTypes.INT())
);
Table typedTable = tableEnv.fromValues(rowType, row1, row2, row3);
// Create table from expressions
Table expressionTable = tableEnv.fromValues(
row(1, "Alice", 25),
row(2, "Bob", 30),
row(3, "Charlie", 35)
);
// Create table with explicit data type for expressions
Table typedExpressionTable = tableEnv.fromValues(
rowType,
row(1, "Alice", 25),
row(2, "Bob", 30),
row(3, "Charlie", 35)
);
// Use in complex queries
Table result = expressionTable
.filter($("age").isGreater(25))
.select($("name"), $("age"));

Manages user-defined functions registration and lifecycle.
/**
* Registers a temporary system function that can be used in SQL and Table API
* @param name Function name for SQL usage
* @param function UserDefinedFunction implementation
*/
void createTemporarySystemFunction(String name, UserDefinedFunction function);
/**
* Creates a persistent function in the catalog
* @param path Function path in catalog.database.function format
* @param functionClass Class implementing the function
*/
void createFunction(String path, Class<? extends UserDefinedFunction> functionClass);
/**
* Drops a temporary system function
* @param name Function name to drop
* @return true if function existed and was dropped
*/
boolean dropTemporarySystemFunction(String name);
/**
* Drops a persistent function from the catalog
* @param path Function path in catalog.database.function format
*/
void dropFunction(String path);

Usage Examples:
// Register scalar function
ScalarFunction myUpper = new ScalarFunction() {
public String eval(String input) {
return input != null ? input.toUpperCase() : null;
}
};
tableEnv.createTemporarySystemFunction("my_upper", myUpper);
// Use in SQL
Table result = tableEnv.sqlQuery("SELECT my_upper(name) FROM users");
// Register aggregate function
tableEnv.createTemporarySystemFunction("my_avg", new MyAverageFunction());
// Create persistent function
tableEnv.createFunction("my_catalog.my_db.my_function", MyFunction.class);

Manages table environment configuration and context information.
/**
* Gets the table configuration for this environment
* @return TableConfig instance for accessing and modifying settings
*/
TableConfig getConfig();
/**
* Gets the current catalog name
* @return Current catalog name
*/
String getCurrentCatalog();
/**
* Gets the current database name
* @return Current database name
*/
String getCurrentDatabase();

Usage Examples:
// Access configuration
TableConfig config = tableEnv.getConfig();
config.getConfiguration().setString("table.exec.mini-batch.enabled", "true");
// Get current context
String catalog = tableEnv.getCurrentCatalog();
String database = tableEnv.getCurrentDatabase();

Creates and manages statement sets for batch execution of multiple operations.
/**
* Creates a StatementSet for batch execution of multiple statements
* @return StatementSet instance for adding multiple operations
*/
StatementSet createStatementSet();

Usage Examples:
// Batch multiple operations
StatementSet stmtSet = tableEnv.createStatementSet();
stmtSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source WHERE type = 'A'");
stmtSet.addInsertSql("INSERT INTO sink2 SELECT * FROM source WHERE type = 'B'");
stmtSet.addInsert("sink3", processedTable);
// Execute all statements together
TableResult result = stmtSet.execute();

class EnvironmentSettings {
static Builder newInstance();
interface Builder {
Builder useBlinkPlanner();
Builder useAnyPlanner();
Builder inStreamingMode();
Builder inBatchMode();
Builder withConfiguration(Configuration configuration);
EnvironmentSettings build();
}
}

class TableConfig {
Configuration getConfiguration();
void setSqlDialect(SqlDialect sqlDialect);
SqlDialect getSqlDialect();
void setLocalTimeZone(ZoneId zoneId);
ZoneId getLocalTimeZone();
}

interface StatementSet {
StatementSet addInsertSql(String statement);
StatementSet addInsert(String targetPath, Table table);
StatementSet add(ModifyOperation modifyOperation);
String explain();
TableResult execute();
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table