# SQL Integration

Flink Table API provides seamless integration with SQL, allowing you to mix SQL statements with Table API operations. The system supports full SQL DDL, DML, and query capabilities alongside programmatic table operations.

## Capabilities

### SQL Query Execution

Execute SQL SELECT queries and retrieve results as Table objects.

```java { .api }
/**
 * Executes a SQL query and returns the result as a Table
 * @param query SQL SELECT statement
 * @return Table containing query results
 */
public Table sqlQuery(String query);
```

**Usage Examples:**

```java
// Basic SQL query
Table result = tableEnv.sqlQuery(
    "SELECT customer_id, SUM(amount) as total_amount " +
    "FROM orders " +
    "WHERE order_date >= '2023-01-01' " +
    "GROUP BY customer_id " +
    "HAVING SUM(amount) > 1000"
);

// Complex joins and window functions
Table analyticsResult = tableEnv.sqlQuery(
    "SELECT " +
    "  c.customer_name, " +
    "  o.order_date, " +
    "  o.amount, " +
    "  SUM(o.amount) OVER (PARTITION BY c.customer_id ORDER BY o.order_date) as running_total, " +
    "  ROW_NUMBER() OVER (PARTITION BY c.customer_id ORDER BY o.amount DESC) as amount_rank " +
    "FROM customers c " +
    "JOIN orders o ON c.customer_id = o.customer_id " +
    "WHERE o.order_date >= CURRENT_DATE - INTERVAL '30' DAY"
);

// Subqueries and CTEs
Table complexQuery = tableEnv.sqlQuery(
    "WITH monthly_sales AS ( " +
    "  SELECT " +
    "    EXTRACT(YEAR FROM order_date) as year, " +
    "    EXTRACT(MONTH FROM order_date) as month, " +
    "    SUM(amount) as monthly_total " +
    "  FROM orders " +
    "  GROUP BY EXTRACT(YEAR FROM order_date), EXTRACT(MONTH FROM order_date) " +
    "), " +
    "avg_sales AS ( " +
    "  SELECT AVG(monthly_total) as avg_monthly " +
    "  FROM monthly_sales " +
    ") " +
    "SELECT ms.year, ms.month, ms.monthly_total, " +
    "  ms.monthly_total - a.avg_monthly as deviation " +
    "FROM monthly_sales ms CROSS JOIN avg_sales a " +
    "ORDER BY ms.year, ms.month"
);
```
### SQL Statement Execution

Execute SQL DDL and DML statements that modify schema or data.

```java { .api }
/**
 * Executes a SQL statement and returns execution results
 * @param statement SQL DDL, DML, or query statement
 * @return TableResult containing execution status and results
 */
public TableResult executeSql(String statement);
```

**Usage Examples:**

```java
// Create table with SQL DDL
tableEnv.executeSql(
    "CREATE TABLE user_behavior ( " +
    "  user_id BIGINT, " +
    "  item_id BIGINT, " +
    "  category_id BIGINT, " +
    "  behavior STRING, " +
    "  ts TIMESTAMP(3), " +
    "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND " +
    ") WITH ( " +
    "  'connector' = 'kafka', " +
    "  'topic' = 'user_behavior', " +
    "  'properties.bootstrap.servers' = 'localhost:9092', " +
    "  'format' = 'json' " +
    ")"
);

// Create sink table
tableEnv.executeSql(
    "CREATE TABLE result_table ( " +
    "  user_id BIGINT, " +
    "  behavior_count BIGINT, " +
    "  PRIMARY KEY (user_id) NOT ENFORCED " +
    ") WITH ( " +
    "  'connector' = 'jdbc', " +
    "  'url' = 'jdbc:mysql://localhost:3306/test', " +
    "  'table-name' = 'user_behavior_count' " +
    ")"
);

// Insert data with SQL
TableResult insertResult = tableEnv.executeSql(
    "INSERT INTO result_table " +
    "SELECT user_id, COUNT(*) as behavior_count " +
    "FROM user_behavior " +
    "WHERE behavior = 'purchase' " +
    "GROUP BY user_id"
);

// Get job client for monitoring
Optional<JobClient> jobClient = insertResult.getJobClient();
if (jobClient.isPresent()) {
    System.out.println("Job ID: " + jobClient.get().getJobID());
}
```
### Statement Sets for Batch Execution

