0
# SQL Gateway API
1
2
Remote SQL execution capabilities for building client-server architectures and multi-tenant SQL services with Apache Flink's Table API.
3
4
## Capabilities
5
6
### SqlGatewayService
7
8
Core service interface for managing SQL sessions and executing SQL operations remotely.
9
10
```java { .api }
11
/**
12
* Open a new session for SQL execution
13
* @param environment Session configuration and initialization parameters
14
* @return SessionHandle for identifying the session
15
* @throws SqlGatewayException if session creation fails
16
*/
17
public SessionHandle openSession(SessionEnvironment environment) throws SqlGatewayException;
18
19
/**
20
* Close an existing session and clean up resources
21
* @param sessionHandle Handle of the session to close
22
* @throws SqlGatewayException if session closure fails
23
*/
24
public void closeSession(SessionHandle sessionHandle) throws SqlGatewayException;
25
26
/**
27
* Configure session properties using SET statements
28
* @param sessionHandle Handle of the session to configure
29
* @param statement Configuration statement (e.g., "SET table.planner = blink")
30
* @param executionTimeoutMs Maximum execution time in milliseconds
31
* @throws SqlGatewayException if configuration fails
32
*/
33
public void configureSession(SessionHandle sessionHandle,
34
String statement,
35
long executionTimeoutMs) throws SqlGatewayException;
36
37
/**
38
* Execute a SQL statement (DDL, DML, or query)
39
* @param sessionHandle Handle of the session
40
* @param statement SQL statement to execute
41
* @param executionTimeoutMs Maximum execution time in milliseconds
42
* @param executionConfig Additional execution configuration
43
* @return OperationHandle for tracking the operation
44
* @throws SqlGatewayException if execution fails
45
*/
46
public OperationHandle executeStatement(SessionHandle sessionHandle,
47
String statement,
48
long executionTimeoutMs,
49
Configuration executionConfig) throws SqlGatewayException;
50
51
/**
52
* Submit a custom operation for execution
53
* @param sessionHandle Handle of the session
54
* @param executor Callable that produces the operation result
55
* @return OperationHandle for tracking the operation
56
* @throws SqlGatewayException if submission fails
57
*/
58
public OperationHandle submitOperation(SessionHandle sessionHandle,
59
Callable<ResultSet> executor) throws SqlGatewayException;
60
61
/**
62
* Cancel a running operation
63
* @param sessionHandle Handle of the session
64
* @param operationHandle Handle of the operation to cancel
65
* @throws SqlGatewayException if cancellation fails
66
*/
67
public void cancelOperation(SessionHandle sessionHandle,
68
OperationHandle operationHandle) throws SqlGatewayException;
69
70
/**
71
* Close an operation and clean up resources
72
* @param sessionHandle Handle of the session
73
* @param operationHandle Handle of the operation to close
74
* @throws SqlGatewayException if operation closure fails
75
*/
76
public void closeOperation(SessionHandle sessionHandle,
77
OperationHandle operationHandle) throws SqlGatewayException;
78
79
/**
80
* Get information about an operation
81
* @param sessionHandle Handle of the session
82
* @param operationHandle Handle of the operation
83
* @return OperationInfo containing status and metadata
84
* @throws SqlGatewayException if operation info retrieval fails
85
*/
86
public OperationInfo getOperationInfo(SessionHandle sessionHandle,
87
OperationHandle operationHandle) throws SqlGatewayException;
88
89
/**
90
* Get schema information for an operation result
91
* @param sessionHandle Handle of the session
92
* @param operationHandle Handle of the operation
93
* @return ResolvedSchema of the operation result
94
* @throws SqlGatewayException if schema retrieval fails
95
*/
96
public ResolvedSchema getOperationResultSchema(SessionHandle sessionHandle,
97
OperationHandle operationHandle) throws SqlGatewayException;
98
99
/**
100
* Fetch results from an operation
101
* @param sessionHandle Handle of the session
102
* @param operationHandle Handle of the operation
103
* @param orientation Direction for fetching results
104
* @param maxRows Maximum number of rows to fetch
105
* @return ResultSet containing the fetched results
106
* @throws SqlGatewayException if result fetching fails
107
*/
108
public ResultSet fetchResults(SessionHandle sessionHandle,
109
OperationHandle operationHandle,
110
FetchOrientation orientation,
111
int maxRows) throws SqlGatewayException;
112
```
113
114
**Basic SQL Gateway Usage:**
115
116
```java
117
// Create SQL Gateway service instance
118
SqlGatewayService gateway = SqlGatewayServiceImpl.create(gatewayConfig);
119
120
// Open session with configuration
121
Map<String, String> sessionProperties = new HashMap<>();
122
sessionProperties.put("execution.parallelism", "4");
123
sessionProperties.put("table.planner", "blink");
124
125
SessionEnvironment sessionEnv = SessionEnvironment.newBuilder()
126
.setSessionEndpointConfig(endpointConfig)
127
.addSessionConfig(sessionProperties)
128
.build();
129
130
SessionHandle session = gateway.openSession(sessionEnv);
131
132
try {
133
// Configure session
134
gateway.configureSession(session, "SET execution.checkpointing.interval = 10s", 5000);
135
136
// Execute DDL
137
OperationHandle createTable = gateway.executeStatement(session,
138
"CREATE TABLE orders (" +
139
" id BIGINT," +
140
" customer_id BIGINT," +
141
" amount DECIMAL(10,2)" +
142
") WITH ('connector' = 'kafka', ...)",
143
30000, new Configuration());
144
145
// Execute query
146
OperationHandle query = gateway.executeStatement(session,
147
"SELECT customer_id, SUM(amount) as total " +
148
"FROM orders GROUP BY customer_id",
149
60000, new Configuration());
150
151
// Fetch results
152
ResultSet results = gateway.fetchResults(session, query,
153
FetchOrientation.FETCH_NEXT, 100);
154
155
while (results.hasNext()) {
156
RowData row = results.next();
157
System.out.println("Customer: " + row.getLong(0) + ", Total: " + row.getDecimal(1, 10, 2));
158
}
159
160
} finally {
161
gateway.closeSession(session);
162
}
163
```
164
165
### Session Management
166
167
Handle SQL Gateway sessions with proper lifecycle management.
168
169
```java { .api }
170
/**
171
* Session handle for identifying and tracking SQL sessions
172
*/
173
public final class SessionHandle {
174
/**
175
* Get unique session identifier
176
* @return UUID representing the session
177
*/
178
public UUID getIdentifier();
179
180
/**
181
* Get session creation timestamp
182
* @return Session creation time
183
*/
184
public Instant getCreationTime();
185
}
186
187
/**
188
* Session environment configuration
189
*/
190
public final class SessionEnvironment {
191
/**
192
* Create new session environment builder
193
* @return Builder for constructing session environment
194
*/
195
public static Builder newBuilder();
196
197
public static final class Builder {
198
/**
199
* Set session endpoint configuration
200
* @param config Endpoint-specific configuration
201
* @return Builder for method chaining
202
*/
203
public Builder setSessionEndpointConfig(Map<String, String> config);
204
205
/**
206
* Add session configuration properties
207
* @param config Map of configuration key-value pairs
208
* @return Builder for method chaining
209
*/
210
public Builder addSessionConfig(Map<String, String> config);
211
212
/**
213
* Set default catalog name
214
* @param catalogName Name of the default catalog
215
* @return Builder for method chaining
216
*/
217
public Builder setDefaultCatalog(String catalogName);
218
219
/**
220
* Set default database name
221
* @param databaseName Name of the default database
222
* @return Builder for method chaining
223
*/
224
public Builder setDefaultDatabase(String databaseName);
225
226
/**
227
* Build the session environment
228
* @return Constructed SessionEnvironment
229
*/
230
public SessionEnvironment build();
231
}
232
}
233
```
234
235
### Operation Management
236
237
Track and manage long-running SQL operations.
238
239
```java { .api }
240
/**
241
* Operation handle for identifying and tracking SQL operations
242
*/
243
public final class OperationHandle {
244
/**
245
* Get unique operation identifier
246
* @return UUID representing the operation
247
*/
248
public UUID getIdentifier();
249
250
/**
251
* Get operation type
252
* @return Type of the operation (EXECUTE_STATEMENT, etc.)
253
*/
254
public OperationType getOperationType();
255
}
256
257
/**
258
* Operation information and status
259
*/
260
public final class OperationInfo {
261
/**
262
* Get operation status
263
* @return Current status of the operation
264
*/
265
public OperationStatus getStatus();
266
267
/**
268
* Get operation exception if failed
269
* @return Optional exception that caused operation failure
270
*/
271
public Optional<Throwable> getException();
272
273
/**
274
* Check if operation has results available
275
* @return true if operation has results to fetch
276
*/
277
public boolean hasResults();
278
279
/**
280
* Get operation creation timestamp
281
* @return When the operation was created
282
*/
283
public Instant getCreateTime();
284
285
/**
286
* Get operation end timestamp
287
* @return When the operation completed (if finished)
288
*/
289
public Optional<Instant> getEndTime();
290
}
291
292
/**
293
* Operation status enumeration
294
*/
295
public enum OperationStatus {
296
INITIALIZED,
297
PENDING,
298
RUNNING,
299
FINISHED,
300
CANCELED,
301
FAILED,
302
TIMEOUT
303
}
304
305
/**
306
* Operation type enumeration
307
*/
308
public enum OperationType {
309
EXECUTE_STATEMENT,
310
SUBMIT_PLAN
311
}
312
```
313
314
### Result Management
315
316
Handle result sets and data fetching from SQL operations.
317
318
```java { .api }
319
/**
320
* Result set for accessing query results
321
*/
322
public interface ResultSet extends AutoCloseable {
323
/**
324
* Get the result type
325
* @return Type of results (PAYLOAD, NOT_READY, EOS)
326
*/
327
public ResultType getResultType();
328
329
/**
330
* Get next token for pagination
331
* @return Optional token for fetching next page of results
332
*/
333
public Optional<String> getNextToken();
334
335
/**
336
* Get result data as list of RowData
337
* @return List containing the fetched rows
338
*/
339
public List<RowData> getData();
340
341
/**
342
* Check if more results are available
343
* @return true if hasNext() would return more data
344
*/
345
public boolean hasNext();
346
347
/**
348
* Get next row of data
349
* @return Next RowData instance
350
*/
351
public RowData next();
352
353
/**
354
* Get result schema
355
* @return Schema of the result rows
356
*/
357
public ResolvedSchema getResultSchema();
358
}
359
360
/**
361
* Result type enumeration
362
*/
363
public enum ResultType {
364
PAYLOAD, // Contains actual data
365
NOT_READY, // Operation not ready yet
366
EOS // End of stream
367
}
368
369
/**
370
* Fetch orientation for result pagination
371
*/
372
public enum FetchOrientation {
373
FETCH_NEXT, // Fetch next rows
374
FETCH_PRIOR, // Fetch previous rows
375
FETCH_FIRST, // Fetch from beginning
376
FETCH_LAST // Fetch from end
377
}
378
```
379
380
**Result Handling Examples:**
381
382
```java
383
// Execute query and handle results
384
OperationHandle queryOp = gateway.executeStatement(session,
385
"SELECT * FROM large_table ORDER BY id",
386
120000, new Configuration());
387
388
// Wait for operation to complete
389
OperationInfo opInfo;
390
do {
391
Thread.sleep(1000);
392
opInfo = gateway.getOperationInfo(session, queryOp);
393
} while (opInfo.getStatus() == OperationStatus.RUNNING);
394
395
if (opInfo.getStatus() == OperationStatus.FINISHED) {
396
// Fetch results in batches
397
String nextToken = null;
398
int batchSize = 1000;
399
400
do {
401
ResultSet batch = gateway.fetchResults(session, queryOp,
402
FetchOrientation.FETCH_NEXT, batchSize);
403
404
if (batch.getResultType() == ResultType.PAYLOAD) {
405
List<RowData> rows = batch.getData();
406
for (RowData row : rows) {
407
processRow(row);
408
}
409
nextToken = batch.getNextToken().orElse(null);
410
}
411
} while (nextToken != null);
412
}
413
```
414
415
### Endpoint Management
416
417
Configure and manage SQL Gateway endpoints for different protocols.
418
419
```java { .api }
420
/**
421
* SQL Gateway endpoint interface
422
*/
423
public interface SqlGatewayEndpoint {
424
/**
425
* Start the endpoint and begin accepting connections
426
* @throws SqlGatewayException if startup fails
427
*/
428
public void start() throws SqlGatewayException;
429
430
/**
431
* Stop the endpoint and close all connections
432
* @throws SqlGatewayException if shutdown fails
433
*/
434
public void stop() throws SqlGatewayException;
435
436
/**
437
* Get endpoint information
438
* @return Information about this endpoint
439
*/
440
public EndpointInfo getInfo();
441
}
442
443
/**
444
* Endpoint information
445
*/
446
public final class EndpointInfo {
447
/**
448
* Get endpoint identifier
449
* @return Unique endpoint ID
450
*/
451
public String getEndpointId();
452
453
/**
454
* Get endpoint version
455
* @return Version string of the endpoint
456
*/
457
public String getEndpointVersion();
458
459
/**
460
* Get supported protocols
461
* @return Set of supported protocol names
462
*/
463
public Set<String> getSupportedProtocols();
464
}
465
```
466
467
### Gateway Configuration
468
469
Configure SQL Gateway service and endpoints.
470
471
```java { .api }
472
/**
473
* SQL Gateway configuration builder
474
*/
475
public class SqlGatewayConfig {
476
public static Builder newBuilder() {
477
return new Builder();
478
}
479
480
public static class Builder {
481
/**
482
* Set session timeout
483
* @param timeout Session timeout duration
484
* @return Builder for method chaining
485
*/
486
public Builder setSessionTimeout(Duration timeout);
487
488
/**
489
* Set maximum number of concurrent sessions
490
* @param maxSessions Maximum session count
491
* @return Builder for method chaining
492
*/
493
public Builder setMaxSessions(int maxSessions);
494
495
/**
496
* Set operation timeout
497
* @param timeout Default operation timeout
498
* @return Builder for method chaining
499
*/
500
public Builder setOperationTimeout(Duration timeout);
501
502
/**
503
* Set result fetch timeout
504
* @param timeout Result fetch timeout
505
* @return Builder for method chaining
506
*/
507
public Builder setResultFetchTimeout(Duration timeout);
508
509
/**
510
* Enable/disable session persistence
511
* @param persistent Whether to persist sessions
512
* @return Builder for method chaining
513
*/
514
public Builder setPersistentSessions(boolean persistent);
515
516
/**
517
* Build the configuration
518
* @return Constructed configuration
519
*/
520
public SqlGatewayConfig build();
521
}
522
}
523
```
524
525
### Error Handling
526
527
Handle SQL Gateway specific exceptions and error conditions.
528
529
```java { .api }
530
/**
531
* Base exception for SQL Gateway operations
532
*/
533
public class SqlGatewayException extends Exception {
534
/**
535
* Get error code
536
* @return Specific error code for the exception
537
*/
538
public String getErrorCode();
539
540
/**
541
* Get error details
542
* @return Additional error details and context
543
*/
544
public Map<String, String> getErrorDetails();
545
}
546
547
/**
548
* Exception for session-related errors
549
*/
550
public class SessionException extends SqlGatewayException {
551
/**
552
* Get session handle that caused the error
553
* @return Session handle associated with error
554
*/
555
public SessionHandle getSessionHandle();
556
}
557
558
/**
559
* Exception for operation-related errors
560
*/
561
public class OperationException extends SqlGatewayException {
562
/**
563
* Get operation handle that caused the error
564
* @return Operation handle associated with error
565
*/
566
public OperationHandle getOperationHandle();
567
}
568
```
569
570
### Advanced Usage Patterns
571
572
Common patterns for building applications with SQL Gateway.
573
574
```java { .api }
575
// Connection pooling for multiple clients
576
public class SqlGatewayConnectionPool {
577
private final SqlGatewayService gateway;
578
private final Queue<SessionHandle> availableSessions;
579
private final int maxSessions;
580
581
public SessionHandle acquireSession() throws SqlGatewayException {
582
SessionHandle session = availableSessions.poll();
583
if (session == null) {
584
if (activeSessions.size() < maxSessions) {
585
session = gateway.openSession(defaultSessionEnv);
586
} else {
587
throw new SqlGatewayException("No available sessions");
588
}
589
}
590
return session;
591
}
592
593
public void releaseSession(SessionHandle session) {
594
availableSessions.offer(session);
595
}
596
}
597
598
// Async query execution
599
public class AsyncQueryExecutor {
600
public CompletableFuture<List<RowData>> executeQueryAsync(
601
SessionHandle session, String query) {
602
603
return CompletableFuture.supplyAsync(() -> {
604
try {
605
OperationHandle op = gateway.executeStatement(session, query, 60000, new Configuration());
606
607
// Poll for completion
608
OperationInfo info;
609
do {
610
Thread.sleep(100);
611
info = gateway.getOperationInfo(session, op);
612
} while (info.getStatus() == OperationStatus.RUNNING);
613
614
if (info.getStatus() == OperationStatus.FINISHED) {
615
ResultSet results = gateway.fetchResults(session, op,
616
FetchOrientation.FETCH_NEXT, Integer.MAX_VALUE);
617
return results.getData();
618
} else {
619
throw new RuntimeException("Query failed: " + info.getException());
620
}
621
} catch (Exception e) {
622
throw new RuntimeException(e);
623
}
624
});
625
}
626
}
627
628
// Multi-tenant session management
629
public class MultiTenantGateway {
630
private final Map<String, SessionHandle> tenantSessions = new ConcurrentHashMap<>();
631
632
public SessionHandle getOrCreateTenantSession(String tenantId) throws SqlGatewayException {
633
return tenantSessions.computeIfAbsent(tenantId, id -> {
634
try {
635
SessionEnvironment env = SessionEnvironment.newBuilder()
636
.setDefaultCatalog("tenant_" + id)
637
.addSessionConfig(Map.of(
638
"execution.parallelism", "2",
639
"table.exec.resource.default-parallelism", "2"
640
))
641
.build();
642
return gateway.openSession(env);
643
} catch (SqlGatewayException e) {
644
throw new RuntimeException(e);
645
}
646
});
647
}
648
649
public void executeForTenant(String tenantId, String sql) throws SqlGatewayException {
650
SessionHandle session = getOrCreateTenantSession(tenantId);
651
gateway.executeStatement(session, sql, 30000, new Configuration());
652
}
653
}
654
```