Apache Flink's Table API and SQL module for unified stream and batch processing
—
Table is the core abstraction of the Table API, representing a pipeline of data transformations. It provides fluent API methods for selection, filtering, aggregation, joins, and window operations on both bounded and unbounded data streams.
Core operations for selecting fields, filtering data, and basic data manipulation.
/**
* Selects the given fields from the table
* @param fields Expressions defining the selected fields
* @return New Table with selected fields
*/
Table select(Expression... fields);
/**
* Filters rows based on the given predicate
* @param predicate Boolean expression for row filtering
* @return New Table with filtered rows
*/
Table filter(Expression predicate);
/**
* Alias for filter() - filters rows based on the given predicate
* @param predicate Boolean expression for row filtering
* @return New Table with filtered rows
*/
Table where(Expression predicate);
/**
* Renames fields of the table
* @param fields Expressions defining new field names
* @return New Table with renamed fields
*/
Table as(Expression... fields);
/**
* Adds additional columns to the table
* @param fields Expressions defining new columns
* @return New Table with additional columns
*/
Table addColumns(Expression... fields);
/**
* Adds columns or replaces existing ones
* @param fields Expressions defining columns to add or replace
* @return New Table with added/replaced columns
*/
Table addOrReplaceColumns(Expression... fields);
/**
* Drops columns from the table
* @param fields Expressions defining columns to drop
* @return New Table without the specified columns
*/
Table dropColumns(Expression... fields);
/**
* Returns distinct rows from the table
* @return New Table with distinct rows
*/
Table distinct();

Usage Examples:
import static org.apache.flink.table.api.Expressions.*;
// Basic selection and filtering
// Filter before projecting: "age" and "active" are not part of the projection,
// so they must be referenced while they are still in the table's schema.
Table customers = tableEnv.from("customers");
Table result = customers
    .filter($("age").isGreater(18))
    .where($("active").isEqual(true))
    .select($("customer_id"), $("name"), $("email"));
// Column manipulation
Table enhanced = customers
.addColumns($("name").upperCase().as("name_upper"))
.dropColumns($("internal_notes"))
.as("customer_id", "full_name", "email_address", "age", "is_active", "name_upper");
// Distinct records
Table uniqueCategories = products
.select($("category"))
.distinct();

Operations for grouping data and computing aggregations.
/**
* Groups the table by the given expressions
* @param fields Expressions defining grouping keys
* @return GroupedTable for applying aggregations
*/
GroupedTable groupBy(Expression... fields);
/**
* Applies aggregation functions to grouped or ungrouped table
* @param aggregateExpression Aggregate expression (sum, count, avg, etc.)
* @param moreAggregateExpressions Additional aggregate expressions
* @return AggregatedTable with aggregation results
*/
AggregatedTable aggregate(Expression aggregateExpression, Expression... moreAggregateExpressions);
/**
* Applies flat aggregation using a table aggregate function
* @param tableAggFunction Table aggregate function
* @return FlatAggregateTable with flattened results
*/
FlatAggregateTable flatAggregate(Expression tableAggFunction);

Usage Examples:
// Basic grouping and aggregation
Table sales = tableEnv.from("sales");
Table salesByRegion = sales
.groupBy($("region"))
.select($("region"), $("amount").sum().as("total_sales"));
// Multiple aggregations
Table summary = sales
.groupBy($("region"), $("product_category"))
.select(
$("region"),
$("product_category"),
$("amount").sum().as("total_sales"),
$("amount").avg().as("avg_sale"),
$("order_id").count().as("num_orders")
);
// Aggregate without grouping
Table overallStats = sales
.select(
$("amount").sum().as("total"),
$("amount").avg().as("average"),
$("order_id").count().as("count")
);

