This document covers SQL DDL, DML, and query capabilities, including SQL parsing, execution, and Hive compatibility, in Apache Flink's Table API with the Blink planner (the flink-table-uber-blink module).
interface TableEnvironment {
TableResult executeSql(String statement);
Table sqlQuery(String query);
StatementSet createStatementSet();
}

Usage:
// Execute DDL statements
tEnv.executeSql("CREATE DATABASE mydb");
tEnv.executeSql("USE mydb");
// Execute DML statements
TableResult result = tEnv.executeSql("INSERT INTO target_table SELECT * FROM source_table");
// Execute queries
Table queryResult = tEnv.sqlQuery("SELECT user_id, COUNT(*) as cnt FROM clicks GROUP BY user_id");

-- Create database
CREATE DATABASE [IF NOT EXISTS] db_name [COMMENT 'comment'] [WITH (key1=val1, key2=val2, ...)];
-- Drop database
DROP DATABASE [IF EXISTS] db_name [RESTRICT|CASCADE];
-- Show databases
SHOW DATABASES;
-- Use database
USE db_name;

-- Create table
CREATE TABLE [IF NOT EXISTS] table_name (
column_name column_type [COMMENT 'comment'],
[WATERMARK FOR rowtime_column AS watermark_strategy],
[PRIMARY KEY (column_list) NOT ENFORCED]
) [COMMENT 'comment']
[PARTITIONED BY (partition_column_list)]
WITH (
'connector' = 'connector_type',
'option_key' = 'option_value'
);
-- Create table as select
CREATE TABLE table_name WITH ('connector' = '...') AS SELECT ...;
-- Drop table
DROP TABLE [IF EXISTS] table_name;
-- Show tables
SHOW TABLES;
-- Describe table
DESCRIBE table_name;
DESC table_name;

Usage:
// Create table with watermark
tEnv.executeSql(
"CREATE TABLE events (" +
" user_id BIGINT," +
" event_time TIMESTAMP(3)," +
" event_type STRING," +
" WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'events'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'format' = 'json'" +
")"
);

interface TableEnvironment {
void createTemporaryView(String path, Table view);
void createTemporaryView(String path, DataStream<?> dataStream);
void dropTemporaryView(String path);
}
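Usage (a minimal sketch; the clicks table and view name are illustrative):

// Register the result of a query as a temporary view
Table topUsers = tEnv.sqlQuery("SELECT user_id, COUNT(*) AS cnt FROM clicks GROUP BY user_id");
tEnv.createTemporaryView("top_users", topUsers);
// The view can now be referenced from SQL
Table result = tEnv.sqlQuery("SELECT * FROM top_users WHERE cnt > 10");
// Drop it once it is no longer needed
tEnv.dropTemporaryView("top_users");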
SQL:

-- Create view
CREATE [TEMPORARY] VIEW view_name AS SELECT ...;
-- Drop view
DROP [TEMPORARY] VIEW [IF EXISTS] view_name;
-- Show views
SHOW VIEWS;

-- Insert values
INSERT INTO table_name VALUES (value1, value2, ...);
-- Insert from select
INSERT INTO target_table SELECT * FROM source_table WHERE condition;
-- Insert overwrite
INSERT OVERWRITE target_table SELECT * FROM source_table;

-- Update (batch mode only; requires a connector that supports row-level updates)
UPDATE table_name SET column1 = value1 WHERE condition;
-- Delete (batch mode only; requires a connector that supports row-level deletes)
DELETE FROM table_name WHERE condition;

SELECT column_list
FROM table_name
[WHERE condition]
[GROUP BY column_list]
[HAVING condition]
[ORDER BY column_list]
[LIMIT number];

-- Inner join
SELECT * FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id;
-- Outer joins
SELECT * FROM table1 t1 LEFT JOIN table2 t2 ON t1.id = t2.id;
SELECT * FROM table1 t1 RIGHT JOIN table2 t2 ON t1.id = t2.id;
SELECT * FROM table1 t1 FULL OUTER JOIN table2 t2 ON t1.id = t2.id;
-- Cross join
SELECT * FROM table1 CROSS JOIN table2;

-- Tumbling window aggregation (legacy group-window syntax)
SELECT
TUMBLE_START(event_time, INTERVAL '1' HOUR) as window_start,
TUMBLE_END(event_time, INTERVAL '1' HOUR) as window_end,
user_id,
COUNT(*) as event_count
FROM events
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR), user_id;

-- Hopping (sliding) window aggregation
SELECT
HOP_START(event_time, INTERVAL '30' MINUTE, INTERVAL '1' HOUR) as window_start,
HOP_END(event_time, INTERVAL '30' MINUTE, INTERVAL '1' HOUR) as window_end,
COUNT(*) as event_count
FROM events
GROUP BY HOP(event_time, INTERVAL '30' MINUTE, INTERVAL '1' HOUR);

-- Session window aggregation
SELECT
SESSION_START(event_time, INTERVAL '30' MINUTE) as session_start,
SESSION_END(event_time, INTERVAL '30' MINUTE) as session_end,
user_id,
COUNT(*) as event_count
FROM events
GROUP BY SESSION(event_time, INTERVAL '30' MINUTE), user_id;
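Newer Flink versions (1.13+) also provide windowing table-valued functions, which the project recommends over the legacy group-window functions above; a tumbling-window sketch against the same events table:

-- Tumbling window via table-valued function (Flink 1.13+)
SELECT window_start, window_end, user_id, COUNT(*) AS event_count
FROM TABLE(
  TUMBLE(TABLE events, DESCRIPTOR(event_time), INTERVAL '1' HOUR))
GROUP BY window_start, window_end, user_id;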
interface TableEnvironment {
SqlParser createParser(String statement);
}
class SqlParser {
List<SqlNode> parseStmtList(String sql);
SqlNode parseStmt(String sql);
SqlNode parseExpression(String sqlExpression);
}
enum SqlDialect {
DEFAULT,
HIVE
}

Configuration:
// Set SQL dialect
Configuration config = new Configuration();
config.setString("table.sql-dialect", "hive");
TableEnvironment tEnv = TableEnvironment.create(config);
// Or via table config
tEnv.getConfig().getConfiguration().setString("table.sql-dialect", "hive");
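The typed setter on TableConfig is equivalent:

// SqlDialect is org.apache.flink.table.api.SqlDialect
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);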
When using the Hive dialect, Flink supports Hive-specific SQL syntax:

-- Hive-style CTAS with storage format
CREATE TABLE target_table
STORED AS PARQUET
LOCATION '/path/to/table'
AS SELECT * FROM source_table;
-- Hive functions
SELECT concat_ws('|', col1, col2) FROM table1;
SELECT get_json_object(json_col, '$.field') FROM table2;

// Register Hive catalog
HiveCatalog hive = new HiveCatalog("myhive", "default", "/path/to/hive-conf");
tEnv.registerCatalog("myhive", hive);
tEnv.useCatalog("myhive");
// Access Hive tables
Table hiveTable = tEnv.from("hive_database.hive_table");

-- String functions
SELECT UPPER(name), LOWER(email), LENGTH(description) FROM users;
SELECT CONCAT(first_name, ' ', last_name) as full_name FROM users;
SELECT SUBSTRING(text, 1, 10) FROM documents;
-- Math functions
SELECT ABS(balance), ROUND(price, 2), CEIL(rating) FROM products;
-- Date/Time functions
SELECT CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP;
SELECT EXTRACT(YEAR FROM order_date), DATE_FORMAT(created_at, 'yyyy-MM-dd') FROM orders;
-- Conditional functions
SELECT CASE WHEN age >= 18 THEN 'adult' ELSE 'minor' END FROM users;
SELECT COALESCE(nickname, first_name, 'Anonymous') FROM users;

-- Basic aggregates
SELECT COUNT(*), SUM(amount), AVG(rating), MIN(price), MAX(price) FROM products;
-- Statistical functions
SELECT STDDEV_POP(score), VAR_SAMP(rating) FROM reviews;
-- Collection functions
SELECT COLLECT(tags), LISTAGG(name, ',') FROM items GROUP BY category;

-- String split
SELECT word FROM table1 CROSS JOIN UNNEST(SPLIT(description, ' ')) AS t(word);
-- JSON extraction
SELECT field_value FROM json_table CROSS JOIN UNNEST(JSON_EXTRACT_ARRAY(json_col)) AS t(field_value);
-- Note: availability of SPLIT and JSON_EXTRACT_ARRAY depends on the Flink version and SQL dialect in use.

SQL-specific exceptions:
class SqlParserException extends TableException {
SqlParserException(String message);
SqlParserException(String message, Throwable cause);
}
class SqlExecutionException extends TableException;
class SqlValidationException extends ValidationException;
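A sketch of distinguishing parse failures from validation failures around executeSql (the misspelled statement is deliberate, to trigger a parse error):

try {
    tEnv.executeSql("SELEC * FROM clicks");
} catch (SqlParserException e) {
    // the statement could not be parsed
} catch (ValidationException e) {
    // parsed, but semantically invalid (unknown table, column, or function)
}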
class TableConfigOptions {
public static final ConfigOption<String> TABLE_SQL_DIALECT;
public static final ConfigOption<Integer> TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
public static final ConfigOption<String> TABLE_LOCAL_TIME_ZONE;
}

Common configurations:
Configuration config = tEnv.getConfig().getConfiguration();
// Set SQL dialect
config.setString("table.sql-dialect", "default");
// Set local timezone
config.setString("table.local-time-zone", "UTC");
// Set default parallelism
config.setInteger("table.exec.resource.default-parallelism", 4);

To buffer multiple INSERT statements and submit them together as a single job:
interface StatementSet {
StatementSet addInsertSql(String statement);
StatementSet addInsert(String targetPath, Table table);
StatementSet addInsert(String targetPath, Table table, boolean overwrite);
TableResult execute();
String explain();
}

Usage:
StatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source WHERE type = 'A'");
stmtSet.addInsertSql("INSERT INTO sink2 SELECT * FROM source WHERE type = 'B'");
stmtSet.execute();
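The combined plan can be inspected before submission; explain() covers every buffered statement:

// Print the optimized plan for both INSERTs (call before execute())
System.out.println(stmtSet.explain());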
interface TableResult {
ResultKind getResultKind();
ResolvedSchema getResolvedSchema();
CloseableIterator<Row> collect();
void print();
JobExecutionResult getJobExecutionResult();
}
enum ResultKind {
SUCCESS,
SUCCESS_WITH_CONTENT,
NOT_READY
}
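Usage (a sketch; assumes a registered clicks table; Row and CloseableIterator come from org.apache.flink.types and org.apache.flink.util):

// Print a small result set to stdout
tEnv.executeSql("SELECT user_id FROM clicks LIMIT 10").print();
// Or iterate over the rows; try-with-resources releases the result's resources
try (CloseableIterator<Row> it = tEnv.executeSql("SELECT user_id FROM clicks").collect()) {
    while (it.hasNext()) {
        Row row = it.next();
        // process row
    }
}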
interface SqlNode;
interface SqlIdentifier extends SqlNode;
interface SqlCall extends SqlNode;