or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

command-line-interface.mdconfiguration-options.mdindex.mdresult-handling-display.mdsession-context-management.mdsql-client-application.mdsql-execution-gateway.md

sql-execution-gateway.mddocs/

0

# SQL Execution Gateway

1

2

Core execution interface providing session management, SQL parsing, and operation execution. The gateway abstracts the underlying Flink execution environment and provides a unified API for SQL operations across different deployment modes.

3

4

## Capabilities

5

6

### Executor Interface

7

8

Primary interface for SQL execution operations with complete session lifecycle management.

9

10

```java { .api }

11

public interface Executor {

12

/**

13

* Initialize executor and ensure readiness for command execution

14

* @throws SqlExecutionException if initialization fails

15

*/

16

void start() throws SqlExecutionException;

17

18

/**

19

* Open new session with optional session identifier

20

* @param sessionId Desired session ID or null for auto-generation

21

* @return Actual session ID used for tracking

22

* @throws SqlExecutionException if session creation fails

23

*/

24

String openSession(String sessionId) throws SqlExecutionException;

25

26

/**

27

* Close session and release associated resources

28

* @param sessionId Session identifier to close

29

* @throws SqlExecutionException if session closure fails

30

*/

31

void closeSession(String sessionId) throws SqlExecutionException;

32

}

33

```

34

35

### Configuration Management

36

37

Session-level configuration management with property get/set operations.

38

39

```java { .api }

40

/**

41

* Get session configuration as Map for external access

42

* @param sessionId Session identifier

43

* @return Copy of all session configuration properties

44

* @throws SqlExecutionException if session not found

45

*/

46

Map<String, String> getSessionConfigMap(String sessionId) throws SqlExecutionException;

47

48

/**

49

* Get session configuration as ReadableConfig for type-safe access

50

* @param sessionId Session identifier

51

* @return ReadableConfig instance with session properties

52

* @throws SqlExecutionException if session not found

53

*/

54

ReadableConfig getSessionConfig(String sessionId) throws SqlExecutionException;

55

56

/**

57

* Reset all session properties to default values

58

* @param sessionId Session identifier

59

* @throws SqlExecutionException if reset fails

60

*/

61

void resetSessionProperties(String sessionId) throws SqlExecutionException;

62

63

/**

64

* Reset specific session property to default value

65

* @param sessionId Session identifier

66

* @param key Property key to reset

67

* @throws SqlExecutionException if reset fails

68

*/

69

void resetSessionProperty(String sessionId, String key) throws SqlExecutionException;

70

71

/**

72

* Set session property to specific value

73

* @param sessionId Session identifier

74

* @param key Property key

75

* @param value Property value

76

* @throws SqlExecutionException if property setting fails

77

*/

78

void setSessionProperty(String sessionId, String key, String value) throws SqlExecutionException;

79

```

80

81

**Usage Example:**

82

83

```java

84

// Configure session properties

85

executor.setSessionProperty(sessionId, "sql-client.execution.result-mode", "tableau");

86

executor.setSessionProperty(sessionId, "table.exec.resource.default-parallelism", "4");

87

88

// Get current configuration

89

Map<String, String> config = executor.getSessionConfigMap(sessionId);

90

ReadableConfig readableConfig = executor.getSessionConfig(sessionId);

91

92

// Reset properties

93

executor.resetSessionProperty(sessionId, "table.exec.resource.default-parallelism");

94

executor.resetSessionProperties(sessionId); // Reset all

95

```

96

97

### SQL Statement Processing

98

99

Parse and execute SQL statements with full operation lifecycle support.

100

101

```java { .api }

102

/**

103

* Parse SQL statement into executable Operation

104

* @param sessionId Session identifier for context

105

* @param statement SQL statement to parse

106

* @return Operation instance ready for execution

107

* @throws SqlExecutionException if parsing fails

108

*/

109

Operation parseStatement(String sessionId, String statement) throws SqlExecutionException;

110

111

/**

112

* Provide auto-completion suggestions for SQL statement

113

* @param sessionId Session identifier for context

114

* @param statement Partial SQL statement

115

* @param position Cursor position within statement

116

* @return List of completion suggestions

117

*/

118

List<String> completeStatement(String sessionId, String statement, int position);

119

120

/**

121

* Execute parsed operation and return result

122

* @param sessionId Session identifier

123

* @param operation Parsed operation to execute

124

* @return TableResult with operation results

125

* @throws SqlExecutionException if execution fails

126

*/

127

TableResult executeOperation(String sessionId, Operation operation) throws SqlExecutionException;

128

```

129

130

**Usage Example:**

131

132

