CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-table

Apache Flink's Table API and SQL module for unified stream and batch processing

Pending
Overview
Eval results
Files

sql-execution.mddocs/

SQL Execution

Flink's Table API provides comprehensive SQL execution capabilities supporting DDL, DML, and query operations with advanced features like statement batching, result streaming, and metadata access.

Capabilities

SQL Query Execution

Execute SQL queries and retrieve results as Table objects for further processing.

/**
 * Evaluates a SQL query on registered tables and returns the result as a Table
 * @param query SQL query string (SELECT, WITH, VALUES, etc.)
 * @return Table representing the query result
 */
Table sqlQuery(String query);

Usage Examples:

// Simple SELECT query
Table customers = tableEnv.sqlQuery("SELECT * FROM customers WHERE age > 25");

// Complex analytical query
Table salesAnalysis = tableEnv.sqlQuery(
    "WITH monthly_sales AS (" +
    "  SELECT " +
    "    EXTRACT(YEAR FROM order_date) as year," +
    "    EXTRACT(MONTH FROM order_date) as month," +
    "    product_category," +
    "    SUM(amount) as total_sales" +
    "  FROM orders " +
    "  WHERE order_date >= DATE '2024-01-01'" +
    "  GROUP BY EXTRACT(YEAR FROM order_date), EXTRACT(MONTH FROM order_date), product_category" +
    ") " +
    "SELECT " +
    "  year, month, product_category, total_sales," +
    "  LAG(total_sales) OVER (PARTITION BY product_category ORDER BY year, month) as prev_month_sales," +
    "  total_sales - LAG(total_sales) OVER (PARTITION BY product_category ORDER BY year, month) as growth " +
    "FROM monthly_sales " +
    "ORDER BY year, month, product_category"
);

// Window aggregation for streaming data
Table windowedEvents = tableEnv.sqlQuery(
    "SELECT " +
    "  user_id," +
    "  TUMBLE_START(event_time, INTERVAL '1' HOUR) as window_start," +
    "  TUMBLE_END(event_time, INTERVAL '1' HOUR) as window_end," +
    "  COUNT(*) as event_count," +
    "  SUM(event_value) as total_value " +
    "FROM user_events " +
    "GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' HOUR)"
);

SQL Statement Execution

Execute DDL, DML, and utility statements with execution results and metadata.

/**
 * Executes a single SQL statement and returns the execution result
 * @param statement SQL statement (CREATE, DROP, INSERT, DESCRIBE, etc.)
 * @return TableResult containing execution information and optional data
 */
TableResult executeSql(String statement);

Usage Examples:

// DDL - Create table
TableResult createResult = tableEnv.executeSql(
    "CREATE TABLE orders (" +
    "  order_id BIGINT," +
    "  customer_id BIGINT," +
    "  product_id BIGINT," +
    "  quantity INT," +
    "  unit_price DECIMAL(10, 2)," +
    "  order_time TIMESTAMP(3)," +
    "  WATERMARK FOR order_time AS order_time - INTERVAL '30' SECOND" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'orders'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'json'" +
    ")"
);

// DDL - Create view
tableEnv.executeSql(
    "CREATE VIEW high_value_customers AS " +
    "SELECT customer_id, SUM(quantity * unit_price) as total_spent " +
    "FROM orders " +
    "GROUP BY customer_id " +
    "HAVING SUM(quantity * unit_price) > 1000"
);

// DML - Insert data
TableResult insertResult = tableEnv.executeSql(
    "INSERT INTO customer_summary " +
    "SELECT customer_id, COUNT(*) as order_count, SUM(quantity * unit_price) as total_spent " +
    "FROM orders " +
    "GROUP BY customer_id"
);

// Utility - Describe table
TableResult describeResult = tableEnv.executeSql("DESCRIBE orders");
describeResult.print();

// Utility - Show tables
TableResult showResult = tableEnv.executeSql("SHOW TABLES");

Statement Set Operations

Batch multiple statements for efficient execution and dependency management.

/**
 * Creates a StatementSet for batch execution of multiple statements
 * @return StatementSet instance for adding multiple operations
 */
StatementSet createStatementSet();

interface StatementSet {
    /**
     * Adds an INSERT SQL statement to the set
     * @param statement INSERT SQL statement
     * @return StatementSet for method chaining
     */
    StatementSet addInsertSql(String statement);
    
    /**
     * Adds a table insertion to the set
     * @param targetPath Target table path
     * @param table Source table to insert
     * @return StatementSet for method chaining
     */
    StatementSet addInsert(String targetPath, Table table);
    