Various types of joins for combining data from multiple tables.
/**
* Inner join with another table
* @param right Right table to join with
* @return New Table with joined results
*/
Table join(Table right);
/**
* Inner join with explicit join condition
* @param right Right table to join with
* @param joinPredicate Boolean expression defining join condition
* @return New Table with joined results
*/
Table join(Table right, Expression joinPredicate);
/**
* Left outer join with another table
* @param right Right table to join with
* @return New Table with left outer join results
*/
Table leftOuterJoin(Table right);
/**
* Left outer join with explicit join condition
* @param right Right table to join with
* @param joinPredicate Boolean expression defining join condition
* @return New Table with left outer join results
*/
Table leftOuterJoin(Table right, Expression joinPredicate);
/**
* Right outer join with another table
* @param right Right table to join with
* @return New Table with right outer join results
*/
Table rightOuterJoin(Table right);
/**
* Right outer join with explicit join condition
* @param right Right table to join with
* @param joinPredicate Boolean expression defining join condition
* @return New Table with right outer join results
*/
Table rightOuterJoin(Table right, Expression joinPredicate);
/**
* Full outer join with another table
* @param right Right table to join with
* @return New Table with full outer join results
*/
Table fullOuterJoin(Table right);
/**
* Full outer join with explicit join condition
* @param right Right table to join with
* @param joinPredicate Boolean expression defining join condition
* @return New Table with full outer join results
*/
Table fullOuterJoin(Table right, Expression joinPredicate);

Usage Examples:
Table customers = tableEnv.from("customers");
Table orders = tableEnv.from("orders");
// Inner join with implicit condition (requires common column names)
Table customerOrders = customers.join(orders);
// Inner join with explicit condition
Table explicitJoin = customers
.join(orders, $("customer_id").isEqual($("cust_id")))
.select($("name"), $("order_id"), $("amount"));
// Left outer join to include all customers
Table allCustomers = customers
.leftOuterJoin(orders, $("customer_id").isEqual($("cust_id")))
.select($("name"), $("order_id").isNull().as("no_orders"));
// Complex join with multiple conditions
Table complexJoin = customers
.join(orders,
$("customer_id").isEqual($("cust_id"))
.and($("status").isEqual("active")))
.select($("name"), $("order_date"), $("total"));

Time-based window operations for streaming data processing.
/**
* Groups records into group windows based on a time attribute
* @param window Group window specification (tumbling, sliding, session)
* @return GroupWindowedTable; call groupBy(...) on it, including the window alias, before aggregating
*/
GroupWindowedTable window(GroupWindow window);
/**
* Defines over windows on the table for use in over aggregations
* @param overWindows Over window specifications
* @return OverWindowedTable on which select(...) can reference the over windows by alias
*/
OverWindowedTable window(OverWindow... overWindows);

Usage Examples:
// Tumbling window aggregation
Table events = tableEnv.from("events");
Table windowedStats = events
.window(Tumble.over(lit(5).minutes()).on($("event_time")).as("w"))
.groupBy($("user_id"), $("w"))
.select(
$("user_id"),
$("w").start().as("window_start"),
$("w").end().as("window_end"),
$("event_count").sum().as("total_events")
);
// Over window for running calculations
// The over window is declared once with window(...) and given an alias; aggregates
// in select(...) then reference that alias via over($("w")).
// UNBOUNDED_ROW and CURRENT_ROW come from the static import of Expressions.
// NOTE: the orderBy field must be a time attribute of the table.
Table runningTotals = sales
    .window(
        Over.partitionBy($("product_id"))
            .orderBy($("sale_time"))
            .preceding(UNBOUNDED_ROW)
            .following(CURRENT_ROW)
            .as("w"))
    .select(
        $("product_id"),
        $("sale_time"),
        $("amount"),
        $("amount").sum().over($("w")).as("running_total")
    );

Operations for sorting and limiting result sets.
/**
* Orders the table by the given expressions
* @param fields Expressions defining sort order
* @return New Table with ordered rows
*/
Table orderBy(Expression... fields);
/**
* Limits the number of returned rows
* @param fetch Maximum number of rows to return
* @return New Table with limited rows
*/
Table limit(int fetch);
/**
* Limits with offset and fetch
* @param offset Number of rows to skip
* @param fetch Maximum number of rows to return
* @return New Table with limited rows
*/
Table limit(int offset, int fetch);

