Apache Flink Table API for SQL-like operations on streaming and batch data
—
The TableEnvironment is the main entry point of the Table API, providing methods for registering tables, executing SQL queries, and configuring the table program.
Factory methods for creating batch and streaming table environments.
object TableEnvironment {
/**
* Creates a batch table environment with an ExecutionEnvironment
* @param executionEnvironment The batch execution environment
* @return BatchTableEnvironment for batch processing
*/
def getTableEnvironment(executionEnvironment: ExecutionEnvironment): BatchTableEnvironment
/**
* Creates a streaming table environment with a StreamExecutionEnvironment
* @param executionEnvironment The stream execution environment
* @return StreamTableEnvironment for stream processing
*/
def getTableEnvironment(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment
}

Usage Examples:
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
// Batch environment
val batchEnv = ExecutionEnvironment.getExecutionEnvironment
val batchTableEnv = TableEnvironment.getTableEnvironment(batchEnv)
// Streaming environment
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val streamTableEnv = TableEnvironment.getTableEnvironment(streamEnv)

Register and manage tables within the table environment.
/**
* Registers a table in the catalog for later reference
* @param name Name to register the table under
* @param table Table instance to register
*/
def registerTable(name: String, table: Table): Unit
/**
* Registers a table source in the catalog
* @param name Name to register the source under
* @param tableSource The table source to register
*/
def registerTableSource(name: String, tableSource: TableSource[_]): Unit
/**
* Registers a table sink in the catalog
* @param name Name to register the sink under
* @param fieldNames Field names of the sink
* @param fieldTypes Field types of the sink
* @param tableSink The table sink to register
*/
def registerTableSink(
name: String,
fieldNames: Array[String],
fieldTypes: Array[TypeInformation[_]],
tableSink: TableSink[_]
): Unit
/**
* Scans a registered table by name
* @param tablePath Table name or path components
* @return Table instance for the registered table
*/
def scan(tablePath: String*): Table
/**
* Creates a table from a table source
* @param source The table source to create table from
* @return Table instance wrapping the source
*/
def fromTableSource(source: TableSource[_]): Table
/**
* Lists all registered tables in the environment
* @return Array of table names
*/
def listTables(): Array[String]

Usage Examples:
// Register a table
val dataSet = env.fromElements((1, "Alice"), (2, "Bob"))
val table = dataSet.toTable(tEnv, 'id, 'name)
tEnv.registerTable("Users", table)
// Scan registered table
val users = tEnv.scan("Users")
// Register and use table source
val csvSource = new CsvTableSource("/path/to/file.csv", Array("id", "name"), Array(Types.INT, Types.STRING))
tEnv.registerTableSource("CsvUsers", csvSource)
val csvTable = tEnv.scan("CsvUsers")

Execute SQL queries and statements directly on the table environment.
/**
* Executes a SQL query and returns the result as a Table
* @param query The SQL query string
* @return Table containing query results
*/
def sqlQuery(query: String): Table
/**
* Executes a SQL statement (DDL/DML)
* @param stmt The SQL statement string
*/
def sqlUpdate(stmt: String): Unit

Usage Examples:
// SQL queries
val result = tEnv.sqlQuery("SELECT id, name FROM Users WHERE id > 1")
val aggregated = tEnv.sqlQuery("SELECT COUNT(*) as user_count FROM Users")
// SQL DDL/DML
tEnv.sqlUpdate("CREATE TABLE OutputTable (id INT, name STRING)")
tEnv.sqlUpdate("INSERT INTO OutputTable SELECT * FROM Users")

Register user-defined functions for use in queries.
/**
* Registers a scalar function
* @param name Function name for SQL usage
* @param function ScalarFunction implementation
*/
def registerFunction(name: String, function: ScalarFunction): Unit
/**
* Lists all registered user-defined functions
* @return Array of function names
*/
def listUserDefinedFunctions(): Array[String]

Usage Examples:
// Register custom function
class AddOne extends ScalarFunction {
def eval(x: Int): Int = x + 1
}
tEnv.registerFunction("addOne", new AddOne())
// Use in SQL
val result = tEnv.sqlQuery("SELECT addOne(id) FROM Users")

Register external catalogs for accessing external metadata stores.
/**
* Registers an external catalog
* @param name Catalog name
* @param externalCatalog External catalog implementation
*/
def registerExternalCatalog(name: String, externalCatalog: ExternalCatalog): Unit
/**
* Gets the default catalog
* @return The default external catalog
*/
def getDefaultCatalog: ExternalCatalog

Access configuration and utility methods.
/**
* Gets the table configuration
* @return TableConfig instance
*/
def getConfig: TableConfig
/**
* Explains the execution plan for a table
* @param table Table to explain
* @return String representation of execution plan
*/
def explain(table: Table): String

Usage Examples:
// Configure timezone
val config = tEnv.getConfig
config.setTimeZone(TimeZone.getTimeZone("UTC"))
// Explain query plan
val table = tEnv.scan("Users").select('name)
println(tEnv.explain(table))

Batch-specific table environment with DataSet conversion capabilities.
abstract class BatchTableEnvironment extends TableEnvironment {
/**
* Converts a Table to a DataSet
* @param table Table to convert
* @return DataSet of Row
*/
def toDataSet[T](table: Table): DataSet[T]
/**
* Converts a Table to a DataSet with specific type
* @param table Table to convert
* @param typeInfo Type information for conversion
* @return Typed DataSet
*/
def toDataSet[T](table: Table, typeInfo: TypeInformation[T]): DataSet[T]
}

Stream-specific table environment with DataStream conversion capabilities.
abstract class StreamTableEnvironment extends TableEnvironment {
/**
* Converts a Table to an append-only DataStream
* @param table Table to convert
* @return DataStream of Row
*/
def toAppendStream[T](table: Table): DataStream[T]
/**
* Converts a Table to a retract DataStream
* @param table Table to convert
* @return DataStream of (Boolean, T) where Boolean indicates add/retract
*/
def toRetractStream[T](table: Table): DataStream[(Boolean, T)]
}

Usage Examples:
// Batch conversion
val batchResult: DataSet[Row] = batchTableEnv.toDataSet(table)
// Stream conversions
val appendStream: DataStream[Row] = streamTableEnv.toAppendStream(table)
val retractStream: DataStream[(Boolean, Row)] = streamTableEnv.toRetractStream(aggregatedTable)

abstract class TableEnvironment
abstract class BatchTableEnvironment extends TableEnvironment
abstract class StreamTableEnvironment extends TableEnvironment
trait ExternalCatalog {
/**
* Gets a table from the external catalog
* @param tablePath Table path components
* @return Table from the external catalog
*/
def getTable(tablePath: String*): Table
/**
* Lists all tables in the external catalog
* @return List of table names
*/
def listTables(): java.util.List[String]
/**
* Gets a database from the external catalog
* @param databaseName Name of the database
* @return External catalog database
*/
def getDatabase(databaseName: String): ExternalCatalogDatabase
}
trait ExternalCatalogDatabase {
/**
* Gets a table from the database
* @param tableName Name of the table
* @return Table from the database
*/
def getTable(tableName: String): Table
/**
* Lists all tables in the database
* @return List of table names in the database
*/
def listTables(): java.util.List[String]
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-2-11