    /**
     * Explains the execution plan for all statements
     * @return String representation of the execution plan
     */
    String explain();
    
    /**
     * Executes all statements in the set
     * @return TableResult with execution information
     */
    TableResult execute();
}

Usage Examples:

// Batch multiple related insertions
StatementSet stmtSet = tableEnv.createStatementSet();

// Add SQL insertions
stmtSet.addInsertSql(
    "INSERT INTO daily_sales " +
    "SELECT DATE(order_time) as sale_date, SUM(quantity * unit_price) as daily_total " +
    "FROM orders " +
    "WHERE DATE(order_time) = CURRENT_DATE " +
    "GROUP BY DATE(order_time)"
);

stmtSet.addInsertSql(
    "INSERT INTO product_sales " +
    "SELECT product_id, COUNT(*) as order_count, SUM(quantity) as total_quantity " +
    "FROM orders " +
    "WHERE DATE(order_time) = CURRENT_DATE " +
    "GROUP BY product_id"
);

// Add table insertions
Table customerMetrics = tableEnv.from("orders")
    .groupBy($("customer_id"))
    .select(
        $("customer_id"),
        $("order_id").count().as("order_count"),
        $("quantity").sum().as("total_items")
    );

stmtSet.addInsert("customer_metrics", customerMetrics);

// Execute all statements together
System.out.println(stmtSet.explain());
TableResult batchResult = stmtSet.execute();

Result Handling

Process and consume query results with various access patterns.

interface TableResult extends AutoCloseable {
    /**
     * Gets the result kind indicating success or content availability
     * @return ResultKind (SUCCESS, SUCCESS_WITH_CONTENT)
     */
    ResultKind getResultKind();
    
    /**
     * Gets the schema of the result
     * @return ResolvedSchema describing result structure
     */
    ResolvedSchema getResolvedSchema();
    
    /**
     * Collects all result rows as an iterator
     * @return CloseableIterator for consuming result rows
     */
    CloseableIterator<Row> collect();
    
    /**
     * Prints the result to standard output
     */
    void print();
    
    /**
     * Prints the result with row limit
     * @param maxNumRows Maximum number of rows to print
     */
    void print(int maxNumRows);
    
    /**
     * Gets the job client for the executed job (for streaming queries)
     * @return Optional JobClient for job monitoring
     */
    Optional<JobClient> getJobClient();
}

enum ResultKind {
    /** Statement executed successfully without result data */
    SUCCESS,
    /** Statement executed successfully with result data */
    SUCCESS_WITH_CONTENT
}

Usage Examples:

// Query execution with result processing
Table query = tableEnv.sqlQuery("SELECT customer_id, COUNT(*) as orders FROM orders GROUP BY customer_id");
TableResult result = query.execute();

// Check result type
if (result.getResultKind() == ResultKind.SUCCESS_WITH_CONTENT) {
    // Access schema information
    ResolvedSchema schema = result.getResolvedSchema();
    System.out.println("Columns: " + schema.getColumnNames());
    
    // Process results with iterator
    try (CloseableIterator<Row> iterator = result.collect()) {
        while (iterator.hasNext()) {
            Row row = iterator.next();
            Long customerId = row.getFieldAs(0);
            Long orderCount = row.getFieldAs(1);
            System.out.println("Customer " + customerId + " has " + orderCount + " orders");
        }
    }
}

// Simple result printing
tableEnv.sqlQuery("SELECT * FROM customers LIMIT 10").execute().print();

// Limited printing
tableEnv.sqlQuery("SELECT * FROM large_table").execute().print(100);

Advanced SQL Features

Support for advanced SQL constructs and streaming-specific features.

// Advanced SQL constructs examples - these are strings passed to sqlQuery() or executeSql()

Usage Examples:

// Common Table Expressions (CTE)
// NOTE: recursive CTEs (WITH RECURSIVE) are not supported by the default Flink SQL dialect;
// this example is illustrative only — verify support in your Flink version before use.
Table cteQuery = tableEnv.sqlQuery(
    "WITH RECURSIVE category_hierarchy AS (" +
    "  SELECT category_id, category_name, parent_id, 0 as level " +
    "  FROM categories WHERE parent_id IS NULL " +
    "  UNION ALL " +
    "  SELECT c.category_id, c.category_name, c.parent_id, ch.level + 1 " +
    "  FROM categories c " +
    "  JOIN category_hierarchy ch ON c.parent_id = ch.category_id" +
    ") " +
    "SELECT * FROM category_hierarchy"
);

