CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-table-api-java-bridge

Java bridge for seamless integration between Flink's Table/SQL API and DataStream API, enabling conversion between streams and tables with unified batch and stream processing.

Pending
Overview
Eval results
Files

docs/statement-sets.md

Statement Sets

Statement sets optimize the execution of multiple table operations through shared planning and execution. StreamStatementSet enables efficient execution of multiple INSERT statements and table pipelines by optimizing them together as a single job.

Capabilities

Statement Set Creation

Create StreamStatementSet instances for batch execution of multiple operations.

/**
 * A StatementSet that integrates with the Java DataStream API.
 *
 * <p>Accepts pipelines defined by DML statements or Table objects. The planner can
 * optimize all added statements together.
 *
 * <p>Each {@code add...} method returns this StreamStatementSet, so calls can be chained.
 */
public interface StreamStatementSet extends StatementSet {
    
    /**
     * Adds a table pipeline to the statement set.
     *
     * @param tablePipeline table pipeline to add
     * @return this StreamStatementSet, for method chaining
     */
    StreamStatementSet add(TablePipeline tablePipeline);
    
    /**
     * Adds an INSERT SQL statement to the statement set.
     *
     * @param statement INSERT SQL statement string
     * @return this StreamStatementSet, for method chaining
     */
    StreamStatementSet addInsertSql(String statement);
    
    /**
     * Adds an insert of the given table into an existing target table.
     *
     * @param targetPath path to the target table
     * @param table source table to insert
     * @return this StreamStatementSet, for method chaining
     */
    StreamStatementSet addInsert(String targetPath, Table table);
    
    /**
     * Adds an insert of the given table into an existing target table, with an
     * overwrite option.
     *
     * @param targetPath path to the target table
     * @param table source table to insert
     * @param overwrite whether to overwrite existing data
     * @return this StreamStatementSet, for method chaining
     */
    StreamStatementSet addInsert(String targetPath, Table table, boolean overwrite);
    
    /**
     * Adds an insert of the given table into a target described by a table descriptor.
     *
     * @param targetDescriptor target table descriptor
     * @param table source table to insert
     * @return this StreamStatementSet, for method chaining
     */
    StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table);
    
    /**
     * Adds an insert of the given table into a target described by a table descriptor,
     * with an overwrite option.
     *
     * @param targetDescriptor target table descriptor
     * @param table source table to insert
     * @param overwrite whether to overwrite existing data
     * @return this StreamStatementSet, for method chaining
     */
    StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table, boolean overwrite);
    
    // Inherited from StatementSet (repeated here for reference):
    TableResult execute();
    CompiledPlan compilePlan(); // marked @Experimental
    String explain(ExplainDetail... extraDetails);
    String explain(ExplainFormat format, ExplainDetail... extraDetails);
}

Statement Set Execution

Execute statement sets with different execution strategies.

/**
 * Optimizes all statements as one entity and adds them as transformations
 * to the underlying StreamExecutionEnvironment.
 *
 * <p>Use StreamExecutionEnvironment.execute() afterwards to execute them.
 * The added statements are cleared after calling this method.
 */
void attachAsDataStream();

/**
 * Prints an explanation of the execution plan, optionally with extra details.
 *
 * @param extraDetails additional details to include in the explanation
 * @return this StreamStatementSet, for method chaining
 */
StreamStatementSet printExplain(ExplainDetail... extraDetails);

Usage Examples:

import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.ExplainDetail;

// One statement set collects every INSERT that should be planned together
StreamStatementSet statementSet = tableEnv.createStatementSet();

// Each add call returns the same set, so the calls may be chained or issued one by one
statementSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source WHERE status = 'ACTIVE'");
statementSet.addInsertSql("INSERT INTO sink2 SELECT * FROM source WHERE status = 'INACTIVE'");
statementSet.addInsert("sink3", tableEnv.sqlQuery("SELECT COUNT(*) FROM source"));

// Run all collected statements together
statementSet.execute();

// Alternative: hand the statements over to the DataStream environment instead
StreamStatementSet asyncSet = tableEnv.createStatementSet();
asyncSet.addInsertSql("INSERT INTO async_sink SELECT * FROM streaming_source");
asyncSet.attachAsDataStream();

// The attached statements run when the StreamExecutionEnvironment is executed
env.execute("Batch Statement Set Job");

Advanced Statement Set Patterns

Complex usage patterns for optimized batch execution.

// Multi-sink pattern: several aggregations over one shared intermediate table
StreamStatementSet multiSinkSet = tableEnv.createStatementSet();

// Intermediate result that all three sinks below read from
Table processedData = tableEnv.sqlQuery(
    "SELECT " +
    "  user_id, " +
    "  product_id, " +
    "  quantity, " +
    "  price, " +
    "  quantity * price as total_amount, " +
    "  EXTRACT(HOUR FROM order_time) as order_hour " +
    "FROM raw_orders " +
    "WHERE quantity > 0 AND price > 0"
);

