0
# Core Service Interface
1
2
The `SqlGatewayService` is the main service interface responsible for handling requests from endpoints. It provides comprehensive functionality for session management, SQL execution, operation management, and catalog operations.
3
4
## Capabilities
5
6
### Session Management
7
8
Create and manage isolated sessions for multiple clients with environment configuration and resource cleanup.
9
10
```java { .api }
11
/**
12
* Open a new session with the specified environment configuration
13
* @param environment Environment to initialize the Session
14
* @return Handle that identifies the Session
15
* @throws SqlGatewayException if session creation fails
16
*/
17
SessionHandle openSession(SessionEnvironment environment) throws SqlGatewayException;
18
19
/**
20
* Close an existing session and clean up resources
21
* @param sessionHandle Handle to identify the Session to close
22
* @throws SqlGatewayException if session closure fails
23
*/
24
void closeSession(SessionHandle sessionHandle) throws SqlGatewayException;
25
26
/**
27
* Configure session using SQL statements (SET/RESET/CREATE/DROP/USE/ALTER/LOAD MODULE/UNLOAD MODULE/ADD JAR)
28
* @param sessionHandle Handle to identify the session
29
* @param statement SQL statement used to configure the session
30
* @param executionTimeoutMs Execution timeout in milliseconds (non-positive disables timeout)
31
* @throws SqlGatewayException if configuration fails
32
*/
33
void configureSession(SessionHandle sessionHandle, String statement, long executionTimeoutMs) throws SqlGatewayException;
34
35
/**
36
* Get current configuration of the session
37
* @param sessionHandle Handle to identify the session
38
* @return Map of configuration key-value pairs
39
* @throws SqlGatewayException if session not found or access fails
40
*/
41
Map<String, String> getSessionConfig(SessionHandle sessionHandle) throws SqlGatewayException;
42
43
/**
44
* Get endpoint version negotiated during session opening
45
* @param sessionHandle Handle to identify the session
46
* @return Negotiated endpoint version
47
* @throws SqlGatewayException if session not found
48
*/
49
EndpointVersion getSessionEndpointVersion(SessionHandle sessionHandle) throws SqlGatewayException;
50
```
51
52
### Statement Execution
53
54
Execute SQL statements asynchronously with timeout support and execution configuration.
55
56
```java { .api }
57
/**
58
* Execute the submitted SQL statement
59
* @param sessionHandle Handle to identify the session
60
* @param statement SQL statement to execute
61
* @param executionTimeoutMs Execution timeout in milliseconds (non-positive disables timeout)
62
* @param executionConfig Configuration for statement execution
63
* @return Handle to identify the operation
64
* @throws SqlGatewayException if execution fails
65
*/
66
OperationHandle executeStatement(
67
SessionHandle sessionHandle,
68
String statement,
69
long executionTimeoutMs,
70
Configuration executionConfig
71
) throws SqlGatewayException;
72
73
/**
74
* Fetch results from operation using token-based pagination
75
* @param sessionHandle Handle to identify the session
76
* @param operationHandle Handle to identify the operation
77
* @param token Token to identify results position
78
* @param maxRows Maximum number of rows to fetch (Integer.MAX_VALUE for all available)
79
* @return ResultSet containing data and metadata
80
* @throws SqlGatewayException if fetch fails
81
*/
82
ResultSet fetchResults(
83
SessionHandle sessionHandle,
84
OperationHandle operationHandle,
85
long token,
86
int maxRows
87
) throws SqlGatewayException;
88
89
/**
90
* Fetch results from operation using orientation-based navigation
91
* @param sessionHandle Handle to identify the session
92
* @param operationHandle Handle to identify the operation
93
* @param orientation Direction to fetch results (FETCH_NEXT or FETCH_PRIOR)
94
* @param maxRows Maximum number of rows to fetch
95
* @return ResultSet with at least one row if not end-of-stream
96
* @throws SqlGatewayException if fetch fails
97
*/
98
ResultSet fetchResults(
99
SessionHandle sessionHandle,
100
OperationHandle operationHandle,
101
FetchOrientation orientation,
102
int maxRows
103
) throws SqlGatewayException;
104
```
105
106
### Operation Management
107
108
Submit, monitor, and control operations with comprehensive lifecycle management.
109
110
```java { .api }
111
/**
112
* Submit an operation for execution
113
* @param sessionHandle Handle to identify the session
114
* @param executor Main logic to get execution results
115
* @return Handle for retrieving results later
116
* @throws SqlGatewayException if submission fails
117
*/
118
OperationHandle submitOperation(SessionHandle sessionHandle, Callable<ResultSet> executor) throws SqlGatewayException;
119
120
/**
121
* Cancel operation when not in terminal status
122
* @param sessionHandle Handle to identify the session
123
* @param operationHandle Handle to identify the operation
124
* @throws SqlGatewayException if cancellation fails or operation already terminated
125
*/
126
void cancelOperation(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
127
128
/**
129
* Close operation and release all resources
130
* @param sessionHandle Handle to identify the session
131
* @param operationHandle Handle to identify the operation
132
* @throws SqlGatewayException if closure fails
133
*/
134
void closeOperation(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
135
136
/**
137
* Get operation information including status and errors
138
* @param sessionHandle Handle to identify the session
139
* @param operationHandle Handle to identify the operation
140
* @return OperationInfo with status and exception details
141
* @throws SqlGatewayException if operation not found
142
*/
143
OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
144
145
/**
146
* Get result schema for the operation (available when FINISHED)
147
* @param sessionHandle Handle to identify the session
148
* @param operationHandle Handle to identify the operation
149
* @return Resolved schema of the operation results
150
* @throws SqlGatewayException if operation not found or not finished
151
*/
152
ResolvedSchema getOperationResultSchema(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
153
```
154
155
### Catalog API
156
157
Access and explore Flink catalog metadata including catalogs, databases, tables, and functions.
158
159
```java { .api }
160
/**
161
* Get current catalog name for the session
162
* @param sessionHandle Handle to identify the session
163
* @return Name of the current catalog
164
* @throws SqlGatewayException if session not found
165
*/
166
String getCurrentCatalog(SessionHandle sessionHandle) throws SqlGatewayException;
167
168
/**
169
* List all available catalogs in the session
170
* @param sessionHandle Handle to identify the session
171
* @return Set of registered catalog names
172
* @throws SqlGatewayException if session not found
173
*/
174
Set<String> listCatalogs(SessionHandle sessionHandle) throws SqlGatewayException;
175
176
/**
177
* List all available databases in the given catalog
178
* @param sessionHandle Handle to identify the session
179
* @param catalogName Name of the catalog
180
* @return Set of database names in the catalog
181
* @throws SqlGatewayException if session or catalog not found
182
*/
183
Set<String> listDatabases(SessionHandle sessionHandle, String catalogName) throws SqlGatewayException;
184
185
/**
186
* List tables/views in the given catalog and database
187
* @param sessionHandle Handle to identify the session
188
* @param catalogName Name of the catalog
189
* @param databaseName Name of the database
190
* @param tableKinds Types of tables to return (TABLE, VIEW, etc.)
191
* @return Set of TableInfo for matching tables/views
192
* @throws SqlGatewayException if session, catalog, or database not found
193
*/
194
Set<TableInfo> listTables(
195
SessionHandle sessionHandle,
196
String catalogName,
197
String databaseName,
198
Set<TableKind> tableKinds
199
) throws SqlGatewayException;
200
201
/**
202
* Get table information for fully qualified table name
203
* @param sessionHandle Handle to identify the session
204
* @param tableIdentifier Fully qualified table identifier
205
* @return ResolvedCatalogBaseTable with complete table information
206
* @throws SqlGatewayException if session or table not found
207
*/
208
ResolvedCatalogBaseTable<?> getTable(SessionHandle sessionHandle, ObjectIdentifier tableIdentifier) throws SqlGatewayException;
209
210
/**
211
* List user-defined functions in catalog and database
212
* @param sessionHandle Handle to identify the session
213
* @param catalogName Name of the catalog
214
* @param databaseName Name of the database
215
* @return Set of FunctionInfo for user-defined functions
216
* @throws SqlGatewayException if session, catalog, or database not found
217
*/
218
Set<FunctionInfo> listUserDefinedFunctions(SessionHandle sessionHandle, String catalogName, String databaseName) throws SqlGatewayException;
219
220
/**
221
* List all available system functions
222
* @param sessionHandle Handle to identify the session
223
* @return Set of FunctionInfo for system functions
224
* @throws SqlGatewayException if session not found
225
*/
226
Set<FunctionInfo> listSystemFunctions(SessionHandle sessionHandle) throws SqlGatewayException;
227
228
/**
229
* Get specific function definition with resolution order: temporary system, system, temporary, catalog
230
* @param sessionHandle Handle to identify the session
231
* @param functionIdentifier Identifier of the function
232
* @return FunctionDefinition with complete function details
233
* @throws SqlGatewayException if session or function not found
234
*/
235
FunctionDefinition getFunctionDefinition(SessionHandle sessionHandle, UnresolvedIdentifier functionIdentifier) throws SqlGatewayException;
236
```
237
238
### Materialized Table API
239
240
Manage materialized table refresh operations with periodic and one-time scheduling support.
241
242
```java { .api }
243
/**
244
* Trigger refresh operation for specific materialized table
245
* @param sessionHandle Handle to identify the session
246
* @param materializedTableIdentifier Fully qualified table identifier (catalogName.databaseName.objectName)
247
* @param isPeriodic Whether workflow is periodic or one-time-only
248
* @param scheduleTime Time point for scheduler trigger (nullable)
249
* @param dynamicOptions Dynamic configuration options
250
* @param staticPartitions Specific partitions for one-time refresh
251
* @param executionConfig Flink job configuration
252
* @return Handle to identify the refresh operation
253
*/
254
OperationHandle refreshMaterializedTable(
255
SessionHandle sessionHandle,
256
String materializedTableIdentifier,
257
boolean isPeriodic,
258
@Nullable String scheduleTime,
259
Map<String, String> dynamicOptions,
260
Map<String, String> staticPartitions,
261
Map<String, String> executionConfig
262
);
263
```
264
265
### Deploy Script
266
267
Deploy SQL scripts in application mode for batch job execution.
268
269
```java { .api }
270
/**
271
* Deploy script in application mode
272
* @param sessionHandle Handle to identify the session
273
* @param scriptUri URI of the script (nullable)
274
* @param script Content of the script (nullable)
275
* @param executionConfig Configuration to run the script
276
* @return Cluster identifier for the deployed script
277
* @throws SqlGatewayException if deployment fails
278
*/
279
<ClusterID> ClusterID deployScript(
280
SessionHandle sessionHandle,
281
@Nullable URI scriptUri,
282
@Nullable String script,
283
Configuration executionConfig
284
) throws SqlGatewayException;
285
```
286
287
### Statement Completion
288
289
Provide SQL statement completion hints for interactive SQL clients.
290
291
```java { .api }
292
/**
293
* Get completion hints for SQL statement at given position
294
* @param sessionHandle Handle to identify the session
295
* @param statement SQL statement to be completed
296
* @param position Cursor position where completion is needed
297
* @return List of completion suggestions
298
* @throws SqlGatewayException if session not found or completion fails
299
*/
300
List<String> completeStatement(SessionHandle sessionHandle, String statement, int position) throws SqlGatewayException;
301
```
302
303
### Utilities
304
305
Get gateway information and service metadata.
306
307
```java { .api }
308
/**
309
* Get information about the SqlGatewayService
310
* @return GatewayInfo with product name and version
311
*/
312
GatewayInfo getGatewayInfo();
313
```
314
315
## Usage Examples
316
317
```java
318
import org.apache.flink.table.gateway.api.SqlGatewayService;
319
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
320
import org.apache.flink.table.gateway.api.session.SessionHandle;
321
import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl;
322
323
// Create service instance
324
SqlGatewayService service = new SqlGatewayServiceImpl(sessionManager);
325
326
// Open session
327
SessionEnvironment environment = SessionEnvironment.newBuilder()
328
.setSessionName("my-session")
329
.addSessionConfig(Map.of("execution.target", "remote"))
330
.build();
331
332
SessionHandle session = service.openSession(environment);
333
334
// Execute SQL
335
OperationHandle operation = service.executeStatement(
336
session,
337
"SELECT * FROM my_table",
338
30000L, // 30 second timeout
339
new Configuration()
340
);
341
342
// Check operation status
343
OperationInfo info = service.getOperationInfo(session, operation);
344
if (info.getStatus().isTerminalStatus()) {
345
// Fetch results
346
ResultSet results = service.fetchResults(session, operation, 0L, 100);
347
// Process results...
348
}
349
350
// Clean up
351
service.closeOperation(session, operation);
352
service.closeSession(session);
353
```