```java

133

// Parse and execute statement

134

String sql = "CREATE TABLE users (id INT, name STRING) WITH ('connector' = 'datagen')";

135

Operation operation = executor.parseStatement(sessionId, sql);

136

TableResult result = executor.executeOperation(sessionId, operation);

137

138

// Auto-completion

139

List<String> suggestions = executor.completeStatement(sessionId, "SELECT * FROM use", 17);

140

// Returns: ["users", "user_events", ...] based on available tables

141

```

142

143

### Query Execution and Results

144

145

Execute queries with advanced result handling for streaming and batch operations.

146

147

```java { .api }

148

/**

149

* Execute SELECT query and return result descriptor for streaming access

150

* @param sessionId Session identifier

151

* @param query Parsed query operation

152

* @return ResultDescriptor for accessing query results

153

* @throws SqlExecutionException if query execution fails

154

*/

155

ResultDescriptor executeQuery(String sessionId, QueryOperation query) throws SqlExecutionException;

156

157

/**

158

* Retrieve incremental changes from streaming query result

159

* @param sessionId Session identifier

160

* @param resultId Result identifier from executeQuery

161

* @return TypedResult containing list of result rows or status

162

* @throws SqlExecutionException if retrieval fails

163

*/

164

TypedResult<List<Row>> retrieveResultChanges(String sessionId, String resultId) throws SqlExecutionException;

165

166

/**

167

* Create materialized snapshot of query result with pagination

168

* @param sessionId Session identifier

169

* @param resultId Result identifier from executeQuery

170

* @param pageSize Number of rows per page

171

* @return TypedResult containing total page count

172

* @throws SqlExecutionException if snapshot creation fails

173

*/

174

TypedResult<Integer> snapshotResult(String sessionId, String resultId, int pageSize) throws SqlExecutionException;

175

176

/**

177

* Retrieve specific page from materialized result snapshot

178

* @param resultId Result identifier

179

* @param page Page number (0-based)

180

* @return List of rows for the requested page

181

* @throws SqlExecutionException if page retrieval fails

182

*/

183

List<Row> retrieveResultPage(String resultId, int page) throws SqlExecutionException;

184

185

/**

186

* Cancel running query and stop result generation

187

* @param sessionId Session identifier

188

* @param resultId Result identifier to cancel

189

* @throws SqlExecutionException if cancellation fails

190

*/

191

void cancelQuery(String sessionId, String resultId) throws SqlExecutionException;

192

```

193

194

**Usage Example:**

195

196

```java

197

// Execute streaming query

198

QueryOperation queryOp = (QueryOperation) executor.parseStatement(sessionId, "SELECT * FROM events");

199

ResultDescriptor descriptor = executor.executeQuery(sessionId, queryOp);

200

201

if (descriptor.isMaterialized()) {

202

// Handle materialized results with pagination

203

TypedResult<Integer> pageCount = executor.snapshotResult(sessionId, descriptor.getResultId(), 100);

204

if (pageCount.getType() == TypedResult.ResultType.PAYLOAD) {

205

for (int page = 0; page < pageCount.getPayload(); page++) {

206

List<Row> rows = executor.retrieveResultPage(descriptor.getResultId(), page);

207

// Process rows

208

}

209

}

210

} else {

211

// Handle streaming results

212

while (true) {

213

TypedResult<List<Row>> changes = executor.retrieveResultChanges(sessionId, descriptor.getResultId());

214

if (changes.getType() == TypedResult.ResultType.PAYLOAD) {

215

List<Row> rows = changes.getPayload();

216

// Process incremental changes

217

} else if (changes.getType() == TypedResult.ResultType.EOS) {

218

break; // End of stream

219

}

220

// Handle EMPTY results by continuing to poll

221

}

222

}

223

```

224

225

### Batch Modification Operations

226

227

Execute multiple modification operations as a batch for better performance.

228

229

```java { .api }

230

/**

231

* Execute multiple modification operations as batch

232

* @param sessionId Session identifier

233

* @param operations List of modification operations (INSERT, UPDATE, DELETE)

234

* @return TableResult with batch execution results

235

* @throws SqlExecutionException if batch execution fails

236

*/

237

TableResult executeModifyOperations(String sessionId, List<ModifyOperation> operations) throws SqlExecutionException;

238

```

239

240

**Usage Example:**

241

242

```java

243

// Batch multiple INSERT operations

244

List<ModifyOperation> operations = Arrays.asList(

245

(ModifyOperation) executor.parseStatement(sessionId, "INSERT INTO table1 VALUES (1, 'a')"),

246

(ModifyOperation) executor.parseStatement(sessionId, "INSERT INTO table2 VALUES (2, 'b')")

247

);

248

249

TableResult batchResult = executor.executeModifyOperations(sessionId, operations);

250

System.out.println("Batch job ID: " + batchResult.getJobClient().get().getJobID());

251

```

252

253

### JAR and Dependency Management

254

255

