Java bridge for seamless integration between Flink's Table/SQL API and DataStream API, enabling conversion between streams and tables with unified batch and stream processing.
—
Joint optimization and execution of multiple table operations with shared planning. StreamStatementSet enables efficient execution of multiple INSERT statements and table pipelines — in both batch and streaming mode — by optimizing them together as a single job.
Create StreamStatementSet instances to execute multiple operations together.
/**
 * A {@link StatementSet} that integrates with the Java-specific DataStream API.
 *
 * <p>It accepts pipelines defined by DML statements or {@link Table} objects. The planner can
 * optimize all added statements together and submit them as one job, or attach them to the
 * underlying StreamExecutionEnvironment.
 */
public interface StreamStatementSet extends StatementSet {

    /**
     * Adds a table pipeline to the statement set.
     *
     * @param tablePipeline the table pipeline to add
     * @return this StreamStatementSet for method chaining
     */
    StreamStatementSet add(TablePipeline tablePipeline);

    /**
     * Adds an INSERT SQL statement to the statement set.
     *
     * @param statement the INSERT SQL statement string
     * @return this StreamStatementSet for method chaining
     */
    StreamStatementSet addInsertSql(String statement);

    /**
     * Adds an insert operation that writes the given table to an existing target table.
     *
     * @param targetPath the path of the target table
     * @param table the source table to insert
     * @return this StreamStatementSet for method chaining
     */
    StreamStatementSet addInsert(String targetPath, Table table);

    /**
     * Adds an insert operation with an overwrite option.
     *
     * @param targetPath the path of the target table
     * @param table the source table to insert
     * @param overwrite whether to overwrite existing data
     * @return this StreamStatementSet for method chaining
     */
    StreamStatementSet addInsert(String targetPath, Table table, boolean overwrite);

    /**
     * Adds an insert operation that writes to the table described by the given descriptor.
     *
     * @param targetDescriptor the descriptor of the target table
     * @param table the source table to insert
     * @return this StreamStatementSet for method chaining
     */
    StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table);

    /**
     * Adds an insert operation using a table descriptor with an overwrite option.
     *
     * @param targetDescriptor the descriptor of the target table
     * @param table the source table to insert
     * @param overwrite whether to overwrite existing data
     * @return this StreamStatementSet for method chaining
     */
    StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table, boolean overwrite);

    // Inherited from StatementSet:
    TableResult execute();
    CompiledPlan compilePlan(); // @Experimental
    String explain(ExplainDetail... extraDetails);
    String explain(ExplainFormat format, ExplainDetail... extraDetails);
}

Execute statement sets with different execution strategies.
// NOTE(review): the following declarations are also members of the StreamStatementSet
// interface; they are shown separately here to highlight the two execution strategies.

/**
 * Optimizes all statements as one entity and adds them as transformations to the underlying
 * StreamExecutionEnvironment. Call StreamExecutionEnvironment.execute() afterwards to actually
 * run them. The added statements are cleared after calling this method.
 */
void attachAsDataStream();

/**
 * Prints the execution plan explanation with optional details.
 *
 * @param extraDetails additional details to include in the explanation
 * @return this StreamStatementSet for method chaining
 */
StreamStatementSet printExplain(ExplainDetail... extraDetails);

Usage Examples:
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.ExplainDetail;

// Create statement set
StreamStatementSet statementSet = tableEnv.createStatementSet();

// Add multiple operations; each add* call returns the set, enabling chaining
statementSet
    .addInsertSql("INSERT INTO sink1 SELECT * FROM source WHERE status = 'ACTIVE'")
    .addInsertSql("INSERT INTO sink2 SELECT * FROM source WHERE status = 'INACTIVE'")
    .addInsert("sink3", tableEnv.sqlQuery("SELECT COUNT(*) FROM source"));

// Execute all statements together as one optimized job
statementSet.execute();

// Alternative: attach the statements to the DataStream environment instead of executing directly
StreamStatementSet asyncSet = tableEnv.createStatementSet();
asyncSet
    .addInsertSql("INSERT INTO async_sink SELECT * FROM streaming_source")
    .attachAsDataStream();

// Execute the underlying StreamExecutionEnvironment
env.execute("Batch Statement Set Job");

Complex usage patterns for optimized batch execution.
// Multi-sink pattern with shared computation
StreamStatementSet multiSinkSet = tableEnv.createStatementSet();

