
tessl/maven-org-apache-flink--flink-table-api-scala-bridge-2-12

Table/SQL API bridge for Scala, enabling interaction between Table API and DataStream/DataSet APIs in Apache Flink


Statement Sets

The StreamStatementSet groups multiple table operations so that the planner can optimize them together and run them as a single job. It extends the base StatementSet with attachAsDataStream(), which adds the grouped statements to the underlying StreamExecutionEnvironment.

Capabilities

Statement Set Creation

Create a statement set from a StreamTableEnvironment for batching operations.

// Created via StreamTableEnvironment
val statementSet = tableEnv.createStatementSet()
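
The snippet above assumes a `tableEnv` already in scope. A minimal sketch of that setup, assuming the Scala bridge and the Scala DataStream API are on the classpath; the object and helper names are illustrative only:

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.{StreamStatementSet, StreamTableEnvironment}

object CreateStatementSetSketch {
  // Hypothetical helper: builds a StreamTableEnvironment on a fresh StreamExecutionEnvironment
  def newTableEnv(): StreamTableEnvironment =
    StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment)

  def main(args: Array[String]): Unit = {
    val tableEnv = newTableEnv()
    // On the Scala bridge environment, createStatementSet() yields the streaming-specific type
    val statementSet: StreamStatementSet = tableEnv.createStatementSet()
    println(statementSet.getClass.getName)
  }
}
```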

Table Pipeline Management

Add table processing pipelines to the statement set so that they are optimized and executed together.

/**
 * Add table pipeline to statement set
 * @param tablePipeline The table pipeline to add
 * @return This StreamStatementSet for method chaining
 */
def add(tablePipeline: TablePipeline): StreamStatementSet

Usage Example:

import org.apache.flink.table.api._

val statementSet = tableEnv.createStatementSet()

// Create table pipelines
val pipeline1 = tableEnv.from("source_table_1").insertInto("sink_table_1")
val pipeline2 = tableEnv.from("source_table_2").insertInto("sink_table_2")

statementSet
  .add(pipeline1)
  .add(pipeline2)
  .attachAsDataStream()

SQL Statement Management

Add SQL insert statements to the statement set.

/**
 * Add insert SQL statement to statement set
 * @param statement The SQL insert statement
 * @return This StreamStatementSet for method chaining
 */
def addInsertSql(statement: String): StreamStatementSet

Usage Example:

val statementSet = tableEnv.createStatementSet()

statementSet
  .addInsertSql("INSERT INTO sink_table_1 SELECT * FROM source_table_1 WHERE age > 25")
  .addInsertSql("INSERT INTO sink_table_2 SELECT name, COUNT(*) FROM source_table_2 GROUP BY name")
  .attachAsDataStream()

Table Insert Operations

Add table insert operations with various configuration options.

/**
 * Add insert operation with target path
 * @param targetPath The target table path
 * @param table The table to insert
 * @return This StreamStatementSet for method chaining
 */
def addInsert(targetPath: String, table: Table): StreamStatementSet

/**
 * Add insert operation with target path and overwrite option
 * @param targetPath The target table path  
 * @param table The table to insert
 * @param overwrite Whether to overwrite existing data
 * @return This StreamStatementSet for method chaining
 */
def addInsert(targetPath: String, table: Table, overwrite: Boolean): StreamStatementSet

/**
 * Add insert operation with table descriptor
 * @param targetDescriptor The target table descriptor
 * @param table The table to insert
 * @return This StreamStatementSet for method chaining
 */
def addInsert(targetDescriptor: TableDescriptor, table: Table): StreamStatementSet

/**
 * Add insert operation with table descriptor and overwrite option
 * @param targetDescriptor The target table descriptor
 * @param table The table to insert  
 * @param overwrite Whether to overwrite existing data
 * @return This StreamStatementSet for method chaining
 */
def addInsert(targetDescriptor: TableDescriptor, table: Table, overwrite: Boolean): StreamStatementSet

Usage Examples:

val statementSet = tableEnv.createStatementSet()
val processedTable1 = tableEnv.from("source_table_1").filter($"age" > 25)
val processedTable2 = tableEnv.from("source_table_2").groupBy($"name").select($"name", $"age".count())

// Insert with target path
statementSet
  .addInsert("sink_table_1", processedTable1)
  .addInsert("sink_table_2", processedTable2, true) // with overwrite
  
// Insert with table descriptor
val sinkDescriptor = TableDescriptor.forConnector("filesystem")
  .schema(Schema.newBuilder()
    .column("name", DataTypes.STRING())
    .column("age", DataTypes.INT())
    .build())
  .option("path", "/path/to/sink")
  .format("csv")
  .build()

statementSet
  .addInsert(sinkDescriptor, processedTable1)
  .attachAsDataStream()
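
The fragments above rely on tables that are registered elsewhere. As a self-contained sketch of the descriptor-based overload, the following assumes Flink's built-in `datagen` and `print` connectors and a hypothetical `people` table; none of these names come from the original example:

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

object AddInsertSketch {
  // Schema shared by the hypothetical source and sink
  val schema: Schema = Schema.newBuilder()
    .column("name", DataTypes.STRING())
    .column("age", DataTypes.INT())
    .build()

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // Bounded random source: datagen limited to 10 rows
    tableEnv.createTemporaryTable("people", TableDescriptor.forConnector("datagen")
      .schema(schema)
      .option("number-of-rows", "10")
      .build())

    // Inline sink descriptor: the print connector writes rows to stdout
    val printSink = TableDescriptor.forConnector("print").schema(schema).build()

    val adults = tableEnv.from("people").filter($"age" > 25)

    tableEnv.createStatementSet()
      .addInsert(printSink, adults) // descriptor overload, no registered sink needed
      .attachAsDataStream()

    // Nothing runs until the environment is executed
    env.execute("addInsert sketch")
  }
}
```

Passing a descriptor keeps the sink definition next to the insert, which can be convenient for one-off sinks; registered tables are usually the better fit when several jobs share the same sink.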

Execution and Debugging

Methods for explaining execution plans and attaching to the streaming environment.

/**
 * Print execution plan with optional extra details
 * @param extraDetails Additional details to include in explanation
 * @return This StreamStatementSet for method chaining
 */
def printExplain(extraDetails: ExplainDetail*): StreamStatementSet

/**
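 * Optimize all added statements as one unit and attach them as transformations
 * to the underlying StreamExecutionEnvironment. Nothing runs until
 * StreamExecutionEnvironment.execute() is called; the added statements are cleared by this call.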
 */
def attachAsDataStream(): Unit

Usage Examples:

val statementSet = tableEnv.createStatementSet()

statementSet
  .addInsertSql("INSERT INTO sink SELECT * FROM source WHERE age > 25")
  .printExplain(ExplainDetail.CHANGELOG_MODE, ExplainDetail.ESTIMATED_COST)
  .attachAsDataStream()

// The execution plan will be printed before attachment
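
`printExplain` writes the plan to standard output. When the plan should be logged or checked in a test, the `explain` method inherited from the base StatementSet returns it as a `String` instead. A sketch, assuming hypothetical `src` and `snk` tables backed by the built-in `datagen` and `blackhole` connectors:

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.ExplainDetail
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object ExplainSketch {
  // Hypothetical helper: returns the plan text instead of printing it
  def planAsString(): String = {
    val tableEnv = StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment)
    tableEnv.executeSql("CREATE TABLE src (name STRING, age INT) WITH ('connector' = 'datagen')")
    tableEnv.executeSql("CREATE TABLE snk (name STRING, age INT) WITH ('connector' = 'blackhole')")

    tableEnv.createStatementSet()
      .addInsertSql("INSERT INTO snk SELECT * FROM src WHERE age > 25")
      .explain(ExplainDetail.ESTIMATED_COST) // explaining does not submit a job
  }

  def main(args: Array[String]): Unit = println(planAsString())
}
```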

Complete Usage Example

Here's a comprehensive example showing how to use StreamStatementSet for complex multi-table operations:

import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// Register source tables
tableEnv.executeSql("""
  CREATE TABLE users (
    id INT,
    name STRING,
    age INT,
    city STRING
  ) WITH (
    'connector' = 'filesystem',
    'path' = '/path/to/users.csv',
    'format' = 'csv'
  )
""")

tableEnv.executeSql("""
  CREATE TABLE orders (
    order_id INT,
    user_id INT,
    amount DECIMAL(10,2),
    order_time TIMESTAMP(3)
  ) WITH (
    'connector' = 'filesystem', 
    'path' = '/path/to/orders.csv',
    'format' = 'csv'
  )
""")

// Create statement set for batch operations
val statementSet = tableEnv.createStatementSet()

// Define the queries to insert (the sink tables used below are assumed to be registered already)
val youngUsers = tableEnv.sqlQuery("SELECT * FROM users WHERE age < 30")
val userStats = tableEnv.sqlQuery("""
  SELECT 
    u.city,
    COUNT(*) as user_count,
    AVG(u.age) as avg_age
  FROM users u 
  GROUP BY u.city
""")

val orderSummary = tableEnv.sqlQuery("""
  SELECT 
    u.city,
    COUNT(o.order_id) as order_count,
    SUM(o.amount) as total_amount
  FROM users u
  JOIN orders o ON u.id = o.user_id
  GROUP BY u.city
""")

statementSet
  .addInsert("young_users_sink", youngUsers)  
  .addInsert("user_stats_sink", userStats)
  .addInsert("order_summary_sink", orderSummary)
  .addInsertSql("INSERT INTO audit_log SELECT 'batch_job' as job_type, CURRENT_TIMESTAMP as run_time")
  .printExplain() // Print the optimized plan for all statements
  .attachAsDataStream() // Attach all statements to env; the env.execute call below runs them

env.execute("Multi-table ETL Job")
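
Because `attachAsDataStream()` only adds transformations to the environment, table pipelines and plain DataStream pipelines can end up in the same job. A minimal sketch of that combination, assuming hypothetical `src` and `snk` tables on the built-in `datagen` and `print` connectors:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._

object MixedJobSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    tableEnv.executeSql(
      "CREATE TABLE src (id INT) WITH ('connector' = 'datagen', 'number-of-rows' = '5')")
    tableEnv.executeSql("CREATE TABLE snk (id INT) WITH ('connector' = 'print')")

    // Table side: attach the insert to env without running it yet
    tableEnv.createStatementSet()
      .addInsertSql("INSERT INTO snk SELECT id FROM src")
      .attachAsDataStream()

    // DataStream side: an independent pipeline defined on the same env
    env.fromElements(1, 2, 3).map(_ * 2).print()

    // A single execute() call submits both pipelines as one job
    env.execute("mixed table and DataStream job")
  }
}
```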

Benefits of Statement Sets

Performance Optimization

  • Shared computation: Common sub-plans, such as a scan of the same source, can be computed once and reused
  • Optimized execution plans: The planner can optimize across multiple statements
  • Resource sharing: Efficient use of parallelism and memory

Operational Benefits

  • Single job: All statements are submitted as one Flink job, so they start, fail, and restart together (this is not a transactional guarantee across sinks)
  • Simplified debugging: Single execution plan for multiple operations
  • Better resource management: Coordinated scheduling of related operations
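
When no DataStream code needs to join the job, the base StatementSet's `execute()` submits the grouped statements directly and returns a `TableResult`. A sketch under the same kind of assumptions as before (hypothetical `src`, `snk_a`, and `snk_b` tables on the built-in `datagen` and `blackhole` connectors):

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableResult
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object ExecuteSketch {
  def main(args: Array[String]): Unit = {
    val tableEnv = StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment)

    tableEnv.executeSql(
      "CREATE TABLE src (id INT) WITH ('connector' = 'datagen', 'number-of-rows' = '5')")
    tableEnv.executeSql("CREATE TABLE snk_a (id INT) WITH ('connector' = 'blackhole')")
    tableEnv.executeSql("CREATE TABLE snk_b (id INT) WITH ('connector' = 'blackhole')")

    // Both inserts read the same source and are submitted as one job
    val result: TableResult = tableEnv.createStatementSet()
      .addInsertSql("INSERT INTO snk_a SELECT id FROM src")
      .addInsertSql("INSERT INTO snk_b SELECT id FROM src WHERE id > 0")
      .execute()

    // execute() returns once the job is submitted; await() blocks until it finishes
    result.await()
  }
}
```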

Best Practices

  • Group related operations: Batch operations that work on similar data
  • Use for ETL pipelines: Ideal for multi-sink data processing jobs
  • Consider job coupling: All operations run in the same job, so they start together and share its lifecycle
  • Monitor resource usage: Large statement sets may require more memory

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-api-scala-bridge-2-12
