or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog-system.md · datastream-integration.md · index.md · sql-execution.md · table-environment.md · table-operations.md · type-system.md · user-defined-functions.md

docs/sql-execution.md

0

# SQL Execution

1

2

Flink's Table API provides comprehensive SQL execution capabilities supporting DDL, DML, and query operations with advanced features like statement batching, result streaming, and metadata access.

3

4

## Capabilities

5

6

### SQL Query Execution

7

8

Execute SQL queries and retrieve results as Table objects for further processing.

9

10

```java { .api }

11

/**

12

* Evaluates a SQL query on registered tables and returns the result as a Table

13

* @param query SQL query string (SELECT, WITH, VALUES, etc.)

14

* @return Table representing the query result

15

*/

16

Table sqlQuery(String query);

17

```

18

19

**Usage Examples:**

20

21

```java

22

// Simple SELECT query

23

Table customers = tableEnv.sqlQuery("SELECT * FROM customers WHERE age > 25");

24

25

// Complex analytical query

26

Table salesAnalysis = tableEnv.sqlQuery(

27

"WITH monthly_sales AS (" +

28

" SELECT " +

29

" EXTRACT(YEAR FROM order_date) as year," +

30

" EXTRACT(MONTH FROM order_date) as month," +

31

" product_category," +

32

" SUM(amount) as total_sales" +

33

" FROM orders " +

34

" WHERE order_date >= DATE '2024-01-01'" +

35

" GROUP BY EXTRACT(YEAR FROM order_date), EXTRACT(MONTH FROM order_date), product_category" +

36

") " +

37

"SELECT " +

38

" year, month, product_category, total_sales," +

39

" LAG(total_sales) OVER (PARTITION BY product_category ORDER BY year, month) as prev_month_sales," +

40

" total_sales - LAG(total_sales) OVER (PARTITION BY product_category ORDER BY year, month) as growth " +

41

"FROM monthly_sales " +

42

"ORDER BY year, month, product_category"

43

);

44

45

// Window aggregation for streaming data

46

Table windowedEvents = tableEnv.sqlQuery(

47

"SELECT " +

48

" user_id," +

49

" TUMBLE_START(event_time, INTERVAL '1' HOUR) as window_start," +

50

" TUMBLE_END(event_time, INTERVAL '1' HOUR) as window_end," +

51

" COUNT(*) as event_count," +

52

" SUM(event_value) as total_value " +

53

"FROM user_events " +

54

"GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' HOUR)"

55

);

56

```

57

58

### SQL Statement Execution

59

60

Execute DDL, DML, and utility statements with execution results and metadata.

61

62

```java { .api }

63

/**

64

* Executes a single SQL statement and returns the execution result

65

* @param statement SQL statement (CREATE, DROP, INSERT, DESCRIBE, etc.)

66

* @return TableResult containing execution information and optional data

67

*/

68

TableResult executeSql(String statement);

69

```

70

71

**Usage Examples:**

72

73

```java

74

// DDL - Create table

75

TableResult createResult = tableEnv.executeSql(

76

"CREATE TABLE orders (" +

77

" order_id BIGINT," +

78

" customer_id BIGINT," +

79

" product_id BIGINT," +

80

" quantity INT," +

81

" unit_price DECIMAL(10, 2)," +

82

" order_time TIMESTAMP(3)," +

83

" WATERMARK FOR order_time AS order_time - INTERVAL '30' SECOND" +

84

") WITH (" +

85

" 'connector' = 'kafka'," +

86

" 'topic' = 'orders'," +

87

" 'properties.bootstrap.servers' = 'localhost:9092'," +

88

" 'format' = 'json'" +

89

")"

90

);

91

92

// DDL - Create view

93

tableEnv.executeSql(

94

"CREATE VIEW high_value_customers AS " +

95

"SELECT customer_id, SUM(quantity * unit_price) as total_spent " +

96

"FROM orders " +

97

"GROUP BY customer_id " +

98

"HAVING SUM(quantity * unit_price) > 1000"

99

);

100

101

// DML - Insert data

102

TableResult insertResult = tableEnv.executeSql(

103

"INSERT INTO customer_summary " +

104

"SELECT customer_id, COUNT(*) as order_count, SUM(quantity * unit_price) as total_spent " +

105

"FROM orders " +

106

"GROUP BY customer_id"

107

);

108

109

// Utility - Describe table

110

TableResult describeResult = tableEnv.executeSql("DESCRIBE orders");

111

describeResult.print();

112

113

// Utility - Show tables

114

TableResult showResult = tableEnv.executeSql("SHOW TABLES");

115

```

