SQL and Query Processing

This document covers the SQL DDL, DML, and query capabilities of Apache Flink Table Uber Blink, including SQL parsing, statement execution, and Hive compatibility features.

SQL Execution

SQL Statement Execution

interface TableEnvironment {
    TableResult executeSql(String statement);
    Table sqlQuery(String query);
    StatementSet createStatementSet();
}

Usage:

// Execute DDL statements
tEnv.executeSql("CREATE DATABASE mydb");
tEnv.executeSql("USE mydb");

// Execute DML statements  
TableResult result = tEnv.executeSql("INSERT INTO target_table SELECT * FROM source_table");

// Execute queries
Table queryResult = tEnv.sqlQuery("SELECT user_id, COUNT(*) as cnt FROM clicks GROUP BY user_id");

Data Definition Language (DDL)

Database Operations

-- Create database
CREATE DATABASE [IF NOT EXISTS] db_name [COMMENT 'comment'] [WITH (key1=val1, key2=val2, ...)];

-- Drop database
DROP DATABASE [IF EXISTS] db_name [RESTRICT|CASCADE];

-- Show databases
SHOW DATABASES;

-- Use database
USE db_name;

Table Operations

-- Create table
CREATE TABLE [IF NOT EXISTS] table_name (
    column_name column_type [COMMENT 'comment'],
    [WATERMARK FOR rowtime_column AS watermark_strategy],
    [PRIMARY KEY (column_list) NOT ENFORCED]
) [COMMENT 'comment']
[PARTITIONED BY (partition_column_list)]
WITH (
    'connector' = 'connector_type',
    'option_key' = 'option_value'
);

-- Create table as select
CREATE TABLE table_name WITH ('connector' = '...') AS SELECT ...;

-- Drop table
DROP TABLE [IF EXISTS] table_name;

-- Show tables
SHOW TABLES;

-- Describe table
DESCRIBE table_name;
DESC table_name;

Usage:

// Create table with watermark
tEnv.executeSql(
    "CREATE TABLE events (" +
    "  user_id BIGINT," +
    "  event_time TIMESTAMP(3)," +
    "  event_type STRING," +
    "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'events'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'json'" +
    ")"
);

View Operations

interface TableEnvironment {
    void createTemporaryView(String path, Table view);
    boolean dropTemporaryView(String path);
}

// The DataStream overload lives on StreamTableEnvironment
interface StreamTableEnvironment extends TableEnvironment {
    <T> void createTemporaryView(String path, DataStream<T> dataStream);
}

SQL:

-- Create view
CREATE [TEMPORARY] VIEW view_name AS SELECT ...;

-- Drop view
DROP [TEMPORARY] VIEW [IF EXISTS] view_name;

-- Show views
SHOW VIEWS;
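
Usage (the view and table names below are illustrative):

// Register a query result as a temporary view via the API
Table activeUsers = tEnv.sqlQuery("SELECT * FROM users WHERE active = TRUE");
tEnv.createTemporaryView("active_users", activeUsers);

// Equivalent DDL form
tEnv.executeSql(
    "CREATE TEMPORARY VIEW recent_users AS SELECT * FROM users WHERE active = TRUE");

// Remove the API-registered view when no longer needed
tEnv.dropTemporaryView("active_users");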

Data Manipulation Language (DML)

Insert Operations

-- Insert values
INSERT INTO table_name VALUES (value1, value2, ...);

-- Insert from select
INSERT INTO target_table SELECT * FROM source_table WHERE condition;

-- Insert overwrite
INSERT OVERWRITE target_table SELECT * FROM source_table;

Update and Delete

-- Update (batch mode only; requires a connector that supports row-level updates)
UPDATE table_name SET column1 = value1 WHERE condition;

-- Delete (batch mode only; requires a connector that supports row-level deletes)
DELETE FROM table_name WHERE condition;
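
DML statements are submitted with executeSql(); for INSERT the call returns as soon as the job is submitted, and the returned TableResult can be awaited. A minimal sketch reusing the target_table/source_table names from above:

// executeSql submits the INSERT job and returns a TableResult immediately
TableResult insertResult = tEnv.executeSql(
    "INSERT INTO target_table SELECT * FROM source_table");

// Block until the job finishes (handy in tests and batch pipelines)
insertResult.await();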

Query Language (DQL)

Basic Queries

SELECT column_list
FROM table_name
[WHERE condition]
[GROUP BY column_list]
[HAVING condition]
[ORDER BY column_list]
[LIMIT number];
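
A query built with sqlQuery can be executed and its rows printed directly; a small sketch reusing the clicks table from earlier:

// execute() runs the query; print() renders rows to stdout
tEnv.sqlQuery("SELECT user_id, COUNT(*) AS cnt FROM clicks GROUP BY user_id")
    .execute()
    .print();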

Joins

-- Inner join
SELECT * FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id;

-- Outer joins
SELECT * FROM table1 t1 LEFT JOIN table2 t2 ON t1.id = t2.id;
SELECT * FROM table1 t1 RIGHT JOIN table2 t2 ON t1.id = t2.id;
SELECT * FROM table1 t1 FULL OUTER JOIN table2 t2 ON t1.id = t2.id;

-- Cross join
SELECT * FROM table1 CROSS JOIN table2;

Group Window Functions

TUMBLE, HOP, and SESSION are group window functions applied in the GROUP BY clause.

Tumbling Windows

SELECT 
    TUMBLE_START(event_time, INTERVAL '1' HOUR) as window_start,
    TUMBLE_END(event_time, INTERVAL '1' HOUR) as window_end,
    user_id,
    COUNT(*) as event_count
FROM events
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR), user_id;

Sliding Windows

SELECT 
    HOP_START(event_time, INTERVAL '30' MINUTE, INTERVAL '1' HOUR) as window_start,
    HOP_END(event_time, INTERVAL '30' MINUTE, INTERVAL '1' HOUR) as window_end,
    COUNT(*) as event_count
FROM events  
GROUP BY HOP(event_time, INTERVAL '30' MINUTE, INTERVAL '1' HOUR);

Session Windows

SELECT 
    SESSION_START(event_time, INTERVAL '30' MINUTE) as session_start,
    SESSION_END(event_time, INTERVAL '30' MINUTE) as session_end,
    user_id,
    COUNT(*) as event_count
FROM events
GROUP BY SESSION(event_time, INTERVAL '30' MINUTE), user_id;
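
These group-window queries run like any other SQL statement; a sketch against the events table defined in the DDL section:

// Hourly tumbling-window counts per user
Table hourlyCounts = tEnv.sqlQuery(
    "SELECT TUMBLE_START(event_time, INTERVAL '1' HOUR) AS window_start, " +
    "       user_id, COUNT(*) AS event_count " +
    "FROM events " +
    "GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR), user_id");
hourlyCounts.execute().print();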

SQL Parser Configuration

interface TableEnvironment {
    SqlParser createParser(String statement);
}

class SqlParser {
    List<SqlNode> parseStmtList(String sql);
    SqlNode parseStmt(String sql);
    SqlNode parseExpression(String sqlExpression);
}

enum SqlDialect {
    DEFAULT,
    HIVE
}

Configuration:

// Set SQL dialect
Configuration config = new Configuration();
config.setString("table.sql-dialect", "hive");
TableEnvironment tEnv = TableEnvironment.create(config);

// Or via table config
tEnv.getConfig().getConfiguration().setString("table.sql-dialect", "hive");

Hive Compatibility

Hive SQL Dialect

When using Hive dialect, Flink supports Hive-specific SQL syntax:

-- Hive-style CTAS with storage format
CREATE TABLE target_table
STORED AS PARQUET
LOCATION '/path/to/table'
AS SELECT * FROM source_table;

-- Hive functions
SELECT concat_ws('|', col1, col2) FROM table1;
SELECT get_json_object(json_col, '$.field') FROM table2;
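
The dialect can also be switched programmatically through TableConfig before issuing Hive-specific statements; a minimal sketch (the table definition is illustrative):

// Switch to Hive syntax for the statements that follow
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tEnv.executeSql("CREATE TABLE hive_table (id INT) STORED AS PARQUET");

// Switch back to the default Flink dialect
tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);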

Hive Catalog Integration

// Register Hive catalog
HiveCatalog hive = new HiveCatalog("myhive", "default", "/path/to/hive-conf");
tEnv.registerCatalog("myhive", hive);
tEnv.useCatalog("myhive");

// Access Hive tables
Table hiveTable = tEnv.from("hive_database.hive_table");

Built-in Functions

Scalar Functions

-- String functions
SELECT UPPER(name), LOWER(email), LENGTH(description) FROM users;
SELECT CONCAT(first_name, ' ', last_name) as full_name FROM users;
SELECT SUBSTRING(text, 1, 10) FROM documents;

-- Math functions
SELECT ABS(balance), ROUND(price, 2), CEIL(rating) FROM products;

-- Date/Time functions
SELECT CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP;
SELECT EXTRACT(YEAR FROM order_date), DATE_FORMAT(created_at, 'yyyy-MM-dd') FROM orders;

-- Conditional functions
SELECT CASE WHEN age >= 18 THEN 'adult' ELSE 'minor' END FROM users;
SELECT COALESCE(nickname, first_name, 'Anonymous') FROM users;

Aggregate Functions

-- Basic aggregates
SELECT COUNT(*), SUM(amount), AVG(rating), MIN(price), MAX(price) FROM products;

-- Statistical functions
SELECT STDDEV_POP(score), VAR_SAMP(rating) FROM reviews;

-- Collection functions
SELECT COLLECT(tags), LISTAGG(name, ',') FROM items GROUP BY category;

Table Functions

-- String split (requires a SPLIT function that returns an array; availability varies by Flink version)
SELECT word FROM table1 CROSS JOIN UNNEST(SPLIT(description, ' ')) AS t(word);

-- JSON array extraction (requires a JSON array function; availability varies by version and dialect)
SELECT field_value FROM json_table CROSS JOIN UNNEST(JSON_EXTRACT_ARRAY(json_col)) AS t(field_value);

Error Handling

SQL-specific exceptions:

class SqlParserException extends TableException {
    SqlParserException(String message);
    SqlParserException(String message, Throwable cause);
}

class SqlExecutionException extends TableException;
class SqlValidationException extends ValidationException;
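
A sketch of defensive handling around executeSql (the malformed statement is deliberate):

try {
    // Misspelled SELECT triggers a parse failure
    tEnv.executeSql("SELEC * FROM clicks");
} catch (SqlParserException e) {
    System.err.println("Parse error: " + e.getMessage());
} catch (ValidationException e) {
    System.err.println("Validation error: " + e.getMessage());
}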

Configuration Options

class TableConfigOptions {
    public static final ConfigOption<String> TABLE_SQL_DIALECT;
    public static final ConfigOption<Integer> TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
    public static final ConfigOption<String> TABLE_LOCAL_TIME_ZONE;
}

Common configurations:

Configuration config = tEnv.getConfig().getConfiguration();

// Set SQL dialect
config.setString("table.sql-dialect", "default");

// Set local timezone
config.setString("table.local-time-zone", "UTC");

// Set default parallelism
config.setInteger("table.exec.resource.default-parallelism", 4);

Statement Sets

For batch execution of multiple statements:

interface StatementSet {
    StatementSet addInsertSql(String statement);
    StatementSet addInsert(String targetPath, Table table);
    StatementSet addInsert(String targetPath, Table table, boolean overwrite);
    TableResult execute();
    String explain();
}

Usage:

StatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source WHERE type = 'A'");
stmtSet.addInsertSql("INSERT INTO sink2 SELECT * FROM source WHERE type = 'B'");
stmtSet.execute();
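
Since execute() clears the buffered statements, call explain() beforehand to inspect the combined plan:

// Print the unified plan covering both INSERT statements
System.out.println(stmtSet.explain());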

Types

interface TableResult {
    ResultKind getResultKind();
    ResolvedSchema getResolvedSchema();
    CloseableIterator<Row> collect();
    void print();
    JobExecutionResult getJobExecutionResult();
}

enum ResultKind {
    SUCCESS,
    SUCCESS_WITH_CONTENT
}
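
collect() returns a CloseableIterator that holds on to the running job; closing it releases those resources. A sketch using try-with-resources (the table name is illustrative):

// Closing the iterator cleans up the underlying job
try (CloseableIterator<Row> rows = tEnv.executeSql("SELECT * FROM clicks").collect()) {
    while (rows.hasNext()) {
        System.out.println(rows.next());
    }
}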

interface SqlNode;
interface SqlIdentifier extends SqlNode;
interface SqlCall extends SqlNode;