Group multiple DML statements for efficient batch execution.

```java { .api }
/**
 * Creates a StatementSet for batching multiple statements
 * @return StatementSet for adding multiple statements
 */
public StatementSet createStatementSet();

public interface StatementSet {
    /**
     * Adds an INSERT SQL statement to the set
     * @param statement INSERT SQL statement
     * @return StatementSet for method chaining
     */
    StatementSet addInsertSql(String statement);

    /**
     * Adds an INSERT operation from a Table to the set
     * @param targetPath Target table path
     * @param table Table to insert
     * @return StatementSet for method chaining
     */
    StatementSet addInsert(String targetPath, Table table);

    /**
     * Executes all statements in the set
     * @return TableResult containing execution status
     */
    TableResult execute();

    /**
     * Explains the execution plan for all statements in the set
     * @return String representation of the execution plan
     */
    String explain();
}
```

**Usage Examples:**

```java
// Create statement set for multiple inserts
StatementSet stmtSet = tableEnv.createStatementSet();

// Add multiple SQL insert statements
stmtSet.addInsertSql(
    "INSERT INTO daily_summary " +
    "SELECT DATE(order_date) as day, COUNT(*) as order_count, SUM(amount) as total_amount " +
    "FROM orders " +
    "WHERE order_date >= CURRENT_DATE - INTERVAL '1' DAY " +
    "GROUP BY DATE(order_date)"
);

stmtSet.addInsertSql(
    "INSERT INTO customer_summary " +
    "SELECT customer_id, COUNT(*) as order_count, SUM(amount) as total_spent " +
    "FROM orders " +
    "WHERE order_date >= CURRENT_DATE - INTERVAL '1' DAY " +
    "GROUP BY customer_id"
);

// Add Table API insert
Table hourlyStats = sourceTable
    .window(Tumble.over(lit(1).hour()).on($("event_time")).as("hourly"))
    .groupBy($("hourly"))
    .select($("hourly").start().as("hour"), count($("*")).as("event_count"));

stmtSet.addInsert("hourly_stats", hourlyStats);

// Execute all statements together
TableResult result = stmtSet.execute();

// Explain execution plan
String plan = stmtSet.explain();
System.out.println("Execution plan:\n" + plan);
```
### SQL and Table API Interoperability

Seamlessly mix SQL queries with Table API operations.

**Usage Examples:**

```java
// Start with SQL, continue with Table API
Table sqlTable = tableEnv.sqlQuery(
    "SELECT customer_id, order_date, amount " +
    "FROM orders " +
    "WHERE amount > 100"
);

Table processed = sqlTable
    .filter($("amount").isGreater(500))
    .groupBy($("customer_id"))
    .select($("customer_id"), $("amount").avg().as("avg_amount"));

// Start with Table API, continue with SQL
Table apiTable = sourceTable
    .select($("id"), $("name"), $("salary"))
    .filter($("salary").isGreater(50000));

// Register as temporary view for SQL access
tableEnv.createTemporaryView("high_earners", apiTable);

Table sqlResult = tableEnv.sqlQuery(
    "SELECT name, salary, " +
    "  CASE WHEN salary > 100000 THEN 'Senior' " +
    "  WHEN salary > 75000 THEN 'Mid' " +
    "  ELSE 'Junior' END as level " +
    "FROM high_earners " +
    "ORDER BY salary DESC"
);

// Chain back to Table API
Table finalResult = sqlResult
    .groupBy($("level"))
    .select($("level"), count($("*")).as("count"), $("salary").avg().as("avg_salary"));
```
### Advanced SQL Features

Support for window functions, complex expressions, and advanced SQL constructs.

**Usage Examples:**

