Apache Flink's Table API and SQL module for unified stream and batch processing
—
StreamTableEnvironment provides seamless integration between Flink's Table API and DataStream API, enabling conversion between tables and data streams for complex stream processing pipelines that combine both APIs.
Creates streaming table environments with DataStream integration capabilities.
/**
* Creates a StreamTableEnvironment from a StreamExecutionEnvironment
* @param executionEnvironment The StreamExecutionEnvironment for stream processing
* @return StreamTableEnvironment with DataStream integration
*/
static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment);
/**
* Creates a StreamTableEnvironment with custom settings
* @param executionEnvironment The StreamExecutionEnvironment for stream processing
* @param settings Environment settings for the table environment
* @return StreamTableEnvironment with specified settings
*/
static StreamTableEnvironment create(
StreamExecutionEnvironment executionEnvironment,
EnvironmentSettings settings
);

Usage Examples:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
// Create from execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// With custom settings
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build();
StreamTableEnvironment customTableEnv = StreamTableEnvironment.create(env, settings);

Converts DataStream objects to Table objects for SQL and Table API operations.
/**
* Creates a Table from a DataStream with automatic schema inference
* @param dataStream The DataStream to convert
* @return Table representing the DataStream
*/
<T> Table fromDataStream(DataStream<T> dataStream);
/**
* Creates a Table from a DataStream with explicit schema
* @param dataStream The DataStream to convert
* @param schema Schema definition for the resulting Table
* @return Table with specified schema
*/
<T> Table fromDataStream(DataStream<T> dataStream, Schema schema);
/**
* Creates a Table from a DataStream with field expressions
* @param dataStream The DataStream to convert
* @param fields Expressions defining field mappings and types
* @return Table with specified field mappings
*/
<T> Table fromDataStream(DataStream<T> dataStream, Expression... fields);

Usage Examples:
// Simple POJO DataStream
DataStream<Order> orderStream = env.fromSource(orderSource, watermarkStrategy, "orders");
// Automatic schema inference
Table ordersTable = tableEnv.fromDataStream(orderStream);
// With explicit schema
Schema orderSchema = Schema.newBuilder()
.column("orderId", DataTypes.BIGINT())
.column("customerId", DataTypes.BIGINT())
.column("amount", DataTypes.DECIMAL(10, 2))
.column("orderTime", DataTypes.TIMESTAMP(3))
.watermark("orderTime", $("orderTime").minus(lit(5).seconds()))
.build();
Table ordersWithSchema = tableEnv.fromDataStream(orderStream, orderSchema);
// With field expressions
Table ordersWithFields = tableEnv.fromDataStream(
orderStream,
$("orderId"),
$("customerId"),
$("amount"),
$("orderTime").rowtime()
);

Converts Table objects back to DataStream objects for further stream processing.
/**
* Converts a Table to a DataStream of Row objects
* @param table The Table to convert
* @return DataStream<Row> representing the Table data
*/
DataStream<Row> toDataStream(Table table);
/**
* Converts a Table to a DataStream with explicit target class
* @param table The Table to convert
* @param targetClass Target class for DataStream elements
* @return DataStream with specified element type
*/
<T> DataStream<T> toDataStream(Table table, Class<T> targetClass);
/**
* Converts a Table to a DataStream with specific data type
* @param table The Table to convert
* @param targetDataType Target data type for DataStream elements
* @return DataStream with specified data type
*/
<T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);

Usage Examples:
// Table with aggregated results
Table aggregatedOrders = tableEnv
.from("orders")
.groupBy($("customerId"))
.select($("customerId"), $("amount").sum().as("totalAmount"));
// Convert to DataStream with automatic inference
DataStream<Row> resultStream = tableEnv.toDataStream(aggregatedOrders);
// Convert to specific POJO class
@Data
public class CustomerTotal {
public Long customerId;
public BigDecimal totalAmount;
}
DataStream<CustomerTotal> pojoStream = tableEnv.toDataStream(
aggregatedOrders,
CustomerTotal.class
);
// Convert with explicit data type
DataStream<Row> typedStream = tableEnv.toDataStream(
aggregatedOrders,
DataTypes.ROW(
DataTypes.FIELD("customerId", DataTypes.BIGINT()),
DataTypes.FIELD("totalAmount", DataTypes.DECIMAL(10, 2))
)
);