// Shared intermediate computation
Table processedData = tableEnv.sqlQuery(
    "SELECT " +
    "  user_id, " +
    "  product_id, " +
    "  quantity, " +
    "  price, " +
    "  quantity * price as total_amount, " +
    "  EXTRACT(HOUR FROM order_time) as order_hour " +
    "FROM raw_orders " +
    "WHERE quantity > 0 AND price > 0"
);

// Register the intermediate Table under an explicit name. Concatenating the Table object
// into a SQL string relies on implicit toString() registration, which is deprecated.
tableEnv.createTemporaryView("processed_orders", processedData);

// Multiple sinks using the shared computation
multiSinkSet
    .addInsert("hourly_stats", tableEnv.sqlQuery(
        "SELECT order_hour, COUNT(*), SUM(total_amount) " +
        "FROM processed_orders " +
        "GROUP BY order_hour"
    ))
    .addInsert("product_stats", tableEnv.sqlQuery(
        "SELECT product_id, COUNT(*), AVG(total_amount) " +
        "FROM processed_orders " +
        "GROUP BY product_id"
    ))
    .addInsert("user_stats", tableEnv.sqlQuery(
        "SELECT user_id, COUNT(*), SUM(total_amount) " +
        "FROM processed_orders " +
        "GROUP BY user_id"
    ));

// Execute with shared optimization
multiSinkSet.execute();

Analyze and debug statement set execution plans.
import org.apache.flink.table.api.ExplainDetail;

// Create statement set with operations
StreamStatementSet debugSet = tableEnv.createStatementSet();
debugSet
    .addInsertSql("INSERT INTO debug_sink1 SELECT * FROM source WHERE type = 'A'")
    .addInsertSql("INSERT INTO debug_sink2 SELECT * FROM source WHERE type = 'B'");

// Print the detailed execution plan
debugSet.printExplain(
    ExplainDetail.CHANGELOG_MODE,
    ExplainDetail.COST,
    ExplainDetail.ESTIMATED_COST
);

// This will output the optimized execution plan showing:
// - Shared operators
// - Data flow between operations
// - Cost estimates
// - Changelog mode information

Understanding when statement sets provide performance improvements.
// Without a statement set - each statement is optimized and submitted separately
tableEnv.executeSql("INSERT INTO sink1 SELECT * FROM source WHERE region = 'US'");
tableEnv.executeSql("INSERT INTO sink2 SELECT * FROM source WHERE region = 'EU'");
tableEnv.executeSql("INSERT INTO sink3 SELECT * FROM source WHERE region = 'ASIA'");
// Result: 3 separate jobs, source read 3 times

// With a statement set - all statements share one optimization pass
StreamStatementSet optimizedSet = tableEnv.createStatementSet();
optimizedSet
    .addInsertSql("INSERT INTO sink1 SELECT * FROM source WHERE region = 'US'")
    .addInsertSql("INSERT INTO sink2 SELECT * FROM source WHERE region = 'EU'")
    .addInsertSql("INSERT INTO sink3 SELECT * FROM source WHERE region = 'ASIA'")
    .execute();
// Result: 1 job, source read once, shared filtering and routing

Configure statement sets for optimal resource usage.
// Large-scale batch processing with statement sets
StreamStatementSet batchProcessingSet = tableEnv.createStatementSet();

// Configure the environment
env.setParallelism(16);
// NOTE(review): latency tracking adds overhead and is a monitoring feature, not a batch
// optimization — confirm this setting is actually wanted for this workload
env.getConfig().setLatencyTrackingInterval(1000);

// Add multiple heavy computations (region values are fixed constants, so the string
// concatenation below is safe from injection)
String[] regions = {"US", "EU", "ASIA", "AFRICA", "OCEANIA"};
for (String region : regions) {
    batchProcessingSet.addInsertSql(
        "INSERT INTO region_analytics_" + region.toLowerCase() + " " +
        "SELECT " +
        "  date_trunc('day', order_time) as order_date, " +
        "  product_category, " +
        "  COUNT(*) as order_count, " +
        "  SUM(total_amount) as total_revenue, " +
        "  AVG(total_amount) as avg_order_value " +
        "FROM orders " +
        "WHERE region = '" + region + "' " +
        "GROUP BY date_trunc('day', order_time), product_category"
    );
}

// Execute with shared resource planning
batchProcessingSet.execute();

Build statement sets dynamically based on runtime conditions.
/**
 * Builds a statement set with one INSERT per target table, filtered by the condition
 * registered for that table.
 *
 * <p>WARNING: the filter conditions are spliced directly into SQL text; callers must ensure
 * they come from trusted configuration, not from end-user input.
 *
 * @param tableEnv the table environment used to create the statement set
 * @param targetTables the tables to insert into
 * @param filterConditions a WHERE-clause fragment per target table
 * @return the populated statement set (not yet executed)
 * @throws IllegalArgumentException if a target table has no filter condition
 */
