# SQL Integration

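The Flink Table API supports SQL through its Apache Calcite-based planner. Besides queries, it accepts DDL (Data Definition Language) statements such as `CREATE TABLE` and DML (Data Manipulation Language) statements such as `INSERT INTO`.

## Capabilities

### SQL Query Execution

Execute SQL queries and get the result back as a `Table`, or execute DDL and DML statements.

```scala { .api }
/**
 * Executes a SQL query and returns the result as a Table
 * @param query The SQL query string
 * @return Table containing the query result
 */
def sqlQuery(query: String): Table

/**
 * Executes a SQL statement (DDL/DML operations).
 * Deprecated since Flink 1.11 in favor of executeSql; INSERT statements
 * passed here are buffered until TableEnvironment.execute(jobName) is called.
 * @param stmt The SQL statement string
 */
def sqlUpdate(stmt: String): Unit

/**
 * Executes a single SQL statement (DDL, DML, SHOW, ...) and returns its result.
 * An INSERT statement is submitted as a job right away.
 * @param statement The SQL statement string
 * @return TableResult describing the outcome
 */
def executeSql(statement: String): TableResult
```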
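
As a minimal end-to-end sketch of `executeSql` and `sqlQuery` together, the object below registers an inline view, queries it, and collects the rows. It assumes Flink 1.14 or later with the Table planner, runtime and client modules on the classpath; the view name `Users` and its two rows are illustrative only.

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.types.Row

object SqlQuerySketch {
  /** Registers a small inline view, queries it and returns the matching names. */
  def adultNames(): List[String] = {
    // Batch mode: the input is bounded, so collect() terminates
    val tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode())

    // DDL via executeSql (the non-deprecated replacement for sqlUpdate)
    tEnv.executeSql(
      """CREATE TEMPORARY VIEW Users AS
        |SELECT * FROM (VALUES ('alice', 30), ('bob', 19)) AS t(name, age)""".stripMargin)

    // sqlQuery returns a Table; execute() runs it as a job and collect() streams rows back
    val it = tEnv.sqlQuery("SELECT name FROM Users WHERE age > 21").execute().collect()
    try {
      val names = List.newBuilder[String]
      while (it.hasNext) {
        val row: Row = it.next()
        names += row.getField(0).toString
      }
      names.result()
    } finally it.close()
  }
}
```

Calling `SqlQuerySketch.adultNames()` runs a small local batch job. Batch mode is chosen so that `collect()` finishes; on an unbounded streaming source the iterator would keep producing rows.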

**Usage Examples:**

```scala
// Basic SELECT queries
val basicQuery = tEnv.sqlQuery("SELECT name, age FROM Users WHERE age > 21")

val aggregateQuery = tEnv.sqlQuery("""
  SELECT department,
         COUNT(*) as employee_count,
         AVG(salary) as avg_salary,
         MAX(salary) as max_salary
  FROM Employees
  GROUP BY department
""")

// Joins and correlated subqueries
val complexQuery = tEnv.sqlQuery("""
  SELECT e.name, e.salary, d.department_name, d.budget
  FROM Employees e
  JOIN Departments d ON e.dept_id = d.id
  WHERE e.salary > (
    SELECT AVG(salary) * 1.2
    FROM Employees
    WHERE dept_id = e.dept_id
  )
""")

// Group window aggregation (TUMBLE syntax); event_time must be a time
// attribute, e.g. declared with a WATERMARK clause in the table DDL
val windowQuery = tEnv.sqlQuery("""
  SELECT
    user_id,
    TUMBLE_START(event_time, INTERVAL '1' HOUR) as window_start,
    TUMBLE_END(event_time, INTERVAL '1' HOUR) as window_end,
    SUM(amount) as total_amount
  FROM Transactions
  GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' HOUR)
""")
```
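
The `TUMBLE` grouping assigns each row to exactly one fixed-size, non-overlapping window. The plain-Scala model below has no Flink dependency and mirrors what the window query computes, ignoring watermarks and late data; timestamps as epoch milliseconds and the tuple layout `(user_id, event_time, amount)` are assumptions for illustration.

```scala
object TumbleSketch {
  /** Start of the tumbling window of length `size` that contains `ts` (both in millis). */
  def windowStart(ts: Long, size: Long): Long = ts - Math.floorMod(ts, size)

  /** SUM(amount) per (user_id, window_start), like GROUP BY user_id, TUMBLE(...). */
  def hourlyTotals(events: Seq[(String, Long, Double)]): Map[(String, Long), Double] = {
    val hour = 60L * 60 * 1000
    events
      .groupBy { case (user, ts, _) => (user, windowStart(ts, hour)) }
      .map { case (key, evs) => key -> evs.map(_._3).sum }
  }
}
```

The window end reported by `TUMBLE_END` is simply `windowStart + size`, so every row lands in exactly one bucket.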

### Data Definition Language (DDL)

Create and manage tables, views, and other catalog objects.

```scala { .api }
// Table creation and management
tEnv.sqlUpdate("""
  CREATE TABLE Users (
    id BIGINT,
    name STRING,
    email STRING,
    age INT,
    registration_time TIMESTAMP(3),
    last_login TIMESTAMP(3),
    -- the WATERMARK clause makes registration_time an event-time attribute
    WATERMARK FOR registration_time AS registration_time - INTERVAL '5' SECOND
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'users',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
  )
""")

// Temporary table creation (lives only in the current TableEnvironment session)
tEnv.sqlUpdate("""
  CREATE TEMPORARY TABLE TempResults (
    category STRING,
    total_sales DOUBLE,
    avg_price DOUBLE
  ) WITH (
    'connector' = 'print'
  )
""")

// View creation
tEnv.sqlUpdate("""
  CREATE VIEW ActiveUsers AS
  SELECT id, name, email
  FROM Users
  WHERE last_login > CURRENT_TIMESTAMP - INTERVAL '30' DAY
""")

// Temporary view creation
tEnv.sqlUpdate("""
  CREATE TEMPORARY VIEW RecentOrders AS
  SELECT o.*, u.name as user_name
  FROM Orders o
  JOIN Users u ON o.user_id = u.id
  WHERE o.order_time > CURRENT_TIMESTAMP - INTERVAL '7' DAY
""")
```

### Data Manipulation Language (DML)

Write query results into sink tables with `INSERT INTO`. With `sqlUpdate`, `INSERT` statements are buffered and only submitted when `tEnv.execute(jobName)` is called.

```scala { .api }
// Insert operations; GROUP BY produces an updating result, so in streaming
// mode UserStatistics must be a sink that accepts updates
tEnv.sqlUpdate("""
  INSERT INTO UserStatistics
  SELECT
    user_id,
    COUNT(*) as order_count,
    SUM(amount) as total_spent,
    AVG(amount) as avg_order_value
  FROM Orders
  GROUP BY user_id
""")

// Insert with specific columns; `timestamp` is a reserved keyword and must be quoted
tEnv.sqlUpdate("""
  INSERT INTO AuditLog (event_type, user_id, `timestamp`)
  SELECT 'ORDER_CREATED', user_id, order_time
  FROM Orders
  WHERE order_time > CURRENT_TIMESTAMP - INTERVAL '1' HOUR
""")

// Insert from multiple sources
tEnv.sqlUpdate("""
  INSERT INTO AllTransactions
  SELECT transaction_id, amount, 'CREDIT' as type, `timestamp`
  FROM CreditTransactions
  UNION ALL
  SELECT transaction_id, amount, 'DEBIT' as type, `timestamp`
  FROM DebitTransactions
""")

// Submit all buffered INSERT statements as one job
tEnv.execute("insert-job")
```
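
When several `INSERT` statements should run as one job, a `StatementSet` replaces the buffered `sqlUpdate` plus `execute` pattern. A minimal sketch, assuming Flink 1.14 or later and the built-in `blackhole` connector on the classpath; the view `Orders` and the sinks `TotalsSink` and `CountsSink` are hypothetical names.

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment, TableResult}

object StatementSetSketch {
  /** Runs two INSERT statements as a single job and waits for it to finish. */
  def run(): TableResult = {
    val tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode())

    // Bounded inline source standing in for a real Orders table
    tEnv.executeSql(
      "CREATE TEMPORARY VIEW Orders AS " +
        "SELECT * FROM (VALUES (1, 10), (1, 5), (2, 7)) AS t(user_id, amount)")

    // Sinks that discard their input, so the sketch needs no external system
    tEnv.executeSql(
      "CREATE TEMPORARY TABLE TotalsSink (user_id INT, total INT) WITH ('connector' = 'blackhole')")
    tEnv.executeSql(
      "CREATE TEMPORARY TABLE CountsSink (user_id INT, cnt BIGINT) WITH ('connector' = 'blackhole')")

    val set = tEnv.createStatementSet()
    set.addInsertSql("INSERT INTO TotalsSink SELECT user_id, SUM(amount) FROM Orders GROUP BY user_id")
    set.addInsertSql("INSERT INTO CountsSink SELECT user_id, COUNT(*) FROM Orders GROUP BY user_id")

    // Both INSERTs are optimized together and submitted as one job
    val result = set.execute()
    result.await() // block until the bounded job has finished
    result
  }
}
```

Optimizing the statements together lets the planner reuse the shared `Orders` scan instead of reading it once per `INSERT`.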

### Built-in SQL Functions

Flink SQL ships built-in string, numeric, temporal and conditional functions.

```scala { .api }
// String functions
val stringFunctions = tEnv.sqlQuery("""
  SELECT
    UPPER(name) as upper_name,
    LOWER(email) as lower_email,
    SUBSTRING(phone, 1, 3) as area_code,
    CONCAT(first_name, ' ', last_name) as full_name,
    CHAR_LENGTH(description) as desc_length,
    TRIM(BOTH ' ' FROM padded_text) as trimmed
  FROM Users
""")

// Numeric functions
val numericFunctions = tEnv.sqlQuery("""
  SELECT
    ABS(balance) as abs_balance,
    ROUND(salary, 2) as rounded_salary,
    CEILING(score) as ceil_score,
    FLOOR(rating) as floor_rating,
    MOD(id, 10) as id_mod,
    POWER(base_value, 2) as squared_value
  FROM Employees
""")

// Date and time functions; aliases must not be reserved words such as
// current_date, and TIMESTAMPDIFF(DAY, start, end) counts the days between two dates
val timeFunctions = tEnv.sqlQuery("""
  SELECT
    CURRENT_TIMESTAMP as current_ts,
    CURRENT_DATE as today,
    CURRENT_TIME as time_of_day,
    EXTRACT(YEAR FROM birth_date) as birth_year,
    EXTRACT(MONTH FROM hire_date) as hire_month,
    TIMESTAMPDIFF(DAY, hire_date, CURRENT_DATE) as days_employed,
    DATE_FORMAT(event_time, 'yyyy-MM-dd HH:mm') as formatted_time
  FROM Employees
""")

// Conditional functions
val conditionalFunctions = tEnv.sqlQuery("""
  SELECT
    name,
    CASE
      WHEN age < 18 THEN 'Minor'
      WHEN age >= 18 AND age < 65 THEN 'Adult'
      ELSE 'Senior'
    END as age_category,
    COALESCE(middle_name, 'N/A') as middle_name_safe,
    NULLIF(status, 'UNKNOWN') as clean_status,
    IF(salary > 50000, 'High', 'Standard') as salary_tier
  FROM Employees
""")
```
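
The conditional functions above follow SQL three-valued logic. The plain-Scala sketch below models SQL NULL as `None` to show how `COALESCE`, `NULLIF` and the age `CASE` expression treat missing values; it illustrates the semantics only and is not Flink code. Note that a NULL `age` matches no `WHEN` branch and therefore falls through to `ELSE`.

```scala
object NullFunctionsSketch {
  /** COALESCE: the first non-NULL argument, or NULL if all are NULL. */
  def coalesce[A](values: Option[A]*): Option[A] = values.collectFirst { case Some(v) => v }

  /** NULLIF(a, b): NULL when a equals b, otherwise a (a NULL b never matches). */
  def nullIf[A](a: Option[A], b: Option[A]): Option[A] =
    if (a.isDefined && a == b) None else a

  /** The age_category CASE expression; a NULL age reaches the ELSE branch. */
  def ageCategory(age: Option[Int]): String = age match {
    case Some(a) if a < 18 => "Minor"
    case Some(a) if a < 65 => "Adult"
    case _                 => "Senior"
  }
}
```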

### Aggregate Functions

SQL aggregate functions for data summarization and analysis.

```scala { .api }
// Basic aggregations; `position` is a reserved keyword and must be quoted
val basicAggregates = tEnv.sqlQuery("""
  SELECT
    department,
    COUNT(*) as employee_count,
    COUNT(DISTINCT `position`) as unique_positions,
    SUM(salary) as total_salary,
    AVG(salary) as avg_salary,
    MIN(hire_date) as earliest_hire,
    MAX(salary) as highest_salary,
    STDDEV_SAMP(salary) as salary_stddev,
    VAR_SAMP(salary) as salary_variance
  FROM Employees
  GROUP BY department
""")

// Aggregations filtered with HAVING; ordered-set aggregates such as
// PERCENTILE_CONT(...) WITHIN GROUP are not supported by Flink SQL
val advancedAggregates = tEnv.sqlQuery("""
  SELECT
    department,
    `position`,
    COUNT(*) as position_count,
    AVG(salary) as avg_salary,
    COLLECT(name) as employee_names -- returns MULTISET<STRING>
  FROM Employees
  GROUP BY department, `position`
  HAVING COUNT(*) >= 3 AND AVG(salary) > 40000
""")

// Grouping sets; this list is equivalent to ROLLUP(department, `position`, gender)
val groupingSets = tEnv.sqlQuery("""
  SELECT
    department,
    `position`,
    gender,
    COUNT(*) as employee_count,
    AVG(salary) as avg_salary
  FROM Employees
  GROUP BY GROUPING SETS (
    (department, `position`, gender),
    (department, `position`),
    (department),
    ()
  )
""")
```
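
`GROUP BY GROUPING SETS` behaves like a `UNION ALL` of one `GROUP BY` per listed set, with the columns outside a set filled with NULL. The plain-Scala sketch below reproduces that expansion for `COUNT(*)`; representing rows as string maps and the helper name are assumptions made for illustration.

```scala
object GroupingSetsSketch {
  /**
   * COUNT(*) per group for each grouping set. Keys list every column in `allCols`;
   * columns outside the current set become None (SQL NULL). Assumes rows have no
   * missing columns, so keys from different sets cannot collide.
   */
  def countByGroupingSets(
      rows: Seq[Map[String, String]],
      sets: Seq[Seq[String]],
      allCols: Seq[String]): Map[Seq[Option[String]], Int] =
    sets.flatMap { set =>
      rows
        .groupBy(row => allCols.map(col => if (set.contains(col)) row.get(col) else None))
        .map { case (key, group) => key -> group.size }
    }.toMap
}
```

The empty set `()` yields a single grand-total row whose grouping columns are all NULL.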

### Window Functions in SQL

Analytical window functions compute a value for each row from a window of related rows. In streaming mode, OVER windows must be ordered by an ascending time attribute and only support frames that end at the current row, and ranking functions are restricted to Top-N and deduplication queries; the ranking and frame examples below therefore assume batch mode.

```scala { .api }
// Time-based OVER window: for every transaction, the user's total over the
// preceding hour (in streaming mode transaction_time must be a time attribute)
val timeWindows = tEnv.sqlQuery("""
  SELECT
    user_id,
    transaction_time,
    amount,
    SUM(amount) OVER (
      PARTITION BY user_id
      ORDER BY transaction_time
      RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
    ) as hourly_total
  FROM Transactions
""")

// Ranking and analytical functions
val analyticalFunctions = tEnv.sqlQuery("""
  SELECT
    employee_id,
    department,
    salary,
    ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) as dept_rank,
    RANK() OVER (PARTITION BY department ORDER BY salary DESC) as dept_rank_ties,
    DENSE_RANK() OVER (PARTITION BY department ORDER BY salary DESC) as dept_dense_rank,
    PERCENT_RANK() OVER (PARTITION BY department ORDER BY salary DESC) as dept_percent_rank,
    NTILE(4) OVER (PARTITION BY department ORDER BY salary DESC) as salary_quartile
  FROM Employees
""")

// Frame-based window functions
val frameFunctions = tEnv.sqlQuery("""
  SELECT
    employee_id,
    salary,
    hire_date,
    SUM(salary) OVER (
      ORDER BY hire_date
      ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) as running_salary_sum,
    AVG(salary) OVER (
      ORDER BY hire_date
      ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING
    ) as moving_avg_salary,
    LAG(salary, 1) OVER (ORDER BY hire_date) as prev_salary,
    LEAD(salary, 1) OVER (ORDER BY hire_date) as next_salary,
    FIRST_VALUE(salary) OVER (
      PARTITION BY department
      ORDER BY hire_date
      ROWS UNBOUNDED PRECEDING
    ) as first_dept_salary
  FROM Employees
""")
```
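
`ROW_NUMBER`, `RANK` and `DENSE_RANK` differ only in how they treat ties. A plain-Scala sketch of all three over a single partition ordered by value descending, as in `ORDER BY salary DESC`; the helper is hypothetical and not part of the Flink API.

```scala
object RankingSketch {
  /** (value, rowNumber, rank, denseRank) for each value, sorted in descending order. */
  def rankings(values: Seq[Int]): Seq[(Int, Int, Int, Int)] = {
    val sorted = values.sorted(Ordering[Int].reverse)
    val distinct = sorted.distinct
    sorted.zipWithIndex.map { case (v, i) =>
      val rowNumber = i + 1                   // unique position; ties broken arbitrarily
      val rank = sorted.indexOf(v) + 1        // ties share a rank, gaps follow
      val denseRank = distinct.indexOf(v) + 1 // ties share a rank, no gaps
      (v, rowNumber, rank, denseRank)
    }
  }
}
```

For salaries `100, 90, 100, 80` the two 100s share rank 1, after which `RANK` jumps to 3 while `DENSE_RANK` continues with 2.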

### Table-Valued Functions

Table functions return zero or more rows per input row and are joined to the input with `LATERAL TABLE`. `JSON_PARSE`, `SPLIT` and `GENERATE_SERIES` below are not Flink built-ins: they stand for user-defined table functions that must be registered first, for example with `tEnv.createTemporarySystemFunction`.

```scala { .api }
// JSON parsing (hypothetical JSON_PARSE table function)
val jsonParsing = tEnv.sqlQuery("""
  SELECT
    user_id,
    json_data,
    parsed.name,
    parsed.age,
    parsed.preferences
  FROM Users u,
    LATERAL TABLE(JSON_PARSE(u.json_data)) as parsed(name, age, preferences)
""")

// String splitting (hypothetical SPLIT table function)
val stringSplitting = tEnv.sqlQuery("""
  SELECT
    user_id,
    tag
  FROM Users u,
    LATERAL TABLE(SPLIT(u.tags, ',')) as tag_table(tag)
  WHERE tag IS NOT NULL AND CHAR_LENGTH(tag) > 0
""")

// Range generation (hypothetical GENERATE_SERIES table function);
// the aliases avoid the reserved words VALUE and DATE
val rangeGeneration = tEnv.sqlQuery("""
  SELECT
    day_offset,
    TIMESTAMPADD(DAY, day_offset, CURRENT_DATE) as calendar_date
  FROM LATERAL TABLE(GENERATE_SERIES(0, 30)) as series(day_offset)
""")
```
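
A sketch of how the hypothetical `SPLIT` function could be implemented as a Flink `TableFunction`, following the annotation style of the Flink user-defined functions documentation. The class name `SplitFunction` and the output column `tag` are assumptions; it would be registered under its SQL name with `tEnv.createTemporarySystemFunction("SPLIT", classOf[SplitFunction])`.

```scala
import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint}
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

/** Hypothetical SPLIT table function: emits one row per non-empty token. */
@FunctionHint(output = new DataTypeHint("ROW<tag STRING>"))
class SplitFunction extends TableFunction[Row] {
  def eval(str: String, separator: String): Unit = {
    if (str != null && separator != null) {
      // Pattern.quote treats the separator literally rather than as a regex
      str.split(java.util.regex.Pattern.quote(separator))
        .filter(_.nonEmpty)
        .foreach(token => collect(Row.of(token)))
    }
  }
}
```

Dropping empty tokens inside the function makes the `CHAR_LENGTH(tag) > 0` filter in the query redundant, but harmless.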

### Common Table Expressions (CTEs)

Name intermediate results with `WITH` clauses. Flink SQL does not support recursive CTEs (`WITH RECURSIVE`).

```scala { .api }
// Basic CTE usage
val cteQuery = tEnv.sqlQuery("""
  WITH department_stats AS (
    SELECT
      department,
      COUNT(*) as emp_count,
      AVG(salary) as avg_salary,
      MAX(salary) as max_salary
    FROM Employees
    GROUP BY department
  ),
  high_performing_depts AS (
    SELECT department, avg_salary
    FROM department_stats
    WHERE emp_count >= 5 AND avg_salary > 50000
  )
  SELECT
    e.name,
    e.salary,
    e.department,
    hpd.avg_salary as dept_avg_salary,
    (e.salary - hpd.avg_salary) as salary_diff
  FROM Employees e
  JOIN high_performing_depts hpd ON e.department = hpd.department
  WHERE e.salary > hpd.avg_salary * 1.1
""")

// Hierarchical data without recursion: a fixed number of self-joins
// resolves up to three levels of the management chain
val managementChain = tEnv.sqlQuery("""
  SELECT
    e.employee_id,
    e.name,
    m1.name as manager_name,
    m2.name as senior_manager_name
  FROM Employees e
  LEFT JOIN Employees m1 ON e.manager_id = m1.employee_id
  LEFT JOIN Employees m2 ON m1.manager_id = m2.employee_id
""")
```

### Advanced SQL Features

Pivoting, set operations, and correlated subqueries.

```scala { .api }
// Pivot operations (Flink SQL has no PIVOT clause; use CASE expressions)
val pivotQuery = tEnv.sqlQuery("""
  SELECT
    department,
    SUM(CASE WHEN quarter = 'Q1' THEN sales END) as Q1_sales,
    SUM(CASE WHEN quarter = 'Q2' THEN sales END) as Q2_sales,
    SUM(CASE WHEN quarter = 'Q3' THEN sales END) as Q3_sales,
    SUM(CASE WHEN quarter = 'Q4' THEN sales END) as Q4_sales
  FROM QuarterlySales
  GROUP BY department
""")

// Set operations; INTERSECT binds more tightly than UNION and EXCEPT,
// so the parentheses are needed to apply it to the combined employee list
val setOperations = tEnv.sqlQuery("""
  (
    SELECT employee_id, name FROM CurrentEmployees
    UNION
    SELECT employee_id, name FROM FormerEmployees
  )
  INTERSECT
  SELECT employee_id, name FROM EligibleForRehire
  EXCEPT
  SELECT employee_id, name FROM BlacklistedEmployees
""")

// Correlated subqueries
val correlatedSubqueries = tEnv.sqlQuery("""
  SELECT
    e1.name,
    e1.salary,
    e1.department,
    (SELECT COUNT(*)
     FROM Employees e2
     WHERE e2.department = e1.department AND e2.salary > e1.salary
    ) as employees_with_higher_salary,
    (SELECT AVG(salary)
     FROM Employees e3
     WHERE e3.department = e1.department
    ) as dept_avg_salary
  FROM Employees e1
  WHERE e1.salary > (
    SELECT AVG(salary) * 1.2
    FROM Employees e4
    WHERE e4.department = e1.department
  )
""")
```
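
Because `INTERSECT` binds more tightly than `UNION` and `EXCEPT`, parentheses in a set-operation chain can change the result. The plain-Scala sketch below uses `Set` to stand in for the distinct rows that `UNION`, `INTERSECT` and `EXCEPT` produce, with employee ids as `Int`; the function names are illustrative.

```scala
object SetOpsSketch {
  /** (current UNION former) INTERSECT eligible EXCEPT blacklisted */
  def withParentheses(current: Set[Int], former: Set[Int], eligible: Set[Int], blacklisted: Set[Int]): Set[Int] =
    ((current union former) intersect eligible) diff blacklisted

  /** current UNION former INTERSECT eligible EXCEPT blacklisted: INTERSECT runs first */
  def withoutParentheses(current: Set[Int], former: Set[Int], eligible: Set[Int], blacklisted: Set[Int]): Set[Int] =
    (current union (former intersect eligible)) diff blacklisted
}
```

Without parentheses, every current employee passes the rehire check, which is rarely what the query intends.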

### SQL Configuration and Optimization

Configure SQL execution and steer the optimizer with hints.

```scala { .api }
import org.apache.flink.table.api.SqlDialect

// Join hints (Flink 1.16+, batch mode): BROADCAST, SHUFFLE_HASH, SHUFFLE_MERGE
// and NEST_LOOP. The hint goes right after SELECT and names the table it applies to
val optimizedQuery = tEnv.sqlQuery("""
  SELECT /*+ BROADCAST(Departments) */
    Employees.name,
    Departments.department_name
  FROM Employees
  JOIN Departments ON Employees.dept_id = Departments.id
""")

// Configuration for SQL execution
val tableConfig = tEnv.getConfig
// Switch the parser to Hive syntax (requires the Hive connector on the classpath) ...
tableConfig.setSqlDialect(SqlDialect.HIVE)
// ... and back to the default dialect; the "table.sql-dialect" key controls the same setting
tableConfig.getConfiguration.setString("table.sql-dialect", "default")

// Optimizer options
tableConfig.getConfiguration.setString("table.optimizer.join-reorder-enabled", "true")
// agg-phase-strategy takes AUTO, ONE_PHASE or TWO_PHASE, not a boolean
tableConfig.getConfiguration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE")
```
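
String keys are not checked, so a misspelled key or malformed value goes unnoticed. Since Flink 1.15, `TableConfig` also accepts typed `ConfigOption` constants. A minimal sketch assuming that version; the option values are chosen only for illustration.

```scala
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect, TableEnvironment}
import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions}

object ConfigSketch {
  /** Creates a TableEnvironment configured through typed options. */
  def configured(): TableEnvironment = {
    val tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode())
    val config = tEnv.getConfig
    // Each constant carries its key, value type and default value
    config.set(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED, java.lang.Boolean.TRUE)
    config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Integer.valueOf(1))
    config.setSqlDialect(SqlDialect.DEFAULT)
    tEnv
  }
}
```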

### Error Handling and Debugging

Inspect query plans and handle the errors raised while parsing, validating, and planning SQL.

```scala { .api }
import org.apache.flink.table.api.{SqlParserException, TableException, ValidationException}

// Explain query plans (explainSql is available since Flink 1.11)
val queryPlan = tEnv.explainSql("""
  SELECT department, COUNT(*), AVG(salary)
  FROM Employees
  WHERE hire_date > DATE '2020-01-01'
  GROUP BY department
  HAVING COUNT(*) > 5
""")

println(queryPlan)

// sqlQuery parses and validates eagerly, so errors surface at this call
try {
  val result = tEnv.sqlQuery("SELECT * FROM NonExistentTable")
} catch {
  case ex: ValidationException => // unknown table or column, type mismatch, ...
    println(s"SQL validation error: ${ex.getMessage}")
  case ex: SqlParserException => // syntax error
    println(s"SQL parsing error: ${ex.getMessage}")
  case ex: TableException =>
    println(s"Table API error: ${ex.getMessage}")
}

// Simplify execution while debugging: single parallelism, no split distinct aggregation
tEnv.getConfig.getConfiguration.setString("table.exec.resource.default-parallelism", "1")
tEnv.getConfig.getConfiguration.setBoolean("table.optimizer.distinct-agg.split.enabled", false)
```
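
The exception types above can be exercised without running a job, because `sqlQuery` parses and validates eagerly. A minimal sketch assuming Flink 1.14 or later on the classpath; the `classify` helper and its string labels are hypothetical.

```scala
import org.apache.flink.table.api.{SqlParserException, TableEnvironment, TableException, ValidationException}

object SqlErrorSketch {
  /** Returns "ok", or which kind of exception parsing/validating `sql` raised. */
  def classify(tEnv: TableEnvironment, sql: String): String =
    try {
      tEnv.sqlQuery(sql) // parses and validates; nothing is executed
      "ok"
    } catch {
      case _: SqlParserException  => "parse"      // syntax errors
      case _: ValidationException => "validation" // unknown tables or columns, type errors
      case _: TableException      => "table"      // other planner errors
    }
}
```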

## SQL Compatibility and Limitations

Flink SQL covers much of ANSI SQL but not all of it, and the same query can behave differently in streaming and batch mode.

```scala { .api }
// Supported SQL features:
// - Much of ANSI SQL through Apache Calcite (not full SQL:2011 compliance)
// - Window aggregation (TUMBLE, HOP, SESSION)
// - User-defined functions (scalar, table, aggregate)
// - Complex data types (ROW, ARRAY, MAP, MULTISET)
// - JSON functions and operators
// - Regular expressions (REGEXP_*) and pattern matching (MATCH_RECOGNIZE)

// Limitations and considerations:
// - Not supported: recursive CTEs (WITH RECURSIVE) and ordered-set
//   aggregates such as PERCENTILE_CONT ... WITHIN GROUP
// - Streaming OVER windows must be ordered by an ascending time attribute
//   and cannot use FOLLOWING bounds
// - Streaming results are continuously updated; a sink must accept the
//   change types (inserts, updates, deletes) its query produces
// - Late data handling in streaming mode depends on watermarks
// - Event-time operations require a time attribute (e.g. a WATERMARK clause in the DDL)
```

## Types

```scala { .api }
// SQL-related exception types (org.apache.flink.table.api); all are unchecked
class SqlParserException(message: String) extends RuntimeException(message)
class ValidationException(message: String) extends RuntimeException(message)
class TableException(message: String) extends RuntimeException(message)

// SQL dialect configuration (a Java enum)
enum SqlDialect {
  DEFAULT, HIVE
}

// Result of executeSql / Table.execute (org.apache.flink.table.api.TableResult)
trait TableResult {
  def print(): Unit
  def collect(): CloseableIterator[Row]
  def await(): Unit
  def getJobClient: java.util.Optional[JobClient]
}
```