# SQL Execution

Flink's Table API provides comprehensive SQL execution capabilities supporting DDL, DML, and query operations with advanced features like statement batching, result streaming, and metadata access.
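
All snippets in this section assume an existing `tableEnv` handle. A minimal sketch of obtaining one (the choice of streaming mode is an assumption; use `inBatchMode()` for bounded workloads):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Create a TableEnvironment in streaming mode (assumption: unbounded sources)
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .inStreamingMode()
        .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
```
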
## Capabilities

### SQL Query Execution

Execute SQL queries and retrieve results as Table objects for further processing.
```java { .api }
/**
 * Evaluates a SQL query on registered tables and returns the result as a Table
 * @param query SQL query string (SELECT, WITH, VALUES, etc.)
 * @return Table representing the query result
 */
Table sqlQuery(String query);
```

**Usage Examples:**

```java
// Simple SELECT query
Table customers = tableEnv.sqlQuery("SELECT * FROM customers WHERE age > 25");

// Complex analytical query
Table salesAnalysis = tableEnv.sqlQuery(
    "WITH monthly_sales AS (" +
    " SELECT " +
    "  EXTRACT(YEAR FROM order_date) as year," +
    "  EXTRACT(MONTH FROM order_date) as month," +
    "  product_category," +
    "  SUM(amount) as total_sales" +
    " FROM orders " +
    " WHERE order_date >= DATE '2024-01-01'" +
    " GROUP BY EXTRACT(YEAR FROM order_date), EXTRACT(MONTH FROM order_date), product_category" +
    ") " +
    "SELECT " +
    " year, month, product_category, total_sales," +
    " LAG(total_sales) OVER (PARTITION BY product_category ORDER BY year, month) as prev_month_sales," +
    " total_sales - LAG(total_sales) OVER (PARTITION BY product_category ORDER BY year, month) as growth " +
    "FROM monthly_sales " +
    "ORDER BY year, month, product_category"
);

// Window aggregation for streaming data
Table windowedEvents = tableEnv.sqlQuery(
    "SELECT " +
    " user_id," +
    " TUMBLE_START(event_time, INTERVAL '1' HOUR) as window_start," +
    " TUMBLE_END(event_time, INTERVAL '1' HOUR) as window_end," +
    " COUNT(*) as event_count," +
    " SUM(event_value) as total_value " +
    "FROM user_events " +
    "GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' HOUR)"
);
```
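
A `Table` returned by `sqlQuery` can also be registered as a temporary view so that later SQL can reference it; a small sketch (the view and column names are illustrative):

```java
// Register a query result for reuse in subsequent SQL
Table adults = tableEnv.sqlQuery("SELECT * FROM customers WHERE age > 25");
tableEnv.createTemporaryView("adult_customers", adults);

// The view now resolves like any other table
Table byCountry = tableEnv.sqlQuery(
    "SELECT country, COUNT(*) as cnt FROM adult_customers GROUP BY country");
```
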
### SQL Statement Execution

Execute DDL, DML, and utility statements with execution results and metadata.

```java { .api }
/**
 * Executes a single SQL statement and returns the execution result
 * @param statement SQL statement (CREATE, DROP, INSERT, DESCRIBE, etc.)
 * @return TableResult containing execution information and optional data
 */
TableResult executeSql(String statement);
```

**Usage Examples:**

```java
// DDL - Create table
TableResult createResult = tableEnv.executeSql(
    "CREATE TABLE orders (" +
    " order_id BIGINT," +
    " customer_id BIGINT," +
    " product_id BIGINT," +
    " quantity INT," +
    " unit_price DECIMAL(10, 2)," +
    " order_time TIMESTAMP(3)," +
    " WATERMARK FOR order_time AS order_time - INTERVAL '30' SECOND" +
    ") WITH (" +
    " 'connector' = 'kafka'," +
    " 'topic' = 'orders'," +
    " 'properties.bootstrap.servers' = 'localhost:9092'," +
    " 'format' = 'json'" +
    ")"
);

// DDL - Create view
tableEnv.executeSql(
    "CREATE VIEW high_value_customers AS " +
    "SELECT customer_id, SUM(quantity * unit_price) as total_spent " +
    "FROM orders " +
    "GROUP BY customer_id " +
    "HAVING SUM(quantity * unit_price) > 1000"
);

// DML - Insert data
TableResult insertResult = tableEnv.executeSql(
    "INSERT INTO customer_summary " +
    "SELECT customer_id, COUNT(*) as order_count, SUM(quantity * unit_price) as total_spent " +
    "FROM orders " +
    "GROUP BY customer_id"
);

// Utility - Describe table
TableResult describeResult = tableEnv.executeSql("DESCRIBE orders");
describeResult.print();

// Utility - Show tables
TableResult showResult = tableEnv.executeSql("SHOW TABLES");
```
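
In streaming mode, `executeSql` on an INSERT submits a job and returns as soon as it has been submitted. When blocking semantics are needed (batch pipelines, tests), Flink's `TableResult.await()` (not listed in the summary above) blocks until the job finishes. A sketch, assuming the enclosing method declares `throws Exception`:

```java
// Submit the INSERT and block until the job completes; await() surfaces job failures
TableResult blockingInsert = tableEnv.executeSql(
    "INSERT INTO customer_summary " +
    "SELECT customer_id, COUNT(*) as order_count, SUM(quantity * unit_price) as total_spent " +
    "FROM orders GROUP BY customer_id");
blockingInsert.await();
```
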
### Statement Set Operations

Batch multiple statements for efficient execution and dependency management.

```java { .api }
/**
 * Creates a StatementSet for batch execution of multiple statements
 * @return StatementSet instance for adding multiple operations
 */
StatementSet createStatementSet();

interface StatementSet {
    /**
     * Adds an INSERT SQL statement to the set
     * @param statement INSERT SQL statement
     * @return StatementSet for method chaining
     */
    StatementSet addInsertSql(String statement);

    /**
     * Adds a table insertion to the set
     * @param targetPath Target table path
     * @param table Source table to insert
     * @return StatementSet for method chaining
     */
    StatementSet addInsert(String targetPath, Table table);

    /**
     * Explains the execution plan for all statements
     * @return String representation of the execution plan
     */
    String explain();

    /**
     * Executes all statements in the set
     * @return TableResult with execution information
     */
    TableResult execute();
}
```

**Usage Examples:**

```java
// Batch multiple related insertions
StatementSet stmtSet = tableEnv.createStatementSet();

// Add SQL insertions
stmtSet.addInsertSql(
    "INSERT INTO daily_sales " +
    "SELECT DATE(order_time) as sale_date, SUM(quantity * unit_price) as daily_total " +
    "FROM orders " +
    "WHERE DATE(order_time) = CURRENT_DATE " +
    "GROUP BY DATE(order_time)"
);

stmtSet.addInsertSql(
    "INSERT INTO product_sales " +
    "SELECT product_id, COUNT(*) as order_count, SUM(quantity) as total_quantity " +
    "FROM orders " +
    "WHERE DATE(order_time) = CURRENT_DATE " +
    "GROUP BY product_id"
);

// Add table insertions
// (requires: import static org.apache.flink.table.api.Expressions.$)
Table customerMetrics = tableEnv.from("orders")
    .groupBy($("customer_id"))
    .select(
        $("customer_id"),
        $("order_id").count().as("order_count"),
        $("quantity").sum().as("total_items")
    );

stmtSet.addInsert("customer_metrics", customerMetrics);

// Execute all statements together
System.out.println(stmtSet.explain());
TableResult batchResult = stmtSet.execute();
```
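
Because all statements in a set are optimized and executed as one job, a shared intermediate view can let the planner reuse work across sinks. A hedged sketch (view, table, and column names are illustrative):

```java
// Both inserts read from the same temporary view; the planner can share the scan
tableEnv.executeSql(
    "CREATE TEMPORARY VIEW todays_orders AS " +
    "SELECT * FROM orders WHERE CAST(order_time AS DATE) = CURRENT_DATE");

StatementSet fanOut = tableEnv.createStatementSet();
fanOut.addInsertSql(
    "INSERT INTO daily_sales " +
    "SELECT CAST(order_time AS DATE) as sale_date, SUM(quantity * unit_price) as daily_total " +
    "FROM todays_orders GROUP BY CAST(order_time AS DATE)");
fanOut.addInsertSql(
    "INSERT INTO product_sales " +
    "SELECT product_id, COUNT(*) as order_count, SUM(quantity) as total_quantity " +
    "FROM todays_orders GROUP BY product_id");
fanOut.execute();  // a single job serving both sinks
```
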
### Result Handling

Process and consume query results with various access patterns.

```java { .api }
interface TableResult extends AutoCloseable {
    /**
     * Gets the result kind indicating success or content availability
     * @return ResultKind (SUCCESS, SUCCESS_WITH_CONTENT)
     */
    ResultKind getResultKind();

    /**
     * Gets the schema of the result
     * @return ResolvedSchema describing result structure
     */
    ResolvedSchema getResolvedSchema();

    /**
     * Collects all result rows as an iterator
     * @return CloseableIterator for consuming result rows
     */
    CloseableIterator<Row> collect();

    /**
     * Prints the result to standard output
     */
    void print();

    /**
     * Prints the result with a row limit
     * @param maxNumRows Maximum number of rows to print
     */
    void print(int maxNumRows);

    /**
     * Gets the job client for the executed job (for streaming queries)
     * @return Optional JobClient for job monitoring
     */
    Optional<JobClient> getJobClient();
}

enum ResultKind {
    /** Statement executed successfully without result data */
    SUCCESS,
    /** Statement executed successfully with result data */
    SUCCESS_WITH_CONTENT
}
```

**Usage Examples:**

```java
// Query execution with result processing
Table query = tableEnv.sqlQuery("SELECT customer_id, COUNT(*) as orders FROM orders GROUP BY customer_id");
TableResult result = query.execute();

// Check result type
if (result.getResultKind() == ResultKind.SUCCESS_WITH_CONTENT) {
    // Access schema information
    ResolvedSchema schema = result.getResolvedSchema();
    System.out.println("Columns: " + schema.getColumnNames());

    // Process results with iterator
    try (CloseableIterator<Row> iterator = result.collect()) {
        while (iterator.hasNext()) {
            Row row = iterator.next();
            Long customerId = row.getFieldAs(0);
            Long orderCount = row.getFieldAs(1);
            System.out.println("Customer " + customerId + " has " + orderCount + " orders");
        }
    }
}

// Simple result printing
tableEnv.sqlQuery("SELECT * FROM customers LIMIT 10").execute().print();

// Limited printing
tableEnv.sqlQuery("SELECT * FROM large_table").execute().print(100);
```
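
For continuous queries, the optional `JobClient` gives access to the running job's status and lifecycle; a brief sketch (assumes `orders` is an unbounded source):

```java
// Monitor the job backing a continuous INSERT
TableResult streamingResult = tableEnv.executeSql(
    "INSERT INTO customer_summary " +
    "SELECT customer_id, COUNT(*) as order_count, SUM(quantity * unit_price) as total_spent " +
    "FROM orders GROUP BY customer_id");

streamingResult.getJobClient().ifPresent(client -> {
    System.out.println("Job ID: " + client.getJobID());
    client.getJobStatus().thenAccept(status ->
        System.out.println("Job status: " + status));
    // client.cancel() returns a CompletableFuture<Void> that stops the job
});
```
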
### Advanced SQL Features

Support for advanced SQL constructs and streaming-specific features. These constructs have no dedicated API surface; they are plain SQL strings passed to `sqlQuery()` or `executeSql()`.

**Usage Examples:**

```java
// Common Table Expressions (CTE)
Table cteQuery = tableEnv.sqlQuery(
    "WITH RECURSIVE category_hierarchy AS (" +
    " SELECT category_id, category_name, parent_id, 0 as level " +
    " FROM categories WHERE parent_id IS NULL " +
    " UNION ALL " +
    " SELECT c.category_id, c.category_name, c.parent_id, ch.level + 1 " +
    " FROM categories c " +
    " JOIN category_hierarchy ch ON c.parent_id = ch.category_id" +
    ") " +
    "SELECT * FROM category_hierarchy"
);

// Window functions
Table windowQuery = tableEnv.sqlQuery(
    "SELECT " +
    " customer_id, order_date, amount," +
    " ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY order_date) as order_sequence," +
    " SUM(amount) OVER (PARTITION BY customer_id ORDER BY order_date " +
    "   ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as running_total," +
    " AVG(amount) OVER (PARTITION BY customer_id ORDER BY order_date " +
    "   ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING) as moving_avg " +
    "FROM orders"
);

// Streaming-specific: Event time processing
Table streamingQuery = tableEnv.sqlQuery(
    "SELECT " +
    " user_id," +
    " HOP_START(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) as window_start," +
    " HOP_END(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) as window_end," +
    " COUNT(*) as event_count," +
    " MAX(event_value) as max_value " +
    "FROM events " +
    "GROUP BY user_id, HOP(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE)"
);

// Pattern matching with MATCH_RECOGNIZE
Table patternQuery = tableEnv.sqlQuery(
    "SELECT customer_id, start_time, end_time, total_amount " +
    "FROM orders " +
    "MATCH_RECOGNIZE (" +
    " PARTITION BY customer_id " +
    " ORDER BY order_time " +
    " MEASURES " +
    "  FIRST(A.order_time) as start_time," +
    "  LAST(C.order_time) as end_time," +
    "  SUM(A.amount + B.amount + C.amount) as total_amount " +
    " PATTERN (A B+ C) " +
    " DEFINE " +
    "  A as A.amount > 100," +
    "  B as B.amount > A.amount," +
    "  C as C.amount < B.amount" +
    ")"
);

// JSON processing (Flink SQL uses JSON_VALUE/JSON_QUERY/JSON_EXISTS)
Table jsonQuery = tableEnv.sqlQuery(
    "SELECT " +
    " user_id," +
    " JSON_VALUE(user_profile, '$.preferences.language') as preferred_language," +
    " JSON_VALUE(user_profile, '$.address.country') as country," +
    " JSON_QUERY(user_profile, '$.purchase_history[*].product_id') as purchased_products " +
    "FROM users " +
    "WHERE JSON_EXISTS(user_profile, '$.preferences')"
);
```
### SQL Dialect Support

Configure the SQL dialect for compatibility with different SQL engines.

```java { .api }
/**
 * Gets the table configuration for SQL dialect settings
 * @return TableConfig for accessing dialect configuration
 */
TableConfig getConfig();

interface TableConfig {
    /**
     * Sets the SQL dialect for parsing
     * @param sqlDialect Dialect to use (DEFAULT, HIVE)
     */
    void setSqlDialect(SqlDialect sqlDialect);

    /**
     * Gets the current SQL dialect
     * @return Currently configured SQL dialect
     */
    SqlDialect getSqlDialect();
}

enum SqlDialect {
    /** Default Flink SQL dialect */
    DEFAULT,
    /** Hive-compatible SQL dialect */
    HIVE
}
```

**Usage Examples:**

```java
// Switch to the Hive dialect for compatibility
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

// Hive-specific SQL features (partition columns are declared in
// PARTITIONED BY with their types, not in the column list)
tableEnv.executeSql(
    "CREATE TABLE hive_table (" +
    " id BIGINT," +
    " name STRING" +
    ") PARTITIONED BY (partition_date STRING) " +
    "STORED AS PARQUET"
);

// Switch back to the default Flink dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
```
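
Because the dialect setting applies to every subsequent statement on the session, saving and restoring the previous dialect avoids surprises; a minimal sketch:

```java
// Save, switch, and restore the dialect around Hive-specific work
SqlDialect previous = tableEnv.getConfig().getSqlDialect();
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
try {
    tableEnv.executeSql("SHOW TABLES").print();
} finally {
    tableEnv.getConfig().setSqlDialect(previous);
}
```
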
## Types

### Result Types

```java { .api }
interface TableResult extends AutoCloseable {
    ResultKind getResultKind();
    ResolvedSchema getResolvedSchema();
    CloseableIterator<Row> collect();
    void print();
    void print(int maxNumRows);
    Optional<JobClient> getJobClient();
    void close();
}

enum ResultKind {
    SUCCESS,
    SUCCESS_WITH_CONTENT
}

interface StatementSet {
    StatementSet addInsertSql(String statement);
    StatementSet addInsert(String targetPath, Table table);
    String explain();
    TableResult execute();
}
```

### Row Access

```java { .api }
class Row {
    Object getField(int pos);
    Object getField(String name);
    <T> T getFieldAs(int pos);
    <T> T getFieldAs(String name);
    int getArity();
    RowKind getKind();

    // Factory methods
    static Row of(Object... values);
    static Row withNames(RowKind kind);
    static Row withPositions(RowKind kind, int arity);
}

enum RowKind {
    INSERT,         // +I
    UPDATE_BEFORE,  // -U
    UPDATE_AFTER,   // +U
    DELETE          // -D
}
```
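
A short sketch of common row access patterns (field values are illustrative):

```java
// Positional construction and typed access
Row row = Row.of(42L, "alice");
Long id = row.getFieldAs(0);
String name = row.getFieldAs(1);

// Rows collected from a streaming result carry changelog semantics via RowKind
if (row.getKind() == RowKind.INSERT) {
    System.out.println("insert: id=" + id + ", name=" + name);
}
```
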
### Iterator Support

```java { .api }
interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
    boolean hasNext();
    T next();
    void close();
}
```