public StreamStatementSet buildDynamicStatementSet(
        StreamTableEnvironment tableEnv,
        List<String> targetTables,
        Map<String, String> filterConditions) {
    StreamStatementSet dynamicSet = tableEnv.createStatementSet();
    for (String targetTable : targetTables) {
        String condition = filterConditions.get(targetTable);
        // Fail fast instead of silently generating invalid "WHERE null" SQL
        if (condition == null || condition.isBlank()) {
            throw new IllegalArgumentException(
                "No filter condition configured for table: " + targetTable);
        }
        String insertSql = String.format(
            "INSERT INTO %s SELECT * FROM source_table WHERE %s",
            targetTable, condition
        );
        dynamicSet.addInsertSql(insertSql);
    }
    return dynamicSet;
}

// Usage
List<String> tables = Arrays.asList("active_users", "inactive_users", "premium_users");
Map<String, String> conditions = Map.of(
    "active_users", "last_login > CURRENT_TIMESTAMP - INTERVAL '30' DAY",
    "inactive_users", "last_login <= CURRENT_TIMESTAMP - INTERVAL '30' DAY",
    "premium_users", "subscription_type = 'PREMIUM'"
);
StreamStatementSet dynamicSet = buildDynamicStatementSet(tableEnv, tables, conditions);
dynamicSet.execute();

Handle errors in batch statement execution.
try {
    StreamStatementSet statementSet = tableEnv.createStatementSet();
    statementSet
        .addInsertSql("INSERT INTO sink1 SELECT * FROM source1")
        .addInsertSql("INSERT INTO sink2 SELECT * FROM source2")
        .addInsertSql("INSERT INTO sink3 SELECT * FROM source3");

    // Execute and handle potential failures
    TableResult result = statementSet.execute();

    // Block until the job completes (throws on failure)
    result.await();
    System.out.println("Statement set executed successfully");
} catch (Exception e) {
    // Keep the full exception for diagnostics rather than only its message
    System.err.println("Statement set execution failed: " + e);
    // Handle partial completion, rollback, or retry logic
}

Monitor the progress of long-running statement sets.
StreamStatementSet monitoredSet = tableEnv.createStatementSet();

// Add operations
monitoredSet
    .addInsertSql("INSERT INTO large_sink1 SELECT * FROM large_source")
    .addInsertSql("INSERT INTO large_sink2 SELECT * FROM large_source");

// Execute asynchronously. NOTE(review): supplyAsync without an explicit executor runs on the
// ForkJoinPool common pool; pass a dedicated executor if execute() may block for long.
CompletableFuture<TableResult> execution = CompletableFuture.supplyAsync(() -> {
    try {
        return monitoredSet.execute();
    } catch (Exception e) {
        // Preserve the cause when rethrowing as unchecked
        throw new RuntimeException(e);
    }
});

// Monitor completion
execution.thenAccept(result -> {
    System.out.println("Statement set completed successfully");
}).exceptionally(throwable -> {
    System.err.println("Statement set failed: " + throwable.getMessage());
    return null;
});

Combine statement sets with DataStream operations.
// DataStream processing
DataStream<Row> preprocessedStream = env
    .fromSource(kafkaSource, watermarkStrategy, "kafka-source")
    .map(new PreprocessingFunction())
    .filter(new QualityFilter());

// Convert to a table for SQL processing
tableEnv.createTemporaryView("preprocessed_data", preprocessedStream);

// Use a statement set for multiple SQL operations
StreamStatementSet hybridSet = tableEnv.createStatementSet();
hybridSet
    .addInsertSql("INSERT INTO sql_sink1 SELECT * FROM preprocessed_data WHERE category = 'A'")
    .addInsertSql("INSERT INTO sql_sink2 SELECT * FROM preprocessed_data WHERE category = 'B'")
    .attachAsDataStream(); // Attach to the existing StreamExecutionEnvironment

// Add additional DataStream operations
DataStream<String> postProcessed = tableEnv
    .toDataStream(tableEnv.sqlQuery("SELECT * FROM preprocessed_data"))
    .map(new PostProcessingFunction());
postProcessed.addSink(customSink);

// Execute the entire pipeline (SQL statements and DataStream operations) as one job
env.execute("Hybrid Stream-Table Processing");

import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TablePipeline;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.ExplainDetail;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-bridge