or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog-management.md, complex-event-processing.md, core-table-operations.md, datastream-integration.md, index.md, sql-processing.md, type-system.md, user-defined-functions.md, window-operations.md

docs/sql-processing.md

0

# SQL and Query Processing

1

2

This document covers SQL DDL, DML, and query capabilities — including SQL parsing, execution, and Hive compatibility features — in the Apache Flink Table Uber Blink module.

3

4

## SQL Execution

5

6

### SQL Statement Execution

7

8

```java { .api }

9

interface TableEnvironment {

10

TableResult executeSql(String statement);

11

Table sqlQuery(String query);

12

StatementSet createStatementSet();

13

}

14

```

15

16

**Usage:**

17

18

```java

19

// Execute DDL statements

20

tEnv.executeSql("CREATE DATABASE mydb");

21

tEnv.executeSql("USE mydb");

22

23

// Execute DML statements

24

TableResult result = tEnv.executeSql("INSERT INTO target_table SELECT * FROM source_table");

25

26

// Execute queries

27

Table queryResult = tEnv.sqlQuery("SELECT user_id, COUNT(*) as cnt FROM clicks GROUP BY user_id");

28

```

29

30

## Data Definition Language (DDL)

31

32

### Database Operations

33

34

```sql

35

-- Create database

36

CREATE DATABASE [IF NOT EXISTS] db_name [COMMENT 'comment'] [WITH (key1=val1, key2=val2, ...)];

37

38

-- Drop database

39

DROP DATABASE [IF EXISTS] db_name [RESTRICT|CASCADE];

40

41

-- Show databases

42

SHOW DATABASES;

43

44

-- Use database

45

USE db_name;

46

```

47

48

### Table Operations

49

50

```sql

51

-- Create table

52

CREATE TABLE [IF NOT EXISTS] table_name (

53

column_name column_type [COMMENT 'comment'],

54

[WATERMARK FOR rowtime_column AS watermark_strategy],

55

[PRIMARY KEY (column_list) NOT ENFORCED]

56

) [COMMENT 'comment']

57

[PARTITIONED BY (partition_column_list)]

58

WITH (

59

'connector' = 'connector_type',

60

'option_key' = 'option_value'

61

);

62

63

-- Create table as select

64

CREATE TABLE table_name WITH ('connector' = '...') AS SELECT ...;

65

66

-- Drop table

67

DROP TABLE [IF EXISTS] table_name;

68

69

-- Show tables

70

SHOW TABLES;

71

72

-- Describe table

73

DESCRIBE table_name;

74

DESC table_name;

75

```

76

77

**Usage:**

78

79

```java

80

// Create table with watermark

81

tEnv.executeSql(

82

"CREATE TABLE events (" +

83

" user_id BIGINT," +

84

" event_time TIMESTAMP(3)," +

85

" event_type STRING," +

86

" WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +

87

") WITH (" +

88

" 'connector' = 'kafka'," +

89

" 'topic' = 'events'," +

90

" 'properties.bootstrap.servers' = 'localhost:9092'," +

91

" 'format' = 'json'" +

92

")"

93

);

94

```

95

96

### View Operations

97

98

```java { .api }

99

interface TableEnvironment {

100

void createTemporaryView(String path, Table view);

101

void createTemporaryView(String path, DataStream<?> dataStream);

102

void dropTemporaryView(String path);

103

}

104

```

105

106

**SQL:**

107

108

```sql

109

-- Create view

110

CREATE [TEMPORARY] VIEW view_name AS SELECT ...;

111

112

-- Drop view

113

DROP [TEMPORARY] VIEW [IF EXISTS] view_name;

114

115

-- Show views

116

SHOW VIEWS;

117

```

118

119

## Data Manipulation Language (DML)

120

121

### Insert Operations

122

123

```sql

124

-- Insert values

125

INSERT INTO table_name VALUES (value1, value2, ...);

126

127

-- Insert from select

128

INSERT INTO target_table SELECT * FROM source_table WHERE condition;

129

130

-- Insert overwrite

131

INSERT OVERWRITE target_table SELECT * FROM source_table;

132

```

133

134

### Update and Delete