Usage Examples:
// Order by multiple fields
Table sortedCustomers = customers
.orderBy($("registration_date").desc(), $("name").asc());
// Top N results
Table topSellers = sales
.groupBy($("seller_id"))
.select($("seller_id"), $("amount").sum().as("total_sales"))
.orderBy($("total_sales").desc())
.limit(10);
// Pagination
Table page2 = products
.orderBy($("product_id"))
.limit(20, 10); // Skip 20, take 10

Methods for accessing table schema and metadata information.
/**
* Gets the resolved schema of this table
* @return ResolvedSchema containing column information
*/
ResolvedSchema getResolvedSchema();
/**
* Gets the query operation that defines this table
* @return QueryOperation representing the table pipeline
*/
QueryOperation getQueryOperation();

Usage Examples:
Table myTable = tableEnv.from("products");
// Access schema information
ResolvedSchema schema = myTable.getResolvedSchema();
List<Column> columns = schema.getColumns();
for (Column column : columns) {
System.out.println(column.getName() + ": " + column.getDataType());
}
// Check column existence
boolean hasCategory = schema.getColumn("category").isPresent();

Methods for executing table operations and examining execution plans.
/**
* Executes the table and returns results
* @return TableResult containing execution information and data
*/
TableResult execute();
/**
* Returns the execution plan as a string
* @return String representation of the execution plan
*/
String explain();
/**
* Returns detailed execution plan with specified format
* @param format Format for the explanation (TEXT or JSON)
* @param details Additional details to include in the plan
* @return String representation of the detailed execution plan
*/
String explain(ExplainFormat format, ExplainDetail... details);
/**
* Declares that the table contents should be inserted into a target table
* (nothing runs until execute() is called on the returned pipeline)
* @param tablePath Target table path
* @return TablePipeline that can be executed or explained
*/
TablePipeline insertInto(String tablePath);

Usage Examples:
Table result = customers
.filter($("age").isGreater(25))
.select($("name"), $("email"));
// Examine execution plan
System.out.println(result.explain());
// Detailed plan with JSON format
String detailedPlan = result.explain(
ExplainFormat.JSON,
ExplainDetail.CHANGELOG_MODE,
ExplainDetail.ESTIMATED_COST
);
// Execute and process results
TableResult tableResult = result.execute();
try (CloseableIterator<Row> iterator = tableResult.collect()) {
while (iterator.hasNext()) {
Row row = iterator.next();
System.out.println(row);
}
}
// Insert into target table
result.insertInto("customer_summary").execute();

Operations for combining tables using set theory operations.
/**
* Returns the union of this table and the given table
* @param right The table to union with
* @return New Table containing union of both tables (with duplicates removed)
*/
Table union(Table right);
/**
* Returns the union of this table and the given table including duplicates
* @param right The table to union with
* @return New Table containing union of both tables (with duplicates)
*/
Table unionAll(Table right);
/**
* Returns the intersection of this table and the given table
* @param right The table to intersect with
* @return New Table containing only rows present in both tables
*/
Table intersect(Table right);
/**
* Returns the intersection of this table and the given table including duplicates
* @param right The table to intersect with
* @return New Table containing intersection with duplicates
*/
Table intersectAll(Table right);
/**
* Returns the minus operation (difference) of this table and the given table
* @param right The table to subtract from this table
* @return New Table containing rows in this table but not in the right table
*/
Table minus(Table right);
/**
* Returns the minus operation including duplicates
* @param right The table to subtract from this table
* @return New Table containing difference with duplicates
*/
Table minusAll(Table right);

Usage Examples:
Table europeanCustomers = tableEnv.from("customers_europe");
Table americanCustomers = tableEnv.from("customers_america");
// Union all customers
Table allCustomers = europeanCustomers.unionAll(americanCustomers);
// Find common customer IDs between regions (for validation)
Table commonIds = europeanCustomers
.select($("customer_id"))
.intersect(americanCustomers.select($("customer_id")));
// Find customers only in Europe
Table europeOnly = europeanCustomers
.select($("customer_id"))
.minus(americanCustomers.select($("customer_id")));