Handles change data capture (CDC) and changelog streams for maintaining state consistency.
/**
* Converts a Table to a changelog DataStream
* @param table The Table to convert (should support changelog)
* @return DataStream of Row with RowKind information
*/
DataStream<Row> toChangelogStream(Table table);
/**
* Converts a Table to a changelog DataStream with explicit schema
* @param table The Table to convert
* @param targetSchema Schema for the resulting changelog stream
* @return DataStream of Row with RowKind information
*/
DataStream<Row> toChangelogStream(Table table, Schema targetSchema);
/**
* Converts a Table to a changelog DataStream with explicit schema and changelog mode
* @param table The Table to convert
* @param targetSchema Schema for the resulting changelog stream
* @param changelogMode Changelog mode specifying supported operations
* @return DataStream of Row with RowKind information
*/
DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);

Usage Examples:
// Table with updates and deletes
Table updatingTable = tableEnv
.from("user_updates")
.groupBy($("userId"))
.select($("userId"), $("lastUpdate").max().as("latestUpdate"));
// Convert to changelog stream
DataStream<Row> changelogStream = tableEnv.toChangelogStream(updatingTable);
// Process changelog events
changelogStream.map(new MapFunction<Row, String>() {
@Override
public String map(Row row) throws Exception {
RowKind kind = row.getKind();
switch (kind) {
case INSERT:
return "New user: " + row.getField(0);
case UPDATE_AFTER:
return "Updated user: " + row.getField(0);
case DELETE:
return "Deleted user: " + row.getField(0);
default:
return "Unknown change: " + row.getField(0);
}
}
});

Converts changelog DataStreams back to Table objects for further table operations.
/**
* Converts a changelog DataStream to a Table
* @param dataStream Changelog DataStream with RowKind information
* @return Table representing the changelog stream
*/
Table fromChangelogStream(DataStream<Row> dataStream);
/**
* Converts a changelog DataStream to a Table with explicit schema
* @param dataStream Changelog DataStream with RowKind information
* @param schema Schema for the resulting table
* @return Table with specified schema
*/
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);
/**
* Converts a changelog DataStream to a Table with schema and changelog mode
* @param dataStream Changelog DataStream with RowKind information
* @param schema Schema for the resulting table
* @param changelogMode Changelog mode specifying supported operations
* @return Table with specified schema and changelog mode
*/
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);

Usage Examples:
// DataStream with changelog semantics
DataStream<Row> changelogStream = env.addSource(new ChangelogSource());
// Convert to table with automatic schema inference
Table changelogTable = tableEnv.fromChangelogStream(changelogStream);
// Convert with explicit schema
Schema explicitSchema = Schema.newBuilder()
.column("user_id", DataTypes.BIGINT())
.column("user_name", DataTypes.STRING())
.column("last_login", DataTypes.TIMESTAMP(3))
.build();
Table typedChangelogTable = tableEnv.fromChangelogStream(changelogStream, explicitSchema);
// With changelog mode specification
ChangelogMode updateMode = ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.UPDATE_AFTER)
.addContainedKind(RowKind.DELETE)
.build();
Table modeAwareTable = tableEnv.fromChangelogStream(
changelogStream,
explicitSchema,
updateMode
);

Creates temporary views from DataStream objects for SQL query access.
/**
* Creates a temporary view from a DataStream
* @param path View name/path
* @param dataStream DataStream to create view from
*/
<T> void createTemporaryView(String path, DataStream<T> dataStream);
/**
* Creates a temporary view from a DataStream with explicit schema
* @param path View name/path
* @param dataStream DataStream to create view from
* @param schema Schema for the view
*/
<T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);