135

136

```sql

137

-- Update (only supported for changelog streams)

138

UPDATE table_name SET column1 = value1 WHERE condition;

139

140

-- Delete (only supported for changelog streams)

141

DELETE FROM table_name WHERE condition;

142

```

143

144

## Query Language (DQL)

145

146

### Basic Queries

147

148

```sql

149

SELECT column_list

150

FROM table_name

151

[WHERE condition]

152

[GROUP BY column_list]

153

[HAVING condition]

154

[ORDER BY column_list]

155

[LIMIT number];

156

```

157

158

### Joins

159

160

```sql

161

-- Inner join

162

SELECT * FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id;

163

164

-- Outer joins

165

SELECT * FROM table1 t1 LEFT JOIN table2 t2 ON t1.id = t2.id;

166

SELECT * FROM table1 t1 RIGHT JOIN table2 t2 ON t1.id = t2.id;

167

SELECT * FROM table1 t1 FULL OUTER JOIN table2 t2 ON t1.id = t2.id;

168

169

-- Cross join

170

SELECT * FROM table1 CROSS JOIN table2;

171

```

172

173

### Window Functions

174

175

#### Tumbling Windows

176

177

```sql

178

SELECT

179

TUMBLE_START(event_time, INTERVAL '1' HOUR) as window_start,

180

TUMBLE_END(event_time, INTERVAL '1' HOUR) as window_end,

181

user_id,

182

COUNT(*) as event_count

183

FROM events

184

GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR), user_id;

185

```

186

187

#### Sliding Windows

188

189

```sql

190

SELECT

191

HOP_START(event_time, INTERVAL '30' MINUTE, INTERVAL '1' HOUR) as window_start,

192

HOP_END(event_time, INTERVAL '30' MINUTE, INTERVAL '1' HOUR) as window_end,

193

COUNT(*) as event_count

194

FROM events

195

GROUP BY HOP(event_time, INTERVAL '30' MINUTE, INTERVAL '1' HOUR);

196

```

197

198

#### Session Windows

199

200

```sql

201

SELECT

202

SESSION_START(event_time, INTERVAL '30' MINUTE) as session_start,

203

SESSION_END(event_time, INTERVAL '30' MINUTE) as session_end,

204

user_id,

205

COUNT(*) as event_count

206

FROM events

207

GROUP BY SESSION(event_time, INTERVAL '30' MINUTE), user_id;

208

```

209

210

## SQL Parser Configuration

211

212

```java { .api }

213

interface TableEnvironment {

214

SqlParser createParser(String statement);

215

}

216

217

class SqlParser {

218

List<SqlNode> parseStmtList(String sql);

219

SqlNode parseStmt(String sql);

220

SqlNode parseExpression(String sqlExpression);

221

}

222

223

enum SqlDialect {

224

DEFAULT,

225

HIVE

226

}

227

```

228

229

**Configuration:**

230

231

```java

232

// Set SQL dialect

233

Configuration config = new Configuration();

234

config.setString("table.sql-dialect", "hive");

235

TableEnvironment tEnv = TableEnvironment.create(config);

236

237

// Or via table config

238

tEnv.getConfig().getConfiguration().setString("table.sql-dialect", "hive");

239

```

240

241

## Hive Compatibility

242

243

### Hive SQL Dialect

244

245

When using the Hive dialect, Flink supports Hive-specific SQL syntax:

246

247

```sql

248

-- Hive-style CTAS with storage format

249

CREATE TABLE target_table

250

STORED AS PARQUET

251

LOCATION '/path/to/table'

252

AS SELECT * FROM source_table;

253

254

-- Hive functions

255

SELECT concat_ws('|', col1, col2) FROM table1;

256

SELECT get_json_object(json_col, '$.field') FROM table2;

257

```

258

259

### Hive Catalog Integration

260

261

```java

262

// Register Hive catalog

263

HiveCatalog hive = new HiveCatalog("myhive", "default", "/path/to/hive-conf");

264

tEnv.registerCatalog("myhive", hive);

265

tEnv.useCatalog("myhive");

266

267

// Access Hive tables

268

Table hiveTable = tEnv.from("hive_database.hive_table");

269

```