```java
// Window functions and analytical queries
Table windowAnalysis = tableEnv.sqlQuery(
    "SELECT " +
    "  product_id, " +
    "  sale_date, " +
    "  daily_sales, " +
    "  -- Moving average over 7 days " +
    "  AVG(daily_sales) OVER ( " +
    "    PARTITION BY product_id " +
    "    ORDER BY sale_date " +
    "    ROWS BETWEEN 6 PRECEDING AND CURRENT ROW " +
    "  ) as moving_avg_7d, " +
    "  -- Cumulative sales " +
    "  SUM(daily_sales) OVER ( " +
    "    PARTITION BY product_id " +
    "    ORDER BY sale_date " +
    "    ROWS UNBOUNDED PRECEDING " +
    "  ) as cumulative_sales, " +
    "  -- Rank by sales within each month " +
    "  RANK() OVER ( " +
    "    PARTITION BY product_id, EXTRACT(YEAR_MONTH FROM sale_date) " +
    "    ORDER BY daily_sales DESC " +
    "  ) as monthly_rank " +
    "FROM daily_product_sales"
);

// Complex case expressions and functions
Table caseAnalysis = tableEnv.sqlQuery(
    "SELECT " +
    "  customer_id, " +
    "  order_count, " +
    "  total_amount, " +
    "  CASE " +
    "    WHEN total_amount > 10000 AND order_count > 20 THEN 'VIP' " +
    "    WHEN total_amount > 5000 OR order_count > 10 THEN 'Premium' " +
    "    WHEN total_amount > 1000 THEN 'Regular' " +
    "    ELSE 'New' " +
    "  END as customer_tier, " +
    "  -- Calculate percentile rank " +
    "  PERCENT_RANK() OVER (ORDER BY total_amount) as amount_percentile, " +
    "  -- String manipulation " +
    "  CONCAT('CUSTOMER_', LPAD(CAST(customer_id AS STRING), 8, '0')) as customer_code, " +
    "  -- Date functions " +
    "  EXTRACT(DAYOFWEEK FROM first_order_date) as first_order_dow, " +
    "  DATEDIFF(last_order_date, first_order_date) as customer_lifetime_days " +
    "FROM customer_summary"
);

// Array and map operations (if supported by connectors)
Table arrayOperations = tableEnv.sqlQuery(
    "SELECT " +
    "  user_id, " +
    "  tags, " +
    "  CARDINALITY(tags) as tag_count, " +
    "  tags[1] as primary_tag, " +
    "  -- Check if array contains specific value " +
    "  'premium' = ANY(tags) as is_premium_user, " +
    "  -- String aggregation " +
    "  ARRAY_JOIN(tags, ', ') as tags_string " +
    "FROM user_profiles " +
    "WHERE CARDINALITY(tags) > 0"
);
```
### SQL Temporal Features

Time-based operations and temporal table functions in SQL.

**Usage Examples:**

```java
// Temporal joins (for lookups with time-based versioning)
Table temporalJoin = tableEnv.sqlQuery(
    "SELECT " +
    "  o.order_id, " +
    "  o.customer_id, " +
    "  o.product_id, " +
    "  o.order_time, " +
    "  o.amount, " +
    "  p.product_name, " +
    "  p.category, " +
    "  -- Get product info as of order time " +
    "  p.price as price_at_order_time " +
    "FROM orders o " +
    "JOIN product_changelog FOR SYSTEM_TIME AS OF o.order_time AS p " +
    "  ON o.product_id = p.product_id"
);

// Interval operations
Table intervalQuery = tableEnv.sqlQuery(
    "SELECT " +
    "  user_id, " +
    "  login_time, " +
    "  logout_time, " +
    "  logout_time - login_time as session_duration, " +
    "  -- Check if session was longer than 30 minutes " +
    "  logout_time - login_time > INTERVAL '30' MINUTE as long_session, " +
    "  -- Add time intervals " +
    "  login_time + INTERVAL '1' HOUR as expected_timeout " +
    "FROM user_sessions " +
    "WHERE logout_time - login_time > INTERVAL '1' MINUTE"
);

// Watermark and event time operations
Table eventTimeQuery = tableEnv.sqlQuery(
    "SELECT " +
    "  sensor_id, " +
    "  TUMBLE_START(event_time, INTERVAL '1' HOUR) as window_start, " +
    "  TUMBLE_END(event_time, INTERVAL '1' HOUR) as window_end, " +
    "  COUNT(*) as reading_count, " +
    "  AVG(temperature) as avg_temperature, " +
    "  MIN(temperature) as min_temperature, " +
    "  MAX(temperature) as max_temperature " +
    "FROM sensor_readings " +
    "GROUP BY sensor_id, TUMBLE(event_time, INTERVAL '1' HOUR)"
);
```
### SQL Configuration and Dialects