Usage Examples:
DataStream<Order> orderStream = env.addSource(new OrderSource());
// Create temporary view with automatic schema inference
tableEnv.createTemporaryView("orders", orderStream);
// Create view with explicit schema
Schema orderSchema = Schema.newBuilder()
.column("order_id", DataTypes.BIGINT())
.column("customer_id", DataTypes.BIGINT())
.column("amount", DataTypes.DECIMAL(10, 2))
.column("order_time", DataTypes.TIMESTAMP(3))
.watermark("order_time", $("order_time").minus(lit(5).seconds()))
.build();
tableEnv.createTemporaryView("orders_with_watermark", orderStream, orderSchema);
// Use in SQL queries
Table results = tableEnv.sqlQuery(
"SELECT customer_id, SUM(amount) as total_amount " +
"FROM orders_with_watermark " +
"GROUP BY customer_id"
);

Creates statement sets for executing multiple streaming operations together.
/**
* Creates a StreamStatementSet for batch execution of multiple streaming operations
* @return StreamStatementSet for adding multiple statements
*/
StreamStatementSet createStatementSet();

Usage Examples:
// Create statement set for batch execution
StreamStatementSet statementSet = tableEnv.createStatementSet();
// Add multiple insert statements
Table processedOrders = tableEnv.from("orders")
.filter($("status").isEqual("processed"));
statementSet.addInsert("processed_orders_sink", processedOrders);
Table failedOrders = tableEnv.from("orders")
.filter($("status").isEqual("failed"));
statementSet.addInsert("failed_orders_sink", failedOrders);
// Execute all statements together
TableResult result = statementSet.execute();
// Or add SQL statements
statementSet.addInsertSql(
"INSERT INTO daily_summary " +
"SELECT DATE(order_time) as order_date, COUNT(*) as total_orders " +
"FROM orders " +
"GROUP BY DATE(order_time)"
);

Older methods for DataStream conversion that are now deprecated but still available for backward compatibility.
/**
* @deprecated Use createTemporaryView(String, DataStream) instead
* Registers a DataStream as a temporary table
*/
@Deprecated
<T> void registerDataStream(String name, DataStream<T> dataStream);
/**
* @deprecated Use toDataStream() instead
* Converts a Table to an append-only DataStream
*/
@Deprecated
<T> DataStream<T> toAppendStream(Table table, Class<T> clazz);
/**
* @deprecated Use toChangelogStream() instead
* Converts a Table to a retract DataStream
*/
@Deprecated
<T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> clazz);

Explicit schema definitions for controlling DataStream to Table conversions.
class Schema {
/**
* Creates a new schema builder
* @return Builder for constructing schemas
*/
static Builder newBuilder();
interface Builder {
Builder column(String columnName, AbstractDataType<?> dataType);
Builder columnByExpression(String columnName, String expression);
Builder columnByMetadata(String columnName, AbstractDataType<?> dataType);
Builder columnByMetadata(String columnName, AbstractDataType<?> dataType, String metadataKey);
Builder watermark(String columnName, Expression watermarkExpression);
Builder primaryKey(String... columnNames);
Schema build();
}
}

Usage Examples:
// Complex schema with computed columns and watermarks
Schema complexSchema = Schema.newBuilder()
.column("user_id", DataTypes.BIGINT())
.column("event_time", DataTypes.TIMESTAMP(3))
.column("event_data", DataTypes.STRING())
.columnByExpression("hour_of_day", "EXTRACT(HOUR FROM event_time)")
.columnByMetadata("kafka_offset", DataTypes.BIGINT(), "offset")
.watermark("event_time", $("event_time").minus(lit(30).seconds()))
.primaryKey("user_id", "event_time")
.build();
// Apply schema during conversion
Table enrichedEvents = tableEnv.fromDataStream(eventStream, complexSchema);

interface StreamTableEnvironment extends TableEnvironment {
// Inherits all TableEnvironment methods
// Plus DataStream integration methods listed above
}

class TemporalTableFunction extends TableFunction<Row> {
// Used for temporal joins in streaming scenarios
// Automatically created by createTemporalTableFunction()
}

class Row {
Object getField(int pos);
Object getField(String name);
int getArity();
RowKind getKind();
void setKind(RowKind kind);
}
enum RowKind {
INSERT, // +I: Insert operation
UPDATE_BEFORE, // -U: Update before (old value)
UPDATE_AFTER, // +U: Update after (new value)
DELETE // -D: Delete operation
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table