or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

aggregation-grouping.md catalog-management.md expressions.md index.md sql-integration.md table-environment.md table-operations.md user-defined-functions.md window-operations.md

docs/sql-integration.md

0

# SQL Integration

1

2

Flink Table API provides seamless integration with SQL, allowing you to mix SQL statements with Table API operations. The system supports full SQL DDL, DML, and query capabilities alongside programmatic table operations.

3

4

## Capabilities

5

6

### SQL Query Execution

7

8

Execute SQL SELECT queries and retrieve results as Table objects.

9

10

```java { .api }

11

/**

12

* Executes a SQL query and returns the result as a Table

13

* @param query SQL SELECT statement

14

* @return Table containing query results

15

*/

16

public Table sqlQuery(String query);

17

```

18

19

**Usage Examples:**

20

21

```java

22

// Basic SQL query

23

Table result = tableEnv.sqlQuery(

24

"SELECT customer_id, SUM(amount) as total_amount " +

25

"FROM orders " +

26

"WHERE order_date >= '2023-01-01' " +

27

"GROUP BY customer_id " +

28

"HAVING SUM(amount) > 1000"

29

);

30

31

// Complex joins and window functions

32

Table analyticsResult = tableEnv.sqlQuery(

33

"SELECT " +

34

" c.customer_name, " +

35

" o.order_date, " +

36

" o.amount, " +

37

" SUM(o.amount) OVER (PARTITION BY c.customer_id ORDER BY o.order_date) as running_total, " +

38

" ROW_NUMBER() OVER (PARTITION BY c.customer_id ORDER BY o.amount DESC) as amount_rank " +

39

"FROM customers c " +

40

"JOIN orders o ON c.customer_id = o.customer_id " +

41

"WHERE o.order_date >= CURRENT_DATE - INTERVAL '30' DAY"

42

);

43

44

// Subqueries and CTEs

45

Table complexQuery = tableEnv.sqlQuery(

46

"WITH monthly_sales AS ( " +

47

" SELECT " +

48

" EXTRACT(YEAR FROM order_date) as year, " +

49

" EXTRACT(MONTH FROM order_date) as month, " +

50

" SUM(amount) as monthly_total " +

51

" FROM orders " +

52

" GROUP BY EXTRACT(YEAR FROM order_date), EXTRACT(MONTH FROM order_date) " +

53

"), " +

54

"avg_sales AS ( " +

55

" SELECT AVG(monthly_total) as avg_monthly " +

56

" FROM monthly_sales " +

57

") " +

58

"SELECT ms.year, ms.month, ms.monthly_total, " +

59

" ms.monthly_total - a.avg_monthly as deviation " +

60

"FROM monthly_sales ms CROSS JOIN avg_sales a " +

61

"ORDER BY ms.year, ms.month"

62

);

63

```

64

65

### SQL Statement Execution

66

67

Execute SQL DDL and DML statements that modify schema or data.

68

69

```java { .api }

70

/**

71

* Executes a SQL statement and returns execution results

72

* @param statement SQL DDL, DML, or query statement

73

* @return TableResult containing execution status and results

74

*/

75

public TableResult executeSql(String statement);

76

```

77

78

**Usage Examples:**

79

80

```java

81

// Create table with SQL DDL

82

tableEnv.executeSql(

83

"CREATE TABLE user_behavior ( " +

84

" user_id BIGINT, " +

85

" item_id BIGINT, " +

86

" category_id BIGINT, " +

87

" behavior STRING, " +

88

" ts TIMESTAMP(3), " +

89

" WATERMARK FOR ts AS ts - INTERVAL '5' SECOND " +

90

") WITH ( " +

91

" 'connector' = 'kafka', " +

92

" 'topic' = 'user_behavior', " +

93

" 'properties.bootstrap.servers' = 'localhost:9092', " +

94

" 'format' = 'json' " +

95

")"

96

);

97

98

// Create sink table

99

tableEnv.executeSql(

100

"CREATE TABLE result_table ( " +

101

" user_id BIGINT, " +

102

" behavior_count BIGINT, " +

103

" PRIMARY KEY (user_id) NOT ENFORCED " +

104

") WITH ( " +

105

" 'connector' = 'jdbc', " +

106

" 'url' = 'jdbc:mysql://localhost:3306/test', " +

107

" 'table-name' = 'user_behavior_count' " +

108

")"

109

);

110

111

// Insert data with SQL

112

TableResult insertResult = tableEnv.executeSql(

113

"INSERT INTO result_table " +

114

"SELECT user_id, COUNT(*) as behavior_count " +

115

"FROM user_behavior " +

116

"WHERE behavior = 'purchase' " +

117

"GROUP BY user_id"

118

);

119

120

// Get job client for monitoring

121

Optional<JobClient> jobClient = insertResult.getJobClient();

122

if (jobClient.isPresent()) {

123

System.out.println("Job ID: " + jobClient.get().getJobID());

124

}

125

```