Join operations with table-valued functions for dynamic table expansion.
/**
* Performs a lateral join with a table function
* @param tableFunctionCall Expression calling a table function
* @return New Table with lateral join results
*/
Table joinLateral(Expression tableFunctionCall);
/**
* Performs a lateral join with a table function and join condition
* @param tableFunctionCall Expression calling a table function
* @param joinPredicate Join condition expression
* @return New Table with lateral join results
*/
Table joinLateral(Expression tableFunctionCall, Expression joinPredicate);
/**
* Performs a left outer lateral join with a table function
* @param tableFunctionCall Expression calling a table function
* @return New Table with left outer lateral join results
*/
Table leftOuterJoinLateral(Expression tableFunctionCall);
/**
* Performs a left outer lateral join with a table function and join condition
* @param tableFunctionCall Expression calling a table function
* @param joinPredicate Join condition expression
* @return New Table with left outer lateral join results
*/
Table leftOuterJoinLateral(Expression tableFunctionCall, Expression joinPredicate);

Usage Examples:
// Lateral join with a table function to split comma-separated values
Table orders = tableEnv.from("orders");
Table expandedOrders = orders
.joinLateral(call("split_string", $("item_list"), ",").as("item_id"))
.select($("order_id"), $("customer_id"), $("item_id"));
// Left outer lateral join to handle orders with no items
Table allOrders = orders
.leftOuterJoinLateral(call("split_string", $("item_list"), ",").as("item_id"))
.select($("order_id"), $("customer_id"), $("item_id"));

Operations using scalar and table functions for data transformation.
/**
* Applies a scalar function to each row
* @param mapFunction Scalar function expression
* @return New Table with function results
*/
Table map(Expression mapFunction);
/**
* Applies a table function that can produce multiple rows per input row
* @param tableFunction Table function expression
* @return New Table with flattened results
*/
Table flatMap(Expression tableFunction);

Usage Examples:
Table events = tableEnv.from("events");
// Map operation to transform each row.
// The result of map() contains only the columns produced by the function, so the
// input columns (e.g. event_id) are no longer available afterwards; name the
// output columns with as(...) on the resulting table.
Table transformedEvents = events
    .map(call("parse_json", $("json_data")))
    .as("parsed_data");
// FlatMap operation to explode arrays.
// As with map(), only the function's output columns remain. To keep input columns
// next to the expanded values, use joinLateral(...) instead.
Table expandedEvents = events
    .flatMap(call("explode_array", $("tag_array")))
    .as("tag");

Operations for creating temporal table functions from tables.
/**
* Creates a temporal table function from this table
* @param timeAttribute Expression identifying the time attribute
* @param primaryKey Expression identifying the primary key
* @return TemporalTableFunction for temporal joins
*/
TemporalTableFunction createTemporalTableFunction(Expression timeAttribute, Expression primaryKey);

Usage Examples:
Table exchangeRates = tableEnv.from("exchange_rates");
// Create temporal table function for exchange rates
TemporalTableFunction ratesFunction = exchangeRates
.createTemporalTableFunction($("rate_time"), $("currency"));
// Register for use in temporal joins
tableEnv.createTemporarySystemFunction("rates", ratesFunction);
// Use in temporal join
Table orders = tableEnv.from("orders");
Table ordersWithRates = orders
.joinLateral(call("rates", $("order_time")).as("rate_currency", "exchange_rate"))
.select($("order_id"), $("amount"), $("exchange_rate"),
$("amount").times($("exchange_rate")).as("amount_usd"));

Additional operations for advanced column manipulation.
/**
* Renames columns of the table
* @param fields Expressions defining new column names
* @return New Table with renamed columns
*/
Table renameColumns(Expression... fields);
/**
* Skips the first n rows
* @param offset Number of rows to skip
* @return New Table with offset applied
*/
Table offset(int offset);
/**
* Takes the first n rows after any offset
* @param fetch Number of rows to take
* @return New Table with fetch applied
*/
Table fetch(int fetch);
/**
* Renames the fields of the table positionally using the given names (this does not assign a table alias)
* @param field First field name
* @param fields Additional field names
* @return New Table with alias applied
*/
Table as(String field, String... fields);