Configure SQL parsing and execution behavior.

```java { .api }
/**
 * SQL dialect configuration
 */
public enum SqlDialect {
    /** Default Flink SQL dialect */
    DEFAULT,
    /** Hive-compatible SQL dialect for migration scenarios */
    HIVE
}
```

**Usage Examples:**

```java
// Set SQL dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

// Hive dialect for compatibility
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

// Use Hive-specific syntax when in Hive dialect
if (tableEnv.getConfig().getSqlDialect() == SqlDialect.HIVE) {
    tableEnv.executeSql(
        "CREATE TABLE hive_table ( " +
        "  id BIGINT, " +
        "  name STRING, " +
        "  part_date STRING " +
        ") PARTITIONED BY (part_date) " +
        "STORED AS PARQUET " +
        "LOCATION '/warehouse/hive_table'"
    );
}

// Configuration through table config
Configuration config = tableEnv.getConfig().getConfiguration();
config.setString("table.sql-dialect", "default");
config.setString("table.local-time-zone", "UTC");
```
### SQL Error Handling

Handle SQL parsing and execution errors.

```java { .api }
/**
 * Exception thrown for SQL parsing errors
 */
public class SqlParserException extends RuntimeException {
    public SqlParserException(String message);
    public SqlParserException(String message, Throwable cause);
}

/**
 * Exception for SQL parsing errors at end of input
 */
public class SqlParserEOFException extends SqlParserException {
    public SqlParserEOFException(String message);
}
```

**Usage Examples:**

```java
// Handle SQL parsing errors
try {
    Table result = tableEnv.sqlQuery(
        "SELECT customer_id, SUM(amount " + // Missing closing parenthesis
        "FROM orders " +
        "GROUP BY customer_id"
    );
} catch (SqlParserException e) {
    System.err.println("SQL parsing failed: " + e.getMessage());
    // Handle parsing error, perhaps show user-friendly error message
}

// Handle execution errors
try {
    TableResult result = tableEnv.executeSql(
        "INSERT INTO non_existent_table " +
        "SELECT * FROM orders"
    );
} catch (Exception e) {
    System.err.println("SQL execution failed: " + e.getMessage());
    // Handle execution error
}

// Validate SQL before execution
try {
    String sql = "SELECT * FROM orders WHERE amount > ?";
    // In practice, you might want to validate parameter binding
    Table validated = tableEnv.sqlQuery(sql.replace("?", "100"));
    System.out.println("SQL is valid");
} catch (SqlParserException e) {
    System.err.println("Invalid SQL: " + e.getMessage());
}
```
### DDL Operations via SQL

Create and manage database objects using SQL DDL.

**Usage Examples:**

```java
// Create catalog
tableEnv.executeSql(
    "CREATE CATALOG my_catalog WITH ( " +
    "  'type' = 'hive', " +
    "  'hive-conf-dir' = '/opt/hive/conf' " +
    ")"
);

// Create database
tableEnv.executeSql(
    "CREATE DATABASE IF NOT EXISTS analytics " +
    "COMMENT 'Analytics database for business intelligence'"
);

// Create function
tableEnv.executeSql(
    "CREATE TEMPORARY FUNCTION hash_func AS 'com.example.HashFunction'"
);

// Create view
tableEnv.executeSql(
    "CREATE VIEW high_value_customers AS " +
    "SELECT customer_id, SUM(amount) as total_spent " +
    "FROM orders " +
    "GROUP BY customer_id " +
    "HAVING SUM(amount) > 5000"
);

// Alter table
tableEnv.executeSql(
    "ALTER TABLE orders ADD COLUMN discount_amount DECIMAL(10,2)"
);

// Drop objects
tableEnv.executeSql("DROP VIEW IF EXISTS high_value_customers");
tableEnv.executeSql("DROP TEMPORARY FUNCTION hash_func");
tableEnv.executeSql("DROP TABLE temp_results");
```