126

127

### Statement Sets for Batch Execution

128

129

Group multiple DML statements for efficient batch execution.

130

131

```java { .api }

132

/**

133

* Creates a StatementSet for batching multiple statements

134

* @return StatementSet for adding multiple statements

135

*/

136

public StatementSet createStatementSet();

137

138

public interface StatementSet {

139

/**

140

* Adds an INSERT SQL statement to the set

141

* @param statement INSERT SQL statement

142

* @return StatementSet for method chaining

143

*/

144

StatementSet addInsertSql(String statement);

145

146

/**

147

* Adds an INSERT operation from a Table to the set

148

* @param targetPath Target table path

149

* @param table Table to insert

150

* @return StatementSet for method chaining

151

*/

152

StatementSet addInsert(String targetPath, Table table);

153

154

/**

155

* Executes all statements in the set

156

* @return TableResult containing execution status

157

*/

158

TableResult execute();

159

160

/**

161

* Explains the execution plan for all statements in the set

162

* @return String representation of the execution plan

163

*/

164

String explain();

165

}

166

```

167

168

**Usage Examples:**

169

170

```java

171

// Create statement set for multiple inserts

172

StatementSet stmtSet = tableEnv.createStatementSet();

173

174

// Add multiple SQL insert statements

175

stmtSet.addInsertSql(

176

"INSERT INTO daily_summary " +

177

"SELECT DATE(order_date) as day, COUNT(*) as order_count, SUM(amount) as total_amount " +

178

"FROM orders " +

179

"WHERE order_date >= CURRENT_DATE - INTERVAL '1' DAY " +

180

"GROUP BY DATE(order_date)"

181

);

182

183

stmtSet.addInsertSql(

184

"INSERT INTO customer_summary " +

185

"SELECT customer_id, COUNT(*) as order_count, SUM(amount) as total_spent " +

186

"FROM orders " +

187

"WHERE order_date >= CURRENT_DATE - INTERVAL '1' DAY " +

188

"GROUP BY customer_id"

189

);

190

191

// Add Table API insert

192

Table hourlyStats = sourceTable

193

.window(Tumble.over(lit(1).hour()).on($("event_time")).as("hourly"))

194

.groupBy($("hourly"))

195

.select($("hourly").start().as("hour"), count($("*")).as("event_count"));

196

197

stmtSet.addInsert("hourly_stats", hourlyStats);

198

199

// Execute all statements together

200

TableResult result = stmtSet.execute();

201

202

// Explain execution plan

203

String plan = stmtSet.explain();

204

System.out.println("Execution plan:\n" + plan);

205

```

206

207

### SQL and Table API Interoperability

208

209

Seamlessly mix SQL queries with Table API operations.

210

211

**Usage Examples:**

212

213

```java

214

// Start with SQL, continue with Table API

215

Table sqlTable = tableEnv.sqlQuery(

216

"SELECT customer_id, order_date, amount " +

217

"FROM orders " +

218

"WHERE amount > 100"

219

);

220

221

Table processed = sqlTable

222

.filter($("amount").isGreater(500))

223

.groupBy($("customer_id"))

224

.select($("customer_id"), $("amount").avg().as("avg_amount"));

225

226

// Start with Table API, continue with SQL

227

Table apiTable = sourceTable

228

.select($("id"), $("name"), $("salary"))

229

.filter($("salary").isGreater(50000));

230

231

// Register as temporary view for SQL access

232

tableEnv.createTemporaryView("high_earners", apiTable);

233

234

Table sqlResult = tableEnv.sqlQuery(

235

"SELECT name, salary, " +

236

" CASE WHEN salary > 100000 THEN 'Senior' " +

237

" WHEN salary > 75000 THEN 'Mid' " +

238

" ELSE 'Junior' END as level " +

239

"FROM high_earners " +

240

"ORDER BY salary DESC"

241

);

242

243

// Chain back to Table API

244

Table finalResult = sqlResult

245

.groupBy($("level"))

246

.select($("level"), count($("*")).as("count"), $("salary").avg().as("avg_salary"));

247

```

248

249

### Advanced SQL Features

250

251

Support for window functions, complex expressions, and advanced SQL constructs.

252

253

**Usage Examples:**

254

255