// Sink 1: totals per hour of day
Table hourlyStats = tableEnv.sqlQuery(
    "SELECT order_hour, COUNT(*), SUM(total_amount) " +
    "FROM " + processedData + " " +
    "GROUP BY order_hour"
);
multiSinkSet.addInsert("hourly_stats", hourlyStats);

// Sink 2: averages per product
Table productStats = tableEnv.sqlQuery(
    "SELECT product_id, COUNT(*), AVG(total_amount) " +
    "FROM " + processedData + " " +
    "GROUP BY product_id"
);
multiSinkSet.addInsert("product_stats", productStats);

// Sink 3: totals per user
Table userStats = tableEnv.sqlQuery(
    "SELECT user_id, COUNT(*), SUM(total_amount) " +
    "FROM " + processedData + " " +
    "GROUP BY user_id"
);
multiSinkSet.addInsert("user_stats", userStats);

// All three inserts are optimized and executed together
multiSinkSet.execute();

Execution Plan Analysis

Analyze and debug statement set execution plans.

import org.apache.flink.table.api.ExplainDetail;

// Create statement set with operations
StreamStatementSet debugSet = tableEnv.createStatementSet();
debugSet
    .addInsertSql("INSERT INTO debug_sink1 SELECT * FROM source WHERE type = 'A'")
    .addInsertSql("INSERT INTO debug_sink2 SELECT * FROM source WHERE type = 'B'");

// Print detailed execution plan.
// Only constants that exist on ExplainDetail are used here: ESTIMATED_COST is the
// cost-related detail (there is no ExplainDetail.COST constant).
debugSet.printExplain(
    ExplainDetail.CHANGELOG_MODE,
    ExplainDetail.ESTIMATED_COST
);

// This will output the optimized execution plan showing:
// - Shared operators
// - Data flow between operations
// - Cost estimates
// - Changelog mode information

Performance Optimization

Batch Execution Benefits

Understanding when statement sets provide performance improvements.

// The same three region-specific INSERT statements are used for both variants
String[] regionInserts = {
    "INSERT INTO sink1 SELECT * FROM source WHERE region = 'US'",
    "INSERT INTO sink2 SELECT * FROM source WHERE region = 'EU'",
    "INSERT INTO sink3 SELECT * FROM source WHERE region = 'ASIA'"
};

// Without statement set - every statement is optimized and submitted on its own
for (String insertSql : regionInserts) {
    tableEnv.executeSql(insertSql);
}
// Result: 3 separate jobs, source read 3 times

// With statement set - the statements are collected first and optimized together
StreamStatementSet optimizedSet = tableEnv.createStatementSet();
for (String insertSql : regionInserts) {
    optimizedSet.addInsertSql(insertSql);
}
optimizedSet.execute();
// Result: 1 job, source read once, shared filtering and routing

Resource Optimization

Configure statement sets for optimal resource usage.

// Large-scale processing: one INSERT per region, all planned together in one statement set
StreamStatementSet batchProcessingSet = tableEnv.createStatementSet();

// Job-wide settings: default parallelism, plus latency tracking with an interval of 1000
// (latency tracking is a monitoring setting, not an optimization)
env.setParallelism(16);
env.getConfig().setLatencyTrackingInterval(1000);

// Add one heavy aggregation per region
String[] regions = {"US", "EU", "ASIA", "AFRICA", "OCEANIA"};
for (String region : regions) {
    // Locale.ROOT keeps the sink name independent of the JVM default locale
    // (a Turkish default locale would otherwise lower-case "ASIA" to "asıa").
    String sinkTable = "region_analytics_" + region.toLowerCase(java.util.Locale.ROOT);

    // FLOOR(... TO DAY) is the Flink SQL way to truncate a timestamp to the day
    batchProcessingSet.addInsertSql(
        "INSERT INTO " + sinkTable + " " +
        "SELECT " +
        "  FLOOR(order_time TO DAY) as order_date, " +
        "  product_category, " +
        "  COUNT(*) as order_count, " +
        "  SUM(total_amount) as total_revenue, " +
        "  AVG(total_amount) as avg_order_value " +
        "FROM orders " +
        "WHERE region = '" + region + "' " +
        "GROUP BY FLOOR(order_time TO DAY), product_category"
    );
}

// Execute with shared resource planning
batchProcessingSet.execute();

Dynamic Statement Building

Build statement sets dynamically based on runtime conditions.

/**
 * Builds a statement set with one INSERT per target table, each selecting from
 * {@code source_table} with the filter condition registered for that table.
 *
 * <p>Table names and conditions are spliced into the SQL text verbatim. They must come
 * from trusted configuration, never from end-user input.
 *
 * @param tableEnv environment used to create the statement set
 * @param targetTables sink tables; one INSERT is added per entry, in list order
 * @param filterConditions WHERE-clause predicate for each target table, keyed by table name
 * @return the populated statement set; nothing has been executed yet
 * @throws IllegalArgumentException if a target table has no filter condition, or a blank one
 */
