0
# SQL Execution Gateway
1
2
Core execution interface providing session management, SQL parsing, and operation execution. The gateway abstracts the underlying Flink execution environment and provides a unified API for SQL operations across different deployment modes.
3
4
## Capabilities
5
6
### Executor Interface
7
8
Primary interface for SQL execution operations with complete session lifecycle management.
9
10
```java { .api }
11
public interface Executor {
12
/**
13
* Initialize executor and ensure readiness for command execution
14
* @throws SqlExecutionException if initialization fails
15
*/
16
void start() throws SqlExecutionException;
17
18
/**
19
* Open new session with optional session identifier
20
* @param sessionId Desired session ID or null for auto-generation
21
* @return Actual session ID used for tracking
22
* @throws SqlExecutionException if session creation fails
23
*/
24
String openSession(String sessionId) throws SqlExecutionException;
25
26
/**
27
* Close session and release associated resources
28
* @param sessionId Session identifier to close
29
* @throws SqlExecutionException if session closure fails
30
*/
31
void closeSession(String sessionId) throws SqlExecutionException;
32
}
33
```
34
35
### Configuration Management
36
37
Session-level configuration management with property get/set operations.
38
39
```java { .api }
40
/**
41
* Get session configuration as Map for external access
42
* @param sessionId Session identifier
43
* @return Copy of all session configuration properties
44
* @throws SqlExecutionException if session not found
45
*/
46
Map<String, String> getSessionConfigMap(String sessionId) throws SqlExecutionException;
47
48
/**
49
* Get session configuration as ReadableConfig for type-safe access
50
* @param sessionId Session identifier
51
* @return ReadableConfig instance with session properties
52
* @throws SqlExecutionException if session not found
53
*/
54
ReadableConfig getSessionConfig(String sessionId) throws SqlExecutionException;
55
56
/**
57
* Reset all session properties to default values
58
* @param sessionId Session identifier
59
* @throws SqlExecutionException if reset fails
60
*/
61
void resetSessionProperties(String sessionId) throws SqlExecutionException;
62
63
/**
64
* Reset specific session property to default value
65
* @param sessionId Session identifier
66
* @param key Property key to reset
67
* @throws SqlExecutionException if reset fails
68
*/
69
void resetSessionProperty(String sessionId, String key) throws SqlExecutionException;
70
71
/**
72
* Set session property to specific value
73
* @param sessionId Session identifier
74
* @param key Property key
75
* @param value Property value
76
* @throws SqlExecutionException if property setting fails
77
*/
78
void setSessionProperty(String sessionId, String key, String value) throws SqlExecutionException;
79
```
80
81
**Usage Example:**
82
83
```java
84
// Configure session properties
85
executor.setSessionProperty(sessionId, "sql-client.execution.result-mode", "tableau");
86
executor.setSessionProperty(sessionId, "table.exec.resource.default-parallelism", "4");
87
88
// Get current configuration
89
Map<String, String> config = executor.getSessionConfigMap(sessionId);
90
ReadableConfig readableConfig = executor.getSessionConfig(sessionId);
91
92
// Reset properties
93
executor.resetSessionProperty(sessionId, "table.exec.resource.default-parallelism");
94
executor.resetSessionProperties(sessionId); // Reset all
95
```
96
97
### SQL Statement Processing
98
99
Parse and execute SQL statements with full operation lifecycle support.
100
101
```java { .api }
102
/**
103
* Parse SQL statement into executable Operation
104
* @param sessionId Session identifier for context
105
* @param statement SQL statement to parse
106
* @return Operation instance ready for execution
107
* @throws SqlExecutionException if parsing fails
108
*/
109
Operation parseStatement(String sessionId, String statement) throws SqlExecutionException;
110
111
/**
112
* Provide auto-completion suggestions for SQL statement
113
* @param sessionId Session identifier for context
114
* @param statement Partial SQL statement
115
* @param position Cursor position within statement
116
* @return List of completion suggestions
117
*/
118
List<String> completeStatement(String sessionId, String statement, int position);
119
120
/**
121
* Execute parsed operation and return result
122
* @param sessionId Session identifier
123
* @param operation Parsed operation to execute
124
* @return TableResult with operation results
125
* @throws SqlExecutionException if execution fails
126
*/
127
TableResult executeOperation(String sessionId, Operation operation) throws SqlExecutionException;
128
```
129
130
**Usage Example:**
131
132
```java
133
// Parse and execute statement
134
String sql = "CREATE TABLE users (id INT, name STRING) WITH ('connector' = 'datagen')";
135
Operation operation = executor.parseStatement(sessionId, sql);
136
TableResult result = executor.executeOperation(sessionId, operation);
137
138
// Auto-completion
139
List<String> suggestions = executor.completeStatement(sessionId, "SELECT * FROM use", 17);
140
// Returns: ["users", "user_events", ...] based on available tables
141
```
142
143
### Query Execution and Results
144
145
Execute queries with advanced result handling for streaming and batch operations.
146
147
```java { .api }
148
/**
149
* Execute SELECT query and return result descriptor for streaming access
150
* @param sessionId Session identifier
151
* @param query Parsed query operation
152
* @return ResultDescriptor for accessing query results
153
* @throws SqlExecutionException if query execution fails
154
*/
155
ResultDescriptor executeQuery(String sessionId, QueryOperation query) throws SqlExecutionException;
156
157
/**
158
* Retrieve incremental changes from streaming query result
159
* @param sessionId Session identifier
160
* @param resultId Result identifier from executeQuery
161
* @return TypedResult containing list of result rows or status
162
* @throws SqlExecutionException if retrieval fails
163
*/
164
TypedResult<List<Row>> retrieveResultChanges(String sessionId, String resultId) throws SqlExecutionException;
165
166
/**
167
* Create materialized snapshot of query result with pagination
168
* @param sessionId Session identifier
169
* @param resultId Result identifier from executeQuery
170
* @param pageSize Number of rows per page
171
* @return TypedResult containing total page count
172
* @throws SqlExecutionException if snapshot creation fails
173
*/
174
TypedResult<Integer> snapshotResult(String sessionId, String resultId, int pageSize) throws SqlExecutionException;
175
176
/**
177
* Retrieve specific page from materialized result snapshot
178
* @param resultId Result identifier
179
* @param page Page number (0-based)
180
* @return List of rows for the requested page
181
* @throws SqlExecutionException if page retrieval fails
182
*/
183
List<Row> retrieveResultPage(String resultId, int page) throws SqlExecutionException;
184
185
/**
186
* Cancel running query and stop result generation
187
* @param sessionId Session identifier
188
* @param resultId Result identifier to cancel
189
* @throws SqlExecutionException if cancellation fails
190
*/
191
void cancelQuery(String sessionId, String resultId) throws SqlExecutionException;
192
```
193
194
**Usage Example:**
195
196
```java
197
// Execute streaming query
198
QueryOperation queryOp = (QueryOperation) executor.parseStatement(sessionId, "SELECT * FROM events");
199
ResultDescriptor descriptor = executor.executeQuery(sessionId, queryOp);
200
201
if (descriptor.isMaterialized()) {
202
// Handle materialized results with pagination
203
TypedResult<Integer> pageCount = executor.snapshotResult(sessionId, descriptor.getResultId(), 100);
204
if (pageCount.getType() == TypedResult.ResultType.PAYLOAD) {
205
for (int page = 0; page < pageCount.getPayload(); page++) {
206
List<Row> rows = executor.retrieveResultPage(descriptor.getResultId(), page);
207
// Process rows
208
}
209
}
210
} else {
211
// Handle streaming results
212
while (true) {
213
TypedResult<List<Row>> changes = executor.retrieveResultChanges(sessionId, descriptor.getResultId());
214
if (changes.getType() == TypedResult.ResultType.PAYLOAD) {
215
List<Row> rows = changes.getPayload();
216
// Process incremental changes
217
} else if (changes.getType() == TypedResult.ResultType.EOS) {
218
break; // End of stream
219
}
220
// Handle EMPTY results by continuing to poll
221
}
222
}
223
```
224
225
### Batch Modification Operations
226
227
Execute multiple modification operations as a batch for better performance.
228
229
```java { .api }
230
/**
231
* Execute multiple modification operations as batch
232
* @param sessionId Session identifier
233
* @param operations List of modification operations (INSERT, UPDATE, DELETE)
234
* @return TableResult with batch execution results
235
* @throws SqlExecutionException if batch execution fails
236
*/
237
TableResult executeModifyOperations(String sessionId, List<ModifyOperation> operations) throws SqlExecutionException;
238
```
239
240
**Usage Example:**
241
242
```java
243
// Batch multiple INSERT operations
244
List<ModifyOperation> operations = Arrays.asList(
245
(ModifyOperation) executor.parseStatement(sessionId, "INSERT INTO table1 VALUES (1, 'a')"),
246
(ModifyOperation) executor.parseStatement(sessionId, "INSERT INTO table2 VALUES (2, 'b')")
247
);
248
249
TableResult batchResult = executor.executeModifyOperations(sessionId, operations);
250
System.out.println("Batch job ID: " + batchResult.getJobClient().get().getJobID());
251
```
252
253
### JAR and Dependency Management
254
255
Manage session-level JAR dependencies for custom functions and connectors.
256
257
```java { .api }
258
/**
259
* Add JAR file to session classpath
260
* @param sessionId Session identifier
261
* @param jarPath Path to JAR file (local or remote URL)
262
*/
263
void addJar(String sessionId, String jarPath);
264
265
/**
266
* Remove JAR file from session classpath
267
* @param sessionId Session identifier
268
* @param jarPath Path to JAR file to remove
269
*/
270
void removeJar(String sessionId, String jarPath);
271
272
/**
273
* List all JAR files loaded in session
274
* @param sessionId Session identifier
275
* @return List of JAR file paths
276
*/
277
List<String> listJars(String sessionId);
278
```
279
280
**Usage Example:**
281
282
```java
283
// Add custom connector JAR
284
executor.addJar(sessionId, "/path/to/custom-connector-1.0.jar");
285
286
// Add JAR from URL
287
executor.addJar(sessionId, "https://repo.maven.apache.org/maven2/org/example/connector/1.0/connector-1.0.jar");
288
289
// List loaded JARs
290
List<String> jars = executor.listJars(sessionId);
291
jars.forEach(System.out::println);
292
293
// Remove JAR when no longer needed
294
executor.removeJar(sessionId, "/path/to/custom-connector-1.0.jar");
295
```
296
297
## LocalExecutor Implementation
298
299
Concrete implementation of Executor interface for embedded execution mode.
300
301
```java { .api }
302
public class LocalExecutor implements Executor {
303
/**
304
* Create local executor with default context
305
* @param defaultContext Default execution context with configuration and dependencies
306
*/
307
public LocalExecutor(DefaultContext defaultContext);
308
}
309
```
310
311
**Usage Example:**
312
313
```java
314
// Create local executor
315
DefaultContext context = LocalContextUtils.buildDefaultContext(options);
316
Executor executor = new LocalExecutor(context);
317
318
// Start and use executor
319
executor.start();
320
String sessionId = executor.openSession(null);
321
try {
322
// Perform SQL operations
323
Operation op = executor.parseStatement(sessionId, "SHOW TABLES");
324
TableResult result = executor.executeOperation(sessionId, op);
325
} finally {
326
executor.closeSession(sessionId);
327
}
328
```
329
330
## Error Handling
331
332
### SqlExecutionException
333
334
Primary exception type for SQL execution errors.
335
336
```java { .api }
337
public class SqlExecutionException extends Exception {
338
public SqlExecutionException(String message);
339
public SqlExecutionException(String message, Throwable cause);
340
}
341
```
342
343
Common error scenarios:
344
- SQL parsing errors with syntax details
345
- Execution errors with Flink cluster communication issues
346
- Session management errors (duplicate session ID, session not found)
347
- Resource errors (insufficient memory, network connectivity)
348
- Configuration errors (invalid property values)
349
350
**Example Error Handling:**
351
352
```java
353
try {
354
Operation operation = executor.parseStatement(sessionId, malformedSQL);
355
} catch (SqlExecutionException e) {
356
System.err.println("SQL Error: " + e.getMessage());
357
// Handle parsing or execution error
358
if (e.getCause() instanceof ValidationException) {
359
// Handle validation-specific error
360
}
361
}
362
```
363
364
## Integration Patterns
365
366
### Session Lifecycle Management
367
368
Typical session usage pattern:
369
370
```java
371
Executor executor = new LocalExecutor(defaultContext);
372
executor.start();
373
374
String sessionId = executor.openSession("my-session");
375
try {
376
// Configure session
377
executor.setSessionProperty(sessionId, "key", "value");
378
379
// Execute operations
380
Operation op = executor.parseStatement(sessionId, sql);
381
TableResult result = executor.executeOperation(sessionId, op);
382
383
} finally {
384
// Always close session
385
executor.closeSession(sessionId);
386
}
387
```
388
389
### Streaming Query Pattern
390
391
Pattern for handling streaming query results:
392
393
```java
394
QueryOperation query = (QueryOperation) executor.parseStatement(sessionId, streamingSQL);
395
ResultDescriptor descriptor = executor.executeQuery(sessionId, query);
396
397
// Poll for streaming results
398
while (!cancelled) {
399
TypedResult<List<Row>> result = executor.retrieveResultChanges(sessionId, descriptor.getResultId());
400
401
switch (result.getType()) {
402
case PAYLOAD:
403
processRows(result.getPayload());
404
break;
405
case EMPTY:
406
Thread.sleep(100); // Wait before next poll
407
break;
408
case EOS:
409
System.out.println("Stream ended");
410
return;
411
}
412
}
413
```
414
415
The Executor interface provides a comprehensive abstraction over Flink's SQL execution capabilities, enabling both simple statement execution and complex streaming data processing workflows.