Usage Examples:
Table products = tableEnv.from("products");
// Rename columns
Table renamedProducts = products
.renameColumns($("prod_id").as("product_id"), $("prod_name").as("product_name"));
// Pagination using offset and fetch
Table page3Products = products
.orderBy($("product_id"))
.offset(20)
.fetch(10);
// Rename all columns positionally: every argument is a new column name
// (there is no table-alias argument), one name per existing column.
Table aliasedProducts = products.as("id", "name", "price", "category");

Operations for inserting table data into target tables.
/**
* Creates a pipeline to insert table data into the specified table
* @param tablePath Target table path
* @return TablePipeline for further configuration
*/
TablePipeline insertInto(String tablePath);
/**
* Creates a pipeline to insert table data with overwrite option
* @param tablePath Target table path
* @param overwrite Whether to overwrite existing data
* @return TablePipeline for further configuration
*/
TablePipeline insertInto(String tablePath, boolean overwrite);
/**
* Creates a pipeline to insert table data using table descriptor
* @param descriptor Table descriptor defining the target
* @return TablePipeline for further configuration
*/
TablePipeline insertInto(TableDescriptor descriptor);
/**
* Creates a pipeline to insert table data using table descriptor with overwrite
* @param descriptor Table descriptor defining the target
* @param overwrite Whether to overwrite existing data
* @return TablePipeline for further configuration
*/
TablePipeline insertInto(TableDescriptor descriptor, boolean overwrite);
/**
* Directly executes insert into the specified table
* @param tablePath Target table path
* @return TableResult with execution information
*/
TableResult executeInsert(String tablePath);
/**
* Directly executes insert with overwrite option
* @param tablePath Target table path
* @param overwrite Whether to overwrite existing data
* @return TableResult with execution information
*/
TableResult executeInsert(String tablePath, boolean overwrite);
/**
* Directly executes insert using table descriptor
* @param descriptor Table descriptor defining the target
* @return TableResult with execution information
*/
TableResult executeInsert(TableDescriptor descriptor);
/**
* Directly executes insert using table descriptor with overwrite
* @param descriptor Table descriptor defining the target
* @param overwrite Whether to overwrite existing data
* @return TableResult with execution information
*/
TableResult executeInsert(TableDescriptor descriptor, boolean overwrite);

Usage Examples:
Table processedOrders = orders
.filter($("status").isEqual("processed"))
.select($("order_id"), $("customer_id"), $("total_amount"));
// Direct insert execution
TableResult result = processedOrders.executeInsert("processed_orders");
// Pipeline-based insert for more control
TablePipeline pipeline = processedOrders.insertInto("processed_orders", true);
TableResult pipelineResult = pipeline.execute();
// Insert using table descriptor
TableDescriptor targetDescriptor = TableDescriptor.forConnector("kafka")
.schema(Schema.newBuilder()
.column("order_id", DataTypes.BIGINT())
.column("customer_id", DataTypes.BIGINT())
.column("total_amount", DataTypes.DECIMAL(10, 2))
.build())
.option("topic", "processed-orders")
.build();
TableResult descriptorResult = processedOrders.executeInsert(targetDescriptor);

interface GroupedTable {
Table select(Expression... fields);
AggregatedTable aggregate(Expression aggregateExpression, Expression... moreAggregateExpressions);
FlatAggregateTable flatAggregate(Expression tableAggFunction);
}
interface WindowGroupedTable extends GroupedTable {
// Inherits all GroupedTable methods
}

interface AggregatedTable {
Table select(Expression... fields);
AggregatedTable aggregate(Expression aggregateExpression, Expression... moreAggregateExpressions);
}
interface FlatAggregateTable {
Table select(Expression... fields);
FlatAggregateTable flatAggregate(Expression tableAggFunction);
}

class ResolvedSchema {
List<Column> getColumns();
Optional<Column> getColumn(String name);
Optional<Column> getColumn(int index);
List<String> getColumnNames();
List<DataType> getColumnDataTypes();
Optional<UniqueConstraint> getPrimaryKey();
List<WatermarkSpec> getWatermarkSpecs();
}
class Column {
String getName();
DataType getDataType();
String getComment();
boolean isPhysical();
boolean isComputed();
boolean isMetadata();
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table