Table/SQL API bridge for Scala, enabling interaction between Table API and DataStream/DataSet APIs in Apache Flink
—
The StreamStatementSet provides functionality for batching multiple table operations together for optimized execution and resource management. It extends the base StatementSet with streaming-specific capabilities.
Create a statement set from a StreamTableEnvironment for batching operations.
// Created via StreamTableEnvironment
val statementSet = tableEnv.createStatementSet()

Add table processing pipelines to the statement set for batch execution.
/**
* Add table pipeline to statement set
* @param tablePipeline The table pipeline to add
* @return This StreamStatementSet for method chaining
*/
def add(tablePipeline: TablePipeline): StreamStatementSet

Usage Example:
import org.apache.flink.table.api._
val statementSet = tableEnv.createStatementSet()
// Create table pipelines
val pipeline1 = tableEnv.from("source_table_1").insertInto("sink_table_1")
val pipeline2 = tableEnv.from("source_table_2").insertInto("sink_table_2")
statementSet
.add(pipeline1)
.add(pipeline2)
.attachAsDataStream()

Add SQL insert statements to the statement set.
/**
* Add insert SQL statement to statement set
* @param statement The SQL insert statement
* @return This StreamStatementSet for method chaining
*/
def addInsertSql(statement: String): StreamStatementSet

Usage Example:
val statementSet = tableEnv.createStatementSet()
statementSet
.addInsertSql("INSERT INTO sink_table_1 SELECT * FROM source_table_1 WHERE age > 25")
.addInsertSql("INSERT INTO sink_table_2 SELECT name, COUNT(*) FROM source_table_2 GROUP BY name")
.attachAsDataStream()

Add table insert operations with various configuration options.
/**
* Add insert operation with target path
* @param targetPath The target table path
* @param table The table to insert
* @return This StreamStatementSet for method chaining
*/
def addInsert(targetPath: String, table: Table): StreamStatementSet
/**
* Add insert operation with target path and overwrite option
* @param targetPath The target table path
* @param table The table to insert
* @param overwrite Whether to overwrite existing data
* @return This StreamStatementSet for method chaining
*/
def addInsert(targetPath: String, table: Table, overwrite: Boolean): StreamStatementSet
/**
* Add insert operation with table descriptor
* @param targetDescriptor The target table descriptor
* @param table The table to insert
* @return This StreamStatementSet for method chaining
*/
def addInsert(targetDescriptor: TableDescriptor, table: Table): StreamStatementSet
/**
* Add insert operation with table descriptor and overwrite option
* @param targetDescriptor The target table descriptor
* @param table The table to insert
* @param overwrite Whether to overwrite existing data
* @return This StreamStatementSet for method chaining
*/
def addInsert(targetDescriptor: TableDescriptor, table: Table, overwrite: Boolean): StreamStatementSet

Usage Examples:
val statementSet = tableEnv.createStatementSet()
val processedTable1 = tableEnv.from("source_table_1").filter($"age" > 25)
val processedTable2 = tableEnv.from("source_table_2").groupBy($"name").select($"name", $"age".count())
// Insert with target path
statementSet
.addInsert("sink_table_1", processedTable1)
.addInsert("sink_table_2", processedTable2, true) // with overwrite
// Insert with table descriptor
val sinkDescriptor = TableDescriptor.forConnector("filesystem")
.schema(Schema.newBuilder()
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.build())
.option("path", "/path/to/sink")
.format("csv")
.build()
statementSet
.addInsert(sinkDescriptor, processedTable1)
.attachAsDataStream()

Methods for explaining execution plans and attaching to the streaming environment.
/**
* Print execution plan with optional extra details
* @param extraDetails Additional details to include in explanation
* @return This StreamStatementSet for method chaining
*/
def printExplain(extraDetails: ExplainDetail*): StreamStatementSet
/**
* Attach statements to underlying StreamExecutionEnvironment
* This triggers the actual execution setup
*/
def attachAsDataStream(): Unit

Usage Examples:
val statementSet = tableEnv.createStatementSet()
statementSet
.addInsertSql("INSERT INTO sink SELECT * FROM source WHERE age > 25")
.printExplain(ExplainDetail.CHANGELOG_MODE, ExplainDetail.COST)
.attachAsDataStream()
// The execution plan will be printed before attachment

Here's a comprehensive example showing how to use StreamStatementSet for complex multi-table operations:
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
// Register source tables
tableEnv.executeSql("""
CREATE TABLE users (
id INT,
name STRING,
age INT,
city STRING
) WITH (
'connector' = 'filesystem',
'path' = '/path/to/users.csv',
'format' = 'csv'
)
""")
tableEnv.executeSql("""
CREATE TABLE orders (
order_id INT,
user_id INT,
amount DECIMAL(10,2),
order_time TIMESTAMP(3)
) WITH (
'connector' = 'filesystem',
'path' = '/path/to/orders.csv',
'format' = 'csv'
)
""")
// Create statement set for batch operations
val statementSet = tableEnv.createStatementSet()
// Add multiple insert operations
val youngUsers = tableEnv.sqlQuery("SELECT * FROM users WHERE age < 30")
val userStats = tableEnv.sqlQuery("""
SELECT
u.city,
COUNT(*) as user_count,
AVG(u.age) as avg_age
FROM users u
GROUP BY u.city
""")
val orderSummary = tableEnv.sqlQuery("""
SELECT
u.city,
COUNT(o.order_id) as order_count,
SUM(o.amount) as total_amount
FROM users u
JOIN orders o ON u.id = o.user_id
GROUP BY u.city
""")
statementSet
.addInsert("young_users_sink", youngUsers)
.addInsert("user_stats_sink", userStats)
.addInsert("order_summary_sink", orderSummary)
.addInsertSql("INSERT INTO audit_log SELECT 'batch_job' as job_type, CURRENT_TIMESTAMP as run_time")
.printExplain() // Print execution plan
.attachAsDataStream() // Execute all operations together
env.execute("Multi-table ETL Job")

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-api-scala-bridge-2-12