116

117

### Statement Set Operations

118

119

Batch multiple INSERT statements into a `StatementSet` so they are optimized and executed together as a single job.

120

121

```java { .api }

122

/**

123

* Creates a StatementSet for batch execution of multiple statements

124

* @return StatementSet instance for adding multiple operations

125

*/

126

StatementSet createStatementSet();

127

128

interface StatementSet {

129

/**

130

* Adds an INSERT SQL statement to the set

131

* @param statement INSERT SQL statement

132

* @return StatementSet for method chaining

133

*/

134

StatementSet addInsertSql(String statement);

135

136

/**

137

* Adds a table insertion to the set

138

* @param targetPath Target table path

139

* @param table Source table to insert

140

* @return StatementSet for method chaining

141

*/

142

StatementSet addInsert(String targetPath, Table table);

143

144

/**

145

* Explains the execution plan for all statements

146

* @return String representation of the execution plan

147

*/

148

String explain();

149

150

/**

151

* Executes all statements in the set

152

* @return TableResult with execution information

153

*/

154

TableResult execute();

155

}

156

```

157

158

**Usage Examples:**

159

160

```java

161

// Batch multiple related insertions

162

StatementSet stmtSet = tableEnv.createStatementSet();

163

164

// Add SQL insertions

165

stmtSet.addInsertSql(

166

"INSERT INTO daily_sales " +

167

"SELECT DATE(order_time) as sale_date, SUM(quantity * unit_price) as daily_total " +

168

"FROM orders " +

169

"WHERE DATE(order_time) = CURRENT_DATE " +

170

"GROUP BY DATE(order_time)"

171

);

172

173

stmtSet.addInsertSql(

174

"INSERT INTO product_sales " +

175

"SELECT product_id, COUNT(*) as order_count, SUM(quantity) as total_quantity " +

176

"FROM orders " +

177

"WHERE DATE(order_time) = CURRENT_DATE " +

178

"GROUP BY product_id"

179

);

180

181

// Add table insertions

182

Table customerMetrics = tableEnv.from("orders")

183

.groupBy($("customer_id"))

184

.select(

185

$("customer_id"),

186

$("order_id").count().as("order_count"),

187

$("quantity").sum().as("total_items")

188

);

189

190

stmtSet.addInsert("customer_metrics", customerMetrics);

191

192

// Execute all statements together

193

System.out.println(stmtSet.explain());

194

TableResult batchResult = stmtSet.execute();

195

```

196

197

### Result Handling

198

199

Process and consume query results with various access patterns.

200

201