Manage session-level JAR dependencies for custom functions and connectors.

256

257

```java { .api }

258

/**

259

* Add JAR file to session classpath

260

* @param sessionId Session identifier

261

* @param jarPath Path to JAR file (local or remote URL)

262

*/

263

void addJar(String sessionId, String jarPath);

264

265

/**

266

* Remove JAR file from session classpath

267

* @param sessionId Session identifier

268

* @param jarPath Path to JAR file to remove

269

*/

270

void removeJar(String sessionId, String jarPath);

271

272

/**

273

* List all JAR files loaded in session

274

* @param sessionId Session identifier

275

* @return List of JAR file paths

276

*/

277

List<String> listJars(String sessionId);

278

```

279

280

**Usage Example:**

281

282

```java

283

// Add custom connector JAR

284

executor.addJar(sessionId, "/path/to/custom-connector-1.0.jar");

285

286

// Add JAR from URL

287

executor.addJar(sessionId, "https://repo.maven.apache.org/maven2/org/example/connector/1.0/connector-1.0.jar");

288

289

// List loaded JARs

290

List<String> jars = executor.listJars(sessionId);

291

jars.forEach(System.out::println);

292

293

// Remove JAR when no longer needed

294

executor.removeJar(sessionId, "/path/to/custom-connector-1.0.jar");

295

```

296

297

## LocalExecutor Implementation

298

299

Concrete implementation of Executor interface for embedded execution mode.

300

301

```java { .api }

302

public class LocalExecutor implements Executor {

303

/**

304

* Create local executor with default context

305

* @param defaultContext Default execution context with configuration and dependencies

306

*/

307

public LocalExecutor(DefaultContext defaultContext);

308

}

309

```

310

311

**Usage Example:**

312

313

```java

314

// Create local executor

315

DefaultContext context = LocalContextUtils.buildDefaultContext(options);

316

Executor executor = new LocalExecutor(context);

317

318

// Start and use executor

319

executor.start();

320

String sessionId = executor.openSession(null);

321

try {

322

// Perform SQL operations

323

Operation op = executor.parseStatement(sessionId, "SHOW TABLES");

324

TableResult result = executor.executeOperation(sessionId, op);

325

} finally {

326

executor.closeSession(sessionId);

327

}

328

```

329

330

## Error Handling

331

332

### SqlExecutionException

333

334

Primary exception type for SQL execution errors.

335

336

```java { .api }

337

public class SqlExecutionException extends Exception {

338

public SqlExecutionException(String message);

339

public SqlExecutionException(String message, Throwable cause);

340

}

341

```

342

343

Common error scenarios:

344

- SQL parsing errors with syntax details

345

- Execution errors with Flink cluster communication issues

346

- Session management errors (duplicate session ID, session not found)

347

- Resource errors (insufficient memory, network connectivity)

348

- Configuration errors (invalid property values)

349

350

**Example Error Handling:**

351

352

```java

353

try {

354

Operation operation = executor.parseStatement(sessionId, malformedSQL);

355

} catch (SqlExecutionException e) {

356

System.err.println("SQL Error: " + e.getMessage());

357

// Handle parsing or execution error

358

if (e.getCause() instanceof ValidationException) {

359

// Handle validation-specific error

360

}

361

}

362

```

363

364

## Integration Patterns

365

366

### Session Lifecycle Management

367

368

Typical session usage pattern:

369

370

```java

371

Executor executor = new LocalExecutor(defaultContext);

372

executor.start();

373

374

String sessionId = executor.openSession("my-session");

375

try {

376

// Configure session

377

executor.setSessionProperty(sessionId, "key", "value");

378

379

// Execute operations

380

Operation op = executor.parseStatement(sessionId, sql);

381

TableResult result = executor.executeOperation(sessionId, op);

382

383

} finally {

384

// Always close session

385

executor.closeSession(sessionId);

386

}

387

```

388

389

### Streaming Query Pattern

390

391

Pattern for handling streaming query results:

392

393

```java

394

QueryOperation query = (QueryOperation) executor.parseStatement(sessionId, streamingSQL);

395

ResultDescriptor descriptor = executor.executeQuery(sessionId, query);

396

397

// Poll for streaming results

398

while (!cancelled) {

399

TypedResult<List<Row>> result = executor.retrieveResultChanges(sessionId, descriptor.getResultId());

400

401

switch (result.getType()) {

402

case PAYLOAD:

403

processRows(result.getPayload());

404

break;

405

case EMPTY:

406

Thread.sleep(100); // Wait before next poll

407

break;

408

case EOS:

409

System.out.println("Stream ended");

410

return;

411

}

412

}

413

```

414

415

The Executor interface provides a comprehensive abstraction over Flink's SQL execution capabilities, enabling both simple statement execution and complex streaming data processing workflows.