# SQL Integration

The Flink Table API provides SQL support built on Apache Calcite, which parses, validates, and optimizes queries. Besides standard SQL queries, it accepts DDL (Data Definition Language) and DML (Data Manipulation Language) statements.

## Capabilities

### SQL Query Execution

Run SQL queries against registered tables and get the results back as `Table` objects, or execute SQL statements.

```scala { .api }
/**
 * Evaluates a SQL query (e.g. SELECT) on registered tables and returns the result as a Table.
 * DDL and DML statements are not accepted here.
 * @param query The SQL query string
 * @return Table containing the query result
 */
def sqlQuery(query: String): Table

/**
 * Executes a SQL statement: DDL (e.g. CREATE TABLE) or DML (e.g. INSERT INTO).
 * Deprecated since Flink 1.11 in favor of executeSql(statement), which returns a TableResult.
 * @param stmt The SQL statement string
 */
def sqlUpdate(stmt: String): Unit
```

**Usage Examples:**

```scala
// Basic SELECT queries
val basicQuery = tEnv.sqlQuery("SELECT name, age FROM Users WHERE age > 21")

val aggregateQuery = tEnv.sqlQuery("""
  SELECT department,
    COUNT(*) as employee_count,
    AVG(salary) as avg_salary,
    MAX(salary) as max_salary
  FROM Employees
  GROUP BY department
""")

// Complex joins and subqueries
val complexQuery = tEnv.sqlQuery("""
  SELECT e.name, e.salary, d.department_name, d.budget
  FROM Employees e
  JOIN Departments d ON e.dept_id = d.id
  WHERE e.salary > (
    SELECT AVG(salary) * 1.2
    FROM Employees
    WHERE dept_id = e.dept_id
  )
""")

// Window queries
val windowQuery = tEnv.sqlQuery("""
  SELECT
    user_id,
    TUMBLE_START(event_time, INTERVAL '1' HOUR) as window_start,
    TUMBLE_END(event_time, INTERVAL '1' HOUR) as window_end,
    SUM(amount) as total_amount
  FROM Transactions
  GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' HOUR)
""")
```
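
The `TUMBLE` window query assigns each row to exactly one fixed-size, non-overlapping window. As a plain-Scala sketch of that assignment (no Flink dependency; the `Txn` type and millisecond timestamps are assumptions for illustration), aligned 1-hour windows start at `ts - (ts mod 3600000)`, and the aggregation is a group-by on user plus window bounds:

```scala
object TumbleSketch {
  // Hypothetical transaction row: user, event time in epoch milliseconds, amount
  final case class Txn(userId: String, eventTimeMs: Long, amount: Double)

  val HourMs: Long = 60L * 60L * 1000L

  // Start of the aligned tumbling window containing ts (floorMod also handles negative ts)
  def windowStart(ts: Long, sizeMs: Long): Long = ts - Math.floorMod(ts, sizeMs)

  // Models GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' HOUR) with SUM(amount):
  // key = (user_id, window_start, window_end), value = total amount in that window
  def hourlyTotals(txns: Seq[Txn]): Map[(String, Long, Long), Double] =
    txns
      .groupBy { t =>
        val start = windowStart(t.eventTimeMs, HourMs)
        (t.userId, start, start + HourMs)
      }
      .map { case (key, rows) => key -> rows.map(_.amount).sum }
}
```

Window bounds are half-open: a row at exactly `3600000` belongs to the second hour, not the first.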

### Data Definition Language (DDL)

Create and manage table structures, views, and database objects.

```scala { .api }
// Table creation and management
tEnv.sqlUpdate("""
  CREATE TABLE Users (
    id BIGINT,
    name STRING,
    email STRING,
    age INT,
    registration_time TIMESTAMP(3),
    WATERMARK FOR registration_time AS registration_time - INTERVAL '5' SECOND
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'users',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
  )
""")

// Temporary table creation
tEnv.sqlUpdate("""
  CREATE TEMPORARY TABLE TempResults (
    category STRING,
    total_sales DOUBLE,
    avg_price DOUBLE
  ) WITH (
    'connector' = 'print'
  )
""")

// View creation
tEnv.sqlUpdate("""
  CREATE VIEW ActiveUsers AS
  SELECT id, name, email
  FROM Users
  WHERE last_login > CURRENT_TIMESTAMP - INTERVAL '30' DAY
""")

// Temporary view creation
tEnv.sqlUpdate("""
  CREATE TEMPORARY VIEW RecentOrders AS
  SELECT o.*, u.name as user_name
  FROM Orders o
  JOIN Users u ON o.user_id = u.id
  WHERE o.order_time > CURRENT_TIMESTAMP - INTERVAL '7' DAY
""")
```
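
The `WATERMARK FOR registration_time AS registration_time - INTERVAL '5' SECOND` clause declares bounded out-of-orderness: the watermark trails the largest timestamp seen so far by 5 seconds, and rows whose timestamp is already behind the watermark are late and may be dropped by event-time operations. A simplified plain-Scala model, assuming millisecond timestamps and that the watermark is recomputed after every row (Flink by default emits watermarks periodically instead):

```scala
object WatermarkSketch {
  val DelayMs: Long = 5000L // corresponds to INTERVAL '5' SECOND

  // For each incoming timestamp, returns (watermark after the row, whether the row was
  // already behind the watermark when it arrived). Watermarks never move backwards.
  def track(timestamps: Seq[Long]): Seq[(Long, Boolean)] =
    timestamps
      .scanLeft((Long.MinValue, false)) { case ((watermark, _), ts) =>
        (math.max(watermark, ts - DelayMs), ts < watermark)
      }
      .tail // drop the initial seed element
}
```

In `track(Seq(10000L, 8000L, 20000L, 12000L))`, the row at 8000 ms is out of order but still ahead of the watermark, while the row at 12000 ms arrives after the watermark has advanced to 15000 ms and is flagged as late.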

### Data Manipulation Language (DML)

Write query results into registered sink tables with `INSERT INTO` statements.

```scala { .api }
// Insert the result of an aggregation query
tEnv.sqlUpdate("""
  INSERT INTO UserStatistics
  SELECT
    user_id,
    COUNT(*) as order_count,
    SUM(amount) as total_spent,
    AVG(amount) as avg_order_value
  FROM Orders
  GROUP BY user_id
""")

// Insert with specific columns; `timestamp` is a reserved keyword and must be escaped
tEnv.sqlUpdate("""
  INSERT INTO AuditLog (event_type, user_id, `timestamp`)
  SELECT 'ORDER_CREATED', user_id, order_time
  FROM Orders
  WHERE order_time > CURRENT_TIMESTAMP - INTERVAL '1' HOUR
""")

// Insert from multiple sources
tEnv.sqlUpdate("""
  INSERT INTO AllTransactions
  SELECT transaction_id, amount, 'CREDIT' as `type`, `timestamp`
  FROM CreditTransactions
  UNION ALL
  SELECT transaction_id, amount, 'DEBIT' as `type`, `timestamp`
  FROM DebitTransactions
""")
```
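
In streaming mode, the `INSERT INTO UserStatistics ... GROUP BY user_id` statement produces an updating result: each new order revises its user's aggregate row, so the sink must accept updates rather than append-only writes. The plain-Scala sketch below models that changelog for `COUNT(*)` and `SUM(amount)`, using Flink's row-kind short strings `+I`, `-U`, and `+U`; the `Order` and `Change` types are hypothetical.

```scala
object ChangelogSketch {
  // Hypothetical order row
  final case class Order(userId: String, amount: Double)

  // One changelog entry: row kind, user_id, order_count, total_spent
  final case class Change(kind: String, userId: String, orderCount: Long, totalSpent: Double)

  // Emits +I for a user's first order; afterwards retracts the previous aggregate (-U)
  // and emits the new one (+U), as a non-windowed streaming GROUP BY does.
  def changelog(orders: Seq[Order]): Seq[Change] = {
    val state = scala.collection.mutable.Map.empty[String, (Long, Double)]
    orders.flatMap { o =>
      state.get(o.userId) match {
        case None =>
          state(o.userId) = (1L, o.amount)
          Seq(Change("+I", o.userId, 1L, o.amount))
        case Some((count, total)) =>
          val (newCount, newTotal) = (count + 1, total + o.amount)
          state(o.userId) = (newCount, newTotal)
          Seq(
            Change("-U", o.userId, count, total),
            Change("+U", o.userId, newCount, newTotal))
      }
    }
  }
}
```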

### Built-in SQL Functions

Flink SQL includes a broad set of built-in scalar functions for string, numeric, temporal, and conditional processing.

```scala { .api }
// String functions
val stringFunctions = tEnv.sqlQuery("""
  SELECT
    UPPER(name) as upper_name,
    LOWER(email) as lower_email,
    SUBSTRING(phone, 1, 3) as area_code,
    CONCAT(first_name, ' ', last_name) as full_name,
    CHAR_LENGTH(description) as desc_length,
    TRIM(BOTH ' ' FROM padded_text) as trimmed
  FROM Users
""")

// Numeric functions
val numericFunctions = tEnv.sqlQuery("""
  SELECT
    ABS(balance) as abs_balance,
    ROUND(salary, 2) as rounded_salary,
    CEILING(score) as ceil_score,
    FLOOR(rating) as floor_rating,
    MOD(id, 10) as id_mod,
    POWER(base_value, 2) as squared_value
  FROM Employees
""")

// Date and time functions; aliases that collide with reserved keywords need backticks
val timeFunctions = tEnv.sqlQuery("""
  SELECT
    CURRENT_TIMESTAMP as current_ts,
    CURRENT_DATE as `current_date`,
    CURRENT_TIME as `current_time`,
    EXTRACT(YEAR FROM birth_date) as birth_year,
    EXTRACT(MONTH FROM hire_date) as hire_month,
    TIMESTAMPDIFF(DAY, hire_date, CURRENT_DATE) as days_employed,
    DATE_FORMAT(event_time, 'yyyy-MM-dd HH:mm') as formatted_time
  FROM Employees
""")

// Conditional functions
val conditionalFunctions = tEnv.sqlQuery("""
  SELECT
    name,
    CASE
      WHEN age < 18 THEN 'Minor'
      WHEN age >= 18 AND age < 65 THEN 'Adult'
      ELSE 'Senior'
    END as age_category,
    COALESCE(middle_name, 'N/A') as middle_name_safe,
    NULLIF(status, 'UNKNOWN') as clean_status,
    IF(salary > 50000, 'High', 'Standard') as salary_tier
  FROM Employees
""")
```
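
The `NULL` handling of `COALESCE` and `NULLIF`, and the first-match evaluation of `CASE`, can be mirrored in Scala with `Option` standing in for a nullable column. This is an illustrative model of the SQL semantics only, not how Flink implements these functions:

```scala
object ConditionalSketch {
  // CASE WHEN ... evaluates branches in order and returns the first match
  def ageCategory(age: Int): String =
    if (age < 18) "Minor"
    else if (age < 65) "Adult"
    else "Senior"

  // COALESCE(a, b, ...) returns the first non-NULL argument (None if all are NULL)
  def coalesce[A](values: Option[A]*): Option[A] =
    values.collectFirst { case Some(v) => v }

  // NULLIF(a, b) returns NULL when a = b, otherwise a
  def nullIf[A](a: Option[A], b: A): Option[A] = a.filter(_ != b)
}
```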

### Aggregate Functions

SQL aggregate functions for data summarization and analysis.

```scala { .api }
// Basic aggregations
val basicAggregates = tEnv.sqlQuery("""
  SELECT
    department,
    COUNT(*) as employee_count,
    COUNT(DISTINCT position) as unique_positions,
    SUM(salary) as total_salary,
    AVG(salary) as avg_salary,
    MIN(hire_date) as earliest_hire,
    MAX(salary) as highest_salary,
    STDDEV_SAMP(salary) as salary_stddev,
    VAR_SAMP(salary) as salary_variance
  FROM Employees
  GROUP BY department
""")

// Advanced aggregations with HAVING; COLLECT returns a MULTISET of the values
val advancedAggregates = tEnv.sqlQuery("""
  SELECT
    department,
    position,
    COUNT(*) as `count`,
    AVG(salary) as avg_salary,
    COLLECT(name) as employee_names
  FROM Employees
  GROUP BY department, position
  HAVING COUNT(*) >= 3 AND AVG(salary) > 40000
""")

// Grouping sets: columns omitted from a grouping set are returned as NULL
val groupingSets = tEnv.sqlQuery("""
  SELECT
    department,
    position,
    gender,
    COUNT(*) as employee_count,
    AVG(salary) as avg_salary
  FROM Employees
  GROUP BY GROUPING SETS (
    (department, position, gender),
    (department, position),
    (department),
    ()
  )
""")
```
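
`GROUP BY GROUPING SETS` is equivalent to a `UNION ALL` of one `GROUP BY` per set, with the columns absent from a set returned as `NULL`. A plain-Scala sketch of that expansion for `COUNT(*)`, using `None` for `NULL` and a hypothetical `Emp` row type:

```scala
object GroupingSetsSketch {
  // Hypothetical employee row
  final case class Emp(department: String, position: String, gender: String)

  // Each grouping set marks which of (department, position, gender) are kept
  val groupingSets: Seq[(Boolean, Boolean, Boolean)] = Seq(
    (true, true, true),   // (department, position, gender)
    (true, true, false),  // (department, position)
    (true, false, false), // (department)
    (false, false, false) // () grand total
  )

  private def keep(flag: Boolean, value: String): Option[String] =
    if (flag) Some(value) else None

  // One GROUP BY per grouping set, concatenated like UNION ALL
  def counts(emps: Seq[Emp]): Seq[(Option[String], Option[String], Option[String], Int)] =
    groupingSets.flatMap { case (keepDept, keepPos, keepGender) =>
      emps
        .groupBy(e => (keep(keepDept, e.department), keep(keepPos, e.position), keep(keepGender, e.gender)))
        .toSeq
        .map { case ((d, p, g), rows) => (d, p, g, rows.size) }
    }
}
```

Three employees spread over two departments yield 3 + 2 + 2 + 1 = 8 result rows, ending with a grand-total row whose grouping columns are all `NULL`.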

### Window Functions in SQL

Group windows (`TUMBLE`, `HOP`, `SESSION`) aggregate rows into time buckets, while `OVER` windows compute a value for every input row. In streaming mode, `OVER` windows must be ordered by a time attribute and cannot include `FOLLOWING` rows, and ranking functions are supported only in Top-N queries; the ranking and frame examples below therefore target batch mode.

```scala { .api }
// Time-based group windows: one row per user and 1-hour tumbling window
val timeWindows = tEnv.sqlQuery("""
  SELECT
    user_id,
    TUMBLE_START(transaction_time, INTERVAL '1' HOUR) as window_start,
    TUMBLE_END(transaction_time, INTERVAL '1' HOUR) as window_end,
    SUM(amount) as hourly_total
  FROM Transactions
  GROUP BY user_id, TUMBLE(transaction_time, INTERVAL '1' HOUR)
""")

// Ranking and analytical functions (batch mode)
val analyticalFunctions = tEnv.sqlQuery("""
  SELECT
    employee_id,
    department,
    salary,
    ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) as dept_rank,
    RANK() OVER (PARTITION BY department ORDER BY salary DESC) as dept_rank_ties,
    DENSE_RANK() OVER (PARTITION BY department ORDER BY salary DESC) as dept_dense_rank,
    PERCENT_RANK() OVER (PARTITION BY department ORDER BY salary DESC) as dept_percent_rank,
    NTILE(4) OVER (PARTITION BY department ORDER BY salary DESC) as salary_quartile
  FROM Employees
""")

// Frame-based window functions (batch mode)
val frameFunctions = tEnv.sqlQuery("""
  SELECT
    employee_id,
    salary,
    hire_date,
    SUM(salary) OVER (
      ORDER BY hire_date
      ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) as running_salary_sum,
    AVG(salary) OVER (
      ORDER BY hire_date
      ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING
    ) as moving_avg_salary,
    LAG(salary, 1) OVER (ORDER BY hire_date) as prev_salary,
    LEAD(salary, 1) OVER (ORDER BY hire_date) as next_salary,
    FIRST_VALUE(salary) OVER (
      PARTITION BY department
      ORDER BY hire_date
      ROWS UNBOUNDED PRECEDING
    ) as first_dept_salary
  FROM Employees
""")
```
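
`ROW_NUMBER`, `RANK`, and `DENSE_RANK` only differ on ties, and a `ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING` frame simply shrinks at the partition edges. A plain-Scala model of both over a single partition (illustrative only; inputs are assumed to be already sorted in the window's `ORDER BY` order):

```scala
object WindowFunctionSketch {
  // For salaries sorted descending, returns (salary, row_number, rank, dense_rank)
  def ranks(sortedDesc: Seq[Int]): Seq[(Int, Int, Int, Int)] = {
    val distinctSorted = sortedDesc.distinct // keeps first-occurrence order
    sortedDesc.zipWithIndex.map { case (salary, i) =>
      val rowNumber = i + 1                              // unique and sequential
      val rank = sortedDesc.indexOf(salary) + 1          // ties share a rank, then a gap
      val denseRank = distinctSorted.indexOf(salary) + 1 // ties share a rank, no gap
      (salary, rowNumber, rank, denseRank)
    }
  }

  // AVG over ROWS BETWEEN `preceding` PRECEDING AND `following` FOLLOWING
  def movingAvg(values: Seq[Double], preceding: Int, following: Int): Seq[Double] =
    values.indices.map { i =>
      val frame = values.slice(math.max(0, i - preceding), math.min(values.size, i + following + 1))
      frame.sum / frame.size
    }
}
```

For salaries `300, 200, 200, 100`, the tied rows get row numbers 2 and 3, both get rank 2, and the last row gets rank 4 but dense rank 3.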

### Table-Valued Functions

Table functions return zero or more rows per input row and are joined to their input with `LATERAL TABLE(...)`. `JSON_PARSE`, `SPLIT`, and `GENERATE_SERIES` are not built-in Flink table functions; the examples assume user-defined table functions registered under those names (for example with `tEnv.createTemporarySystemFunction`).

```scala { .api }
// JSON parsing (assumes a user-defined JSON_PARSE table function)
val jsonParsing = tEnv.sqlQuery("""
  SELECT
    user_id,
    json_data,
    parsed.name,
    parsed.age,
    parsed.preferences
  FROM Users u,
    LATERAL TABLE(JSON_PARSE(u.json_data)) as parsed(name, age, preferences)
""")

// String splitting (assumes a user-defined SPLIT table function)
val stringSplitting = tEnv.sqlQuery("""
  SELECT
    user_id,
    tag
  FROM Users u,
    LATERAL TABLE(SPLIT(u.tags, ',')) as tag_table(tag)
  WHERE tag IS NOT NULL AND CHAR_LENGTH(tag) > 0
""")

// Range generation (assumes a user-defined GENERATE_SERIES table function)
val rangeGeneration = tEnv.sqlQuery("""
  SELECT
    gs.day_offset,
    TIMESTAMPADD(DAY, gs.day_offset, CURRENT_DATE) as calendar_date
  FROM LATERAL TABLE(GENERATE_SERIES(0, 30)) as gs(day_offset)
""")
```
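
A `LATERAL TABLE` join calls the table function once per input row and pairs the row with every row the function returns; input rows for which the function returns nothing are dropped. Flink also accepts `LEFT JOIN LATERAL TABLE(...) ON TRUE` to keep such rows with `NULL` columns. A plain-Scala model of the tag-splitting query, where `splitTags` is a hypothetical stand-in for the `SPLIT` table function:

```scala
object LateralSketch {
  // Hypothetical stand-in for the SPLIT table function: one output row per piece
  def splitTags(tags: String, sep: Char): Seq[String] = tags.split(sep).toSeq

  // FROM Users u, LATERAL TABLE(SPLIT(u.tags, ',')) AS tag_table(tag)
  // WHERE tag IS NOT NULL AND CHAR_LENGTH(tag) > 0
  def innerLateral(users: Seq[(Long, String)]): Seq[(Long, String)] =
    for {
      (userId, tags) <- users
      tag <- splitTags(tags, ',')
      if tag != null && tag.nonEmpty
    } yield (userId, tag)

  // LEFT JOIN LATERAL TABLE(...) ON TRUE: rows without output keep a NULL (None) tag
  def leftLateral(users: Seq[(Long, String)]): Seq[(Long, Option[String])] =
    users.flatMap { case (userId, tags) =>
      val out: Seq[Option[String]] = splitTags(tags, ',').filter(_.nonEmpty).map(Some(_))
      val padded: Seq[Option[String]] = if (out.isEmpty) Seq(None) else out
      padded.map(tag => (userId, tag))
    }
}
```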

### Common Table Expressions (CTEs)

Named subqueries defined with `WITH` clauses. Flink SQL does not support recursive CTEs (`WITH RECURSIVE`).

```scala { .api }
// Basic CTE usage
val cteQuery = tEnv.sqlQuery("""
  WITH department_stats AS (
    SELECT
      department,
      COUNT(*) as emp_count,
      AVG(salary) as avg_salary,
      MAX(salary) as max_salary
    FROM Employees
    GROUP BY department
  ),
  high_performing_depts AS (
    SELECT department, avg_salary
    FROM department_stats
    WHERE emp_count >= 5 AND avg_salary > 50000
  )
  SELECT
    e.name,
    e.salary,
    e.department,
    hpd.avg_salary as dept_avg_salary,
    (e.salary - hpd.avg_salary) as salary_diff
  FROM Employees e
  JOIN high_performing_depts hpd ON e.department = hpd.department
  WHERE e.salary > hpd.avg_salary * 1.1
""")

// Hierarchical data: without recursive CTEs, a hierarchy of bounded depth can be
// flattened with chained self-joins (two management levels shown)
val hierarchyQuery = tEnv.sqlQuery("""
  SELECT
    e.employee_id,
    e.name,
    m1.name as manager_name,
    m2.name as senior_manager_name
  FROM Employees e
  LEFT JOIN Employees m1 ON e.manager_id = m1.employee_id
  LEFT JOIN Employees m2 ON m1.manager_id = m2.employee_id
""")
```
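
Flink SQL does not evaluate `WITH RECURSIVE`. To show what such a query computes (an anchor set of top-level rows, then repeated joins of the previous level with its children until no new rows appear or a depth limit is reached), here is a plain-Scala fixpoint sketch; the `Emp` type is hypothetical, and the default limit of 10 levels is an assumption that guards against cycles in the data:

```scala
object HierarchySketch {
  // Hypothetical employee row; managerId = None marks a top-level manager
  final case class Emp(id: Long, name: String, managerId: Option[Long])

  // Returns (employee_id, name, level, path), ordered by level and name
  def hierarchy(emps: Seq[Emp], maxLevel: Int = 10): Seq[(Long, String, Int, String)] = {
    // Anchor: top-level managers at level 1
    var frontier: Seq[(Long, String, Int, String)] =
      emps.filter(_.managerId.isEmpty).map(e => (e.id, e.name, 1, e.name))
    var result = frontier
    var level = 1
    while (frontier.nonEmpty && level < maxLevel) {
      // Recursive step: direct reports of the rows found in the previous iteration
      frontier = for {
        (parentId, _, parentLevel, parentPath) <- frontier
        e <- emps
        if e.managerId.contains(parentId)
      } yield (e.id, e.name, parentLevel + 1, parentPath + " -> " + e.name)
      result = result ++ frontier
      level += 1
    }
    result.sortBy(r => (r._3, r._2))
  }
}
```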

### Advanced SQL Features

Pivoting with conditional aggregation, set operations, and correlated subqueries.

```scala { .api }
// Pivot operations (using CASE expressions)
val pivotQuery = tEnv.sqlQuery("""
  SELECT
    department,
    SUM(CASE WHEN quarter = 'Q1' THEN sales END) as Q1_sales,
    SUM(CASE WHEN quarter = 'Q2' THEN sales END) as Q2_sales,
    SUM(CASE WHEN quarter = 'Q3' THEN sales END) as Q3_sales,
    SUM(CASE WHEN quarter = 'Q4' THEN sales END) as Q4_sales
  FROM QuarterlySales
  GROUP BY department
""")

// Set operations; INTERSECT binds more tightly than UNION and EXCEPT,
// so the parentheses make the intended evaluation order explicit
val setOperations = tEnv.sqlQuery("""
  (
    SELECT employee_id, name FROM CurrentEmployees
    UNION
    SELECT employee_id, name FROM FormerEmployees
  )
  INTERSECT
  SELECT employee_id, name FROM EligibleForRehire
  EXCEPT
  SELECT employee_id, name FROM BlacklistedEmployees
""")

// Correlated subqueries
val correlatedSubqueries = tEnv.sqlQuery("""
  SELECT
    e1.name,
    e1.salary,
    e1.department,
    (SELECT COUNT(*)
      FROM Employees e2
      WHERE e2.department = e1.department AND e2.salary > e1.salary
    ) as employees_with_higher_salary,
    (SELECT AVG(salary)
      FROM Employees e3
      WHERE e3.department = e1.department
    ) as dept_avg_salary
  FROM Employees e1
  WHERE e1.salary > (
    SELECT AVG(salary) * 1.2
    FROM Employees e4
    WHERE e4.department = e1.department
  )
""")
```
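
Without `ALL`, `UNION`, `INTERSECT`, and `EXCEPT` remove duplicates, and in standard SQL (which Calcite follows) `INTERSECT` binds more tightly than `UNION` and `EXCEPT`, so where the parentheses go in the set-operations query changes its result. A small Scala model, with `Set[Long]` standing in for employee IDs (hypothetical data):

```scala
object SetOpsSketch {
  // ((Current UNION Former) INTERSECT Eligible) EXCEPT Blacklisted
  def parenthesized(current: Set[Long], former: Set[Long], eligible: Set[Long], blacklisted: Set[Long]): Set[Long] =
    ((current union former) intersect eligible) diff blacklisted

  // Default precedence: (Current UNION (Former INTERSECT Eligible)) EXCEPT Blacklisted
  def defaultPrecedence(current: Set[Long], former: Set[Long], eligible: Set[Long], blacklisted: Set[Long]): Set[Long] =
    (current union (former intersect eligible)) diff blacklisted
}
```

With current = {1, 2}, former = {3, 4}, eligible = {2, 3}, and blacklisted = {3}, the parenthesized form keeps only employee 2, while default precedence keeps 1 and 2 because current employees bypass the eligibility check.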

### SQL Configuration and Optimization

SQL-specific configuration options and optimizer hints.

```scala { .api }
import org.apache.flink.table.api.SqlDialect

// Join hint (Flink 1.16+, batch mode): broadcast the Departments table to all join tasks
val optimizedQuery = tEnv.sqlQuery("""
  SELECT /*+ BROADCAST(Departments) */
    Employees.name,
    Departments.department_name
  FROM Employees
  JOIN Departments ON Employees.dept_id = Departments.id
""")

// Configuration for SQL execution
val tableConfig = tEnv.getConfig
tableConfig.setSqlDialect(SqlDialect.HIVE) // Hive dialect; requires the Hive connector dependency
tableConfig.getConfiguration.setString("table.optimizer.join-reorder-enabled", "true")

// The same kind of settings through string-based configuration keys
tEnv.getConfig.getConfiguration.setString("table.sql-dialect", "default")
tEnv.getConfig.getConfiguration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE") // AUTO, ONE_PHASE or TWO_PHASE
```
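
`table.optimizer.agg-phase-strategy` selects between one-phase aggregation and two-phase (local/global) aggregation. In the two-phase plan, each parallel instance first pre-aggregates its rows into a partial accumulator, and a global step merges those partials. Conceptually, the accumulator for `AVG` is a (sum, count) pair; a plain-Scala sketch, with a hypothetical partition layout:

```scala
object TwoPhaseAvgSketch {
  // Partial accumulator for AVG: running sum and row count
  final case class Acc(sum: Double, count: Long) {
    def merge(other: Acc): Acc = Acc(sum + other.sum, count + other.count)
  }

  // Local phase: each partition reduces its rows to one accumulator
  def local(rows: Seq[Double]): Acc = Acc(rows.sum, rows.size.toLong)

  // Global phase: merge the partial accumulators, then compute the final AVG
  def global(partials: Seq[Acc]): Option[Double] = {
    val total = partials.foldLeft(Acc(0.0, 0L))(_ merge _)
    if (total.count == 0) None else Some(total.sum / total.count)
  }

  def twoPhaseAvg(partitions: Seq[Seq[Double]]): Option[Double] = global(partitions.map(local))
}
```

Merging (sum, count) pairs rather than per-partition averages keeps the result exact: for partitions `[1.0, 2.0]` and `[3.0]`, averaging the local averages 1.5 and 3.0 would give 2.25 instead of the correct 2.0.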

### Error Handling and Debugging

Techniques for handling SQL errors and inspecting query plans.

```scala { .api }
import org.apache.flink.table.api.{SqlParserException, TableException, ValidationException}

// Explain query plans
val queryPlan = tEnv.explainSql("""
  SELECT department, COUNT(*), AVG(salary)
  FROM Employees
  WHERE hire_date > DATE '2020-01-01'
  GROUP BY department
  HAVING COUNT(*) > 5
""")

println(queryPlan)

// sqlQuery parses and validates immediately, so SQL errors surface at this call
try {
  val result = tEnv.sqlQuery("SELECT * FROM NonExistentTable")
} catch {
  case ex: ValidationException =>
    println(s"SQL validation error: ${ex.getMessage}")
  case ex: SqlParserException =>
    println(s"SQL parsing error: ${ex.getMessage}")
  case ex: TableException =>
    println(s"Table API error: ${ex.getMessage}")
}

// Simplify execution while debugging: run with parallelism 1 and keep distinct aggregates unsplit
tEnv.getConfig.getConfiguration.setString("table.exec.resource.default-parallelism", "1")
tEnv.getConfig.getConfiguration.setBoolean("table.optimizer.distinct-agg.split.enabled", false)
```

## SQL Compatibility and Limitations

Flink SQL covers most of standard SQL, but some features are missing and streaming execution adds constraints.

```scala { .api }
// Supported SQL features:
// - Much of standard SQL, parsed and validated by Apache Calcite
// - Group windows (TUMBLE, HOP, SESSION) and OVER windows
// - User-defined functions (scalar, table, aggregate)
// - Complex data types (ROW, ARRAY, MAP)
// - JSON functions and operators
// - Regular expressions and pattern matching

// Limitations and considerations:
// - Some standard features are unsupported, e.g. recursive CTEs (WITH RECURSIVE)
// - Performance characteristics differ from traditional databases
// - Streaming queries produce continuously updated (changelog) results; batch queries produce a final result
// - In streaming mode, rows arriving behind the watermark may be dropped by event-time operations
// - Event-time operations require watermarks and time attributes
```

## Types

```scala { .api }
// SQL-related exception types (org.apache.flink.table.api); all are unchecked
class SqlParserException(message: String) extends RuntimeException(message)
class ValidationException(message: String) extends RuntimeException(message)
class TableException(message: String) extends RuntimeException(message)

// SQL dialect configuration
enum SqlDialect {
  case DEFAULT, HIVE
}

// Result of executeSql(...) or Table.execute()
trait TableResult {
  def print(): Unit
  def collect(): CloseableIterator[Row]
}
```