A service that enables multiple clients from the remote to execute SQL in concurrency, providing an easy way to submit Flink Jobs, look up metadata, and analyze data online.
—
The REST implementation provides a comprehensive HTTP API for all SQL Gateway operations including session management, statement execution, operation control, and catalog browsing. The REST endpoint supports JSON and plain text response formats with full error handling.
Complete set of REST endpoints for SQL Gateway operations:
// Session Management
POST /sessions // Open session
DELETE /sessions/{sessionId} // Close session
POST /sessions/{sessionId}/configure-session // Configure session
GET /sessions/{sessionId}/config // Get session config
POST /sessions/{sessionId}/heartbeat // Session heartbeat
// Statement Execution
POST /sessions/{sessionId}/statements // Execute statement
POST /sessions/{sessionId}/complete-statement // Statement completion
// Operation Management
GET /sessions/{sessionId}/operations/{operationId}/status // Get status
DELETE /sessions/{sessionId}/operations/{operationId}/cancel // Cancel operation
DELETE /sessions/{sessionId}/operations/{operationId} // Close operation
// Result Fetching
GET /sessions/{sessionId}/operations/{operationId}/result/{token} // Fetch results
// Utility
GET /info // Get gateway info
GET /api_version // Get API version
// Materialized Table
POST /materialized-tables/{materializedTableId}/refresh // Refresh table
// Application Deployment
POST /scripts // Deploy scriptMain REST endpoint implementation extending Flink's RestServerEndpoint.
/**
* REST endpoint implementation for SQL Gateway
*/
public class SqlGatewayRestEndpoint extends RestServerEndpoint implements SqlGatewayEndpoint {
/**
* Create REST endpoint with service and configuration
* @param sqlGatewayService Service implementation
* @param configuration Flink configuration
*/
public SqlGatewayRestEndpoint(SqlGatewayService sqlGatewayService, Configuration configuration);
@Override
public void start() throws Exception;
@Override
public void stop() throws Exception;
/**
* Get bound port of the REST server
* @return Port number the server is listening on
*/
public int getPort();
/**
* Get REST server address
* @return Server address string
*/
public String getRestAddress();
}Enumeration for result serialization formats.
/**
* Row serialization format enumeration
*/
public enum RowFormat {
/** JSON format with type information */
JSON,
/** SQL-compliant plain text format */
PLAIN_TEXT;
/**
* Get default row format
* @return Default format (JSON)
*/
public static RowFormat getDefaultFormat();
/**
* Parse row format from string
* @param format Format string
* @return RowFormat enum value
* @throws IllegalArgumentException if format not recognized
*/
public static RowFormat fromString(String format);
}/**
* Request body for opening sessions
*/
public class OpenSessionRequestBody implements RequestBody {
/**
* Get session name
* @return Optional session name
*/
public Optional<String> getSessionName();
/**
* Get session properties
* @return Map of session configuration
*/
public Map<String, String> getProperties();
}
/**
* Response body for session opening
*/
public class OpenSessionResponseBody implements ResponseBody {
/**
* Get session handle
* @return SessionHandle for the new session
*/
public String getSessionHandle();
}
/**
* Request body for session configuration
*/
public class ConfigureSessionRequestBody implements RequestBody {
/**
* Get SQL statement for configuration
* @return SQL statement string
*/
public String getStatement();
/**
* Get execution timeout
* @return Timeout in milliseconds
*/
public Long getExecutionTimeoutMs();
}
/**
* Response body for session configuration
*/
public class GetSessionConfigResponseBody implements ResponseBody {
/**
* Get session properties
* @return Map of current session configuration
*/
public Map<String, String> getProperties();
}/**
* Request body for statement execution
*/
public class ExecuteStatementRequestBody implements RequestBody {
/**
* Get SQL statement to execute
* @return SQL statement string
*/
public String getStatement();
/**
* Get execution timeout
* @return Optional execution timeout in milliseconds
*/
public Optional<Long> getExecutionTimeoutMs();
/**
* Get execution configuration
* @return Map of execution properties
*/
public Map<String, String> getExecutionConfig();
}
/**
* Response body for statement execution
*/
public class ExecuteStatementResponseBody implements ResponseBody {
/**
* Get operation handle
* @return String representation of operation handle
*/
public String getOperationHandle();
}
/**
* Response body for result fetching
*/
public interface FetchResultsResponseBody extends ResponseBody {
/**
* Get result type
* @return ResultType (NOT_READY, PAYLOAD, EOS)
*/
ResultType getResultType();
/**
* Get next token for pagination
* @return Optional next token
*/
Optional<Long> getNextToken();
/**
* Get results data
* @return Results in requested format
*/
Object getResults();
}
/**
* Request body for statement completion
*/
public class CompleteStatementRequestBody implements RequestBody {
/**
* Get statement to complete
* @return SQL statement string
*/
public String getStatement();
/**
* Get cursor position
* @return Position in statement for completion
*/
public Integer getPosition();
}
/**
* Response body for statement completion
*/
public class CompleteStatementResponseBody implements ResponseBody {
/**
* Get completion candidates
* @return List of completion suggestions
*/
public List<String> getCandidates();
}/**
* Response body for operation status
*/
public class OperationStatusResponseBody implements ResponseBody {
/**
* Get operation status
* @return Current OperationStatus
*/
public OperationStatus getStatus();
}/**
* Response body for gateway information
*/
public class GetInfoResponseBody implements ResponseBody {
/**
* Get product name
* @return Product name ("Apache Flink")
*/
public String getProductName();
/**
* Get version
* @return Flink version string
*/
public String getVersion();
}
/**
* Response body for API version
*/
public class GetApiVersionResponseBody implements ResponseBody {
/**
* Get API versions
* @return List of supported API versions
*/
public List<String> getVersions();
}/**
* Path parameter for session handle
*/
public class SessionHandleIdPathParameter extends MessagePathParameter<String> {
public static final String KEY = "sessionHandle";
}
/**
* Path parameter for operation handle
*/
public class OperationHandleIdPathParameter extends MessagePathParameter<String> {
public static final String KEY = "operationHandle";
}
/**
* Path parameter for result token
*/
public class FetchResultsTokenPathParameter extends MessagePathParameter<Long> {
public static final String KEY = "token";
}/**
* Query parameter for row format
*/
public class FetchResultsRowFormatQueryParameter extends MessageQueryParameter<RowFormat> {
public static final String KEY = "rowFormat";
/**
* Get default row format
* @return Default format (JSON)
*/
@Override
public RowFormat getDefaultValue();
}# Open session
curl -X POST http://localhost:8083/sessions \
-H "Content-Type: application/json" \
-d '{
"sessionName": "my-session",
"properties": {
"execution.target": "remote",
"parallelism.default": "4"
}
}'
# Response: {"sessionHandle": "550e8400-e29b-41d4-a716-446655440000"}
# Execute statement
curl -X POST http://localhost:8083/sessions/550e8400-e29b-41d4-a716-446655440000/statements \
-H "Content-Type: application/json" \
-d '{
"statement": "SELECT COUNT(*) FROM users",
"executionTimeoutMs": 30000
}'
# Response: {"operationHandle": "660e8400-e29b-41d4-a716-446655440001"}
# Check operation status
curl -X GET http://localhost:8083/sessions/550e8400-e29b-41d4-a716-446655440000/operations/660e8400-e29b-41d4-a716-446655440001/status
# Response: {"status": "RUNNING"}
# Fetch results (when FINISHED)
curl -X GET "http://localhost:8083/sessions/550e8400-e29b-41d4-a716-446655440000/operations/660e8400-e29b-41d4-a716-446655440001/result/0?rowFormat=JSON"
# Close operation
curl -X DELETE http://localhost:8083/sessions/550e8400-e29b-41d4-a716-446655440000/operations/660e8400-e29b-41d4-a716-446655440001
# Close session
curl -X DELETE http://localhost:8083/sessions/550e8400-e29b-41d4-a716-446655440000import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.URI;
import com.fasterxml.jackson.databind.ObjectMapper;
public class SqlGatewayRestClient {
private final HttpClient httpClient;
private final ObjectMapper objectMapper;
private final String baseUrl;
public SqlGatewayRestClient(String baseUrl) {
this.httpClient = HttpClient.newHttpClient();
this.objectMapper = new ObjectMapper();
this.baseUrl = baseUrl;
}
public String openSession(String sessionName, Map<String, String> properties) throws Exception {
Map<String, Object> requestBody = Map.of(
"sessionName", sessionName,
"properties", properties
);
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(baseUrl + "/sessions"))
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(objectMapper.writeValueAsString(requestBody)))
.build();
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() == 200) {
Map<String, String> responseBody = objectMapper.readValue(response.body(), Map.class);
return responseBody.get("sessionHandle");
} else {
throw new RuntimeException("Failed to open session: " + response.body());
}
}
public String executeStatement(String sessionHandle, String statement, Long timeoutMs) throws Exception {
Map<String, Object> requestBody = Map.of(
"statement", statement,
"executionTimeoutMs", timeoutMs != null ? timeoutMs : 30000L
);
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(baseUrl + "/sessions/" + sessionHandle + "/statements"))
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(objectMapper.writeValueAsString(requestBody)))
.build();
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() == 200) {
Map<String, String> responseBody = objectMapper.readValue(response.body(), Map.class);
return responseBody.get("operationHandle");
} else {
throw new RuntimeException("Failed to execute statement: " + response.body());
}
}
public String getOperationStatus(String sessionHandle, String operationHandle) throws Exception {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(baseUrl + "/sessions/" + sessionHandle + "/operations/" + operationHandle + "/status"))
.GET()
.build();
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() == 200) {
Map<String, String> responseBody = objectMapper.readValue(response.body(), Map.class);
return responseBody.get("status");
} else {
throw new RuntimeException("Failed to get operation status: " + response.body());
}
}
public Map<String, Object> fetchResults(String sessionHandle, String operationHandle, long token, RowFormat format) throws Exception {
String url = String.format("%s/sessions/%s/operations/%s/result/%d?rowFormat=%s",
baseUrl, sessionHandle, operationHandle, token, format.name());
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(url))
.GET()
.build();
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() == 200) {
return objectMapper.readValue(response.body(), Map.class);
} else {
throw new RuntimeException("Failed to fetch results: " + response.body());
}
}
public void closeOperation(String sessionHandle, String operationHandle) throws Exception {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(baseUrl + "/sessions/" + sessionHandle + "/operations/" + operationHandle))
.DELETE()
.build();
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() != 200) {
throw new RuntimeException("Failed to close operation: " + response.body());
}
}
public void closeSession(String sessionHandle) throws Exception {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(baseUrl + "/sessions/" + sessionHandle))
.DELETE()
.build();
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() != 200) {
throw new RuntimeException("Failed to close session: " + response.body());
}
}
}// Complete workflow using REST client
public class SqlGatewayRestExample {
public static void main(String[] args) throws Exception {
SqlGatewayRestClient client = new SqlGatewayRestClient("http://localhost:8083");
// Open session
String sessionHandle = client.openSession("example-session", Map.of(
"execution.target", "remote",
"parallelism.default", "2"
));
System.out.println("Opened session: " + sessionHandle);
try {
// Execute query
String operationHandle = client.executeStatement(
sessionHandle,
"SELECT id, name, COUNT(*) as cnt FROM users GROUP BY id, name",
30000L
);
System.out.println("Started operation: " + operationHandle);
// Wait for completion
String status;
do {
Thread.sleep(1000);
status = client.getOperationStatus(sessionHandle, operationHandle);
System.out.println("Operation status: " + status);
} while (!"FINISHED".equals(status) && !"ERROR".equals(status) && !"CANCELED".equals(status));
if ("FINISHED".equals(status)) {
// Fetch results
long token = 0L;
while (true) {
Map<String, Object> results = client.fetchResults(sessionHandle, operationHandle, token, RowFormat.JSON);
String resultType = (String) results.get("resultType");
if ("PAYLOAD".equals(resultType)) {
System.out.println("Results: " + results.get("results"));
Object nextTokenObj = results.get("nextToken");
if (nextTokenObj != null) {
token = ((Number) nextTokenObj).longValue();
} else {
break; // No more data
}
} else if ("EOS".equals(resultType)) {
System.out.println("End of results");
break;
} else {
System.out.println("Results not ready");
Thread.sleep(1000);
}
}
} else {
System.err.println("Operation failed with status: " + status);
}
// Clean up operation
client.closeOperation(sessionHandle, operationHandle);
} finally {
// Clean up session
client.closeSession(sessionHandle);
}
}
}// Comprehensive error handling for REST operations
public class RestErrorHandler {
public void handleRestErrors(HttpResponse<String> response) throws Exception {
if (response.statusCode() >= 400) {
String errorBody = response.body();
switch (response.statusCode()) {
case 400:
throw new IllegalArgumentException("Bad request: " + errorBody);
case 401:
throw new SecurityException("Unauthorized: " + errorBody);
case 404:
throw new IllegalStateException("Resource not found: " + errorBody);
case 500:
throw new RuntimeException("Internal server error: " + errorBody);
case 503:
throw new RuntimeException("Service unavailable: " + errorBody);
default:
throw new RuntimeException("HTTP error " + response.statusCode() + ": " + errorBody);
}
}
}
public void handleOperationErrors(String status, Map<String, Object> operationInfo) {
switch (status) {
case "ERROR":
String exception = (String) operationInfo.get("exception");
throw new RuntimeException("Operation failed: " + exception);
case "TIMEOUT":
throw new RuntimeException("Operation timed out");
case "CANCELED":
throw new RuntimeException("Operation was canceled");
default:
// Handle other statuses as needed
}
}
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-gateway