public StreamStatementSet buildDynamicStatementSet(
        StreamTableEnvironment tableEnv,
        List<String> targetTables,
        Map<String, String> filterConditions) {

    StreamStatementSet dynamicSet = tableEnv.createStatementSet();

    for (String targetTable : targetTables) {
        String condition = filterConditions.get(targetTable);
        // A missing entry would otherwise be formatted as the literal text "WHERE null"
        if (condition == null || condition.trim().isEmpty()) {
            throw new IllegalArgumentException(
                "No filter condition configured for target table '" + targetTable + "'");
        }
        String insertSql = String.format(
            "INSERT INTO %s SELECT * FROM source_table WHERE %s",
            targetTable, condition
        );
        dynamicSet.addInsertSql(insertSql);
    }

    return dynamicSet;
}

// Usage: one filter predicate per user-segment table
Map<String, String> conditions = Map.of(
    "active_users", "last_login > CURRENT_TIMESTAMP - INTERVAL '30' DAY",
    "inactive_users", "last_login <= CURRENT_TIMESTAMP - INTERVAL '30' DAY",
    "premium_users", "subscription_type = 'PREMIUM'"
);
List<String> tables = Arrays.asList("active_users", "inactive_users", "premium_users");

// Build the set from the configuration above, then run it
StreamStatementSet dynamicSet = buildDynamicStatementSet(tableEnv, tables, conditions);
dynamicSet.execute();

Error Handling and Monitoring

Statement Set Error Handling

Handle errors in batch statement execution.

try {
    StreamStatementSet statementSet = tableEnv.createStatementSet();
    statementSet
        .addInsertSql("INSERT INTO sink1 SELECT * FROM source1")
        .addInsertSql("INSERT INTO sink2 SELECT * FROM source2")
        .addInsertSql("INSERT INTO sink3 SELECT * FROM source3");

    // Execute and handle potential failures
    TableResult result = statementSet.execute();

    // Monitor execution
    result.await(); // Wait for completion
    System.out.println("Statement set executed successfully");

} catch (InterruptedException e) {
    // Restore the interrupt flag so code further up the stack can still see the interruption
    Thread.currentThread().interrupt();
    System.err.println("Statement set execution failed: " + e.getMessage());
} catch (Exception e) {
    System.err.println("Statement set execution failed: " + e.getMessage());
    // Handle partial completion, rollback, or retry logic
}

Monitoring Statement Set Progress

Monitor the progress of long-running statement sets.

StreamStatementSet monitoredSet = tableEnv.createStatementSet();

// Add operations
monitoredSet
    .addInsertSql("INSERT INTO large_sink1 SELECT * FROM large_source")
    .addInsertSql("INSERT INTO large_sink2 SELECT * FROM large_source");

// Execute asynchronously.
// execute() returns once the job has been submitted; await() is what blocks until the
// job has finished, so the future only completes when the work is really done.
// NOTE: supplyAsync without an executor runs on the common ForkJoinPool; for
// long-running jobs consider passing a dedicated executor.
CompletableFuture<TableResult> execution = CompletableFuture.supplyAsync(() -> {
    try {
        TableResult result = monitoredSet.execute();
        result.await();
        return result;
    } catch (InterruptedException e) {
        // Keep the interrupt flag set for whoever owns this thread
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
});

// Monitor progress
execution.thenAccept(result -> {
    System.out.println("Statement set completed successfully");
}).exceptionally(throwable -> {
    System.err.println("Statement set failed: " + throwable.getMessage());
    return null;
});

Integration with DataStream API

Hybrid Processing Patterns

Combine statement sets with DataStream operations.

// Stage 1: DataStream preprocessing
DataStream<Row> preprocessedStream = env
    .fromSource(kafkaSource, watermarkStrategy, "kafka-source")
    .map(new PreprocessingFunction())
    .filter(new QualityFilter());

// Stage 2: expose the stream to SQL under a view name
tableEnv.createTemporaryView("preprocessed_data", preprocessedStream);

// Stage 3: several SQL inserts, collected in one statement set
StreamStatementSet hybridSet = tableEnv.createStatementSet();
hybridSet.addInsertSql("INSERT INTO sql_sink1 SELECT * FROM preprocessed_data WHERE category = 'A'");
hybridSet.addInsertSql("INSERT INTO sql_sink2 SELECT * FROM preprocessed_data WHERE category = 'B'");
// Attach to the existing StreamExecutionEnvironment instead of executing directly
hybridSet.attachAsDataStream();

// Stage 4: additional DataStream operations on a SQL result
Table allPreprocessed = tableEnv.sqlQuery("SELECT * FROM preprocessed_data");
DataStream<String> postProcessed = tableEnv
    .toDataStream(allPreprocessed)
    .map(new PostProcessingFunction());
postProcessed.addSink(customSink);

// Stage 5: run the entire pipeline
env.execute("Hybrid Stream-Table Processing");

Types

Statement Set Types

import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TablePipeline;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;

Execution and Monitoring Types

import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.ExplainDetail;
import java.util.concurrent.CompletableFuture;

Environment Integration Types

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-bridge

docs

builtin-connectors.md

changelog-processing.md

datastream-connectors.md

index.md

procedures.md

statement-sets.md

stream-table-environment.md

watermark-strategies.md

tile.json