```java

256

// Window functions and analytical queries

257

Table windowAnalysis = tableEnv.sqlQuery(

258

"SELECT " +

259

" product_id, " +

260

" sale_date, " +

261

" daily_sales, " +

262

" -- Moving average over 7 days " +

263

" AVG(daily_sales) OVER ( " +

264

" PARTITION BY product_id " +

265

" ORDER BY sale_date " +

266

" ROWS BETWEEN 6 PRECEDING AND CURRENT ROW " +

267

" ) as moving_avg_7d, " +

268

" -- Cumulative sales " +

269

" SUM(daily_sales) OVER ( " +

270

" PARTITION BY product_id " +

271

" ORDER BY sale_date " +

272

" ROWS UNBOUNDED PRECEDING " +

273

" ) as cumulative_sales, " +

274

" -- Rank by sales within each month " +

275

" RANK() OVER ( " +

276

" PARTITION BY product_id, EXTRACT(YEAR_MONTH FROM sale_date) " +

277

" ORDER BY daily_sales DESC " +

278

" ) as monthly_rank " +

279

"FROM daily_product_sales"

280

);

281

282

// Complex case expressions and functions

283

Table caseAnalysis = tableEnv.sqlQuery(

284

"SELECT " +

285

" customer_id, " +

286

" order_count, " +

287

" total_amount, " +

288

" CASE " +

289

" WHEN total_amount > 10000 AND order_count > 20 THEN 'VIP' " +

290

" WHEN total_amount > 5000 OR order_count > 10 THEN 'Premium' " +

291

" WHEN total_amount > 1000 THEN 'Regular' " +

292

" ELSE 'New' " +

293

" END as customer_tier, " +

294

" -- Calculate percentile rank " +

295

" PERCENT_RANK() OVER (ORDER BY total_amount) as amount_percentile, " +

296

" -- String manipulation " +

297

" CONCAT('CUSTOMER_', LPAD(CAST(customer_id AS STRING), 8, '0')) as customer_code, " +

298

" -- Date functions " +

299

" EXTRACT(DAYOFWEEK FROM first_order_date) as first_order_dow, " +

300

" DATEDIFF(last_order_date, first_order_date) as customer_lifetime_days " +

301

"FROM customer_summary"

302

);

303

304

// Array and map operations (if supported by connectors)

305

Table arrayOperations = tableEnv.sqlQuery(

306

"SELECT " +

307

" user_id, " +

308

" tags, " +

309

" CARDINALITY(tags) as tag_count, " +

310

" tags[1] as primary_tag, " +

311

" -- Check if array contains specific value " +

312

" ARRAY_CONTAINS(tags, 'premium') as is_premium_user, " +

313

" -- String aggregation " +

314

" ARRAY_JOIN(tags, ', ') as tags_string " +

315

"FROM user_profiles " +

316

"WHERE CARDINALITY(tags) > 0"

317

);

318

```

319

320

### SQL Temporal Features

321

322

Time-based operations and temporal table functions in SQL.

323

324

**Usage Examples:**

325

326

```java

327

// Temporal joins (for lookups with time-based versioning)

328

Table temporalJoin = tableEnv.sqlQuery(

329

"SELECT " +

330

" o.order_id, " +

331

" o.customer_id, " +

332

" o.product_id, " +

333

" o.order_time, " +

334

" o.amount, " +

335

" p.product_name, " +

336

" p.category, " +

337

" -- Get product info as of order time " +

338

" p.price as price_at_order_time " +

339

"FROM orders o " +

340

"JOIN product_changelog FOR SYSTEM_TIME AS OF o.order_time AS p " +

341

" ON o.product_id = p.product_id"

342

);

343

344

// Interval operations

345

Table intervalQuery = tableEnv.sqlQuery(

346

"SELECT " +

347

" user_id, " +

348

" login_time, " +

349

" logout_time, " +

350

" logout_time - login_time as session_duration, " +

351

" -- Check if session was longer than 30 minutes " +

352

" logout_time - login_time > INTERVAL '30' MINUTE as long_session, " +

353

" -- Add time intervals " +

354

" login_time + INTERVAL '1' HOUR as expected_timeout " +

355

"FROM user_sessions " +

356

"WHERE logout_time - login_time > INTERVAL '1' MINUTE"

357

);

358

359

// Watermark and event time operations

360

Table eventTimeQuery = tableEnv.sqlQuery(

361

"SELECT " +

362

" sensor_id, " +

363

" TUMBLE_START(event_time, INTERVAL '1' HOUR) as window_start, " +

364

" TUMBLE_END(event_time, INTERVAL '1' HOUR) as window_end, " +

365

" COUNT(*) as reading_count, " +

366

" AVG(temperature) as avg_temperature, " +

367

" MIN(temperature) as min_temperature, " +

368

" MAX(temperature) as max_temperature " +

369

"FROM sensor_readings " +

370

"GROUP BY sensor_id, TUMBLE(event_time, INTERVAL '1' HOUR)"

371

);

372

```