// Window functions
Table windowQuery = tableEnv.sqlQuery(
    "SELECT " +
    "  customer_id, order_date, amount," +
    "  ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY order_date) as order_sequence," +
    "  SUM(amount) OVER (PARTITION BY customer_id ORDER BY order_date " +
    "                   ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as running_total," +
    "  AVG(amount) OVER (PARTITION BY customer_id ORDER BY order_date " +
    "                   ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING) as moving_avg " +
    "FROM orders"
);

// Streaming-specific: Event time processing
Table streamingQuery = tableEnv.sqlQuery(
    "SELECT " +
    "  user_id," +
    "  HOP_START(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) as window_start," +
    "  HOP_END(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) as window_end," +
    "  COUNT(*) as event_count," +
    "  MAX(event_value) as max_value " +
    "FROM events " +
    "GROUP BY user_id, HOP(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE)"
);

// Pattern matching with MATCH_RECOGNIZE
Table patternQuery = tableEnv.sqlQuery(
    "SELECT customer_id, start_time, end_time, total_amount " +
    "FROM orders " +
    "MATCH_RECOGNIZE (" +
    "  PARTITION BY customer_id " +
    "  ORDER BY order_time " +
    "  MEASURES " +
    "    FIRST(A.order_time) as start_time," +
    "    LAST(C.order_time) as end_time," +
    "    SUM(A.amount + B.amount + C.amount) as total_amount " +
    "  PATTERN (A B+ C) " +
    "  DEFINE " +
    "    A as A.amount > 100," +
    "    B as B.amount > A.amount," +
    "    C as C.amount < B.amount" +
    ")"
);

// JSON processing
Table jsonQuery = tableEnv.sqlQuery(
    "SELECT " +
    "  user_id," +
    "  JSON_VALUE(user_profile, '$.preferences.language') as preferred_language," +
    "  JSON_VALUE(user_profile, '$.address.country') as country," +
    "  JSON_QUERY(user_profile, '$.purchase_history[*].product_id') as purchased_products " +
    "FROM users " +
    "WHERE JSON_EXISTS(user_profile, '$.preferences')"
);

SQL Dialect Support

Configure SQL dialect for compatibility with different SQL engines.

/**
 * Gets the table configuration for SQL dialect settings
 * @return TableConfig for accessing dialect configuration
 */
TableConfig getConfig();

interface TableConfig {
    /**
     * Sets the SQL dialect for parsing
     * @param sqlDialect Dialect to use (DEFAULT, HIVE)
     */
    void setSqlDialect(SqlDialect sqlDialect);
    
    /**
     * Gets the current SQL dialect
     * @return Currently configured SQL dialect
     */
    SqlDialect getSqlDialect();
}

enum SqlDialect {
    /** Default Flink SQL dialect */
    DEFAULT,
    /** Hive-compatible SQL dialect */
    HIVE
}

Usage Examples:

// Switch to Hive dialect for compatibility
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

// Hive-specific SQL features
tableEnv.executeSql(
    "CREATE TABLE hive_table (" +
    "  id BIGINT," +
    "  name STRING," +
    "  partition_date STRING" +
    ") PARTITIONED BY (partition_date) " +
    "STORED AS PARQUET"
);

// Switch back to default Flink dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

Types

Result Types

interface TableResult extends AutoCloseable {
    ResultKind getResultKind();
    ResolvedSchema getResolvedSchema();
    CloseableIterator<Row> collect();
    void print();
    void print(int maxNumRows);
    Optional<JobClient> getJobClient();
    void close();
}

enum ResultKind {
    SUCCESS,
    SUCCESS_WITH_CONTENT
}

interface StatementSet {
    StatementSet addInsertSql(String statement);
    StatementSet addInsert(String targetPath, Table table);
    String explain();
    TableResult execute();
}

Row Access

class Row {
    Object getField(int pos);
    Object getField(String name);
    <T> T getFieldAs(int pos);
    <T> T getFieldAs(String name);
    int getArity();
    RowKind getKind();
    
    // Factory methods
    static Row of(Object... values);
    static Row withNames(RowKind kind);
    static Row withPositions(RowKind kind);
}

enum RowKind {
    INSERT,        // +I
    UPDATE_BEFORE, // -U
    UPDATE_AFTER,  // +U  
    DELETE         // -D
}

Iterator Support

interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
    boolean hasNext();
    T next();
    void close();
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table

docs

catalog-system.md

datastream-integration.md

index.md

sql-execution.md

table-environment.md

table-operations.md

type-system.md

user-defined-functions.md

tile.json