270

271

## Built-in Functions

272

273

### Scalar Functions

274

275

```sql

276

-- String functions

277

SELECT UPPER(name), LOWER(email), LENGTH(description) FROM users;

278

SELECT CONCAT(first_name, ' ', last_name) as full_name FROM users;

279

SELECT SUBSTRING(text, 1, 10) FROM documents;

280

281

-- Math functions

282

SELECT ABS(balance), ROUND(price, 2), CEIL(rating) FROM products;

283

284

-- Date/Time functions

285

SELECT CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP;

286

SELECT EXTRACT(YEAR FROM order_date), DATE_FORMAT(created_at, 'yyyy-MM-dd') FROM orders;

287

288

-- Conditional functions

289

SELECT CASE WHEN age >= 18 THEN 'adult' ELSE 'minor' END FROM users;

290

SELECT COALESCE(nickname, first_name, 'Anonymous') FROM users;

291

```

292

293

### Aggregate Functions

294

295

```sql

296

-- Basic aggregates

297

SELECT COUNT(*), SUM(amount), AVG(rating), MIN(price), MAX(price) FROM products;

298

299

-- Statistical functions

300

SELECT STDDEV_POP(score), VAR_SAMP(rating) FROM reviews;

301

302

-- Collection functions

303

SELECT COLLECT(tags), LISTAGG(name, ',') FROM items GROUP BY category;

304

```

305

306

### Table Functions

307

308

```sql

309

-- String split

310

SELECT word FROM table1 CROSS JOIN UNNEST(SPLIT(description, ' ')) AS t(word);

311

312

-- JSON extraction

313

SELECT field_value FROM json_table CROSS JOIN UNNEST(JSON_EXTRACT_ARRAY(json_col)) AS t(field_value);

314

```

315

316

## Error Handling

317

318

SQL-specific exceptions:

319

320

```java { .api }

321

class SqlParserException extends TableException {

322

SqlParserException(String message);

323

SqlParserException(String message, Throwable cause);

324

}

325

326

class SqlExecutionException extends TableException;

327

class SqlValidationException extends ValidationException;

328

```

329

330

## Configuration Options

331

332

```java { .api }

333

class TableConfigOptions {

334

public static final ConfigOption<String> TABLE_SQL_DIALECT;

335

public static final ConfigOption<Integer> TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;

336

public static final ConfigOption<String> TABLE_LOCAL_TIME_ZONE;

337

}

338

```

339

340

**Common configurations:**

341

342

```java

343

Configuration config = tEnv.getConfig().getConfiguration();

344

345

// Set SQL dialect

346

config.setString("table.sql-dialect", "default");

347

348

// Set local timezone

349

config.setString("table.local-time-zone", "UTC");

350

351

// Set default parallelism

352

config.setInteger("table.exec.resource.default-parallelism", 4);

353

```

354

355

## Statement Sets

356

357

For grouping multiple INSERT statements together and executing them as a single job:

358

359

```java { .api }

360

interface StatementSet {

361

StatementSet addInsertSql(String statement);

362

StatementSet addInsert(String targetPath, Table table);

363

StatementSet addInsert(String targetPath, Table table, boolean overwrite);

364

TableResult execute();

365

String explain();

366

}

367

```

368

369

**Usage:**

370

371

```java

372

StatementSet stmtSet = tEnv.createStatementSet();

373

stmtSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source WHERE type = 'A'");

374

stmtSet.addInsertSql("INSERT INTO sink2 SELECT * FROM source WHERE type = 'B'");

375

stmtSet.execute();

376

```

377

378

## Types

379

380

```java { .api }

381

interface TableResult {

382

ResultKind getResultKind();

383

ResolvedSchema getResolvedSchema();

384

CloseableIterator<Row> collect();

385

void print();

386

JobExecutionResult getJobExecutionResult();

387

}

388

389

enum ResultKind {

390

SUCCESS,

391

SUCCESS_WITH_CONTENT,

392

NOT_READY

393

}

394

395

interface SqlNode;

396

interface SqlIdentifier extends SqlNode;

397

interface SqlCall extends SqlNode;

398

```