373

374

### SQL Configuration and Dialects

375

376

Configure SQL parsing and execution behavior.

377

378

```java { .api }

379

/**

380

* SQL dialect configuration

381

*/

382

public enum SqlDialect {

383

/** Default Flink SQL dialect */

384

DEFAULT,

385

/** Hive-compatible SQL dialect for migration scenarios */

386

HIVE

387

}

388

```

389

390

**Usage Examples:**

391

392

```java

393

// Set SQL dialect

394

tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

395

396

// Hive dialect for compatibility

397

tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

398

399

// Use Hive-specific syntax when in Hive dialect

400

if (tableEnv.getConfig().getSqlDialect() == SqlDialect.HIVE) {

401

tableEnv.executeSql(

402

"CREATE TABLE hive_table ( " +

403

" id BIGINT, " +

404

" name STRING, " +

405

" part_date STRING " +

406

") PARTITIONED BY (part_date) " +

407

"STORED AS PARQUET " +

408

"LOCATION '/warehouse/hive_table'"

409

);

410

}

411

412

// Configuration through table config

413

Configuration config = tableEnv.getConfig().getConfiguration();

414

config.setString("table.sql-dialect", "default");

415

config.setString("table.local-time-zone", "UTC");

416

```

417

418

### SQL Error Handling

419

420

Handle SQL parsing and execution errors.

421

422

```java { .api }

423

/**

424

* Exception thrown for SQL parsing errors

425

*/

426

public class SqlParserException extends RuntimeException {

427

public SqlParserException(String message);

428

public SqlParserException(String message, Throwable cause);

429

}

430

431

/**

432

* Exception for SQL parsing errors at end of input

433

*/

434

public class SqlParserEOFException extends SqlParserException {

435

public SqlParserEOFException(String message);

436

}

437

```

438

439

**Usage Examples:**

440

441

```java

442

// Handle SQL parsing errors

443

try {

444

Table result = tableEnv.sqlQuery(

445

"SELECT customer_id, SUM(amount " + // Missing closing parenthesis

446

"FROM orders " +

447

"GROUP BY customer_id"

448

);

449

} catch (SqlParserException e) {

450

System.err.println("SQL parsing failed: " + e.getMessage());

451

// Handle parsing error, perhaps show user-friendly error message

452

}

453

454

// Handle execution errors

455

try {

456

TableResult result = tableEnv.executeSql(

457

"INSERT INTO non_existent_table " +

458

"SELECT * FROM orders"

459

);

460

} catch (Exception e) {

461

System.err.println("SQL execution failed: " + e.getMessage());

462

// Handle execution error

463

}

464

465

// Validate SQL before execution

466

try {

467

String sql = "SELECT * FROM orders WHERE amount > ?";

468

// In practice, you might want to validate parameter binding

469

Table validated = tableEnv.sqlQuery(sql.replace("?", "100"));

470

System.out.println("SQL is valid");

471

} catch (SqlParserException e) {

472

System.err.println("Invalid SQL: " + e.getMessage());

473

}

474

```

475

476

### DDL Operations via SQL

477

478

Create and manage database objects using SQL DDL.

479

480

**Usage Examples:**

481

482

```java

483

// Create catalog

484

tableEnv.executeSql(

485

"CREATE CATALOG my_catalog WITH ( " +

486

" 'type' = 'hive', " +

487

" 'hive-conf-dir' = '/opt/hive/conf' " +

488

")"

489

);

490

491

// Create database

492

tableEnv.executeSql(

493

"CREATE DATABASE IF NOT EXISTS analytics " +

494

"COMMENT 'Analytics database for business intelligence'"

495

);

496

497

// Create function

498

tableEnv.executeSql(

499

"CREATE TEMPORARY FUNCTION hash_func AS 'com.example.HashFunction'"

500

);

501

502

// Create view

503

tableEnv.executeSql(

504

"CREATE VIEW high_value_customers AS " +

505

"SELECT customer_id, SUM(amount) as total_spent " +

506

"FROM orders " +

507

"GROUP BY customer_id " +

508

"HAVING SUM(amount) > 5000"

509

);

510

511

// Alter table

512

tableEnv.executeSql(

513

"ALTER TABLE orders ADD COLUMN discount_amount DECIMAL(10,2)"

514

);

515

516

// Drop objects

517

tableEnv.executeSql("DROP VIEW IF EXISTS high_value_customers");

518

tableEnv.executeSql("DROP TEMPORARY FUNCTION hash_func");

519

tableEnv.executeSql("DROP TABLE temp_results");

520

```