Apache Flink Table API for SQL-like operations on streaming and batch data
—
The Table class provides SQL-like operations for data transformation, filtering, aggregation, and joining. It represents a relational table with a schema and supports fluent method chaining.
Select specific fields or computed expressions from the table.
/**
* Projects fields using expressions
* @param fields Expression-based field selections
* @returns New table with projected fields
*/
def select(fields: Expression*): Table
/**
* Projects fields using string expressions
* @param fields String-based field selections
* @returns New table with projected fields
*/
def select(fields: String): Table
Usage Examples:
// Expression-based selection
val projected = table.select('name, 'age + 1, 'salary * 0.1)
// String-based selection
val projected2 = table.select("name, age + 1 as nextAge, salary * 0.1 as bonus")
// Select all fields
val allFields = table.select('*)
Filter rows based on predicates and conditions.
/**
* Filters rows using expression predicate
* @param predicate Boolean expression for filtering
* @returns Filtered table
*/
def filter(predicate: Expression): Table
/**
* Filters rows using string predicate
* @param predicate String-based boolean expression
* @returns Filtered table
*/
def filter(predicate: String): Table
/**
* Alias for filter operation
* @param predicate Boolean expression for filtering
* @returns Filtered table
*/
def where(predicate: Expression): Table
Usage Examples:
// Expression-based filtering
val adults = table.filter('age >= 18)
val activeUsers = table.where('status === "active")
// String-based filtering
val highEarners = table.filter("salary > 50000")
// Complex conditions
val filtered = table.filter('age >= 21 && 'department === "Engineering")
Group rows and perform aggregate operations.
/**
* Groups table by specified fields
* @param fields Grouping field expressions
* @returns GroupedTable for aggregation operations
*/
def groupBy(fields: Expression*): GroupedTable
class GroupedTable {
/**
* Selects fields and aggregates for grouped table
* @param fields Fields and aggregate expressions
* @returns Aggregated table
*/
def select(fields: Expression*): Table
/**
* Selects fields using string expressions
* @param fields String-based field and aggregate selections
* @returns Aggregated table
*/
def select(fields: String): Table
}
Usage Examples:
// Basic grouping and aggregation
val grouped = table
.groupBy('department)
.select('department, 'salary.avg, 'age.max, 'name.count)
// Multiple grouping fields
val multiGrouped = table
.groupBy('department, 'level)
.select('department, 'level, 'salary.sum as 'totalSalary)
// String-based aggregation
val stringAgg = table
.groupBy('department)
.select("department, AVG(salary) as avgSalary, COUNT(*) as employeeCount")
Order table rows by specified criteria.
/**
* Orders table by specified fields
* @param fields Ordering field expressions (use .asc/.desc for direction)
* @returns Ordered table
*/
def orderBy(fields: Expression*): Table
Usage Examples:
// Ascending order (default)
val sortedAsc = table.orderBy('name)
// Descending order
val sortedDesc = table.orderBy('salary.desc)
// Multiple fields
val multiSorted = table.orderBy('department.asc, 'salary.desc, 'name.asc)
Remove duplicate rows from the table.
/**
* Removes duplicate rows from the table
* @returns Table with unique rows
*/
def distinct(): Table
Usage Examples:
// Remove all duplicates
val unique = table.distinct()
// Distinct after projection
val uniqueNames = table.select('name).distinct()
Combine tables using various join strategies.
/**
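* Inner join with another table. No predicate is given here, so at least one equality join predicate must be added with a subsequent where/filter call (both tables must have distinct field names)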
* @param right Right table to join
* @returns Joined table
*/
def join(right: Table): Table
/**
* Inner join with join condition
* @param right Right table to join
* @param joinPredicate Join condition expression
* @returns Joined table
*/
def join(right: Table, joinPredicate: Expression): Table
/**
* Left outer join
* @param right Right table to join
* @param joinPredicate Join condition expression
* @returns Left outer joined table
*/
def leftOuterJoin(right: Table, joinPredicate: Expression): Table
/**
* Right outer join
* @param right Right table to join
* @param joinPredicate Join condition expression
* @returns Right outer joined table
*/
def rightOuterJoin(right: Table, joinPredicate: Expression): Table
/**
* Full outer join
* @param right Right table to join
* @param joinPredicate Join condition expression
* @returns Full outer joined table
*/
def fullOuterJoin(right: Table, joinPredicate: Expression): Table
Usage Examples:
val employees = tEnv.scan("Employees")
val departments = tEnv.scan("Departments")
// Inner join
val innerJoined = employees.join(departments, 'emp_dept_id === 'dept_id)
// Left outer join
val leftJoined = employees.leftOuterJoin(departments, 'emp_dept_id === 'dept_id)
// Multiple join conditions
val complexJoin = employees.join(
departments,
'emp_dept_id === 'dept_id && 'emp_status === "active"
)
Combine tables using set-based operations.
/**
* Union with another table (removes duplicates)
* @param right Right table for union
* @returns Union of both tables without duplicates
*/
def union(right: Table): Table
/**
* Union all with another table (keeps duplicates)
* @param right Right table for union
* @returns Union of both tables with duplicates
*/
def unionAll(right: Table): Table
/**
* Set difference (removes duplicates)
* @param right Right table for difference
* @returns Rows in left table but not in right table
*/
def minus(right: Table): Table
/**
* Set difference all (keeps duplicates)
* @param right Right table for difference
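* @returns Rows of the left table not matched in the right table; a row occurring n times on the left and m times on the right is returned max(n - m, 0) times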
*/
def minusAll(right: Table): Table
/**
* Intersection (removes duplicates)
* @param right Right table for intersection
* @returns Common rows between both tables
*/
def intersect(right: Table): Table
/**
* Intersection all (keeps duplicates)
* @param right Right table for intersection
* @returns Rows present in both tables; a row occurring n times on the left and m times on the right is returned min(n, m) times
*/
def intersectAll(right: Table): Table
Usage Examples:
val currentEmployees = tEnv.scan("CurrentEmployees")
val formerEmployees = tEnv.scan("FormerEmployees")
// Union operations
val allEmployees = currentEmployees.union(formerEmployees)
val allEmployeesWithDupes = currentEmployees.unionAll(formerEmployees)
// Set difference
val onlyCurrent = currentEmployees.minus(formerEmployees)
// Intersection
val rehiredEmployees = currentEmployees.intersect(formerEmployees)
Rename fields and provide aliases for table references.
/**
* Renames fields of the table
* @param fields New field name expressions
* @returns Table with renamed fields
*/
def as(fields: Expression*): Table
Usage Examples:
// Rename all fields
val renamed = table.as('employee_name, 'employee_age, 'employee_salary)
// Rename selected fields after projection
val projected = table
.select('name, 'age, 'salary * 12)
.as('fullName, 'currentAge, 'annualSalary)
Access and inspect table schema information.
/**
* Gets the schema of the table
* @returns TableSchema containing field information
*/
def getSchema: TableSchema
/**
* Prints the schema to console
*/
def printSchema(): Unit
Usage Examples:
// Get schema information
val schema = table.getSchema
val fieldNames = schema.getFieldNames
val fieldTypes = schema.getFieldTypes
// Print schema for debugging
table.printSchema()
Write table results to registered sinks or external systems.
/**
* Writes table to a table sink
* @param sink Table sink for output
*/
def writeToSink[T](sink: TableSink[T]): Unit
/**
* Inserts table data into a registered sink
* @param tableName Name of registered table sink
*/
def insertInto(tableName: String): Unit
Usage Examples:
// Write to custom sink
val csvSink = new CsvTableSink("/path/to/output.csv")
table.writeToSink(csvSink)
// Insert into registered sink
tEnv.registerTableSink("OutputTable", fieldNames, fieldTypes, csvSink)
table.insertInto("OutputTable")
Control the number of rows returned from sorted results.
/**
* Limits a sorted result from an offset position
* @param offset Number of records to skip
* @returns Table with offset applied
*/
def offset(offset: Int): Table
/**
* Limits a sorted result to the first n rows
* @param fetch Number of records to return (must be >= 0)
* @returns Table limited to first n rows
*/
def fetch(fetch: Int): Table
/**
* Skips the first `offset` rows of a sorted result (deprecated - use offset() instead)
* @param offset Number of records to skip
* @returns Table with limit applied
*/
@deprecated("Use offset() and fetch() instead")
def limit(offset: Int): Table
/**
* Limits a sorted result with offset and fetch (deprecated)
* @param offset Number of records to skip
* @param fetch Number of records to return
* @returns Table with limit applied
*/
@deprecated("Use offset() and fetch() instead")
def limit(offset: Int, fetch: Int): Table
Usage Examples:
// Skip first 5 rows
val offsetResult = table.orderBy('name).offset(5)
// Return first 10 rows
val fetchResult = table.orderBy('salary.desc).fetch(10)
// Skip 5 rows and return next 10
val combined = table.orderBy('name).offset(5).fetch(10)
// Deprecated limit usage (still available)
val limited = table.orderBy('name).limit(5, 10)
Access internal table representations for advanced use cases.
/**
* Returns the RelNode representation of this table
* @returns RelNode for advanced query operations
*/
def getRelNode: RelNode
/**
* Access to the relation builder for advanced operations
* @returns FlinkRelBuilder instance
*/
def relBuilder: FlinkRelBuilder
Usage Examples:
// Access internal RelNode (advanced usage)
val relNode = table.getRelNode
// Access relation builder for complex operations
val builder = table.relBuilder
Apply windowing operations for time-based aggregations.
/**
* Applies time or count-based windows to the table
* @param window Window specification (Tumble, Slide, or Session)
* @returns WindowedTable for window-based operations
*/
def window(window: Window): WindowedTable
/**
* Applies over-windows for row-based calculations
* @param overWindows Over-window specifications
* @returns OverWindowedTable for over-window operations
*/
def window(overWindows: OverWindow*): OverWindowedTable
Usage Examples:
import org.apache.flink.table.api.Tumble
// Tumbling window
val windowedTable = table
.window(Tumble over 10.minutes on 'rowtime as 'w)
.groupBy('w, 'department)
.select('department, 'w.start, 'w.end, 'salary.avg)
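// Over window (over windows must be ordered by a single time attribute such as 'rowtime or 'proctime, not by a regular field like 'salary)
val overResult = table
.window(Over partitionBy 'department orderBy 'rowtime preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w)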
.select('name, 'salary, 'salary.sum over 'w)
class Table {
def tableEnv: TableEnvironment
}
class GroupedTable
class WindowedTable
class WindowGroupedTable
class OverWindowedTable
class TableSchema {
def getFieldNames: Array[String]
def getFieldTypes: Array[TypeInformation[_]]
def getFieldCount: Int
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-2-11