CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-table-2-11

Apache Flink Table API for SQL-like operations on streaming and batch data

Pending
Overview
Eval results
Files

docs/table-operations.md

Table Operations

The Table class provides SQL-like operations for data transformation, filtering, aggregation, and joining. It represents a relational table with a schema. Transformation methods return a new Table, so calls can be chained fluently.

Capabilities

Projection Operations

Select specific fields or computed expressions from the table.

/**
 * Projects fields using expressions.
 * @param fields Expression-based field selections; computed expressions (e.g. 'age + 1) are allowed
 * @return New table with projected fields
 */
def select(fields: Expression*): Table

/**
 * Projects fields using the string expression syntax.
 * @param fields Comma-separated field selections, optionally renamed with `as`
 * @return New table with projected fields
 */
def select(fields: String): Table

Usage Examples:

// Expression-based selection: one plain field plus two computed columns
val projected = table.select(
  'name,
  'age + 1,
  'salary * 0.1
)

// String-based selection, naming the computed columns with `as`
val projected2 = table.select("name, age + 1 as nextAge, salary * 0.1 as bonus")

// '* selects every field of the table
val allFields = table.select('*)

Filtering Operations

Filter rows based on predicates and conditions.

/**
 * Filters rows using an expression predicate; only rows for which it holds are kept.
 * @param predicate Boolean expression for filtering
 * @return Filtered table
 */
def filter(predicate: Expression): Table

/**
 * Filters rows using a predicate written in the string expression syntax.
 * @param predicate String-based boolean expression, e.g. "salary > 50000"
 * @return Filtered table
 */
def filter(predicate: String): Table

/**
 * Alias for filter: same result as filter(predicate).
 * @param predicate Boolean expression for filtering
 * @return Filtered table
 */
def where(predicate: Expression): Table

Usage Examples:

// Expression-based filtering; where can be used interchangeably with filter
val isAdult = 'age >= 18
val adults = table.filter(isAdult)
val activeUsers = table.where('status === "active")

// String-based filtering
val highEarners = table.filter("salary > 50000")

// Complex conditions: build each part separately, then combine them with &&
val isAtLeast21 = 'age >= 21
val inEngineering = 'department === "Engineering"
val filtered = table.filter(isAtLeast21 && inEngineering)

Grouping and Aggregation

Group rows and perform aggregate operations.

/**
 * Groups the table by the specified fields.
 * Aggregates are computed by calling select on the returned GroupedTable.
 * @param fields Grouping field expressions
 * @return GroupedTable for aggregation operations
 */
def groupBy(fields: Expression*): GroupedTable

/** Intermediate result of groupBy; select turns it back into a Table. */
class GroupedTable {
  /**
   * Selects grouping fields and aggregates for the grouped table.
   * @param fields Grouping fields and aggregate expressions (e.g. 'salary.avg)
   * @return Aggregated table
   */
  def select(fields: Expression*): Table
  
  /**
   * Selects grouping fields and aggregates using the string expression syntax.
   * @param fields String-based field and aggregate selections (e.g. "salary.avg as avgSalary")
   * @return Aggregated table
   */
  def select(fields: String): Table
}

Usage Examples:

// Basic grouping and aggregation: one output row per department
val grouped = table
  .groupBy('department)
  .select('department, 'salary.avg, 'age.max, 'name.count)

// Multiple grouping fields
val multiGrouped = table
  .groupBy('department, 'level)
  .select('department, 'level, 'salary.sum as 'totalSalary)

// String-based aggregation. The string expression syntax mirrors the Scala DSL
// (field.aggregate); SQL forms such as AVG(salary) or COUNT(*) belong to SQL
// queries, not to Table API expressions.
// Note: name.count counts rows whose name is not null.
val stringAgg = table
  .groupBy('department)
  .select("department, salary.avg as avgSalary, name.count as employeeCount")

Sorting Operations

Order table rows by specified criteria.

/**
 * Orders the table by the specified fields.
 * @param fields Ordering field expressions, compared left to right; use .asc/.desc for
 *               direction (ascending when no direction is given)
 * @return Ordered table
 */
def orderBy(fields: Expression*): Table

Usage Examples:

// No direction given: ascending order
val sortedAsc = table.orderBy('name)

// .desc reverses the direction of a single key
val sortedDesc = table.orderBy('salary.desc)

// Several sort keys, compared from left to right
val multiSorted = table.orderBy(
  'department.asc,
  'salary.desc,
  'name.asc
)

Distinct Operations

Remove duplicate rows from the table.

/**
 * Removes duplicate rows from the table; rows are duplicates when all of their fields are equal.
 * @return Table with unique rows
 */
def distinct(): Table

Usage Examples:

// Drop rows that are identical in every field
val unique = table.distinct()

// Project first, then de-duplicate: each name appears once
val namesOnly = table.select('name)
val uniqueNames = namesOnly.distinct()

Join Operations

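Combine the rows of two tables with inner and outer joins. The field names of the two tables must not overlap (rename fields with `as` first if necessary), and at least one equality predicate between them is required.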

/**
 * Inner join with another table, without a join condition.
 * This is not meant to produce a Cartesian product. The join condition (at least one
 * equality predicate) is supplied afterwards with where/filter, e.g.
 * `left.join(right).where('a === 'd)`. The field names of both tables must not overlap.
 * @param right Right table to join
 * @return Joined table
 */
def join(right: Table): Table

/**
 * Inner join with a join condition; only pairs of rows satisfying the predicate are returned.
 * The field names of both tables must not overlap.
 * @param right Right table to join
 * @param joinPredicate Join condition expression; must contain at least one equality predicate
 * @return Joined table
 */
def join(right: Table, joinPredicate: Expression): Table

/**
 * Left outer join: every left row is kept; right-side fields are null when no right row matches.
 * @param right Right table to join
 * @param joinPredicate Join condition expression; must contain at least one equality predicate
 * @return Left outer joined table
 */
def leftOuterJoin(right: Table, joinPredicate: Expression): Table

/**
 * Right outer join: every right row is kept; left-side fields are null when no left row matches.
 * @param right Right table to join
 * @param joinPredicate Join condition expression; must contain at least one equality predicate
 * @return Right outer joined table
 */
def rightOuterJoin(right: Table, joinPredicate: Expression): Table

/**
 * Full outer join: rows of both tables are kept; fields of the other side are null when unmatched.
 * @param right Right table to join
 * @param joinPredicate Join condition expression; must contain at least one equality predicate
 * @return Full outer joined table
 */
def fullOuterJoin(right: Table, joinPredicate: Expression): Table

Usage Examples:

val employees = tEnv.scan("Employees")
val departments = tEnv.scan("Departments")

// Equality predicate shared by the joins below
val sameDept = 'emp_dept_id === 'dept_id

// Inner join: only employees with a matching department
val innerJoined = employees.join(departments, sameDept)

// Left outer join: keeps every employee, even without a matching department
val leftJoined = employees.leftOuterJoin(departments, sameDept)

// Multiple join conditions, combined into one predicate with &&
val complexJoin = employees.join(departments, sameDept && 'emp_status === "active")

Set Operations

Combine tables using set-based operations. Both tables must have identical field types.

/**
 * Union with another table, removing duplicates (like SQL UNION).
 * Both tables must have identical field types.
 * @param right Right table for union
 * @return Union of both tables without duplicates
 */
def union(right: Table): Table

/**
 * Union with another table, keeping duplicates (like SQL UNION ALL).
 * Both tables must have identical field types.
 * @param right Right table for union
 * @return Union of both tables with duplicates
 */
def unionAll(right: Table): Table

/**
 * Set difference, removing duplicates (like SQL EXCEPT).
 * Both tables must have identical field types.
 * @param right Right table for difference
 * @return Distinct rows of the left table that do not appear in the right table
 */
def minus(right: Table): Table

/**
 * Set difference keeping duplicates (like SQL EXCEPT ALL).
 * Both tables must have identical field types.
 * @param right Right table for difference
 * @return Left rows minus right rows: a row present n times on the left and m times
 *         on the right is returned max(n - m, 0) times
 */
def minusAll(right: Table): Table

/**
 * Intersection, removing duplicates (like SQL INTERSECT).
 * Both tables must have identical field types.
 * @param right Right table for intersection
 * @return Distinct rows present in both tables
 */
def intersect(right: Table): Table

/**
 * Intersection keeping duplicates (like SQL INTERSECT ALL).
 * Both tables must have identical field types.
 * @param right Right table for intersection
 * @return Common rows: a row present n times on the left and m times on the right
 *         is returned min(n, m) times
 */
def intersectAll(right: Table): Table

Usage Examples:

// Set operations need both inputs to have the same field types
val current = tEnv.scan("CurrentEmployees")
val former = tEnv.scan("FormerEmployees")

// Union: duplicates removed vs. every row kept
val allEmployees = current.union(former)
val allEmployeesWithDupes = current.unionAll(former)

// Set difference: current employees that are not among the former ones
val onlyCurrent = current.minus(former)

// Intersection: employees present in both tables
val rehiredEmployees = current.intersect(former)

Field Aliasing and Renaming

Rename the fields of a table, for example to avoid field-name clashes before a join.

/**
 * Renames the fields of the table; new names are applied to the existing fields in order.
 * Commonly used to disambiguate field names before a join.
 * @param fields New field name expressions
 * @return Table with renamed fields
 */
def as(fields: Expression*): Table

Usage Examples:

// Give every field a new name, in field order
val renamed = table.as('employee_name, 'employee_age, 'employee_salary)

// Project first, then name the resulting columns (including the computed one)
val withYearlySalary = table.select('name, 'age, 'salary * 12)
val projected = withYearlySalary.as('fullName, 'currentAge, 'annualSalary)

Schema Operations

Access and inspect table schema information.

/**
 * Gets the schema of the table.
 * @return TableSchema containing field names and types
 */
def getSchema: TableSchema

/**
 * Prints the schema to the console; handy for debugging.
 */
def printSchema(): Unit

Usage Examples:

// Quick look while debugging: print the schema to the console
table.printSchema()

// Read the same information programmatically
val schema = table.getSchema
val fieldNames = schema.getFieldNames
val fieldTypes = schema.getFieldTypes

Output Operations

Write table results to registered sinks or external systems.

/**
 * Writes the table to the given table sink.
 * @tparam T type of the records the sink accepts
 * @param sink Table sink for output
 */
def writeToSink[T](sink: TableSink[T]): Unit

/**
 * Inserts the table's rows into a table sink previously registered under the given name.
 * @param tableName Name of registered table sink
 */
def insertInto(tableName: String): Unit

Usage Examples:

// Write directly to a TableSink instance (CsvTableSink is a built-in sink)
val csvSink = new CsvTableSink("/path/to/output.csv")
table.writeToSink(csvSink)

// Register the sink under a name, using the table's own field names and types,
// then insert into it by name
val sinkSchema = table.getSchema
tEnv.registerTableSink("OutputTable", sinkSchema.getFieldNames, sinkSchema.getFieldTypes, csvSink)
table.insertInto("OutputTable")

Result Limiting Operations

Control the number of rows returned from sorted results. These operations are part of the ordering, so they must directly follow an `orderBy` call.

/**
 * Skips the first `offset` rows of a sorted result.
 * Must be applied directly to the result of orderBy.
 * @param offset Number of records to skip
 * @return Table without its first `offset` rows
 */
def offset(offset: Int): Table

/**
 * Limits a sorted result to its first n rows.
 * Must be applied to the result of orderBy, or to offset() applied to such a result.
 * @param fetch Number of records to return (must be >= 0)
 * @return Table limited to first n rows
 */
def fetch(fetch: Int): Table

/**
 * Skips the first `offset` rows of a sorted result (deprecated - use offset() instead).
 * @param offset Number of records to skip
 * @return Table without its first `offset` rows
 */
@deprecated("Use offset() and fetch() instead")  
def limit(offset: Int): Table

/**
 * Skips `offset` rows of a sorted result and returns the next `fetch` rows
 * (deprecated - equivalent to offset(offset).fetch(fetch)).
 * @param offset Number of records to skip
 * @param fetch Number of records to return
 * @return Table with limit applied
 */
@deprecated("Use offset() and fetch() instead")
def limit(offset: Int, fetch: Int): Table

Usage Examples:

// Sorted inputs reused by the examples below
val byName = table.orderBy('name)
val bySalaryDesc = table.orderBy('salary.desc)

// Skip the first 5 rows
val offsetResult = byName.offset(5)

// Keep only the first 10 rows
val fetchResult = bySalaryDesc.fetch(10)

// Skip 5 rows, then keep the next 10
val combined = byName.offset(5).fetch(10)

// Deprecated form of the same thing: limit(offset, fetch)
val limited = byName.limit(5, 10)

Internal API Access

Access internal table representations for advanced use cases.

/**
 * Returns the RelNode (Apache Calcite relational expression) representation of this table.
 * Intended for advanced use; regular Table API programs do not need it.
 * @return RelNode for advanced query operations
 */
def getRelNode: RelNode

/**
 * Access to the relation builder for advanced operations.
 * @return FlinkRelBuilder instance
 */
def relBuilder: FlinkRelBuilder

Usage Examples:

// Advanced: the relational plan node behind this table
val plan = table.getRelNode

// Advanced: the builder for constructing further relational expressions
val relationBuilder = table.relBuilder

Window Operations

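Apply windowing operations. Group windows (Tumble, Slide, Session) aggregate the rows of each window, while over windows compute aggregates over a range of rows around each row.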

/**
 * Applies a time- or count-based group window to the table.
 * The window needs an alias (e.g. `as 'w`), which is then referenced in the following groupBy.
 * @param window Window specification (Tumble, Slide, or Session)
 * @return WindowedTable for window-based operations
 */
def window(window: Window): WindowedTable

/**
 * Applies over-windows, which compute aggregates over a range of rows relative to each row.
 * @param overWindows Over-window specifications
 * @return OverWindowedTable for over-window operations
 */
def window(overWindows: OverWindow*): OverWindowedTable

Usage Examples:

// The Scala Table API DSL lives in this package. It provides Tumble, Over,
// UNBOUNDED_RANGE and CURRENT_RANGE, plus the implicit conversions that make
// 'symbol fields and 10.minutes usable as expressions.
import org.apache.flink.table.api.scala._

// Tumbling window: 10-minute, non-overlapping windows on the 'rowtime attribute.
// The window alias 'w must appear in groupBy; 'w.start / 'w.end expose its bounds.
val windowedTable = table
  .window(Tumble over 10.minutes on 'rowtime as 'w)
  .groupBy('w, 'department)
  .select('department, 'w.start, 'w.end, 'salary.avg)

// Over window: running salary total per department.
// Over windows must be ordered by a single time attribute (here 'rowtime);
// ordering by a regular field such as 'salary.desc is not supported.
val overResult = table
  .window(Over partitionBy 'department orderBy 'rowtime preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w)
  .select('name, 'salary, 'salary.sum over 'w)

Types

/** Relational table with a schema; the operations documented above are its members. */
class Table {
  /** The TableEnvironment this table belongs to. */
  def tableEnv: TableEnvironment
}

/** Result of Table.groupBy; call select to compute aggregates. */
class GroupedTable
/** Result of Table.window with a group window (Tumble, Slide, Session); call groupBy next. */
class WindowedTable  
/** Result of grouping a WindowedTable; call select to aggregate per window and group. */
class WindowGroupedTable
/** Result of Table.window with over windows; call select to compute over-window aggregates. */
class OverWindowedTable

/** Schema of a table: its field names and field types. */
class TableSchema {
  /** Names of all fields. */
  def getFieldNames: Array[String]
  /** Types of all fields, index-aligned with getFieldNames. */
  def getFieldTypes: Array[TypeInformation[_]]  
  /** Number of fields. */
  def getFieldCount: Int
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-2-11

docs

index.md

sources-sinks.md

sql-integration.md

table-environment.md

table-operations.md

type-system.md

user-defined-functions.md

window-operations.md

tile.json