Bridge component for Apache Flink's Table/SQL API that enables Java developers to write table programs that seamlessly interact with Flink's streaming and batch processing APIs.
---
The StreamTableEnvironment is the main entry point for creating and managing table environments that integrate with Flink's DataStream API. It provides factory methods for creating table environments and comprehensive methods for converting between DataStream and Table representations.
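As a quick orientation before the full API listing, the following is a minimal end-to-end sketch of the DataStream ↔ Table round trip. The view name `users`, the column names, and the query are illustrative assumptions, not part of the API; running it requires a Flink runtime on the classpath.

```java
// Minimal DataStream -> Table -> SQL -> DataStream round trip (illustrative sketch;
// the view name "users", column names, and query are assumptions, not part of the API)
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class BridgeQuickstart {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Name the Row fields explicitly so the table schema has usable column names
        DataStream<Row> input = env
            .fromElements(Row.of("Alice", 25), Row.of("Bob", 30))
            .returns(Types.ROW_NAMED(new String[] {"name", "age"}, Types.STRING, Types.INT));

        tableEnv.createTemporaryView("users", input);
        Table filtered = tableEnv.sqlQuery("SELECT name FROM users WHERE age > 26");

        // Convert back to a DataStream and attach a sink
        DataStream<Row> output = tableEnv.toDataStream(filtered);
        output.print();

        env.execute("bridge-quickstart");
    }
}
```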
```java
@PublicEvolving
public interface StreamTableEnvironment extends TableEnvironment {

    // Factory methods
    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment);
    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings);

    // DataStream to Table conversions
    <T> Table fromDataStream(DataStream<T> dataStream);
    <T> Table fromDataStream(DataStream<T> dataStream, Schema schema);
    Table fromChangelogStream(DataStream<Row> dataStream);
    Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);
    Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);

    // Table to DataStream conversions
    DataStream<Row> toDataStream(Table table);
    <T> DataStream<T> toDataStream(Table table, Class<T> targetClass);
    <T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);
    DataStream<Row> toChangelogStream(Table table);
    DataStream<Row> toChangelogStream(Table table, Schema targetSchema);
    DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);

    // Temporary view creation
    <T> void createTemporaryView(String path, DataStream<T> dataStream);
    <T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);

    // Statement set creation
    StreamStatementSet createStatementSet();
}
```

Create a StreamTableEnvironment from an existing StreamExecutionEnvironment:
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
```

Create a StreamTableEnvironment with specific configuration settings:
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .inStreamingMode()
    .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
```

Convert a DataStream to a Table using automatic schema inference:
```java
DataStream<MyPojo> dataStream = env.fromElements(
    new MyPojo("Alice", 25),
    new MyPojo("Bob", 30)
);
Table table = tableEnv.fromDataStream(dataStream);
```

Convert a DataStream to a Table with a custom schema definition:
```java
DataStream<Row> dataStream = env.fromElements(
    Row.of("Alice", 25),
    Row.of("Bob", 30)
);
Schema schema = Schema.newBuilder()
    .column("name", DataTypes.STRING())
    .column("age", DataTypes.INT())
    .build();
Table table = tableEnv.fromDataStream(dataStream, schema);
```

Convert a changelog DataStream to a Table:
```java
DataStream<Row> changelogStream = ...; // source of changelog data
Table table = tableEnv.fromChangelogStream(changelogStream);

// With a custom schema
Schema schema = Schema.newBuilder()
    .column("id", DataTypes.BIGINT())
    .column("name", DataTypes.STRING())
    .column("value", DataTypes.DOUBLE())
    .build();
Table tableWithSchema = tableEnv.fromChangelogStream(changelogStream, schema);

// With an explicit changelog mode
ChangelogMode changelogMode = ChangelogMode.insertOnly();
Table tableWithMode = tableEnv.fromChangelogStream(changelogStream, schema, changelogMode);
```

Convert a Table to a DataStream<Row>:
```java
Table table = tableEnv.sqlQuery("SELECT name, age FROM users WHERE age > 21");
DataStream<Row> resultStream = tableEnv.toDataStream(table);
```

Convert a Table to a DataStream of a specific class:
```java
Table table = tableEnv.sqlQuery("SELECT name, age FROM users");
DataStream<MyPojo> pojoStream = tableEnv.toDataStream(table, MyPojo.class);
```

Convert a Table to a DataStream using a specific data type:
```java
Table table = tableEnv.sqlQuery("SELECT name, age FROM users");
AbstractDataType<?> dataType = DataTypes.ROW(
    DataTypes.FIELD("name", DataTypes.STRING()),
    DataTypes.FIELD("age", DataTypes.INT())
);
DataStream<Row> typedStream = tableEnv.toDataStream(table, dataType);
```

Convert a Table to a changelog DataStream:
```java
Table table = tableEnv.sqlQuery(
    "SELECT id, name, COUNT(*) AS cnt FROM events GROUP BY id, name");
DataStream<Row> changelogStream = tableEnv.toChangelogStream(table);

// With a target schema
Schema targetSchema = Schema.newBuilder()
    .column("id", DataTypes.BIGINT())
    .column("name", DataTypes.STRING())
    .column("cnt", DataTypes.BIGINT())
    .build();
DataStream<Row> changelogWithSchema = tableEnv.toChangelogStream(table, targetSchema);

// With an explicit changelog mode
ChangelogMode mode = ChangelogMode.upsert();
DataStream<Row> upsertChangelog = tableEnv.toChangelogStream(table, targetSchema, mode);
```

Create a temporary view from a DataStream:
```java
DataStream<MyPojo> dataStream = env.fromElements(
    new MyPojo("Alice", 25),
    new MyPojo("Bob", 30)
);
tableEnv.createTemporaryView("users", dataStream);

// The view can now be queried with SQL
Table result = tableEnv.sqlQuery("SELECT * FROM users WHERE age > 25");
```

Create a temporary view with a custom schema:
```java
DataStream<Row> dataStream = env.fromElements(
    Row.of("Alice", 25),
    Row.of("Bob", 30)
);
Schema schema = Schema.newBuilder()
    .column("name", DataTypes.STRING())
    .column("age", DataTypes.INT())
    .build();
tableEnv.createTemporaryView("users_with_schema", dataStream, schema);
Table result = tableEnv.sqlQuery("SELECT name FROM users_with_schema WHERE age > 25");
```

Create a StreamStatementSet to group multiple INSERT statements into a single pipeline:
```java
StreamStatementSet statementSet = tableEnv.createStatementSet();

// Add multiple insert operations
statementSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source WHERE condition1");
statementSet.addInsertSql("INSERT INTO sink2 SELECT * FROM source WHERE condition2");

// Attach all statements to the StreamExecutionEnvironment pipeline;
// they run together when env.execute() is called
statementSet.attachAsDataStream();
```

The following methods are deprecated and should be avoided in new code:
- `registerFunction()`: use `createFunction()` instead
- `fromDataStream(DataStream, String)`: use `fromDataStream(DataStream, Schema)` instead
- `registerDataStream()`: use `createTemporaryView()` instead
- `toAppendStream()`: use `toDataStream()` instead
- `toRetractStream()`: use `toChangelogStream()` instead
- `execute()`: use `StreamExecutionEnvironment.execute()` instead

Common exceptions that may be thrown:
- `ValidationException`: thrown when schema validation fails during conversion
- `TableException`: thrown when a table operation fails
- `IllegalArgumentException`: thrown when invalid parameters are provided

Always ensure proper error handling when working with conversions:
```java
try {
    Table table = tableEnv.fromDataStream(dataStream, schema);
    DataStream<Row> result = tableEnv.toDataStream(table);
} catch (ValidationException e) {
    // Handle schema validation errors
    log.error("Schema validation failed: {}", e.getMessage(), e);
} catch (TableException e) {
    // Handle table operation errors
    log.error("Table operation failed: {}", e.getMessage(), e);
}
```

Install with Tessl CLI
```shell
npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-bridge-2-11
```