```java { .api }

202

interface TableResult extends AutoCloseable {

203

/**

204

* Gets the result kind indicating success or content availability

205

* @return ResultKind (SUCCESS, SUCCESS_WITH_CONTENT)

206

*/

207

ResultKind getResultKind();

208

209

/**

210

* Gets the schema of the result

211

* @return ResolvedSchema describing result structure

212

*/

213

ResolvedSchema getResolvedSchema();

214

215

/**

216

* Collects all result rows as an iterator

217

* @return CloseableIterator for consuming result rows

218

*/

219

CloseableIterator<Row> collect();

220

221

/**

222

* Prints the result to standard output

223

*/

224

void print();

225

226

/**

227

* Prints the result with row limit

228

* @param maxNumRows Maximum number of rows to print

229

*/

230

void print(int maxNumRows);

231

232

/**

233

* Gets the job client for the executed job (for streaming queries)

234

* @return Optional JobClient for job monitoring

235

*/

236

Optional<JobClient> getJobClient();

237

}

238

239

enum ResultKind {

240

/** Statement executed successfully without result data */

241

SUCCESS,

242

/** Statement executed successfully with result data */

243

SUCCESS_WITH_CONTENT

244

}

245

```

246

247

**Usage Examples:**

248

249

```java

250

// Query execution with result processing

251

Table query = tableEnv.sqlQuery("SELECT customer_id, COUNT(*) as orders FROM orders GROUP BY customer_id");

252

TableResult result = query.execute();

253

254

// Check result type

255

if (result.getResultKind() == ResultKind.SUCCESS_WITH_CONTENT) {

256

// Access schema information

257

ResolvedSchema schema = result.getResolvedSchema();

258

System.out.println("Columns: " + schema.getColumnNames());

259

260

// Process results with iterator

261

try (CloseableIterator<Row> iterator = result.collect()) {

262

while (iterator.hasNext()) {

263

Row row = iterator.next();

264

Long customerId = row.getFieldAs(0);

265

Long orderCount = row.getFieldAs(1);

266

System.out.println("Customer " + customerId + " has " + orderCount + " orders");

267

}

268

}

269

}

270

271

// Simple result printing

272

tableEnv.sqlQuery("SELECT * FROM customers LIMIT 10").execute().print();

273

274

// Limited printing

275

tableEnv.sqlQuery("SELECT * FROM large_table").execute().print(100);

276

```

277

278

### Advanced SQL Features

279

280

Support for advanced SQL constructs and streaming-specific features.

281

282

```java { .api }

283

// Advanced SQL constructs examples - these are strings passed to sqlQuery() or executeSql()

284

```

285

286

**Usage Examples:**

287

288

```java

289

// Common Table Expressions (CTE)

290

Table cteQuery = tableEnv.sqlQuery(

291

"WITH RECURSIVE category_hierarchy AS (" +

292

" SELECT category_id, category_name, parent_id, 0 as level " +

293

" FROM categories WHERE parent_id IS NULL " +

294

" UNION ALL " +

295

" SELECT c.category_id, c.category_name, c.parent_id, ch.level + 1 " +

296

" FROM categories c " +

297

" JOIN category_hierarchy ch ON c.parent_id = ch.category_id" +

298

") " +

299

"SELECT * FROM category_hierarchy"

300

);

301

302

// Window functions

303

Table windowQuery = tableEnv.sqlQuery(

304

"SELECT " +

305

" customer_id, order_date, amount," +

306

" ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY order_date) as order_sequence," +

307

" SUM(amount) OVER (PARTITION BY customer_id ORDER BY order_date " +

308

" ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as running_total," +

309

" AVG(amount) OVER (PARTITION BY customer_id ORDER BY order_date " +

310

" ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING) as moving_avg " +

311

"FROM orders"

312

);

313

314

// Streaming-specific: Event time processing

315

Table streamingQuery = tableEnv.sqlQuery(

316

"SELECT " +

317

" user_id," +

318

" HOP_START(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) as window_start," +

319

" HOP_END(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) as window_end," +

320

" COUNT(*) as event_count," +

321

" MAX(event_value) as max_value " +

322

"FROM events " +

323

"GROUP BY user_id, HOP(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE)"

324

);

325

326

// Pattern matching with MATCH_RECOGNIZE

327

Table patternQuery = tableEnv.sqlQuery(

328

"SELECT customer_id, start_time, end_time, total_amount " +

329

"FROM orders " +

330

"MATCH_RECOGNIZE (" +

331

" PARTITION BY customer_id " +

332

" ORDER BY order_time " +

333

" MEASURES " +

334

" FIRST(A.order_time) as start_time," +

335

" LAST(C.order_time) as end_time," +

336

" SUM(A.amount + B.amount + C.amount) as total_amount " +

337

" PATTERN (A B+ C) " +

338

" DEFINE " +

339

" A as A.amount > 100," +

340

" B as B.amount > A.amount," +

341

" C as C.amount < B.amount" +

342

")"

343

);

344

345

// JSON processing

346

Table jsonQuery = tableEnv.sqlQuery(

347

"SELECT " +

348

" user_id," +

349

" JSON_VALUE(user_profile, '$.preferences.language') as preferred_language," +

350

" JSON_VALUE(user_profile, '$.address.country') as country," +

351

" JSON_QUERY(user_profile, '$.purchase_history[*].product_id') as purchased_products " +

352

"FROM users " +

353

"WHERE JSON_EXISTS(user_profile, '$.preferences')"

354

);

355

```

