# SQL and Query Processing

This document covers SQL DDL, DML, and query (DQL) capabilities — including SQL parsing, execution, and Hive compatibility — as provided by the Apache Flink Blink table planner (`flink-table-uber-blink`).
## SQL Execution
5
6
### SQL Statement Execution
7
8
```java { .api }
9
interface TableEnvironment {
10
TableResult executeSql(String statement);
11
Table sqlQuery(String query);
12
StatementSet createStatementSet();
13
}
14
```
15
16
**Usage:**
17
18
```java
19
// Execute DDL statements
20
tEnv.executeSql("CREATE DATABASE mydb");
21
tEnv.executeSql("USE mydb");
22
23
// Execute DML statements
24
TableResult result = tEnv.executeSql("INSERT INTO target_table SELECT * FROM source_table");
25
26
// Execute queries
27
Table queryResult = tEnv.sqlQuery("SELECT user_id, COUNT(*) as cnt FROM clicks GROUP BY user_id");
28
```
29
30
## Data Definition Language (DDL)
31
32
### Database Operations
33
34
```sql
35
-- Create database
36
CREATE DATABASE [IF NOT EXISTS] db_name [COMMENT 'comment'] [WITH (key1=val1, key2=val2, ...)];
37
38
-- Drop database
39
DROP DATABASE [IF EXISTS] db_name [RESTRICT|CASCADE];
40
41
-- Show databases
42
SHOW DATABASES;
43
44
-- Use database
45
USE db_name;
46
```
47
48
### Table Operations
49
50
```sql
51
-- Create table
52
CREATE TABLE [IF NOT EXISTS] table_name (
53
column_name column_type [COMMENT 'comment'],
54
[WATERMARK FOR rowtime_column AS watermark_strategy],
55
[PRIMARY KEY (column_list) NOT ENFORCED]
56
) [COMMENT 'comment']
57
[PARTITIONED BY (partition_column_list)]
58
WITH (
59
'connector' = 'connector_type',
60
'option_key' = 'option_value'
61
);
62
63
-- Create table as select
64
CREATE TABLE table_name WITH ('connector' = '...') AS SELECT ...;
65
66
-- Drop table
67
DROP TABLE [IF EXISTS] table_name;
68
69
-- Show tables
70
SHOW TABLES;
71
72
-- Describe table
73
DESCRIBE table_name;
74
DESC table_name;
75
```
76
77
**Usage:**
78
79
```java
80
// Create table with watermark
81
tEnv.executeSql(
82
"CREATE TABLE events (" +
83
" user_id BIGINT," +
84
" event_time TIMESTAMP(3)," +
85
" event_type STRING," +
86
" WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
87
") WITH (" +
88
" 'connector' = 'kafka'," +
89
" 'topic' = 'events'," +
90
" 'properties.bootstrap.servers' = 'localhost:9092'," +
91
" 'format' = 'json'" +
92
")"
93
);
94
```
95
96
### View Operations
97
98
```java { .api }
99
interface TableEnvironment {
100
void createTemporaryView(String path, Table view);
101
void createTemporaryView(String path, DataStream<?> dataStream);
102
void dropTemporaryView(String path);
103
}
104
```
105
106
**SQL:**
107
108
```sql
109
-- Create view
110
CREATE [TEMPORARY] VIEW view_name AS SELECT ...;
111
112
-- Drop view
113
DROP [TEMPORARY] VIEW [IF EXISTS] view_name;
114
115
-- Show views
116
SHOW VIEWS;
117
```
118
119
## Data Manipulation Language (DML)
120
121
### Insert Operations
122
123
```sql
124
-- Insert values
125
INSERT INTO table_name VALUES (value1, value2, ...);
126
127
-- Insert from select
128
INSERT INTO target_table SELECT * FROM source_table WHERE condition;
129
130
-- Insert overwrite
131
INSERT OVERWRITE target_table SELECT * FROM source_table;
132
```
133
134
### Update and Delete
135
136
```sql
137
-- Update (only supported for changelog streams)
138
UPDATE table_name SET column1 = value1 WHERE condition;
139
140
-- Delete (only supported for changelog streams)
141
DELETE FROM table_name WHERE condition;
142
```
143
144
## Query Language (DQL)
145
146
### Basic Queries
147
148
```sql
149
SELECT column_list
150
FROM table_name
151
[WHERE condition]
152
[GROUP BY column_list]
153
[HAVING condition]
154
[ORDER BY column_list]
155
[LIMIT number];
156
```
157
158
### Joins
159
160
```sql
161
-- Inner join
162
SELECT * FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id;
163
164
-- Outer joins
165
SELECT * FROM table1 t1 LEFT JOIN table2 t2 ON t1.id = t2.id;
166
SELECT * FROM table1 t1 RIGHT JOIN table2 t2 ON t1.id = t2.id;
167
SELECT * FROM table1 t1 FULL OUTER JOIN table2 t2 ON t1.id = t2.id;
168
169
-- Cross join
170
SELECT * FROM table1 CROSS JOIN table2;
171
```
172
173
### Window Functions
174
175
#### Tumbling Windows
176
177
```sql
178
SELECT
179
TUMBLE_START(event_time, INTERVAL '1' HOUR) as window_start,
180
TUMBLE_END(event_time, INTERVAL '1' HOUR) as window_end,
181
user_id,
182
COUNT(*) as event_count
183
FROM events
184
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR), user_id;
185
```
186
187
#### Sliding Windows
188
189
```sql
190
SELECT
191
HOP_START(event_time, INTERVAL '30' MINUTE, INTERVAL '1' HOUR) as window_start,
192
HOP_END(event_time, INTERVAL '30' MINUTE, INTERVAL '1' HOUR) as window_end,
193
COUNT(*) as event_count
194
FROM events
195
GROUP BY HOP(event_time, INTERVAL '30' MINUTE, INTERVAL '1' HOUR);
196
```
197
198
#### Session Windows
199
200
```sql
201
SELECT
202
SESSION_START(event_time, INTERVAL '30' MINUTE) as session_start,
203
SESSION_END(event_time, INTERVAL '30' MINUTE) as session_end,
204
user_id,
205
COUNT(*) as event_count
206
FROM events
207
GROUP BY SESSION(event_time, INTERVAL '30' MINUTE), user_id;
208
```
209
210
## SQL Parser Configuration
211
212
```java { .api }
213
interface TableEnvironment {
214
SqlParser createParser(String statement);
215
}
216
217
class SqlParser {
218
List<SqlNode> parseStmtList(String sql);
219
SqlNode parseStmt(String sql);
220
SqlNode parseExpression(String sqlExpression);
221
}
222
223
enum SqlDialect {
224
DEFAULT,
225
HIVE
226
}
227
```
228
229
**Configuration:**
230
231
```java
232
// Set SQL dialect
233
Configuration config = new Configuration();
234
config.setString("table.sql-dialect", "hive");
235
TableEnvironment tEnv = TableEnvironment.create(config);
236
237
// Or via table config
238
tEnv.getConfig().getConfiguration().setString("table.sql-dialect", "hive");
239
```
240
241
## Hive Compatibility
242
243
### Hive SQL Dialect
244
245
When using Hive dialect, Flink supports Hive-specific SQL syntax:
246
247
```sql
248
-- Hive-style CTAS with storage format
249
CREATE TABLE target_table
250
STORED AS PARQUET
251
LOCATION '/path/to/table'
252
AS SELECT * FROM source_table;
253
254
-- Hive functions
255
SELECT concat_ws('|', col1, col2) FROM table1;
256
SELECT get_json_object(json_col, '$.field') FROM table2;
257
```
258
259
### Hive Catalog Integration
260
261
```java
262
// Register Hive catalog
263
HiveCatalog hive = new HiveCatalog("myhive", "default", "/path/to/hive-conf");
264
tEnv.registerCatalog("myhive", hive);
265
tEnv.useCatalog("myhive");
266
267
// Access Hive tables
268
Table hiveTable = tEnv.from("hive_database.hive_table");
269
```
270
271
## Built-in Functions
272
273
### Scalar Functions
274
275
```sql
276
-- String functions
277
SELECT UPPER(name), LOWER(email), LENGTH(description) FROM users;
278
SELECT CONCAT(first_name, ' ', last_name) as full_name FROM users;
279
SELECT SUBSTRING(text, 1, 10) FROM documents;
280
281
-- Math functions
282
SELECT ABS(balance), ROUND(price, 2), CEIL(rating) FROM products;
283
284
-- Date/Time functions
285
SELECT CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP;
286
SELECT EXTRACT(YEAR FROM order_date), DATE_FORMAT(created_at, 'yyyy-MM-dd') FROM orders;
287
288
-- Conditional functions
289
SELECT CASE WHEN age >= 18 THEN 'adult' ELSE 'minor' END FROM users;
290
SELECT COALESCE(nickname, first_name, 'Anonymous') FROM users;
291
```
292
293
### Aggregate Functions
294
295
```sql
296
-- Basic aggregates
297
SELECT COUNT(*), SUM(amount), AVG(rating), MIN(price), MAX(price) FROM products;
298
299
-- Statistical functions
300
SELECT STDDEV_POP(score), VAR_SAMP(rating) FROM reviews;
301
302
-- Collection functions
303
SELECT COLLECT(tags), LISTAGG(name, ',') FROM items GROUP BY category;
304
```
305
306
### Table Functions
307
308
```sql
309
-- String split
310
SELECT word FROM table1 CROSS JOIN UNNEST(SPLIT(description, ' ')) AS t(word);
311
312
-- JSON extraction
313
SELECT field_value FROM json_table CROSS JOIN UNNEST(JSON_EXTRACT_ARRAY(json_col)) AS t(field_value);
314
```
315
316
## Error Handling
317
318
SQL-specific exceptions:
319
320
```java { .api }
class SqlParserException extends TableException {
    SqlParserException(String message);
    SqlParserException(String message, Throwable cause);
}

class SqlExecutionException extends TableException { }
class SqlValidationException extends ValidationException { }
```
329
330
## Configuration Options
331
332
```java { .api }
class TableConfigOptions {
    public static final ConfigOption<String> TABLE_SQL_DIALECT;
    // Parallelism is a count of parallel subtasks, so the option is an Integer
    // (matches config.setInteger("table.exec.resource.default-parallelism", ...) below)
    public static final ConfigOption<Integer> TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
    public static final ConfigOption<String> TABLE_LOCAL_TIME_ZONE;
}
```
339
340
**Common configurations:**
341
342
```java
343
Configuration config = tEnv.getConfig().getConfiguration();
344
345
// Set SQL dialect
346
config.setString("table.sql-dialect", "default");
347
348
// Set local timezone
349
config.setString("table.local-time-zone", "UTC");
350
351
// Set default parallelism
352
config.setInteger("table.exec.resource.default-parallelism", 4);
353
```
354
355
## Statement Sets
356
357
A `StatementSet` groups multiple INSERT statements so they are optimized and executed together as a single job:
358
359
```java { .api }
360
interface StatementSet {
361
StatementSet addInsertSql(String statement);
362
StatementSet addInsert(String targetPath, Table table);
363
StatementSet addInsert(String targetPath, Table table, boolean overwrite);
364
TableResult execute();
365
String explain();
366
}
367
```
368
369
**Usage:**
370
371
```java
372
StatementSet stmtSet = tEnv.createStatementSet();
373
stmtSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source WHERE type = 'A'");
374
stmtSet.addInsertSql("INSERT INTO sink2 SELECT * FROM source WHERE type = 'B'");
375
stmtSet.execute();
376
```
377
378
## Types
379
380
```java { .api }
381
interface TableResult {
382
ResultKind getResultKind();
383
ResolvedSchema getResolvedSchema();
384
CloseableIterator<Row> collect();
385
void print();
386
JobExecutionResult getJobExecutionResult();
387
}
388
389
enum ResultKind {
390
SUCCESS,
391
SUCCESS_WITH_CONTENT,
392
NOT_READY
393
}
394
395
interface SqlNode;
396
interface SqlIdentifier extends SqlNode;
397
interface SqlCall extends SqlNode;
398
```