0
# REST Implementation
1
2
The REST implementation provides a comprehensive HTTP API for all SQL Gateway operations including session management, statement execution, operation control, and catalog browsing. The REST endpoint supports JSON and plain text response formats with full error handling.
3
4
## Capabilities
5
6
### REST Endpoint URLs
7
8
Complete set of REST endpoints for SQL Gateway operations:
9
10
```java { .api }
11
// Session Management
12
POST /sessions // Open session
13
DELETE /sessions/{sessionId} // Close session
14
POST /sessions/{sessionId}/configure-session // Configure session
15
GET /sessions/{sessionId}/config // Get session config
16
POST /sessions/{sessionId}/heartbeat // Session heartbeat
17
18
// Statement Execution
19
POST /sessions/{sessionId}/statements // Execute statement
20
POST /sessions/{sessionId}/complete-statement // Statement completion
21
22
// Operation Management
23
GET /sessions/{sessionId}/operations/{operationId}/status // Get status
24
DELETE /sessions/{sessionId}/operations/{operationId}/cancel // Cancel operation
25
DELETE /sessions/{sessionId}/operations/{operationId} // Close operation
26
27
// Result Fetching
28
GET /sessions/{sessionId}/operations/{operationId}/result/{token} // Fetch results
29
30
// Utility
31
GET /info // Get gateway info
32
GET /api_version // Get API version
33
34
// Materialized Table
35
POST /materialized-tables/{materializedTableId}/refresh // Refresh table
36
37
// Application Deployment
38
POST /scripts // Deploy script
39
```
40
41
### SqlGatewayRestEndpoint
42
43
Main REST endpoint implementation extending Flink's RestServerEndpoint.
44
45
```java { .api }
46
/**
47
* REST endpoint implementation for SQL Gateway
48
*/
49
public class SqlGatewayRestEndpoint extends RestServerEndpoint implements SqlGatewayEndpoint {
50
/**
51
* Create REST endpoint with service and configuration
52
* @param sqlGatewayService Service implementation
53
* @param configuration Flink configuration
54
*/
55
public SqlGatewayRestEndpoint(SqlGatewayService sqlGatewayService, Configuration configuration);
56
57
@Override
58
public void start() throws Exception;
59
60
@Override
61
public void stop() throws Exception;
62
63
/**
64
* Get bound port of the REST server
65
* @return Port number the server is listening on
66
*/
67
public int getPort();
68
69
/**
70
* Get REST server address
71
* @return Server address string
72
*/
73
public String getRestAddress();
74
}
75
```
76
77
### RowFormat
78
79
Enumeration for result serialization formats.
80
81
```java { .api }
82
/**
83
* Row serialization format enumeration
84
*/
85
public enum RowFormat {
86
/** JSON format with type information */
87
JSON,
88
89
/** SQL-compliant plain text format */
90
PLAIN_TEXT;
91
92
/**
93
* Get default row format
94
* @return Default format (JSON)
95
*/
96
public static RowFormat getDefaultFormat();
97
98
/**
99
* Parse row format from string
100
* @param format Format string
101
* @return RowFormat enum value
102
* @throws IllegalArgumentException if format not recognized
103
*/
104
public static RowFormat fromString(String format);
105
}
106
```
107
108
### REST Message Types
109
110
#### Session Messages
111
112
```java { .api }
113
/**
114
* Request body for opening sessions
115
*/
116
public class OpenSessionRequestBody implements RequestBody {
117
/**
118
* Get session name
119
* @return Optional session name
120
*/
121
public Optional<String> getSessionName();
122
123
/**
124
* Get session properties
125
* @return Map of session configuration
126
*/
127
public Map<String, String> getProperties();
128
}
129
130
/**
131
* Response body for session opening
132
*/
133
public class OpenSessionResponseBody implements ResponseBody {
134
/**
135
* Get session handle
136
* @return SessionHandle for the new session
137
*/
138
public String getSessionHandle();
139
}
140
141
/**
142
* Request body for session configuration
143
*/
144
public class ConfigureSessionRequestBody implements RequestBody {
145
/**
146
* Get SQL statement for configuration
147
* @return SQL statement string
148
*/
149
public String getStatement();
150
151
/**
152
* Get execution timeout
153
* @return Timeout in milliseconds
154
*/
155
public Long getExecutionTimeoutMs();
156
}
157
158
/**
159
* Response body for session configuration
160
*/
161
public class GetSessionConfigResponseBody implements ResponseBody {
162
/**
163
* Get session properties
164
* @return Map of current session configuration
165
*/
166
public Map<String, String> getProperties();
167
}
168
```
169
170
#### Statement Messages
171
172
```java { .api }
173
/**
174
* Request body for statement execution
175
*/
176
public class ExecuteStatementRequestBody implements RequestBody {
177
/**
178
* Get SQL statement to execute
179
* @return SQL statement string
180
*/
181
public String getStatement();
182
183
/**
184
* Get execution timeout
185
* @return Optional execution timeout in milliseconds
186
*/
187
public Optional<Long> getExecutionTimeoutMs();
188
189
/**
190
* Get execution configuration
191
* @return Map of execution properties
192
*/
193
public Map<String, String> getExecutionConfig();
194
}
195
196
/**
197
* Response body for statement execution
198
*/
199
public class ExecuteStatementResponseBody implements ResponseBody {
200
/**
201
* Get operation handle
202
* @return String representation of operation handle
203
*/
204
public String getOperationHandle();
205
}
206
207
/**
208
* Response body for result fetching
209
*/
210
public interface FetchResultsResponseBody extends ResponseBody {
211
/**
212
* Get result type
213
* @return ResultType (NOT_READY, PAYLOAD, EOS)
214
*/
215
ResultType getResultType();
216
217
/**
218
* Get next token for pagination
219
* @return Optional next token
220
*/
221
Optional<Long> getNextToken();
222
223
/**
224
* Get results data
225
* @return Results in requested format
226
*/
227
Object getResults();
228
}
229
230
/**
231
* Request body for statement completion
232
*/
233
public class CompleteStatementRequestBody implements RequestBody {
234
/**
235
* Get statement to complete
236
* @return SQL statement string
237
*/
238
public String getStatement();
239
240
/**
241
* Get cursor position
242
* @return Position in statement for completion
243
*/
244
public Integer getPosition();
245
}
246
247
/**
248
* Response body for statement completion
249
*/
250
public class CompleteStatementResponseBody implements ResponseBody {
251
/**
252
* Get completion candidates
253
* @return List of completion suggestions
254
*/
255
public List<String> getCandidates();
256
}
257
```
258
259
#### Operation Messages
260
261
```java { .api }
262
/**
263
* Response body for operation status
264
*/
265
public class OperationStatusResponseBody implements ResponseBody {
266
/**
267
* Get operation status
268
* @return Current OperationStatus
269
*/
270
public OperationStatus getStatus();
271
}
272
```
273
274
#### Utility Messages
275
276
```java { .api }
277
/**
278
* Response body for gateway information
279
*/
280
public class GetInfoResponseBody implements ResponseBody {
281
/**
282
* Get product name
283
* @return Product name ("Apache Flink")
284
*/
285
public String getProductName();
286
287
/**
288
* Get version
289
* @return Flink version string
290
*/
291
public String getVersion();
292
}
293
294
/**
295
* Response body for API version
296
*/
297
public class GetApiVersionResponseBody implements ResponseBody {
298
/**
299
* Get API versions
300
* @return List of supported API versions
301
*/
302
public List<String> getVersions();
303
}
304
```
305
306
### Path Parameters
307
308
```java { .api }
309
/**
310
* Path parameter for session handle
311
*/
312
public class SessionHandleIdPathParameter extends MessagePathParameter<String> {
313
public static final String KEY = "sessionHandle";
314
}
315
316
/**
317
* Path parameter for operation handle
318
*/
319
public class OperationHandleIdPathParameter extends MessagePathParameter<String> {
320
public static final String KEY = "operationHandle";
321
}
322
323
/**
324
* Path parameter for result token
325
*/
326
public class FetchResultsTokenPathParameter extends MessagePathParameter<Long> {
327
public static final String KEY = "token";
328
}
329
```
330
331
### Query Parameters
332
333
```java { .api }
334
/**
335
* Query parameter for row format
336
*/
337
public class FetchResultsRowFormatQueryParameter extends MessageQueryParameter<RowFormat> {
338
public static final String KEY = "rowFormat";
339
340
/**
341
* Get default row format
342
* @return Default format (JSON)
343
*/
344
@Override
345
public RowFormat getDefaultValue();
346
}
347
```
348
349
## Usage Examples
350
351
### HTTP Client Usage
352
353
```bash
354
# Open session
355
curl -X POST http://localhost:8083/sessions \
356
-H "Content-Type: application/json" \
357
-d '{
358
"sessionName": "my-session",
359
"properties": {
360
"execution.target": "remote",
361
"parallelism.default": "4"
362
}
363
}'
364
365
# Response: {"sessionHandle": "550e8400-e29b-41d4-a716-446655440000"}
366
367
# Execute statement
368
curl -X POST http://localhost:8083/sessions/550e8400-e29b-41d4-a716-446655440000/statements \
369
-H "Content-Type: application/json" \
370
-d '{
371
"statement": "SELECT COUNT(*) FROM users",
372
"executionTimeoutMs": 30000
373
}'
374
375
# Response: {"operationHandle": "660e8400-e29b-41d4-a716-446655440001"}
376
377
# Check operation status
378
curl -X GET http://localhost:8083/sessions/550e8400-e29b-41d4-a716-446655440000/operations/660e8400-e29b-41d4-a716-446655440001/status
379
380
# Response: {"status": "RUNNING"}
381
382
# Fetch results (when FINISHED)
383
curl -X GET "http://localhost:8083/sessions/550e8400-e29b-41d4-a716-446655440000/operations/660e8400-e29b-41d4-a716-446655440001/result/0?rowFormat=JSON"
384
385
# Close operation
386
curl -X DELETE http://localhost:8083/sessions/550e8400-e29b-41d4-a716-446655440000/operations/660e8400-e29b-41d4-a716-446655440001
387
388
# Close session
389
curl -X DELETE http://localhost:8083/sessions/550e8400-e29b-41d4-a716-446655440000
390
```
391
392
### Java HTTP Client
393
394
```java
395
import java.net.http.HttpClient;
396
import java.net.http.HttpRequest;
397
import java.net.http.HttpResponse;
398
import java.net.URI;
399
import com.fasterxml.jackson.databind.ObjectMapper;
400
401
public class SqlGatewayRestClient {
402
private final HttpClient httpClient;
403
private final ObjectMapper objectMapper;
404
private final String baseUrl;
405
406
public SqlGatewayRestClient(String baseUrl) {
407
this.httpClient = HttpClient.newHttpClient();
408
this.objectMapper = new ObjectMapper();
409
this.baseUrl = baseUrl;
410
}
411
412
public String openSession(String sessionName, Map<String, String> properties) throws Exception {
413
Map<String, Object> requestBody = Map.of(
414
"sessionName", sessionName,
415
"properties", properties
416
);
417
418
HttpRequest request = HttpRequest.newBuilder()
419
.uri(URI.create(baseUrl + "/sessions"))
420
.header("Content-Type", "application/json")
421
.POST(HttpRequest.BodyPublishers.ofString(objectMapper.writeValueAsString(requestBody)))
422
.build();
423
424
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
425
426
if (response.statusCode() == 200) {
427
Map<String, String> responseBody = objectMapper.readValue(response.body(), Map.class);
428
return responseBody.get("sessionHandle");
429
} else {
430
throw new RuntimeException("Failed to open session: " + response.body());
431
}
432
}
433
434
public String executeStatement(String sessionHandle, String statement, Long timeoutMs) throws Exception {
435
Map<String, Object> requestBody = Map.of(
436
"statement", statement,
437
"executionTimeoutMs", timeoutMs != null ? timeoutMs : 30000L
438
);
439
440
HttpRequest request = HttpRequest.newBuilder()
441
.uri(URI.create(baseUrl + "/sessions/" + sessionHandle + "/statements"))
442
.header("Content-Type", "application/json")
443
.POST(HttpRequest.BodyPublishers.ofString(objectMapper.writeValueAsString(requestBody)))
444
.build();
445
446
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
447
448
if (response.statusCode() == 200) {
449
Map<String, String> responseBody = objectMapper.readValue(response.body(), Map.class);
450
return responseBody.get("operationHandle");
451
} else {
452
throw new RuntimeException("Failed to execute statement: " + response.body());
453
}
454
}
455
456
public String getOperationStatus(String sessionHandle, String operationHandle) throws Exception {
457
HttpRequest request = HttpRequest.newBuilder()
458
.uri(URI.create(baseUrl + "/sessions/" + sessionHandle + "/operations/" + operationHandle + "/status"))
459
.GET()
460
.build();
461
462
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
463
464
if (response.statusCode() == 200) {
465
Map<String, String> responseBody = objectMapper.readValue(response.body(), Map.class);
466
return responseBody.get("status");
467
} else {
468
throw new RuntimeException("Failed to get operation status: " + response.body());
469
}
470
}
471
472
public Map<String, Object> fetchResults(String sessionHandle, String operationHandle, long token, RowFormat format) throws Exception {
473
String url = String.format("%s/sessions/%s/operations/%s/result/%d?rowFormat=%s",
474
baseUrl, sessionHandle, operationHandle, token, format.name());
475
476
HttpRequest request = HttpRequest.newBuilder()
477
.uri(URI.create(url))
478
.GET()
479
.build();
480
481
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
482
483
if (response.statusCode() == 200) {
484
return objectMapper.readValue(response.body(), Map.class);
485
} else {
486
throw new RuntimeException("Failed to fetch results: " + response.body());
487
}
488
}
489
490
public void closeOperation(String sessionHandle, String operationHandle) throws Exception {
491
HttpRequest request = HttpRequest.newBuilder()
492
.uri(URI.create(baseUrl + "/sessions/" + sessionHandle + "/operations/" + operationHandle))
493
.DELETE()
494
.build();
495
496
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
497
498
if (response.statusCode() != 200) {
499
throw new RuntimeException("Failed to close operation: " + response.body());
500
}
501
}
502
503
public void closeSession(String sessionHandle) throws Exception {
504
HttpRequest request = HttpRequest.newBuilder()
505
.uri(URI.create(baseUrl + "/sessions/" + sessionHandle))
506
.DELETE()
507
.build();
508
509
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
510
511
if (response.statusCode() != 200) {
512
throw new RuntimeException("Failed to close session: " + response.body());
513
}
514
}
515
}
516
```
517
518
### Complete REST Workflow
519
520
```java
521
// Complete workflow using REST client
522
public class SqlGatewayRestExample {
523
public static void main(String[] args) throws Exception {
524
SqlGatewayRestClient client = new SqlGatewayRestClient("http://localhost:8083");
525
526
// Open session
527
String sessionHandle = client.openSession("example-session", Map.of(
528
"execution.target", "remote",
529
"parallelism.default", "2"
530
));
531
System.out.println("Opened session: " + sessionHandle);
532
533
try {
534
// Execute query
535
String operationHandle = client.executeStatement(
536
sessionHandle,
537
"SELECT id, name, COUNT(*) as cnt FROM users GROUP BY id, name",
538
30000L
539
);
540
System.out.println("Started operation: " + operationHandle);
541
542
// Wait for completion
543
String status;
544
do {
545
Thread.sleep(1000);
546
status = client.getOperationStatus(sessionHandle, operationHandle);
547
System.out.println("Operation status: " + status);
548
} while (!"FINISHED".equals(status) && !"ERROR".equals(status) && !"CANCELED".equals(status));
549
550
if ("FINISHED".equals(status)) {
551
// Fetch results
552
long token = 0L;
553
while (true) {
554
Map<String, Object> results = client.fetchResults(sessionHandle, operationHandle, token, RowFormat.JSON);
555
String resultType = (String) results.get("resultType");
556
557
if ("PAYLOAD".equals(resultType)) {
558
System.out.println("Results: " + results.get("results"));
559
560
Object nextTokenObj = results.get("nextToken");
561
if (nextTokenObj != null) {
562
token = ((Number) nextTokenObj).longValue();
563
} else {
564
break; // No more data
565
}
566
} else if ("EOS".equals(resultType)) {
567
System.out.println("End of results");
568
break;
569
} else {
570
System.out.println("Results not ready");
571
Thread.sleep(1000);
572
}
573
}
574
} else {
575
System.err.println("Operation failed with status: " + status);
576
}
577
578
// Clean up operation
579
client.closeOperation(sessionHandle, operationHandle);
580
581
} finally {
582
// Clean up session
583
client.closeSession(sessionHandle);
584
}
585
}
586
}
587
```
588
589
### Error Handling
590
591
```java
592
// Comprehensive error handling for REST operations
593
public class RestErrorHandler {
594
595
public void handleRestErrors(HttpResponse<String> response) throws Exception {
596
if (response.statusCode() >= 400) {
597
String errorBody = response.body();
598
599
switch (response.statusCode()) {
600
case 400:
601
throw new IllegalArgumentException("Bad request: " + errorBody);
602
case 401:
603
throw new SecurityException("Unauthorized: " + errorBody);
604
case 404:
605
throw new IllegalStateException("Resource not found: " + errorBody);
606
case 500:
607
throw new RuntimeException("Internal server error: " + errorBody);
608
case 503:
609
throw new RuntimeException("Service unavailable: " + errorBody);
610
default:
611
throw new RuntimeException("HTTP error " + response.statusCode() + ": " + errorBody);
612
}
613
}
614
}
615
616
public void handleOperationErrors(String status, Map<String, Object> operationInfo) {
617
switch (status) {
618
case "ERROR":
619
String exception = (String) operationInfo.get("exception");
620
throw new RuntimeException("Operation failed: " + exception);
621
case "TIMEOUT":
622
throw new RuntimeException("Operation timed out");
623
case "CANCELED":
624
throw new RuntimeException("Operation was canceled");
625
default:
626
// Handle other statuses as needed
627
}
628
}
629
}
630
```