356

357

### SQL Dialect Support

358

359

Configure SQL dialect for compatibility with different SQL engines.

360

361

```java { .api }

362

/**

363

* Gets the table configuration for SQL dialect settings

364

* @return TableConfig for accessing dialect configuration

365

*/

366

TableConfig getConfig();

367

368

interface TableConfig {

369

/**

370

* Sets the SQL dialect for parsing

371

* @param sqlDialect Dialect to use (DEFAULT, HIVE)

372

*/

373

void setSqlDialect(SqlDialect sqlDialect);

374

375

/**

376

* Gets the current SQL dialect

377

* @return Currently configured SQL dialect

378

*/

379

SqlDialect getSqlDialect();

380

}

381

382

enum SqlDialect {

383

/** Default Flink SQL dialect */

384

DEFAULT,

385

/** Hive-compatible SQL dialect */

386

HIVE

387

}

388

```

389

390

**Usage Examples:**

391

392

```java

393

// Switch to Hive dialect for compatibility

394

tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

395

396

// Hive-specific SQL features

397

tableEnv.executeSql(

398

"CREATE TABLE hive_table (" +

399

" id BIGINT," +

400

" name STRING" +

401

") PARTITIONED BY (" +

402

" partition_date STRING) " +

403

"STORED AS PARQUET"

404

);

405

406

// Switch back to default Flink dialect

407

tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

408

```

409

410

## Types

411

412

### Result Types

413

414

```java { .api }

415

interface TableResult extends AutoCloseable {

416

ResultKind getResultKind();

417

ResolvedSchema getResolvedSchema();

418

CloseableIterator<Row> collect();

419

void print();

420

void print(int maxNumRows);

421

Optional<JobClient> getJobClient();

422

void close();

423

}

424

425

enum ResultKind {

426

SUCCESS,

427

SUCCESS_WITH_CONTENT

428

}

429

430

interface StatementSet {

431

StatementSet addInsertSql(String statement);

432

StatementSet addInsert(String targetPath, Table table);

433

String explain();

434

TableResult execute();

435

}

436

```

437

438

### Row Access

439

440

```java { .api }

441

class Row {

442

Object getField(int pos);

443

Object getField(String name);

444

<T> T getFieldAs(int pos);

445

<T> T getFieldAs(String name);

446

int getArity();

447

RowKind getKind();

448

449

// Factory methods

450

static Row of(Object... values);

451

static Row withNames(RowKind kind);

452

static Row withPositions(RowKind kind);

453

}

454

455

enum RowKind {

456

INSERT, // +I

457

UPDATE_BEFORE, // -U

458

UPDATE_AFTER, // +U

459

DELETE // -D

460

}

461

```

462

463

### Iterator Support

464

465

```java { .api }

466

interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {

467

boolean hasNext();

468

T next();

469

void close();

470

}

471

```