A service that enables multiple remote clients to execute SQL concurrently, providing an easy way to submit Flink jobs, look up metadata, and analyze data online.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-sql-gateway@2.1.0
# Apache Flink SQL Gateway
1
2
Apache Flink SQL Gateway is a service component that provides a multi-client SQL execution interface for remote connections. It acts as a gateway service enabling concurrent SQL execution from multiple clients through REST and HiveServer2 endpoints, with session management, operation tracking, and comprehensive catalog integration.
3
4
## Package Information
5
6
- **Package Name**: flink-sql-gateway
7
- **Package Type**: Maven
8
- **Group ID**: org.apache.flink
9
- **Artifact ID**: flink-sql-gateway
10
- **Language**: Java
11
- **Installation**: Add to Maven dependencies:
12
13
```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-sql-gateway</artifactId>
  <version>2.1.0</version>
</dependency>
```
20
21
## Core Imports
22
23
```java
24
// Main Gateway Classes
25
import org.apache.flink.table.gateway.SqlGateway;
26
import org.apache.flink.table.gateway.api.SqlGatewayService;
27
import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl;
28
29
// Session Management
30
import org.apache.flink.table.gateway.api.session.SessionHandle;
31
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
32
import org.apache.flink.table.gateway.service.session.SessionManager;
33
import org.apache.flink.table.gateway.service.context.DefaultContext;
34
35
// Operation Management
36
import org.apache.flink.table.gateway.api.operation.OperationHandle;
37
import org.apache.flink.table.gateway.api.operation.OperationStatus;
38
import org.apache.flink.table.gateway.api.results.OperationInfo;
39
40
// Results and Data
41
import org.apache.flink.table.gateway.api.results.ResultSet;
42
import org.apache.flink.table.gateway.api.results.TableInfo;
43
import org.apache.flink.table.gateway.api.results.FunctionInfo;
44
import org.apache.flink.table.gateway.api.results.FetchOrientation;
45
46
// Endpoints
47
import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint;
48
import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactory;
49
import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
50
51
// Configuration
52
import org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions;
53
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestOptions;
54
55
// Workflow Management
56
import org.apache.flink.table.workflow.WorkflowScheduler;
57
import org.apache.flink.table.gateway.workflow.EmbeddedWorkflowScheduler;
58
import org.apache.flink.table.gateway.workflow.EmbeddedWorkflowSchedulerFactory;
59
```
60
61
## Basic Usage
62
63
```java
64
import org.apache.flink.configuration.Configuration;
65
import org.apache.flink.table.gateway.SqlGateway;
66
import org.apache.flink.table.gateway.service.session.SessionManager;
67
import org.apache.flink.table.gateway.service.context.DefaultContext;
68
import org.apache.flink.table.gateway.api.SqlGatewayService;
69
import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl;
70
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
71
import org.apache.flink.table.gateway.api.session.SessionHandle;
72
import org.apache.flink.table.gateway.api.operation.OperationHandle;
73
import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
74
75
import java.util.Collections;
76
77
// Start SQL Gateway programmatically
78
Configuration config = new Configuration();
79
DefaultContext defaultContext = DefaultContext.load(config, Collections.emptyList(), true);
80
SessionManager sessionManager = SessionManager.create(defaultContext);
81
82
SqlGateway gateway = new SqlGateway(defaultContext.getFlinkConfig(), sessionManager);
83
gateway.start();
84
85
// Use the service directly for SQL operations
86
SqlGatewayService service = new SqlGatewayServiceImpl(sessionManager);
87
88
// Open a session
89
SessionEnvironment environment = SessionEnvironment.newBuilder()
90
.setSessionEndpointVersion(EndpointVersion.V1)
91
.build();
92
93
SessionHandle session = service.openSession(environment);
94
95
// Execute SQL
96
OperationHandle operation = service.executeStatement(
97
session,
98
"SELECT 1",
99
30000L, // 30 second timeout
100
new Configuration()
101
);
102
103
// Clean up
104
service.closeOperation(session, operation);
105
service.closeSession(session);
106
107
// Stop gateway when done
108
gateway.stop();
109
```
110
111
## Architecture
112
113
The Flink SQL Gateway is built around several key components:
114
115
- **Core Service Interface**: `SqlGatewayService` provides the main API for session, operation, and catalog management
116
- **Endpoint Framework**: Pluggable endpoint architecture with REST and HiveServer2 implementations
117
- **Session Management**: Multi-client session handling with isolation and configuration
118
- **Operation Management**: Asynchronous SQL operation execution with status tracking and result pagination
119
- **REST Implementation**: Complete REST API with comprehensive endpoints for all operations
120
- **Workflow Management**: Materialized table scheduling integration with Quartz scheduler
121
- **Result Management**: Efficient result storage and fetching with token-based pagination
122
123
## Capabilities
124
125
### Core Service Interface
126
127
The main service interface, providing session management, SQL execution, and catalog operations. It is the heart of the SQL Gateway system.
128
129
```java { .api }
130
public interface SqlGatewayService {
131
// Session Management
132
SessionHandle openSession(SessionEnvironment environment) throws SqlGatewayException;
133
void closeSession(SessionHandle sessionHandle) throws SqlGatewayException;
134
void configureSession(SessionHandle sessionHandle, String statement, long executionTimeoutMs) throws SqlGatewayException;
135
Map<String, String> getSessionConfig(SessionHandle sessionHandle) throws SqlGatewayException;
136
137
// Statement Execution
138
OperationHandle executeStatement(SessionHandle sessionHandle, String statement, long executionTimeoutMs, Configuration executionConfig) throws SqlGatewayException;
139
ResultSet fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, long token, int maxRows) throws SqlGatewayException;
140
141
// Operation Management
142
OperationHandle submitOperation(SessionHandle sessionHandle, Callable<ResultSet> executor) throws SqlGatewayException;
143
void cancelOperation(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
144
OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
145
}
146
```
147
148
[Core Service Interface](./core-service-interface.md)
149
150
### Session Management
151
152
Session lifecycle management with environment configuration, handle creation, and session isolation for multi-client scenarios.
153
154
```java { .api }
155
public class SessionHandle {
156
public static SessionHandle create();
157
public UUID getIdentifier();
158
}
159
160
public class SessionEnvironment {
161
public static Builder newBuilder();
162
public Optional<String> getSessionName();
163
public Map<String, String> getSessionConfig();
164
public EndpointVersion getSessionEndpointVersion();
165
}
166
```
167
168
[Session Management](./session-management.md)
169
170
### Operation Management
171
172
Asynchronous operation execution with comprehensive status tracking, cancellation support, and resource management.
173
174
```java { .api }
175
public class OperationHandle {
176
public static OperationHandle create();
177
public UUID getIdentifier();
178
}
179
180
public enum OperationStatus {
181
INITIALIZED, PENDING, RUNNING, FINISHED, CANCELED, CLOSED, ERROR, TIMEOUT;
182
public boolean isTerminalStatus();
183
public static boolean isValidStatusTransition(OperationStatus from, OperationStatus to);
184
}
185
186
public class OperationInfo {
187
public OperationStatus getStatus();
188
public Optional<String> getException();
189
}
190
```
191
192
[Operation Management](./operation-management.md)
193
194
### Result Data Models
195
196
Rich result types with metadata, pagination support, and schema information for handling query results and operation outcomes.
197
198
```java { .api }
199
public interface ResultSet {
200
ResultType getResultType();
201
Long getNextToken();
202
ResolvedSchema getResultSchema();
203
List<RowData> getData();
204
boolean isQueryResult();
205
Optional<JobID> getJobID();
206
}
207
208
public class TableInfo {
209
public ObjectIdentifier getIdentifier();
210
public TableKind getTableKind();
211
}
212
213
public class FunctionInfo {
214
public UnresolvedIdentifier getIdentifier();
215
public Optional<FunctionKind> getKind();
216
}
217
```
218
219
[Result Data Models](./result-data-models.md)
220
221
### Endpoint Framework
222
223
Pluggable endpoint architecture enabling REST, HiveServer2, and custom endpoint implementations with SPI-based discovery.
224
225
```java { .api }
226
public interface SqlGatewayEndpoint {
227
void start() throws Exception;
228
void stop() throws Exception;
229
}
230
231
public interface SqlGatewayEndpointFactory {
232
SqlGatewayEndpoint createSqlGatewayEndpoint(Context context);
233
234
interface Context {
235
SqlGatewayService getSqlGatewayService();
236
Configuration getFlinkConfiguration();
237
ConfigOption<?>[] getEndpointOptions();
238
}
239
}
240
```
241
242
[Endpoint Framework](./endpoint-framework.md)
243
244
### REST Implementation
245
246
Complete REST API implementation with comprehensive endpoints for session, statement, operation, and catalog management.
247
248
```java { .api }
249
public class SqlGatewayRestEndpoint extends RestServerEndpoint implements SqlGatewayEndpoint {
250
// REST endpoints for:
251
// - Session: POST /sessions, DELETE /sessions/{sessionId}
252
// - Statement: POST /sessions/{sessionId}/statements
253
// - Operation: GET /sessions/{sessionId}/operations/{operationId}/status
254
// - Results: GET /sessions/{sessionId}/operations/{operationId}/result/{token}
255
// - Catalog: Integrated through statement execution
256
}
257
258
public enum RowFormat {
259
JSON, PLAIN_TEXT
260
}
261
```
262
263
[REST Implementation](./rest-implementation.md)
264
265
### Configuration Options
266
267
Comprehensive configuration system for service behavior, session management, worker threads, and REST endpoint settings.
268
269
```java { .api }
270
public class SqlGatewayServiceConfigOptions {
271
public static final ConfigOption<Duration> SQL_GATEWAY_SESSION_IDLE_TIMEOUT;
272
public static final ConfigOption<Integer> SQL_GATEWAY_SESSION_MAX_NUM;
273
public static final ConfigOption<Integer> SQL_GATEWAY_WORKER_THREADS_MAX;
274
public static final ConfigOption<Boolean> SQL_GATEWAY_SESSION_PLAN_CACHE_ENABLED;
275
}
276
277
public class SqlGatewayRestOptions {
278
public static final ConfigOption<String> ADDRESS;
279
public static final ConfigOption<String> BIND_ADDRESS;
280
public static final ConfigOption<Integer> PORT;
281
}
282
```
283
284
[Configuration Options](./configuration-options.md)
285
286
### Workflow Management
287
288
Materialized table scheduling system with Quartz integration for periodic refresh operations and workflow lifecycle management.
289
290
```java { .api }
291
public interface WorkflowScheduler<T extends RefreshHandler> {
292
void open() throws WorkflowException;
293
void close() throws WorkflowException;
294
RefreshHandlerSerializer<T> getRefreshHandlerSerializer();
295
T createRefreshWorkflow(CreateRefreshWorkflow createRefreshWorkflow) throws WorkflowException;
296
void modifyRefreshWorkflow(ModifyRefreshWorkflow<T> modifyRefreshWorkflow) throws WorkflowException;
297
void deleteRefreshWorkflow(DeleteRefreshWorkflow<T> deleteRefreshWorkflow) throws WorkflowException;
298
}
299
300
public class EmbeddedWorkflowScheduler implements WorkflowScheduler<EmbeddedRefreshHandler> {
301
public EmbeddedWorkflowScheduler(Configuration configuration);
302
}
303
304
public class EmbeddedWorkflowSchedulerFactory implements WorkflowSchedulerFactory {
305
public static final String IDENTIFIER = "embedded";
306
public WorkflowScheduler<?> createWorkflowScheduler(Context context);
307
}
308
```
309
310
[Workflow Management](./workflow-management.md)
311
312
## Types
313
314
### Core Types
315
316
```java { .api }
317
// Session and Operation Identifiers
318
public class SessionHandle {
319
private final UUID identifier;
320
public SessionHandle(UUID identifier);
321
public UUID getIdentifier();
322
}
323
324
public class OperationHandle {
325
private final UUID identifier;
326
public OperationHandle(UUID identifier);
327
public UUID getIdentifier();
328
}
329
330
// Exception Types
331
public class SqlGatewayException extends Exception {
332
public SqlGatewayException(String message);
333
public SqlGatewayException(String message, Throwable cause);
334
}
335
336
// Result Enums
337
public enum FetchOrientation {
338
FETCH_NEXT, FETCH_PRIOR
339
}
340
341
public enum ResultType {
342
NOT_READY, PAYLOAD, EOS
343
}
344
```